Browse Source

各个存储服务段合并接入实现

gitlink
JeshuaRen 1 year ago
parent
commit
d2334e54f9
9 changed files with 366 additions and 144 deletions
  1. +94
    -45
      common/pkgs/ioswitch2/ops2/multipart.go
  2. +81
    -0
      common/pkgs/storage/cos/multiPartUploader.go
  3. +5
    -0
      common/pkgs/storage/mgr/mgr.go
  4. +89
    -0
      common/pkgs/storage/obs/multiPartUploader.go
  5. +0
    -24
      common/pkgs/storage/obs/obs.go
  6. +87
    -0
      common/pkgs/storage/oss/multiPartUploader.go
  7. +0
    -56
      common/pkgs/storage/oss/oss.go
  8. +7
    -19
      common/pkgs/storage/types/s3_client.go
  9. +3
    -0
      common/pkgs/storage/types/temp_store.go

+ 94
- 45
common/pkgs/ioswitch2/ops2/multipart.go View File

@@ -1,14 +1,18 @@
package ops2

import (
"encoding/json"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"

"fmt"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/cos"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/obs"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/oss"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
"io"
"time"
)

func init() {
@@ -18,7 +22,8 @@ func init() {
}

type InitUploadValue struct {
UploadID string `json:"uploadID"`
Key string `xml:"Key"` // Object name to upload
UploadID string `xml:"UploadId"` // Generated UploadId
}

func (v *InitUploadValue) Clone() exec.VarValue {
@@ -26,42 +31,52 @@ func (v *InitUploadValue) Clone() exec.VarValue {
}

type MultipartManage struct {
Address cdssdk.StorageAddress `json:"address"`
UploadArgs exec.VarID `json:"uploadID"`
ObjectID exec.VarID `json:"objectID"`
Address cdssdk.StorageAddress `json:"address"`
UploadArgs exec.VarID `json:"uploadArgs"`
UploadOutput exec.VarID `json:"uploadOutput"`
StorageID cdssdk.StorageID `json:"storageID"`
}

func (o *MultipartManage) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
manager, err2 := exec.ValueByType[*mgr.Manager](ctx)
manager, err := exec.GetValueByType[*mgr.Manager](ctx)
if err != nil {
return err
}

var oss stgmod.ObjectStorage
var client types.MultipartUploader
switch addr := o.Address.(type) {
case *cdssdk.LocalStorageAddress:
err := json.Unmarshal([]byte(addr.String()), &oss)
if err != nil {
return err
}
case *cdssdk.OSSAddress:
client = oss.NewMultiPartUpload(addr)
case *cdssdk.OBSAddress:
client = obs.NewMultiPartUpload(addr)
case *cdssdk.COSAddress:
client = cos.NewMultiPartUpload(addr)
}
defer client.Close()

client, err := types.NewObjectStorageClient(oss)
tempStore, err := manager.GetTempStore(o.StorageID)
if err != nil {
return err
}
defer client.Close()
objName := tempStore.CreateTemp()

uploadID, err := client.InitiateMultipartUpload("")
uploadID, err := client.InitiateMultipartUpload(objName)
if err != nil {
return err
}
e.PutVar(o.UploadArgs, &InitUploadValue{UploadID: uploadID})
e.PutVar(o.UploadArgs, &InitUploadValue{
UploadID: uploadID,
Key: objName,
})

fileMD5, err := e.BindVar(ctx.Context, o.UploadID)
objectID, err := client.CompleteMultipartUpload()
parts, err := exec.BindVar[*UploadPartOutputValue](e, ctx.Context, o.UploadOutput)
if err != nil {
return err
}
err = client.CompleteMultipartUpload(uploadID, objName, parts.Parts)
if err != nil {
return err
}
o.ObjectID.Value = objectID
e.PutVars(o.ObjectID)

return nil
}
@@ -72,12 +87,14 @@ func (o *MultipartManage) String() string {

type MultipartManageNode struct {
dag.NodeBase
Address cdssdk.StorageAddress
Address cdssdk.StorageAddress
StorageID cdssdk.StorageID `json:"storageID"`
}

func (b *GraphNodeBuilder) NewMultipartManage(addr cdssdk.StorageAddress) *MultipartManageNode {
func (b *GraphNodeBuilder) NewMultipartManage(addr cdssdk.StorageAddress, storageID cdssdk.StorageID) *MultipartManageNode {
node := &MultipartManageNode{
Address: addr,
Address: addr,
StorageID: storageID,
}
b.AddNode(node)
return node
@@ -85,33 +102,59 @@ func (b *GraphNodeBuilder) NewMultipartManage(addr cdssdk.StorageAddress) *Multi

func (t *MultipartManageNode) GenerateOp() (exec.Op, error) {
return &MultipartManage{
Address: t.Address,
Address: t.Address,
StorageID: t.StorageID,
}, nil
}

type MultipartUpload struct {
Address cdssdk.StorageAddress `json:"address"`
FileMD5 *exec.VarID `json:"fileMD5"`
UploadArgs exec.VarID `json:"uploadID"`
Address cdssdk.StorageAddress `json:"address"`
UploadArgs exec.VarID `json:"uploadArgs"`
UploadOutput exec.VarID `json:"uploadOutput"`
PartNumbers []int `json:"partNumbers"`
PartSize []int64 `json:"partSize"`
Input exec.VarID `json:"input"`
}

type UploadPartOutputValue struct {
Parts []*types.UploadPartOutput `json:"parts"`
}

func (v *UploadPartOutputValue) Clone() exec.VarValue {
return &*v
}

func (o *MultipartUpload) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
value, err2 := exec.BindVar[*InitUploadValue](e, ctx.Context, o.UploadArgs)
initUploadResult, err := exec.BindVar[*InitUploadValue](e, ctx.Context, o.UploadArgs)
if err == nil {
return err
}

input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input)
if err != nil {
return err
}
defer input.Stream.Close()

var oss stgmod.ObjectStorage
var client types.MultipartUploader
switch addr := o.Address.(type) {
case *cdssdk.LocalStorageAddress:
err := json.Unmarshal([]byte(addr.String()), &oss)
case *cdssdk.OSSAddress:
client = oss.NewMultiPartUpload(addr)
}

var parts UploadPartOutputValue
for i := 0; i < len(o.PartNumbers); i++ {
startTime := time.Now()
uploadPart, err := client.UploadPart(initUploadResult.UploadID, initUploadResult.Key, o.PartSize[i], o.PartNumbers[i], io.LimitReader(input.Stream, o.PartSize[i]))
log.Debugf("upload multipart spend time: %v", time.Since(startTime))
if err != nil {
return err
return fmt.Errorf("failed to upload part: %w", err)
}
parts.Parts = append(parts.Parts, uploadPart)
}

client, err := types.NewObjectStorageClient(oss)
if err != nil {
return err
}
client.UploadPart()
e.PutVar(o.UploadOutput, &parts)

return nil
}

@@ -121,12 +164,16 @@ func (o *MultipartUpload) String() string {

type MultipartUploadNode struct {
dag.NodeBase
Address cdssdk.StorageAddress
Address cdssdk.StorageAddress
PartNumbers []int `json:"partNumbers"`
PartSize []int64 `json:"partSize"`
}

func (b *GraphNodeBuilder) NewMultipartUpload(addr cdssdk.StorageAddress) *MultipartUploadNode {
func (b *GraphNodeBuilder) NewMultipartUpload(addr cdssdk.StorageAddress, partNumbers []int, partSize []int64) *MultipartUploadNode {
node := &MultipartUploadNode{
Address: addr,
Address: addr,
PartNumbers: partNumbers,
PartSize: partSize,
}
b.AddNode(node)
return node
@@ -134,6 +181,8 @@ func (b *GraphNodeBuilder) NewMultipartUpload(addr cdssdk.StorageAddress) *Multi

func (t MultipartUploadNode) GenerateOp() (exec.Op, error) {
return &MultipartUpload{
Address: t.Address,
Address: t.Address,
PartNumbers: t.PartNumbers,
PartSize: t.PartSize,
}, nil
}

+ 81
- 0
common/pkgs/storage/cos/multiPartUploader.go View File

@@ -0,0 +1,81 @@
package cos

import (
"context"
"fmt"
"github.com/tencentyun/cos-go-sdk-v5"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
"io"
"net/http"
"net/url"
)

type MultiPartUploader struct {
client *cos.Client
}

func NewMultiPartUpload(address *cdssdk.COSAddress) *MultiPartUploader {
// cos的endpoint已包含bucket名,会自动将桶解析出来
u, _ := url.Parse(address.Endpoint)
b := &cos.BaseURL{BucketURL: u}
client := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: address.AK,
SecretKey: address.SK,
},
})

return &MultiPartUploader{
client: client,
}
}

func (c *MultiPartUploader) InitiateMultipartUpload(objectName string) (string, error) {
v, _, err := c.client.Object.InitiateMultipartUpload(context.Background(), objectName, nil)
if err != nil {
log.Error("Failed to initiate multipart upload: %v", err)
return "", err
}
return v.UploadID, nil
}

func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadPartOutput, error) {
resp, err := c.client.Object.UploadPart(
context.Background(), key, uploadID, partNumber, stream, nil,
)
if err != nil {
return nil, fmt.Errorf("failed to upload part: %w", err)
}

result := &types.UploadPartOutput{
ETag: resp.Header.Get("ETag"),
PartNumber: partNumber,
}
return result, nil
}

func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, key string, parts []*types.UploadPartOutput) error {
opt := &cos.CompleteMultipartUploadOptions{}
for i := 0; i < len(parts); i++ {
opt.Parts = append(opt.Parts, cos.Object{
PartNumber: parts[i].PartNumber, ETag: parts[i].ETag},
)
}
_, _, err := c.client.Object.CompleteMultipartUpload(
context.Background(), key, uploadID, opt,
)
if err != nil {
return err
}

return nil
}
func (c *MultiPartUploader) AbortMultipartUpload() {

}

