diff --git a/client/internal/http/server.go b/client/internal/http/server.go index 4594910..8f0216a 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -43,7 +43,7 @@ func (s *Server) Serve() error { func (s *Server) initRouters() { rt := s.engine.Use() - // initTemp(rt, s) + initTemp(rt, s) s.routeV1(s.engine) diff --git a/client/internal/http/temp.go b/client/internal/http/temp.go index 3c0b06e..9b46e44 100644 --- a/client/internal/http/temp.go +++ b/client/internal/http/temp.go @@ -1,6 +1,5 @@ package http -/* import ( "net/http" @@ -96,7 +95,7 @@ type TempGetObjectDetailResp struct { type ObjectBlockDetail struct { ObjectID cdssdk.ObjectID `json:"objectID"` Type string `json:"type"` - FileHash string `json:"fileHash"` + FileHash cdssdk.FileHash `json:"fileHash"` LocationType string `json:"locationType"` LocationName string `json:"locationName"` } @@ -123,43 +122,36 @@ func (s *TempService) GetObjectDetail(ctx *gin.Context) { return } - loadedHubIDs, err := s.svc.PackageSvc().GetLoadedNodes(1, details.Object.PackageID) - if err != nil { - log.Warnf("getting loaded nodes: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get loaded nodes failed")) - return - } - - var allHubIDs []cdssdk.HubID - allHubIDs = append(allHubIDs, details.PinnedAt...) + var allStgIDs []cdssdk.StorageID + allStgIDs = append(allStgIDs, details.PinnedAt...) for _, b := range details.Blocks { - allHubIDs = append(allHubIDs, b.StorageID) + allStgIDs = append(allStgIDs, b.StorageID) } - allHubIDs = append(allHubIDs, loadedHubIDs...) - allHubIDs = lo.Uniq(allHubIDs) + allStgIDs = lo.Uniq(allStgIDs) - getNodes, err := s.svc.NodeSvc().GetNodes(allHubIDs) + getStgs, err := s.svc.StorageSvc().GetDetails(allStgIDs) if err != nil { log.Warnf("getting nodes: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed")) return } - allNodes := make(map[cdssdk.HubID]*cdssdk.Node) - for _, n := range getNodes { - n2 := n - allNodes[n.HubID] = &n2 + allStgs := make(map[cdssdk.StorageID]cdssdk.Storage) + for _, n := range getStgs { + if n != nil { + allStgs[n.Storage.StorageID] = n.Storage + } } var blocks []ObjectBlockDetail - for _, hubID := range details.PinnedAt { + for _, stgID := range details.PinnedAt { blocks = append(blocks, ObjectBlockDetail{ Type: "Rep", FileHash: details.Object.FileHash, LocationType: "Agent", - LocationName: allNodes[hubID].Name, + LocationName: allStgs[stgID].Name, }) } @@ -171,7 +163,7 @@ func (s *TempService) GetObjectDetail(ctx *gin.Context) { Type: "Rep", FileHash: blk.FileHash, LocationType: "Agent", - LocationName: allNodes[blk.StorageID].Name, + LocationName: allStgs[blk.StorageID].Name, }) } } @@ -182,7 +174,7 @@ func (s *TempService) GetObjectDetail(ctx *gin.Context) { Type: "Rep", FileHash: blk.FileHash, LocationType: "Agent", - LocationName: allNodes[blk.StorageID].Name, + LocationName: allStgs[blk.StorageID].Name, }) } } @@ -193,20 +185,11 @@ func (s *TempService) GetObjectDetail(ctx *gin.Context) { Type: "Block", FileHash: blk.FileHash, LocationType: "Agent", - LocationName: allNodes[blk.StorageID].Name, + LocationName: allStgs[blk.StorageID].Name, }) } } - for _, hubID := range loadedHubIDs { - blocks = append(blocks, ObjectBlockDetail{ - Type: "Rep", - FileHash: details.Object.FileHash, - LocationType: "Storage", - LocationName: allNodes[hubID].Name, - }) - } - ctx.JSON(http.StatusOK, OK(TempGetObjectDetailResp{ Blocks: blocks, })) @@ -252,15 +235,25 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) { return } - nodes, err := s.svc.NodeSvc().GetNodes(nil) + var allStgIDs []cdssdk.StorageID + for _, obj := range db.Objects { + allStgIDs = append(allStgIDs, obj.PinnedAt...) + for _, blk := range obj.Blocks { + allStgIDs = append(allStgIDs, blk.StorageID) + } + } + + getStgs, err := s.svc.StorageSvc().GetDetails(allStgIDs) if err != nil { log.Warnf("getting nodes: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed")) return } - allNodes := make(map[cdssdk.HubID]cdssdk.Node) - for _, n := range nodes { - allNodes[n.HubID] = n + allStgs := make(map[cdssdk.StorageID]cdssdk.Storage) + for _, n := range getStgs { + if n != nil { + allStgs[n.Storage.StorageID] = n.Storage + } } bkts := make(map[cdssdk.BucketID]*BucketDetail) @@ -274,25 +267,25 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) { type PackageDetail struct { Package cdssdk.Package - Loaded []cdssdk.Node + // Loaded []cdssdk.Node } pkgs := make(map[cdssdk.PackageID]*PackageDetail) for _, pkg := range db.Packages { p := PackageDetail{ Package: pkg, - Loaded: make([]cdssdk.Node, 0), + // Loaded: make([]cdssdk.Node, 0), } - loaded, err := s.svc.PackageSvc().GetLoadedNodes(1, pkg.PackageID) - if err != nil { - log.Warnf("getting loaded nodes: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get loaded nodes failed")) - return - } + // loaded, err := s.svc.PackageSvc().GetLoadedNodes(1, pkg.PackageID) + // if err != nil { + // log.Warnf("getting loaded nodes: %s", err.Error()) + // ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get loaded nodes failed")) + // return + // } - for _, hubID := range loaded { - p.Loaded = append(p.Loaded, allNodes[hubID]) - } + // for _, hubID := range loaded { + // p.Loaded = append(p.Loaded, allNodes[hubID]) + // } pkgs[pkg.PackageID] = &p } @@ -316,7 +309,7 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) { Type: "Rep", FileHash: obj.Object.FileHash, LocationType: "Agent", - LocationName: allNodes[hubID].Name, + LocationName: allStgs[hubID].Name, }) } @@ -329,7 +322,7 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) { Type: "Rep", FileHash: blk.FileHash, LocationType: "Agent", - LocationName: allNodes[blk.StorageID].Name, + LocationName: allStgs[blk.StorageID].Name, }) } } @@ -341,7 +334,7 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) { Type: "Rep", FileHash: blk.FileHash, LocationType: "Agent", - LocationName: allNodes[blk.StorageID].Name, + LocationName: allStgs[blk.StorageID].Name, }) } } @@ -353,20 +346,20 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) { Type: "Block", FileHash: blk.FileHash, LocationType: "Agent", - LocationName: allNodes[blk.StorageID].Name, + LocationName: allStgs[blk.StorageID].Name, }) } } - for _, node := range pkgs[obj.Object.PackageID].Loaded { - blocks = append(blocks, ObjectBlockDetail{ - ObjectID: obj.Object.ObjectID, - Type: "Rep", - FileHash: obj.Object.FileHash, - LocationType: "Storage", - LocationName: allNodes[node.HubID].Name, - }) - } + // for _, node := range pkgs[obj.Object.PackageID].Loaded { + // blocks = append(blocks, ObjectBlockDetail{ + // ObjectID: obj.Object.ObjectID, + // Type: "Rep", + // FileHash: obj.Object.FileHash, + // LocationType: "Storage", + // LocationName: allNodes[node.HubID].Name, + // }) + // } } @@ -390,4 +383,3 @@ func auth(ctx *gin.Context) { ctx.AbortWithStatus(http.StatusUnauthorized) } } -*/ diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index 046de63..a9263eb 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -10,6 +10,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy" @@ -58,6 +59,21 @@ func (svc *StorageService) GetByName(userID cdssdk.UserID, name string) (*model. return &getResp.Storage, nil } +func (svc *StorageService) GetDetails(stgIDs []cdssdk.StorageID) ([]*stgmod.StorageDetail, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getResp, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails(stgIDs)) + if err != nil { + return nil, fmt.Errorf("request to coordinator: %w", err) + } + + return getResp.Storages, nil +} + func (svc *StorageService) LoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID, rootPath string) error { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -103,7 +119,7 @@ func (svc *StorageService) LoadPackage(userID cdssdk.UserID, packageID cdssdk.Pa return fmt.Errorf("unsupported download strategy: %T", strg) } - ft.AddTo(ioswitch2.NewLoadToShared(*destStg.MasterHub, *destStg, path.Join(rootPath, obj.Object.Path))) + ft.AddTo(ioswitch2.NewLoadToPublic(*destStg.MasterHub, *destStg, path.Join(rootPath, obj.Object.Path))) // 顺便保存到同存储服务的分片存储中 if factory.GetBuilder(*destStg).ShardStoreDesc().Enabled() { ft.AddTo(ioswitch2.NewToShardStore(*destStg.MasterHub, *destStg, ioswitch2.RawStream(), "")) diff --git a/common/pkgs/ioswitch2/fromto.go b/common/pkgs/ioswitch2/fromto.go index 2c72e9b..f153776 100644 --- a/common/pkgs/ioswitch2/fromto.go +++ b/common/pkgs/ioswitch2/fromto.go @@ -195,26 +195,26 @@ func (t *ToShardStore) GetRange() math2.Range { return t.Range } -type LoadToShared struct { +type LoadToPublic struct { Hub cdssdk.Hub Storage stgmod.StorageDetail ObjectPath string } -func NewLoadToShared(hub cdssdk.Hub, storage stgmod.StorageDetail, objectPath string) *LoadToShared { - return &LoadToShared{ +func NewLoadToPublic(hub cdssdk.Hub, storage stgmod.StorageDetail, objectPath string) *LoadToPublic { + return &LoadToPublic{ Hub: hub, Storage: storage, ObjectPath: objectPath, } } -func (t *LoadToShared) GetStreamIndex() StreamIndex { +func (t *LoadToPublic) GetStreamIndex() StreamIndex { return StreamIndex{ Type: StreamIndexRaw, } } -func (t *LoadToShared) GetRange() math2.Range { +func (t *LoadToPublic) GetRange() math2.Range { return math2.Range{} } diff --git a/common/pkgs/ioswitch2/ops2/shared_store.go b/common/pkgs/ioswitch2/ops2/public_store.go similarity index 61% rename from common/pkgs/ioswitch2/ops2/shared_store.go rename to common/pkgs/ioswitch2/ops2/public_store.go index 1d1bb11..8846c97 100644 --- a/common/pkgs/ioswitch2/ops2/shared_store.go +++ b/common/pkgs/ioswitch2/ops2/public_store.go @@ -13,29 +13,29 @@ import ( ) func init() { - exec.UseOp[*SharedLoad]() + exec.UseOp[*PublicLoad]() } -type SharedLoad struct { +type PublicLoad struct { Input exec.VarID StorageID cdssdk.StorageID ObjectPath string } -func (o *SharedLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error { +func (o *PublicLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error { logger. WithField("Input", o.Input). - Debugf("load file to shared store") - defer logger.Debugf("load file to shared store finished") + Debugf("load file to public store") + defer logger.Debugf("load file to public store finished") stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) if err != nil { return fmt.Errorf("getting storage manager: %w", err) } - store, err := stgAgts.GetSharedStore(o.StorageID) + store, err := stgAgts.GetPublicStore(o.StorageID) if err != nil { - return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err) + return fmt.Errorf("getting public store of storage %v: %w", o.StorageID, err) } input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) @@ -47,19 +47,19 @@ func (o *SharedLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error { return store.Write(o.ObjectPath, input.Stream) } -func (o *SharedLoad) String() string { - return fmt.Sprintf("SharedLoad %v -> %v:%v", o.Input, o.StorageID, o.ObjectPath) +func (o *PublicLoad) String() string { + return fmt.Sprintf("PublicLoad %v -> %v:%v", o.Input, o.StorageID, o.ObjectPath) } -type SharedLoadNode struct { +type PublicLoadNode struct { dag.NodeBase To ioswitch2.To Storage stgmod.StorageDetail ObjectPath string } -func (b *GraphNodeBuilder) NewSharedLoad(to ioswitch2.To, stg stgmod.StorageDetail, objPath string) *SharedLoadNode { - node := &SharedLoadNode{ +func (b *GraphNodeBuilder) NewPublicLoad(to ioswitch2.To, stg stgmod.StorageDetail, objPath string) *PublicLoadNode { + node := &PublicLoadNode{ To: to, Storage: stg, ObjectPath: objPath, @@ -70,23 +70,23 @@ func (b *GraphNodeBuilder) NewSharedLoad(to ioswitch2.To, stg stgmod.StorageDeta return node } -func (t *SharedLoadNode) GetTo() ioswitch2.To { +func (t *PublicLoadNode) GetTo() ioswitch2.To { return t.To } -func (t *SharedLoadNode) SetInput(input *dag.StreamVar) { +func (t *PublicLoadNode) SetInput(input *dag.StreamVar) { input.To(t, 0) } -func (t *SharedLoadNode) Input() dag.StreamInputSlot { +func (t *PublicLoadNode) Input() dag.StreamInputSlot { return dag.StreamInputSlot{ Node: t, Index: 0, } } -func (t *SharedLoadNode) GenerateOp() (exec.Op, error) { - return &SharedLoad{ +func (t *PublicLoadNode) GenerateOp() (exec.Op, error) { + return &PublicLoad{ Input: t.InputStreams().Get(0).VarID, StorageID: t.Storage.Storage.StorageID, ObjectPath: t.ObjectPath, diff --git a/common/pkgs/ioswitch2/parser/gen/generator.go b/common/pkgs/ioswitch2/parser/gen/generator.go index de35c62..b9da733 100644 --- a/common/pkgs/ioswitch2/parser/gen/generator.go +++ b/common/pkgs/ioswitch2/parser/gen/generator.go @@ -361,8 +361,8 @@ func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error) return n, nil - case *ioswitch2.LoadToShared: - n := ctx.DAG.NewSharedLoad(t, t.Storage, t.ObjectPath) + case *ioswitch2.LoadToPublic: + n := ctx.DAG.NewPublicLoad(t, t.Storage, t.ObjectPath) if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil { return nil, err diff --git a/common/pkgs/storage/agtpool/pool.go b/common/pkgs/storage/agtpool/pool.go index 7b5bb1a..0f93632 100644 --- a/common/pkgs/storage/agtpool/pool.go +++ b/common/pkgs/storage/agtpool/pool.go @@ -87,8 +87,8 @@ func (m *AgentPool) GetShardStore(stgID cdssdk.StorageID) (types.ShardStore, err return stg.Agent.GetShardStore() } -// 查找指定Storage的SharedStore组件 -func (m *AgentPool) GetSharedStore(stgID cdssdk.StorageID) (types.SharedStore, error) { +// 查找指定Storage的PublicStore组件 +func (m *AgentPool) GetPublicStore(stgID cdssdk.StorageID) (types.PublicStore, error) { m.lock.Lock() defer m.lock.Unlock() @@ -97,5 +97,5 @@ func (m *AgentPool) GetSharedStore(stgID cdssdk.StorageID) (types.SharedStore, e return nil, types.ErrStorageNotFound } - return stg.Agent.GetSharedStore() + return stg.Agent.GetPublicStore() } diff --git a/common/pkgs/storage/efile/ec_multiplier.go b/common/pkgs/storage/efile/ec_multiplier.go new file mode 100644 index 0000000..739c337 --- /dev/null +++ b/common/pkgs/storage/efile/ec_multiplier.go @@ -0,0 +1,93 @@ +package efile + +import ( + "fmt" + "net/url" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/http2" + "gitlink.org.cn/cloudream/common/utils/os2" + "gitlink.org.cn/cloudream/common/utils/serder" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" +) + +type ECMultiplier struct { + token string + url string + feat *cdssdk.ECMultiplierFeature + outputs []string + completed bool +} + +// 进行EC运算,coef * inputs。coef为编码矩阵,inputs为待编码数据,chunkSize为分块大小。 +// 输出为每一个块文件的路径,数组长度 = len(coef) +func (m *ECMultiplier) Multiply(coef [][]byte, inputs []types.HTTPReqeust, chunkSize int64) ([]string, error) { + type Request struct { + Inputs []types.HTTPReqeust `json:"inputs"` + Outputs []string `json:"outputs"` + Coefs [][]byte `json:"coefs"` + ChunkSize int64 `json:"chunkSize"` + } + type Response struct { + Code string `json:"code"` + Msg string `json:"msg"` + } + + fileName := os2.GenerateRandomFileName(10) + m.outputs = make([]string, len(coef)) + for i := range m.outputs { + m.outputs[i] = fmt.Sprintf("%s_%d", fileName, i) + } + + u, err := url.JoinPath(m.url, "efile/openapi/v2/createECTask") + if err != nil { + return nil, err + } + + resp, err := http2.PostJSON(u, http2.RequestParam{ + Header: map[string]string{"token": m.token}, + Body: Request{ + Inputs: inputs, + Outputs: m.outputs, + Coefs: coef, + ChunkSize: chunkSize, + }, + }) + if err != nil { + return nil, err + } + + var r Response + err = serder.JSONToObjectStream(resp.Body, &r) + if err != nil { + return nil, err + } + + if r.Code != "0" { + return nil, fmt.Errorf("code: %s, msg: %s", r.Code, r.Msg) + } + + return m.outputs, nil +} + +// 完成计算 +func (m *ECMultiplier) Complete() { + m.completed = true +} + +// 取消计算。如果已经调用了Complete,则应该无任何影响 +func (m *ECMultiplier) Abort() { + if !m.completed { + u, err := url.JoinPath(m.url, "efile/openapi/v2/file/remove") + if err != nil { + return + } + + for _, output := range m.outputs { + http2.PostJSON(u, http2.RequestParam{ + Header: map[string]string{"token": m.token}, + Query: map[string]string{"paths": output}, + }) + } + } +} diff --git a/common/pkgs/storage/efile/efile.go b/common/pkgs/storage/efile/efile.go new file mode 100644 index 0000000..af869d6 --- /dev/null +++ b/common/pkgs/storage/efile/efile.go @@ -0,0 +1,109 @@ +package efile + +import ( + "fmt" + "net/url" + "sync" + "time" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/http2" + "gitlink.org.cn/cloudream/common/utils/serder" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils" +) + +func init() { + reg.RegisterBuilder[*cdssdk.EFileType](func(detail stgmod.StorageDetail) types.StorageBuilder { + return &builder{ + detail: detail, + } + }) +} + +type builder struct { + types.EmptyBuilder + detail stgmod.StorageDetail + token string + tokenLock sync.Mutex + getTokenTime time.Time +} + +func (b *builder) getToken() (string, error) { + stgType := b.detail.Storage.Type.(*cdssdk.EFileType) + + b.tokenLock.Lock() + defer b.tokenLock.Unlock() + + if b.token != "" { + dt := time.Since(b.getTokenTime) + if dt < time.Second*time.Duration(stgType.TokenExpire) { + return b.token, nil + } + } + + u, err := url.JoinPath(stgType.TokenURL, "/ac/openapi/v2/tokens") + if err != nil { + return "", err + } + + resp, err := http2.PostJSON(u, http2.RequestParam{ + Header: map[string]string{ + "user": stgType.User, + "password": stgType.Password, + "orgId": stgType.OrgID, + }, + }) + if err != nil { + return "", err + } + + type Response struct { + Code string `json:"code"` + Msg string `json:"msg"` + Data []struct { + ClusterID string `json:"clusterId"` + Token string `json:"token"` + } `json:"data"` + } + + var r Response + err = serder.JSONToObjectStream(resp.Body, &r) + if err != nil { + return "", err + } + + if r.Code != "0" { + return "", fmt.Errorf("code:%s, msg:%s", r.Code, r.Msg) + } + + for _, d := range r.Data { + if d.ClusterID == stgType.ClusterID { + b.token = d.Token + b.getTokenTime = time.Now() + return d.Token, nil + } + } + + return "", fmt.Errorf("clusterID:%s not found", stgType.ClusterID) +} + +func (b *builder) CreateECMultiplier() (types.ECMultiplier, error) { + feat := utils.FindFeature[*cdssdk.ECMultiplierFeature](b.detail) + if feat == nil { + return nil, fmt.Errorf("feature ECMultiplier not found") + } + + token, err := b.getToken() + if err != nil { + return nil, fmt.Errorf("get token: %v", err) + } + + return &ECMultiplier{ + token: token, + url: b.detail.Storage.Type.(*cdssdk.EFileType).APIURL, + feat: feat, + }, nil +} diff --git a/common/pkgs/storage/local/agent.go b/common/pkgs/storage/local/agent.go index c2e5e52..ab5f252 100644 --- a/common/pkgs/storage/local/agent.go +++ b/common/pkgs/storage/local/agent.go @@ -8,7 +8,7 @@ import ( type agent struct { Detail stgmod.StorageDetail ShardStore *ShardStore - SharedStore *SharedStore + PublicStore *PublicStore } func (s *agent) Start(ch *types.StorageEventChan) { @@ -16,8 +16,8 @@ func (s *agent) Start(ch *types.StorageEventChan) { s.ShardStore.Start(ch) } - if s.SharedStore != nil { - s.SharedStore.Start(ch) + if s.PublicStore != nil { + s.PublicStore.Start(ch) } } @@ -26,8 +26,8 @@ func (s *agent) Stop() { s.ShardStore.Stop() } - if s.SharedStore != nil { - s.SharedStore.Stop() + if s.PublicStore != nil { + s.PublicStore.Stop() } } @@ -43,10 +43,10 @@ func (a *agent) GetShardStore() (types.ShardStore, error) { return a.ShardStore, nil } -func (a *agent) GetSharedStore() (types.SharedStore, error) { - if a.SharedStore == nil { +func (a *agent) GetPublicStore() (types.PublicStore, error) { + if a.PublicStore == nil { return nil, types.ErrUnsupported } - return a.SharedStore, nil + return a.PublicStore, nil } diff --git a/common/pkgs/storage/local/local.go b/common/pkgs/storage/local/local.go index ca49db8..e1a81d7 100644 --- a/common/pkgs/storage/local/local.go +++ b/common/pkgs/storage/local/local.go @@ -19,6 +19,7 @@ func init() { } type builder struct { + types.EmptyBuilder detail stgmod.StorageDetail } @@ -41,18 +42,18 @@ func (b *builder) CreateAgent() (types.StorageAgent, error) { agt.ShardStore = store } - if b.detail.Storage.SharedStore != nil { - local, ok := b.detail.Storage.SharedStore.(*cdssdk.LocalSharedStorage) + if b.detail.Storage.PublicStore != nil { + local, ok := b.detail.Storage.PublicStore.(*cdssdk.LocalPublicStorage) if !ok { - return nil, fmt.Errorf("invalid shared store type %T for local storage", b.detail.Storage.SharedStore) + return nil, fmt.Errorf("invalid public store type %T for local storage", b.detail.Storage.PublicStore) } - store, err := NewSharedStore(agt, *local) + store, err := NewPublicStore(agt, *local) if err != nil { return nil, err } - agt.SharedStore = store + agt.PublicStore = store } return agt, nil @@ -62,8 +63,8 @@ func (b *builder) ShardStoreDesc() types.ShardStoreDesc { return &ShardStoreDesc{builder: b} } -func (b *builder) SharedStoreDesc() types.SharedStoreDesc { - return &SharedStoreDesc{builder: b} +func (b *builder) PublicStoreDesc() types.PublicStoreDesc { + return &PublicStoreDesc{builder: b} } func (b *builder) CreateMultiparter() (types.Multiparter, error) { diff --git a/common/pkgs/storage/local/shared_store.go b/common/pkgs/storage/local/public_store.go similarity index 54% rename from common/pkgs/storage/local/shared_store.go rename to common/pkgs/storage/local/public_store.go index bfd9730..55f96b2 100644 --- a/common/pkgs/storage/local/shared_store.go +++ b/common/pkgs/storage/local/public_store.go @@ -10,39 +10,40 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) -type SharedStoreDesc struct { +type PublicStoreDesc struct { + types.EmptyPublicStoreDesc builder *builder } -func (d *SharedStoreDesc) Enabled() bool { - return d.builder.detail.Storage.SharedStore != nil +func (d *PublicStoreDesc) Enabled() bool { + return d.builder.detail.Storage.PublicStore != nil } -func (d *SharedStoreDesc) HasBypassWrite() bool { +func (d *PublicStoreDesc) HasBypassWrite() bool { return false } -type SharedStore struct { +type PublicStore struct { agt *agent - cfg cdssdk.LocalSharedStorage + cfg cdssdk.LocalPublicStorage } -func NewSharedStore(agt *agent, cfg cdssdk.LocalSharedStorage) (*SharedStore, error) { - return &SharedStore{ +func NewPublicStore(agt *agent, cfg cdssdk.LocalPublicStorage) (*PublicStore, error) { + return &PublicStore{ agt: agt, cfg: cfg, }, nil } -func (s *SharedStore) Start(ch *types.StorageEventChan) { +func (s *PublicStore) Start(ch *types.StorageEventChan) { s.getLogger().Infof("component start, LoadBase: %v", s.cfg.LoadBase) } -func (s *SharedStore) Stop() { +func (s *PublicStore) Stop() { s.getLogger().Infof("component stop") } -func (s *SharedStore) Write(objPath string, stream io.Reader) error { +func (s *PublicStore) Write(objPath string, stream io.Reader) error { fullPath := filepath.Join(s.cfg.LoadBase, objPath) err := os.MkdirAll(filepath.Dir(fullPath), 0755) if err != nil { @@ -63,6 +64,6 @@ func (s *SharedStore) Write(objPath string, stream io.Reader) error { return nil } -func (s *SharedStore) getLogger() logger.Logger { - return logger.WithField("SharedStore", "Local").WithField("Storage", s.agt.Detail.Storage.String()) +func (s *PublicStore) getLogger() logger.Logger { + return logger.WithField("PublicStore", "Local").WithField("Storage", s.agt.Detail.Storage.String()) } diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index 278cd2b..28ed46a 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -23,6 +23,7 @@ const ( ) type ShardStoreDesc struct { + types.EmptyShardStoreDesc builder *builder } diff --git a/common/pkgs/storage/mashup/mashup.go b/common/pkgs/storage/mashup/mashup.go new file mode 100644 index 0000000..de1e383 --- /dev/null +++ b/common/pkgs/storage/mashup/mashup.go @@ -0,0 +1,75 @@ +package mashup + +import ( + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" +) + +func init() { + reg.RegisterBuilder[*cdssdk.MashupStorageType](func(detail stgmod.StorageDetail) types.StorageBuilder { + return &builder{ + detail: detail, + } + }) +} + +type builder struct { + detail stgmod.StorageDetail +} + +func (b *builder) CreateAgent() (types.StorageAgent, error) { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Agent + + blder := factory.GetBuilder(detail) + return blder.CreateAgent() +} + +func (b *builder) ShardStoreDesc() types.ShardStoreDesc { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Agent + + blder := factory.GetBuilder(detail) + return blder.ShardStoreDesc() +} + +func (b *builder) PublicStoreDesc() types.PublicStoreDesc { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Agent + + blder := factory.GetBuilder(detail) + return blder.PublicStoreDesc() +} + +func (b *builder) CreateMultiparter() (types.Multiparter, error) { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Feature + + blder := factory.GetBuilder(detail) + return blder.CreateMultiparter() +} + +func (b *builder) CreateS2STransfer() (types.S2STransfer, error) { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Feature + + blder := factory.GetBuilder(detail) + return blder.CreateS2STransfer() +} + +func (b *builder) CreateECMultiplier() (types.ECMultiplier, error) { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Feature + + blder := factory.GetBuilder(detail) + return blder.CreateECMultiplier() +} diff --git a/common/pkgs/storage/s3/agent.go b/common/pkgs/storage/s3/agent.go index f33d6c0..271bcab 100644 --- a/common/pkgs/storage/s3/agent.go +++ b/common/pkgs/storage/s3/agent.go @@ -34,6 +34,6 @@ func (a *Agent) GetShardStore() (types.ShardStore, error) { return a.ShardStore, nil } -func (a *Agent) GetSharedStore() (types.SharedStore, error) { +func (a *Agent) GetPublicStore() (types.PublicStore, error) { return nil, types.ErrUnsupported } diff --git a/common/pkgs/storage/s3/obs/s2s.go b/common/pkgs/storage/s3/obs/s2s.go index 5c28b84..6317092 100644 --- a/common/pkgs/storage/s3/obs/s2s.go +++ b/common/pkgs/storage/s3/obs/s2s.go @@ -170,5 +170,7 @@ func (s *S2STransfer) Abort() { s.omsCli.DeleteTask(&model.DeleteTaskRequest{ TaskId: fmt.Sprintf("%v", *s.taskID), }) + + // TODO 清理临时文件 } } diff --git a/common/pkgs/storage/s3/public_store.go b/common/pkgs/storage/s3/public_store.go new file mode 100644 index 0000000..f65dbcc --- /dev/null +++ b/common/pkgs/storage/s3/public_store.go @@ -0,0 +1,7 @@ +package s3 + +import "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" + +type PublicStoreDesc struct { + types.EmptyPublicStoreDesc +} diff --git a/common/pkgs/storage/s3/s3.go b/common/pkgs/storage/s3/s3.go index c65a19f..9ac531a 100644 --- a/common/pkgs/storage/s3/s3.go +++ b/common/pkgs/storage/s3/s3.go @@ -63,8 +63,8 @@ func (b *builder) ShardStoreDesc() types.ShardStoreDesc { return &ShardStoreDesc{builder: b} } -func (b *builder) SharedStoreDesc() types.SharedStoreDesc { - return &SharedStoreDesc{} +func (b *builder) PublicStoreDesc() types.PublicStoreDesc { + return &PublicStoreDesc{} } func (b *builder) CreateMultiparter() (types.Multiparter, error) { diff --git a/common/pkgs/storage/s3/shard_store.go b/common/pkgs/storage/s3/shard_store.go index 7185476..aa98e5f 100644 --- a/common/pkgs/storage/s3/shard_store.go +++ b/common/pkgs/storage/s3/shard_store.go @@ -27,6 +27,7 @@ const ( ) type ShardStoreDesc struct { + types.EmptyShardStoreDesc builder *builder } diff --git a/common/pkgs/storage/s3/shared_store.go b/common/pkgs/storage/s3/shared_store.go deleted file mode 100644 index bb822e8..0000000 --- a/common/pkgs/storage/s3/shared_store.go +++ /dev/null @@ -1,12 +0,0 @@ -package s3 - -type SharedStoreDesc struct { -} - -func (d *SharedStoreDesc) Enabled() bool { - return false -} - -func (d *SharedStoreDesc) HasBypassWrite() bool { - return false -} diff --git a/common/pkgs/storage/types/bypass.go b/common/pkgs/storage/types/bypass.go index 5e8d23f..9760559 100644 --- a/common/pkgs/storage/types/bypass.go +++ b/common/pkgs/storage/types/bypass.go @@ -27,3 +27,15 @@ type BypassFilePath struct { type BypassRead interface { BypassRead(fileHash cdssdk.FileHash) (BypassFilePath, error) } + +// 能通过一个Http请求直接访问文件 +// 仅用于分片存储。 +type HTTPBypassRead interface { + HTTPBypassRead(fileHash cdssdk.FileHash) (HTTPReqeust, error) +} + +type HTTPReqeust struct { + SignedUrl string `json:"signedUrl"` + Header map[string]string `json:"header"` + Body string `json:"body"` +} diff --git a/common/pkgs/storage/types/ec_multiplier.go b/common/pkgs/storage/types/ec_multiplier.go new file mode 100644 index 0000000..c9d8d7b --- /dev/null +++ b/common/pkgs/storage/types/ec_multiplier.go @@ -0,0 +1,11 @@ +package types + +type ECMultiplier interface { + // 进行EC运算,coef * inputs。coef为编码矩阵,inputs为待编码数据,chunkSize为分块大小。 + // 输出为每一个块文件的路径,数组长度 = len(coef) + Multiply(coef [][]byte, inputs []HTTPReqeust, chunkSize int64) ([]string, error) + // 完成计算 + Complete() + // 取消计算。如果已经调用了Complete,则应该无任何影响 + Abort() +} diff --git a/common/pkgs/storage/types/empty_builder.go b/common/pkgs/storage/types/empty_builder.go index be0178c..667af24 100644 --- a/common/pkgs/storage/types/empty_builder.go +++ b/common/pkgs/storage/types/empty_builder.go @@ -19,8 +19,8 @@ func (b *EmptyBuilder) ShardStoreDesc() ShardStoreDesc { return &EmptyShardStoreDesc{} } -func (b *EmptyBuilder) SharedStoreDesc() SharedStoreDesc { - return &EmptySharedStoreDesc{} +func (b *EmptyBuilder) PublicStoreDesc() PublicStoreDesc { + return &EmptyPublicStoreDesc{} } // 创建一个分片上传组件 @@ -32,6 +32,10 @@ func (b *EmptyBuilder) CreateS2STransfer() (S2STransfer, error) { return nil, fmt.Errorf("create s2s transfer for %T: %w", b.Detail.Storage.Type, ErrUnsupported) } +func (b *EmptyBuilder) CreateECMultiplier() (ECMultiplier, error) { + return nil, fmt.Errorf("create ec multiplier for %T: %w", b.Detail.Storage.Type, ErrUnsupported) +} + type EmptyShardStoreDesc struct { } @@ -47,13 +51,17 @@ func (d *EmptyShardStoreDesc) HasBypassRead() bool { return false } -type EmptySharedStoreDesc struct { +func (d *EmptyShardStoreDesc) HasBypassHTTPRead() bool { + return false +} + +type EmptyPublicStoreDesc struct { } -func (d *EmptySharedStoreDesc) Enabled() bool { +func (d *EmptyPublicStoreDesc) Enabled() bool { return false } -func (d *EmptySharedStoreDesc) HasBypassWrite() bool { +func (d *EmptyPublicStoreDesc) HasBypassWrite() bool { return false } diff --git a/common/pkgs/storage/types/shared_store.go b/common/pkgs/storage/types/public_store.go similarity index 80% rename from common/pkgs/storage/types/shared_store.go rename to common/pkgs/storage/types/public_store.go index 98456e8..782f2aa 100644 --- a/common/pkgs/storage/types/shared_store.go +++ b/common/pkgs/storage/types/public_store.go @@ -4,7 +4,7 @@ import ( "io" ) -type SharedStore interface { +type PublicStore interface { Start(ch *StorageEventChan) Stop() diff --git a/common/pkgs/storage/types/types.go b/common/pkgs/storage/types/types.go index 11b2516..e74a1af 100644 --- a/common/pkgs/storage/types/types.go +++ b/common/pkgs/storage/types/types.go @@ -31,7 +31,7 @@ type StorageAgent interface { // 获取分片存储服务 GetShardStore() (ShardStore, error) // 获取共享存储服务 - GetSharedStore() (SharedStore, error) + GetPublicStore() (PublicStore, error) } // 创建存储服务的指定组件。 @@ -45,11 +45,12 @@ type StorageBuilder interface { // 是否支持分片存储服务 ShardStoreDesc() ShardStoreDesc // 是否支持共享存储服务 - SharedStoreDesc() SharedStoreDesc + PublicStoreDesc() PublicStoreDesc // 创建一个分片上传组件 CreateMultiparter() (Multiparter, error) // 创建一个存储服务直传组件 CreateS2STransfer() (S2STransfer, error) + CreateECMultiplier() (ECMultiplier, error) } type ShardStoreDesc interface { @@ -59,9 +60,11 @@ type ShardStoreDesc interface { HasBypassWrite() bool // 是否能旁路读取 HasBypassRead() bool + // 是否能通过HTTP读取 + HasBypassHTTPRead() bool } -type SharedStoreDesc interface { +type PublicStoreDesc interface { // 是否已启动 Enabled() bool // 是否能旁路上传 diff --git a/common/pkgs/sysevent/config.go b/common/pkgs/sysevent/config.go new file mode 100644 index 0000000..dfe1bf5 --- /dev/null +++ b/common/pkgs/sysevent/config.go @@ -0,0 +1,8 @@ +package sysevent + +type Config struct { + Address string `json:"address"` + Account string `json:"account"` + Password string `json:"password"` + VHost string `json:"vhost"` +} diff --git a/common/pkgs/sysevent/publisher.go b/common/pkgs/sysevent/publisher.go new file mode 100644 index 0000000..5b42c93 --- /dev/null +++ b/common/pkgs/sysevent/publisher.go @@ -0,0 +1,119 @@ +package sysevent + +import ( + "fmt" + + "github.com/streadway/amqp" + "gitlink.org.cn/cloudream/common/pkgs/async" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +type PublisherEvent interface{} + +type PublisherExited struct { + Err error +} + +type PublishError struct { + Err error +} + +type OtherError struct { + Err error +} + +type Publisher struct { + connection *amqp.Connection + channel *amqp.Channel + eventChan *async.UnboundChannel[SysEvent] + thisSource Source +} + +func NewPublisher(cfg Config, thisSource Source) (*Publisher, error) { + config := amqp.Config{ + Vhost: cfg.VHost, + } + + url := fmt.Sprintf("amqp://%s:%s@%s", cfg.Account, cfg.Password, cfg.Address) + connection, err := amqp.DialConfig(url, config) + if err != nil { + return nil, err + } + + channel, err := connection.Channel() + if err != nil { + connection.Close() + return nil, fmt.Errorf("openning channel on connection: %w", err) + } + + _, err = channel.QueueDeclare( + SysEventQueueName, + false, + true, + false, + false, + nil, + ) + if err != nil { + return nil, fmt.Errorf("declare queue: %w", err) + } + + pub := &Publisher{ + connection: connection, + channel: channel, + eventChan: async.NewUnboundChannel[SysEvent](), + thisSource: thisSource, + } + + return pub, nil +} + +func (p *Publisher) Start() *async.UnboundChannel[PublisherEvent] { + ch := async.NewUnboundChannel[PublisherEvent]() + go func() { + defer ch.Close() + defer p.channel.Close() + defer p.connection.Close() + + for { + event := <-p.eventChan.Receive().Chan() + if event.Err != nil { + if event.Err == async.ErrChannelClosed { + ch.Send(PublisherExited{Err: nil}) + } else { + ch.Send(PublisherExited{Err: event.Err}) + } + return + } + + eventData, err := serder.ObjectToJSONEx(event.Value) + if err != nil { + ch.Send(OtherError{Err: fmt.Errorf("serialize event data: %w", err)}) + continue + } + + err = p.channel.Publish("", SysEventQueueName, false, false, amqp.Publishing{ + ContentType: "text/plain", + Body: eventData, + Expiration: "60000", // 消息超时时间默认1分钟 + }) + if err != nil { + ch.Send(PublishError{Err: err}) + continue + } + } + }() + + return ch +} + +// Publish 发布事件,自动补齐时间戳和源信息 +func (p *Publisher) Publish(evt SysEvent) { + // TODO 补齐时间戳和源信息 + p.eventChan.Send(evt) +} + +// PublishRaw 完全原样发布事件,不补齐任何信息 +func (p *Publisher) PublishRaw(evt SysEvent) { + p.eventChan.Send(evt) +} diff --git a/common/pkgs/sysevent/sysevent.go b/common/pkgs/sysevent/sysevent.go new file mode 100644 index 0000000..082b93e --- /dev/null +++ b/common/pkgs/sysevent/sysevent.go @@ -0,0 +1,9 @@ +package sysevent + +const ( + SysEventQueueName = "SysEventQueue" +) + +type SysEvent = any // TODO 换成具体的类型 + +type Source = any // TODO 换成具体的类型 diff --git a/common/pkgs/sysevent/watcher.go b/common/pkgs/sysevent/watcher.go new file mode 100644 index 0000000..6ffbd72 --- /dev/null +++ b/common/pkgs/sysevent/watcher.go @@ -0,0 +1,121 @@ +package sysevent + +import ( + "fmt" + "sync" + + "github.com/streadway/amqp" + "gitlink.org.cn/cloudream/common/pkgs/async" + "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +type Watcher interface { + OnEvent(event SysEvent) +} + +type WatcherEvent interface{} + +type WatcherExited struct { + Err error +} + +type WatcherHost struct { + watchers []Watcher + lock sync.Mutex + connection *amqp.Connection + channel *amqp.Channel + recvChan <-chan amqp.Delivery +} + +func NewWatcherHost(cfg Config) (*WatcherHost, error) { + config := amqp.Config{ + Vhost: cfg.VHost, + } + + url := fmt.Sprintf("amqp://%s:%s@%s", cfg.Account, cfg.Password, cfg.Address) + connection, err := amqp.DialConfig(url, config) + if err != nil { + return nil, err + } + + channel, err := connection.Channel() + if err != nil { + connection.Close() + return nil, fmt.Errorf("openning channel on connection: %w", err) + } + + _, err = channel.QueueDeclare( + SysEventQueueName, + false, + true, + false, + false, + nil, + ) + if err != nil { + channel.Close() + connection.Close() + return nil, fmt.Errorf("declare queue: %w", err) + } + + recvChan, err := channel.Consume(SysEventQueueName, "", true, false, true, false, nil) + if err != nil { + channel.Close() + connection.Close() + return nil, fmt.Errorf("consume queue: %w", err) + } + + wat := &WatcherHost{ + connection: connection, + channel: channel, + recvChan: recvChan, + } + + return wat, nil +} + +func (w *WatcherHost) Start() *async.UnboundChannel[WatcherEvent] { + ch := async.NewUnboundChannel[WatcherEvent]() + + go func() { + defer ch.Close() + defer w.channel.Close() + defer w.connection.Close() + + for m := range w.recvChan { + evt, err := serder.JSONToObjectEx[SysEvent](m.Body) + if err != nil { + ch.Send(OtherError{Err: fmt.Errorf("deserialize event: %w", err)}) + continue + } + + w.lock.Lock() + ws := make([]Watcher, 0, len(w.watchers)) + ws = append(ws, w.watchers...) + w.lock.Unlock() + + for _, w := range ws { + w.OnEvent(evt) + } + } + + ch.Send(WatcherExited{Err: nil}) + }() + + return ch +} + +func (w *WatcherHost) AddWatcher(watcher Watcher) { + w.lock.Lock() + defer w.lock.Unlock() + + w.watchers = append(w.watchers, watcher) +} + +func (w *WatcherHost) RemoveWatcher(watcher Watcher) { + w.lock.Lock() + defer w.lock.Unlock() + + w.watchers = lo2.Remove(w.watchers, watcher) +} diff --git a/common/pkgs/uploader/create_load.go b/common/pkgs/uploader/create_load.go index 16590b4..359ea27 100644 --- a/common/pkgs/uploader/create_load.go +++ b/common/pkgs/uploader/create_load.go @@ -45,7 +45,7 @@ func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) err ft.AddFrom(fromExec) for i, stg := range u.targetStgs { ft.AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg, ioswitch2.RawStream(), "fileHash")) - ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg, path.Join(u.loadRoots[i], pa))) + ft.AddTo(ioswitch2.NewLoadToPublic(*stg.MasterHub, stg, path.Join(u.loadRoots[i], pa))) stgIDs = append(stgIDs, stg.Storage.StorageID) } diff --git a/common/pkgs/uploader/update.go b/common/pkgs/uploader/update.go index 862b9f6..596db35 100644 --- a/common/pkgs/uploader/update.go +++ b/common/pkgs/uploader/update.go @@ -51,7 +51,7 @@ func (w *UpdateUploader) Upload(pat string, size int64, stream io.Reader) error AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg, ioswitch2.RawStream(), "fileHash")) for i, stg := range w.loadToStgs { - ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg, path.Join(w.loadToPath[i], pat))) + ft.AddTo(ioswitch2.NewLoadToPublic(*stg.MasterHub, stg, path.Join(w.loadToPath[i], pat))) } plans := exec.NewPlanBuilder() diff --git a/common/pkgs/uploader/uploader.go b/common/pkgs/uploader/uploader.go index 30ea971..245e36f 100644 --- a/common/pkgs/uploader/uploader.go +++ b/common/pkgs/uploader/uploader.go @@ -86,8 +86,8 @@ func (u *Uploader) BeginUpdate(userID cdssdk.UserID, pkgID cdssdk.PackageID, aff if stg.MasterHub == nil { return nil, fmt.Errorf("load to storage %v has no master hub", stgID) } - if factory.GetBuilder(stg).ShardStoreDesc().Enabled() { - return nil, fmt.Errorf("load to storage %v has no shared store", stgID) + if !factory.GetBuilder(stg).PublicStoreDesc().Enabled() { + return nil, fmt.Errorf("load to storage %v has no public store", stgID) } loadToStgs[i] = stg