From 8e27676eebfe0e38dd306e88d47685d1a5646199 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 23 May 2025 14:39:12 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E8=B7=AF=E5=BE=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/types/types.go | 4 ++ common/pkgs/ioswitch2/parser/gen/generator.go | 2 +- .../ioswitch2/plans/complete_multipart.go | 2 +- common/pkgs/ioswitchlrc/parser/passes.go | 2 +- common/pkgs/storage/efile/ec_multiplier.go | 3 +- common/pkgs/storage/efile/efile.go | 3 +- common/pkgs/storage/local/base_store.go | 5 +- common/pkgs/storage/local/local.go | 5 +- common/pkgs/storage/local/multipart_upload.go | 44 +++++++++-------- common/pkgs/storage/local/shard_store.go | 25 +++++----- common/pkgs/storage/obs/obs.go | 7 ++- common/pkgs/storage/obs/obs_test.go | 4 +- common/pkgs/storage/obs/s2s.go | 10 ++-- common/pkgs/storage/s3/base_store.go | 2 +- common/pkgs/storage/s3/multipart_upload.go | 14 +++--- common/pkgs/storage/s3/s3.go | 3 +- common/pkgs/storage/s3/shard_store.go | 47 ++++++++----------- common/pkgs/storage/s3/utils.go | 23 --------- .../types/{s3_client.go => multiparter.go} | 0 common/pkgs/storage/types/shard_store.go | 30 +++++------- common/pkgs/storage/types/types.go | 6 ++- common/pkgs/storage/types/utils.go | 33 +++++++++++++ common/pkgs/storage/utils/utils.go | 18 ------- coordinator/types/storage.go | 3 +- coordinator/types/storage_feature.go | 36 -------------- 25 files changed, 141 insertions(+), 190 deletions(-) rename common/pkgs/storage/types/{s3_client.go => multiparter.go} (100%) create mode 100644 common/pkgs/storage/types/utils.go delete mode 100644 common/pkgs/storage/utils/utils.go diff --git a/client/types/types.go b/client/types/types.go index 8431f9f..6805910 100644 --- a/client/types/types.go +++ b/client/types/types.go @@ -82,6 +82,10 @@ type UserSpace struct { ShardStore *cotypes.ShardStoreUserConfig `gorm:"column:ShardStore; type:json; serializer:json" json:"shardStore"` // 存储服务特性功能的配置 Features []cotypes.StorageFeature `json:"features" gorm:"column:Features; type:json; serializer:union"` + // 各种组件保存数据的根目录。组件工作过程中都会以这个目录为根。 + WorkingDir string `gorm:"column:WorkingDir; type:varchar(1024); not null" json:"workingDir"` + // 工作目录在存储系统中的真实路径。当工作路径在挂载点内时,这个字段记录的是挂载背后的真实路径。部分直接与存储系统交互的组件需要知道真实路径。 + // RealWorkingDir string `gorm:"column:RealWorkingDir; type:varchar(1024); not null" json:"realWorkingDir"` // 用户空间信息的版本号,每一次更改都需要更新版本号 Revision int64 `gorm:"column:Revision; type:bigint; not null" json:"revision"` } diff --git a/common/pkgs/ioswitch2/parser/gen/generator.go b/common/pkgs/ioswitch2/parser/gen/generator.go index 77d25e6..b40ae19 100644 --- a/common/pkgs/ioswitch2/parser/gen/generator.go +++ b/common/pkgs/ioswitch2/parser/gen/generator.go @@ -354,7 +354,7 @@ func buildFromNode(ctx *state.GenerateState, f ioswitch2.From) (ops2.FromNode, e func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error) { switch t := t.(type) { case *ioswitch2.ToShardStore: - tempFileName := os2.GenerateRandomFileName(20) + tempFileName := types.MakeTempDirPath(&t.Space, os2.GenerateRandomFileName(20)) write := ctx.DAG.NewBaseWrite(t, t.Space, tempFileName) if err := setEnvByAddress(write, t.Space.RecommendHub, t.Space.RecommendHub.Address); err != nil { diff --git a/common/pkgs/ioswitch2/plans/complete_multipart.go b/common/pkgs/ioswitch2/plans/complete_multipart.go index 2860a0b..aa58de7 100644 --- a/common/pkgs/ioswitch2/plans/complete_multipart.go +++ b/common/pkgs/ioswitch2/plans/complete_multipart.go @@ -32,7 +32,7 @@ func CompleteMultipart(blocks []clitypes.ObjectBlock, blockSpaces []clitypes.Use } // TODO 应该采取更合理的方式同时支持Parser和直接生成DAG - br := da.NewBaseWrite(nil, targetSpace, os2.GenerateRandomFileName(20)) + br := da.NewBaseWrite(nil, targetSpace, types.MakeTempDirPath(&targetSpace, os2.GenerateRandomFileName(20))) br.Env().ToEnvWorker(getWorkerInfo(targetSpace.RecommendHub), true) as := da.NewStoreShard(targetSpace, shardInfoKey) diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index 65bb909..00b16ff 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -102,7 +102,7 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, err func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (ops2.ToNode, error) { switch t := t.(type) { case *ioswitchlrc.ToNode: - tempFileName := os2.GenerateRandomFileName(20) + tempFileName := types.MakeTempDirPath(&t.Space, os2.GenerateRandomFileName(20)) write := ctx.DAG.NewBaseWrite(t, t.Space, tempFileName) switch addr := t.Space.RecommendHub.Address.(type) { diff --git a/common/pkgs/storage/efile/ec_multiplier.go b/common/pkgs/storage/efile/ec_multiplier.go index 7ba578a..5e0da05 100644 --- a/common/pkgs/storage/efile/ec_multiplier.go +++ b/common/pkgs/storage/efile/ec_multiplier.go @@ -48,9 +48,10 @@ func (m *ECMultiplier) Multiply(coef [][]byte, inputs []types.HTTPRequest, chunk } fileName := os2.GenerateRandomFileName(10) + tempDir := path.Join(m.blder.detail.UserSpace.WorkingDir, types.TempWorkingDir) m.outputs = make([]string, len(coef)) for i := range m.outputs { - m.outputs[i] = path.Join(m.feat.TempDir, fmt.Sprintf("%s_%d", fileName, i)) + m.outputs[i] = path.Join(tempDir, fmt.Sprintf("%s_%d", fileName, i)) } u, err := url.JoinPath(m.url, "efile/openapi/v2/file/createECTask") diff --git a/common/pkgs/storage/efile/efile.go b/common/pkgs/storage/efile/efile.go index 57b96a0..cfd9f4d 100644 --- a/common/pkgs/storage/efile/efile.go +++ b/common/pkgs/storage/efile/efile.go @@ -11,7 +11,6 @@ import ( clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory/reg" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/utils" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) @@ -96,7 +95,7 @@ func (b *builder) CreateECMultiplier(typeOnly bool) (types.ECMultiplier, error) return (*ECMultiplier)(nil), nil } - feat := utils.FindFeature[*cortypes.ECMultiplierFeature](b.detail) + feat := types.FindFeature[*cortypes.ECMultiplierFeature](b.detail) if feat == nil { return nil, fmt.Errorf("feature ECMultiplier not found") } diff --git a/common/pkgs/storage/local/base_store.go b/common/pkgs/storage/local/base_store.go index 0da130c..e3c1819 100644 --- a/common/pkgs/storage/local/base_store.go +++ b/common/pkgs/storage/local/base_store.go @@ -135,7 +135,8 @@ func (s *BaseStore) ListAll(path string) ([]types.ListEntry, error) { func (s *BaseStore) CleanTemps() { log := s.getLogger() - entries, err := os.ReadDir(filepath.Join(s.root, TempDir)) + tempDir := filepath.Join(s.root, s.detail.UserSpace.WorkingDir, types.TempWorkingDir) + entries, err := os.ReadDir(tempDir) if err != nil { log.Warnf("read temp dir: %v", err) return @@ -152,7 +153,7 @@ func (s *BaseStore) CleanTemps() { continue } - path := filepath.Join(s.root, TempDir, entry.Name()) + path := filepath.Join(tempDir, entry.Name()) err = os.Remove(path) if err != nil { log.Warnf("remove temp file %v: %v", path, err) diff --git a/common/pkgs/storage/local/local.go b/common/pkgs/storage/local/local.go index 8d7e637..fa7121e 100644 --- a/common/pkgs/storage/local/local.go +++ b/common/pkgs/storage/local/local.go @@ -6,7 +6,6 @@ import ( clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory/reg" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/utils" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) @@ -58,7 +57,7 @@ func (b *builder) CreateMultiparter(typeOnly bool) (types.Multiparter, error) { return (*Multiparter)(nil), nil } - feat := utils.FindFeature[*cortypes.MultipartUploadFeature](b.detail) + feat := types.FindFeature[*cortypes.MultipartUploadFeature](b.detail) if feat == nil { return nil, fmt.Errorf("feature %T not found", cortypes.MultipartUploadFeature{}) } @@ -73,7 +72,7 @@ func (b *builder) CreateS2STransfer(typeOnly bool) (types.S2STransfer, error) { return (*S2STransfer)(nil), nil } - feat := utils.FindFeature[*cortypes.S2STransferFeature](b.detail) + feat := types.FindFeature[*cortypes.S2STransferFeature](b.detail) if feat == nil { return nil, fmt.Errorf("feature %T not found", cortypes.S2STransferFeature{}) } diff --git a/common/pkgs/storage/local/multipart_upload.go b/common/pkgs/storage/local/multipart_upload.go index 027bd95..bdfd68d 100644 --- a/common/pkgs/storage/local/multipart_upload.go +++ b/common/pkgs/storage/local/multipart_upload.go @@ -18,26 +18,28 @@ import ( ) type Multiparter struct { - feat *cortypes.MultipartUploadFeature + detail *clitypes.UserSpaceDetail + feat *cortypes.MultipartUploadFeature } -func (m *Multiparter) MinPartSize() int64 { - return m.feat.MinPartSize +func (*Multiparter) MinPartSize() int64 { + return 1 * 1024 * 1024 // 1MB } -func (m *Multiparter) MaxPartSize() int64 { - return m.feat.MaxPartSize +func (*Multiparter) MaxPartSize() int64 { + return 5 * 1024 * 1024 * 1024 // 5GB } func (m *Multiparter) Initiate(ctx context.Context) (types.MultipartTask, error) { - absTempDir, err := filepath.Abs(m.feat.TempDir) + tempDir := filepath.Join(m.detail.UserSpace.WorkingDir, types.TempWorkingDir) + absTempDir, err := filepath.Abs(tempDir) if err != nil { - return nil, fmt.Errorf("get abs temp dir %v: %v", m.feat.TempDir, err) + return nil, fmt.Errorf("get abs temp dir %v: %v", tempDir, err) } tempFileName := os2.GenerateRandomFileName(10) tempPartsDir := filepath.Join(absTempDir, tempFileName) - joinedFilePath := filepath.Join(absTempDir, tempFileName+".joined") + absJoinedFilePath := filepath.Join(absTempDir, tempFileName+".joined") err = os.MkdirAll(tempPartsDir, 0777) @@ -46,11 +48,12 @@ func (m *Multiparter) Initiate(ctx context.Context) (types.MultipartTask, error) } return &MultipartTask{ - absTempDir: absTempDir, - tempFileName: tempFileName, - tempPartsDir: tempPartsDir, - joinedFilePath: joinedFilePath, - uploadID: tempPartsDir, + absTempDir: absTempDir, + tempFileName: tempFileName, + tempPartsDir: tempPartsDir, + joinedFilePath: types.PathJoin(m.detail.UserSpace.WorkingDir, types.TempWorkingDir, tempFileName+".joined"), + absJoinedFilePath: absJoinedFilePath, + uploadID: tempPartsDir, }, nil } @@ -73,11 +76,12 @@ func (m *Multiparter) UploadPart(ctx context.Context, init types.MultipartInitSt } type MultipartTask struct { - absTempDir string // 应该要是绝对路径 - tempFileName string - tempPartsDir string - joinedFilePath string - uploadID string + absTempDir string // 应该要是绝对路径 + tempFileName string + tempPartsDir string + joinedFilePath string + absJoinedFilePath string + uploadID string } func (i *MultipartTask) InitState() types.MultipartInitState { @@ -91,7 +95,7 @@ func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPar return l.PartNumber - r.PartNumber }) - joined, err := os.Create(i.joinedFilePath) + joined, err := os.Create(i.absJoinedFilePath) if err != nil { return types.FileInfo{}, err } @@ -111,7 +115,7 @@ func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPar h := hasher.Sum(nil) return types.FileInfo{ - Path: joined.Name(), + Path: i.joinedFilePath, Size: size, Hash: clitypes.NewFullHash(h), }, nil diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index 864df94..edb337e 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -13,11 +13,6 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) -const ( - TempDir = "tmp" - BlocksDir = "blocks" -) - type ShardStore struct { detail *clitypes.UserSpaceDetail absRoot string @@ -26,7 +21,7 @@ type ShardStore struct { } func NewShardStore(root string, detail *clitypes.UserSpaceDetail) (*ShardStore, error) { - absRoot, err := filepath.Abs(filepath.Join(root, detail.UserSpace.ShardStore.BaseDir)) + absRoot, err := filepath.Abs(filepath.Join(root, detail.UserSpace.WorkingDir, types.ShardStoreWorkingDir)) if err != nil { return nil, fmt.Errorf("get abs root: %w", err) } @@ -78,7 +73,7 @@ func (s *ShardStore) Store(path string, hash clitypes.FileHash, size int64) (typ return types.FileInfo{ Hash: hash, Size: size, - Path: newPath, + Path: s.getSlashFilePathFromHash(hash), }, nil } @@ -105,8 +100,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { var infos []types.FileInfo - blockDir := filepath.Join(s.absRoot, BlocksDir) - err := filepath.WalkDir(blockDir, func(path string, d fs.DirEntry, err error) error { + err := filepath.WalkDir(s.absRoot, func(path string, d fs.DirEntry, err error) error { if err != nil { return err } @@ -128,7 +122,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { infos = append(infos, types.FileInfo{ Hash: fileHash, Size: info.Size(), - Path: filepath.Join(blockDir, path), + Path: s.getSlashFilePathFromHash(fileHash), }) return nil }) @@ -150,8 +144,7 @@ func (s *ShardStore) GC(avaiables []clitypes.FileHash) error { cnt := 0 - blockDir := filepath.Join(s.absRoot, BlocksDir) - err := filepath.WalkDir(blockDir, func(path string, d fs.DirEntry, err error) error { + err := filepath.WalkDir(s.absRoot, func(path string, d fs.DirEntry, err error) error { if err != nil { return err } @@ -203,9 +196,13 @@ func (s *ShardStore) getLogger() logger.Logger { } func (s *ShardStore) getFileDirFromHash(hash clitypes.FileHash) string { - return filepath.Join(s.absRoot, BlocksDir, hash.GetHashPrefix(2)) + return filepath.Join(s.absRoot, hash.GetHashPrefix(2)) } func (s *ShardStore) getFilePathFromHash(hash clitypes.FileHash) string { - return filepath.Join(s.absRoot, BlocksDir, hash.GetHashPrefix(2), string(hash)) + return filepath.Join(s.absRoot, hash.GetHashPrefix(2), string(hash)) +} + +func (s *ShardStore) getSlashFilePathFromHash(hash clitypes.FileHash) string { + return types.PathJoin(s.detail.UserSpace.WorkingDir, types.ShardStoreWorkingDir, hash.GetHashPrefix(2), string(hash)) } diff --git a/common/pkgs/storage/obs/obs.go b/common/pkgs/storage/obs/obs.go index 1fd9955..5c20933 100644 --- a/common/pkgs/storage/obs/obs.go +++ b/common/pkgs/storage/obs/obs.go @@ -10,7 +10,6 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory/reg" s3stg "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/s3" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/utils" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) @@ -96,7 +95,7 @@ func (b *builder) CreateMultiparter(typeOnly bool) (types.Multiparter, error) { } stgType := b.detail.UserSpace.Storage.(*cortypes.OBSType) - feat := utils.FindFeature[*cortypes.MultipartUploadFeature](b.detail) + feat := types.FindFeature[*cortypes.MultipartUploadFeature](b.detail) if feat == nil { return nil, fmt.Errorf("feature %T not found", cortypes.MultipartUploadFeature{}) } @@ -125,7 +124,7 @@ func (b *builder) CreateS2STransfer(typeOnly bool) (types.S2STransfer, error) { } stgType := b.detail.UserSpace.Storage.(*cortypes.OBSType) - feat := utils.FindFeature[*cortypes.S2STransferFeature](b.detail) + feat := types.FindFeature[*cortypes.S2STransferFeature](b.detail) if feat == nil { return nil, fmt.Errorf("feature %T not found", cortypes.S2STransferFeature{}) } @@ -135,5 +134,5 @@ func (b *builder) CreateS2STransfer(typeOnly bool) (types.S2STransfer, error) { return nil, fmt.Errorf("invalid storage credential type %T for obs storage", b.detail.UserSpace.Credential) } - return NewS2STransfer(stgType, cred, feat), nil + return NewS2STransfer(b.detail, stgType, cred, feat), nil } diff --git a/common/pkgs/storage/obs/obs_test.go b/common/pkgs/storage/obs/obs_test.go index 67a4822..65bf268 100644 --- a/common/pkgs/storage/obs/obs_test.go +++ b/common/pkgs/storage/obs/obs_test.go @@ -22,9 +22,7 @@ func Test_S2S(t *testing.T) { AK: "", SK: "", }, - feat: &cortypes.S2STransferFeature{ - TempDir: "s2s", - }, + feat: &cortypes.S2STransferFeature{}, } _, err := s2s.Transfer(context.TODO(), &clitypes.UserSpaceDetail{ diff --git a/common/pkgs/storage/obs/s2s.go b/common/pkgs/storage/obs/s2s.go index f78d534..3a37626 100644 --- a/common/pkgs/storage/obs/s2s.go +++ b/common/pkgs/storage/obs/s2s.go @@ -14,12 +14,12 @@ import ( omsregion "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/oms/v2/region" "gitlink.org.cn/cloudream/common/utils/os2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/s3" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) type S2STransfer struct { + detail *clitypes.UserSpaceDetail stgType *cortypes.OBSType cred *cortypes.OBSCred feat *cortypes.S2STransferFeature @@ -27,8 +27,9 @@ type S2STransfer struct { omsCli *oms.OmsClient } -func NewS2STransfer(stgType *cortypes.OBSType, cred *cortypes.OBSCred, feat *cortypes.S2STransferFeature) *S2STransfer { +func NewS2STransfer(detail *clitypes.UserSpaceDetail, stgType *cortypes.OBSType, cred *cortypes.OBSCred, feat *cortypes.S2STransferFeature) *S2STransfer { return &S2STransfer{ + detail: detail, stgType: stgType, cred: cred, feat: feat, @@ -71,7 +72,8 @@ func (s *S2STransfer) Transfer(ctx context.Context, src *clitypes.UserSpaceDetai } // 先上传成一个临时文件 - tempPrefix := s3.JoinKey(s.feat.TempDir, os2.GenerateRandomFileName(10)) + "/" + tempDir := types.PathJoin(s.detail.UserSpace.WorkingDir, types.TempWorkingDir) + tempPrefix := types.PathJoin(tempDir, os2.GenerateRandomFileName(10)) + "/" taskType := model.GetCreateTaskReqTaskTypeEnum().OBJECT s.omsCli = oms.NewOmsClient(cli) @@ -108,7 +110,7 @@ func (s *S2STransfer) Transfer(ctx context.Context, src *clitypes.UserSpaceDetai _, err = obsCli.CopyObject(ctx, &awss3.CopyObjectInput{ Bucket: aws.String(bkt), - CopySource: aws.String(s3.JoinKey(bkt, tempPrefix, srcPath)), + CopySource: aws.String(types.PathJoin(bkt, tempPrefix, srcPath)), Key: aws.String(dstPath), }) if err != nil { diff --git a/common/pkgs/storage/s3/base_store.go b/common/pkgs/storage/s3/base_store.go index 8a6acbd..221bc67 100644 --- a/common/pkgs/storage/s3/base_store.go +++ b/common/pkgs/storage/s3/base_store.go @@ -162,7 +162,7 @@ func (s *BaseStore) CleanTemps() { for { resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{ Bucket: aws.String(s.Bucket), - Prefix: aws.String(JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, TempDir, "/")), + Prefix: aws.String(types.PathJoin(s.Detail.UserSpace.WorkingDir, types.TempWorkingDir, "/")), Marker: marker, }) diff --git a/common/pkgs/storage/s3/multipart_upload.go b/common/pkgs/storage/s3/multipart_upload.go index 0ee9d46..614fcf6 100644 --- a/common/pkgs/storage/s3/multipart_upload.go +++ b/common/pkgs/storage/s3/multipart_upload.go @@ -4,7 +4,6 @@ import ( "context" "crypto/sha256" "io" - "path/filepath" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -33,17 +32,18 @@ func NewMultiparter(detail *clitypes.UserSpaceDetail, feat *cortypes.MultipartUp } } -func (m *Multiparter) MinPartSize() int64 { - return m.feat.MinPartSize +func (*Multiparter) MinPartSize() int64 { + return 5 * 1024 * 1024 // 5MB } -func (m *Multiparter) MaxPartSize() int64 { - return m.feat.MaxPartSize +func (*Multiparter) MaxPartSize() int64 { + return 5 * 1024 * 1024 * 1024 // 5GB } func (m *Multiparter) Initiate(ctx context.Context) (types.MultipartTask, error) { tempFileName := os2.GenerateRandomFileName(10) - tempFilePath := filepath.Join(m.feat.TempDir, tempFileName) + tempDir := types.PathJoin(m.detail.UserSpace.WorkingDir, types.TempWorkingDir) + tempFilePath := types.PathJoin(tempDir, tempFileName) resp, err := m.cli.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ Bucket: aws.String(m.bucket), @@ -57,7 +57,7 @@ func (m *Multiparter) Initiate(ctx context.Context) (types.MultipartTask, error) return &MultipartTask{ cli: m.cli, bucket: m.bucket, - tempDir: m.feat.TempDir, + tempDir: tempDir, tempFileName: tempFileName, tempFilePath: tempFilePath, uploadID: *resp.UploadId, diff --git a/common/pkgs/storage/s3/s3.go b/common/pkgs/storage/s3/s3.go index 9cffe3b..e17e686 100644 --- a/common/pkgs/storage/s3/s3.go +++ b/common/pkgs/storage/s3/s3.go @@ -9,7 +9,6 @@ import ( clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory/reg" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/utils" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) @@ -97,7 +96,7 @@ func (b *builder) CreateMultiparter(typeOnly bool) (types.Multiparter, error) { } stgType := b.detail.UserSpace.Storage.(*cortypes.S3Type) - feat := utils.FindFeature[*cortypes.MultipartUploadFeature](b.detail) + feat := types.FindFeature[*cortypes.MultipartUploadFeature](b.detail) if feat == nil { return nil, fmt.Errorf("feature %T not found", cortypes.MultipartUploadFeature{}) } diff --git a/common/pkgs/storage/s3/shard_store.go b/common/pkgs/storage/s3/shard_store.go index 1399a10..ab36236 100644 --- a/common/pkgs/storage/s3/shard_store.go +++ b/common/pkgs/storage/s3/shard_store.go @@ -12,34 +12,31 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) -const ( - TempDir = "tmp" - BlocksDir = "blocks" -) - type ShardStoreOption struct { UseAWSSha256 bool // 能否直接使用AWS提供的SHA256校验,如果不行,则使用本地计算。默认使用本地计算。 } type ShardStore struct { - Detail *clitypes.UserSpaceDetail - Bucket string - cli *s3.Client - opt ShardStoreOption - lock sync.Mutex + Detail *clitypes.UserSpaceDetail + Bucket string + workingDir string + cli *s3.Client + opt ShardStoreOption + lock sync.Mutex } func NewShardStore(detail *clitypes.UserSpaceDetail, cli *s3.Client, bkt string, opt ShardStoreOption) (*ShardStore, error) { return &ShardStore{ - Detail: detail, - Bucket: bkt, - cli: cli, - opt: opt, + Detail: detail, + Bucket: bkt, + workingDir: types.PathJoin(detail.UserSpace.WorkingDir, types.ShardStoreWorkingDir), + cli: cli, + opt: opt, }, nil } func (s *ShardStore) Start(ch *types.StorageEventChan) { - s.getLogger().Infof("start, root: %v", s.Detail.UserSpace.ShardStore.BaseDir) + s.getLogger().Infof("start, root: %v", s.workingDir) } func (s *ShardStore) Stop() { @@ -55,11 +52,11 @@ func (s *ShardStore) Store(path string, hash clitypes.FileHash, size int64) (typ log.Debugf("write file %v finished, size: %v, hash: %v", path, size, hash) blockDir := s.GetFileDirFromHash(hash) - newPath := JoinKey(blockDir, string(hash)) + newPath := types.PathJoin(blockDir, string(hash)) _, err := s.cli.CopyObject(context.Background(), &s3.CopyObjectInput{ Bucket: aws.String(s.Bucket), - CopySource: aws.String(JoinKey(s.Bucket, path)), + CopySource: aws.String(types.PathJoin(s.Bucket, path)), Key: aws.String(newPath), }) if err != nil { @@ -101,13 +98,11 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { var infos []types.FileInfo - blockDir := JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, BlocksDir) - var marker *string for { resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{ Bucket: aws.String(s.Bucket), - Prefix: aws.String(blockDir), + Prefix: aws.String(s.workingDir), Marker: marker, }) @@ -117,7 +112,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { } for _, obj := range resp.Contents { - key := BaseKey(*obj.Key) + key := types.PathBase(*obj.Key) fileHash, err := clitypes.ParseHash(key) if err != nil { @@ -150,14 +145,12 @@ func (s *ShardStore) GC(avaiables []clitypes.FileHash) error { avais[hash] = true } - blockDir := JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, BlocksDir) - var deletes []s3types.ObjectIdentifier var marker *string for { resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{ Bucket: aws.String(s.Bucket), - Prefix: aws.String(blockDir), + Prefix: aws.String(s.workingDir), Marker: marker, }) @@ -167,7 +160,7 @@ func (s *ShardStore) GC(avaiables []clitypes.FileHash) error { } for _, obj := range resp.Contents { - key := BaseKey(*obj.Key) + key := types.PathBase(*obj.Key) fileHash, err := clitypes.ParseHash(key) if err != nil { continue @@ -220,9 +213,9 @@ func (s *ShardStore) getLogger() logger.Logger { } func (s *ShardStore) GetFileDirFromHash(hash clitypes.FileHash) string { - return JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, BlocksDir, hash.GetHashPrefix(2)) + return types.PathJoin(s.workingDir, hash.GetHashPrefix(2)) } func (s *ShardStore) GetFilePathFromHash(hash clitypes.FileHash) string { - return JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, BlocksDir, hash.GetHashPrefix(2), string(hash)) + return types.PathJoin(s.workingDir, hash.GetHashPrefix(2), string(hash)) } diff --git a/common/pkgs/storage/s3/utils.go b/common/pkgs/storage/s3/utils.go index 6bfe7c0..02fa87d 100644 --- a/common/pkgs/storage/s3/utils.go +++ b/common/pkgs/storage/s3/utils.go @@ -3,31 +3,8 @@ package s3 import ( "encoding/base64" "fmt" - "strings" ) -func JoinKey(comps ...string) string { - sb := strings.Builder{} - - hasTrailingSlash := true - for _, comp := range comps { - if comp == "" { - continue - } - if !hasTrailingSlash { - sb.WriteString("/") - } - sb.WriteString(comp) - hasTrailingSlash = strings.HasSuffix(comp, "/") - } - - return sb.String() -} - -func BaseKey(key string) string { - return key[strings.LastIndex(key, "/")+1:] -} - func DecodeBase64Hash(hash string) ([]byte, error) { hashBytes := make([]byte, 32) n, err := base64.RawStdEncoding.Decode(hashBytes, []byte(hash)) diff --git a/common/pkgs/storage/types/s3_client.go b/common/pkgs/storage/types/multiparter.go similarity index 100% rename from common/pkgs/storage/types/s3_client.go rename to common/pkgs/storage/types/multiparter.go diff --git a/common/pkgs/storage/types/shard_store.go b/common/pkgs/storage/types/shard_store.go index b09e1f3..2a0f902 100644 --- a/common/pkgs/storage/types/shard_store.go +++ b/common/pkgs/storage/types/shard_store.go @@ -4,23 +4,7 @@ import ( clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" ) -type Status interface { - String() string -} - -type OKStatus struct { - StoreEvent -} - -func (s *OKStatus) String() string { - return "OK" -} - -var StatusOK = &OKStatus{} - -type StoreEvent interface { - IsStoreEvent() -} +const ShardStoreWorkingDir = "shards" type ShardStore interface { Start(ch *StorageEventChan) @@ -49,3 +33,15 @@ type Stats struct { // 描述信息,用于调试 Description string } + +type Status interface { + String() string +} + +type OKStatus struct{} + +func (s *OKStatus) String() string { + return "OK" +} + +var StatusOK = &OKStatus{} diff --git a/common/pkgs/storage/types/types.go b/common/pkgs/storage/types/types.go index 497ab80..8a80304 100644 --- a/common/pkgs/storage/types/types.go +++ b/common/pkgs/storage/types/types.go @@ -7,6 +7,8 @@ import ( clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" ) +const TempWorkingDir = "temp" + var ErrStorageNotFound = errors.New("storage not found") // 不支持的操作。可以作为StorageBuilder中任意函数的错误返回值,代表该操作不被支持。 @@ -14,7 +16,9 @@ var ErrUnsupported = errors.New("unsupported operation") var ErrStorageExists = errors.New("storage already exists") -type StorageEvent interface{} +type StorageEvent interface { + IsStorageEvent() bool +} type StorageEventChan = async.UnboundChannel[StorageEvent] diff --git a/common/pkgs/storage/types/utils.go b/common/pkgs/storage/types/utils.go new file mode 100644 index 0000000..a6cea72 --- /dev/null +++ b/common/pkgs/storage/types/utils.go @@ -0,0 +1,33 @@ +package types + +import ( + "path" + + clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" +) + +func FindFeature[T cortypes.StorageFeature](detail *clitypes.UserSpaceDetail) T { + for _, f := range detail.UserSpace.Features { + f2, ok := f.(T) + if ok { + return f2 + } + } + + var def T + return def +} + +func PathJoin(comps ...string) string { + return path.Join(comps...) +} + +func PathBase(p string) string { + return path.Base(p) +} + +func MakeTempDirPath(detail *clitypes.UserSpaceDetail, comps ...string) string { + cs := append([]string{detail.UserSpace.WorkingDir, TempWorkingDir}, comps...) + return PathJoin(cs...) +} diff --git a/common/pkgs/storage/utils/utils.go b/common/pkgs/storage/utils/utils.go deleted file mode 100644 index 4614a27..0000000 --- a/common/pkgs/storage/utils/utils.go +++ /dev/null @@ -1,18 +0,0 @@ -package utils - -import ( - clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" - cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" -) - -func FindFeature[T cortypes.StorageFeature](detail *clitypes.UserSpaceDetail) T { - for _, f := range detail.UserSpace.Features { - f2, ok := f.(T) - if ok { - return f2 - } - } - - var def T - return def -} diff --git a/coordinator/types/storage.go b/coordinator/types/storage.go index 02b078d..ce34730 100644 --- a/coordinator/types/storage.go +++ b/coordinator/types/storage.go @@ -248,6 +248,5 @@ func (a *S3Type) Equals(other StorageType) bool { } type ShardStoreUserConfig struct { - BaseDir string `json:"baseDir"` - MaxSize int64 `json:"maxSize"` + MaxSize int64 `json:"maxSize"` } diff --git a/coordinator/types/storage_feature.go b/coordinator/types/storage_feature.go index a477518..02b8f2b 100644 --- a/coordinator/types/storage_feature.go +++ b/coordinator/types/storage_feature.go @@ -13,34 +13,15 @@ type StorageFeature interface { } var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[StorageFeature]( - (*BypassWriteFeature)(nil), (*MultipartUploadFeature)(nil), - (*InternalServerlessCallFeature)(nil), (*S2STransferFeature)(nil), (*ECMultiplierFeature)(nil), )), "type") -// 存储服务支持被非MasterHub直接上传文件 -type BypassWriteFeature struct { - serder.Metadata `union:"BypassWrite"` - Type string `json:"type"` -} - -func (f *BypassWriteFeature) GetFeatureType() string { - return "BypassWrite" -} - -func (f *BypassWriteFeature) String() string { - return "BypassWrite" -} - // 存储服务支持分段上传 type MultipartUploadFeature struct { serder.Metadata `union:"MultipartUpload"` Type string `json:"type"` - TempDir string `json:"tempDir"` // 临时文件存放目录 - MinPartSize int64 `json:"minPartSize"` // 最小分段大小 - MaxPartSize int64 `json:"maxPartSize"` // 最大分段大小 } func (f *MultipartUploadFeature) GetFeatureType() string { @@ -51,26 +32,10 @@ func (f *MultipartUploadFeature) String() string { return "MultipartUpload" } -// 在存储服务所在的环境中部署有内部的Serverless服务 -type InternalServerlessCallFeature struct { - serder.Metadata `union:"InternalServerlessCall"` - Type string `json:"type"` - CommandDir string `json:"commandDir"` // 存放命令文件的目录 -} - -func (f *InternalServerlessCallFeature) GetFeatureType() string { - return "InternalServerlessCall" -} - -func (f *InternalServerlessCallFeature) String() string { - return "InternalServerlessCall" -} - // 存储服务之间直传文件 type S2STransferFeature struct { serder.Metadata `union:"S2STransfer"` Type string `json:"type"` - TempDir string `json:"tempDir"` // 临时文件存放目录 } func (f *S2STransferFeature) GetFeatureType() string { @@ -85,7 +50,6 @@ func (f *S2STransferFeature) String() string { type ECMultiplierFeature struct { serder.Metadata `union:"ECMultiplier"` Type string `json:"type"` - TempDir string `json:"tempDir"` // 临时文件存放目录 } func (f *ECMultiplierFeature) GetFeatureType() string {