From d2334e54f978ee595f385d4ab63d4a1605dc7c4a Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Fri, 1 Nov 2024 17:42:38 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=84=E4=B8=AA=E5=AD=98=E5=82=A8=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E6=AE=B5=E5=90=88=E5=B9=B6=E6=8E=A5=E5=85=A5=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/ioswitch2/ops2/multipart.go | 139 +++++++++++++------ common/pkgs/storage/cos/multiPartUploader.go | 81 +++++++++++ common/pkgs/storage/mgr/mgr.go | 5 + common/pkgs/storage/obs/multiPartUploader.go | 89 ++++++++++++ common/pkgs/storage/obs/obs.go | 24 ---- common/pkgs/storage/oss/multiPartUploader.go | 87 ++++++++++++ common/pkgs/storage/oss/oss.go | 56 -------- common/pkgs/storage/types/s3_client.go | 26 +--- common/pkgs/storage/types/temp_store.go | 3 + 9 files changed, 366 insertions(+), 144 deletions(-) create mode 100644 common/pkgs/storage/cos/multiPartUploader.go create mode 100644 common/pkgs/storage/obs/multiPartUploader.go delete mode 100644 common/pkgs/storage/obs/obs.go create mode 100644 common/pkgs/storage/oss/multiPartUploader.go delete mode 100644 common/pkgs/storage/oss/oss.go diff --git a/common/pkgs/ioswitch2/ops2/multipart.go b/common/pkgs/ioswitch2/ops2/multipart.go index bab6662..969cab5 100644 --- a/common/pkgs/ioswitch2/ops2/multipart.go +++ b/common/pkgs/ioswitch2/ops2/multipart.go @@ -1,14 +1,18 @@ package ops2 import ( - "encoding/json" - stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" - + "fmt" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + log "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/cos" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/obs" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/oss" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" + "io" + "time" ) func init() { @@ -18,7 +22,8 @@ func init() { } type InitUploadValue struct { - UploadID string `json:"uploadID"` + Key string `xml:"Key"` // Object name to upload + UploadID string `xml:"UploadId"` // Generated UploadId } func (v *InitUploadValue) Clone() exec.VarValue { @@ -26,42 +31,52 @@ func (v *InitUploadValue) Clone() exec.VarValue { } type MultipartManage struct { - Address cdssdk.StorageAddress `json:"address"` - UploadArgs exec.VarID `json:"uploadID"` - ObjectID exec.VarID `json:"objectID"` + Address cdssdk.StorageAddress `json:"address"` + UploadArgs exec.VarID `json:"uploadArgs"` + UploadOutput exec.VarID `json:"uploadOutput"` + StorageID cdssdk.StorageID `json:"storageID"` } func (o *MultipartManage) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - manager, err2 := exec.ValueByType[*mgr.Manager](ctx) + manager, err := exec.GetValueByType[*mgr.Manager](ctx) + if err != nil { + return err + } - var oss stgmod.ObjectStorage + var client types.MultipartUploader switch addr := o.Address.(type) { - case *cdssdk.LocalStorageAddress: - err := json.Unmarshal([]byte(addr.String()), &oss) - if err != nil { - return err - } + case *cdssdk.OSSAddress: + client = oss.NewMultiPartUpload(addr) + case *cdssdk.OBSAddress: + client = obs.NewMultiPartUpload(addr) + case *cdssdk.COSAddress: + client = cos.NewMultiPartUpload(addr) } + defer client.Close() - client, err := types.NewObjectStorageClient(oss) + tempStore, err := manager.GetTempStore(o.StorageID) if err != nil { return err } - defer client.Close() + objName := tempStore.CreateTemp() - uploadID, err := client.InitiateMultipartUpload("") + uploadID, err := client.InitiateMultipartUpload(objName) if err != nil { return err } - e.PutVar(o.UploadArgs, &InitUploadValue{UploadID: uploadID}) + e.PutVar(o.UploadArgs, &InitUploadValue{ + UploadID: uploadID, + Key: objName, + }) - fileMD5, err := e.BindVar(ctx.Context, o.UploadID) - objectID, err := client.CompleteMultipartUpload() + parts, err := exec.BindVar[*UploadPartOutputValue](e, ctx.Context, o.UploadOutput) + if err != nil { + return err + } + err = client.CompleteMultipartUpload(uploadID, objName, parts.Parts) if err != nil { return err } - o.ObjectID.Value = objectID - e.PutVars(o.ObjectID) return nil } @@ -72,12 +87,14 @@ func (o *MultipartManage) String() string { type MultipartManageNode struct { dag.NodeBase - Address cdssdk.StorageAddress + Address cdssdk.StorageAddress + StorageID cdssdk.StorageID `json:"storageID"` } -func (b *GraphNodeBuilder) NewMultipartManage(addr cdssdk.StorageAddress) *MultipartManageNode { +func (b *GraphNodeBuilder) NewMultipartManage(addr cdssdk.StorageAddress, storageID cdssdk.StorageID) *MultipartManageNode { node := &MultipartManageNode{ - Address: addr, + Address: addr, + StorageID: storageID, } b.AddNode(node) return node @@ -85,33 +102,59 @@ func (b *GraphNodeBuilder) NewMultipartManage(addr cdssdk.StorageAddress) *Multi func (t *MultipartManageNode) GenerateOp() (exec.Op, error) { return &MultipartManage{ - Address: t.Address, + Address: t.Address, + StorageID: t.StorageID, }, nil } type MultipartUpload struct { - Address cdssdk.StorageAddress `json:"address"` - FileMD5 *exec.VarID `json:"fileMD5"` - UploadArgs exec.VarID `json:"uploadID"` + Address cdssdk.StorageAddress `json:"address"` + UploadArgs exec.VarID `json:"uploadArgs"` + UploadOutput exec.VarID `json:"uploadOutput"` + PartNumbers []int `json:"partNumbers"` + PartSize []int64 `json:"partSize"` + Input exec.VarID `json:"input"` +} + +type UploadPartOutputValue struct { + Parts []*types.UploadPartOutput `json:"parts"` +} + +func (v *UploadPartOutputValue) Clone() exec.VarValue { + return &*v } func (o *MultipartUpload) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - value, err2 := exec.BindVar[*InitUploadValue](e, ctx.Context, o.UploadArgs) + initUploadResult, err := exec.BindVar[*InitUploadValue](e, ctx.Context, o.UploadArgs) + if err == nil { + return err + } + + input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) + if err != nil { + return err + } + defer input.Stream.Close() - var oss stgmod.ObjectStorage + var client types.MultipartUploader switch addr := o.Address.(type) { - case *cdssdk.LocalStorageAddress: - err := json.Unmarshal([]byte(addr.String()), &oss) + case *cdssdk.OSSAddress: + client = oss.NewMultiPartUpload(addr) + } + + var parts UploadPartOutputValue + for i := 0; i < len(o.PartNumbers); i++ { + startTime := time.Now() + uploadPart, err := client.UploadPart(initUploadResult.UploadID, initUploadResult.Key, o.PartSize[i], o.PartNumbers[i], io.LimitReader(input.Stream, o.PartSize[i])) + log.Debugf("upload multipart spend time: %v", time.Since(startTime)) if err != nil { - return err + return fmt.Errorf("failed to upload part: %w", err) } + parts.Parts = append(parts.Parts, uploadPart) } - client, err := types.NewObjectStorageClient(oss) - if err != nil { - return err - } - client.UploadPart() + e.PutVar(o.UploadOutput, &parts) + return nil } @@ -121,12 +164,16 @@ func (o *MultipartUpload) String() string { type MultipartUploadNode struct { dag.NodeBase - Address cdssdk.StorageAddress + Address cdssdk.StorageAddress + PartNumbers []int `json:"partNumbers"` + PartSize []int64 `json:"partSize"` } -func (b *GraphNodeBuilder) NewMultipartUpload(addr cdssdk.StorageAddress) *MultipartUploadNode { +func (b *GraphNodeBuilder) NewMultipartUpload(addr cdssdk.StorageAddress, partNumbers []int, partSize []int64) *MultipartUploadNode { node := &MultipartUploadNode{ - Address: addr, + Address: addr, + PartNumbers: partNumbers, + PartSize: partSize, } b.AddNode(node) return node @@ -134,6 +181,8 @@ func (b *GraphNodeBuilder) NewMultipartUpload(addr cdssdk.StorageAddress) *Multi func (t MultipartUploadNode) GenerateOp() (exec.Op, error) { return &MultipartUpload{ - Address: t.Address, + Address: t.Address, + PartNumbers: t.PartNumbers, + PartSize: t.PartSize, }, nil } diff --git a/common/pkgs/storage/cos/multiPartUploader.go b/common/pkgs/storage/cos/multiPartUploader.go new file mode 100644 index 0000000..3155bcb --- /dev/null +++ b/common/pkgs/storage/cos/multiPartUploader.go @@ -0,0 +1,81 @@ +package cos + +import ( + "context" + "fmt" + "github.com/tencentyun/cos-go-sdk-v5" + log "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" + "io" + "net/http" + "net/url" +) + +type MultiPartUploader struct { + client *cos.Client +} + +func NewMultiPartUpload(address *cdssdk.COSAddress) *MultiPartUploader { + // cos的endpoint已包含bucket名,会自动将桶解析出来 + u, _ := url.Parse(address.Endpoint) + b := &cos.BaseURL{BucketURL: u} + client := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: address.AK, + SecretKey: address.SK, + }, + }) + + return &MultiPartUploader{ + client: client, + } +} + +func (c *MultiPartUploader) InitiateMultipartUpload(objectName string) (string, error) { + v, _, err := c.client.Object.InitiateMultipartUpload(context.Background(), objectName, nil) + if err != nil { + log.Error("Failed to initiate multipart upload: %v", err) + return "", err + } + return v.UploadID, nil +} + +func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadPartOutput, error) { + resp, err := c.client.Object.UploadPart( + context.Background(), key, uploadID, partNumber, stream, nil, + ) + if err != nil { + return nil, fmt.Errorf("failed to upload part: %w", err) + } + + result := &types.UploadPartOutput{ + ETag: resp.Header.Get("ETag"), + PartNumber: partNumber, + } + return result, nil +} + +func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, key string, parts []*types.UploadPartOutput) error { + opt := &cos.CompleteMultipartUploadOptions{} + for i := 0; i < len(parts); i++ { + opt.Parts = append(opt.Parts, cos.Object{ + PartNumber: parts[i].PartNumber, ETag: parts[i].ETag}, + ) + } + _, _, err := c.client.Object.CompleteMultipartUpload( + context.Background(), key, uploadID, opt, + ) + if err != nil { + return err + } + + return nil +} +func (c *MultiPartUploader) AbortMultipartUpload() { + +} + +func (c *MultiPartUploader) Close() { + +} diff --git a/common/pkgs/storage/mgr/mgr.go b/common/pkgs/storage/mgr/mgr.go index 95b2e8b..4c487c8 100644 --- a/common/pkgs/storage/mgr/mgr.go +++ b/common/pkgs/storage/mgr/mgr.go @@ -21,6 +21,7 @@ var ErrStorageExists = errors.New("storage already exists") type storage struct { Shard types.ShardStore Shared types.SharedStore + Temp types.TempStore Components []types.StorageComponent } @@ -122,6 +123,10 @@ func (m *Manager) GetSharedStore(stgID cdssdk.StorageID) (types.SharedStore, err return stg.Shared, nil } +func (m *Manager) GetTempStore(stgID cdssdk.StorageID) (types.TempStore, error) { + return nil, nil +} + // 查找指定Storage的指定类型的组件,可以是ShardStore、SharedStore、或者其他自定义的组件 func (m *Manager) GetComponent(stgID cdssdk.StorageID, typ reflect.Type) (types.StorageComponent, error) { m.lock.Lock() diff --git a/common/pkgs/storage/obs/multiPartUploader.go b/common/pkgs/storage/obs/multiPartUploader.go new file mode 100644 index 0000000..7d4b48e --- /dev/null +++ b/common/pkgs/storage/obs/multiPartUploader.go @@ -0,0 +1,89 @@ +package obs + +import ( + "fmt" + "github.com/huaweicloud/huaweicloud-sdk-go-obs/obs" + log "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" + "io" +) + +type MultiPartUploader struct { + client *obs.ObsClient + bucket string +} + +func NewMultiPartUpload(address *cdssdk.OBSAddress) *MultiPartUploader { + client, err := obs.New(address.AK, address.SK, address.Endpoint) + if err != nil { + log.Fatalf("Error: %v", err) + } + + return &MultiPartUploader{ + client: client, + bucket: address.Bucket, + } +} + +func (c *MultiPartUploader) InitiateMultipartUpload(objectName string) (string, error) { + input := &obs.InitiateMultipartUploadInput{} + input.Bucket = c.bucket + input.Key = objectName + imur, err := c.client.InitiateMultipartUpload(input) + if err != nil { + return "", fmt.Errorf("failed to initiate multipart upload: %w", err) + } + return imur.UploadId, nil +} + +func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadPartOutput, error) { + uploadParam := &obs.UploadPartInput{ + Bucket: c.bucket, + Key: key, + UploadId: uploadID, + PartSize: partSize, + PartNumber: partNumber, + Body: stream, + } + + part, err := c.client.UploadPart(uploadParam) + if err != nil { + return nil, fmt.Errorf("failed to upload part: %w", err) + } + result := &types.UploadPartOutput{ + ETag: part.ETag, + PartNumber: partNumber, + } + return result, nil +} + +func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, Key string, parts []*types.UploadPartOutput) error { + var uploadPart []obs.Part + for i := 0; i < len(parts); i++ { + uploadPart = append(uploadPart, obs.Part{ + PartNumber: parts[i].PartNumber, + ETag: parts[i].ETag, + }) + } + + notifyParam := &obs.CompleteMultipartUploadInput{ + Bucket: c.bucket, + Key: Key, + UploadId: uploadID, + Parts: uploadPart, + } + + _, err := c.client.CompleteMultipartUpload(notifyParam) + if err != nil { + return err + } + return nil +} +func (c *MultiPartUploader) AbortMultipartUpload() { + +} + +func (c *MultiPartUploader) Close() { + +} diff --git a/common/pkgs/storage/obs/obs.go b/common/pkgs/storage/obs/obs.go deleted file mode 100644 index a42b8d0..0000000 --- a/common/pkgs/storage/obs/obs.go +++ /dev/null @@ -1,24 +0,0 @@ -package obs - -type OBSClient struct { -} - -func (c *OBSClient) InitiateMultipartUpload(objectName string) (string, error) { - return "", nil -} - -func (c *OBSClient) UploadPart() { - -} - -func (c *OBSClient) CompleteMultipartUpload() (string, error) { - return "", nil -} - -func (c *OBSClient) AbortMultipartUpload() { - -} - -func (c *OBSClient) Close() { - -} diff --git a/common/pkgs/storage/oss/multiPartUploader.go b/common/pkgs/storage/oss/multiPartUploader.go new file mode 100644 index 0000000..201e66a --- /dev/null +++ b/common/pkgs/storage/oss/multiPartUploader.go @@ -0,0 +1,87 @@ +package oss + +import ( + "fmt" + "github.com/aliyun/aliyun-oss-go-sdk/oss" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" + "io" + "log" +) + +type MultiPartUploader struct { + client *oss.Client + bucket *oss.Bucket +} + +func NewMultiPartUpload(address *cdssdk.OSSAddress) *MultiPartUploader { + // 创建OSSClient实例。 + client, err := oss.New(address.Endpoint, address.AK, address.SK) + if err != nil { + log.Fatalf("Error: %v", err) + } + + bucket, err := client.Bucket(address.Bucket) + if err != nil { + log.Fatalf("Error: %v", err) + } + + return &MultiPartUploader{ + client: client, + bucket: bucket, + } +} + +func (c *MultiPartUploader) InitiateMultipartUpload(objectName string) (string, error) { + imur, err := c.bucket.InitiateMultipartUpload(objectName) + if err != nil { + return "", fmt.Errorf("failed to initiate multipart upload: %w", err) + } + return imur.UploadID, nil +} + +func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadPartOutput, error) { + uploadParam := oss.InitiateMultipartUploadResult{ + UploadID: uploadID, + Key: key, + Bucket: c.bucket.BucketName, + } + part, err := c.bucket.UploadPart(uploadParam, stream, partSize, partNumber) + if err != nil { + return nil, fmt.Errorf("failed to upload part: %w", err) + } + result := &types.UploadPartOutput{ + ETag: part.ETag, + PartNumber: partNumber, + } + return result, nil +} + +func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, Key string, parts []*types.UploadPartOutput) error { + notifyParam := oss.InitiateMultipartUploadResult{ + UploadID: uploadID, + Key: Key, + Bucket: c.bucket.BucketName, + } + var uploadPart []oss.UploadPart + for i := 0; i < len(parts); i++ { + uploadPart = append(uploadPart, oss.UploadPart{ + PartNumber: parts[i].PartNumber, + ETag: parts[i].ETag, + }) + } + _, err := c.bucket.CompleteMultipartUpload(notifyParam, uploadPart) + if err != nil { + return err + } + return nil +} + +func (c *MultiPartUploader) AbortMultipartUpload() { + +} + +func (c *MultiPartUploader) Close() { + // 关闭client + +} diff --git a/common/pkgs/storage/oss/oss.go b/common/pkgs/storage/oss/oss.go deleted file mode 100644 index c0f9a0d..0000000 --- a/common/pkgs/storage/oss/oss.go +++ /dev/null @@ -1,56 +0,0 @@ -package oss - -import ( - "fmt" - "github.com/aliyun/aliyun-oss-go-sdk/oss" - stgmod "gitlink.org.cn/cloudream/storage/common/models" - "log" -) - -type OSSClient struct { - client *oss.Client - bucket *oss.Bucket -} - -func (c *OSSClient) InitiateMultipartUpload(objectName string) (string, error) { - imur, err := c.bucket.InitiateMultipartUpload(objectName) - if err != nil { - return "", fmt.Errorf("failed to initiate multipart upload: %w", err) - } - return imur.UploadID, nil -} - -func NewOSSClient(obs stgmod.ObjectStorage) *OSSClient { - // 创建OSSClient实例。 - client, err := oss.New(obs.Endpoint, obs.AK, obs.SK) - if err != nil { - log.Fatalf("Error: %v", err) - } - - bucket, err := client.Bucket(obs.Bucket) - if err != nil { - log.Fatalf("Error: %v", err) - } - - return &OSSClient{ - client: client, - bucket: bucket, - } -} - -func (c *OSSClient) UploadPart() { - -} - -func (c *OSSClient) CompleteMultipartUpload() (string, error) { - return "", nil -} - -func (c *OSSClient) AbortMultipartUpload() { - -} - -func (c *OSSClient) Close() { - // 关闭client - -} diff --git a/common/pkgs/storage/types/s3_client.go b/common/pkgs/storage/types/s3_client.go index 5632239..4090218 100644 --- a/common/pkgs/storage/types/s3_client.go +++ b/common/pkgs/storage/types/s3_client.go @@ -1,30 +1,18 @@ package types import ( - "fmt" - stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/obs" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/oss" + "io" ) -//type ObjectStorageInfo interface { -// NewClient() (ObjectStorageClient, error) -//} - -type ObjectStorageClient interface { +type MultipartUploader interface { InitiateMultipartUpload(objectName string) (string, error) - UploadPart() - CompleteMultipartUpload() (string, error) + UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*UploadPartOutput, error) + CompleteMultipartUpload(uploadID string, Key string, Parts []*UploadPartOutput) error AbortMultipartUpload() Close() } -func NewObjectStorageClient(info stgmod.ObjectStorage) (ObjectStorageClient, error) { - switch info.Manufacturer { - case stgmod.AliCloud: - return oss.NewOSSClient(info), nil - case stgmod.HuaweiCloud: - return &obs.OBSClient{}, nil - } - return nil, fmt.Errorf("unknown cloud storage manufacturer %s", info.Manufacturer) +type UploadPartOutput struct { + PartNumber int + ETag string } diff --git a/common/pkgs/storage/types/temp_store.go b/common/pkgs/storage/types/temp_store.go index d6dd620..ab67886 100644 --- a/common/pkgs/storage/types/temp_store.go +++ b/common/pkgs/storage/types/temp_store.go @@ -2,4 +2,7 @@ package types type TempStore interface { StorageComponent + CreateTemp() string + Commited(objectName string) + Drop(objectName string) }