
Debug S3 multipart upload

Sydonian (gitlink) committed 11 months ago
commit 2bd3a55a45
16 changed files with 923 additions and 8 deletions

1. client/internal/cmdline/test.go (+55 -2)
2. common/pkgs/storage/cos/multiPartUploader.go (+1 -1)
3. common/pkgs/storage/factory/factory.go (+2 -1)
4. common/pkgs/storage/local/local.go (+3 -1)
5. common/pkgs/storage/local/shard_store.go (+1 -1)
6. common/pkgs/storage/obs/multiPartUploader.go (+1 -1)
7. common/pkgs/storage/s3/client.go (+17 -0)
8. common/pkgs/storage/s3/multipart_upload.go (+139 -0)
9. common/pkgs/storage/s3/s3.go (+117 -0)
10. common/pkgs/storage/s3/s3_test.go (+47 -0)
11. common/pkgs/storage/s3/service.go (+43 -0)
12. common/pkgs/storage/s3/shard_store.go (+444 -0)
13. common/pkgs/storage/s3/utils.go (+41 -0)
14. common/pkgs/storage/types/s3_client.go (+3 -0)
15. go.mod (+5 -1)
16. go.sum (+4 -0)

client/internal/cmdline/test.go (+55 -2)

@@ -16,6 +16,59 @@ import (
)

func init() {
rootCmd.AddCommand(&cobra.Command{
Use: "test",
Short: "test",
Run: func(cmd *cobra.Command, args []string) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
panic(err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

stgs, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{1, 2, 3}))
if err != nil {
panic(err)
}

ft := ioswitch2.NewFromTo()
ft.SegmentParam = cdssdk.NewSegmentRedundancy(1024*100*3, 3)
ft.AddFrom(ioswitch2.NewFromShardstore("E58B075E9F7C5744CB1C2CBBECC30F163DE699DCDA94641DDA34A0C2EB01E240", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(0)))
ft.AddFrom(ioswitch2.NewFromShardstore("EA14D17544786427C3A766F0C5E6DEB221D00D3DE1875BBE3BD0AD5C8118C1A0", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(1)))
ft.AddFrom(ioswitch2.NewFromShardstore("4D142C458F2399175232D5636235B09A84664D60869E925EB20FFBE931045BDD", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(2)))
ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[2].MasterHub, *stgs.Storages[2], ioswitch2.RawStream(), "0"))
// ft.AddFrom(ioswitch2.NewFromShardstore("CA56E5934859E0220D1F3B848F41619D937D7B874D4EBF63A6CC98D2D8E3280F", *stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.RawStream()))
// ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(0), "0"))
// ft.AddTo(ioswitch2.NewToShardStoreWithRange(*stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(1), "1", exec.Range{Offset: 1}))
// ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(0), "0"))
// ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(1), "1"))
// ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(2), "2"))

plans := exec.NewPlanBuilder()
err = parser.Parse(ft, plans)
if err != nil {
panic(err)
}

fmt.Printf("plans: %v\n", plans)

exec := plans.Execute(exec.NewExecContext())

fut := future.NewSetVoid()
go func() {
mp, err := exec.Wait(context.Background())
if err != nil {
panic(err)
}

fmt.Printf("0: %v, 1: %v, 2: %v\n", mp["0"], mp["1"], mp["2"])
fut.SetVoid()
}()

fut.Wait(context.TODO())
},
})

rootCmd.AddCommand(&cobra.Command{
Use: "test32",
Short: "test32",
@@ -183,8 +236,8 @@ func init() {
})

rootCmd.AddCommand(&cobra.Command{
Use: "test",
Short: "test",
Use: "test11",
Short: "test11",
Run: func(cmd *cobra.Command, args []string) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {


common/pkgs/storage/cos/multiPartUploader.go (+1 -1)

@@ -16,7 +16,7 @@ type MultiPartUploader struct {
client *cos.Client
}

func NewMultiPartUpload(address *cdssdk.COSAddress) *MultiPartUploader {
func NewMultiPartUpload(address *cdssdk.COSType) *MultiPartUploader {
// The COS endpoint already contains the bucket name; the bucket is parsed out of it automatically
u, _ := url.Parse(address.Endpoint)
b := &cos.BaseURL{BucketURL: u}


common/pkgs/storage/factory/factory.go (+2 -1)

@@ -9,8 +9,9 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"

// All storage service packages must be imported here
// !!! All storage service packages must be imported here !!!
_ "gitlink.org.cn/cloudream/storage/common/pkgs/storage/local"
_ "gitlink.org.cn/cloudream/storage/common/pkgs/storage/s3"
)

func CreateService(detail stgmod.StorageDetail) (types.StorageService, error) {


common/pkgs/storage/local/local.go (+3 -1)

@@ -18,7 +18,9 @@ func init() {
}

func createService(detail stgmod.StorageDetail) (types.StorageService, error) {
svc := &Service{}
svc := &Service{
Detail: detail,
}

if detail.Storage.ShardStore != nil {
local, ok := detail.Storage.ShardStore.(*cdssdk.LocalShardStorage)


common/pkgs/storage/local/shard_store.go (+1 -1)

@@ -218,7 +218,7 @@ func (s *ShardStore) onCreateFinished(tempFilePath string, size int64, hash cdss
return types.FileInfo{
Hash: hash,
Size: size,
Description: tempFilePath,
Description: newPath,
}, nil
}



common/pkgs/storage/obs/multiPartUploader.go (+1 -1)

@@ -15,7 +15,7 @@ type MultiPartUploader struct {
bucket string
}

func NewMultiPartUpload(address *cdssdk.OBSAddress) *MultiPartUploader {
func NewMultiPartUpload(address *cdssdk.OBSType) *MultiPartUploader {
client, err := obs.New(address.AK, address.SK, address.Endpoint)
if err != nil {
log.Fatalf("Error: %v", err)


common/pkgs/storage/s3/client.go (+17 -0)

@@ -0,0 +1,17 @@
package s3

// type S3Client interface {
// PutObject(ctx context.Context, bucket string, key string, body io.Reader) (PutObjectResp, error)
// GetObject(ctx context.Context, bucket string, key string, rng exec.Range) (io.ReadCloser, error)
// HeadObject(ctx context.Context, bucket string, key string) (HeadObjectResp, error)
// ListObjectsV2(ctx context.Context, bucket string, prefix string
// }

// type PutObjectResp struct {
// Hash cdssdk.FileHash // SHA256 hash of the file
// Size int64 // file size
// }

// type HeadObjectResp struct {
// Size int64 // file size
// }
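For reference, a minimal sketch of how the drafted abstraction above could be backed by aws-sdk-go-v2. HeadObjectResp mirrors the commented draft; the helper name and package are hypothetical, everything else uses real SDK calls:

package s3sketch

import (
    "context"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

// HeadObjectResp mirrors the commented-out draft above.
type HeadObjectResp struct {
    Size int64 // file size
}

// headObject sketches the HeadObject method of the drafted S3Client interface,
// delegating to the real SDK client and mapping the result.
func headObject(ctx context.Context, cli *s3.Client, bucket, key string) (HeadObjectResp, error) {
    resp, err := cli.HeadObject(ctx, &s3.HeadObjectInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(key),
    })
    if err != nil {
        return HeadObjectResp{}, err
    }
    return HeadObjectResp{Size: *resp.ContentLength}, nil // ContentLength is *int64 in SDK v2
}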

common/pkgs/storage/s3/multipart_upload.go (+139 -0)

@@ -0,0 +1,139 @@
package s3

import (
"context"
"io"
"path/filepath"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/os2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type MultipartInitiator struct {
cli *s3.Client
bucket string
tempDir string
tempFileName string
tempFilePath string
uploadID string
}

func (i *MultipartInitiator) Initiate(ctx context.Context) (types.MultipartInitState, error) {
i.tempFileName = os2.GenerateRandomFileName(10)
i.tempFilePath = JoinKey(i.tempDir, i.tempFileName) // S3 keys always use "/", so avoid the OS-dependent filepath.Join

resp, err := i.cli.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
Bucket: aws.String(i.bucket),
Key: aws.String(i.tempFilePath),
ChecksumAlgorithm: s3types.ChecksumAlgorithmSha256,
})
if err != nil {
return types.MultipartInitState{}, err
}

i.uploadID = *resp.UploadId

return types.MultipartInitState{
UploadID: *resp.UploadId,
Bucket: i.bucket,
Key: i.tempFilePath,
}, nil
}

func (i *MultipartInitiator) JoinParts(ctx context.Context, parts []types.UploadedPartInfo) (types.BypassFileInfo, error) {
s3Parts := make([]s3types.CompletedPart, len(parts))
for i, part := range parts {
s3Parts[i] = s3types.CompletedPart{
ETag: aws.String(part.ETag),
PartNumber: aws.Int32(int32(part.PartNumber)),
}
}

compResp, err := i.cli.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(i.bucket),
Key: aws.String(i.tempFilePath),
UploadId: aws.String(i.uploadID),
MultipartUpload: &s3types.CompletedMultipartUpload{
Parts: s3Parts,
},
})
if err != nil {
return types.BypassFileInfo{}, err
}

headResp, err := i.cli.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: aws.String(i.bucket),
Key: aws.String(i.tempFilePath),
})
if err != nil {
return types.BypassFileInfo{}, err
}

var hash cdssdk.FileHash
// if compResp.ChecksumSHA256 == nil {
// hash = "4D142C458F2399175232D5636235B09A84664D60869E925EB20FFBE931045BDD"
// } else {
// }
// TODO2 This value is actually the SHA256 of the ordered concatenation of each uploaded part's SHA256 digest, not the SHA256 of the complete file.
// Consider using a special format to distinguish this kind of hash.
hash, err = DecodeBase64Hash(*compResp.ChecksumSHA256)
if err != nil {
return types.BypassFileInfo{}, err
}

return types.BypassFileInfo{
TempFilePath: i.tempFilePath,
Size: *headResp.ContentLength,
FileHash: hash,
}, nil

}

func (i *MultipartInitiator) Complete() {

}

func (i *MultipartInitiator) Abort() {
// TODO2 As documented, Abort cannot stop parts that are still uploading; they must finish before the upload can be fully removed.
// Consider adding a periodic task to clean these up.
i.cli.AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{
Bucket: aws.String(i.bucket),
Key: aws.String(i.tempFilePath),
UploadId: aws.String(i.uploadID),
})
i.cli.DeleteObject(context.Background(), &s3.DeleteObjectInput{
Bucket: aws.String(i.bucket),
Key: aws.String(i.tempFilePath),
})
}

type MultipartUploader struct {
cli *s3.Client
bucket string
}

func (u *MultipartUploader) UploadPart(ctx context.Context, init types.MultipartInitState, partSize int64, partNumber int, stream io.Reader) (types.UploadedPartInfo, error) {
resp, err := u.cli.UploadPart(ctx, &s3.UploadPartInput{
Bucket: aws.String(init.Bucket),
Key: aws.String(init.Key),
UploadId: aws.String(init.UploadID),
PartNumber: aws.Int32(int32(partNumber)),
Body: stream,
})
if err != nil {
return types.UploadedPartInfo{}, err
}

return types.UploadedPartInfo{
ETag: *resp.ETag,
PartNumber: partNumber,
}, nil
}

func (u *MultipartUploader) Close() {

}
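To make the TODO2 note in JoinParts concrete: a composite checksum is the digest of the ordered part digests, so it will generally differ from the whole-file SHA256. A self-contained sketch of how such a value is formed (part contents are made up; whether a given S3-compatible service appends a "-<part count>" suffix to the base64 value should be verified against that service):

package main

import (
    "crypto/sha256"
    "encoding/base64"
    "fmt"
)

// compositeSHA256 mimics how an S3-style composite checksum is formed:
// SHA256 over the concatenation of each part's raw SHA256 digest, in part order.
func compositeSHA256(parts [][]byte) []byte {
    h := sha256.New()
    for _, part := range parts {
        sum := sha256.Sum256(part) // digest of one uploaded part
        h.Write(sum[:])            // concatenate raw digests, not hex strings
    }
    return h.Sum(nil) // digest of the digests, NOT the digest of the whole file
}

func main() {
    parts := [][]byte{[]byte("part one"), []byte("part two")}
    fmt.Println(base64.StdEncoding.EncodeToString(compositeSHA256(parts)))
}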

common/pkgs/storage/s3/s3.go (+117 -0)

@@ -0,0 +1,117 @@
package s3

import (
"fmt"
"reflect"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils"
)

func init() {
reg.RegisterBuilder[*cdssdk.COSType](createService, createComponent)
reg.RegisterBuilder[*cdssdk.OSSType](createService, createComponent)
reg.RegisterBuilder[*cdssdk.OBSType](createService, createComponent)
}

func createService(detail stgmod.StorageDetail) (types.StorageService, error) {
svc := &Service{
Detail: detail,
}

if detail.Storage.ShardStore != nil {
cfg, ok := detail.Storage.ShardStore.(*cdssdk.S3ShardStorage)
if !ok {
return nil, fmt.Errorf("invalid shard store type %T for local storage", detail.Storage.ShardStore)
}

cli, bkt, err := createS3Client(detail.Storage.Type)
if err != nil {
return nil, err
}

store, err := NewShardStore(svc, cli, bkt, *cfg)
if err != nil {
return nil, err
}

svc.ShardStore = store
}

return svc, nil
}

func createComponent(detail stgmod.StorageDetail, typ reflect.Type) (any, error) {
switch typ {
case reflect2.TypeOf[types.MultipartInitiator]():
feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{})
}

cli, bkt, err := createS3Client(detail.Storage.Type)
if err != nil {
return nil, err
}

return &MultipartInitiator{
cli: cli,
bucket: bkt,
tempDir: feat.TempDir,
}, nil

case reflect2.TypeOf[types.MultipartUploader]():
feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{})
}

cli, bkt, err := createS3Client(detail.Storage.Type)
if err != nil {
return nil, err
}

return &MultipartUploader{
cli: cli,
bucket: bkt,
}, nil
}

return nil, fmt.Errorf("unsupported component type %v", typ)
}

func createS3Client(addr cdssdk.StorageType) (*s3.Client, string, error) {
switch addr := addr.(type) {
// case *cdssdk.COSType:

// case *cdssdk.OSSType:

case *cdssdk.OBSType:
awsConfig := aws.Config{}

cre := aws.Credentials{
AccessKeyID: addr.AK,
SecretAccessKey: addr.SK,
}
awsConfig.Credentials = &credentials.StaticCredentialsProvider{Value: cre}
awsConfig.Region = addr.Region

options := []func(*s3.Options){}
options = append(options, func(s3Opt *s3.Options) {
s3Opt.BaseEndpoint = &addr.Endpoint
})

cli := s3.NewFromConfig(awsConfig, options...)
return cli, addr.Bucket, nil

default:
return nil, "", fmt.Errorf("unsupported storage type %T", addr)
}
}
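The commented-out COS and OSS branches could follow the same shape as the OBS case, since both services expose S3-compatible endpoints. A sketch for OSS, assuming cdssdk.OSSType carries the same AK/SK/Region/Endpoint/Bucket fields as OBSType (an unverified assumption):

// newOSSClient mirrors the OBS branch above for an S3-compatible OSS endpoint.
// All field names on cdssdk.OSSType are assumptions.
func newOSSClient(addr *cdssdk.OSSType) (*s3.Client, string, error) {
    awsConfig := aws.Config{
        Region: addr.Region,
        Credentials: &credentials.StaticCredentialsProvider{Value: aws.Credentials{
            AccessKeyID:     addr.AK,
            SecretAccessKey: addr.SK,
        }},
    }

    cli := s3.NewFromConfig(awsConfig, func(opt *s3.Options) {
        opt.BaseEndpoint = &addr.Endpoint // e.g. a region-specific OSS endpoint
    })
    return cli, addr.Bucket, nil
}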

common/pkgs/storage/s3/s3_test.go (+47 -0)

@@ -0,0 +1,47 @@
package s3

import (
"context"
"fmt"
"testing"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
. "github.com/smartystreets/goconvey/convey"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

func Test_S3(t *testing.T) {
Convey("OBS", t, func() {
cli, bkt, err := createS3Client(&cdssdk.OBSType{
Region: "0",
AK: "0",
SK: "0",
Endpoint: "0",
Bucket: "0",
})
So(err, ShouldEqual, nil)

var marker *string
for {
resp, err := cli.ListObjects(context.Background(), &s3.ListObjectsInput{
Bucket: aws.String(bkt),
Prefix: aws.String("cds"),
MaxKeys: aws.Int32(5),
Marker: marker,
})
So(err, ShouldEqual, nil)

fmt.Printf("\n")
for _, obj := range resp.Contents {
fmt.Printf("%v, %v\n", *obj.Key, *obj.LastModified)
}

if *resp.IsTruncated {
marker = resp.NextMarker
} else {
break
}
}
})
}
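The manual Marker loop above is classic v1-style pagination; aws-sdk-go-v2 also ships a ListObjectsV2 paginator that hides the IsTruncated/NextMarker bookkeeping. A minimal sketch of the same listing with it:

// listWithPaginator walks every key under a prefix using the SDK's built-in paginator.
func listWithPaginator(ctx context.Context, cli *s3.Client, bucket, prefix string) error {
    pager := s3.NewListObjectsV2Paginator(cli, &s3.ListObjectsV2Input{
        Bucket: aws.String(bucket),
        Prefix: aws.String(prefix),
    })
    for pager.HasMorePages() {
        page, err := pager.NextPage(ctx)
        if err != nil {
            return err
        }
        for _, obj := range page.Contents {
            fmt.Printf("%v, %v\n", *obj.Key, *obj.LastModified)
        }
    }
    return nil
}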

common/pkgs/storage/s3/service.go (+43 -0)

@@ -0,0 +1,43 @@
package s3

import (
"reflect"

"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type Service struct {
Detail stgmod.StorageDetail
ShardStore *ShardStore
}

func (s *Service) Info() stgmod.StorageDetail {
return s.Detail
}

func (s *Service) GetComponent(typ reflect.Type) (any, error) {
switch typ {
case reflect2.TypeOf[types.ShardStore]():
if s.ShardStore == nil {
return nil, types.ErrComponentNotFound
}
return s.ShardStore, nil

default:
return nil, types.ErrComponentNotFound
}
}

func (s *Service) Start(ch *types.StorageEventChan) {
if s.ShardStore != nil {
s.ShardStore.Start(ch)
}
}

func (s *Service) Stop() {
if s.ShardStore != nil {
s.ShardStore.Stop()
}
}
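For context, a caller retrieves a component by its reflected interface type, mirroring the type switch above. A sketch, assuming types.StorageService exposes GetComponent as implemented here:

// getShardStore shows how a caller would pull the ShardStore component
// out of a storage service via the generic GetComponent entry point.
func getShardStore(svc types.StorageService) (types.ShardStore, error) {
    comp, err := svc.GetComponent(reflect2.TypeOf[types.ShardStore]())
    if err != nil {
        return nil, err // types.ErrComponentNotFound when this storage has no shard store
    }
    return comp.(types.ShardStore), nil
}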

common/pkgs/storage/s3/shard_store.go (+444 -0)

@@ -1 +1,445 @@
package s3

import (
"context"
"errors"
"fmt"
"io"
"path/filepath"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/os2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

const (
TempDir = "tmp"
BlocksDir = "blocks"
)

type ShardStore struct {
svc *Service
cli *s3.Client
bucket string
cfg cdssdk.S3ShardStorage
lock sync.Mutex
workingTempFiles map[string]bool
done chan any
}

func NewShardStore(svc *Service, cli *s3.Client, bkt string, cfg cdssdk.S3ShardStorage) (*ShardStore, error) {
return &ShardStore{
svc: svc,
cli: cli,
bucket: bkt,
cfg: cfg,
workingTempFiles: make(map[string]bool),
done: make(chan any, 1),
}, nil
}

func (s *ShardStore) Start(ch *types.StorageEventChan) {
s.getLogger().Infof("component start, root: %v", s.cfg.Root)

go func() {
removeTempTicker := time.NewTicker(time.Minute * 10)
defer removeTempTicker.Stop()

for {
select {
case <-removeTempTicker.C:
s.removeUnusedTempFiles()
case <-s.done:
return
}
}
}()
}

func (s *ShardStore) removeUnusedTempFiles() {
s.lock.Lock()
defer s.lock.Unlock()

log := s.getLogger()

var deletes []s3types.ObjectIdentifier
deleteObjs := make(map[string]s3types.Object)
var marker *string
for {
resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{
Bucket: aws.String(s.bucket),
Prefix: aws.String(JoinKey(s.cfg.Root, TempDir) + "/"), // passing "/" as a JoinKey component would double the slash
Marker: marker,
})

if err != nil {
log.Warnf("read temp dir: %v", err)
return
}

for _, obj := range resp.Contents {
objName := BaseKey(*obj.Key)

if s.workingTempFiles[objName] {
continue
}

deletes = append(deletes, s3types.ObjectIdentifier{
Key: obj.Key,
})
deleteObjs[*obj.Key] = obj
}

if !*resp.IsTruncated {
break
}

marker = resp.NextMarker
}

if len(deletes) == 0 {
return
}

resp, err := s.cli.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{
Bucket: aws.String(s.bucket),
Delete: &s3types.Delete{
Objects: deletes,
},
})
if err != nil {
log.Warnf("delete temp files: %v", err)
return
}

for _, del := range resp.Deleted {
obj := deleteObjs[*del.Key]
log.Infof("remove unused temp file %v, size: %v, last mod time: %v", *obj.Key, *obj.Size, *obj.LastModified)
}
}

func (s *ShardStore) Stop() {
s.getLogger().Infof("component stop")

select {
case s.done <- nil:
default:
}
}

func (s *ShardStore) Create(stream io.Reader) (types.FileInfo, error) {
log := s.getLogger()

key, fileName := s.createTempFile()

counter := io2.NewCounter(stream)

resp, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: counter,
ChecksumAlgorithm: s3types.ChecksumAlgorithmSha256,
})
if err != nil {
log.Warnf("uploading file %v: %v", key, err)

s.lock.Lock()
defer s.lock.Unlock()

delete(s.workingTempFiles, fileName)
return types.FileInfo{}, err
}

if resp.ChecksumSHA256 == nil {
log.Warnf("SHA256 checksum not found in response of uploaded file %v", key)
s.onCreateFailed(key, fileName)
return types.FileInfo{}, errors.New("SHA256 checksum not found in response")
}

hash, err := DecodeBase64Hash(*resp.ChecksumSHA256)
if err != nil {
log.Warnf("decode SHA256 checksum %v: %v", *resp.ChecksumSHA256, err)
s.onCreateFailed(key, fileName)
return types.FileInfo{}, fmt.Errorf("decode SHA256 checksum: %v", err)
}

return s.onCreateFinished(key, counter.Count(), hash)
}

func (s *ShardStore) createTempFile() (string, string) {
s.lock.Lock()
defer s.lock.Unlock()

tmpDir := JoinKey(s.cfg.Root, TempDir)
tmpName := os2.GenerateRandomFileName(20)

s.workingTempFiles[tmpName] = true
return JoinKey(tmpDir, tmpName), tmpName
}

func (s *ShardStore) onCreateFinished(tempFilePath string, size int64, hash cdssdk.FileHash) (types.FileInfo, error) {
s.lock.Lock()
defer s.lock.Unlock()
defer delete(s.workingTempFiles, BaseKey(tempFilePath)) // keys always use "/", so use BaseKey rather than filepath.Base
defer func() {
// Delete regardless of the outcome; even if this fails, the periodic cleanup task is the fallback
s.cli.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(tempFilePath),
})
}()

log := s.getLogger()

log.Debugf("write file %v finished, size: %v, hash: %v", tempFilePath, size, hash)

blockDir := s.getFileDirFromHash(hash)
newPath := JoinKey(blockDir, string(hash))

_, err := s.cli.CopyObject(context.Background(), &s3.CopyObjectInput{
Bucket: aws.String(s.bucket),
CopySource: aws.String(JoinKey(s.bucket, tempFilePath)), // CopySource must include the source bucket, as in BypassUploaded below
Key: aws.String(newPath),
})
if err != nil {
log.Warnf("copy file %v to %v: %v", tempFilePath, newPath, err)
return types.FileInfo{}, err
}

return types.FileInfo{
Hash: hash,
Size: size,
Description: newPath,
}, nil
}

func (s *ShardStore) onCreateFailed(key string, fileName string) {
// Delete regardless of the outcome; even if this fails, the periodic cleanup task is the fallback
s.cli.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
})

s.lock.Lock()
defer s.lock.Unlock()

delete(s.workingTempFiles, fileName)
}

// The Option object is created with the NewOpen function
func (s *ShardStore) Open(opt types.OpenOption) (io.ReadCloser, error) {
s.lock.Lock()
defer s.lock.Unlock()

fileName := string(opt.FileHash)
if len(fileName) < 2 {
return nil, fmt.Errorf("invalid file name")
}

filePath := s.getFilePathFromHash(cdssdk.FileHash(fileName))

rngStr := fmt.Sprintf("bytes=%d-", opt.Offset)
if opt.Length >= 0 {
rngStr += fmt.Sprintf("%d", opt.Offset+opt.Length-1)
}

resp, err := s.cli.GetObject(context.TODO(), &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(filePath),
Range: aws.String(rngStr),
})
if err != nil {
s.getLogger().Warnf("get file %v: %v", filePath, err)
return nil, err
}

return resp.Body, nil
}

func (s *ShardStore) Info(hash cdssdk.FileHash) (types.FileInfo, error) {
s.lock.Lock()
defer s.lock.Unlock()

filePath := s.getFilePathFromHash(hash)
info, err := s.cli.HeadObject(context.TODO(), &s3.HeadObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(filePath),
})
if err != nil {
s.getLogger().Warnf("get file %v: %v", filePath, err)
return types.FileInfo{}, err
}

return types.FileInfo{
Hash: hash,
Size: *info.ContentLength,
Description: filePath,
}, nil
}

func (s *ShardStore) ListAll() ([]types.FileInfo, error) {
s.lock.Lock()
defer s.lock.Unlock()

var infos []types.FileInfo

blockDir := JoinKey(s.cfg.Root, BlocksDir)

var marker *string
for {
resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{
Bucket: aws.String(s.bucket),
Prefix: aws.String(blockDir),
Marker: marker,
})

if err != nil {
s.getLogger().Warnf("list objects: %v", err)
return nil, err
}

for _, obj := range resp.Contents {
key := BaseKey(*obj.Key)
if len(key) != 64 {
continue
}

infos = append(infos, types.FileInfo{
Hash: cdssdk.FileHash(key),
Size: *obj.Size,
Description: *obj.Key,
})
}

if !*resp.IsTruncated {
break
}

marker = resp.NextMarker
}

return infos, nil
}

func (s *ShardStore) GC(availables []cdssdk.FileHash) error {
s.lock.Lock()
defer s.lock.Unlock()

avais := make(map[cdssdk.FileHash]bool)
for _, hash := range availables {
avais[hash] = true
}

blockDir := JoinKey(s.cfg.Root, BlocksDir)

var deletes []s3types.ObjectIdentifier
var marker *string
for {
resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{
Bucket: aws.String(s.bucket),
Prefix: aws.String(blockDir),
Marker: marker,
})

if err != nil {
s.getLogger().Warnf("list objects: %v", err)
return err
}

for _, obj := range resp.Contents {
key := BaseKey(*obj.Key)
if len(key) != 64 {
continue
}

if !avais[cdssdk.FileHash(key)] {
deletes = append(deletes, s3types.ObjectIdentifier{
Key: obj.Key,
})
}
}

if !*resp.IsTruncated {
break
}

marker = resp.NextMarker
}

cnt := 0
if len(deletes) > 0 {
resp, err := s.cli.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{
Bucket: aws.String(s.bucket),
Delete: &s3types.Delete{
Objects: deletes,
},
})
if err != nil {
s.getLogger().Warnf("delete objects: %v", err)
return err
}

cnt = len(resp.Deleted)
}

s.getLogger().Infof("purge %d files", cnt)
// TODO Deletion cannot be made atomic, so failures are only logged rather than returned
return nil
}

func (s *ShardStore) Stats() types.Stats {
// TODO Collect statistics about this storage
return types.Stats{
Status: types.StatusOK,
}
}

func (s *ShardStore) BypassUploaded(info types.BypassFileInfo) error {
if info.FileHash == "" {
return fmt.Errorf("empty file hash is not allowed by this shard store")
}

s.lock.Lock()
defer s.lock.Unlock()
defer func() {
// 不管是否成功。即使失败了也有定时清理机制去兜底
s.cli.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(info.TempFilePath),
})
}()

log := s.getLogger()

log.Debugf("%v bypass uploaded, size: %v, hash: %v", info.TempFilePath, info.Size, info.FileHash)

blockDir := s.getFileDirFromHash(info.FileHash)
newPath := JoinKey(blockDir, string(info.FileHash))

_, err := s.cli.CopyObject(context.Background(), &s3.CopyObjectInput{
CopySource: aws.String(JoinKey(s.bucket, info.TempFilePath)),
Bucket: aws.String(s.bucket),
Key: aws.String(newPath),
})
if err != nil {
log.Warnf("copy file %v to %v: %v", info.TempFilePath, newPath, err)
return fmt.Errorf("copy file: %w", err)
}

return nil
}

func (s *ShardStore) getLogger() logger.Logger {
return logger.WithField("ShardStore", "S3").WithField("Storage", s.svc.Detail.Storage.String())
}

func (s *ShardStore) getFileDirFromHash(hash cdssdk.FileHash) string {
return JoinKey(s.cfg.Root, BlocksDir, string(hash)[:2])
}

func (s *ShardStore) getFilePathFromHash(hash cdssdk.FileHash) string {
return JoinKey(s.cfg.Root, BlocksDir, string(hash)[:2], string(hash))
}
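Two details of the store above are worth pinning down. The Range string built in Open follows HTTP semantics with an inclusive end offset, so Offset=100 with Length=50 yields "bytes=100-149", and a negative Length leaves it open-ended ("bytes=100-"). Keys are laid out as <root>/blocks/<first two hex chars>/<hash>. A usage sketch for a ranged read (the OpenOption field names are taken from their usage in Open; the helper itself is hypothetical):

// readSlice reads length bytes starting at offset from a stored shard.
// Pass a negative length to read through to the end of the object.
func readSlice(store *ShardStore, hash cdssdk.FileHash, offset, length int64) ([]byte, error) {
    rc, err := store.Open(types.OpenOption{
        FileHash: hash,
        Offset:   offset,
        Length:   length,
    })
    if err != nil {
        return nil, err
    }
    defer rc.Close()
    return io.ReadAll(rc)
}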

common/pkgs/storage/s3/utils.go (+41 -0)

@@ -0,0 +1,41 @@
package s3

import (
"encoding/base64"
"encoding/hex"
"fmt"
"strings"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

func JoinKey(comps ...string) string {
sb := strings.Builder{}

hasTrailingSlash := true
for _, comp := range comps {
if !hasTrailingSlash {
sb.WriteString("/")
}
sb.WriteString(comp)
hasTrailingSlash = strings.HasSuffix(comp, "/")
}

return sb.String()
}

func BaseKey(key string) string {
return key[strings.LastIndex(key, "/")+1:]
}

// DecodeBase64Hash decodes the base64-encoded SHA256 checksum returned by S3
// into the 64-character upper-case hex form used as cdssdk.FileHash.
func DecodeBase64Hash(hash string) (cdssdk.FileHash, error) {
// S3 returns standard, padded base64, which RawStdEncoding would reject
hashBytes, err := base64.StdEncoding.DecodeString(hash)
if err != nil {
return "", err
}
if len(hashBytes) != 32 {
return "", fmt.Errorf("invalid hash length: %d", len(hashBytes))
}

// The rest of the system identifies files by upper-case hex, so hex-encode
// the raw digest bytes instead of casting them to string directly
return cdssdk.FileHash(strings.ToUpper(hex.EncodeToString(hashBytes))), nil
}
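Worked examples for the helpers above, written as Go testable examples (they would live in a _test.go file). The base64 input is the well-known SHA256 of the empty string, so the expected hex output is independently verifiable; the decode example assumes the padded-input handling shown in DecodeBase64Hash above:

// ExampleJoinKey shows that JoinKey inserts exactly one slash between
// components, even when a component already ends with one.
func ExampleJoinKey() {
    fmt.Println(JoinKey("root", "blocks/", "e5"))
    // Output: root/blocks/e5
}

// ExampleDecodeBase64Hash decodes a base64 SHA256 checksum (here: SHA256 of "")
// into the upper-case hex form used as cdssdk.FileHash.
func ExampleDecodeBase64Hash() {
    h, _ := DecodeBase64Hash("47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=")
    fmt.Println(h)
    // Output: E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855
}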

common/pkgs/storage/types/s3_client.go (+3 -0)

@@ -21,8 +21,11 @@ type MultipartUploader interface {
Close()
}

// TODO Refactor into an interface so that different kinds of multipart uploads can carry different state
type MultipartInitState struct {
UploadID string
Bucket string // TODO temporary
Key string // TODO temporary
}

type UploadedPartInfo struct {


go.mod (+5 -1)

@@ -1,6 +1,8 @@
module gitlink.org.cn/cloudream/storage

go 1.20
go 1.21

toolchain go1.23.2

replace gitlink.org.cn/cloudream/common v0.0.0 => ../common

@@ -25,6 +27,8 @@ require (
)

require (
github.com/aws/aws-sdk-go-v2 v1.32.6 // indirect
github.com/aws/smithy-go v1.22.1 // indirect
github.com/clbanning/mxj v1.8.4 // indirect
github.com/google/go-querystring v1.0.0 // indirect
github.com/google/uuid v1.3.1 // indirect


go.sum (+4 -0)

@@ -3,6 +3,10 @@ github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/antonfisher/nested-logrus-formatter v1.3.1 h1:NFJIr+pzwv5QLHTPyKz9UMEoHck02Q9L0FP13b/xSbQ=
github.com/antonfisher/nested-logrus-formatter v1.3.1/go.mod h1:6WTfyWFkBc9+zyBaKIqRrg/KwMqBbodBjgbHjDz7zjA=
github.com/aws/aws-sdk-go-v2 v1.32.6 h1:7BokKRgRPuGmKkFMhEg/jSul+tB9VvXhcViILtfG8b4=
github.com/aws/aws-sdk-go-v2 v1.32.6/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U=
github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro=
github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/clbanning/mxj v1.8.4 h1:HuhwZtbyvyOw+3Z1AowPkU87JkJUSv751ELWaiTpj8I=
github.com/clbanning/mxj v1.8.4/go.mod h1:BVjHeAH+rl9rs6f+QIpeRl0tfu10SXn1pUSa5PVGJng=

