From bbc0cf5adf14cd4a77a7c9a8c227a01d199d2224 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 7 Apr 2025 15:20:28 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4ioswitch=E7=9B=B8=E5=85=B3?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/ioswitch2/agent_worker.go | 8 +- common/pkgs/ioswitch2/fromto.go | 38 ++++---- common/pkgs/ioswitch2/http_hub_worker.go | 8 +- common/pkgs/ioswitch2/ops2/bypass.go | 66 ++++++------- common/pkgs/ioswitch2/ops2/ec.go | 32 ++++--- common/pkgs/ioswitch2/ops2/multipart.go | 40 ++++---- common/pkgs/ioswitch2/ops2/public_store.go | 25 +++-- common/pkgs/ioswitch2/ops2/s2s.go | 21 +++-- common/pkgs/ioswitch2/ops2/shard_store.go | 63 +++++++------ common/pkgs/ioswitch2/parser/gen/generator.go | 18 ++-- common/pkgs/ioswitch2/parser/opt/ec.go | 18 ++-- common/pkgs/ioswitch2/parser/opt/multipart.go | 10 +- common/pkgs/ioswitch2/parser/opt/s2s.go | 16 ++-- .../ioswitch2/plans/complete_multipart.go | 14 +-- common/pkgs/ioswitch2/plans/utils.go | 8 +- common/pkgs/ioswitchlrc/agent_worker.go | 6 +- common/pkgs/ioswitchlrc/fromto.go | 24 ++--- common/pkgs/ioswitchlrc/ops2/ops.go | 2 +- common/pkgs/ioswitchlrc/ops2/shard_store.go | 78 ++++++++-------- common/pkgs/ioswitchlrc/parser/passes.go | 10 +- common/pkgs/storage/pool/pool.go | 92 +++++++++++++++++++ 21 files changed, 354 insertions(+), 243 deletions(-) create mode 100644 common/pkgs/storage/pool/pool.go diff --git a/common/pkgs/ioswitch2/agent_worker.go b/common/pkgs/ioswitch2/agent_worker.go index 7ee72ad..9846d35 100644 --- a/common/pkgs/ioswitch2/agent_worker.go +++ b/common/pkgs/ioswitch2/agent_worker.go @@ -6,11 +6,11 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/types" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/serder" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.WorkerInfo]( @@ -19,8 +19,8 @@ var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.Wo ))) type AgentWorker struct { - Hub cdssdk.Hub - Address cdssdk.GRPCAddressInfo + Hub cortypes.Hub + Address cortypes.GRPCAddressInfo } func (w *AgentWorker) NewClient() (exec.WorkerClient, error) { @@ -46,7 +46,7 @@ func (w *AgentWorker) Equals(worker exec.WorkerInfo) bool { } type AgentWorkerClient struct { - hubID cdssdk.HubID + hubID cortypes.HubID cli *agtrpc.PoolClient } diff --git a/common/pkgs/ioswitch2/fromto.go b/common/pkgs/ioswitch2/fromto.go index e52a820..74f7af0 100644 --- a/common/pkgs/ioswitch2/fromto.go +++ b/common/pkgs/ioswitch2/fromto.go @@ -2,9 +2,9 @@ package ioswitch2 import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/math2" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + "gitlink.org.cn/cloudream/storage2/client/types" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type From interface { @@ -69,9 +69,9 @@ type FromTos []FromTo type FromTo struct { // 如果输入或者输出用到了EC编码的流,则需要提供EC参数。 - ECParam *cdssdk.ECRedundancy + ECParam *types.ECRedundancy // 同上 - SegmentParam *cdssdk.SegmentRedundancy + SegmentParam *types.SegmentRedundancy Froms []From Toes []To } @@ -110,17 +110,17 @@ func (f *FromDriver) GetStreamIndex() StreamIndex { } type FromShardstore struct { - FileHash cdssdk.FileHash - Hub cdssdk.Hub - Storage stgmod.StorageDetail + FileHash types.FileHash + Hub cortypes.Hub + Space types.UserSpaceDetail StreamIndex StreamIndex } -func NewFromShardstore(fileHash cdssdk.FileHash, hub cdssdk.Hub, storage stgmod.StorageDetail, strIdx StreamIndex) *FromShardstore { +func NewFromShardstore(fileHash types.FileHash, hub cortypes.Hub, space types.UserSpaceDetail, strIdx StreamIndex) *FromShardstore { return &FromShardstore{ FileHash: fileHash, Hub: hub, - Storage: storage, + Space: space, StreamIndex: strIdx, } } @@ -161,26 +161,26 @@ func (t *ToDriver) GetRange() math2.Range { } type ToShardStore struct { - Hub cdssdk.Hub - Storage stgmod.StorageDetail + Hub cortypes.Hub + Space types.UserSpaceDetail StreamIndex StreamIndex Range math2.Range FileHashStoreKey string } -func NewToShardStore(hub cdssdk.Hub, stg stgmod.StorageDetail, strIdx StreamIndex, fileHashStoreKey string) *ToShardStore { +func NewToShardStore(hub cortypes.Hub, space types.UserSpaceDetail, strIdx StreamIndex, fileHashStoreKey string) *ToShardStore { return &ToShardStore{ Hub: hub, - Storage: stg, + Space: space, StreamIndex: strIdx, FileHashStoreKey: fileHashStoreKey, } } -func NewToShardStoreWithRange(hub cdssdk.Hub, stg stgmod.StorageDetail, streamIndex StreamIndex, fileHashStoreKey string, rng math2.Range) *ToShardStore { +func NewToShardStoreWithRange(hub cortypes.Hub, space types.UserSpaceDetail, streamIndex StreamIndex, fileHashStoreKey string, rng math2.Range) *ToShardStore { return &ToShardStore{ Hub: hub, - Storage: stg, + Space: space, StreamIndex: streamIndex, FileHashStoreKey: fileHashStoreKey, Range: rng, @@ -196,15 +196,15 @@ func (t *ToShardStore) GetRange() math2.Range { } type LoadToPublic struct { - Hub cdssdk.Hub - Storage stgmod.StorageDetail + Hub cortypes.Hub + Space types.UserSpaceDetail ObjectPath string } -func NewLoadToPublic(hub cdssdk.Hub, storage stgmod.StorageDetail, objectPath string) *LoadToPublic { +func NewLoadToPublic(hub cortypes.Hub, space types.UserSpaceDetail, objectPath string) *LoadToPublic { return &LoadToPublic{ Hub: hub, - Storage: storage, + Space: space, ObjectPath: objectPath, } } diff --git a/common/pkgs/ioswitch2/http_hub_worker.go b/common/pkgs/ioswitch2/http_hub_worker.go index c1ad220..2e1447a 100644 --- a/common/pkgs/ioswitch2/http_hub_worker.go +++ b/common/pkgs/ioswitch2/http_hub_worker.go @@ -6,18 +6,18 @@ import ( "strconv" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" "gitlink.org.cn/cloudream/common/utils/io2" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type HttpHubWorker struct { - Hub cdssdk.Hub + Hub cortypes.Hub } func (w *HttpHubWorker) NewClient() (exec.WorkerClient, error) { - addressInfo := w.Hub.Address.(*cdssdk.HttpAddressInfo) + addressInfo := w.Hub.Address.(*cortypes.HttpAddressInfo) baseUrl := "http://" + addressInfo.ExternalIP + ":" + strconv.Itoa(addressInfo.Port) config := cdsapi.Config{ URL: baseUrl, @@ -46,7 +46,7 @@ func (w *HttpHubWorker) Equals(worker exec.WorkerInfo) bool { } type HttpHubWorkerClient struct { - hubID cdssdk.HubID + hubID cortypes.HubID cli *cdsapi.Client } diff --git a/common/pkgs/ioswitch2/ops2/bypass.go b/common/pkgs/ioswitch2/ops2/bypass.go index 2abe0a2..9ef200e 100644 --- a/common/pkgs/ioswitch2/ops2/bypass.go +++ b/common/pkgs/ioswitch2/ops2/bypass.go @@ -5,8 +5,8 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" ) @@ -43,26 +43,26 @@ func (r *BypassHandleResultValue) Clone() exec.VarValue { } type BypassToShardStore struct { - StorageID cdssdk.StorageID + UserSpace clitypes.UserSpaceDetail BypassFileInfo exec.VarID BypassCallback exec.VarID FileHash exec.VarID } func (o *BypassToShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) + stgPool, err := exec.GetValueByType[*pool.Pool](ctx) if err != nil { return err } - shardStore, err := stgAgts.GetShardStore(o.StorageID) + shardStore, err := stgPool.GetShardStore(&o.UserSpace) if err != nil { return err } br, ok := shardStore.(types.BypassWrite) if !ok { - return fmt.Errorf("shard store %v not support bypass write", o.StorageID) + return fmt.Errorf("shard store %v not support bypass write", o.UserSpace) } fileInfo, err := exec.BindVar[*BypassUploadedFileValue](e, ctx.Context, o.BypassFileInfo) @@ -81,7 +81,7 @@ func (o *BypassToShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) er } func (o *BypassToShardStore) String() string { - return fmt.Sprintf("BypassToShardStore[StorageID:%v] Info: %v, Callback: %v", o.StorageID, o.BypassFileInfo, o.BypassCallback) + return fmt.Sprintf("BypassToShardStore[UserSpace:%v] Info: %v, Callback: %v", o.UserSpace, o.BypassFileInfo, o.BypassCallback) } type BypassFilePathValue struct { @@ -95,25 +95,25 @@ func (v *BypassFilePathValue) Clone() exec.VarValue { } type BypassFromShardStore struct { - StorageID cdssdk.StorageID - FileHash cdssdk.FileHash + UserSpace clitypes.UserSpaceDetail + FileHash clitypes.FileHash Output exec.VarID } func (o *BypassFromShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) + stgPool, err := exec.GetValueByType[*pool.Pool](ctx) if err != nil { return err } - shardStore, err := stgAgts.GetShardStore(o.StorageID) + shardStore, err := stgPool.GetShardStore(&o.UserSpace) if err != nil { return err } br, ok := shardStore.(types.BypassRead) if !ok { - return fmt.Errorf("shard store %v not support bypass read", o.StorageID) + return fmt.Errorf("shard store %v not support bypass read", o.UserSpace) } path, err := br.BypassRead(o.FileHash) @@ -126,13 +126,13 @@ func (o *BypassFromShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) } func (o *BypassFromShardStore) String() string { - return fmt.Sprintf("BypassFromShardStore[StorageID:%v] FileHash: %v, Output: %v", o.StorageID, o.FileHash, o.Output) + return fmt.Sprintf("BypassFromShardStore[UserSpace:%v] FileHash: %v, Output: %v", o.UserSpace, o.FileHash, o.Output) } // 旁路Http读取 type BypassFromShardStoreHTTP struct { - StorageID cdssdk.StorageID - FileHash cdssdk.FileHash + UserSpace clitypes.UserSpaceDetail + FileHash clitypes.FileHash Output exec.VarID } @@ -147,19 +147,19 @@ func (v *HTTPRequestValue) Clone() exec.VarValue { } func (o *BypassFromShardStoreHTTP) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) + stgPool, err := exec.GetValueByType[*pool.Pool](ctx) if err != nil { return err } - shardStore, err := stgAgts.GetShardStore(o.StorageID) + shardStore, err := stgPool.GetShardStore(&o.UserSpace) if err != nil { return err } br, ok := shardStore.(types.HTTPBypassRead) if !ok { - return fmt.Errorf("shard store %v not support bypass read", o.StorageID) + return fmt.Errorf("shard store %v not support bypass read", o.UserSpace) } req, err := br.HTTPBypassRead(o.FileHash) @@ -172,19 +172,19 @@ func (o *BypassFromShardStoreHTTP) Execute(ctx *exec.ExecContext, e *exec.Execut } func (o *BypassFromShardStoreHTTP) String() string { - return fmt.Sprintf("BypassFromShardStoreHTTP[StorageID:%v] FileHash: %v, Output: %v", o.StorageID, o.FileHash, o.Output) + return fmt.Sprintf("BypassFromShardStoreHTTP[UserSpace:%v] FileHash: %v, Output: %v", o.UserSpace, o.FileHash, o.Output) } // 旁路写入 type BypassToShardStoreNode struct { dag.NodeBase - StorageID cdssdk.StorageID + UserSpace clitypes.UserSpaceDetail FileHashStoreKey string } -func (b *GraphNodeBuilder) NewBypassToShardStore(storageID cdssdk.StorageID, fileHashStoreKey string) *BypassToShardStoreNode { +func (b *GraphNodeBuilder) NewBypassToShardStore(userSpace clitypes.UserSpaceDetail, fileHashStoreKey string) *BypassToShardStoreNode { node := &BypassToShardStoreNode{ - StorageID: storageID, + UserSpace: userSpace, FileHashStoreKey: fileHashStoreKey, } b.AddNode(node) @@ -217,7 +217,7 @@ func (n *BypassToShardStoreNode) FileHashVar() dag.ValueOutputSlot { func (t *BypassToShardStoreNode) GenerateOp() (exec.Op, error) { return &BypassToShardStore{ - StorageID: t.StorageID, + UserSpace: t.UserSpace, BypassFileInfo: t.BypassFileInfoSlot().Var().VarID, BypassCallback: t.BypassCallbackVar().Var().VarID, FileHash: t.FileHashVar().Var().VarID, @@ -227,13 +227,13 @@ func (t *BypassToShardStoreNode) GenerateOp() (exec.Op, error) { // 旁路读取 type BypassFromShardStoreNode struct { dag.NodeBase - StorageID cdssdk.StorageID - FileHash cdssdk.FileHash + UserSpace clitypes.UserSpaceDetail + FileHash clitypes.FileHash } -func (b *GraphNodeBuilder) NewBypassFromShardStore(storageID cdssdk.StorageID, fileHash cdssdk.FileHash) *BypassFromShardStoreNode { +func (b *GraphNodeBuilder) NewBypassFromShardStore(userSpace clitypes.UserSpaceDetail, fileHash clitypes.FileHash) *BypassFromShardStoreNode { node := &BypassFromShardStoreNode{ - StorageID: storageID, + UserSpace: userSpace, FileHash: fileHash, } b.AddNode(node) @@ -251,7 +251,7 @@ func (n *BypassFromShardStoreNode) FilePathVar() dag.ValueOutputSlot { func (n *BypassFromShardStoreNode) GenerateOp() (exec.Op, error) { return &BypassFromShardStore{ - StorageID: n.StorageID, + UserSpace: n.UserSpace, FileHash: n.FileHash, Output: n.FilePathVar().Var().VarID, }, nil @@ -260,13 +260,13 @@ func (n *BypassFromShardStoreNode) GenerateOp() (exec.Op, error) { // 旁路Http读取 type BypassFromShardStoreHTTPNode struct { dag.NodeBase - StorageID cdssdk.StorageID - FileHash cdssdk.FileHash + UserSpace clitypes.UserSpaceDetail + FileHash clitypes.FileHash } -func (b *GraphNodeBuilder) NewBypassFromShardStoreHTTP(storageID cdssdk.StorageID, fileHash cdssdk.FileHash) *BypassFromShardStoreHTTPNode { +func (b *GraphNodeBuilder) NewBypassFromShardStoreHTTP(userSpace clitypes.UserSpaceDetail, fileHash clitypes.FileHash) *BypassFromShardStoreHTTPNode { node := &BypassFromShardStoreHTTPNode{ - StorageID: storageID, + UserSpace: userSpace, FileHash: fileHash, } b.AddNode(node) @@ -284,7 +284,7 @@ func (n *BypassFromShardStoreHTTPNode) HTTPRequestVar() dag.ValueOutputSlot { func (n *BypassFromShardStoreHTTPNode) GenerateOp() (exec.Op, error) { return &BypassFromShardStoreHTTP{ - StorageID: n.StorageID, + UserSpace: n.UserSpace, FileHash: n.FileHash, Output: n.HTTPRequestVar().Var().VarID, }, nil diff --git a/common/pkgs/ioswitch2/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go index be979c5..3658177 100644 --- a/common/pkgs/ioswitch2/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -8,13 +8,12 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/sync2" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/ec" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" ) @@ -159,7 +158,7 @@ func (o *ECMultiply) String() string { } type CallECMultiplier struct { - Storage stgmod.StorageDetail + UserSpace clitypes.UserSpaceDetail Coef [][]byte Inputs []exec.VarID Outputs []exec.VarID @@ -168,7 +167,12 @@ type CallECMultiplier struct { } func (o *CallECMultiplier) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - ecMul, err := factory.GetBuilder(o.Storage).CreateECMultiplier() + stgPool, err := exec.GetValueByType[*pool.Pool](ctx) + if err != nil { + return fmt.Errorf("getting storage pool: %w", err) + } + + ecMul, err := stgPool.GetECMultiplier(&o.UserSpace) if err != nil { return err } @@ -218,9 +222,9 @@ func (o *CallECMultiplier) Execute(ctx *exec.ExecContext, e *exec.Executor) erro func (o *CallECMultiplier) String() string { return fmt.Sprintf( - "CallECMultiplier(storage=%v, coef=%v) (%v) -> (%v)", + "CallECMultiplier(userSpace=%v, coef=%v) (%v) -> (%v)", o.Coef, - o.Storage.Storage.String(), + o.UserSpace, utils.FormatVarIDs(o.Inputs), utils.FormatVarIDs(o.Outputs), ) @@ -228,12 +232,12 @@ func (o *CallECMultiplier) String() string { type ECMultiplyNode struct { dag.NodeBase - EC cdssdk.ECRedundancy + EC clitypes.ECRedundancy InputIndexes []int OutputIndexes []int } -func (b *GraphNodeBuilder) NewECMultiply(ec cdssdk.ECRedundancy) *ECMultiplyNode { +func (b *GraphNodeBuilder) NewECMultiply(ec clitypes.ECRedundancy) *ECMultiplyNode { node := &ECMultiplyNode{ EC: ec, } @@ -282,15 +286,15 @@ func (t *ECMultiplyNode) GenerateOp() (exec.Op, error) { type CallECMultiplierNode struct { dag.NodeBase - Storage stgmod.StorageDetail - EC cdssdk.ECRedundancy + UserSpace clitypes.UserSpaceDetail + EC clitypes.ECRedundancy InputIndexes []int OutputIndexes []int } -func (b *GraphNodeBuilder) NewCallECMultiplier(storage stgmod.StorageDetail) *CallECMultiplierNode { +func (b *GraphNodeBuilder) NewCallECMultiplier(userSpace clitypes.UserSpaceDetail) *CallECMultiplierNode { node := &CallECMultiplierNode{ - Storage: storage, + UserSpace: userSpace, } b.AddNode(node) return node @@ -337,7 +341,7 @@ func (t *CallECMultiplierNode) GenerateOp() (exec.Op, error) { } return &CallECMultiplier{ - Storage: t.Storage, + UserSpace: t.UserSpace, Coef: coef, Inputs: t.InputValues().GetVarIDsRanged(0, len(t.InputIndexes)), Outputs: t.OutputValues().GetVarIDs(), diff --git a/common/pkgs/ioswitch2/ops2/multipart.go b/common/pkgs/ioswitch2/ops2/multipart.go index e673a53..af10b1a 100644 --- a/common/pkgs/ioswitch2/ops2/multipart.go +++ b/common/pkgs/ioswitch2/ops2/multipart.go @@ -7,8 +7,8 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" log "gitlink.org.cn/cloudream/common/pkgs/logger" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" ) @@ -40,7 +40,7 @@ func (v *UploadedPartInfoValue) Clone() exec.VarValue { } type MultipartInitiator struct { - Storage stgmod.StorageDetail + UserSpace clitypes.UserSpaceDetail UploadArgs exec.VarID UploadedParts []exec.VarID BypassFileOutput exec.VarID // 分片上传之后的临时文件的路径 @@ -48,8 +48,12 @@ type MultipartInitiator struct { } func (o *MultipartInitiator) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - blder := factory.GetBuilder(o.Storage) - multi, err := blder.CreateMultiparter() + stgPool, err := exec.GetValueByType[*pool.Pool](ctx) + if err != nil { + return fmt.Errorf("getting storage pool: %w", err) + } + + multi, err := stgPool.GetMultiparter(&o.UserSpace) if err != nil { return err } @@ -106,7 +110,7 @@ func (o *MultipartInitiator) String() string { } type MultipartUpload struct { - Storage stgmod.StorageDetail + UserSpace clitypes.UserSpaceDetail UploadArgs exec.VarID UploadResult exec.VarID PartStream exec.VarID @@ -115,7 +119,11 @@ type MultipartUpload struct { } func (o *MultipartUpload) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - blder := factory.GetBuilder(o.Storage) + stgPool, err := exec.GetValueByType[*pool.Pool](ctx) + if err != nil { + return fmt.Errorf("getting storage pool: %w", err) + } + uploadArgs, err := exec.BindVar[*MultipartUploadArgsValue](e, ctx.Context, o.UploadArgs) if err != nil { return err @@ -127,7 +135,7 @@ func (o *MultipartUpload) Execute(ctx *exec.ExecContext, e *exec.Executor) error } defer partStr.Stream.Close() - multi, err := blder.CreateMultiparter() + multi, err := stgPool.GetMultiparter(&o.UserSpace) if err != nil { return err } @@ -152,12 +160,12 @@ func (o *MultipartUpload) String() string { type MultipartInitiatorNode struct { dag.NodeBase - Storage stgmod.StorageDetail `json:"storageID"` + UserSpace clitypes.UserSpaceDetail } -func (b *GraphNodeBuilder) NewMultipartInitiator(storage stgmod.StorageDetail) *MultipartInitiatorNode { +func (b *GraphNodeBuilder) NewMultipartInitiator(userSpace clitypes.UserSpaceDetail) *MultipartInitiatorNode { node := &MultipartInitiatorNode{ - Storage: storage, + UserSpace: userSpace, } b.AddNode(node) @@ -196,7 +204,7 @@ func (n *MultipartInitiatorNode) AppendPartInfoSlot() dag.ValueInputSlot { func (n *MultipartInitiatorNode) GenerateOp() (exec.Op, error) { return &MultipartInitiator{ - Storage: n.Storage, + UserSpace: n.UserSpace, UploadArgs: n.UploadArgsVar().Var().VarID, UploadedParts: n.InputValues().GetVarIDsStart(1), BypassFileOutput: n.BypassFileInfoVar().Var().VarID, @@ -206,14 +214,14 @@ func (n *MultipartInitiatorNode) GenerateOp() (exec.Op, error) { type MultipartUploadNode struct { dag.NodeBase - Storage stgmod.StorageDetail + UserSpace clitypes.UserSpaceDetail PartNumber int PartSize int64 } -func (b *GraphNodeBuilder) NewMultipartUpload(stg stgmod.StorageDetail, partNumber int, partSize int64) *MultipartUploadNode { +func (b *GraphNodeBuilder) NewMultipartUpload(userSpace clitypes.UserSpaceDetail, partNumber int, partSize int64) *MultipartUploadNode { node := &MultipartUploadNode{ - Storage: stg, + UserSpace: userSpace, PartNumber: partNumber, PartSize: partSize, } @@ -248,7 +256,7 @@ func (n *MultipartUploadNode) PartStreamSlot() dag.StreamInputSlot { func (n *MultipartUploadNode) GenerateOp() (exec.Op, error) { return &MultipartUpload{ - Storage: n.Storage, + UserSpace: n.UserSpace, UploadArgs: n.UploadArgsSlot().Var().VarID, UploadResult: n.UploadResultVar().Var().VarID, PartStream: n.PartStreamSlot().Var().VarID, diff --git a/common/pkgs/ioswitch2/ops2/public_store.go b/common/pkgs/ioswitch2/ops2/public_store.go index ac261f2..1229c76 100644 --- a/common/pkgs/ioswitch2/ops2/public_store.go +++ b/common/pkgs/ioswitch2/ops2/public_store.go @@ -6,10 +6,9 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" ) func init() { @@ -18,7 +17,7 @@ func init() { type PublicLoad struct { Input exec.VarID - StorageID cdssdk.StorageID + UserSpace clitypes.UserSpaceDetail ObjectPath string } @@ -28,14 +27,14 @@ func (o *PublicLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error { Debugf("load file to public store") defer logger.Debugf("load file to public store finished") - stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) + stgPool, err := exec.GetValueByType[*pool.Pool](ctx) if err != nil { - return fmt.Errorf("getting storage manager: %w", err) + return fmt.Errorf("getting storage pool: %w", err) } - store, err := stgAgts.GetPublicStore(o.StorageID) + store, err := stgPool.GetPublicStore(&o.UserSpace) if err != nil { - return fmt.Errorf("getting public store of storage %v: %w", o.StorageID, err) + return fmt.Errorf("getting public store of storage %v: %w", o.UserSpace, err) } input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) @@ -48,20 +47,20 @@ func (o *PublicLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } func (o *PublicLoad) String() string { - return fmt.Sprintf("PublicLoad %v -> %v:%v", o.Input, o.StorageID, o.ObjectPath) + return fmt.Sprintf("PublicLoad %v -> %v:%v", o.Input, o.UserSpace, o.ObjectPath) } type PublicLoadNode struct { dag.NodeBase To ioswitch2.To - Storage stgmod.StorageDetail + UserSpace clitypes.UserSpaceDetail ObjectPath string } -func (b *GraphNodeBuilder) NewPublicLoad(to ioswitch2.To, stg stgmod.StorageDetail, objPath string) *PublicLoadNode { +func (b *GraphNodeBuilder) NewPublicLoad(to ioswitch2.To, userSpace clitypes.UserSpaceDetail, objPath string) *PublicLoadNode { node := &PublicLoadNode{ To: to, - Storage: stg, + UserSpace: userSpace, ObjectPath: objPath, } b.AddNode(node) @@ -88,7 +87,7 @@ func (t *PublicLoadNode) Input() dag.StreamInputSlot { func (t *PublicLoadNode) GenerateOp() (exec.Op, error) { return &PublicLoad{ Input: t.InputStreams().Get(0).VarID, - StorageID: t.Storage.Storage.StorageID, + UserSpace: t.UserSpace, ObjectPath: t.ObjectPath, }, nil } diff --git a/common/pkgs/ioswitch2/ops2/s2s.go b/common/pkgs/ioswitch2/ops2/s2s.go index 49be5c4..7747e19 100644 --- a/common/pkgs/ioswitch2/ops2/s2s.go +++ b/common/pkgs/ioswitch2/ops2/s2s.go @@ -5,8 +5,8 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" ) @@ -15,9 +15,9 @@ func init() { } type S2STransfer struct { - Src stgmod.StorageDetail + Src clitypes.UserSpaceDetail SrcPath exec.VarID - Dst stgmod.StorageDetail + Dst clitypes.UserSpaceDetail Output exec.VarID BypassCallback exec.VarID } @@ -28,7 +28,12 @@ func (o *S2STransfer) Execute(ctx *exec.ExecContext, e *exec.Executor) error { return err } - s2s, err := factory.GetBuilder(o.Dst).CreateS2STransfer() + stgPool, err := exec.GetValueByType[*pool.Pool](ctx) + if err != nil { + return fmt.Errorf("getting storage pool: %w", err) + } + + s2s, err := stgPool.GetS2STransfer(&o.Dst) if err != nil { return err } @@ -66,11 +71,11 @@ func (o *S2STransfer) String() string { type S2STransferNode struct { dag.NodeBase - Src stgmod.StorageDetail - Dst stgmod.StorageDetail + Src clitypes.UserSpaceDetail + Dst clitypes.UserSpaceDetail } -func (b *GraphNodeBuilder) NewS2STransfer(src stgmod.StorageDetail, dst stgmod.StorageDetail) *S2STransferNode { +func (b *GraphNodeBuilder) NewS2STransfer(src, dst clitypes.UserSpaceDetail) *S2STransferNode { n := &S2STransferNode{ Src: src, Dst: dst, diff --git a/common/pkgs/ioswitch2/ops2/shard_store.go b/common/pkgs/ioswitch2/ops2/shard_store.go index 518119f..55da375 100644 --- a/common/pkgs/ioswitch2/ops2/shard_store.go +++ b/common/pkgs/ioswitch2/ops2/shard_store.go @@ -8,11 +8,10 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" ) @@ -23,8 +22,8 @@ func init() { } type ShardInfoValue struct { - Hash cdssdk.FileHash `json:"hash"` - Size int64 `json:"size"` + Hash clitypes.FileHash `json:"hash"` + Size int64 `json:"size"` } func (v *ShardInfoValue) Clone() exec.VarValue { @@ -32,9 +31,9 @@ func (v *ShardInfoValue) Clone() exec.VarValue { } type ShardRead struct { - Output exec.VarID `json:"output"` - StorageID cdssdk.StorageID `json:"storageID"` - Open types.OpenOption `json:"option"` + Output exec.VarID + UserSpace clitypes.UserSpaceDetail + Open types.OpenOption } func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { @@ -43,14 +42,14 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { Debugf("reading from shard store") defer logger.Debugf("reading from shard store finished") - stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) + stgPool, err := exec.GetValueByType[*pool.Pool](ctx) if err != nil { - return fmt.Errorf("getting storage manager: %w", err) + return fmt.Errorf("getting storage pool: %w", err) } - store, err := stgAgts.GetShardStore(o.StorageID) + store, err := stgPool.GetShardStore(&o.UserSpace) if err != nil { - return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err) + return fmt.Errorf("getting shard store of user space %v: %w", o.UserSpace, err) } file, err := store.Open(o.Open) @@ -73,26 +72,26 @@ func (o *ShardRead) String() string { } type ShardWrite struct { - Input exec.VarID `json:"input"` - FileHash exec.VarID `json:"fileHash"` - StorageID cdssdk.StorageID `json:"storageID"` + Input exec.VarID + FileHashVar exec.VarID + UserSpace clitypes.UserSpaceDetail } func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { logger. WithField("Input", o.Input). - WithField("FileHash", o.FileHash). + WithField("FileHash", o.FileHashVar). Debugf("writting file to shard store") defer logger.Debugf("write to shard store finished") - stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) + stgPool, err := exec.GetValueByType[*pool.Pool](ctx) if err != nil { - return fmt.Errorf("getting storage manager: %w", err) + return fmt.Errorf("getting storage pool: %w", err) } - store, err := stgAgts.GetShardStore(o.StorageID) + store, err := stgPool.GetShardStore(&o.UserSpace) if err != nil { - return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err) + return fmt.Errorf("getting shard store of user space %v: %w", o.UserSpace, err) } input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) @@ -106,7 +105,7 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { return fmt.Errorf("writing file to shard store: %w", err) } - e.PutVar(o.FileHash, &ShardInfoValue{ + e.PutVar(o.FileHashVar, &ShardInfoValue{ Hash: fileInfo.Hash, Size: fileInfo.Size, }) @@ -114,20 +113,20 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } func (o *ShardWrite) String() string { - return fmt.Sprintf("ShardWrite %v -> %v", o.Input, o.FileHash) + return fmt.Sprintf("ShardWrite %v -> %v", o.Input, o.FileHashVar) } type ShardReadNode struct { dag.NodeBase From *ioswitch2.FromShardstore - StorageID cdssdk.StorageID + UserSpace clitypes.UserSpaceDetail Open types.OpenOption } -func (b *GraphNodeBuilder) NewShardRead(fr *ioswitch2.FromShardstore, stgID cdssdk.StorageID, open types.OpenOption) *ShardReadNode { +func (b *GraphNodeBuilder) NewShardRead(fr *ioswitch2.FromShardstore, userSpace clitypes.UserSpaceDetail, open types.OpenOption) *ShardReadNode { node := &ShardReadNode{ From: fr, - StorageID: stgID, + UserSpace: userSpace, Open: open, } b.AddNode(node) @@ -150,7 +149,7 @@ func (t *ShardReadNode) Output() dag.StreamOutputSlot { func (t *ShardReadNode) GenerateOp() (exec.Op, error) { return &ShardRead{ Output: t.OutputStreams().Get(0).VarID, - StorageID: t.StorageID, + UserSpace: t.UserSpace, Open: t.Open, }, nil } @@ -162,14 +161,14 @@ func (t *ShardReadNode) GenerateOp() (exec.Op, error) { type ShardWriteNode struct { dag.NodeBase To *ioswitch2.ToShardStore - Storage stgmod.StorageDetail + UserSpace clitypes.UserSpaceDetail FileHashStoreKey string } -func (b *GraphNodeBuilder) NewShardWrite(to *ioswitch2.ToShardStore, stg stgmod.StorageDetail, fileHashStoreKey string) *ShardWriteNode { +func (b *GraphNodeBuilder) NewShardWrite(to *ioswitch2.ToShardStore, userSpace clitypes.UserSpaceDetail, fileHashStoreKey string) *ShardWriteNode { node := &ShardWriteNode{ To: to, - Storage: stg, + UserSpace: userSpace, FileHashStoreKey: fileHashStoreKey, } b.AddNode(node) @@ -200,9 +199,9 @@ func (t *ShardWriteNode) FileHashVar() *dag.ValueVar { func (t *ShardWriteNode) GenerateOp() (exec.Op, error) { return &ShardWrite{ - Input: t.InputStreams().Get(0).VarID, - FileHash: t.OutputValues().Get(0).VarID, - StorageID: t.Storage.Storage.StorageID, + Input: t.InputStreams().Get(0).VarID, + FileHashVar: t.OutputValues().Get(0).VarID, + UserSpace: t.UserSpace, }, nil } diff --git a/common/pkgs/ioswitch2/parser/gen/generator.go b/common/pkgs/ioswitch2/parser/gen/generator.go index 4c566f2..90453e2 100644 --- a/common/pkgs/ioswitch2/parser/gen/generator.go +++ b/common/pkgs/ioswitch2/parser/gen/generator.go @@ -5,13 +5,13 @@ import ( "math" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser/state" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) // 检查使用不同编码时参数是否设置到位 @@ -259,7 +259,7 @@ func buildFromNode(ctx *state.GenerateState, f ioswitch2.From) (ops2.FromNode, e switch f := f.(type) { case *ioswitch2.FromShardstore: - t := ctx.DAG.NewShardRead(f, f.Storage.Storage.StorageID, types.NewOpen(f.FileHash)) + t := ctx.DAG.NewShardRead(f, f.Space, types.NewOpen(f.FileHash)) if f.StreamIndex.IsRaw() { t.Open.WithNullableLength(repRange.Offset, repRange.Length) @@ -287,11 +287,11 @@ func buildFromNode(ctx *state.GenerateState, f ioswitch2.From) (ops2.FromNode, e } switch addr := f.Hub.Address.(type) { - case *cdssdk.HttpAddressInfo: + case *cortypes.HttpAddressInfo: t.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: f.Hub}) t.Env().Pinned = true - case *cdssdk.GRPCAddressInfo: + case *cortypes.GRPCAddressInfo: t.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: f.Hub, Address: *addr}) t.Env().Pinned = true @@ -344,7 +344,7 @@ func buildFromNode(ctx *state.GenerateState, f ioswitch2.From) (ops2.FromNode, e func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error) { switch t := t.(type) { case *ioswitch2.ToShardStore: - n := ctx.DAG.NewShardWrite(t, t.Storage, t.FileHashStoreKey) + n := ctx.DAG.NewShardWrite(t, t.Space, t.FileHashStoreKey) if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil { return nil, err @@ -362,7 +362,7 @@ func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error) return n, nil case *ioswitch2.LoadToPublic: - n := ctx.DAG.NewPublicLoad(t, t.Storage, t.ObjectPath) + n := ctx.DAG.NewPublicLoad(t, t.Space, t.ObjectPath) if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil { return nil, err @@ -377,12 +377,12 @@ func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error) } } -func setEnvByAddress(n dag.Node, hub cdssdk.Hub, addr cdssdk.HubAddressInfo) error { +func setEnvByAddress(n dag.Node, hub cortypes.Hub, addr cortypes.HubAddressInfo) error { switch addr := addr.(type) { - case *cdssdk.HttpAddressInfo: + case *cortypes.HttpAddressInfo: n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: hub}) - case *cdssdk.GRPCAddressInfo: + case *cortypes.GRPCAddressInfo: n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: hub, Address: *addr}) default: diff --git a/common/pkgs/ioswitch2/parser/opt/ec.go b/common/pkgs/ioswitch2/parser/opt/ec.go index 3b3000d..5768448 100644 --- a/common/pkgs/ioswitch2/parser/opt/ec.go +++ b/common/pkgs/ioswitch2/parser/opt/ec.go @@ -2,12 +2,12 @@ package opt import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser/state" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) // 减少未使用的Multiply指令的输出流。如果减少到0,则删除该指令 @@ -69,12 +69,12 @@ func UseECMultiplier(ctx *state.GenerateState) { if to == nil { to = swNode.To - } else if to.Storage.Storage.StorageID != swNode.Storage.Storage.StorageID { + } else if to.Space.UserSpace.StorageID != swNode.UserSpace.UserSpace.StorageID { return true } swNodes = append(swNodes, swNode) } - _, err := factory.GetBuilder(to.Storage).CreateECMultiplier() + _, err := factory.GetBuilder(&to.Space).CreateECMultiplier() if err != nil { return true } @@ -88,7 +88,7 @@ func UseECMultiplier(ctx *state.GenerateState) { return true } - if !factory.GetBuilder(srNode.From.Storage).ShardStoreDesc().HasBypassHTTPRead() { + if !factory.GetBuilder(&srNode.From.Space).FeatureDesc().HasBypassHTTPRead() { return true } @@ -96,13 +96,13 @@ func UseECMultiplier(ctx *state.GenerateState) { } // 检查满足条件后,替换ECMultiply指令 - callMul := ctx.DAG.NewCallECMultiplier(to.Storage) + callMul := ctx.DAG.NewCallECMultiplier(to.Space) switch addr := to.Hub.Address.(type) { - case *cdssdk.HttpAddressInfo: + case *cortypes.HttpAddressInfo: callMul.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: to.Hub}) callMul.Env().Pinned = true - case *cdssdk.GRPCAddressInfo: + case *cortypes.GRPCAddressInfo: callMul.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: to.Hub, Address: *addr}) callMul.Env().Pinned = true @@ -119,7 +119,7 @@ func UseECMultiplier(ctx *state.GenerateState) { delete(ctx.FromNodes, srNode.From) } - hbr := ctx.DAG.NewBypassFromShardStoreHTTP(srNode.StorageID, srNode.From.FileHash) + hbr := ctx.DAG.NewBypassFromShardStoreHTTP(srNode.UserSpace, srNode.From.FileHash) hbr.Env().CopyFrom(srNode.Env()) hbr.HTTPRequestVar().ToSlot(callMul.InputSlot(i)) } @@ -128,7 +128,7 @@ func UseECMultiplier(ctx *state.GenerateState) { ctx.DAG.RemoveNode(swNode) delete(ctx.ToNodes, swNode.To) - bs := ctx.DAG.NewBypassToShardStore(to.Storage.Storage.StorageID, swNode.FileHashStoreKey) + bs := ctx.DAG.NewBypassToShardStore(to.Space, swNode.FileHashStoreKey) bs.Env().CopyFrom(swNode.Env()) callMul.OutputVar(i).ToSlot(bs.BypassFileInfoSlot()) diff --git a/common/pkgs/ioswitch2/parser/opt/multipart.go b/common/pkgs/ioswitch2/parser/opt/multipart.go index 3e7ba9d..6e96c99 100644 --- a/common/pkgs/ioswitch2/parser/opt/multipart.go +++ b/common/pkgs/ioswitch2/parser/opt/multipart.go @@ -34,7 +34,7 @@ func UseMultipartUploadToShardStore(ctx *state.GenerateState) { } // Join的目的地必须支持MultipartUpload功能才能替换成分片上传 - multiUpload, err := factory.GetBuilder(shardNode.Storage).CreateMultiparter() + multiUpload, err := factory.GetBuilder(&shardNode.UserSpace).CreateMultiparter() if err != nil { return true } @@ -47,7 +47,7 @@ func UseMultipartUploadToShardStore(ctx *state.GenerateState) { } } - initNode := ctx.DAG.NewMultipartInitiator(shardNode.Storage) + initNode := ctx.DAG.NewMultipartInitiator(shardNode.UserSpace) initNode.Env().CopyFrom(shardNode.Env()) partNumber := 1 @@ -64,7 +64,7 @@ func UseMultipartUploadToShardStore(ctx *state.GenerateState) { joinInput.Var().ToSlot(splitNode.InputSlot()) for i2 := 0; i2 < len(splits); i2++ { - uploadNode := ctx.DAG.NewMultipartUpload(shardNode.Storage, partNumber, splits[i2]) + uploadNode := ctx.DAG.NewMultipartUpload(shardNode.UserSpace, partNumber, splits[i2]) uploadNode.Env().CopyFrom(joinInput.Var().Src.Env()) initNode.UploadArgsVar().ToSlot(uploadNode.UploadArgsSlot()) @@ -75,7 +75,7 @@ func UseMultipartUploadToShardStore(ctx *state.GenerateState) { } } else { // 否则直接上传整个分段 - uploadNode := ctx.DAG.NewMultipartUpload(shardNode.Storage, partNumber, size) + uploadNode := ctx.DAG.NewMultipartUpload(shardNode.UserSpace, partNumber, size) // 上传指令直接在流的产生节点执行 uploadNode.Env().CopyFrom(joinInput.Var().Src.Env()) @@ -89,7 +89,7 @@ func UseMultipartUploadToShardStore(ctx *state.GenerateState) { joinInput.Var().NotTo(joinNode) } - bypassNode := ctx.DAG.NewBypassToShardStore(shardNode.Storage.Storage.StorageID, shardNode.FileHashStoreKey) + bypassNode := ctx.DAG.NewBypassToShardStore(shardNode.UserSpace, shardNode.FileHashStoreKey) bypassNode.Env().CopyFrom(shardNode.Env()) // 分片上传Node产生的结果送到bypassNode,bypassNode将处理结果再送回分片上传Node diff --git a/common/pkgs/ioswitch2/parser/opt/s2s.go b/common/pkgs/ioswitch2/parser/opt/s2s.go index ba2e63c..aa838a7 100644 --- a/common/pkgs/ioswitch2/parser/opt/s2s.go +++ b/common/pkgs/ioswitch2/parser/opt/s2s.go @@ -20,8 +20,8 @@ func UseS2STransfer(ctx *state.GenerateState) { continue } - fromStgBld := factory.GetBuilder(fromShard.Storage) - if !fromStgBld.ShardStoreDesc().HasBypassRead() { + fromStgBld := factory.GetBuilder(&fromShard.Space) + if !fromStgBld.FeatureDesc().HasBypassRead() { continue } @@ -46,13 +46,13 @@ func UseS2STransfer(ctx *state.GenerateState) { switch dstNode := dstNode.(type) { case *ops2.ShardWriteNode: - dstStgBld := factory.GetBuilder(dstNode.Storage) - if !dstStgBld.ShardStoreDesc().HasBypassWrite() { + dstStgBld := factory.GetBuilder(&dstNode.UserSpace) + if !dstStgBld.FeatureDesc().HasBypassWrite() { failed = true break } - if !s2s.CanTransfer(dstNode.Storage) { + if !s2s.CanTransfer(dstNode.UserSpace) { failed = true break } @@ -77,17 +77,17 @@ func UseS2STransfer(ctx *state.GenerateState) { } for _, toShard := range toShards { - s2sNode := ctx.DAG.NewS2STransfer(fromShard.Storage, toShard.Storage) + s2sNode := ctx.DAG.NewS2STransfer(fromShard.Space, toShard.UserSpace) // 直传指令在目的地Hub上执行 s2sNode.Env().CopyFrom(toShard.Env()) // 先获取文件路径,送到S2S节点 - brNode := ctx.DAG.NewBypassFromShardStore(fromShard.Storage.Storage.StorageID, fromShard.FileHash) + brNode := ctx.DAG.NewBypassFromShardStore(fromShard.Space, fromShard.FileHash) brNode.Env().CopyFrom(frNode.Env()) brNode.FilePathVar().ToSlot(s2sNode.SrcPathSlot()) // 传输结果通知目的节点 - bwNode := ctx.DAG.NewBypassToShardStore(toShard.Storage.Storage.StorageID, toShard.To.FileHashStoreKey) + bwNode := ctx.DAG.NewBypassToShardStore(toShard.UserSpace, toShard.To.FileHashStoreKey) bwNode.Env().CopyFrom(toShard.Env()) s2sNode.BypassFileInfoVar().ToSlot(bwNode.BypassFileInfoSlot()) diff --git a/common/pkgs/ioswitch2/plans/complete_multipart.go b/common/pkgs/ioswitch2/plans/complete_multipart.go index 587aa12..95485f8 100644 --- a/common/pkgs/ioswitch2/plans/complete_multipart.go +++ b/common/pkgs/ioswitch2/plans/complete_multipart.go @@ -3,12 +3,12 @@ package plans import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" ) -func CompleteMultipart(blocks []stgmod.ObjectBlock, blockStgs []stgmod.StorageDetail, targetStg stgmod.StorageDetail, shardInfoKey string, blder *exec.PlanBuilder) error { +func CompleteMultipart(blocks []clitypes.ObjectBlock, blockSpaces []clitypes.UserSpaceDetail, targetSpace clitypes.UserSpaceDetail, shardInfoKey string, blder *exec.PlanBuilder) error { da := ops2.NewGraphNodeBuilder() sizes := make([]int64, len(blocks)) @@ -16,20 +16,20 @@ func CompleteMultipart(blocks []stgmod.ObjectBlock, blockStgs []stgmod.StorageDe sizes[i] = blk.Size } joinNode := da.NewSegmentJoin(sizes) - joinNode.Env().ToEnvWorker(getWorkerInfo(*targetStg.MasterHub)) + joinNode.Env().ToEnvWorker(getWorkerInfo(*targetSpace.MasterHub)) joinNode.Env().Pinned = true for i, blk := range blocks { - rd := da.NewShardRead(nil, blk.StorageID, types.NewOpen(blk.FileHash)) - rd.Env().ToEnvWorker(getWorkerInfo(*blockStgs[i].MasterHub)) + rd := da.NewShardRead(nil, blockSpaces[i], types.NewOpen(blk.FileHash)) + rd.Env().ToEnvWorker(getWorkerInfo(*blockSpaces[i].MasterHub)) rd.Env().Pinned = true rd.Output().ToSlot(joinNode.InputSlot(i)) } // TODO 应该采取更合理的方式同时支持Parser和直接生成DAG - wr := da.NewShardWrite(nil, targetStg, shardInfoKey) - wr.Env().ToEnvWorker(getWorkerInfo(*targetStg.MasterHub)) + wr := da.NewShardWrite(nil, targetSpace, shardInfoKey) + wr.Env().ToEnvWorker(getWorkerInfo(*targetSpace.MasterHub)) wr.Env().Pinned = true joinNode.Joined().ToSlot(wr.Input()) diff --git a/common/pkgs/ioswitch2/plans/utils.go b/common/pkgs/ioswitch2/plans/utils.go index 14fffee..5522b2b 100644 --- a/common/pkgs/ioswitch2/plans/utils.go +++ b/common/pkgs/ioswitch2/plans/utils.go @@ -2,16 +2,16 @@ package plans import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) -func getWorkerInfo(hub cdssdk.Hub) exec.WorkerInfo { +func getWorkerInfo(hub cortypes.Hub) exec.WorkerInfo { switch addr := hub.Address.(type) { - case *cdssdk.HttpAddressInfo: + case *cortypes.HttpAddressInfo: return &ioswitch2.HttpHubWorker{Hub: hub} - case *cdssdk.GRPCAddressInfo: + case *cortypes.GRPCAddressInfo: return &ioswitch2.AgentWorker{Hub: hub, Address: *addr} default: diff --git a/common/pkgs/ioswitchlrc/agent_worker.go b/common/pkgs/ioswitchlrc/agent_worker.go index 3d2f30d..d0de079 100644 --- a/common/pkgs/ioswitchlrc/agent_worker.go +++ b/common/pkgs/ioswitchlrc/agent_worker.go @@ -5,9 +5,9 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) // var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.WorkerInfo]( @@ -15,8 +15,8 @@ import ( // ))) type AgentWorker struct { - Hub cdssdk.Hub - Address cdssdk.GRPCAddressInfo + Hub cortypes.Hub + Address cortypes.GRPCAddressInfo } func (w *AgentWorker) NewClient() (exec.WorkerClient, error) { diff --git a/common/pkgs/ioswitchlrc/fromto.go b/common/pkgs/ioswitchlrc/fromto.go index 8e65cab..a60cc72 100644 --- a/common/pkgs/ioswitchlrc/fromto.go +++ b/common/pkgs/ioswitchlrc/fromto.go @@ -2,8 +2,9 @@ package ioswitchlrc import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/math2" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type From interface { @@ -38,17 +39,18 @@ func (f *FromDriver) GetDataIndex() int { } type FromNode struct { - FileHash cdssdk.FileHash - Hub cdssdk.Hub - Storage cdssdk.Storage + FileHash clitypes.FileHash + Hub cortypes.Hub + Space clitypes.UserSpaceDetail DataIndex int } -func NewFromStorage(fileHash cdssdk.FileHash, hub cdssdk.Hub, storage cdssdk.Storage, dataIndex int) *FromNode { +func NewFromStorage(fileHash clitypes.FileHash, hub cortypes.Hub, space clitypes.UserSpaceDetail, dataIndex int) *FromNode { return &FromNode{ FileHash: fileHash, Hub: hub, DataIndex: dataIndex, + Space: space, } } @@ -88,26 +90,26 @@ func (t *ToDriver) GetRange() math2.Range { } type ToNode struct { - Hub cdssdk.Hub - Storage cdssdk.Storage + Hub cortypes.Hub + Space clitypes.UserSpaceDetail DataIndex int Range math2.Range FileHashStoreKey string } -func NewToStorage(hub cdssdk.Hub, stg cdssdk.Storage, dataIndex int, fileHashStoreKey string) *ToNode { +func NewToStorage(hub cortypes.Hub, space clitypes.UserSpaceDetail, dataIndex int, fileHashStoreKey string) *ToNode { return &ToNode{ Hub: hub, - Storage: stg, + Space: space, DataIndex: dataIndex, FileHashStoreKey: fileHashStoreKey, } } -func NewToStorageWithRange(hub cdssdk.Hub, stg cdssdk.Storage, dataIndex int, fileHashStoreKey string, rng math2.Range) *ToNode { +func NewToStorageWithRange(hub cortypes.Hub, space clitypes.UserSpaceDetail, dataIndex int, fileHashStoreKey string, rng math2.Range) *ToNode { return &ToNode{ Hub: hub, - Storage: stg, + Space: space, DataIndex: dataIndex, FileHashStoreKey: fileHashStoreKey, Range: rng, diff --git a/common/pkgs/ioswitchlrc/ops2/ops.go b/common/pkgs/ioswitchlrc/ops2/ops.go index 412cb1d..87a7eb6 100644 --- a/common/pkgs/ioswitchlrc/ops2/ops.go +++ b/common/pkgs/ioswitchlrc/ops2/ops.go @@ -20,7 +20,7 @@ type FromNode interface { type ToNode interface { dag.Node - Input() dag.StreamOutputSlot + Input() dag.StreamInputSlot SetInput(input *dag.StreamVar) } diff --git a/common/pkgs/ioswitchlrc/ops2/shard_store.go b/common/pkgs/ioswitchlrc/ops2/shard_store.go index 57b4934..18f670f 100644 --- a/common/pkgs/ioswitchlrc/ops2/shard_store.go +++ b/common/pkgs/ioswitchlrc/ops2/shard_store.go @@ -8,31 +8,32 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" ) func init() { exec.UseOp[*ShardRead]() exec.UseOp[*ShardWrite]() - exec.UseVarValue[*FileHashValue]() + exec.UseVarValue[*ShardInfoValue]() } -type FileHashValue struct { - Hash cdssdk.FileHash `json:"hash"` +type ShardInfoValue struct { + Hash clitypes.FileHash `json:"hash"` + Size int64 `json:"size"` } -func (v *FileHashValue) Clone() exec.VarValue { - return &FileHashValue{Hash: v.Hash} +func (v *ShardInfoValue) Clone() exec.VarValue { + return &ShardInfoValue{Hash: v.Hash, Size: v.Size} } type ShardRead struct { - Output exec.VarID `json:"output"` - StorageID cdssdk.StorageID `json:"storageID"` - Open types.OpenOption `json:"option"` + Output exec.VarID + UserSpace clitypes.UserSpaceDetail + Open types.OpenOption } func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { @@ -41,14 +42,14 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { Debugf("reading from shard store") defer logger.Debugf("reading from shard store finished") - stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) + stgPool, err := exec.GetValueByType[*pool.Pool](ctx) if err != nil { - return fmt.Errorf("getting storage manager: %w", err) + return fmt.Errorf("getting storage pool: %w", err) } - store, err := stgAgts.GetShardStore(o.StorageID) + store, err := stgPool.GetShardStore(&o.UserSpace) if err != nil { - return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err) + return fmt.Errorf("getting shard store of user space %v: %w", o.UserSpace, err) } file, err := store.Open(o.Open) @@ -71,26 +72,26 @@ func (o *ShardRead) String() string { } type ShardWrite struct { - Input exec.VarID `json:"input"` - FileHash exec.VarID `json:"fileHash"` - StorageID cdssdk.StorageID `json:"storageID"` + Input exec.VarID + FileHashVar exec.VarID + UserSpace clitypes.UserSpaceDetail } func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { logger. WithField("Input", o.Input). - WithField("FileHash", o.FileHash). + WithField("FileHash", o.FileHashVar). Debugf("writting file to shard store") defer logger.Debugf("write to shard store finished") - stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) + stgPool, err := exec.GetValueByType[*pool.Pool](ctx) if err != nil { - return fmt.Errorf("getting storage manager: %w", err) + return fmt.Errorf("getting storage pool: %w", err) } - store, err := stgAgts.GetShardStore(o.StorageID) + store, err := stgPool.GetShardStore(&o.UserSpace) if err != nil { - return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err) + return fmt.Errorf("getting shard store of user space %v: %w", o.UserSpace, err) } input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) @@ -104,27 +105,28 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { return fmt.Errorf("writing file to shard store: %w", err) } - e.PutVar(o.FileHash, &FileHashValue{ + e.PutVar(o.FileHashVar, &ShardInfoValue{ Hash: fileInfo.Hash, + Size: fileInfo.Size, }) return nil } func (o *ShardWrite) String() string { - return fmt.Sprintf("ShardWrite %v -> %v", o.Input, o.FileHash) + return fmt.Sprintf("ShardWrite %v -> %v", o.Input, o.FileHashVar) } type ShardReadNode struct { dag.NodeBase - From ioswitchlrc.From - StorageID cdssdk.StorageID + From *ioswitchlrc.FromNode + UserSpace clitypes.UserSpaceDetail Open types.OpenOption } -func (b *GraphNodeBuilder) NewShardRead(fr ioswitchlrc.From, stgID cdssdk.StorageID, open types.OpenOption) *ShardReadNode { +func (b *GraphNodeBuilder) NewShardRead(fr *ioswitchlrc.FromNode, userSpace clitypes.UserSpaceDetail, open types.OpenOption) *ShardReadNode { node := &ShardReadNode{ From: fr, - StorageID: stgID, + UserSpace: userSpace, Open: open, } b.AddNode(node) @@ -147,7 +149,7 @@ func (t *ShardReadNode) Output() dag.StreamOutputSlot { func (t *ShardReadNode) GenerateOp() (exec.Op, error) { return &ShardRead{ Output: t.OutputStreams().Get(0).VarID, - StorageID: t.StorageID, + UserSpace: t.UserSpace, Open: t.Open, }, nil } @@ -158,15 +160,15 @@ func (t *ShardReadNode) GenerateOp() (exec.Op, error) { type ShardWriteNode struct { dag.NodeBase - To ioswitchlrc.To - StorageID cdssdk.StorageID + To *ioswitchlrc.ToNode + UserSpace clitypes.UserSpaceDetail FileHashStoreKey string } -func (b *GraphNodeBuilder) NewShardWrite(to ioswitchlrc.To, stgID cdssdk.StorageID, fileHashStoreKey string) *ShardWriteNode { +func (b *GraphNodeBuilder) NewShardWrite(to *ioswitchlrc.ToNode, userSpace clitypes.UserSpaceDetail, fileHashStoreKey string) *ShardWriteNode { node := &ShardWriteNode{ To: to, - StorageID: stgID, + UserSpace: userSpace, FileHashStoreKey: fileHashStoreKey, } b.AddNode(node) @@ -184,8 +186,8 @@ func (t *ShardWriteNode) SetInput(input *dag.StreamVar) { input.To(t, 0) } -func (t *ShardWriteNode) Input() dag.StreamOutputSlot { - return dag.StreamOutputSlot{ +func (t *ShardWriteNode) Input() dag.StreamInputSlot { + return dag.StreamInputSlot{ Node: t, Index: 0, } @@ -197,9 +199,9 @@ func (t *ShardWriteNode) FileHashVar() *dag.ValueVar { func (t *ShardWriteNode) GenerateOp() (exec.Op, error) { return &ShardWrite{ - Input: t.InputStreams().Get(0).VarID, - FileHash: t.OutputValues().Get(0).VarID, - StorageID: t.StorageID, + Input: t.InputStreams().Get(0).VarID, + FileHashVar: t.OutputValues().Get(0).VarID, + UserSpace: t.UserSpace, }, nil } diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index 48d7344..c9b194a 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -5,11 +5,11 @@ import ( "math" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc/ops2" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) // 计算输入流的打开范围。会把流的范围按条带大小取整 @@ -63,7 +63,7 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, err switch f := f.(type) { case *ioswitchlrc.FromNode: - t := ctx.DAG.NewShardRead(f, f.Storage.StorageID, types.NewOpen(f.FileHash)) + t := ctx.DAG.NewShardRead(f, f.Space, types.NewOpen(f.FileHash)) if f.DataIndex == -1 { t.Open.WithNullableLength(repRange.Offset, repRange.Length) @@ -72,7 +72,7 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, err } // TODO2 支持HTTP协议 - t.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Hub: f.Hub, Address: *f.Hub.Address.(*cdssdk.GRPCAddressInfo)}) + t.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Hub: f.Hub, Address: *f.Hub.Address.(*cortypes.GRPCAddressInfo)}) t.Env().Pinned = true return t, nil @@ -100,12 +100,12 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, err func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (ops2.ToNode, error) { switch t := t.(type) { case *ioswitchlrc.ToNode: - n := ctx.DAG.NewShardWrite(t, t.Storage.StorageID, t.FileHashStoreKey) + n := ctx.DAG.NewShardWrite(t, t.Space, t.FileHashStoreKey) switch addr := t.Hub.Address.(type) { // case *cdssdk.HttpAddressInfo: // n.Env().ToEnvWorker(&ioswitchlrc.HttpHubWorker{Node: t.Hub}) // TODO2 支持HTTP协议 - case *cdssdk.GRPCAddressInfo: + case *cortypes.GRPCAddressInfo: n.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Hub: t.Hub, Address: *addr}) default: diff --git a/common/pkgs/storage/pool/pool.go b/common/pkgs/storage/pool/pool.go new file mode 100644 index 0000000..2402c7a --- /dev/null +++ b/common/pkgs/storage/pool/pool.go @@ -0,0 +1,92 @@ +package pool + +import ( + "sync" + + "gitlink.org.cn/cloudream/common/pkgs/async" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" +) + +type userSpace struct { + detail *clitypes.UserSpaceDetail + store types.ShardStore +} + +func (u *userSpace) Drop() { + if u.store != nil { + u.store.Stop() + u.store = nil + } +} + +type userSpaceKey struct { + UserID cortypes.UserID + UserSpaceID clitypes.UserSpaceID +} + +type Pool struct { + spaces map[userSpaceKey]*userSpace + lock sync.Mutex + eventChan *types.StorageEventChan +} + +func NewPool() *Pool { + return &Pool{ + spaces: make(map[userSpaceKey]*userSpace), + eventChan: async.NewUnboundChannel[types.StorageEvent](), + } +} + +func (p *Pool) GetShardStore(spaceDetail *clitypes.UserSpaceDetail) (types.ShardStore, error) { + p.lock.Lock() + defer p.lock.Unlock() + + key := userSpaceKey{ + UserID: spaceDetail.UserID, + UserSpaceID: spaceDetail.UserSpace.UserSpaceID, + } + + space := p.spaces[key] + if space == nil { + space = &userSpace{ + detail: spaceDetail, + } + p.spaces[key] = space + } + + if space.detail.UserSpace.Revision != spaceDetail.UserSpace.Revision { + space.Drop() + space.detail = spaceDetail + } + + if space.store == nil { + bld := factory.GetBuilder(spaceDetail) + store, err := bld.CreateShardStore() + if err != nil { + return nil, err + } + space.store = store + store.Start(p.eventChan) + } + + return space.store, nil +} + +func (p *Pool) GetPublicStore(spaceDetail *clitypes.UserSpaceDetail) (types.PublicStore, error) { + return factory.GetBuilder(spaceDetail).CreatePublicStore() +} + +func (p *Pool) GetMultiparter(spaceDetail *clitypes.UserSpaceDetail) (types.Multiparter, error) { + return factory.GetBuilder(spaceDetail).CreateMultiparter() +} + +func (p *Pool) GetS2STransfer(spaceDetail *clitypes.UserSpaceDetail) (types.S2STransfer, error) { + return factory.GetBuilder(spaceDetail).CreateS2STransfer() +} + +func (p *Pool) GetECMultiplier(spaceDetail *clitypes.UserSpaceDetail) (types.ECMultiplier, error) { + return factory.GetBuilder(spaceDetail).CreateECMultiplier() +}