func (c *MultiPartUploader) Close() {

}

+ 5
- 0
common/pkgs/storage/mgr/mgr.go View File

@@ -21,6 +21,7 @@ var ErrStorageExists = errors.New("storage already exists")
type storage struct {
Shard types.ShardStore
Shared types.SharedStore
Temp types.TempStore
Components []types.StorageComponent
}

@@ -122,6 +123,10 @@ func (m *Manager) GetSharedStore(stgID cdssdk.StorageID) (types.SharedStore, err
return stg.Shared, nil
}

func (m *Manager) GetTempStore(stgID cdssdk.StorageID) (types.TempStore, error) {
return nil, nil
}

// 查找指定Storage的指定类型的组件,可以是ShardStore、SharedStore、或者其他自定义的组件
func (m *Manager) GetComponent(stgID cdssdk.StorageID, typ reflect.Type) (types.StorageComponent, error) {
m.lock.Lock()


+ 89
- 0
common/pkgs/storage/obs/multiPartUploader.go View File

@@ -0,0 +1,89 @@
package obs

import (
"fmt"
"github.com/huaweicloud/huaweicloud-sdk-go-obs/obs"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
"io"
)

type MultiPartUploader struct {
client *obs.ObsClient
bucket string
}

func NewMultiPartUpload(address *cdssdk.OBSAddress) *MultiPartUploader {
client, err := obs.New(address.AK, address.SK, address.Endpoint)
if err != nil {
log.Fatalf("Error: %v", err)
}

return &MultiPartUploader{
client: client,
bucket: address.Bucket,
}
}

func (c *MultiPartUploader) InitiateMultipartUpload(objectName string) (string, error) {
input := &obs.InitiateMultipartUploadInput{}
input.Bucket = c.bucket
input.Key = objectName
imur, err := c.client.InitiateMultipartUpload(input)
if err != nil {
return "", fmt.Errorf("failed to initiate multipart upload: %w", err)
}
return imur.UploadId, nil
}

func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadPartOutput, error) {
uploadParam := &obs.UploadPartInput{
Bucket: c.bucket,
Key: key,
UploadId: uploadID,
PartSize: partSize,
PartNumber: partNumber,
Body: stream,
}

part, err := c.client.UploadPart(uploadParam)
if err != nil {
return nil, fmt.Errorf("failed to upload part: %w", err)
}
result := &types.UploadPartOutput{
ETag: part.ETag,
PartNumber: partNumber,
}
return result, nil
}

