diff --git a/common/pkgs/storage/efile/ec_multiplier.go b/common/pkgs/storage/efile/ec_multiplier.go index 79588c3..0f3ff68 100644 --- a/common/pkgs/storage/efile/ec_multiplier.go +++ b/common/pkgs/storage/efile/ec_multiplier.go @@ -6,17 +6,18 @@ import ( "net/url" "path" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/http2" "gitlink.org.cn/cloudream/common/utils/os2" "gitlink.org.cn/cloudream/common/utils/serder" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type ECMultiplier struct { blder *builder url string - feat *cdssdk.ECMultiplierFeature + feat *cortypes.ECMultiplierFeature outputs []string completed bool } @@ -98,7 +99,7 @@ func (m *ECMultiplier) Multiply(coef [][]byte, inputs []types.HTTPRequest, chunk ret[i] = types.BypassUploadedFile{ Path: m.outputs[i], Size: data.Size, - Hash: cdssdk.NewFullHashFromString(data.Sha256), + Hash: clitypes.NewFullHashFromString(data.Sha256), } } diff --git a/common/pkgs/storage/efile/efile.go b/common/pkgs/storage/efile/efile.go index b81bcf6..3a18339 100644 --- a/common/pkgs/storage/efile/efile.go +++ b/common/pkgs/storage/efile/efile.go @@ -6,17 +6,17 @@ import ( "sync" "time" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/http2" "gitlink.org.cn/cloudream/common/utils/serder" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory/reg" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/utils" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) func init() { - reg.RegisterBuilder[*cdssdk.EFileType](func(detail stgmod.StorageDetail) types.StorageBuilder { + reg.RegisterBuilder[*cortypes.EFileType](func(detail *clitypes.UserSpaceDetail) types.StorageBuilder { return &builder{ detail: detail, } @@ -25,35 +25,35 @@ func init() { type builder struct { types.EmptyBuilder - detail stgmod.StorageDetail + detail *clitypes.UserSpaceDetail token string tokenLock sync.Mutex getTokenTime time.Time } func (b *builder) getToken() (string, error) { - stgType := b.detail.Storage.Type.(*cdssdk.EFileType) + cred := b.detail.UserSpace.Credential.(*cortypes.EFileCred) b.tokenLock.Lock() defer b.tokenLock.Unlock() if b.token != "" { dt := time.Since(b.getTokenTime) - if dt < time.Second*time.Duration(stgType.TokenExpire) { + if dt < time.Second*time.Duration(cred.TokenExpire) { return b.token, nil } } - u, err := url.JoinPath(stgType.TokenURL, "/ac/openapi/v2/tokens") + u, err := url.JoinPath(cred.TokenURL, "/ac/openapi/v2/tokens") if err != nil { return "", err } resp, err := http2.PostJSON(u, http2.RequestParam{ Header: map[string]string{ - "user": stgType.User, - "password": stgType.Password, - "orgId": stgType.OrgID, + "user": cred.User, + "password": cred.Password, + "orgId": cred.OrgID, }, }) if err != nil { @@ -80,25 +80,30 @@ func (b *builder) getToken() (string, error) { } for _, d := range r.Data { - if d.ClusterID == stgType.ClusterID { + if d.ClusterID == cred.ClusterID { b.token = d.Token b.getTokenTime = time.Now() return d.Token, nil } } - return "", fmt.Errorf("clusterID %s not found", stgType.ClusterID) + return "", fmt.Errorf("clusterID %s not found", cred.ClusterID) } func (b *builder) CreateECMultiplier() (types.ECMultiplier, error) { - feat := utils.FindFeature[*cdssdk.ECMultiplierFeature](b.detail) + feat := utils.FindFeature[*cortypes.ECMultiplierFeature](b.detail.Storage) if feat == nil { return nil, fmt.Errorf("feature ECMultiplier not found") } + cred, ok := b.detail.UserSpace.Credential.(*cortypes.EFileCred) + if !ok { + return nil, fmt.Errorf("invalid storage credential type %T for efile storage", b.detail.UserSpace.Credential) + } + return &ECMultiplier{ blder: b, - url: b.detail.Storage.Type.(*cdssdk.EFileType).APIURL, + url: cred.APIURL, feat: feat, }, nil } diff --git a/common/pkgs/storage/factory/reg/reg.go b/common/pkgs/storage/factory/reg/reg.go index 8cd0a7a..c602d23 100644 --- a/common/pkgs/storage/factory/reg/reg.go +++ b/common/pkgs/storage/factory/reg/reg.go @@ -3,10 +3,10 @@ package reg import ( "reflect" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/reflect2" clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type BuilderCtor func(detail *clitypes.UserSpaceDetail) types.StorageBuilder @@ -14,7 +14,7 @@ type BuilderCtor func(detail *clitypes.UserSpaceDetail) types.StorageBuilder var StorageBuilders = make(map[reflect.Type]BuilderCtor) // 注册针对指定存储服务类型的Builder -func RegisterBuilder[T cdssdk.StorageType](ctor BuilderCtor) { +func RegisterBuilder[T cortypes.StorageType](ctor BuilderCtor) { StorageBuilders[reflect2.TypeOf[T]()] = ctor } diff --git a/common/pkgs/storage/local/agent.go b/common/pkgs/storage/local/agent.go deleted file mode 100644 index 3289c3f..0000000 --- a/common/pkgs/storage/local/agent.go +++ /dev/null @@ -1,52 +0,0 @@ -package local - -import ( - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" -) - -type agent struct { - Detail stgmod.StorageDetail - ShardStore *ShardStore - PublicStore *PublicStore -} - -func (s *agent) Start(ch *types.StorageEventChan) { - if s.ShardStore != nil { - s.ShardStore.Start(ch) - } - - if s.PublicStore != nil { - s.PublicStore.Start(ch) - } -} - -func (s *agent) Stop() { - if s.ShardStore != nil { - s.ShardStore.Stop() - } - - if s.PublicStore != nil { - s.PublicStore.Stop() - } -} - -func (s *agent) Info() stgmod.StorageDetail { - return s.Detail -} - -func (a *agent) GetShardStore() (types.ShardStore, error) { - if a.ShardStore == nil { - return nil, types.ErrUnsupported - } - - return a.ShardStore, nil -} - -func (a *agent) GetPublicStore() (types.PublicStore, error) { - if a.PublicStore == nil { - return nil, types.ErrUnsupported - } - - return a.PublicStore, nil -} diff --git a/common/pkgs/storage/local/local.go b/common/pkgs/storage/local/local.go index 368bf28..6d4aa2b 100644 --- a/common/pkgs/storage/local/local.go +++ b/common/pkgs/storage/local/local.go @@ -3,15 +3,15 @@ package local import ( "fmt" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory/reg" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/utils" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) func init() { - reg.RegisterBuilder[*cdssdk.LocalStorageType](func(detail stgmod.StorageDetail) types.StorageBuilder { + reg.RegisterBuilder[*cortypes.LocalStorageType](func(detail *clitypes.UserSpaceDetail) types.StorageBuilder { return &builder{ detail: detail, } @@ -20,57 +20,28 @@ func init() { type builder struct { types.EmptyBuilder - detail stgmod.StorageDetail + detail *clitypes.UserSpaceDetail } -func (b *builder) CreateAgent() (types.StorageAgent, error) { - agt := &agent{ - Detail: b.detail, +func (b *builder) FeatureDesc() types.FeatureDesc { + return types.FeatureDesc{ + HasBypassWrite: true, + HasBypassRead: true, } - - if b.detail.Storage.ShardStore != nil { - local, ok := b.detail.Storage.ShardStore.(*cdssdk.LocalShardStorage) - if !ok { - return nil, fmt.Errorf("invalid shard store type %T for local storage", b.detail.Storage.ShardStore) - } - - store, err := NewShardStore(agt, *local) - if err != nil { - return nil, err - } - - agt.ShardStore = store - } - - if b.detail.Storage.PublicStore != nil { - local, ok := b.detail.Storage.PublicStore.(*cdssdk.LocalPublicStorage) - if !ok { - return nil, fmt.Errorf("invalid public store type %T for local storage", b.detail.Storage.PublicStore) - } - - store, err := NewPublicStore(agt, *local) - if err != nil { - return nil, err - } - - agt.PublicStore = store - } - - return agt, nil } -func (b *builder) ShardStoreDesc() types.ShardStoreDesc { - return &ShardStoreDesc{builder: b} +func (b *builder) CreateShardStore() (types.ShardStore, error) { + return NewShardStore(b.detail) } -func (b *builder) PublicStoreDesc() types.PublicStoreDesc { - return &PublicStoreDesc{builder: b} +func (b *builder) CreatePublicStore() (types.PublicStore, error) { + return NewPublicStore(b.detail) } func (b *builder) CreateMultiparter() (types.Multiparter, error) { - feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](b.detail) + feat := utils.FindFeature[*cortypes.MultipartUploadFeature](b.detail.Storage) if feat == nil { - return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{}) + return nil, fmt.Errorf("feature %T not found", cortypes.MultipartUploadFeature{}) } return &Multiparter{ @@ -79,9 +50,9 @@ func (b *builder) CreateMultiparter() (types.Multiparter, error) { } func (b *builder) CreateS2STransfer() (types.S2STransfer, error) { - feat := utils.FindFeature[*cdssdk.S2STransferFeature](b.detail) + feat := utils.FindFeature[*cortypes.S2STransferFeature](b.detail.Storage) if feat == nil { - return nil, fmt.Errorf("feature %T not found", cdssdk.S2STransferFeature{}) + return nil, fmt.Errorf("feature %T not found", cortypes.S2STransferFeature{}) } return &S2STransfer{ diff --git a/common/pkgs/storage/local/multipart_upload.go b/common/pkgs/storage/local/multipart_upload.go index b2fce0f..0c1479b 100644 --- a/common/pkgs/storage/local/multipart_upload.go +++ b/common/pkgs/storage/local/multipart_upload.go @@ -9,15 +9,16 @@ import ( "os" "path/filepath" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/os2" "gitlink.org.cn/cloudream/common/utils/sort2" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type Multiparter struct { - feat *cdssdk.MultipartUploadFeature + feat *cortypes.MultipartUploadFeature } func (m *Multiparter) MinPartSize() int64 { @@ -112,7 +113,7 @@ func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPar return types.BypassUploadedFile{ Path: joined.Name(), Size: size, - Hash: cdssdk.NewFullHash(h), + Hash: clitypes.NewFullHash(h), }, nil } diff --git a/common/pkgs/storage/local/public_store.go b/common/pkgs/storage/local/public_store.go index 57b6ae0..cf471e4 100644 --- a/common/pkgs/storage/local/public_store.go +++ b/common/pkgs/storage/local/public_store.go @@ -7,47 +7,26 @@ import ( "path/filepath" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" ) -type PublicStoreDesc struct { - types.EmptyPublicStoreDesc - builder *builder -} - -func (d *PublicStoreDesc) Enabled() bool { - return d.builder.detail.Storage.PublicStore != nil -} - type PublicStore struct { - agt *agent - cfg cdssdk.LocalPublicStorage + detail *clitypes.UserSpaceDetail } -func NewPublicStore(agt *agent, cfg cdssdk.LocalPublicStorage) (*PublicStore, error) { +func NewPublicStore(detail *clitypes.UserSpaceDetail) (*PublicStore, error) { return &PublicStore{ - agt: agt, - cfg: cfg, + detail: detail, }, nil } -func (s *PublicStore) Start(ch *types.StorageEventChan) { - s.getLogger().Infof("component start, LoadBase: %v", s.cfg.LoadBase) -} - -func (s *PublicStore) Stop() { - s.getLogger().Infof("component stop") -} - func (s *PublicStore) Write(objPath string, stream io.Reader) error { - fullPath := filepath.Join(s.cfg.LoadBase, objPath) - err := os.MkdirAll(filepath.Dir(fullPath), 0755) + err := os.MkdirAll(filepath.Dir(objPath), 0755) if err != nil { return err } - f, err := os.Create(fullPath) + f, err := os.Create(objPath) if err != nil { return err } @@ -62,8 +41,7 @@ func (s *PublicStore) Write(objPath string, stream io.Reader) error { } func (s *PublicStore) Read(objPath string) (io.ReadCloser, error) { - fullPath := filepath.Join(s.cfg.LoadBase, objPath) - f, err := os.Open(fullPath) + f, err := os.Open(objPath) if err != nil { return nil, err } @@ -72,11 +50,9 @@ func (s *PublicStore) Read(objPath string) (io.ReadCloser, error) { } func (s *PublicStore) List(path string, recursive bool) ([]string, error) { - fullPath := filepath.Join(s.cfg.LoadBase, path) - var pathes []string if recursive { - err := filepath.WalkDir(fullPath, func(path string, d fs.DirEntry, err error) error { + err := filepath.WalkDir(path, func(path string, d fs.DirEntry, err error) error { if err != nil { return err } @@ -84,12 +60,7 @@ func (s *PublicStore) List(path string, recursive bool) ([]string, error) { return nil } - relPath, err := filepath.Rel(s.cfg.LoadBase, path) - if err != nil { - return err - } - - pathes = append(pathes, filepath.ToSlash(relPath)) + pathes = append(pathes, filepath.ToSlash(path)) return nil }) if err != nil { @@ -97,7 +68,7 @@ func (s *PublicStore) List(path string, recursive bool) ([]string, error) { } } else { - files, err := os.ReadDir(fullPath) + files, err := os.ReadDir(path) if err != nil { return nil, err } @@ -107,12 +78,7 @@ func (s *PublicStore) List(path string, recursive bool) ([]string, error) { continue } - relPath, err := filepath.Rel(s.cfg.LoadBase, filepath.Join(fullPath, f.Name())) - if err != nil { - return nil, err - } - - pathes = append(pathes, filepath.ToSlash(relPath)) + pathes = append(pathes, filepath.ToSlash(filepath.Join(path, f.Name()))) } } @@ -120,5 +86,5 @@ func (s *PublicStore) List(path string, recursive bool) ([]string, error) { } func (s *PublicStore) getLogger() logger.Logger { - return logger.WithField("PublicStore", "Local").WithField("Storage", s.agt.Detail.Storage.String()) + return logger.WithField("PublicStore", "Local").WithField("UserSpace", s.detail.UserSpace) } diff --git a/common/pkgs/storage/local/s2s.go b/common/pkgs/storage/local/s2s.go index 445c86a..735b043 100644 --- a/common/pkgs/storage/local/s2s.go +++ b/common/pkgs/storage/local/s2s.go @@ -7,20 +7,20 @@ import ( "os" "path/filepath" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/os2" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type S2STransfer struct { - feat cdssdk.S2STransferFeature - detail stgmod.StorageDetail + feat *cortypes.S2STransferFeature + detail *clitypes.UserSpaceDetail dstPath string } // 只有同一个机器的存储之间才可以进行数据直传 -func (s *S2STransfer) CanTransfer(src stgmod.StorageDetail) bool { - _, ok := src.Storage.Type.(*cdssdk.LocalStorageType) +func (s *S2STransfer) CanTransfer(src *clitypes.UserSpaceDetail) bool { + _, ok := src.Storage.Type.(*cortypes.LocalStorageType) if !ok { return false } @@ -33,7 +33,7 @@ func (s *S2STransfer) CanTransfer(src stgmod.StorageDetail) bool { } // 执行数据直传 -func (s *S2STransfer) Transfer(ctx context.Context, src stgmod.StorageDetail, srcPath string) (string, error) { +func (s *S2STransfer) Transfer(ctx context.Context, src *clitypes.UserSpaceDetail, srcPath string) (string, error) { absTempDir, err := filepath.Abs(s.feat.TempDir) if err != nil { return "", fmt.Errorf("get abs temp dir %v: %v", s.feat.TempDir, err) diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index 287185c..e8ea7a3 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -12,8 +12,8 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" ) @@ -23,41 +23,22 @@ const ( BlocksDir = "blocks" ) -type ShardStoreDesc struct { - types.EmptyShardStoreDesc - builder *builder -} - -func (s *ShardStoreDesc) Enabled() bool { - return s.builder.detail.Storage.ShardStore != nil -} - -func (s *ShardStoreDesc) HasBypassWrite() bool { - return s.Enabled() -} - -func (s *ShardStoreDesc) HasBypassRead() bool { - return s.Enabled() -} - type ShardStore struct { - agt *agent - cfg cdssdk.LocalShardStorage + detail *clitypes.UserSpaceDetail absRoot string lock sync.Mutex workingTempFiles map[string]bool done chan any } -func NewShardStore(svc *agent, cfg cdssdk.LocalShardStorage) (*ShardStore, error) { - absRoot, err := filepath.Abs(cfg.Root) +func NewShardStore(detail *clitypes.UserSpaceDetail) (*ShardStore, error) { + absRoot, err := filepath.Abs(detail.UserSpace.ShardStore.Root) if err != nil { return nil, fmt.Errorf("get abs root: %w", err) } return &ShardStore{ - agt: svc, - cfg: cfg, + detail: detail, absRoot: absRoot, workingTempFiles: make(map[string]bool), done: make(chan any, 1), @@ -65,7 +46,7 @@ func NewShardStore(svc *agent, cfg cdssdk.LocalShardStorage) (*ShardStore, error } func (s *ShardStore) Start(ch *types.StorageEventChan) { - s.getLogger().Infof("component start, root: %v, max size: %v", s.absRoot, s.cfg.MaxSize) + s.getLogger().Infof("component start, root: %v, max size: %v", s.absRoot, s.detail.UserSpace.ShardStore.MaxSize) go func() { removeTempTicker := time.NewTicker(time.Minute * 10) @@ -137,7 +118,7 @@ func (s *ShardStore) Create(stream io.Reader) (types.FileInfo, error) { counter := io2.Counter(stream) size, hash, err := s.writeTempFile(file, counter) if stgglb.Stats.HubStorageTransfer != nil { - stgglb.Stats.HubStorageTransfer.RecordUpload(s.agt.Detail.Storage.StorageID, counter.Count(), err == nil) + stgglb.Stats.HubStorageTransfer.RecordUpload(s.detail.Storage.StorageID, counter.Count(), err == nil) } if err != nil { // Name是文件完整路径 @@ -171,7 +152,7 @@ func (s *ShardStore) createTempFile() (*os.File, error) { return file, nil } -func (s *ShardStore) writeTempFile(file *os.File, stream io.Reader) (int64, cdssdk.FileHash, error) { +func (s *ShardStore) writeTempFile(file *os.File, stream io.Reader) (int64, clitypes.FileHash, error) { defer file.Close() buf := make([]byte, 32*1024) @@ -197,10 +178,10 @@ func (s *ShardStore) writeTempFile(file *os.File, stream io.Reader) (int64, cdss } h := hasher.Sum(nil) - return size, cdssdk.NewFullHash(h), nil + return size, clitypes.NewFullHash(h), nil } -func (s *ShardStore) onCreateFinished(tempFilePath string, size int64, hash cdssdk.FileHash) (types.FileInfo, error) { +func (s *ShardStore) onCreateFinished(tempFilePath string, size int64, hash clitypes.FileHash) (types.FileInfo, error) { s.lock.Lock() defer s.lock.Unlock() defer delete(s.workingTempFiles, filepath.Base(tempFilePath)) @@ -285,12 +266,12 @@ func (s *ShardStore) Open(opt types.OpenOption) (io.ReadCloser, error) { return io2.CounterCloser(ret, func(cnt int64, err error) { if stgglb.Stats.HubStorageTransfer != nil { - stgglb.Stats.HubStorageTransfer.RecordDownload(s.agt.Detail.Storage.StorageID, cnt, err == nil || err == io.EOF) + stgglb.Stats.HubStorageTransfer.RecordDownload(s.detail.Storage.StorageID, cnt, err == nil || err == io.EOF) } }), nil } -func (s *ShardStore) Info(hash cdssdk.FileHash) (types.FileInfo, error) { +func (s *ShardStore) Info(hash clitypes.FileHash) (types.FileInfo, error) { s.lock.Lock() defer s.lock.Unlock() @@ -328,7 +309,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { return err } - fileHash, err := cdssdk.ParseHash(filepath.Base(info.Name())) + fileHash, err := clitypes.ParseHash(filepath.Base(info.Name())) if err != nil { return nil } @@ -347,11 +328,11 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { return infos, nil } -func (s *ShardStore) GC(avaiables []cdssdk.FileHash) error { +func (s *ShardStore) GC(avaiables []clitypes.FileHash) error { s.lock.Lock() defer s.lock.Unlock() - avais := make(map[cdssdk.FileHash]bool) + avais := make(map[clitypes.FileHash]bool) for _, hash := range avaiables { avais[hash] = true } @@ -373,7 +354,7 @@ func (s *ShardStore) GC(avaiables []cdssdk.FileHash) error { return err } - fileHash, err := cdssdk.ParseHash(filepath.Base(info.Name())) + fileHash, err := clitypes.ParseHash(filepath.Base(info.Name())) if err != nil { return nil } @@ -407,14 +388,14 @@ func (s *ShardStore) Stats() types.Stats { } func (s *ShardStore) getLogger() logger.Logger { - return logger.WithField("ShardStore", "Local").WithField("Storage", s.agt.Detail.Storage.String()) + return logger.WithField("ShardStore", "Local").WithField("Storage", s.detail.Storage.String()) } -func (s *ShardStore) getFileDirFromHash(hash cdssdk.FileHash) string { +func (s *ShardStore) getFileDirFromHash(hash clitypes.FileHash) string { return filepath.Join(s.absRoot, BlocksDir, hash.GetHashPrefix(2)) } -func (s *ShardStore) getFilePathFromHash(hash cdssdk.FileHash) string { +func (s *ShardStore) getFilePathFromHash(hash clitypes.FileHash) string { return filepath.Join(s.absRoot, BlocksDir, hash.GetHashPrefix(2), string(hash)) } @@ -454,7 +435,7 @@ func (s *ShardStore) BypassUploaded(info types.BypassUploadedFile) error { var _ types.BypassRead = (*ShardStore)(nil) -func (s *ShardStore) BypassRead(fileHash cdssdk.FileHash) (types.BypassFilePath, error) { +func (s *ShardStore) BypassRead(fileHash clitypes.FileHash) (types.BypassFilePath, error) { s.lock.Lock() defer s.lock.Unlock() diff --git a/common/pkgs/storage/local/temp_store.go b/common/pkgs/storage/local/temp_store.go deleted file mode 100644 index cff37c8..0000000 --- a/common/pkgs/storage/local/temp_store.go +++ /dev/null @@ -1,15 +0,0 @@ -package local - -import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - -type LocalTempStore struct { - cfg cdssdk.BypassWriteFeature - stg cdssdk.Storage -} - -func NewLocalTempStore(stg cdssdk.Storage, cfg cdssdk.BypassWriteFeature) *LocalTempStore { - return &LocalTempStore{ - cfg: cfg, - stg: stg, - } -} diff --git a/common/pkgs/storage/mashup/mashup.go b/common/pkgs/storage/mashup/mashup.go index 59dca30..c6a3ecd 100644 --- a/common/pkgs/storage/mashup/mashup.go +++ b/common/pkgs/storage/mashup/mashup.go @@ -1,14 +1,16 @@ package mashup import ( - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + "fmt" + + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory/reg" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) func init() { - reg.RegisterBuilder[*cdssdk.MashupStorageType](func(detail stgmod.StorageDetail) types.StorageBuilder { + reg.RegisterBuilder[*cortypes.MashupStorageType](func(detail *clitypes.UserSpaceDetail) types.StorageBuilder { return &builder{ detail: detail, } @@ -16,59 +18,95 @@ func init() { } type builder struct { - detail stgmod.StorageDetail + detail *clitypes.UserSpaceDetail } -func (b *builder) CreateAgent() (types.StorageAgent, error) { - stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) - detail := b.detail - detail.Storage.Type = stgType.Agent +func (b *builder) FeatureDesc() types.FeatureDesc { + stgType := b.detail.Storage.Type.(*cortypes.MashupStorageType) + cred, ok := b.detail.UserSpace.Credential.(*cortypes.MashupCred) + if !ok { + return types.FeatureDesc{} + } + + newDetail := *b.detail + newDetail.Storage.Type = stgType.Store + newDetail.UserSpace.Credential = cred.Store - blder := reg.GetBuilderInternal(detail) - return blder.CreateAgent() + blder := reg.GetBuilderInternal(&newDetail) + return blder.FeatureDesc() } -func (b *builder) ShardStoreDesc() types.ShardStoreDesc { - stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) - detail := b.detail - detail.Storage.Type = stgType.Agent +func (b *builder) CreateShardStore() (types.ShardStore, error) { + stgType := b.detail.Storage.Type.(*cortypes.MashupStorageType) + cred, ok := b.detail.UserSpace.Credential.(*cortypes.MashupCred) + if !ok { + return nil, fmt.Errorf("invalid storage credential type %T for mashup storage", b.detail.UserSpace.Credential) + } - blder := reg.GetBuilderInternal(detail) - return blder.ShardStoreDesc() + newDetail := *b.detail + newDetail.Storage.Type = stgType.Store + newDetail.UserSpace.Credential = cred.Store + + blder := reg.GetBuilderInternal(&newDetail) + return blder.CreateShardStore() } -func (b *builder) PublicStoreDesc() types.PublicStoreDesc { - stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) - detail := b.detail - detail.Storage.Type = stgType.Agent +func (b *builder) CreatePublicStore() (types.PublicStore, error) { + stgType := b.detail.Storage.Type.(*cortypes.MashupStorageType) + cred, ok := b.detail.UserSpace.Credential.(*cortypes.MashupCred) + if !ok { + return nil, fmt.Errorf("invalid storage credential type %T for mashup storage", b.detail.UserSpace.Credential) + } + + newDetail := *b.detail + newDetail.Storage.Type = stgType.Store + newDetail.UserSpace.Credential = cred.Store - blder := reg.GetBuilderInternal(detail) - return blder.PublicStoreDesc() + blder := reg.GetBuilderInternal(&newDetail) + return blder.CreatePublicStore() } func (b *builder) CreateMultiparter() (types.Multiparter, error) { - stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) - detail := b.detail - detail.Storage.Type = stgType.Feature + stgType := b.detail.Storage.Type.(*cortypes.MashupStorageType) + cred, ok := b.detail.UserSpace.Credential.(*cortypes.MashupCred) + if !ok { + return nil, fmt.Errorf("invalid storage credential type %T for mashup storage", b.detail.UserSpace.Credential) + } - blder := reg.GetBuilderInternal(detail) + newDetail := *b.detail + newDetail.Storage.Type = stgType.Feature + newDetail.UserSpace.Credential = cred.Feature + + blder := reg.GetBuilderInternal(&newDetail) return blder.CreateMultiparter() } func (b *builder) CreateS2STransfer() (types.S2STransfer, error) { - stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) - detail := b.detail - detail.Storage.Type = stgType.Feature + stgType := b.detail.Storage.Type.(*cortypes.MashupStorageType) + cred, ok := b.detail.UserSpace.Credential.(*cortypes.MashupCred) + if !ok { + return nil, fmt.Errorf("invalid storage credential type %T for mashup storage", b.detail.UserSpace.Credential) + } + + newDetail := *b.detail + newDetail.Storage.Type = stgType.Feature + newDetail.UserSpace.Credential = cred.Feature - blder := reg.GetBuilderInternal(detail) + blder := reg.GetBuilderInternal(&newDetail) return blder.CreateS2STransfer() } func (b *builder) CreateECMultiplier() (types.ECMultiplier, error) { - stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) - detail := b.detail - detail.Storage.Type = stgType.Feature + stgType := b.detail.Storage.Type.(*cortypes.MashupStorageType) + cred, ok := b.detail.UserSpace.Credential.(*cortypes.MashupCred) + if !ok { + return nil, fmt.Errorf("invalid storage credential type %T for mashup storage", b.detail.UserSpace.Credential) + } + + newDetail := *b.detail + newDetail.Storage.Type = stgType.Feature + newDetail.UserSpace.Credential = cred.Feature - blder := reg.GetBuilderInternal(detail) + blder := reg.GetBuilderInternal(&newDetail) return blder.CreateECMultiplier() } diff --git a/common/pkgs/storage/obs/agent.go b/common/pkgs/storage/obs/agent.go deleted file mode 100644 index 48f9249..0000000 --- a/common/pkgs/storage/obs/agent.go +++ /dev/null @@ -1,39 +0,0 @@ -package obs - -import ( - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" -) - -type Agent struct { - Detail stgmod.StorageDetail - ShardStore *ShardStore -} - -func (s *Agent) Start(ch *types.StorageEventChan) { - if s.ShardStore != nil { - s.ShardStore.Start(ch) - } -} - -func (a *Agent) Stop() { - if a.ShardStore != nil { - a.ShardStore.Stop() - } -} - -func (a *Agent) Info() stgmod.StorageDetail { - return a.Detail -} - -func (a *Agent) GetShardStore() (types.ShardStore, error) { - if a.ShardStore == nil { - return nil, types.ErrUnsupported - } - - return a.ShardStore, nil -} - -func (a *Agent) GetPublicStore() (types.PublicStore, error) { - return nil, types.ErrUnsupported -} diff --git a/common/pkgs/storage/obs/obs.go b/common/pkgs/storage/obs/obs.go index 1d3488d..fcc1ec7 100644 --- a/common/pkgs/storage/obs/obs.go +++ b/common/pkgs/storage/obs/obs.go @@ -6,88 +6,96 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory/reg" s3stg "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/s3" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/utils" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) func init() { - reg.RegisterBuilder[*cdssdk.OBSType](newBuilder) + reg.RegisterBuilder[*cortypes.OBSType](newBuilder) } type builder struct { types.EmptyBuilder - detail stgmod.StorageDetail + detail *clitypes.UserSpaceDetail } -func newBuilder(detail stgmod.StorageDetail) types.StorageBuilder { +func newBuilder(detail *clitypes.UserSpaceDetail) types.StorageBuilder { return &builder{ detail: detail, } } -func (b *builder) CreateAgent() (types.StorageAgent, error) { - obsType, ok := b.detail.Storage.Type.(*cdssdk.OBSType) - if !ok { - return nil, fmt.Errorf("invalid storage type %T for obs agent", b.detail.Storage.Type) +func (b *builder) FeatureDesc() types.FeatureDesc { + return types.FeatureDesc{ + HasBypassWrite: true, + HasBypassRead: true, + HasBypassHTTPRead: true, } +} - agt := &Agent{ - Detail: b.detail, +func (b *builder) CreateShardStore() (types.ShardStore, error) { + cred, ok := b.detail.UserSpace.Credential.(*cortypes.OBSCred) + if !ok { + return nil, fmt.Errorf("invalid storage credential type %T for obs storage", b.detail.UserSpace.Credential) } - if b.detail.Storage.ShardStore != nil { - cfg, ok := b.detail.Storage.ShardStore.(*cdssdk.S3ShardStorage) - if !ok { - return nil, fmt.Errorf("invalid shard store type %T for local storage", b.detail.Storage.ShardStore) - } - - store, err := NewShardStore(b.detail, obsType, *cfg) - if err != nil { - return nil, err - } - - agt.ShardStore = store + cli, bucket, err := createClient(cred) + if err != nil { + return nil, err } - return agt, nil + return NewShardStore(b.detail, cred, cli, bucket) } -func (b *builder) ShardStoreDesc() types.ShardStoreDesc { - return &ShardStoreDesc{ - ShardStoreDesc: s3stg.NewShardStoreDesc(&b.detail), +func (b *builder) CreatePublicStore() (types.PublicStore, error) { + cred, ok := b.detail.UserSpace.Credential.(*cortypes.OBSCred) + if !ok { + return nil, fmt.Errorf("invalid storage credential type %T for obs storage", b.detail.UserSpace.Credential) } + + cli, bucket, err := createClient(cred) + if err != nil { + return nil, err + } + + return s3stg.NewPublicStore(b.detail, cli, bucket) } -func createClient(addr *cdssdk.OBSType) (*s3.Client, string, error) { +func createClient(cred *cortypes.OBSCred) (*s3.Client, string, error) { awsConfig := aws.Config{} cre := aws.Credentials{ - AccessKeyID: addr.AK, - SecretAccessKey: addr.SK, + AccessKeyID: cred.AK, + SecretAccessKey: cred.SK, } awsConfig.Credentials = &credentials.StaticCredentialsProvider{Value: cre} - awsConfig.Region = addr.Region + awsConfig.Region = cred.Region options := []func(*s3.Options){} options = append(options, func(s3Opt *s3.Options) { - s3Opt.BaseEndpoint = &addr.Endpoint + s3Opt.BaseEndpoint = &cred.Endpoint }) cli := s3.NewFromConfig(awsConfig, options...) - return cli, addr.Bucket, nil + return cli, cred.Bucket, nil } func (b *builder) CreateMultiparter() (types.Multiparter, error) { - feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](b.detail) + feat := utils.FindFeature[*cortypes.MultipartUploadFeature](b.detail.Storage) if feat == nil { - return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{}) + return nil, fmt.Errorf("feature %T not found", cortypes.MultipartUploadFeature{}) } - cli, bucket, err := createClient(b.detail.Storage.Type.(*cdssdk.OBSType)) + cred, ok := b.detail.UserSpace.Credential.(*cortypes.OBSCred) + if !ok { + return nil, fmt.Errorf("invalid storage credential type %T for obs storage", b.detail.UserSpace.Credential) + } + + cli, bucket, err := createClient(cred) if err != nil { return nil, err } @@ -101,10 +109,15 @@ func (b *builder) CreateMultiparter() (types.Multiparter, error) { } func (b *builder) CreateS2STransfer() (types.S2STransfer, error) { - feat := utils.FindFeature[*cdssdk.S2STransferFeature](b.detail) + feat := utils.FindFeature[*cortypes.S2STransferFeature](b.detail.Storage) if feat == nil { - return nil, fmt.Errorf("feature %T not found", cdssdk.S2STransferFeature{}) + return nil, fmt.Errorf("feature %T not found", cortypes.S2STransferFeature{}) + } + + cred, ok := b.detail.UserSpace.Credential.(*cortypes.OBSCred) + if !ok { + return nil, fmt.Errorf("invalid storage credential type %T for obs storage", b.detail.UserSpace.Credential) } - return NewS2STransfer(b.detail.Storage.Type.(*cdssdk.OBSType), feat), nil + return NewS2STransfer(cred, feat), nil } diff --git a/common/pkgs/storage/obs/obs_test.go b/common/pkgs/storage/obs/obs_test.go index 9a2497a..4a200c5 100644 --- a/common/pkgs/storage/obs/obs_test.go +++ b/common/pkgs/storage/obs/obs_test.go @@ -5,14 +5,14 @@ import ( "testing" . "github.com/smartystreets/goconvey/convey" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) func Test_S2S(t *testing.T) { Convey("OBS", t, func() { s2s := S2STransfer{ - dstStg: &cdssdk.OBSType{ + cred: &cortypes.OBSCred{ Region: "cn-north-4", Endpoint: "obs.cn-north-4.myhuaweicloud.com", AK: "", @@ -20,14 +20,14 @@ func Test_S2S(t *testing.T) { Bucket: "pcm3-bucket3", ProjectID: "", }, - feat: &cdssdk.S2STransferFeature{ + feat: &cortypes.S2STransferFeature{ TempDir: "s2s", }, } - newPath, err := s2s.Transfer(context.TODO(), stgmod.StorageDetail{ - Storage: cdssdk.Storage{ - Type: &cdssdk.OBSType{ + newPath, err := s2s.Transfer(context.TODO(), &clitypes.UserSpaceDetail{ + UserSpace: clitypes.UserSpace{ + Credential: cortypes.OBSCred{ Region: "cn-north-4", Endpoint: "obs.cn-north-4.myhuaweicloud.com", AK: "", @@ -36,6 +36,9 @@ func Test_S2S(t *testing.T) { ProjectID: "", }, }, + Storage: cortypes.Storage{ + Type: &cortypes.OBSType{}, + }, }, "test_data/test03.txt") defer s2s.Abort() diff --git a/common/pkgs/storage/obs/s2s.go b/common/pkgs/storage/obs/s2s.go index 4a586eb..12c650c 100644 --- a/common/pkgs/storage/obs/s2s.go +++ b/common/pkgs/storage/obs/s2s.go @@ -10,49 +10,49 @@ import ( oms "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/oms/v2" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/oms/v2/model" omsregion "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/oms/v2/region" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/os2" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/s3" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type S2STransfer struct { - dstStg *cdssdk.OBSType - feat *cdssdk.S2STransferFeature + cred *cortypes.OBSCred + feat *cortypes.S2STransferFeature taskID *int64 omsCli *oms.OmsClient } -func NewS2STransfer(dstStg *cdssdk.OBSType, feat *cdssdk.S2STransferFeature) *S2STransfer { +func NewS2STransfer(cred *cortypes.OBSCred, feat *cortypes.S2STransferFeature) *S2STransfer { return &S2STransfer{ - dstStg: dstStg, - feat: feat, + cred: cred, + feat: feat, } } // 判断是否能从指定的源存储中直传到当前存储的目的路径 -func (s *S2STransfer) CanTransfer(src stgmod.StorageDetail) bool { - req := s.makeRequest(src.Storage.Type, "") +func (s *S2STransfer) CanTransfer(src *clitypes.UserSpaceDetail) bool { + req := s.makeRequest(src, "") return req != nil } // 执行数据直传。返回传输后的文件路径 -func (s *S2STransfer) Transfer(ctx context.Context, src stgmod.StorageDetail, srcPath string) (string, error) { - req := s.makeRequest(src.Storage.Type, srcPath) +func (s *S2STransfer) Transfer(ctx context.Context, src *clitypes.UserSpaceDetail, srcPath string) (string, error) { + req := s.makeRequest(src, srcPath) if req == nil { return "", fmt.Errorf("unsupported source storage type: %T", src.Storage.Type) } auth, err := basic.NewCredentialsBuilder(). - WithAk(s.dstStg.AK). - WithSk(s.dstStg.SK). - WithProjectId(s.dstStg.ProjectID). + WithAk(s.cred.AK). + WithSk(s.cred.SK). + WithProjectId(s.cred.ProjectID). SafeBuild() if err != nil { return "", err } - region, err := omsregion.SafeValueOf(s.dstStg.Region) + region, err := omsregion.SafeValueOf(s.cred.Region) if err != nil { return "", err } @@ -74,10 +74,10 @@ func (s *S2STransfer) Transfer(ctx context.Context, src stgmod.StorageDetail, sr TaskType: &taskType, SrcNode: req, DstNode: &model.DstNodeReq{ - Region: s.dstStg.Region, - Ak: s.dstStg.AK, - Sk: s.dstStg.SK, - Bucket: s.dstStg.Bucket, + Region: s.cred.Region, + Ak: s.cred.AK, + Sk: s.cred.SK, + Bucket: s.cred.Bucket, SavePrefix: &tempPrefix, }, }, @@ -96,16 +96,22 @@ func (s *S2STransfer) Transfer(ctx context.Context, src stgmod.StorageDetail, sr return s3.JoinKey(tempPrefix, srcPath), nil } -func (s *S2STransfer) makeRequest(srcStg cdssdk.StorageType, srcPath string) *model.SrcNodeReq { - switch srcStg := srcStg.(type) { - case *cdssdk.OBSType: +func (s *S2STransfer) makeRequest(srcStg *clitypes.UserSpaceDetail, srcPath string) *model.SrcNodeReq { + switch srcStg.Storage.Type.(type) { + case *cortypes.OBSType: cloudType := "HuaweiCloud" + + cred, ok := srcStg.UserSpace.Credential.(*cortypes.OBSCred) + if !ok { + return nil + } + return &model.SrcNodeReq{ CloudType: &cloudType, - Region: &srcStg.Region, - Ak: &srcStg.AK, - Sk: &srcStg.SK, - Bucket: &srcStg.Bucket, + Region: &cred.Region, + Ak: &cred.AK, + Sk: &cred.SK, + Bucket: &cred.Bucket, ObjectKey: &[]string{srcPath}, } diff --git a/common/pkgs/storage/obs/shard_store.go b/common/pkgs/storage/obs/shard_store.go index c117bfa..63c5452 100644 --- a/common/pkgs/storage/obs/shard_store.go +++ b/common/pkgs/storage/obs/shard_store.go @@ -1,37 +1,26 @@ package obs import ( + awss3 "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/huaweicloud/huaweicloud-sdk-go-obs/obs" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/s3" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) -type ShardStoreDesc struct { - s3.ShardStoreDesc -} - -func (d *ShardStoreDesc) HasBypassHTTPRead() bool { - return d.Enabled() -} - type ShardStore struct { *s3.ShardStore - obsType *cdssdk.OBSType + cred *cortypes.OBSCred } -func NewShardStore(detail stgmod.StorageDetail, obsType *cdssdk.OBSType, cfg cdssdk.S3ShardStorage) (*ShardStore, error) { +func NewShardStore(detail *clitypes.UserSpaceDetail, cred *cortypes.OBSCred, s3Cli *awss3.Client, bkt string) (*ShardStore, error) { sd := ShardStore{ - obsType: obsType, - } - - s3Cli, bkt, err := createClient(obsType) - if err != nil { - return nil, err + cred: cred, } - sd.ShardStore, err = s3.NewShardStore(detail, s3Cli, bkt, cfg, s3.ShardStoreOption{ + var err error + sd.ShardStore, err = s3.NewShardStore(detail, s3Cli, bkt, s3.ShardStoreOption{ UseAWSSha256: false, }) if err != nil { @@ -41,8 +30,8 @@ func NewShardStore(detail stgmod.StorageDetail, obsType *cdssdk.OBSType, cfg cds return &sd, nil } -func (s *ShardStore) HTTPBypassRead(fileHash cdssdk.FileHash) (types.HTTPRequest, error) { - cli, err := obs.New(s.obsType.AK, s.obsType.SK, s.obsType.Endpoint) +func (s *ShardStore) HTTPBypassRead(fileHash clitypes.FileHash) (types.HTTPRequest, error) { + cli, err := obs.New(s.cred.AK, s.cred.SK, s.cred.Endpoint) if err != nil { return types.HTTPRequest{}, err } diff --git a/common/pkgs/storage/s3/agent.go b/common/pkgs/storage/s3/agent.go deleted file mode 100644 index cb085c5..0000000 --- a/common/pkgs/storage/s3/agent.go +++ /dev/null @@ -1,52 +0,0 @@ -package s3 - -import ( - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" -) - -type Agent struct { - Detail stgmod.StorageDetail - ShardStore *ShardStore - PublicStore *PublicStore -} - -func (s *Agent) Start(ch *types.StorageEventChan) { - if s.ShardStore != nil { - s.ShardStore.Start(ch) - } - - if s.PublicStore != nil { - s.PublicStore.Start(ch) - } -} - -func (a *Agent) Stop() { - if a.ShardStore != nil { - a.ShardStore.Stop() - } - - if a.PublicStore != nil { - a.PublicStore.Stop() - } -} - -func (a *Agent) Info() stgmod.StorageDetail { - return a.Detail -} - -func (a *Agent) GetShardStore() (types.ShardStore, error) { - if a.ShardStore == nil { - return nil, types.ErrUnsupported - } - - return a.ShardStore, nil -} - -func (a *Agent) GetPublicStore() (types.PublicStore, error) { - if a.PublicStore == nil { - return nil, types.ErrUnsupported - } - - return a.PublicStore, nil -} diff --git a/common/pkgs/storage/s3/multipart_upload.go b/common/pkgs/storage/s3/multipart_upload.go index 38bbd38..b20cb10 100644 --- a/common/pkgs/storage/s3/multipart_upload.go +++ b/common/pkgs/storage/s3/multipart_upload.go @@ -9,22 +9,22 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/os2" "gitlink.org.cn/cloudream/common/utils/sort2" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type Multiparter struct { - detail stgmod.StorageDetail - feat *cdssdk.MultipartUploadFeature + detail *clitypes.UserSpaceDetail + feat *cortypes.MultipartUploadFeature bucket string cli *s3.Client } -func NewMultiparter(detail stgmod.StorageDetail, feat *cdssdk.MultipartUploadFeature, bkt string, cli *s3.Client) *Multiparter { +func NewMultiparter(detail *clitypes.UserSpaceDetail, feat *cortypes.MultipartUploadFeature, bkt string, cli *s3.Client) *Multiparter { return &Multiparter{ detail: detail, feat: feat, @@ -138,7 +138,7 @@ func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPar return types.BypassUploadedFile{}, err } - hash := cdssdk.CalculateCompositeHash(partHashes) + hash := clitypes.CalculateCompositeHash(partHashes) return types.BypassUploadedFile{ Path: i.tempFilePath, diff --git a/common/pkgs/storage/s3/public_store.go b/common/pkgs/storage/s3/public_store.go index ca2b35a..c3f675a 100644 --- a/common/pkgs/storage/s3/public_store.go +++ b/common/pkgs/storage/s3/public_store.go @@ -3,57 +3,29 @@ package s3 import ( "context" "io" - "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" ) -type PublicStoreDesc struct { - types.EmptyPublicStoreDesc - Detail *stgmod.StorageDetail -} - -func NewPublicStoreDesc(detail *stgmod.StorageDetail) PublicStoreDesc { - return PublicStoreDesc{ - Detail: detail, - } -} - -func (d *PublicStoreDesc) Enabled() bool { - return d.Detail.Storage.PublicStore != nil -} - type PublicStore struct { - Detail stgmod.StorageDetail + Detail *clitypes.UserSpaceDetail Bucket string cli *s3.Client - cfg cdssdk.S3PublicStorage } -func NewPublicStore(detail stgmod.StorageDetail, cli *s3.Client, bkt string, cfg cdssdk.S3PublicStorage) (*PublicStore, error) { +func NewPublicStore(detail *clitypes.UserSpaceDetail, cli *s3.Client, bkt string) (*PublicStore, error) { return &PublicStore{ Detail: detail, Bucket: bkt, cli: cli, - cfg: cfg, }, nil } -func (s *PublicStore) Start(ch *types.StorageEventChan) { - s.getLogger().Infof("component start, LoadBase: %v", s.cfg.LoadBase) -} - -func (s *PublicStore) Stop() { - s.getLogger().Infof("component stop") -} - func (s *PublicStore) Write(objPath string, stream io.Reader) error { - key := JoinKey(s.cfg.LoadBase, objPath) + key := objPath _, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{ Bucket: aws.String(s.Bucket), @@ -65,7 +37,7 @@ func (s *PublicStore) Write(objPath string, stream io.Reader) error { } func (s *PublicStore) Read(objPath string) (io.ReadCloser, error) { - key := JoinKey(s.cfg.LoadBase, objPath) + key := objPath resp, err := s.cli.GetObject(context.TODO(), &s3.GetObjectInput{ Bucket: aws.String(s.Bucket), @@ -80,7 +52,7 @@ func (s *PublicStore) Read(objPath string) (io.ReadCloser, error) { } func (s *PublicStore) List(path string, recursive bool) ([]string, error) { - key := JoinKey(s.cfg.LoadBase, path) + key := path // TODO 待测试 input := &s3.ListObjectsInput{ @@ -103,7 +75,7 @@ func (s *PublicStore) List(path string, recursive bool) ([]string, error) { } for _, obj := range resp.Contents { - pathes = append(pathes, strings.TrimPrefix(*obj.Key, s.cfg.LoadBase+"/")) + pathes = append(pathes, *obj.Key) } if !*resp.IsTruncated { diff --git a/common/pkgs/storage/s3/s3.go b/common/pkgs/storage/s3/s3.go index 9022c49..9f882b0 100644 --- a/common/pkgs/storage/s3/s3.go +++ b/common/pkgs/storage/s3/s3.go @@ -6,118 +6,98 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory/reg" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/utils" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) func init() { - reg.RegisterBuilder[*cdssdk.S3Type](newBuilder) + reg.RegisterBuilder[*cortypes.S3Type](newBuilder) } type builder struct { types.EmptyBuilder - detail stgmod.StorageDetail + detail *clitypes.UserSpaceDetail } -func newBuilder(detail stgmod.StorageDetail) types.StorageBuilder { +func newBuilder(detail *clitypes.UserSpaceDetail) types.StorageBuilder { return &builder{ detail: detail, } } -func (b *builder) CreateAgent() (types.StorageAgent, error) { - s3Type, ok := b.detail.Storage.Type.(*cdssdk.S3Type) - if !ok { - return nil, fmt.Errorf("invalid storage type %T for obs agent", b.detail.Storage.Type) +func (b *builder) FeatureDesc() types.FeatureDesc { + return types.FeatureDesc{ + HasBypassWrite: true, + HasBypassRead: true, + HasBypassHTTPRead: false, } +} - agt := &Agent{ - Detail: b.detail, +func (b *builder) CreateShardStore() (types.ShardStore, error) { + s3Cred, ok := b.detail.UserSpace.Credential.(*cortypes.S3Cred) + if !ok { + return nil, fmt.Errorf("invalid storage credential type %T for s3 storage", b.detail.UserSpace.Credential) } - if b.detail.Storage.ShardStore != nil { - cfg, ok := b.detail.Storage.ShardStore.(*cdssdk.S3ShardStorage) - if !ok { - return nil, fmt.Errorf("invalid shard store type %T for local storage", b.detail.Storage.ShardStore) - } - - cli, bkt, err := createClient(s3Type) - if err != nil { - return nil, err - } - - store, err := NewShardStore(b.detail, cli, bkt, *cfg, ShardStoreOption{UseAWSSha256: true}) - if err != nil { - return nil, err - } - - agt.ShardStore = store + cli, bkt, err := createClient(s3Cred) + if err != nil { + return nil, err } - if b.detail.Storage.PublicStore != nil { - cfg, ok := b.detail.Storage.PublicStore.(*cdssdk.S3PublicStorage) - if !ok { - return nil, fmt.Errorf("invalid public store type %T for local storage", b.detail.Storage.PublicStore) - } - - cli, bkt, err := createClient(s3Type) - if err != nil { - return nil, err - } - - store, err := NewPublicStore(b.detail, cli, bkt, *cfg) - if err != nil { - return nil, err - } + return NewShardStore(b.detail, cli, bkt, ShardStoreOption{UseAWSSha256: true}) +} - agt.PublicStore = store +func (b *builder) CreatePublicStore() (types.PublicStore, error) { + s3Cred, ok := b.detail.UserSpace.Credential.(*cortypes.S3Cred) + if !ok { + return nil, fmt.Errorf("invalid storage credential type %T for s3 storage", b.detail.UserSpace.Credential) } - return agt, nil -} - -func (b *builder) ShardStoreDesc() types.ShardStoreDesc { - desc := NewShardStoreDesc(&b.detail) - return &desc -} + cli, bkt, err := createClient(s3Cred) + if err != nil { + return nil, err + } -func (b *builder) PublicStoreDesc() types.PublicStoreDesc { - desc := NewPublicStoreDesc(&b.detail) - return &desc + return NewPublicStore(b.detail, cli, bkt) } -func createClient(addr *cdssdk.S3Type) (*s3.Client, string, error) { +func createClient(cred *cortypes.S3Cred) (*s3.Client, string, error) { awsConfig := aws.Config{} - if addr.AK != "" && addr.SK != "" { + if cred.AK != "" && cred.SK != "" { cre := aws.Credentials{ - AccessKeyID: addr.AK, - SecretAccessKey: addr.SK, + AccessKeyID: cred.AK, + SecretAccessKey: cred.SK, } awsConfig.Credentials = &credentials.StaticCredentialsProvider{Value: cre} } - awsConfig.Region = addr.Region + awsConfig.Region = cred.Region options := []func(*s3.Options){} options = append(options, func(s3Opt *s3.Options) { - s3Opt.BaseEndpoint = &addr.Endpoint + s3Opt.BaseEndpoint = &cred.Endpoint }) cli := s3.NewFromConfig(awsConfig, options...) - return cli, addr.Bucket, nil + return cli, cred.Bucket, nil } func (b *builder) CreateMultiparter() (types.Multiparter, error) { - feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](b.detail) + feat := utils.FindFeature[*cortypes.MultipartUploadFeature](b.detail.Storage) if feat == nil { - return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{}) + return nil, fmt.Errorf("feature %T not found", cortypes.MultipartUploadFeature{}) + } + + s3Cred, ok := b.detail.UserSpace.Credential.(*cortypes.S3Cred) + if !ok { + return nil, fmt.Errorf("invalid storage credential type %T for s3 public store", b.detail.UserSpace.Credential) } - cli, bucket, err := createClient(b.detail.Storage.Type.(*cdssdk.S3Type)) + cli, bucket, err := createClient(s3Cred) if err != nil { return nil, err } diff --git a/common/pkgs/storage/s3/shard_store.go b/common/pkgs/storage/s3/shard_store.go index a51495e..c9905f3 100644 --- a/common/pkgs/storage/s3/shard_store.go +++ b/common/pkgs/storage/s3/shard_store.go @@ -14,11 +14,10 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3" s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/os2" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" ) @@ -27,50 +26,25 @@ const ( BlocksDir = "blocks" ) -type ShardStoreDesc struct { - types.EmptyShardStoreDesc - Detail *stgmod.StorageDetail -} - -func NewShardStoreDesc(detail *stgmod.StorageDetail) ShardStoreDesc { - return ShardStoreDesc{ - Detail: detail, - } -} - -func (s *ShardStoreDesc) Enabled() bool { - return s.Detail.Storage.ShardStore != nil -} - -func (s *ShardStoreDesc) HasBypassWrite() bool { - return s.Enabled() -} - -func (s *ShardStoreDesc) HasBypassRead() bool { - return s.Enabled() -} - type ShardStoreOption struct { UseAWSSha256 bool // 能否直接使用AWS提供的SHA256校验,如果不行,则使用本地计算。默认使用本地计算。 } type ShardStore struct { - Detail stgmod.StorageDetail + Detail *clitypes.UserSpaceDetail Bucket string cli *s3.Client - cfg cdssdk.S3ShardStorage opt ShardStoreOption lock sync.Mutex workingTempFiles map[string]bool done chan any } -func NewShardStore(detail stgmod.StorageDetail, cli *s3.Client, bkt string, cfg cdssdk.S3ShardStorage, opt ShardStoreOption) (*ShardStore, error) { +func NewShardStore(detail *clitypes.UserSpaceDetail, cli *s3.Client, bkt string, opt ShardStoreOption) (*ShardStore, error) { return &ShardStore{ Detail: detail, - cli: cli, Bucket: bkt, - cfg: cfg, + cli: cli, opt: opt, workingTempFiles: make(map[string]bool), done: make(chan any, 1), @@ -78,7 +52,7 @@ func NewShardStore(detail stgmod.StorageDetail, cli *s3.Client, bkt string, cfg } func (s *ShardStore) Start(ch *types.StorageEventChan) { - s.getLogger().Infof("component start, root: %v", s.cfg.Root) + s.getLogger().Infof("start, root: %v", s.Detail.UserSpace.ShardStore.Root) go func() { removeTempTicker := time.NewTicker(time.Minute * 10) @@ -107,7 +81,7 @@ func (s *ShardStore) removeUnusedTempFiles() { for { resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{ Bucket: aws.String(s.Bucket), - Prefix: aws.String(JoinKey(s.cfg.Root, TempDir, "/")), + Prefix: aws.String(JoinKey(s.Detail.UserSpace.ShardStore.Root, TempDir, "/")), Marker: marker, }) @@ -213,7 +187,7 @@ func (s *ShardStore) createWithAwsSha256(stream io.Reader) (types.FileInfo, erro return types.FileInfo{}, fmt.Errorf("decode SHA256 checksum: %v", err) } - return s.onCreateFinished(key, counter.Count(), cdssdk.NewFullHash(hash)) + return s.onCreateFinished(key, counter.Count(), clitypes.NewFullHash(hash)) } func (s *ShardStore) createWithCalcSha256(stream io.Reader) (types.FileInfo, error) { @@ -242,21 +216,21 @@ func (s *ShardStore) createWithCalcSha256(stream io.Reader) (types.FileInfo, err return types.FileInfo{}, err } - return s.onCreateFinished(key, counter.Count(), cdssdk.NewFullHash(hashStr.Sum())) + return s.onCreateFinished(key, counter.Count(), clitypes.NewFullHash(hashStr.Sum())) } func (s *ShardStore) createTempFile() (string, string) { s.lock.Lock() defer s.lock.Unlock() - tmpDir := JoinKey(s.cfg.Root, TempDir) + tmpDir := JoinKey(s.Detail.UserSpace.ShardStore.Root, TempDir) tmpName := os2.GenerateRandomFileName(20) s.workingTempFiles[tmpName] = true return JoinKey(tmpDir, tmpName), tmpName } -func (s *ShardStore) onCreateFinished(tempFilePath string, size int64, hash cdssdk.FileHash) (types.FileInfo, error) { +func (s *ShardStore) onCreateFinished(tempFilePath string, size int64, hash clitypes.FileHash) (types.FileInfo, error) { s.lock.Lock() defer s.lock.Unlock() defer delete(s.workingTempFiles, filepath.Base(tempFilePath)) @@ -334,7 +308,7 @@ func (s *ShardStore) Open(opt types.OpenOption) (io.ReadCloser, error) { }), nil } -func (s *ShardStore) Info(hash cdssdk.FileHash) (types.FileInfo, error) { +func (s *ShardStore) Info(hash clitypes.FileHash) (types.FileInfo, error) { s.lock.Lock() defer s.lock.Unlock() @@ -361,7 +335,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { var infos []types.FileInfo - blockDir := JoinKey(s.cfg.Root, BlocksDir) + blockDir := JoinKey(s.Detail.UserSpace.ShardStore.Root, BlocksDir) var marker *string for { @@ -379,7 +353,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { for _, obj := range resp.Contents { key := BaseKey(*obj.Key) - fileHash, err := cdssdk.ParseHash(key) + fileHash, err := clitypes.ParseHash(key) if err != nil { continue } @@ -401,16 +375,16 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { return infos, nil } -func (s *ShardStore) GC(avaiables []cdssdk.FileHash) error { +func (s *ShardStore) GC(avaiables []clitypes.FileHash) error { s.lock.Lock() defer s.lock.Unlock() - avais := make(map[cdssdk.FileHash]bool) + avais := make(map[clitypes.FileHash]bool) for _, hash := range avaiables { avais[hash] = true } - blockDir := JoinKey(s.cfg.Root, BlocksDir) + blockDir := JoinKey(s.Detail.UserSpace.ShardStore.Root, BlocksDir) var deletes []s3types.ObjectIdentifier var marker *string @@ -428,7 +402,7 @@ func (s *ShardStore) GC(avaiables []cdssdk.FileHash) error { for _, obj := range resp.Contents { key := BaseKey(*obj.Key) - fileHash, err := cdssdk.ParseHash(key) + fileHash, err := clitypes.ParseHash(key) if err != nil { continue } @@ -476,15 +450,15 @@ func (s *ShardStore) Stats() types.Stats { } func (s *ShardStore) getLogger() logger.Logger { - return logger.WithField("ShardStore", "S3").WithField("Storage", s.Detail.Storage.String()) + return logger.WithField("ShardStore", "S3").WithField("UserSpace", s.Detail) } -func (s *ShardStore) GetFileDirFromHash(hash cdssdk.FileHash) string { - return JoinKey(s.cfg.Root, BlocksDir, hash.GetHashPrefix(2)) +func (s *ShardStore) GetFileDirFromHash(hash clitypes.FileHash) string { + return JoinKey(s.Detail.UserSpace.ShardStore.Root, BlocksDir, hash.GetHashPrefix(2)) } -func (s *ShardStore) GetFilePathFromHash(hash cdssdk.FileHash) string { - return JoinKey(s.cfg.Root, BlocksDir, hash.GetHashPrefix(2), string(hash)) +func (s *ShardStore) GetFilePathFromHash(hash clitypes.FileHash) string { + return JoinKey(s.Detail.UserSpace.ShardStore.Root, BlocksDir, hash.GetHashPrefix(2), string(hash)) } var _ types.BypassWrite = (*ShardStore)(nil) @@ -526,7 +500,7 @@ func (s *ShardStore) BypassUploaded(info types.BypassUploadedFile) error { var _ types.BypassRead = (*ShardStore)(nil) -func (s *ShardStore) BypassRead(fileHash cdssdk.FileHash) (types.BypassFilePath, error) { +func (s *ShardStore) BypassRead(fileHash clitypes.FileHash) (types.BypassFilePath, error) { s.lock.Lock() defer s.lock.Unlock() diff --git a/common/pkgs/storage/types/empty_builder.go b/common/pkgs/storage/types/empty_builder.go index fe9a30e..64f06b9 100644 --- a/common/pkgs/storage/types/empty_builder.go +++ b/common/pkgs/storage/types/empty_builder.go @@ -11,7 +11,7 @@ type EmptyBuilder struct { } func (b *EmptyBuilder) FeatureDesc() FeatureDesc { - return &EmptyFeatureDesc{} + return FeatureDesc{} } func (b *EmptyBuilder) CreateShardStore() (ShardStore, error) { return nil, fmt.Errorf("create shard store for %T: %w", b.Detail.Storage.Type, ErrUnsupported) @@ -32,22 +32,3 @@ func (b *EmptyBuilder) CreateS2STransfer() (S2STransfer, error) { func (b *EmptyBuilder) CreateECMultiplier() (ECMultiplier, error) { return nil, fmt.Errorf("create ec multiplier for %T: %w", b.Detail.Storage.Type, ErrUnsupported) } - -type EmptyFeatureDesc struct { -} - -func (d *EmptyFeatureDesc) Enabled() bool { - return false -} - -func (d *EmptyFeatureDesc) HasBypassWrite() bool { - return false -} - -func (d *EmptyFeatureDesc) HasBypassRead() bool { - return false -} - -func (d *EmptyFeatureDesc) HasBypassHTTPRead() bool { - return false -} diff --git a/common/pkgs/storage/types/s2s.go b/common/pkgs/storage/types/s2s.go index 30697fe..9db3c98 100644 --- a/common/pkgs/storage/types/s2s.go +++ b/common/pkgs/storage/types/s2s.go @@ -8,9 +8,9 @@ import ( type S2STransfer interface { // 判断是否能从指定的源存储中直传到当前存储的目的路径。仅在生成计划时使用 - CanTransfer(src clitypes.UserSpaceDetail) bool + CanTransfer(src *clitypes.UserSpaceDetail) bool // 执行数据直传。返回传输后的文件路径 - Transfer(ctx context.Context, src clitypes.UserSpaceDetail, srcPath string) (string, error) + Transfer(ctx context.Context, src *clitypes.UserSpaceDetail, srcPath string) (string, error) // 完成传输 Complete() // 取消传输。如果已经调用了Complete,则这个方法应该无效果 diff --git a/common/pkgs/storage/types/types.go b/common/pkgs/storage/types/types.go index 289b77f..e782722 100644 --- a/common/pkgs/storage/types/types.go +++ b/common/pkgs/storage/types/types.go @@ -31,13 +31,11 @@ type StorageBuilder interface { CreateECMultiplier() (ECMultiplier, error) } -type FeatureDesc interface { - // 是否已启动 - Enabled() bool +type FeatureDesc struct { // 是否能旁路上传 - HasBypassWrite() bool + HasBypassWrite bool // 是否能旁路读取 - HasBypassRead() bool + HasBypassRead bool // 是否能通过HTTP读取 - HasBypassHTTPRead() bool + HasBypassHTTPRead bool }