
Fix path handling issues

feature_gxh
Sydonian 6 months ago
commit 8e27676eeb
25 changed files with 141 additions and 190 deletions
  1. client/types/types.go  +4 -0
  2. common/pkgs/ioswitch2/parser/gen/generator.go  +1 -1
  3. common/pkgs/ioswitch2/plans/complete_multipart.go  +1 -1
  4. common/pkgs/ioswitchlrc/parser/passes.go  +1 -1
  5. common/pkgs/storage/efile/ec_multiplier.go  +2 -1
  6. common/pkgs/storage/efile/efile.go  +1 -2
  7. common/pkgs/storage/local/base_store.go  +3 -2
  8. common/pkgs/storage/local/local.go  +2 -3
  9. common/pkgs/storage/local/multipart_upload.go  +24 -20
  10. common/pkgs/storage/local/shard_store.go  +11 -14
  11. common/pkgs/storage/obs/obs.go  +3 -4
  12. common/pkgs/storage/obs/obs_test.go  +1 -3
  13. common/pkgs/storage/obs/s2s.go  +6 -4
  14. common/pkgs/storage/s3/base_store.go  +1 -1
  15. common/pkgs/storage/s3/multipart_upload.go  +7 -7
  16. common/pkgs/storage/s3/s3.go  +1 -2
  17. common/pkgs/storage/s3/shard_store.go  +20 -27
  18. common/pkgs/storage/s3/utils.go  +0 -23
  19. common/pkgs/storage/types/multiparter.go  +0 -0
  20. common/pkgs/storage/types/shard_store.go  +13 -17
  21. common/pkgs/storage/types/types.go  +5 -1
  22. common/pkgs/storage/types/utils.go  +33 -0
  23. common/pkgs/storage/utils/utils.go  +0 -18
  24. coordinator/types/storage.go  +1 -2
  25. coordinator/types/storage_feature.go  +0 -36

client/types/types.go  +4 -0

@@ -82,6 +82,10 @@ type UserSpace struct {
ShardStore *cotypes.ShardStoreUserConfig `gorm:"column:ShardStore; type:json; serializer:json" json:"shardStore"`
// Configuration of the storage service's feature capabilities
Features []cotypes.StorageFeature `json:"features" gorm:"column:Features; type:json; serializer:union"`
// Root directory under which all components store their data; components use it as their root while working.
WorkingDir string `gorm:"column:WorkingDir; type:varchar(1024); not null" json:"workingDir"`
// The real path of the working directory in the storage system. When the working directory sits inside a mount point, this field records the real path behind the mount. Some components that talk to the storage system directly need the real path.
// RealWorkingDir string `gorm:"column:RealWorkingDir; type:varchar(1024); not null" json:"realWorkingDir"`
// Revision of the user space information; it must be bumped on every change.
Revision int64 `gorm:"column:Revision; type:bigint; not null" json:"revision"`
}
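To make the new layout concrete, here is a minimal, runnable sketch of the paths this field implies. The WorkingDir value is a made-up example; the "temp" and "shards" sub-directory names come from the TempWorkingDir and ShardStoreWorkingDir constants added further down in common/pkgs/storage/types.

package main

import (
	"fmt"
	"path"
)

func main() {
	workingDir := "spaces/42" // hypothetical UserSpace.WorkingDir

	// Temporary files for a space live under <WorkingDir>/temp/...
	tempFile := path.Join(workingDir, "temp", "upload_abc123")
	// Shard store blobs live under <WorkingDir>/shards/<hash prefix>/<hash>
	shardFile := path.Join(workingDir, "shards", "ab", "ab12cd34")

	fmt.Println(tempFile)  // spaces/42/temp/upload_abc123
	fmt.Println(shardFile) // spaces/42/shards/ab/ab12cd34
}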


common/pkgs/ioswitch2/parser/gen/generator.go  +1 -1

@@ -354,7 +354,7 @@ func buildFromNode(ctx *state.GenerateState, f ioswitch2.From) (ops2.FromNode, e
func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error) {
switch t := t.(type) {
case *ioswitch2.ToShardStore:
tempFileName := os2.GenerateRandomFileName(20)
tempFileName := types.MakeTempDirPath(&t.Space, os2.GenerateRandomFileName(20))

write := ctx.DAG.NewBaseWrite(t, t.Space, tempFileName)
if err := setEnvByAddress(write, t.Space.RecommendHub, t.Space.RecommendHub.Address); err != nil {


common/pkgs/ioswitch2/plans/complete_multipart.go  +1 -1

@@ -32,7 +32,7 @@ func CompleteMultipart(blocks []clitypes.ObjectBlock, blockSpaces []clitypes.Use
}

// TODO A better approach should support both the Parser and direct DAG generation
br := da.NewBaseWrite(nil, targetSpace, os2.GenerateRandomFileName(20))
br := da.NewBaseWrite(nil, targetSpace, types.MakeTempDirPath(&targetSpace, os2.GenerateRandomFileName(20)))
br.Env().ToEnvWorker(getWorkerInfo(targetSpace.RecommendHub), true)

as := da.NewStoreShard(targetSpace, shardInfoKey)


common/pkgs/ioswitchlrc/parser/passes.go  +1 -1

@@ -102,7 +102,7 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, err
func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (ops2.ToNode, error) {
switch t := t.(type) {
case *ioswitchlrc.ToNode:
tempFileName := os2.GenerateRandomFileName(20)
tempFileName := types.MakeTempDirPath(&t.Space, os2.GenerateRandomFileName(20))
write := ctx.DAG.NewBaseWrite(t, t.Space, tempFileName)

switch addr := t.Space.RecommendHub.Address.(type) {


common/pkgs/storage/efile/ec_multiplier.go  +2 -1

@@ -48,9 +48,10 @@ func (m *ECMultiplier) Multiply(coef [][]byte, inputs []types.HTTPRequest, chunk
}

fileName := os2.GenerateRandomFileName(10)
tempDir := path.Join(m.blder.detail.UserSpace.WorkingDir, types.TempWorkingDir)
m.outputs = make([]string, len(coef))
for i := range m.outputs {
m.outputs[i] = path.Join(m.feat.TempDir, fmt.Sprintf("%s_%d", fileName, i))
m.outputs[i] = path.Join(tempDir, fmt.Sprintf("%s_%d", fileName, i))
}

u, err := url.JoinPath(m.url, "efile/openapi/v2/file/createECTask")


common/pkgs/storage/efile/efile.go  +1 -2

@@ -11,7 +11,6 @@ import (
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory/reg"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/utils"
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
)

@@ -96,7 +95,7 @@ func (b *builder) CreateECMultiplier(typeOnly bool) (types.ECMultiplier, error)
return (*ECMultiplier)(nil), nil
}

feat := utils.FindFeature[*cortypes.ECMultiplierFeature](b.detail)
feat := types.FindFeature[*cortypes.ECMultiplierFeature](b.detail)
if feat == nil {
return nil, fmt.Errorf("feature ECMultiplier not found")
}


common/pkgs/storage/local/base_store.go  +3 -2

@@ -135,7 +135,8 @@ func (s *BaseStore) ListAll(path string) ([]types.ListEntry, error) {
func (s *BaseStore) CleanTemps() {
log := s.getLogger()

entries, err := os.ReadDir(filepath.Join(s.root, TempDir))
tempDir := filepath.Join(s.root, s.detail.UserSpace.WorkingDir, types.TempWorkingDir)
entries, err := os.ReadDir(tempDir)
if err != nil {
log.Warnf("read temp dir: %v", err)
return
@@ -152,7 +153,7 @@ func (s *BaseStore) CleanTemps() {
continue
}

path := filepath.Join(s.root, TempDir, entry.Name())
path := filepath.Join(tempDir, entry.Name())
err = os.Remove(path)
if err != nil {
log.Warnf("remove temp file %v: %v", path, err)


common/pkgs/storage/local/local.go  +2 -3

@@ -6,7 +6,6 @@ import (
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory/reg"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/utils"
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
)

@@ -58,7 +57,7 @@ func (b *builder) CreateMultiparter(typeOnly bool) (types.Multiparter, error) {
return (*Multiparter)(nil), nil
}

feat := utils.FindFeature[*cortypes.MultipartUploadFeature](b.detail)
feat := types.FindFeature[*cortypes.MultipartUploadFeature](b.detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cortypes.MultipartUploadFeature{})
}
@@ -73,7 +72,7 @@ func (b *builder) CreateS2STransfer(typeOnly bool) (types.S2STransfer, error) {
return (*S2STransfer)(nil), nil
}

feat := utils.FindFeature[*cortypes.S2STransferFeature](b.detail)
feat := types.FindFeature[*cortypes.S2STransferFeature](b.detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cortypes.S2STransferFeature{})
}


common/pkgs/storage/local/multipart_upload.go  +24 -20

@@ -18,26 +18,28 @@ import (
)

type Multiparter struct {
feat *cortypes.MultipartUploadFeature
detail *clitypes.UserSpaceDetail
feat *cortypes.MultipartUploadFeature
}

func (m *Multiparter) MinPartSize() int64 {
return m.feat.MinPartSize
func (*Multiparter) MinPartSize() int64 {
return 1 * 1024 * 1024 // 1MB
}

func (m *Multiparter) MaxPartSize() int64 {
return m.feat.MaxPartSize
func (*Multiparter) MaxPartSize() int64 {
return 5 * 1024 * 1024 * 1024 // 5GB
}

func (m *Multiparter) Initiate(ctx context.Context) (types.MultipartTask, error) {
absTempDir, err := filepath.Abs(m.feat.TempDir)
tempDir := filepath.Join(m.detail.UserSpace.WorkingDir, types.TempWorkingDir)
absTempDir, err := filepath.Abs(tempDir)
if err != nil {
return nil, fmt.Errorf("get abs temp dir %v: %v", m.feat.TempDir, err)
return nil, fmt.Errorf("get abs temp dir %v: %v", tempDir, err)
}

tempFileName := os2.GenerateRandomFileName(10)
tempPartsDir := filepath.Join(absTempDir, tempFileName)
joinedFilePath := filepath.Join(absTempDir, tempFileName+".joined")
absJoinedFilePath := filepath.Join(absTempDir, tempFileName+".joined")

err = os.MkdirAll(tempPartsDir, 0777)

@@ -46,11 +48,12 @@ func (m *Multiparter) Initiate(ctx context.Context) (types.MultipartTask, error)
}

return &MultipartTask{
absTempDir: absTempDir,
tempFileName: tempFileName,
tempPartsDir: tempPartsDir,
joinedFilePath: joinedFilePath,
uploadID: tempPartsDir,
absTempDir: absTempDir,
tempFileName: tempFileName,
tempPartsDir: tempPartsDir,
joinedFilePath: types.PathJoin(m.detail.UserSpace.WorkingDir, types.TempWorkingDir, tempFileName+".joined"),
absJoinedFilePath: absJoinedFilePath,
uploadID: tempPartsDir,
}, nil
}

@@ -73,11 +76,12 @@ func (m *Multiparter) UploadPart(ctx context.Context, init types.MultipartInitSt
}

type MultipartTask struct {
absTempDir string // must be an absolute path
tempFileName string
tempPartsDir string
joinedFilePath string
uploadID string
absTempDir string // must be an absolute path
tempFileName string
tempPartsDir string
joinedFilePath string
absJoinedFilePath string
uploadID string
}

func (i *MultipartTask) InitState() types.MultipartInitState {
@@ -91,7 +95,7 @@ func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPar
return l.PartNumber - r.PartNumber
})

joined, err := os.Create(i.joinedFilePath)
joined, err := os.Create(i.absJoinedFilePath)
if err != nil {
return types.FileInfo{}, err
}
@@ -111,7 +115,7 @@ func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPar

h := hasher.Sum(nil)
return types.FileInfo{
Path: joined.Name(),
Path: i.joinedFilePath,
Size: size,
Hash: clitypes.NewFullHash(h),
}, nil


common/pkgs/storage/local/shard_store.go  +11 -14

@@ -13,11 +13,6 @@ import (
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
)

const (
TempDir = "tmp"
BlocksDir = "blocks"
)

type ShardStore struct {
detail *clitypes.UserSpaceDetail
absRoot string
@@ -26,7 +21,7 @@ type ShardStore struct {
}

func NewShardStore(root string, detail *clitypes.UserSpaceDetail) (*ShardStore, error) {
absRoot, err := filepath.Abs(filepath.Join(root, detail.UserSpace.ShardStore.BaseDir))
absRoot, err := filepath.Abs(filepath.Join(root, detail.UserSpace.WorkingDir, types.ShardStoreWorkingDir))
if err != nil {
return nil, fmt.Errorf("get abs root: %w", err)
}
@@ -78,7 +73,7 @@ func (s *ShardStore) Store(path string, hash clitypes.FileHash, size int64) (typ
return types.FileInfo{
Hash: hash,
Size: size,
Path: newPath,
Path: s.getSlashFilePathFromHash(hash),
}, nil
}

@@ -105,8 +100,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) {

var infos []types.FileInfo

blockDir := filepath.Join(s.absRoot, BlocksDir)
err := filepath.WalkDir(blockDir, func(path string, d fs.DirEntry, err error) error {
err := filepath.WalkDir(s.absRoot, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
@@ -128,7 +122,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) {
infos = append(infos, types.FileInfo{
Hash: fileHash,
Size: info.Size(),
Path: filepath.Join(blockDir, path),
Path: s.getSlashFilePathFromHash(fileHash),
})
return nil
})
@@ -150,8 +144,7 @@ func (s *ShardStore) GC(avaiables []clitypes.FileHash) error {

cnt := 0

blockDir := filepath.Join(s.absRoot, BlocksDir)
err := filepath.WalkDir(blockDir, func(path string, d fs.DirEntry, err error) error {
err := filepath.WalkDir(s.absRoot, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
@@ -203,9 +196,13 @@ func (s *ShardStore) getLogger() logger.Logger {
}

func (s *ShardStore) getFileDirFromHash(hash clitypes.FileHash) string {
return filepath.Join(s.absRoot, BlocksDir, hash.GetHashPrefix(2))
return filepath.Join(s.absRoot, hash.GetHashPrefix(2))
}

func (s *ShardStore) getFilePathFromHash(hash clitypes.FileHash) string {
return filepath.Join(s.absRoot, BlocksDir, hash.GetHashPrefix(2), string(hash))
return filepath.Join(s.absRoot, hash.GetHashPrefix(2), string(hash))
}

func (s *ShardStore) getSlashFilePathFromHash(hash clitypes.FileHash) string {
return types.PathJoin(s.detail.UserSpace.WorkingDir, types.ShardStoreWorkingDir, hash.GetHashPrefix(2), string(hash))
}

common/pkgs/storage/obs/obs.go  +3 -4

@@ -10,7 +10,6 @@ import (
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory/reg"
s3stg "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/s3"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/utils"
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
)

@@ -96,7 +95,7 @@ func (b *builder) CreateMultiparter(typeOnly bool) (types.Multiparter, error) {
}

stgType := b.detail.UserSpace.Storage.(*cortypes.OBSType)
feat := utils.FindFeature[*cortypes.MultipartUploadFeature](b.detail)
feat := types.FindFeature[*cortypes.MultipartUploadFeature](b.detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cortypes.MultipartUploadFeature{})
}
@@ -125,7 +124,7 @@ func (b *builder) CreateS2STransfer(typeOnly bool) (types.S2STransfer, error) {
}

stgType := b.detail.UserSpace.Storage.(*cortypes.OBSType)
feat := utils.FindFeature[*cortypes.S2STransferFeature](b.detail)
feat := types.FindFeature[*cortypes.S2STransferFeature](b.detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cortypes.S2STransferFeature{})
}
@@ -135,5 +134,5 @@ func (b *builder) CreateS2STransfer(typeOnly bool) (types.S2STransfer, error) {
return nil, fmt.Errorf("invalid storage credential type %T for obs storage", b.detail.UserSpace.Credential)
}

return NewS2STransfer(stgType, cred, feat), nil
return NewS2STransfer(b.detail, stgType, cred, feat), nil
}

common/pkgs/storage/obs/obs_test.go  +1 -3

@@ -22,9 +22,7 @@ func Test_S2S(t *testing.T) {
AK: "",
SK: "",
},
feat: &cortypes.S2STransferFeature{
TempDir: "s2s",
},
feat: &cortypes.S2STransferFeature{},
}

_, err := s2s.Transfer(context.TODO(), &clitypes.UserSpaceDetail{


common/pkgs/storage/obs/s2s.go  +6 -4

@@ -14,12 +14,12 @@ import (
omsregion "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/oms/v2/region"
"gitlink.org.cn/cloudream/common/utils/os2"
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/s3"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
)

type S2STransfer struct {
detail *clitypes.UserSpaceDetail
stgType *cortypes.OBSType
cred *cortypes.OBSCred
feat *cortypes.S2STransferFeature
@@ -27,8 +27,9 @@ type S2STransfer struct {
omsCli *oms.OmsClient
}

func NewS2STransfer(stgType *cortypes.OBSType, cred *cortypes.OBSCred, feat *cortypes.S2STransferFeature) *S2STransfer {
func NewS2STransfer(detail *clitypes.UserSpaceDetail, stgType *cortypes.OBSType, cred *cortypes.OBSCred, feat *cortypes.S2STransferFeature) *S2STransfer {
return &S2STransfer{
detail: detail,
stgType: stgType,
cred: cred,
feat: feat,
@@ -71,7 +72,8 @@ func (s *S2STransfer) Transfer(ctx context.Context, src *clitypes.UserSpaceDetai
}

// Upload to a temporary location first
tempPrefix := s3.JoinKey(s.feat.TempDir, os2.GenerateRandomFileName(10)) + "/"
tempDir := types.PathJoin(s.detail.UserSpace.WorkingDir, types.TempWorkingDir)
tempPrefix := types.PathJoin(tempDir, os2.GenerateRandomFileName(10)) + "/"

taskType := model.GetCreateTaskReqTaskTypeEnum().OBJECT
s.omsCli = oms.NewOmsClient(cli)
@@ -108,7 +110,7 @@ func (s *S2STransfer) Transfer(ctx context.Context, src *clitypes.UserSpaceDetai

_, err = obsCli.CopyObject(ctx, &awss3.CopyObjectInput{
Bucket: aws.String(bkt),
CopySource: aws.String(s3.JoinKey(bkt, tempPrefix, srcPath)),
CopySource: aws.String(types.PathJoin(bkt, tempPrefix, srcPath)),
Key: aws.String(dstPath),
})
if err != nil {


common/pkgs/storage/s3/base_store.go  +1 -1

@@ -162,7 +162,7 @@ func (s *BaseStore) CleanTemps() {
for {
resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{
Bucket: aws.String(s.Bucket),
Prefix: aws.String(JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, TempDir, "/")),
Prefix: aws.String(types.PathJoin(s.Detail.UserSpace.WorkingDir, types.TempWorkingDir, "/")),
Marker: marker,
})



common/pkgs/storage/s3/multipart_upload.go  +7 -7

@@ -4,7 +4,6 @@ import (
"context"
"crypto/sha256"
"io"
"path/filepath"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
@@ -33,17 +32,18 @@ func NewMultiparter(detail *clitypes.UserSpaceDetail, feat *cortypes.MultipartUp
}
}

func (m *Multiparter) MinPartSize() int64 {
return m.feat.MinPartSize
func (*Multiparter) MinPartSize() int64 {
return 5 * 1024 * 1024 // 5MB
}

func (m *Multiparter) MaxPartSize() int64 {
return m.feat.MaxPartSize
func (*Multiparter) MaxPartSize() int64 {
return 5 * 1024 * 1024 * 1024 // 5GB
}

func (m *Multiparter) Initiate(ctx context.Context) (types.MultipartTask, error) {
tempFileName := os2.GenerateRandomFileName(10)
tempFilePath := filepath.Join(m.feat.TempDir, tempFileName)
tempDir := types.PathJoin(m.detail.UserSpace.WorkingDir, types.TempWorkingDir)
tempFilePath := types.PathJoin(tempDir, tempFileName)

resp, err := m.cli.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
Bucket: aws.String(m.bucket),
@@ -57,7 +57,7 @@ func (m *Multiparter) Initiate(ctx context.Context) (types.MultipartTask, error)
return &MultipartTask{
cli: m.cli,
bucket: m.bucket,
tempDir: m.feat.TempDir,
tempDir: tempDir,
tempFileName: tempFileName,
tempFilePath: tempFilePath,
uploadID: *resp.UploadId,


common/pkgs/storage/s3/s3.go  +1 -2

@@ -9,7 +9,6 @@ import (
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory/reg"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/utils"
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
)

@@ -97,7 +96,7 @@ func (b *builder) CreateMultiparter(typeOnly bool) (types.Multiparter, error) {
}

stgType := b.detail.UserSpace.Storage.(*cortypes.S3Type)
feat := utils.FindFeature[*cortypes.MultipartUploadFeature](b.detail)
feat := types.FindFeature[*cortypes.MultipartUploadFeature](b.detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cortypes.MultipartUploadFeature{})
}


common/pkgs/storage/s3/shard_store.go  +20 -27

@@ -12,34 +12,31 @@ import (
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
)

const (
TempDir = "tmp"
BlocksDir = "blocks"
)

type ShardStoreOption struct {
UseAWSSha256 bool // Whether the SHA256 checksum provided by AWS can be used directly; if not, it is computed locally. Defaults to local computation.
}

type ShardStore struct {
Detail *clitypes.UserSpaceDetail
Bucket string
cli *s3.Client
opt ShardStoreOption
lock sync.Mutex
Detail *clitypes.UserSpaceDetail
Bucket string
workingDir string
cli *s3.Client
opt ShardStoreOption
lock sync.Mutex
}

func NewShardStore(detail *clitypes.UserSpaceDetail, cli *s3.Client, bkt string, opt ShardStoreOption) (*ShardStore, error) {
return &ShardStore{
Detail: detail,
Bucket: bkt,
cli: cli,
opt: opt,
Detail: detail,
Bucket: bkt,
workingDir: types.PathJoin(detail.UserSpace.WorkingDir, types.ShardStoreWorkingDir),
cli: cli,
opt: opt,
}, nil
}

func (s *ShardStore) Start(ch *types.StorageEventChan) {
s.getLogger().Infof("start, root: %v", s.Detail.UserSpace.ShardStore.BaseDir)
s.getLogger().Infof("start, root: %v", s.workingDir)
}

func (s *ShardStore) Stop() {
@@ -55,11 +52,11 @@ func (s *ShardStore) Store(path string, hash clitypes.FileHash, size int64) (typ
log.Debugf("write file %v finished, size: %v, hash: %v", path, size, hash)

blockDir := s.GetFileDirFromHash(hash)
newPath := JoinKey(blockDir, string(hash))
newPath := types.PathJoin(blockDir, string(hash))

_, err := s.cli.CopyObject(context.Background(), &s3.CopyObjectInput{
Bucket: aws.String(s.Bucket),
CopySource: aws.String(JoinKey(s.Bucket, path)),
CopySource: aws.String(types.PathJoin(s.Bucket, path)),
Key: aws.String(newPath),
})
if err != nil {
@@ -101,13 +98,11 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) {

var infos []types.FileInfo

blockDir := JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, BlocksDir)

var marker *string
for {
resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{
Bucket: aws.String(s.Bucket),
Prefix: aws.String(blockDir),
Prefix: aws.String(s.workingDir),
Marker: marker,
})

@@ -117,7 +112,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) {
}

for _, obj := range resp.Contents {
key := BaseKey(*obj.Key)
key := types.PathBase(*obj.Key)

fileHash, err := clitypes.ParseHash(key)
if err != nil {
@@ -150,14 +145,12 @@ func (s *ShardStore) GC(avaiables []clitypes.FileHash) error {
avais[hash] = true
}

blockDir := JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, BlocksDir)

var deletes []s3types.ObjectIdentifier
var marker *string
for {
resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{
Bucket: aws.String(s.Bucket),
Prefix: aws.String(blockDir),
Prefix: aws.String(s.workingDir),
Marker: marker,
})

@@ -167,7 +160,7 @@ func (s *ShardStore) GC(avaiables []clitypes.FileHash) error {
}

for _, obj := range resp.Contents {
key := BaseKey(*obj.Key)
key := types.PathBase(*obj.Key)
fileHash, err := clitypes.ParseHash(key)
if err != nil {
continue
@@ -220,9 +213,9 @@ func (s *ShardStore) getLogger() logger.Logger {
}

func (s *ShardStore) GetFileDirFromHash(hash clitypes.FileHash) string {
return JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, BlocksDir, hash.GetHashPrefix(2))
return types.PathJoin(s.workingDir, hash.GetHashPrefix(2))
}

func (s *ShardStore) GetFilePathFromHash(hash clitypes.FileHash) string {
return JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, BlocksDir, hash.GetHashPrefix(2), string(hash))
return types.PathJoin(s.workingDir, hash.GetHashPrefix(2), string(hash))
}

common/pkgs/storage/s3/utils.go  +0 -23

@@ -3,31 +3,8 @@ package s3
import (
"encoding/base64"
"fmt"
"strings"
)

func JoinKey(comps ...string) string {
sb := strings.Builder{}

hasTrailingSlash := true
for _, comp := range comps {
if comp == "" {
continue
}
if !hasTrailingSlash {
sb.WriteString("/")
}
sb.WriteString(comp)
hasTrailingSlash = strings.HasSuffix(comp, "/")
}

return sb.String()
}

func BaseKey(key string) string {
return key[strings.LastIndex(key, "/")+1:]
}

func DecodeBase64Hash(hash string) ([]byte, error) {
hashBytes := make([]byte, 32)
n, err := base64.RawStdEncoding.Decode(hashBytes, []byte(hash))


common/pkgs/storage/types/s3_client.go → common/pkgs/storage/types/multiparter.go  +0 -0


common/pkgs/storage/types/shard_store.go  +13 -17

@@ -4,23 +4,7 @@ import (
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
)

type Status interface {
String() string
}

type OKStatus struct {
StoreEvent
}

func (s *OKStatus) String() string {
return "OK"
}

var StatusOK = &OKStatus{}

type StoreEvent interface {
IsStoreEvent()
}
const ShardStoreWorkingDir = "shards"

type ShardStore interface {
Start(ch *StorageEventChan)
@@ -49,3 +33,15 @@ type Stats struct {
// Description, for debugging
Description string
}

type Status interface {
String() string
}

type OKStatus struct{}

func (s *OKStatus) String() string {
return "OK"
}

var StatusOK = &OKStatus{}

common/pkgs/storage/types/types.go  +5 -1

@@ -7,6 +7,8 @@ import (
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
)

const TempWorkingDir = "temp"

var ErrStorageNotFound = errors.New("storage not found")

// Unsupported operation. Can be returned as the error from any StorageBuilder function to indicate that the operation is not supported.
@@ -14,7 +16,9 @@ var ErrUnsupported = errors.New("unsupported operation")

var ErrStorageExists = errors.New("storage already exists")

type StorageEvent interface{}
type StorageEvent interface {
IsStorageEvent() bool
}

type StorageEventChan = async.UnboundChannel[StorageEvent]



common/pkgs/storage/types/utils.go  +33 -0

@@ -0,0 +1,33 @@
package types

import (
"path"

clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
)

func FindFeature[T cortypes.StorageFeature](detail *clitypes.UserSpaceDetail) T {
for _, f := range detail.UserSpace.Features {
f2, ok := f.(T)
if ok {
return f2
}
}

var def T
return def
}

func PathJoin(comps ...string) string {
return path.Join(comps...)
}

func PathBase(p string) string {
return path.Base(p)
}

func MakeTempDirPath(detail *clitypes.UserSpaceDetail, comps ...string) string {
cs := append([]string{detail.UserSpace.WorkingDir, TempWorkingDir}, comps...)
return PathJoin(cs...)
}
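For reference, this is roughly how the new helpers are used elsewhere in this commit (illustrative fragment, not a standalone program; detail is a *clitypes.UserSpaceDetail, and the error-handling pattern mirrors efile.go and s3.go above):

// FindFeature looks a feature up by its concrete type and returns the zero
// value (nil for pointer types) when the user space does not configure it.
feat := types.FindFeature[*cortypes.MultipartUploadFeature](detail)
if feat == nil {
	return nil, fmt.Errorf("feature %T not found", cortypes.MultipartUploadFeature{})
}

// MakeTempDirPath builds <WorkingDir>/temp/<name>; PathJoin wraps path.Join,
// so the result is cleaned (e.g. "a//b/" becomes "a/b").
tempPath := types.MakeTempDirPath(detail, os2.GenerateRandomFileName(20))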

common/pkgs/storage/utils/utils.go  +0 -18

@@ -1,18 +0,0 @@
package utils

import (
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
)

func FindFeature[T cortypes.StorageFeature](detail *clitypes.UserSpaceDetail) T {
for _, f := range detail.UserSpace.Features {
f2, ok := f.(T)
if ok {
return f2
}
}

var def T
return def
}

coordinator/types/storage.go  +1 -2

@@ -248,6 +248,5 @@ func (a *S3Type) Equals(other StorageType) bool {
}

type ShardStoreUserConfig struct {
BaseDir string `json:"baseDir"`
MaxSize int64 `json:"maxSize"`
MaxSize int64 `json:"maxSize"`
}

coordinator/types/storage_feature.go  +0 -36

@@ -13,34 +13,15 @@ type StorageFeature interface {
}

var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[StorageFeature](
(*BypassWriteFeature)(nil),
(*MultipartUploadFeature)(nil),
(*InternalServerlessCallFeature)(nil),
(*S2STransferFeature)(nil),
(*ECMultiplierFeature)(nil),
)), "type")

// The storage service supports direct file uploads from nodes other than the MasterHub
type BypassWriteFeature struct {
serder.Metadata `union:"BypassWrite"`
Type string `json:"type"`
}

func (f *BypassWriteFeature) GetFeatureType() string {
return "BypassWrite"
}

func (f *BypassWriteFeature) String() string {
return "BypassWrite"
}

// The storage service supports multipart upload
type MultipartUploadFeature struct {
serder.Metadata `union:"MultipartUpload"`
Type string `json:"type"`
TempDir string `json:"tempDir"` // directory for temporary files
MinPartSize int64 `json:"minPartSize"` // minimum part size
MaxPartSize int64 `json:"maxPartSize"` // maximum part size
}

func (f *MultipartUploadFeature) GetFeatureType() string {
@@ -51,26 +32,10 @@ func (f *MultipartUploadFeature) String() string {
return "MultipartUpload"
}

// An internal serverless service is deployed in the environment where the storage service runs
type InternalServerlessCallFeature struct {
serder.Metadata `union:"InternalServerlessCall"`
Type string `json:"type"`
CommandDir string `json:"commandDir"` // directory where command files are stored
}

func (f *InternalServerlessCallFeature) GetFeatureType() string {
return "InternalServerlessCall"
}

func (f *InternalServerlessCallFeature) String() string {
return "InternalServerlessCall"
}

// Direct file transfer between storage services
type S2STransferFeature struct {
serder.Metadata `union:"S2STransfer"`
Type string `json:"type"`
TempDir string `json:"tempDir"` // directory for temporary files
}

func (f *S2STransferFeature) GetFeatureType() string {
@@ -85,7 +50,6 @@ func (f *S2STransferFeature) String() string {
type ECMultiplierFeature struct {
serder.Metadata `union:"ECMultiplier"`
Type string `json:"type"`
TempDir string `json:"tempDir"` // directory for temporary files
}

func (f *ECMultiplierFeature) GetFeatureType() string {

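Taken together, the remaining feature structs no longer carry any per-feature directory or part-size settings. As a sketch inferred from the hunks above, the trimmed MultipartUploadFeature now reduces to:

// Field set inferred from the removed lines above; the temp directory now comes
// from UserSpace.WorkingDir and the part-size limits are hard-coded in each
// storage backend's Multiparter.
type MultipartUploadFeature struct {
	serder.Metadata `union:"MultipartUpload"`
	Type            string `json:"type"`
}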