func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, Key string, parts []*types.UploadPartOutput) error {
var uploadPart []obs.Part
for i := 0; i < len(parts); i++ {
uploadPart = append(uploadPart, obs.Part{
PartNumber: parts[i].PartNumber,
ETag: parts[i].ETag,
})
}

notifyParam := &obs.CompleteMultipartUploadInput{
Bucket: c.bucket,
Key: Key,
UploadId: uploadID,
Parts: uploadPart,
}

_, err := c.client.CompleteMultipartUpload(notifyParam)
if err != nil {
return err
}
return nil
}
func (c *MultiPartUploader) AbortMultipartUpload() {

}

func (c *MultiPartUploader) Close() {

}

+ 0
- 24
common/pkgs/storage/obs/obs.go View File

@@ -1,24 +0,0 @@
package obs

type OBSClient struct {
}

func (c *OBSClient) InitiateMultipartUpload(objectName string) (string, error) {
return "", nil
}

func (c *OBSClient) UploadPart() {

}

func (c *OBSClient) CompleteMultipartUpload() (string, error) {
return "", nil
}

func (c *OBSClient) AbortMultipartUpload() {

}

func (c *OBSClient) Close() {

}

+ 87
- 0
common/pkgs/storage/oss/multiPartUploader.go View File

@@ -0,0 +1,87 @@
package oss

import (
"fmt"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
"io"
"log"
)

