diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go
index 1cd1141..f92d76b 100644
--- a/client/internal/cmdline/test.go
+++ b/client/internal/cmdline/test.go
@@ -16,6 +16,59 @@ import (
 )
 
 func init() {
+	rootCmd.AddCommand(&cobra.Command{
+		Use:   "test",
+		Short: "test",
+		Run: func(cmd *cobra.Command, args []string) {
+			coorCli, err := stgglb.CoordinatorMQPool.Acquire()
+			if err != nil {
+				panic(err)
+			}
+			defer stgglb.CoordinatorMQPool.Release(coorCli)
+
+			stgs, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{1, 2, 3}))
+			if err != nil {
+				panic(err)
+			}
+
+			ft := ioswitch2.NewFromTo()
+			ft.SegmentParam = cdssdk.NewSegmentRedundancy(1024*100*3, 3)
+			ft.AddFrom(ioswitch2.NewFromShardstore("E58B075E9F7C5744CB1C2CBBECC30F163DE699DCDA94641DDA34A0C2EB01E240", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(0)))
+			ft.AddFrom(ioswitch2.NewFromShardstore("EA14D17544786427C3A766F0C5E6DEB221D00D3DE1875BBE3BD0AD5C8118C1A0", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(1)))
+			ft.AddFrom(ioswitch2.NewFromShardstore("4D142C458F2399175232D5636235B09A84664D60869E925EB20FFBE931045BDD", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(2)))
+			ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[2].MasterHub, *stgs.Storages[2], ioswitch2.RawStream(), "0"))
+			// ft.AddFrom(ioswitch2.NewFromShardstore("CA56E5934859E0220D1F3B848F41619D937D7B874D4EBF63A6CC98D2D8E3280F", *stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.RawStream()))
+			// ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(0), "0"))
+			// ft.AddTo(ioswitch2.NewToShardStoreWithRange(*stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(1), "1", exec.Range{Offset: 1}))
+			// ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(0), "0"))
+			// ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(1), "1"))
+			// ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(2), "2"))
+
+			plans := exec.NewPlanBuilder()
+			err = parser.Parse(ft, plans)
+			if err != nil {
+				panic(err)
+			}
+
+			fmt.Printf("plans: %v\n", plans)
+
+			exec := plans.Execute(exec.NewExecContext())
+
+			fut := future.NewSetVoid()
+			go func() {
+				mp, err := exec.Wait(context.Background())
+				if err != nil {
+					panic(err)
+				}
+
+				fmt.Printf("0: %v, 1: %v, 2: %v\n", mp["0"], mp["1"], mp["2"])
+				fut.SetVoid()
+			}()
+
+			fut.Wait(context.TODO())
+		},
+	})
+
 	rootCmd.AddCommand(&cobra.Command{
 		Use:   "test32",
 		Short: "test32",
@@ -183,8 +236,8 @@ func init() {
 	})
 
 	rootCmd.AddCommand(&cobra.Command{
-		Use:   "test",
-		Short: "test",
+		Use:   "test11",
+		Short: "test11",
 		Run: func(cmd *cobra.Command, args []string) {
 			coorCli, err := stgglb.CoordinatorMQPool.Acquire()
 			if err != nil {
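Review note: the new test command panics on every failure, which is fine for a scratch command. If it ever graduates beyond testing, cobra's RunE variant lets the same body return errors instead of panicking. A minimal sketch of that shape (illustrative only, not part of this patch):

	rootCmd.AddCommand(&cobra.Command{
		Use:   "test",
		Short: "test",
		RunE: func(cmd *cobra.Command, args []string) error {
			coorCli, err := stgglb.CoordinatorMQPool.Acquire()
			if err != nil {
				return fmt.Errorf("acquire coordinator client: %w", err)
			}
			defer stgglb.CoordinatorMQPool.Release(coorCli)

			// ... build and execute the FromTo plan exactly as above ...
			return nil
		},
	})
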
diff --git a/common/pkgs/storage/cos/multiPartUploader.go b/common/pkgs/storage/cos/multiPartUploader.go
index 696de55..4d161dc 100644
--- a/common/pkgs/storage/cos/multiPartUploader.go
+++ b/common/pkgs/storage/cos/multiPartUploader.go
@@ -16,7 +16,7 @@ type MultiPartUploader struct {
 	client *cos.Client
 }
 
-func NewMultiPartUpload(address *cdssdk.COSAddress) *MultiPartUploader {
+func NewMultiPartUpload(address *cdssdk.COSType) *MultiPartUploader {
 	// The COS endpoint already contains the bucket name; the bucket is parsed out of it automatically
 	u, _ := url.Parse(address.Endpoint)
 	b := &cos.BaseURL{BucketURL: u}
diff --git a/common/pkgs/storage/factory/factory.go b/common/pkgs/storage/factory/factory.go
index 89d202b..7b078de 100644
--- a/common/pkgs/storage/factory/factory.go
+++ b/common/pkgs/storage/factory/factory.go
@@ -9,8 +9,9 @@ import (
 	"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg"
 	"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
 
-	// All storage service packages must be imported here
+	// !!! All storage service packages must be imported here !!!
 	_ "gitlink.org.cn/cloudream/storage/common/pkgs/storage/local"
+	_ "gitlink.org.cn/cloudream/storage/common/pkgs/storage/s3"
 )
 
 func CreateService(detail stgmod.StorageDetail) (types.StorageService, error) {
diff --git a/common/pkgs/storage/local/local.go b/common/pkgs/storage/local/local.go
index be79eeb..061f288 100644
--- a/common/pkgs/storage/local/local.go
+++ b/common/pkgs/storage/local/local.go
@@ -18,7 +18,9 @@ func init() {
 }
 
 func createService(detail stgmod.StorageDetail) (types.StorageService, error) {
-	svc := &Service{}
+	svc := &Service{
+		Detail: detail,
+	}
 
 	if detail.Storage.ShardStore != nil {
 		local, ok := detail.Storage.ShardStore.(*cdssdk.LocalShardStorage)
diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go
index 827af7a..4ec0059 100644
--- a/common/pkgs/storage/local/shard_store.go
+++ b/common/pkgs/storage/local/shard_store.go
@@ -218,7 +218,7 @@ func (s *ShardStore) onCreateFinished(tempFilePath string, size int64, hash cdss
 	return types.FileInfo{
 		Hash:        hash,
 		Size:        size,
-		Description: tempFilePath,
+		Description: newPath,
 	}, nil
 }
diff --git a/common/pkgs/storage/obs/multiPartUploader.go b/common/pkgs/storage/obs/multiPartUploader.go
index 26a18d4..9ce8a2e 100644
--- a/common/pkgs/storage/obs/multiPartUploader.go
+++ b/common/pkgs/storage/obs/multiPartUploader.go
@@ -15,7 +15,7 @@ type MultiPartUploader struct {
 	bucket string
 }
 
-func NewMultiPartUpload(address *cdssdk.OBSAddress) *MultiPartUploader {
+func NewMultiPartUpload(address *cdssdk.OBSType) *MultiPartUploader {
 	client, err := obs.New(address.AK, address.SK, address.Endpoint)
 	if err != nil {
 		log.Fatalf("Error: %v", err)
diff --git a/common/pkgs/storage/s3/client.go b/common/pkgs/storage/s3/client.go
new file mode 100644
index 0000000..fdfa000
--- /dev/null
+++ b/common/pkgs/storage/s3/client.go
@@ -0,0 +1,17 @@
+package s3
+
+// type S3Client interface {
+// 	PutObject(ctx context.Context, bucket string, key string, body io.Reader) (PutObjectResp, error)
+// 	GetObject(ctx context.Context, bucket string, key string, rng exec.Range) (io.ReadCloser, error)
+// 	HeadObject(ctx context.Context, bucket string, key string) (HeadObjectResp, error)
+// 	ListObjectsV2(ctx context.Context, bucket string, prefix string)
+// }
+
+// type PutObjectResp struct {
+// 	Hash cdssdk.FileHash // SHA256 hash of the file
+// 	Size int64           // file size
+// }
+
+// type HeadObjectResp struct {
+// 	Size int64 // file size
+// }
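Review note: the commented-out draft above does not compile as written (the ListObjectsV2 line is truncated in the source). A syntactically complete rendering, with the truncated signature and the stand-in types marked as guesses:

	package s3client // hypothetical package name for this sketch

	import (
		"context"
		"io"
	)

	// Range stands in for the exec.Range used in the draft (fields assumed).
	type Range struct{ Offset, Length int64 }

	// FileHash stands in for cdssdk.FileHash.
	type FileHash string

	type PutObjectResp struct {
		Hash FileHash // SHA256 hash of the file
		Size int64    // file size
	}

	type HeadObjectResp struct {
		Size int64 // file size
	}

	type S3Client interface {
		PutObject(ctx context.Context, bucket string, key string, body io.Reader) (PutObjectResp, error)
		GetObject(ctx context.Context, bucket string, key string, rng Range) (io.ReadCloser, error)
		HeadObject(ctx context.Context, bucket string, key string) (HeadObjectResp, error)
		// Return type assumed; the original draft line is truncated.
		ListObjectsV2(ctx context.Context, bucket string, prefix string) ([]HeadObjectResp, error)
	}
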
diff --git a/common/pkgs/storage/s3/multipart_upload.go b/common/pkgs/storage/s3/multipart_upload.go
new file mode 100644
index 0000000..e4c99aa
--- /dev/null
+++ b/common/pkgs/storage/s3/multipart_upload.go
@@ -0,0 +1,139 @@
+package s3
+
+import (
+	"context"
+	"io"
+	"path/filepath"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
+	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+	"gitlink.org.cn/cloudream/common/utils/os2"
+	"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
+)
+
+type MultipartInitiator struct {
+	cli          *s3.Client
+	bucket       string
+	tempDir      string
+	tempFileName string
+	tempFilePath string
+	uploadID     string
+}
+
+func (i *MultipartInitiator) Initiate(ctx context.Context) (types.MultipartInitState, error) {
+	i.tempFileName = os2.GenerateRandomFileName(10)
+	i.tempFilePath = filepath.Join(i.tempDir, i.tempFileName)
+
+	resp, err := i.cli.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
+		Bucket:            aws.String(i.bucket),
+		Key:               aws.String(i.tempFilePath),
+		ChecksumAlgorithm: s3types.ChecksumAlgorithmSha256,
+	})
+	if err != nil {
+		return types.MultipartInitState{}, err
+	}
+
+	i.uploadID = *resp.UploadId
+
+	return types.MultipartInitState{
+		UploadID: *resp.UploadId,
+		Bucket:   i.bucket,
+		Key:      i.tempFilePath,
+	}, nil
+}
+
+func (i *MultipartInitiator) JoinParts(ctx context.Context, parts []types.UploadedPartInfo) (types.BypassFileInfo, error) {
+	s3Parts := make([]s3types.CompletedPart, len(parts))
+	for i, part := range parts {
+		s3Parts[i] = s3types.CompletedPart{
+			ETag:       aws.String(part.ETag),
+			PartNumber: aws.Int32(int32(part.PartNumber)),
+		}
+	}
+
+	compResp, err := i.cli.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
+		Bucket:   aws.String(i.bucket),
+		Key:      aws.String(i.tempFilePath),
+		UploadId: aws.String(i.uploadID),
+		MultipartUpload: &s3types.CompletedMultipartUpload{
+			Parts: s3Parts,
+		},
+	})
+	if err != nil {
+		return types.BypassFileInfo{}, err
+	}
+
+	headResp, err := i.cli.HeadObject(ctx, &s3.HeadObjectInput{
+		Bucket: aws.String(i.bucket),
+		Key:    aws.String(i.tempFilePath),
+	})
+	if err != nil {
+		return types.BypassFileInfo{}, err
+	}
+
+	var hash cdssdk.FileHash
+	// if compResp.ChecksumSHA256 == nil {
+	// 	hash = "4D142C458F2399175232D5636235B09A84664D60869E925EB20FFBE931045BDD"
+	// } else {
+	// }
+	// TODO2 This checksum is actually the SHA256 of the per-part SHA256s concatenated
+	// in part order, not the SHA256 of the complete file.
+	// Consider a dedicated format to tell this kind of hash apart.
+	hash, err = DecodeBase64Hash(*compResp.ChecksumSHA256)
+	if err != nil {
+		return types.BypassFileInfo{}, err
+	}
+
+	return types.BypassFileInfo{
+		TempFilePath: i.tempFilePath,
+		Size:         *headResp.ContentLength,
+		FileHash:     hash,
+	}, nil
+}
+
+func (i *MultipartInitiator) Complete() {
+
+}
+
+func (i *MultipartInitiator) Abort() {
+	// TODO2 Per the API docs, Abort cannot stop parts that are still uploading; they must
+	// finish before everything can be fully removed. Consider a periodic cleanup task.
+	i.cli.AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{
+		Bucket:   aws.String(i.bucket),
+		Key:      aws.String(i.tempFilePath),
+		UploadId: aws.String(i.uploadID),
+	})
+	i.cli.DeleteObject(context.Background(), &s3.DeleteObjectInput{
+		Bucket: aws.String(i.bucket),
+		Key:    aws.String(i.tempFilePath),
+	})
+}
+
+type MultipartUploader struct {
+	cli    *s3.Client
+	bucket string
+}
+
+func (u *MultipartUploader) UploadPart(ctx context.Context, init types.MultipartInitState, partSize int64, partNumber int, stream io.Reader) (types.UploadedPartInfo, error) {
+	resp, err := u.cli.UploadPart(ctx, &s3.UploadPartInput{
+		Bucket:     aws.String(init.Bucket),
+		Key:        aws.String(init.Key),
+		UploadId:   aws.String(init.UploadID),
+		PartNumber: aws.Int32(int32(partNumber)),
+		Body:       stream,
+	})
+	if err != nil {
+		return types.UploadedPartInfo{}, err
+	}
+
+	return types.UploadedPartInfo{
+		ETag:       *resp.ETag,
+		PartNumber: partNumber,
+	}, nil
+}
+
+func (u *MultipartUploader) Close() {
+
+}
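Review note: the two types above implement the standard S3 multipart flow, CreateMultipartUpload, then N UploadPart calls, then CompleteMultipartUpload. A minimal driver showing the intended call order, using only the signatures defined in this file (the helper itself is hypothetical wiring, not part of this patch):

	// uploadInParts ties the initiator and uploader together.
	func uploadInParts(ctx context.Context, init *MultipartInitiator, up *MultipartUploader, parts []io.Reader, partSize int64) (types.BypassFileInfo, error) {
		state, err := init.Initiate(ctx)
		if err != nil {
			return types.BypassFileInfo{}, err
		}

		var done []types.UploadedPartInfo
		for n, p := range parts {
			// S3 part numbers are 1-based.
			info, err := up.UploadPart(ctx, state, partSize, n+1, p)
			if err != nil {
				init.Abort()
				return types.BypassFileInfo{}, err
			}
			done = append(done, info)
		}

		return init.JoinParts(ctx, done)
	}
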
"github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/reflect2" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils" +) + +func init() { + reg.RegisterBuilder[*cdssdk.COSType](createService, createComponent) + reg.RegisterBuilder[*cdssdk.OSSType](createService, createComponent) + reg.RegisterBuilder[*cdssdk.OBSType](createService, createComponent) +} + +func createService(detail stgmod.StorageDetail) (types.StorageService, error) { + svc := &Service{ + Detail: detail, + } + + if detail.Storage.ShardStore != nil { + cfg, ok := detail.Storage.ShardStore.(*cdssdk.S3ShardStorage) + if !ok { + return nil, fmt.Errorf("invalid shard store type %T for local storage", detail.Storage.ShardStore) + } + + cli, bkt, err := createS3Client(detail.Storage.Type) + if err != nil { + return nil, err + } + + store, err := NewShardStore(svc, cli, bkt, *cfg) + if err != nil { + return nil, err + } + + svc.ShardStore = store + } + + return svc, nil +} + +func createComponent(detail stgmod.StorageDetail, typ reflect.Type) (any, error) { + switch typ { + case reflect2.TypeOf[types.MultipartInitiator](): + feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail) + if feat == nil { + return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{}) + } + + cli, bkt, err := createS3Client(detail.Storage.Type) + if err != nil { + return nil, err + } + + return &MultipartInitiator{ + cli: cli, + bucket: bkt, + tempDir: feat.TempDir, + }, nil + + case reflect2.TypeOf[types.MultipartUploader](): + feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail) + if feat == nil { + return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{}) + } + + cli, bkt, err := createS3Client(detail.Storage.Type) + if err != nil { + return nil, err + } + + return &MultipartUploader{ + cli: cli, + bucket: bkt, + }, nil + } + + return nil, fmt.Errorf("unsupported component type %v", typ) +} + +func createS3Client(addr cdssdk.StorageType) (*s3.Client, string, error) { + switch addr := addr.(type) { + // case *cdssdk.COSType: + + // case *cdssdk.OSSType: + + case *cdssdk.OBSType: + awsConfig := aws.Config{} + + cre := aws.Credentials{ + AccessKeyID: addr.AK, + SecretAccessKey: addr.SK, + } + awsConfig.Credentials = &credentials.StaticCredentialsProvider{Value: cre} + awsConfig.Region = addr.Region + + options := []func(*s3.Options){} + options = append(options, func(s3Opt *s3.Options) { + s3Opt.BaseEndpoint = &addr.Endpoint + }) + + cli := s3.NewFromConfig(awsConfig, options...) + return cli, addr.Bucket, nil + + default: + return nil, "", fmt.Errorf("unsupported storage type %T", addr) + } +} diff --git a/common/pkgs/storage/s3/s3_test.go b/common/pkgs/storage/s3/s3_test.go new file mode 100644 index 0000000..a0765a1 --- /dev/null +++ b/common/pkgs/storage/s3/s3_test.go @@ -0,0 +1,47 @@ +package s3 + +import ( + "context" + "fmt" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + . 
"github.com/smartystreets/goconvey/convey" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) + +func Test_S3(t *testing.T) { + Convey("OBS", t, func() { + cli, bkt, err := createS3Client(&cdssdk.OBSType{ + Region: "0", + AK: "0", + SK: "0", + Endpoint: "0", + Bucket: "0", + }) + So(err, ShouldEqual, nil) + + var marker *string + for { + resp, err := cli.ListObjects(context.Background(), &s3.ListObjectsInput{ + Bucket: aws.String(bkt), + Prefix: aws.String("cds"), + MaxKeys: aws.Int32(5), + Marker: marker, + }) + So(err, ShouldEqual, nil) + + fmt.Printf("\n") + for _, obj := range resp.Contents { + fmt.Printf("%v, %v\n", *obj.Key, *obj.LastModified) + } + + if *resp.IsTruncated { + marker = resp.NextMarker + } else { + break + } + } + }) +} diff --git a/common/pkgs/storage/s3/service.go b/common/pkgs/storage/s3/service.go new file mode 100644 index 0000000..506ba37 --- /dev/null +++ b/common/pkgs/storage/s3/service.go @@ -0,0 +1,43 @@ +package s3 + +import ( + "reflect" + + "gitlink.org.cn/cloudream/common/utils/reflect2" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" +) + +type Service struct { + Detail stgmod.StorageDetail + ShardStore *ShardStore +} + +func (s *Service) Info() stgmod.StorageDetail { + return s.Detail +} + +func (s *Service) GetComponent(typ reflect.Type) (any, error) { + switch typ { + case reflect2.TypeOf[types.ShardStore](): + if s.ShardStore == nil { + return nil, types.ErrComponentNotFound + } + return s.ShardStore, nil + + default: + return nil, types.ErrComponentNotFound + } +} + +func (s *Service) Start(ch *types.StorageEventChan) { + if s.ShardStore != nil { + s.ShardStore.Start(ch) + } +} + +func (s *Service) Stop() { + if s.ShardStore != nil { + s.ShardStore.Stop() + } +} diff --git a/common/pkgs/storage/s3/shard_store.go b/common/pkgs/storage/s3/shard_store.go index 3ed7f97..d694e13 100644 --- a/common/pkgs/storage/s3/shard_store.go +++ b/common/pkgs/storage/s3/shard_store.go @@ -1 +1,445 @@ package s3 + +import ( + "context" + "errors" + "fmt" + "io" + "path/filepath" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/common/utils/os2" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" +) + +const ( + TempDir = "tmp" + BlocksDir = "blocks" +) + +type ShardStore struct { + svc *Service + cli *s3.Client + bucket string + cfg cdssdk.S3ShardStorage + lock sync.Mutex + workingTempFiles map[string]bool + done chan any +} + +func NewShardStore(svc *Service, cli *s3.Client, bkt string, cfg cdssdk.S3ShardStorage) (*ShardStore, error) { + return &ShardStore{ + svc: svc, + cli: cli, + bucket: bkt, + cfg: cfg, + workingTempFiles: make(map[string]bool), + done: make(chan any, 1), + }, nil +} + +func (s *ShardStore) Start(ch *types.StorageEventChan) { + s.getLogger().Infof("component start, root: %v", s.cfg.Root) + + go func() { + removeTempTicker := time.NewTicker(time.Minute * 10) + defer removeTempTicker.Stop() + + for { + select { + case <-removeTempTicker.C: + s.removeUnusedTempFiles() + case <-s.done: + return + } + } + }() +} + +func (s *ShardStore) removeUnusedTempFiles() { + s.lock.Lock() + defer s.lock.Unlock() + + log := s.getLogger() + + var deletes []s3types.ObjectIdentifier + 
diff --git a/common/pkgs/storage/s3/shard_store.go b/common/pkgs/storage/s3/shard_store.go
index 3ed7f97..d694e13 100644
--- a/common/pkgs/storage/s3/shard_store.go
+++ b/common/pkgs/storage/s3/shard_store.go
@@ -1 +1,445 @@
 package s3
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"path/filepath"
+	"sync"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
+	"gitlink.org.cn/cloudream/common/pkgs/logger"
+	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+	"gitlink.org.cn/cloudream/common/utils/io2"
+	"gitlink.org.cn/cloudream/common/utils/os2"
+	"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
+)
+
+const (
+	TempDir   = "tmp"
+	BlocksDir = "blocks"
+)
+
+type ShardStore struct {
+	svc              *Service
+	cli              *s3.Client
+	bucket           string
+	cfg              cdssdk.S3ShardStorage
+	lock             sync.Mutex
+	workingTempFiles map[string]bool
+	done             chan any
+}
+
+func NewShardStore(svc *Service, cli *s3.Client, bkt string, cfg cdssdk.S3ShardStorage) (*ShardStore, error) {
+	return &ShardStore{
+		svc:              svc,
+		cli:              cli,
+		bucket:           bkt,
+		cfg:              cfg,
+		workingTempFiles: make(map[string]bool),
+		done:             make(chan any, 1),
+	}, nil
+}
+
+func (s *ShardStore) Start(ch *types.StorageEventChan) {
+	s.getLogger().Infof("component start, root: %v", s.cfg.Root)
+
+	go func() {
+		removeTempTicker := time.NewTicker(time.Minute * 10)
+		defer removeTempTicker.Stop()
+
+		for {
+			select {
+			case <-removeTempTicker.C:
+				s.removeUnusedTempFiles()
+			case <-s.done:
+				return
+			}
+		}
+	}()
+}
+
+func (s *ShardStore) removeUnusedTempFiles() {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	log := s.getLogger()
+
+	var deletes []s3types.ObjectIdentifier
+	deleteObjs := make(map[string]s3types.Object)
+	var marker *string
+	for {
+		resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{
+			Bucket: aws.String(s.bucket),
+			Prefix: aws.String(JoinKey(s.cfg.Root, TempDir, "/")),
+			Marker: marker,
+		})
+		if err != nil {
+			log.Warnf("read temp dir: %v", err)
+			return
+		}
+
+		for _, obj := range resp.Contents {
+			objName := BaseKey(*obj.Key)
+
+			if s.workingTempFiles[objName] {
+				continue
+			}
+
+			deletes = append(deletes, s3types.ObjectIdentifier{
+				Key: obj.Key,
+			})
+			deleteObjs[*obj.Key] = obj
+		}
+
+		if !*resp.IsTruncated {
+			break
+		}
+
+		marker = resp.NextMarker
+	}
+
+	// Skip the call entirely when there is nothing to delete.
+	if len(deletes) == 0 {
+		return
+	}
+
+	resp, err := s.cli.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{
+		Bucket: aws.String(s.bucket),
+		Delete: &s3types.Delete{
+			Objects: deletes,
+		},
+	})
+	if err != nil {
+		log.Warnf("delete temp files: %v", err)
+		return
+	}
+
+	for _, del := range resp.Deleted {
+		obj := deleteObjs[*del.Key]
+		log.Infof("remove unused temp file %v, size: %v, last mod time: %v", *obj.Key, *obj.Size, *obj.LastModified)
+	}
+}
+
+func (s *ShardStore) Stop() {
+	s.getLogger().Infof("component stop")
+
+	select {
+	case s.done <- nil:
+	default:
+	}
+}
+
+func (s *ShardStore) Create(stream io.Reader) (types.FileInfo, error) {
+	log := s.getLogger()
+
+	key, fileName := s.createTempFile()
+
+	counter := io2.NewCounter(stream)
+
+	resp, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{
+		Bucket:            aws.String(s.bucket),
+		Key:               aws.String(key),
+		Body:              counter,
+		ChecksumAlgorithm: s3types.ChecksumAlgorithmSha256,
+	})
+	if err != nil {
+		log.Warnf("uploading file %v: %v", key, err)
+
+		s.lock.Lock()
+		defer s.lock.Unlock()
+
+		delete(s.workingTempFiles, fileName)
+		return types.FileInfo{}, err
+	}
+
+	if resp.ChecksumSHA256 == nil {
+		log.Warnf("SHA256 checksum not found in response of uploaded file %v", key)
+		s.onCreateFailed(key, fileName)
+		return types.FileInfo{}, errors.New("SHA256 checksum not found in response")
+	}
+
+	hash, err := DecodeBase64Hash(*resp.ChecksumSHA256)
+	if err != nil {
+		log.Warnf("decode SHA256 checksum %v: %v", *resp.ChecksumSHA256, err)
+		s.onCreateFailed(key, fileName)
+		return types.FileInfo{}, fmt.Errorf("decode SHA256 checksum: %v", err)
+	}
+
+	return s.onCreateFinished(key, counter.Count(), hash)
+}
+
+func (s *ShardStore) createTempFile() (string, string) {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	tmpDir := JoinKey(s.cfg.Root, TempDir)
+	tmpName := os2.GenerateRandomFileName(20)
+
+	s.workingTempFiles[tmpName] = true
+	return JoinKey(tmpDir, tmpName), tmpName
+}
+
+func (s *ShardStore) onCreateFinished(tempFilePath string, size int64, hash cdssdk.FileHash) (types.FileInfo, error) {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+	defer delete(s.workingTempFiles, filepath.Base(tempFilePath))
+	defer func() {
+		// Delete the temp file regardless of the outcome; even if this fails,
+		// the periodic cleanup task acts as a safety net.
+		s.cli.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
+			Bucket: aws.String(s.bucket),
+			Key:    aws.String(tempFilePath),
+		})
+	}()
+
+	log := s.getLogger()
+
+	log.Debugf("write file %v finished, size: %v, hash: %v", tempFilePath, size, hash)
+
+	blockDir := s.getFileDirFromHash(hash)
+	newPath := JoinKey(blockDir, string(hash))
+
+	_, err := s.cli.CopyObject(context.Background(), &s3.CopyObjectInput{
+		Bucket: aws.String(s.bucket),
+		// CopySource must include the bucket name, matching BypassUploaded below.
+		CopySource: aws.String(JoinKey(s.bucket, tempFilePath)),
+		Key:        aws.String(newPath),
+	})
+	if err != nil {
+		log.Warnf("copy file %v to %v: %v", tempFilePath, newPath, err)
+		return types.FileInfo{}, err
+	}
+
+	return types.FileInfo{
+		Hash: hash,
+		Size: size,
+		Description: newPath,
+	}, nil
+}
+
+func (s *ShardStore) onCreateFailed(key string, fileName string) {
+	// Delete the temp file regardless of the outcome; even if this fails,
+	// the periodic cleanup task acts as a safety net.
+	s.cli.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(key),
+	})
+
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	delete(s.workingTempFiles, fileName)
+}
+
+// The OpenOption passed in here is created with the NewOpen function.
+func (s *ShardStore) Open(opt types.OpenOption) (io.ReadCloser, error) {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	fileName := string(opt.FileHash)
+	if len(fileName) < 2 {
+		return nil, fmt.Errorf("invalid file name")
+	}
+
+	filePath := s.getFilePathFromHash(cdssdk.FileHash(fileName))
+
+	rngStr := fmt.Sprintf("bytes=%d-", opt.Offset)
+	if opt.Length >= 0 {
+		rngStr += fmt.Sprintf("%d", opt.Offset+opt.Length-1)
+	}
+
+	resp, err := s.cli.GetObject(context.TODO(), &s3.GetObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(filePath),
+		Range:  aws.String(rngStr),
+	})
+	if err != nil {
+		s.getLogger().Warnf("get file %v: %v", filePath, err)
+		return nil, err
+	}
+
+	return resp.Body, nil
+}
+
+func (s *ShardStore) Info(hash cdssdk.FileHash) (types.FileInfo, error) {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	filePath := s.getFilePathFromHash(hash)
+	info, err := s.cli.HeadObject(context.TODO(), &s3.HeadObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(filePath),
+	})
+	if err != nil {
+		s.getLogger().Warnf("get file %v: %v", filePath, err)
+		return types.FileInfo{}, err
+	}
+
+	return types.FileInfo{
+		Hash:        hash,
+		Size:        *info.ContentLength,
+		Description: filePath,
+	}, nil
+}
+
+func (s *ShardStore) ListAll() ([]types.FileInfo, error) {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	var infos []types.FileInfo
+
+	blockDir := JoinKey(s.cfg.Root, BlocksDir)
+
+	var marker *string
+	for {
+		resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{
+			Bucket: aws.String(s.bucket),
+			Prefix: aws.String(blockDir),
+			Marker: marker,
+		})
+		if err != nil {
+			s.getLogger().Warnf("list objects: %v", err)
+			return nil, err
+		}
+
+		for _, obj := range resp.Contents {
+			key := BaseKey(*obj.Key)
+			if len(key) != 64 {
+				continue
+			}
+
+			infos = append(infos, types.FileInfo{
+				Hash:        cdssdk.FileHash(key),
+				Size:        *obj.Size,
+				Description: *obj.Key,
+			})
+		}
+
+		if !*resp.IsTruncated {
+			break
+		}
+
+		marker = resp.NextMarker
+	}
+
+	return infos, nil
+}
+
+func (s *ShardStore) GC(availables []cdssdk.FileHash) error {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	avais := make(map[cdssdk.FileHash]bool)
+	for _, hash := range availables {
+		avais[hash] = true
+	}
+
+	blockDir := JoinKey(s.cfg.Root, BlocksDir)
+
+	var deletes []s3types.ObjectIdentifier
+	var marker *string
+	for {
+		resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{
+			Bucket: aws.String(s.bucket),
+			Prefix: aws.String(blockDir),
+			Marker: marker,
+		})
+		if err != nil {
+			s.getLogger().Warnf("list objects: %v", err)
+			return err
+		}
+
+		for _, obj := range resp.Contents {
+			key := BaseKey(*obj.Key)
+			if len(key) != 64 {
+				continue
+			}
+
+			if !avais[cdssdk.FileHash(key)] {
+				deletes = append(deletes, s3types.ObjectIdentifier{
+					Key: obj.Key,
+				})
+			}
+		}
+
+		if !*resp.IsTruncated {
+			break
+		}
+
+		marker = resp.NextMarker
+	}
+
+	cnt := 0
+	if len(deletes) > 0 {
+		resp, err := s.cli.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{
+			Bucket: aws.String(s.bucket),
+			Delete: &s3types.Delete{
+				Objects: deletes,
+			},
+		})
+		if err != nil {
+			s.getLogger().Warnf("delete objects: %v", err)
+			return err
+		}
+
+		cnt = len(resp.Deleted)
+	}
+
+	s.getLogger().Infof("purge %d files", cnt)
+	// TODO Atomicity cannot be guaranteed here, so deletion failures are only logged.
+	return nil
+}
+
+func (s *ShardStore) Stats() types.Stats {
+	// TODO Collect statistics about this storage.
+	return types.Stats{
+		Status: types.StatusOK,
+	}
+}
+
+func (s *ShardStore) BypassUploaded(info types.BypassFileInfo) error {
+	if info.FileHash == "" {
+		return fmt.Errorf("empty file hash is not allowed by this shard store")
+	}
+
+	s.lock.Lock()
+	defer s.lock.Unlock()
+	defer func() {
+		// Delete the temp file regardless of the outcome; even if this fails,
+		// the periodic cleanup task acts as a safety net.
+		s.cli.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
+			Bucket: aws.String(s.bucket),
+			Key:    aws.String(info.TempFilePath),
+		})
+	}()
+
+	log := s.getLogger()
+
+	log.Debugf("%v bypass uploaded, size: %v, hash: %v", info.TempFilePath, info.Size, info.FileHash)
+
+	blockDir := s.getFileDirFromHash(info.FileHash)
+	newPath := JoinKey(blockDir, string(info.FileHash))
+
+	_, err := s.cli.CopyObject(context.Background(), &s3.CopyObjectInput{
+		CopySource: aws.String(JoinKey(s.bucket, info.TempFilePath)),
+		Bucket:     aws.String(s.bucket),
+		Key:        aws.String(newPath),
+	})
+	if err != nil {
+		log.Warnf("copy file %v to %v: %v", info.TempFilePath, newPath, err)
+		return fmt.Errorf("copy file: %w", err)
+	}
+
+	return nil
+}
+
+func (s *ShardStore) getLogger() logger.Logger {
+	return logger.WithField("ShardStore", "S3").WithField("Storage", s.svc.Detail.Storage.String())
+}
+
+func (s *ShardStore) getFileDirFromHash(hash cdssdk.FileHash) string {
+	return JoinKey(s.cfg.Root, BlocksDir, string(hash)[:2])
+}
+
+func (s *ShardStore) getFilePathFromHash(hash cdssdk.FileHash) string {
+	return JoinKey(s.cfg.Root, BlocksDir, string(hash)[:2], string(hash))
+}
diff --git a/common/pkgs/storage/s3/utils.go b/common/pkgs/storage/s3/utils.go
new file mode 100644
index 0000000..8d21dc9
--- /dev/null
+++ b/common/pkgs/storage/s3/utils.go
@@ -0,0 +1,41 @@
+package s3
+
+import (
+	"encoding/base64"
+	"encoding/hex"
+	"fmt"
+	"strings"
+
+	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+)
+
+// JoinKey joins key components with "/" without duplicating separators.
+func JoinKey(comps ...string) string {
+	sb := strings.Builder{}
+
+	hasTrailingSlash := true
+	for _, comp := range comps {
+		if !hasTrailingSlash {
+			sb.WriteString("/")
+		}
+		sb.WriteString(comp)
+		hasTrailingSlash = strings.HasSuffix(comp, "/")
+	}
+
+	return sb.String()
+}
+
+// BaseKey returns the part of the key after the last "/".
+func BaseKey(key string) string {
+	return key[strings.LastIndex(key, "/")+1:]
+}
+
+// DecodeBase64Hash decodes the base64-encoded (padded) SHA256 checksum returned
+// by the S3 API into the upper-case hex form used by cdssdk.FileHash.
+func DecodeBase64Hash(hash string) (cdssdk.FileHash, error) {
+	hashBytes := make([]byte, 32)
+	n, err := base64.StdEncoding.Decode(hashBytes, []byte(hash))
+	if err != nil {
+		return "", err
+	}
+	if n != 32 {
+		return "", fmt.Errorf("invalid hash length: %d", n)
+	}
+
+	return cdssdk.FileHash(strings.ToUpper(hex.EncodeToString(hashBytes))), nil
+}
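Review note: a quick sanity check of the helpers above. JoinKey avoids doubled separators, and DecodeBase64Hash turns a base64 SHA256 checksum into the 64-character upper-case hex form that the shard store's length checks expect. The test values are illustrative; imports of testing, crypto/sha256 and encoding/base64 are assumed:

	func TestUtils(t *testing.T) {
		// "root/" already ends with a separator, so none is inserted after it.
		if got := JoinKey("root/", "tmp", "abc"); got != "root/tmp/abc" {
			t.Fatalf("JoinKey: %v", got)
		}

		sum := sha256.Sum256([]byte("hello"))
		h, err := DecodeBase64Hash(base64.StdEncoding.EncodeToString(sum[:]))
		if err != nil {
			t.Fatal(err)
		}
		if len(h) != 64 { // matches the len(key) == 64 checks in ShardStore
			t.Fatalf("unexpected hash %v", h)
		}
	}
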
diff --git a/common/pkgs/storage/types/s3_client.go b/common/pkgs/storage/types/s3_client.go
index a945c17..03fb622 100644
--- a/common/pkgs/storage/types/s3_client.go
+++ b/common/pkgs/storage/types/s3_client.go
@@ -21,8 +21,11 @@ type MultipartUploader interface {
 	Close()
 }
 
+// TODO Refactor this into an interface so that different part types can carry different state
 type MultipartInitState struct {
 	UploadID string
+	Bucket   string // TODO temporary
+	Key      string // TODO temporary
 }
 
 type UploadedPartInfo struct {
diff --git a/go.mod b/go.mod
index 960b4d5..6a26085 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,8 @@
 module gitlink.org.cn/cloudream/storage
 
-go 1.20
+go 1.21
+
+toolchain go1.23.2
 
 replace gitlink.org.cn/cloudream/common v0.0.0 => ../common
 
@@ -25,6 +27,8 @@ require (
 )
 
 require (
+	github.com/aws/aws-sdk-go-v2 v1.32.6 // indirect
+	github.com/aws/smithy-go v1.22.1 // indirect
 	github.com/clbanning/mxj v1.8.4 // indirect
 	github.com/google/go-querystring v1.0.0 // indirect
 	github.com/google/uuid v1.3.1 // indirect
diff --git a/go.sum b/go.sum
index 6432180..3066c4f 100644
--- a/go.sum
+++ b/go.sum
@@ -3,6 +3,10 @@ github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F
 github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
 github.com/antonfisher/nested-logrus-formatter v1.3.1 h1:NFJIr+pzwv5QLHTPyKz9UMEoHck02Q9L0FP13b/xSbQ=
 github.com/antonfisher/nested-logrus-formatter v1.3.1/go.mod h1:6WTfyWFkBc9+zyBaKIqRrg/KwMqBbodBjgbHjDz7zjA=
+github.com/aws/aws-sdk-go-v2 v1.32.6 h1:7BokKRgRPuGmKkFMhEg/jSul+tB9VvXhcViILtfG8b4=
+github.com/aws/aws-sdk-go-v2 v1.32.6/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U=
+github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro=
+github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
 github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
 github.com/clbanning/mxj v1.8.4 h1:HuhwZtbyvyOw+3Z1AowPkU87JkJUSv751ELWaiTpj8I=
 github.com/clbanning/mxj v1.8.4/go.mod h1:BVjHeAH+rl9rs6f+QIpeRl0tfu10SXn1pUSa5PVGJng=