type MultiPartUploader struct {
client *oss.Client
bucket *oss.Bucket
}

func NewMultiPartUpload(address *cdssdk.OSSAddress) *MultiPartUploader {
// 创建OSSClient实例。
client, err := oss.New(address.Endpoint, address.AK, address.SK)
if err != nil {
log.Fatalf("Error: %v", err)
}

bucket, err := client.Bucket(address.Bucket)
if err != nil {
log.Fatalf("Error: %v", err)
}

return &MultiPartUploader{
client: client,
bucket: bucket,
}
}

func (c *MultiPartUploader) InitiateMultipartUpload(objectName string) (string, error) {
imur, err := c.bucket.InitiateMultipartUpload(objectName)
if err != nil {
return "", fmt.Errorf("failed to initiate multipart upload: %w", err)
}
return imur.UploadID, nil
}

func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadPartOutput, error) {
uploadParam := oss.InitiateMultipartUploadResult{
UploadID: uploadID,
Key: key,
Bucket: c.bucket.BucketName,
}
part, err := c.bucket.UploadPart(uploadParam, stream, partSize, partNumber)
if err != nil {
return nil, fmt.Errorf("failed to upload part: %w", err)
}
result := &types.UploadPartOutput{
ETag: part.ETag,
PartNumber: partNumber,
}
return result, nil
}

func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, Key string, parts []*types.UploadPartOutput) error {
notifyParam := oss.InitiateMultipartUploadResult{
UploadID: uploadID,
Key: Key,
Bucket: c.bucket.BucketName,
}
var uploadPart []oss.UploadPart
for i := 0; i < len(parts); i++ {
uploadPart = append(uploadPart, oss.UploadPart{
PartNumber: parts[i].PartNumber,
ETag: parts[i].ETag,
})
}
_, err := c.bucket.CompleteMultipartUpload(notifyParam, uploadPart)
if err != nil {
return err
}
return nil
}

func (c *MultiPartUploader) AbortMultipartUpload() {

}

func (c *MultiPartUploader) Close() {
// 关闭client

}

+ 0
- 56
common/pkgs/storage/oss/oss.go View File

@@ -1,56 +0,0 @@
package oss

import (
"fmt"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"log"
)

type OSSClient struct {
client *oss.Client
bucket *oss.Bucket
}

func (c *OSSClient) InitiateMultipartUpload(objectName string) (string, error) {
imur, err := c.bucket.InitiateMultipartUpload(objectName)
if err != nil {
return "", fmt.Errorf("failed to initiate multipart upload: %w", err)
}
return imur.UploadID, nil
}

func NewOSSClient(obs stgmod.ObjectStorage) *OSSClient {
// 创建OSSClient实例。
client, err := oss.New(obs.Endpoint, obs.AK, obs.SK)
if err != nil {
log.Fatalf("Error: %v", err)
}

bucket, err := client.Bucket(obs.Bucket)
if err != nil {
log.Fatalf("Error: %v", err)
}

return &OSSClient{
client: client,
bucket: bucket,
}
}

func (c *OSSClient) UploadPart() {

}

func (c *OSSClient) CompleteMultipartUpload() (string, error) {
return "", nil
}

func (c *OSSClient) AbortMultipartUpload() {

}

func (c *OSSClient) Close() {
// 关闭client

}

+ 7
- 19
common/pkgs/storage/types/s3_client.go View File

@@ -1,30 +1,18 @@
package types

import (
"fmt"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/obs"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/oss"
"io"
)

//type ObjectStorageInfo interface {
// NewClient() (ObjectStorageClient, error)
//}

type ObjectStorageClient interface {
type MultipartUploader interface {
InitiateMultipartUpload(objectName string) (string, error)
UploadPart()
CompleteMultipartUpload() (string, error)
UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*UploadPartOutput, error)
CompleteMultipartUpload(uploadID string, Key string, Parts []*UploadPartOutput) error
AbortMultipartUpload()
Close()
}

func NewObjectStorageClient(info stgmod.ObjectStorage) (ObjectStorageClient, error) {
switch info.Manufacturer {
case stgmod.AliCloud:
return oss.NewOSSClient(info), nil
case stgmod.HuaweiCloud:
return &obs.OBSClient{}, nil
}
return nil, fmt.Errorf("unknown cloud storage manufacturer %s", info.Manufacturer)
type UploadPartOutput struct {
PartNumber int
ETag string
}

+ 3
- 0
common/pkgs/storage/types/temp_store.go View File

@@ -2,4 +2,7 @@ package types

type TempStore interface {
StorageComponent
CreateTemp() string
Commited(objectName string)
Drop(objectName string)
}

Loading…
Cancel
Save