From 562ebbe68e2831ea813090ddfe0604be4a2311f2 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 24 Jan 2025 09:16:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=9C=A8dage=E4=B8=AD?= =?UTF-8?q?=E7=94=9F=E6=88=90ECMultiplierOp=E7=9A=84=E7=AE=97=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/cmdline/test.go | 20 +-- common/pkgs/ioswitch2/ops2/bypass.go | 100 +++++++++++-- common/pkgs/ioswitch2/ops2/ec.go | 139 ++++++++++++++++++ common/pkgs/ioswitch2/ops2/multipart.go | 4 +- common/pkgs/ioswitch2/ops2/s2s.go | 8 +- common/pkgs/ioswitch2/ops2/shard_store.go | 8 +- common/pkgs/ioswitch2/parser/opt/ec.go | 103 +++++++++++++ common/pkgs/ioswitch2/parser/opt/s2s.go | 3 +- common/pkgs/ioswitch2/parser/parser.go | 1 + common/pkgs/storage/all.go | 9 ++ common/pkgs/storage/efile/ec_multiplier.go | 62 ++++++-- common/pkgs/storage/efile/efile.go | 9 +- common/pkgs/storage/factory/factory.go | 5 +- common/pkgs/storage/factory/reg/reg.go | 14 ++ common/pkgs/storage/local/multipart_upload.go | 14 +- common/pkgs/storage/local/public_store.go | 4 - common/pkgs/storage/local/shard_store.go | 16 +- common/pkgs/storage/mashup/mashup.go | 13 +- common/pkgs/storage/{s3 => obs}/agent.go | 2 +- common/pkgs/storage/{s3/s3.go => obs/obs.go} | 82 ++++++----- common/pkgs/storage/{s3 => }/obs/obs_test.go | 0 common/pkgs/storage/{s3 => }/obs/s2s.go | 6 +- common/pkgs/storage/obs/shard_store.go | 64 ++++++++ common/pkgs/storage/s3/client.go | 17 --- common/pkgs/storage/s3/multipart_upload.go | 45 +++--- common/pkgs/storage/s3/obs/client.go | 27 ---- common/pkgs/storage/s3/s3_test.go | 96 ------------ common/pkgs/storage/s3/shard_store.go | 112 +++++++------- common/pkgs/storage/s3/{utils => }/utils.go | 2 +- common/pkgs/storage/types/bypass.go | 22 +-- common/pkgs/storage/types/ec_multiplier.go | 2 +- common/pkgs/storage/types/s3_client.go | 2 +- go.mod | 3 +- go.sum | 2 + 34 files changed, 665 insertions(+), 351 deletions(-) create mode 100644 common/pkgs/storage/all.go rename common/pkgs/storage/{s3 => obs}/agent.go (98%) rename common/pkgs/storage/{s3/s3.go => obs/obs.go} (56%) rename common/pkgs/storage/{s3 => }/obs/obs_test.go (100%) rename common/pkgs/storage/{s3 => }/obs/s2s.go (95%) create mode 100644 common/pkgs/storage/obs/shard_store.go delete mode 100644 common/pkgs/storage/s3/client.go delete mode 100644 common/pkgs/storage/s3/obs/client.go delete mode 100644 common/pkgs/storage/s3/s3_test.go rename common/pkgs/storage/s3/{utils => }/utils.go (98%) diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index 5f7edb4..c852432 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -27,23 +27,25 @@ func init() { } defer stgglb.CoordinatorMQPool.Release(coorCli) - stgs, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{1, 2, 3, 4})) + stgs, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{1, 2, 3, 4, 5})) if err != nil { panic(err) } ft := ioswitch2.NewFromTo() - ft.SegmentParam = cdssdk.NewSegmentRedundancy(1024*100*3, 3) - // ft.AddFrom(ioswitch2.NewFromShardstore("FullE58B075E9F7C5744CB1C2CBBECC30F163DE699DCDA94641DDA34A0C2EB01E240", *stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(0))) - // ft.AddFrom(ioswitch2.NewFromShardstore("FullEA14D17544786427C3A766F0C5E6DEB221D00D3DE1875BBE3BD0AD5C8118C1A0", *stgs.Storages[1].MasterHub, *stgs.Storages[1], 
ioswitch2.SegmentStream(1))) + ft.ECParam = cdssdk.NewECRedundancy(3, 6, 1024*1024*5) + // ft.SegmentParam = cdssdk.NewSegmentRedundancy(1024*100*3, 3) + ft.AddFrom(ioswitch2.NewFromShardstore("FullC036CBB7553A909F8B8877D4461924307F27ECB66CFF928EEEAFD569C3887E29", *stgs.Storages[3].MasterHub, *stgs.Storages[3], ioswitch2.ECStream(2))) + ft.AddFrom(ioswitch2.NewFromShardstore("Full543F38D9F524238AC0239263AA0DD4B4328763818EA98A7A5F72E59748FDA27A", *stgs.Storages[3].MasterHub, *stgs.Storages[3], ioswitch2.ECStream(3))) + ft.AddFrom(ioswitch2.NewFromShardstore("Full50B464DB2FDDC29D0380D9FFAB6D944FAF5C7624955D757939280590F01F3ECD", *stgs.Storages[3].MasterHub, *stgs.Storages[3], ioswitch2.ECStream(4))) // ft.AddFrom(ioswitch2.NewFromShardstore("Full4D142C458F2399175232D5636235B09A84664D60869E925EB20FFBE931045BDD", *stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(2))) - ft.AddFrom(ioswitch2.NewFromShardstore("Full03B5CF4B57251D7BB4308FE5C81AF5A21E2B28994CC7CB1FB37698DAE271DC22", *stgs.Storages[2].MasterHub, *stgs.Storages[2], ioswitch2.RawStream())) - ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[3].MasterHub, *stgs.Storages[3], ioswitch2.RawStream(), "0")) + // ft.AddFrom(ioswitch2.NewFromShardstore("Full03B5CF4B57251D7BB4308FE5C81AF5A21E2B28994CC7CB1FB37698DAE271DC22", *stgs.Storages[2].MasterHub, *stgs.Storages[2], ioswitch2.RawStream())) + // ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[3].MasterHub, *stgs.Storages[3], ioswitch2.RawStream(), "0")) // ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(0), "0")) // ft.AddTo(ioswitch2.NewToShardStoreWithRange(*stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(1), "1", math2.Range{Offset: 1})) - // ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(0), "0")) - // ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(1), "1")) - // ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(2), "2")) + ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[4].MasterHub, *stgs.Storages[4], ioswitch2.ECStream(0), "0")) + ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[4].MasterHub, *stgs.Storages[4], ioswitch2.ECStream(1), "1")) + ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[4].MasterHub, *stgs.Storages[4], ioswitch2.ECStream(5), "2")) plans := exec.NewPlanBuilder() err = parser.Parse(ft, plans) diff --git a/common/pkgs/ioswitch2/ops2/bypass.go b/common/pkgs/ioswitch2/ops2/bypass.go index e2cb10e..8af1be7 100644 --- a/common/pkgs/ioswitch2/ops2/bypass.go +++ b/common/pkgs/ioswitch2/ops2/bypass.go @@ -12,20 +12,23 @@ import ( func init() { exec.UseOp[*BypassToShardStore]() - exec.UseVarValue[*BypassFileInfoValue]() + exec.UseVarValue[*BypassUploadedFileValue]() exec.UseVarValue[*BypassHandleResultValue]() exec.UseOp[*BypassFromShardStore]() exec.UseVarValue[*BypassFilePathValue]() + + exec.UseOp[*BypassFromShardStoreHTTP]() + exec.UseVarValue[*HTTPRequestValue]() } -type BypassFileInfoValue struct { - types.BypassFileInfo +type BypassUploadedFileValue struct { + types.BypassUploadedFile } -func (v *BypassFileInfoValue) Clone() exec.VarValue { - return &BypassFileInfoValue{ - BypassFileInfo: v.BypassFileInfo, +func (v *BypassUploadedFileValue) Clone() exec.VarValue { + return &BypassUploadedFileValue{ + BypassUploadedFile: v.BypassUploadedFile, } } @@ -62,18 +65,18 @@ func (o 
*BypassToShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) er return fmt.Errorf("shard store %v not support bypass write", o.StorageID) } - fileInfo, err := exec.BindVar[*BypassFileInfoValue](e, ctx.Context, o.BypassFileInfo) + fileInfo, err := exec.BindVar[*BypassUploadedFileValue](e, ctx.Context, o.BypassFileInfo) if err != nil { return err } - err = br.BypassUploaded(fileInfo.BypassFileInfo) + err = br.BypassUploaded(fileInfo.BypassUploadedFile) if err != nil { return err } e.PutVar(o.BypassCallback, &BypassHandleResultValue{Commited: true}) - e.PutVar(o.FileHash, &FileHashValue{Hash: fileInfo.FileHash}) + e.PutVar(o.FileHash, &FileHashValue{Hash: fileInfo.Hash}) return nil } @@ -126,6 +129,52 @@ func (o *BypassFromShardStore) String() string { return fmt.Sprintf("BypassFromShardStore[StorageID:%v] FileHash: %v, Output: %v", o.StorageID, o.FileHash, o.Output) } +// 旁路Http读取 +type BypassFromShardStoreHTTP struct { + StorageID cdssdk.StorageID + FileHash cdssdk.FileHash + Output exec.VarID +} + +type HTTPRequestValue struct { + types.HTTPRequest +} + +func (v *HTTPRequestValue) Clone() exec.VarValue { + return &HTTPRequestValue{ + HTTPRequest: v.HTTPRequest, + } +} + +func (o *BypassFromShardStoreHTTP) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) + if err != nil { + return err + } + + shardStore, err := stgAgts.GetShardStore(o.StorageID) + if err != nil { + return err + } + + br, ok := shardStore.(types.HTTPBypassRead) + if !ok { + return fmt.Errorf("shard store %v not support bypass read", o.StorageID) + } + + req, err := br.HTTPBypassRead(o.FileHash) + if err != nil { + return err + } + + e.PutVar(o.Output, &HTTPRequestValue{HTTPRequest: req}) + return nil +} + +func (o *BypassFromShardStoreHTTP) String() string { + return fmt.Sprintf("BypassFromShardStoreHTTP[StorageID:%v] FileHash: %v, Output: %v", o.StorageID, o.FileHash, o.Output) +} + // 旁路写入 type BypassToShardStoreNode struct { dag.NodeBase @@ -207,3 +256,36 @@ func (n *BypassFromShardStoreNode) GenerateOp() (exec.Op, error) { Output: n.FilePathVar().Var().VarID, }, nil } + +// 旁路Http读取 +type BypassFromShardStoreHTTPNode struct { + dag.NodeBase + StorageID cdssdk.StorageID + FileHash cdssdk.FileHash +} + +func (b *GraphNodeBuilder) NewBypassFromShardStoreHTTP(storageID cdssdk.StorageID, fileHash cdssdk.FileHash) *BypassFromShardStoreHTTPNode { + node := &BypassFromShardStoreHTTPNode{ + StorageID: storageID, + FileHash: fileHash, + } + b.AddNode(node) + + node.OutputValues().Init(node, 1) + return node +} + +func (n *BypassFromShardStoreHTTPNode) HTTPRequestVar() dag.ValueOutputSlot { + return dag.ValueOutputSlot{ + Node: n, + Index: 0, + } +} + +func (n *BypassFromShardStoreHTTPNode) GenerateOp() (exec.Op, error) { + return &BypassFromShardStoreHTTP{ + StorageID: n.StorageID, + FileHash: n.FileHash, + Output: n.HTTPRequestVar().Var().VarID, + }, nil +} diff --git a/common/pkgs/ioswitch2/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go index 5f3a1c4..6c4e43c 100644 --- a/common/pkgs/ioswitch2/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -12,11 +12,16 @@ import ( "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/sync2" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) func init() { 
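A note on the new bypass flow: BypassFromShardStoreHTTP (bypass.go above) only produces a types.HTTPRequest descriptor; the party that consumes it, such as the efile createECTask service later in this patch, is the one that actually issues the request. A rough, self-contained sketch of what executing such a descriptor involves; the doBypassRead helper and the example URL are hypothetical, only the struct fields come from types/bypass.go:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// Mirror of types.HTTPRequest from common/pkgs/storage/types/bypass.go.
type HTTPRequest struct {
	URL    string            `json:"url"`
	Method string            `json:"method"`
	Header map[string]string `json:"header"`
	Body   string            `json:"body"`
}

// doBypassRead (hypothetical) turns the descriptor into a live request
// and returns the file content.
func doBypassRead(req HTTPRequest) ([]byte, error) {
	httpReq, err := http.NewRequest(req.Method, req.URL, strings.NewReader(req.Body))
	if err != nil {
		return nil, err
	}
	for k, v := range req.Header {
		httpReq.Header.Set(k, v)
	}
	resp, err := http.DefaultClient.Do(httpReq)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("status code: %d", resp.StatusCode)
	}
	return io.ReadAll(resp.Body)
}

func main() {
	data, err := doBypassRead(HTTPRequest{URL: "https://example-bucket/signed-url", Method: "GET"})
	fmt.Println(len(data), err)
}
```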
exec.UseOp[*ECMultiply]() + + exec.UseOp[*CallECMultiplier]() } type ECMultiply struct { @@ -153,6 +158,74 @@ func (o *ECMultiply) String() string { ) } +type CallECMultiplier struct { + Storage stgmod.StorageDetail + Coef [][]byte + Inputs []exec.VarID + Outputs []exec.VarID + BypassCallbacks []exec.VarID + ChunkSize int +} + +func (o *CallECMultiplier) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + ecMul, err := factory.GetBuilder(o.Storage).CreateECMultiplier() + if err != nil { + return err + } + + inputs, err := exec.BindArray[*HTTPRequestValue](e, ctx.Context, o.Inputs) + if err != nil { + return err + } + + reqs := make([]types.HTTPRequest, 0, len(inputs)) + for _, input := range inputs { + reqs = append(reqs, input.HTTPRequest) + } + + outputs, err := ecMul.Multiply(o.Coef, reqs, o.ChunkSize) + if err != nil { + return err + } + defer ecMul.Abort() + + outputVals := make([]*BypassUploadedFileValue, 0, len(outputs)) + for _, output := range outputs { + outputVals = append(outputVals, &BypassUploadedFileValue{ + BypassUploadedFile: output, + }) + } + exec.PutArray(e, o.Outputs, outputVals) + + callbacks, err := exec.BindArray[*BypassHandleResultValue](e, ctx.Context, o.BypassCallbacks) + if err != nil { + return err + } + + allSuc := true + for _, callback := range callbacks { + if !callback.Commited { + allSuc = false + } + } + + if allSuc { + ecMul.Complete() + } + + return nil +} + +func (o *CallECMultiplier) String() string { + return fmt.Sprintf( + "CallECMultiplier(storage=%v, coef=%v) (%v) -> (%v)", + o.Coef, + o.Storage.Storage.String(), + utils.FormatVarIDs(o.Inputs), + utils.FormatVarIDs(o.Outputs), + ) +} + type ECMultiplyNode struct { dag.NodeBase EC cdssdk.ECRedundancy @@ -206,3 +279,69 @@ func (t *ECMultiplyNode) GenerateOp() (exec.Op, error) { // func (t *MultiplyType) String() string { // return fmt.Sprintf("Multiply[]%v%v", formatStreamIO(node), formatValueIO(node)) // } + +type CallECMultiplierNode struct { + dag.NodeBase + Storage stgmod.StorageDetail + EC cdssdk.ECRedundancy + InputIndexes []int + OutputIndexes []int +} + +func (b *GraphNodeBuilder) NewCallECMultiplier(storage stgmod.StorageDetail) *CallECMultiplierNode { + node := &CallECMultiplierNode{ + Storage: storage, + } + b.AddNode(node) + return node +} + +func (t *CallECMultiplierNode) InitFrom(node *ECMultiplyNode) { + t.EC = node.EC + t.InputIndexes = node.InputIndexes + t.OutputIndexes = node.OutputIndexes + + t.InputValues().Init(len(t.InputIndexes) + len(t.OutputIndexes)) // 流的输出+回调的输入 + t.OutputValues().Init(t, len(t.OutputIndexes)) +} + +func (t *CallECMultiplierNode) InputSlot(idx int) dag.ValueInputSlot { + return dag.ValueInputSlot{ + Node: t, + Index: idx, + } +} + +func (t *CallECMultiplierNode) OutputVar(idx int) dag.ValueOutputSlot { + return dag.ValueOutputSlot{ + Node: t, + Index: idx, + } +} + +func (t *CallECMultiplierNode) BypassCallbackSlot(idx int) dag.ValueInputSlot { + return dag.ValueInputSlot{ + Node: t, + Index: idx + len(t.InputIndexes), + } +} + +func (t *CallECMultiplierNode) GenerateOp() (exec.Op, error) { + rs, err := ec.NewRs(t.EC.K, t.EC.N) + if err != nil { + return nil, err + } + coef, err := rs.GenerateMatrix(t.InputIndexes, t.OutputIndexes) + if err != nil { + return nil, err + } + + return &CallECMultiplier{ + Storage: t.Storage, + Coef: coef, + Inputs: t.InputValues().GetVarIDsRanged(0, len(t.InputIndexes)), + Outputs: t.OutputValues().GetVarIDs(), + BypassCallbacks: t.InputValues().GetVarIDsStart(len(t.InputIndexes)), + ChunkSize: t.EC.ChunkSize, + }, 
nil +} diff --git a/common/pkgs/ioswitch2/ops2/multipart.go b/common/pkgs/ioswitch2/ops2/multipart.go index db64d2a..7a5cd3a 100644 --- a/common/pkgs/ioswitch2/ops2/multipart.go +++ b/common/pkgs/ioswitch2/ops2/multipart.go @@ -84,8 +84,8 @@ func (o *MultipartInitiator) Execute(ctx *exec.ExecContext, e *exec.Executor) er } // 告知后续Op临时文件的路径 - e.PutVar(o.BypassFileOutput, &BypassFileInfoValue{ - BypassFileInfo: fileInfo, + e.PutVar(o.BypassFileOutput, &BypassUploadedFileValue{ + BypassUploadedFile: fileInfo, }) // 等待后续Op处理临时文件 diff --git a/common/pkgs/ioswitch2/ops2/s2s.go b/common/pkgs/ioswitch2/ops2/s2s.go index 4e955e6..3fab454 100644 --- a/common/pkgs/ioswitch2/ops2/s2s.go +++ b/common/pkgs/ioswitch2/ops2/s2s.go @@ -41,10 +41,10 @@ func (o *S2STransfer) Execute(ctx *exec.ExecContext, e *exec.Executor) error { defer s2s.Abort() // 告知后续Op处理临时文件 - e.PutVar(o.Output, &BypassFileInfoValue{BypassFileInfo: types.BypassFileInfo{ - TempFilePath: dstPath, - FileHash: srcPath.Info.Hash, - Size: srcPath.Info.Size, + e.PutVar(o.Output, &BypassUploadedFileValue{BypassUploadedFile: types.BypassUploadedFile{ + Path: dstPath, + Hash: srcPath.Info.Hash, + Size: srcPath.Info.Size, }}) // 等待后续Op处理临时文件 diff --git a/common/pkgs/ioswitch2/ops2/shard_store.go b/common/pkgs/ioswitch2/ops2/shard_store.go index 76dfd86..4fab018 100644 --- a/common/pkgs/ioswitch2/ops2/shard_store.go +++ b/common/pkgs/ioswitch2/ops2/shard_store.go @@ -117,12 +117,12 @@ func (o *ShardWrite) String() string { type ShardReadNode struct { dag.NodeBase - From ioswitch2.From + From *ioswitch2.FromShardstore StorageID cdssdk.StorageID Open types.OpenOption } -func (b *GraphNodeBuilder) NewShardRead(fr ioswitch2.From, stgID cdssdk.StorageID, open types.OpenOption) *ShardReadNode { +func (b *GraphNodeBuilder) NewShardRead(fr *ioswitch2.FromShardstore, stgID cdssdk.StorageID, open types.OpenOption) *ShardReadNode { node := &ShardReadNode{ From: fr, StorageID: stgID, @@ -159,12 +159,12 @@ func (t *ShardReadNode) GenerateOp() (exec.Op, error) { type ShardWriteNode struct { dag.NodeBase - To ioswitch2.To + To *ioswitch2.ToShardStore Storage stgmod.StorageDetail FileHashStoreKey string } -func (b *GraphNodeBuilder) NewShardWrite(to ioswitch2.To, stg stgmod.StorageDetail, fileHashStoreKey string) *ShardWriteNode { +func (b *GraphNodeBuilder) NewShardWrite(to *ioswitch2.ToShardStore, stg stgmod.StorageDetail, fileHashStoreKey string) *ShardWriteNode { node := &ShardWriteNode{ To: to, Storage: stg, diff --git a/common/pkgs/ioswitch2/parser/opt/ec.go b/common/pkgs/ioswitch2/parser/opt/ec.go index 6e2c5b1..d477b81 100644 --- a/common/pkgs/ioswitch2/parser/opt/ec.go +++ b/common/pkgs/ioswitch2/parser/opt/ec.go @@ -2,9 +2,12 @@ package opt import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser/state" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory" ) // 减少未使用的Multiply指令的输出流。如果减少到0,则删除该指令 @@ -36,3 +39,103 @@ func RemoveUnusedMultiplyOutput(ctx *state.GenerateState) bool { }) return changed } + +// 替换ECMultiply指令 +func UseECMultiplier(ctx *state.GenerateState) { + dag.WalkOnlyType[*ops2.ECMultiplyNode](ctx.DAG.Graph, func(mulNode *ops2.ECMultiplyNode) bool { + if mulNode.OutputStreams().Len() == 0 { + return true + } + + // 暂不支持编解码流的一部分 + if 
ctx.StreamRange.Offset != 0 || ctx.StreamRange.Length != nil { + return true + } + + var to *ioswitch2.ToShardStore + var swNodes []*ops2.ShardWriteNode + // 所有的输出流必须有且只有一个相同的目的地 + // 暂时只支持分片存储 + for i := 0; i < mulNode.OutputStreams().Len(); i++ { + if mulNode.OutputStreams().Get(i).Dst.Len() != 1 { + return true + } + + dstNode := mulNode.OutputStreams().Get(i).Dst.Get(0) + swNode, ok := dstNode.(*ops2.ShardWriteNode) + if !ok { + return true + } + + if to == nil { + to = swNode.To + } else if to.Storage.Storage.StorageID != swNode.Storage.Storage.StorageID { + return true + } + swNodes = append(swNodes, swNode) + } + _, err := factory.GetBuilder(to.Storage).CreateECMultiplier() + if err != nil { + return true + } + + // 每一个输入流都必须直接来自于存储服务,而且要支持通过HTTP读取文件 + var srNodes []*ops2.ShardReadNode + for i := 0; i < mulNode.InputStreams().Len(); i++ { + inNode := mulNode.InputStreams().Get(i).Src + srNode, ok := inNode.(*ops2.ShardReadNode) + if !ok { + return true + } + + if !factory.GetBuilder(srNode.From.Storage).ShardStoreDesc().HasBypassHTTPRead() { + return true + } + + srNodes = append(srNodes, srNode) + } + + // 检查满足条件后,替换ECMultiply指令 + callMul := ctx.DAG.NewCallECMultiplier(to.Storage) + switch addr := to.Hub.Address.(type) { + case *cdssdk.HttpAddressInfo: + callMul.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: to.Hub}) + callMul.Env().Pinned = true + + case *cdssdk.GRPCAddressInfo: + callMul.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: to.Hub, Address: *addr}) + callMul.Env().Pinned = true + + default: + return true + } + + callMul.InitFrom(mulNode) + for i, srNode := range srNodes { + srNode.Output().Var().NotTo(mulNode) + // 只有完全没有输出的ShardReadNode才可以被删除 + if srNode.Output().Var().Dst.Len() == 0 { + ctx.DAG.RemoveNode(srNode) + delete(ctx.FromNodes, srNode.From) + } + + hbr := ctx.DAG.NewBypassFromShardStoreHTTP(srNode.StorageID, srNode.From.FileHash) + hbr.Env().CopyFrom(srNode.Env()) + hbr.HTTPRequestVar().ToSlot(callMul.InputSlot(i)) + } + + for i, swNode := range swNodes { + ctx.DAG.RemoveNode(swNode) + delete(ctx.ToNodes, swNode.To) + + bs := ctx.DAG.NewBypassToShardStore(to.Storage.Storage.StorageID, swNode.FileHashStoreKey) + bs.Env().CopyFrom(swNode.Env()) + + callMul.OutputVar(i).ToSlot(bs.BypassFileInfoSlot()) + bs.BypassCallbackVar().ToSlot(callMul.BypassCallbackSlot(i)) + } + + ctx.DAG.RemoveNode(mulNode) + return true + }) +} diff --git a/common/pkgs/ioswitch2/parser/opt/s2s.go b/common/pkgs/ioswitch2/parser/opt/s2s.go index 4eb7fc4..d0bdbdd 100644 --- a/common/pkgs/ioswitch2/parser/opt/s2s.go +++ b/common/pkgs/ioswitch2/parser/opt/s2s.go @@ -87,8 +87,7 @@ func UseS2STransfer(ctx *state.GenerateState) { brNode.FilePathVar().ToSlot(s2sNode.SrcPathSlot()) // 传输结果通知目的节点 - to := toShard.To.(*ioswitch2.ToShardStore) - bwNode := ctx.DAG.NewBypassToShardStore(toShard.Storage.Storage.StorageID, to.FileHashStoreKey) + bwNode := ctx.DAG.NewBypassToShardStore(toShard.Storage.Storage.StorageID, toShard.To.FileHashStoreKey) bwNode.Env().CopyFrom(toShard.Env()) s2sNode.BypassFileInfoVar().ToSlot(bwNode.BypassFileInfoSlot()) diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index b7db314..6796375 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -77,6 +77,7 @@ func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error { // 下面这些只需要执行一次,但需要按顺序 opt.RemoveUnusedFromNode(state) + opt.UseECMultiplier(state) opt.UseS2STransfer(state) opt.UseMultipartUploadToShardStore(state) 
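UseECMultiplier sits in this run-once, ordered group ahead of UseS2STransfer and UseMultipartUploadToShardStore so it can claim ShardRead/ShardWrite nodes before the other rewrites do. All three passes share a check-everything-then-swap shape. A toy, self-contained sketch of that shape, assuming simplified stand-in types (this is not the real dag package API):

```go
package main

import "fmt"

type Node interface{ Name() string }

type ShardReadNode struct{ HTTPBypass bool }

func (*ShardReadNode) Name() string { return "ShardRead" }

type ECMultiplyNode struct{ Inputs []Node }

func (*ECMultiplyNode) Name() string { return "ECMultiply" }

type CallECMultiplierNode struct{}

func (*CallECMultiplierNode) Name() string { return "CallECMultiplier" }

// tryRewrite mirrors the precondition style of UseECMultiplier: every
// input must come straight from a shard store that supports HTTP bypass
// reads; if any check fails, the original node is left untouched.
func tryRewrite(mul *ECMultiplyNode) Node {
	for _, in := range mul.Inputs {
		sr, ok := in.(*ShardReadNode)
		if !ok || !sr.HTTPBypass {
			return nil
		}
	}
	return &CallECMultiplierNode{}
}

func main() {
	mul := &ECMultiplyNode{Inputs: []Node{&ShardReadNode{HTTPBypass: true}}}
	if repl := tryRewrite(mul); repl != nil {
		fmt.Println("replaced with", repl.Name())
	}
}
```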
opt.DropUnused(state) diff --git a/common/pkgs/storage/all.go b/common/pkgs/storage/all.go new file mode 100644 index 0000000..8220709 --- /dev/null +++ b/common/pkgs/storage/all.go @@ -0,0 +1,9 @@ +package storage + +import ( + // !!! 需要导入所有存储服务的包 !!! + _ "gitlink.org.cn/cloudream/storage/common/pkgs/storage/efile" + _ "gitlink.org.cn/cloudream/storage/common/pkgs/storage/local" + _ "gitlink.org.cn/cloudream/storage/common/pkgs/storage/mashup" + _ "gitlink.org.cn/cloudream/storage/common/pkgs/storage/obs" +) diff --git a/common/pkgs/storage/efile/ec_multiplier.go b/common/pkgs/storage/efile/ec_multiplier.go index 739c337..1ccc052 100644 --- a/common/pkgs/storage/efile/ec_multiplier.go +++ b/common/pkgs/storage/efile/ec_multiplier.go @@ -2,7 +2,9 @@ package efile import ( "fmt" + "net/http" "net/url" + "path" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/http2" @@ -12,7 +14,7 @@ import ( ) type ECMultiplier struct { - token string + blder *builder url string feat *cdssdk.ECMultiplierFeature outputs []string @@ -21,41 +23,61 @@ type ECMultiplier struct { // 进行EC运算,coef * inputs。coef为编码矩阵,inputs为待编码数据,chunkSize为分块大小。 // 输出为每一个块文件的路径,数组长度 = len(coef) -func (m *ECMultiplier) Multiply(coef [][]byte, inputs []types.HTTPReqeust, chunkSize int64) ([]string, error) { +func (m *ECMultiplier) Multiply(coef [][]byte, inputs []types.HTTPRequest, chunkSize int) ([]types.BypassUploadedFile, error) { type Request struct { - Inputs []types.HTTPReqeust `json:"inputs"` + Inputs []types.HTTPRequest `json:"inputs"` Outputs []string `json:"outputs"` - Coefs [][]byte `json:"coefs"` - ChunkSize int64 `json:"chunkSize"` + Coefs [][]int `json:"coefs"` // 用int防止被base64编码 + ChunkSize int `json:"chunkSize"` } type Response struct { Code string `json:"code"` Msg string `json:"msg"` + Data []struct { + Size int64 `json:"size"` + Sha256 string `json:"sha256"` + } + } + + intCoefs := make([][]int, len(coef)) + for i := range intCoefs { + intCoefs[i] = make([]int, len(coef[i])) + for j := range intCoefs[i] { + intCoefs[i][j] = int(coef[i][j]) + } } fileName := os2.GenerateRandomFileName(10) m.outputs = make([]string, len(coef)) for i := range m.outputs { - m.outputs[i] = fmt.Sprintf("%s_%d", fileName, i) + m.outputs[i] = path.Join(m.feat.TempDir, fmt.Sprintf("%s_%d", fileName, i)) } - u, err := url.JoinPath(m.url, "efile/openapi/v2/createECTask") + u, err := url.JoinPath(m.url, "efile/openapi/v2/file/createECTask") if err != nil { return nil, err } + token, err := m.blder.getToken() + if err != nil { + return nil, fmt.Errorf("get token: %w", err) + } + resp, err := http2.PostJSON(u, http2.RequestParam{ - Header: map[string]string{"token": m.token}, + Header: map[string]string{"token": token}, Body: Request{ Inputs: inputs, Outputs: m.outputs, - Coefs: coef, + Coefs: intCoefs, ChunkSize: chunkSize, }, }) if err != nil { return nil, err } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("status code: %d", resp.StatusCode) + } var r Response err = serder.JSONToObjectStream(resp.Body, &r) @@ -67,7 +89,20 @@ func (m *ECMultiplier) Multiply(coef [][]byte, inputs []types.HTTPReqeust, chunk return nil, fmt.Errorf("code: %s, msg: %s", r.Code, r.Msg) } - return m.outputs, nil + if len(r.Data) != len(m.outputs) { + return nil, fmt.Errorf("data length not match outputs length") + } + + ret := make([]types.BypassUploadedFile, len(r.Data)) + for i, data := range r.Data { + ret[i] = types.BypassUploadedFile{ + Path: m.outputs[i], + Size: data.Size, + Hash: 
cdssdk.NewFullHashFromString(data.Sha256), + } + } + + return ret, nil } // 完成计算 @@ -83,9 +118,14 @@ func (m *ECMultiplier) Abort() { return } + token, err := m.blder.getToken() + if err != nil { + return + } + for _, output := range m.outputs { http2.PostJSON(u, http2.RequestParam{ - Header: map[string]string{"token": m.token}, + Header: map[string]string{"token": token}, Query: map[string]string{"paths": output}, }) } diff --git a/common/pkgs/storage/efile/efile.go b/common/pkgs/storage/efile/efile.go index af869d6..851afcd 100644 --- a/common/pkgs/storage/efile/efile.go +++ b/common/pkgs/storage/efile/efile.go @@ -87,7 +87,7 @@ func (b *builder) getToken() (string, error) { } } - return "", fmt.Errorf("clusterID:%s not found", stgType.ClusterID) + return "", fmt.Errorf("clusterID %s not found", stgType.ClusterID) } func (b *builder) CreateECMultiplier() (types.ECMultiplier, error) { @@ -96,13 +96,8 @@ func (b *builder) CreateECMultiplier() (types.ECMultiplier, error) { return nil, fmt.Errorf("feature ECMultiplier not found") } - token, err := b.getToken() - if err != nil { - return nil, fmt.Errorf("get token: %v", err) - } - return &ECMultiplier{ - token: token, + blder: b, url: b.detail.Storage.Type.(*cdssdk.EFileType).APIURL, feat: feat, }, nil diff --git a/common/pkgs/storage/factory/factory.go b/common/pkgs/storage/factory/factory.go index ae46ceb..5b75890 100644 --- a/common/pkgs/storage/factory/factory.go +++ b/common/pkgs/storage/factory/factory.go @@ -4,12 +4,9 @@ import ( "reflect" stgmod "gitlink.org.cn/cloudream/storage/common/models" + _ "gitlink.org.cn/cloudream/storage/common/pkgs/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" - - // !!! 需要导入所有存储服务的包 !!! 
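The blank imports move from factory.go into the new storage/all.go, and mashup.go switches to reg.GetBuilderInternal, which sidesteps the import cycle that calling factory.GetBuilder from inside a driver would now create. The underlying pattern: each driver registers a builder constructor keyed by the reflect.Type of its storage-type value from init(), so one blank import of the storage package enables every backend. A simplified, self-contained model (stand-in types, stdlib reflect in place of reflect2):

```go
package main

import (
	"fmt"
	"reflect"
)

type StorageType interface{ isStorageType() }

type OBSType struct{}

func (*OBSType) isStorageType() {}

type Builder interface{ Describe() string }

type obsBuilder struct{}

func (obsBuilder) Describe() string { return "obs builder" }

// emptyBuilder mirrors types.EmptyBuilder: lookups never return nil, and
// unsupported storage types get a builder whose methods report failure.
type emptyBuilder struct{}

func (emptyBuilder) Describe() string { return "empty builder (unsupported)" }

var builders = make(map[reflect.Type]func() Builder)

// register is what each driver package calls from its init().
func register[T StorageType](ctor func() Builder) {
	var zero T
	builders[reflect.TypeOf(zero)] = ctor
}

func getBuilder(t StorageType) Builder {
	if ctor, ok := builders[reflect.TypeOf(t)]; ok {
		return ctor()
	}
	return emptyBuilder{}
}

func init() { register[*OBSType](func() Builder { return obsBuilder{} }) }

func main() {
	fmt.Println(getBuilder(&OBSType{}).Describe())
}
```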
- _ "gitlink.org.cn/cloudream/storage/common/pkgs/storage/local" - _ "gitlink.org.cn/cloudream/storage/common/pkgs/storage/s3" ) // 此函数永远不会返回nil。如果找不到对应的Builder,则会返回EmptyBuilder, diff --git a/common/pkgs/storage/factory/reg/reg.go b/common/pkgs/storage/factory/reg/reg.go index 4161080..ef86de5 100644 --- a/common/pkgs/storage/factory/reg/reg.go +++ b/common/pkgs/storage/factory/reg/reg.go @@ -17,3 +17,17 @@ var StorageBuilders = make(map[reflect.Type]BuilderCtor) func RegisterBuilder[T cdssdk.StorageType](ctor BuilderCtor) { StorageBuilders[reflect2.TypeOf[T]()] = ctor } + +// 注:此函数只给storage包内部使用,外部包请使用外层的factory.GetBuilder +// 此函数永远不会返回nil。如果找不到对应的Builder,则会返回EmptyBuilder, +// 此Builder的所有函数都会返回否定值或者封装后的ErrUnsupported错误(需要使用errors.Is检查) +func GetBuilderInternal(detail stgmod.StorageDetail) types.StorageBuilder { + typ := reflect.TypeOf(detail.Storage.Type) + + ctor, ok := StorageBuilders[typ] + if !ok { + return &types.EmptyBuilder{} + } + + return ctor(detail) +} diff --git a/common/pkgs/storage/local/multipart_upload.go b/common/pkgs/storage/local/multipart_upload.go index b5c4374..78486e7 100644 --- a/common/pkgs/storage/local/multipart_upload.go +++ b/common/pkgs/storage/local/multipart_upload.go @@ -85,14 +85,14 @@ func (i *MultipartTask) InitState() types.MultipartInitState { } } -func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPartInfo) (types.BypassFileInfo, error) { +func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPartInfo) (types.BypassUploadedFile, error) { parts = sort2.Sort(parts, func(l, r types.UploadedPartInfo) int { return l.PartNumber - r.PartNumber }) joined, err := os.Create(i.joinedFilePath) if err != nil { - return types.BypassFileInfo{}, err + return types.BypassUploadedFile{}, err } defer joined.Close() @@ -102,17 +102,17 @@ func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPar for _, part := range parts { partSize, err := i.writePart(part, joined, hasher) if err != nil { - return types.BypassFileInfo{}, err + return types.BypassUploadedFile{}, err } size += partSize } h := hasher.Sum(nil) - return types.BypassFileInfo{ - TempFilePath: joined.Name(), - Size: size, - FileHash: cdssdk.NewFullHash(h), + return types.BypassUploadedFile{ + Path: joined.Name(), + Size: size, + Hash: cdssdk.NewFullHash(h), }, nil } diff --git a/common/pkgs/storage/local/public_store.go b/common/pkgs/storage/local/public_store.go index 55f96b2..344bda6 100644 --- a/common/pkgs/storage/local/public_store.go +++ b/common/pkgs/storage/local/public_store.go @@ -19,10 +19,6 @@ func (d *PublicStoreDesc) Enabled() bool { return d.builder.detail.Storage.PublicStore != nil } -func (d *PublicStoreDesc) HasBypassWrite() bool { - return false -} - type PublicStore struct { agt *agent cfg cdssdk.LocalPublicStorage diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index 28ed46a..da6b00e 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -32,11 +32,11 @@ func (s *ShardStoreDesc) Enabled() bool { } func (s *ShardStoreDesc) HasBypassWrite() bool { - return true + return s.Enabled() } func (s *ShardStoreDesc) HasBypassRead() bool { - return true + return s.Enabled() } type ShardStore struct { @@ -409,27 +409,27 @@ func (s *ShardStore) getFilePathFromHash(hash cdssdk.FileHash) string { var _ types.BypassWrite = (*ShardStore)(nil) -func (s *ShardStore) BypassUploaded(info types.BypassFileInfo) error { +func (s *ShardStore) 
BypassUploaded(info types.BypassUploadedFile) error { s.lock.Lock() defer s.lock.Unlock() log := s.getLogger() - log.Debugf("%v bypass uploaded, size: %v, hash: %v", info.TempFilePath, info.Size, info.FileHash) + log.Debugf("%v bypass uploaded, size: %v, hash: %v", info.Path, info.Size, info.Hash) - blockDir := s.getFileDirFromHash(info.FileHash) + blockDir := s.getFileDirFromHash(info.Hash) err := os.MkdirAll(blockDir, 0755) if err != nil { log.Warnf("make block dir %v: %v", blockDir, err) return fmt.Errorf("making block dir: %w", err) } - newPath := filepath.Join(blockDir, string(info.FileHash)) + newPath := filepath.Join(blockDir, string(info.Hash)) _, err = os.Stat(newPath) if os.IsNotExist(err) { - err = os.Rename(info.TempFilePath, newPath) + err = os.Rename(info.Path, newPath) if err != nil { - log.Warnf("rename %v to %v: %v", info.TempFilePath, newPath, err) + log.Warnf("rename %v to %v: %v", info.Path, newPath, err) return fmt.Errorf("rename file: %w", err) } diff --git a/common/pkgs/storage/mashup/mashup.go b/common/pkgs/storage/mashup/mashup.go index de1e383..8a48464 100644 --- a/common/pkgs/storage/mashup/mashup.go +++ b/common/pkgs/storage/mashup/mashup.go @@ -3,7 +3,6 @@ package mashup import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) @@ -25,7 +24,7 @@ func (b *builder) CreateAgent() (types.StorageAgent, error) { detail := b.detail detail.Storage.Type = stgType.Agent - blder := factory.GetBuilder(detail) + blder := reg.GetBuilderInternal(detail) return blder.CreateAgent() } @@ -34,7 +33,7 @@ func (b *builder) ShardStoreDesc() types.ShardStoreDesc { detail := b.detail detail.Storage.Type = stgType.Agent - blder := factory.GetBuilder(detail) + blder := reg.GetBuilderInternal(detail) return blder.ShardStoreDesc() } @@ -43,7 +42,7 @@ func (b *builder) PublicStoreDesc() types.PublicStoreDesc { detail := b.detail detail.Storage.Type = stgType.Agent - blder := factory.GetBuilder(detail) + blder := reg.GetBuilderInternal(detail) return blder.PublicStoreDesc() } @@ -52,7 +51,7 @@ func (b *builder) CreateMultiparter() (types.Multiparter, error) { detail := b.detail detail.Storage.Type = stgType.Feature - blder := factory.GetBuilder(detail) + blder := reg.GetBuilderInternal(detail) return blder.CreateMultiparter() } @@ -61,7 +60,7 @@ func (b *builder) CreateS2STransfer() (types.S2STransfer, error) { detail := b.detail detail.Storage.Type = stgType.Feature - blder := factory.GetBuilder(detail) + blder := reg.GetBuilderInternal(detail) return blder.CreateS2STransfer() } @@ -70,6 +69,6 @@ func (b *builder) CreateECMultiplier() (types.ECMultiplier, error) { detail := b.detail detail.Storage.Type = stgType.Feature - blder := factory.GetBuilder(detail) + blder := reg.GetBuilderInternal(detail) return blder.CreateECMultiplier() } diff --git a/common/pkgs/storage/s3/agent.go b/common/pkgs/storage/obs/agent.go similarity index 98% rename from common/pkgs/storage/s3/agent.go rename to common/pkgs/storage/obs/agent.go index 271bcab..df2ad78 100644 --- a/common/pkgs/storage/s3/agent.go +++ b/common/pkgs/storage/obs/agent.go @@ -1,4 +1,4 @@ -package s3 +package obs import ( stgmod "gitlink.org.cn/cloudream/storage/common/models" diff --git a/common/pkgs/storage/s3/s3.go b/common/pkgs/storage/obs/obs.go similarity index 56% rename from 
common/pkgs/storage/s3/s3.go rename to common/pkgs/storage/obs/obs.go index 9ac531a..f137d02 100644 --- a/common/pkgs/storage/s3/s3.go +++ b/common/pkgs/storage/obs/obs.go @@ -1,20 +1,20 @@ -package s3 +package obs import ( "fmt" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/s3/obs" + s3stg "gitlink.org.cn/cloudream/storage/common/pkgs/storage/s3" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils" ) func init() { - reg.RegisterBuilder[*cdssdk.COSType](newBuilder) - reg.RegisterBuilder[*cdssdk.OSSType](newBuilder) reg.RegisterBuilder[*cdssdk.OBSType](newBuilder) } @@ -30,6 +30,11 @@ func newBuilder(detail stgmod.StorageDetail) types.StorageBuilder { } func (b *builder) CreateAgent() (types.StorageAgent, error) { + obsType, ok := b.detail.Storage.Type.(*cdssdk.OBSType) + if !ok { + return nil, fmt.Errorf("invalid storage type %T for obs agent", b.detail.Storage.Type) + } + agt := &Agent{ Detail: b.detail, } @@ -40,15 +45,7 @@ func (b *builder) CreateAgent() (types.StorageAgent, error) { return nil, fmt.Errorf("invalid shard store type %T for local storage", b.detail.Storage.ShardStore) } - cli, bkt, err := createS3Client(b.detail.Storage.Type) - if err != nil { - return nil, err - } - - store, err := NewShardStore(agt, cli, bkt, *cfg, ShardStoreOption{ - // 目前对接的存储服务都不支持从上传接口直接获取到Sha256 - UseAWSSha256: false, - }) + store, err := NewShardStore(b.detail, obsType, *cfg) if err != nil { return nil, err } @@ -60,11 +57,28 @@ func (b *builder) CreateAgent() (types.StorageAgent, error) { } func (b *builder) ShardStoreDesc() types.ShardStoreDesc { - return &ShardStoreDesc{builder: b} + return &ShardStoreDesc{ + ShardStoreDesc: s3stg.NewShardStoreDesc(&b.detail), + } } -func (b *builder) PublicStoreDesc() types.PublicStoreDesc { - return &PublicStoreDesc{} +func createClient(addr *cdssdk.OBSType) (*s3.Client, string, error) { + awsConfig := aws.Config{} + + cre := aws.Credentials{ + AccessKeyID: addr.AK, + SecretAccessKey: addr.SK, + } + awsConfig.Credentials = &credentials.StaticCredentialsProvider{Value: cre} + awsConfig.Region = addr.Region + + options := []func(*s3.Options){} + options = append(options, func(s3Opt *s3.Options) { + s3Opt.BaseEndpoint = &addr.Endpoint + }) + + cli := s3.NewFromConfig(awsConfig, options...) 
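The client built here is a stock aws-sdk-go-v2 client pointed at the OBS endpoint. For the HTTP bypass read, shard_store.go below signs URLs with the dedicated Huawei OBS SDK; a generic S3 presigner over this same client could produce an equivalent signed URL. The following is a hedged alternative sketch, not what this patch ships:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// presignGet produces a signed GET URL for bucket/key, analogous to the
// CreateSignedUrl call in obs/shard_store.go.
func presignGet(cli *s3.Client, bucket, key string) (string, error) {
	presigner := s3.NewPresignClient(cli)
	req, err := presigner.PresignGetObject(context.TODO(), &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	}, s3.WithPresignExpires(time.Hour)) // same 3600s expiry as the OBS version
	if err != nil {
		return "", err
	}
	return req.URL, nil
}

func main() {
	fmt.Println("use presignGet with the client returned by createClient")
}
```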
+ return cli, addr.Bucket, nil } func (b *builder) CreateMultiparter() (types.Multiparter, error) { @@ -73,10 +87,17 @@ func (b *builder) CreateMultiparter() (types.Multiparter, error) { return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{}) } - return &Multiparter{ - detail: b.detail, - feat: feat, - }, nil + cli, bucket, err := createClient(b.detail.Storage.Type.(*cdssdk.OBSType)) + if err != nil { + return nil, err + } + + return s3stg.NewMultiparter( + b.detail, + feat, + bucket, + cli, + ), nil } func (b *builder) CreateS2STransfer() (types.S2STransfer, error) { @@ -85,24 +106,5 @@ func (b *builder) CreateS2STransfer() (types.S2STransfer, error) { return nil, fmt.Errorf("feature %T not found", cdssdk.S2STransferFeature{}) } - switch addr := b.detail.Storage.Type.(type) { - case *cdssdk.OBSType: - return obs.NewS2STransfer(addr, feat), nil - default: - return nil, fmt.Errorf("unsupported storage type %T", addr) - } -} - -func createS3Client(addr cdssdk.StorageType) (*s3.Client, string, error) { - switch addr := addr.(type) { - // case *cdssdk.COSType: - - // case *cdssdk.OSSType: - - case *cdssdk.OBSType: - return obs.CreateS2Client(addr) - - default: - return nil, "", fmt.Errorf("unsupported storage type %T", addr) - } + return NewS2STransfer(b.detail.Storage.Type.(*cdssdk.OBSType), feat), nil } diff --git a/common/pkgs/storage/s3/obs/obs_test.go b/common/pkgs/storage/obs/obs_test.go similarity index 100% rename from common/pkgs/storage/s3/obs/obs_test.go rename to common/pkgs/storage/obs/obs_test.go diff --git a/common/pkgs/storage/s3/obs/s2s.go b/common/pkgs/storage/obs/s2s.go similarity index 95% rename from common/pkgs/storage/s3/obs/s2s.go rename to common/pkgs/storage/obs/s2s.go index 6317092..5e1f40f 100644 --- a/common/pkgs/storage/s3/obs/s2s.go +++ b/common/pkgs/storage/obs/s2s.go @@ -13,7 +13,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/os2" stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/s3/utils" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/s3" ) type S2STransfer struct { @@ -65,7 +65,7 @@ func (s *S2STransfer) Transfer(ctx context.Context, src stgmod.StorageDetail, sr return "", err } - tempPrefix := utils.JoinKey(s.feat.TempDir, os2.GenerateRandomFileName(10)) + "/" + tempPrefix := s3.JoinKey(s.feat.TempDir, os2.GenerateRandomFileName(10)) + "/" taskType := model.GetCreateTaskReqTaskTypeEnum().OBJECT s.omsCli = oms.NewOmsClient(cli) @@ -93,7 +93,7 @@ func (s *S2STransfer) Transfer(ctx context.Context, src stgmod.StorageDetail, sr return "", fmt.Errorf("wait task: %w", err) } - return utils.JoinKey(tempPrefix, srcPath), nil + return s3.JoinKey(tempPrefix, srcPath), nil } func (s *S2STransfer) makeRequest(srcStg cdssdk.StorageType, srcPath string) *model.SrcNodeReq { diff --git a/common/pkgs/storage/obs/shard_store.go b/common/pkgs/storage/obs/shard_store.go new file mode 100644 index 0000000..25317a0 --- /dev/null +++ b/common/pkgs/storage/obs/shard_store.go @@ -0,0 +1,64 @@ +package obs + +import ( + "github.com/huaweicloud/huaweicloud-sdk-go-obs/obs" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/s3" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" +) + +type ShardStoreDesc struct { + s3.ShardStoreDesc +} + +func (d *ShardStoreDesc) HasBypassHTTPRead() bool { + return 
d.Enabled() +} + +type ShardStore struct { + *s3.ShardStore + obsType *cdssdk.OBSType +} + +func NewShardStore(detail stgmod.StorageDetail, obsType *cdssdk.OBSType, cfg cdssdk.S3ShardStorage) (*ShardStore, error) { + sd := ShardStore{ + obsType: obsType, + } + + s3Cli, bkt, err := createClient(obsType) + if err != nil { + return nil, err + } + + sd.ShardStore, err = s3.NewShardStore(detail, s3Cli, bkt, cfg, s3.ShardStoreOption{ + UseAWSSha256: false, + }) + if err != nil { + return nil, err + } + + return &sd, nil +} + +func (s *ShardStore) HTTPBypassRead(fileHash cdssdk.FileHash) (types.HTTPRequest, error) { + cli, err := obs.New(s.obsType.AK, s.obsType.SK, s.obsType.Endpoint) + if err != nil { + return types.HTTPRequest{}, err + } + + getSigned, err := cli.CreateSignedUrl(&obs.CreateSignedUrlInput{ + Method: "GET", + Bucket: s.Bucket, + Key: s.GetFilePathFromHash(fileHash), + Expires: 3600, + }) + if err != nil { + return types.HTTPRequest{}, err + } + + return types.HTTPRequest{ + URL: getSigned.SignedUrl, + Method: "GET", + }, nil +} diff --git a/common/pkgs/storage/s3/client.go b/common/pkgs/storage/s3/client.go deleted file mode 100644 index fdfa000..0000000 --- a/common/pkgs/storage/s3/client.go +++ /dev/null @@ -1,17 +0,0 @@ -package s3 - -// type S3Client interface { -// PutObject(ctx context.Context, bucket string, key string, body io.Reader) (PutObjectResp, error) -// GetObject(ctx context.Context, bucket string, key string, rng exec.Range) (io.ReadCloser, error) -// HeadObject(ctx context.Context, bucket string, key string) (HeadObjectResp, error) -// ListObjectsV2(ctx context.Context, bucket string, prefix string -// } - -// type PutObjectResp struct { -// Hash cdssdk.FileHash // 文件SHA256哈希值 -// Size int64 // 文件大小 -// } - -// type HeadObjectResp struct { -// Size int64 // 文件大小 -// } diff --git a/common/pkgs/storage/s3/multipart_upload.go b/common/pkgs/storage/s3/multipart_upload.go index b80ac1e..d5a8a4d 100644 --- a/common/pkgs/storage/s3/multipart_upload.go +++ b/common/pkgs/storage/s3/multipart_upload.go @@ -20,6 +20,17 @@ import ( type Multiparter struct { detail stgmod.StorageDetail feat *cdssdk.MultipartUploadFeature + bucket string + cli *s3.Client +} + +func NewMultiparter(detail stgmod.StorageDetail, feat *cdssdk.MultipartUploadFeature, bkt string, cli *s3.Client) *Multiparter { + return &Multiparter{ + detail: detail, + feat: feat, + bucket: bkt, + cli: cli, + } } func (m *Multiparter) MinPartSize() int64 { @@ -34,13 +45,8 @@ func (m *Multiparter) Initiate(ctx context.Context) (types.MultipartTask, error) tempFileName := os2.GenerateRandomFileName(10) tempFilePath := filepath.Join(m.feat.TempDir, tempFileName) - cli, bkt, err := createS3Client(m.detail.Storage.Type) - if err != nil { - return nil, err - } - - resp, err := cli.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ - Bucket: aws.String(bkt), + resp, err := m.cli.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ + Bucket: aws.String(m.bucket), Key: aws.String(tempFilePath), ChecksumAlgorithm: s3types.ChecksumAlgorithmSha256, }) @@ -49,8 +55,8 @@ func (m *Multiparter) Initiate(ctx context.Context) (types.MultipartTask, error) } return &MultipartTask{ - cli: cli, - bucket: bkt, + cli: m.cli, + bucket: m.bucket, tempDir: m.feat.TempDir, tempFileName: tempFileName, tempFilePath: tempFilePath, @@ -59,13 +65,8 @@ func (m *Multiparter) Initiate(ctx context.Context) (types.MultipartTask, error) } func (m *Multiparter) UploadPart(ctx context.Context, init types.MultipartInitState, partSize int64, 
partNumber int, stream io.Reader) (types.UploadedPartInfo, error) { - cli, _, err := createS3Client(m.detail.Storage.Type) - if err != nil { - return types.UploadedPartInfo{}, err - } - hashStr := io2.NewReadHasher(sha256.New(), stream) - resp, err := cli.UploadPart(ctx, &s3.UploadPartInput{ + resp, err := m.cli.UploadPart(ctx, &s3.UploadPartInput{ Bucket: aws.String(init.Bucket), Key: aws.String(init.Key), UploadId: aws.String(init.UploadID), @@ -100,7 +101,7 @@ func (i *MultipartTask) InitState() types.MultipartInitState { } } -func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPartInfo) (types.BypassFileInfo, error) { +func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPartInfo) (types.BypassUploadedFile, error) { parts = sort2.Sort(parts, func(l, r types.UploadedPartInfo) int { return l.PartNumber - r.PartNumber }) @@ -126,7 +127,7 @@ func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPar }, }) if err != nil { - return types.BypassFileInfo{}, err + return types.BypassUploadedFile{}, err } headResp, err := i.cli.HeadObject(ctx, &s3.HeadObjectInput{ @@ -134,15 +135,15 @@ func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPar Key: aws.String(i.tempFilePath), }) if err != nil { - return types.BypassFileInfo{}, err + return types.BypassUploadedFile{}, err } hash := cdssdk.CalculateCompositeHash(partHashes) - return types.BypassFileInfo{ - TempFilePath: i.tempFilePath, - Size: *headResp.ContentLength, - FileHash: hash, + return types.BypassUploadedFile{ + Path: i.tempFilePath, + Size: *headResp.ContentLength, + Hash: hash, }, nil } diff --git a/common/pkgs/storage/s3/obs/client.go b/common/pkgs/storage/s3/obs/client.go deleted file mode 100644 index e2f73c0..0000000 --- a/common/pkgs/storage/s3/obs/client.go +++ /dev/null @@ -1,27 +0,0 @@ -package obs - -import ( - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/service/s3" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -) - -func CreateS2Client(addr *cdssdk.OBSType) (*s3.Client, string, error) { - awsConfig := aws.Config{} - - cre := aws.Credentials{ - AccessKeyID: addr.AK, - SecretAccessKey: addr.SK, - } - awsConfig.Credentials = &credentials.StaticCredentialsProvider{Value: cre} - awsConfig.Region = addr.Region - - options := []func(*s3.Options){} - options = append(options, func(s3Opt *s3.Options) { - s3Opt.BaseEndpoint = &addr.Endpoint - }) - - cli := s3.NewFromConfig(awsConfig, options...) - return cli, addr.Bucket, nil -} diff --git a/common/pkgs/storage/s3/s3_test.go b/common/pkgs/storage/s3/s3_test.go deleted file mode 100644 index 7d81900..0000000 --- a/common/pkgs/storage/s3/s3_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package s3 - -import ( - "bytes" - "context" - "fmt" - "os" - "path/filepath" - "strings" - "testing" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/s3" - . 
"github.com/smartystreets/goconvey/convey" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -) - -func Test_S3(t *testing.T) { - Convey("OBS", t, func() { - cli, bkt, err := createS3Client(&cdssdk.OBSType{ - Region: "cn-north-4", - AK: "*", - SK: "*", - Endpoint: "https://obs.cn-north-4.myhuaweicloud.com", - Bucket: "pcm3-bucket3", - }) - So(err, ShouldEqual, nil) - - // file, err := os.Open("./sky") - So(err, ShouldEqual, nil) - // defer file.Close() - - _, err = cli.PutObject(context.Background(), &s3.PutObjectInput{ - Bucket: aws.String(bkt), - Key: aws.String("sky2"), - Body: bytes.NewReader([]byte("hello world")), - // ChecksumAlgorithm: s3types.ChecksumAlgorithmSha256, - // ContentType: aws.String("application/octet-stream"), - ContentLength: aws.Int64(11), - // ContentEncoding: aws.String("identity"), - }) - - So(err, ShouldEqual, nil) - - // var marker *string - // for { - // resp, err := cli.ListObjects(context.Background(), &s3.ListObjectsInput{ - // Bucket: aws.String(bkt), - // Prefix: aws.String("cds"), - // MaxKeys: aws.Int32(5), - // Marker: marker, - // }) - // So(err, ShouldEqual, nil) - - // fmt.Printf("\n") - // for _, obj := range resp.Contents { - // fmt.Printf("%v, %v\n", *obj.Key, *obj.LastModified) - // } - - // if *resp.IsTruncated { - // marker = resp.NextMarker - // } else { - // break - // } - // } - - }) -} - -func Test_2(t *testing.T) { - Convey("OBS", t, func() { - dir := "d:\\Projects\\cloudream\\workspace\\storage\\common\\pkgs\\storage\\s3" - filepath.WalkDir(dir, func(fname string, d os.DirEntry, err error) error { - if err != nil { - return nil - } - - info, err := d.Info() - if err != nil { - return nil - } - - if info.IsDir() { - return nil - } - - path := strings.TrimPrefix(fname, dir+string(os.PathSeparator)) - // path := fname - comps := strings.Split(filepath.ToSlash(path), "/") - fmt.Println(path) - fmt.Println(comps) - // s.fs.syncer.SyncObject(append([]string{userName}, comps...), info.Size()) - return nil - }) - }) -} diff --git a/common/pkgs/storage/s3/shard_store.go b/common/pkgs/storage/s3/shard_store.go index aa98e5f..b53d956 100644 --- a/common/pkgs/storage/s3/shard_store.go +++ b/common/pkgs/storage/s3/shard_store.go @@ -17,7 +17,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/os2" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/s3/utils" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) @@ -28,19 +28,25 @@ const ( type ShardStoreDesc struct { types.EmptyShardStoreDesc - builder *builder + Detail *stgmod.StorageDetail +} + +func NewShardStoreDesc(detail *stgmod.StorageDetail) ShardStoreDesc { + return ShardStoreDesc{ + Detail: detail, + } } func (s *ShardStoreDesc) Enabled() bool { - return s.builder.detail.Storage.ShardStore != nil + return s.Detail.Storage.ShardStore != nil } func (s *ShardStoreDesc) HasBypassWrite() bool { - return true + return s.Enabled() } func (s *ShardStoreDesc) HasBypassRead() bool { - return true + return s.Enabled() } type ShardStoreOption struct { @@ -48,9 +54,9 @@ type ShardStoreOption struct { } type ShardStore struct { - svc *Agent + Detail stgmod.StorageDetail + Bucket string cli *s3.Client - bucket string cfg cdssdk.S3ShardStorage opt ShardStoreOption lock sync.Mutex @@ -58,11 +64,11 @@ type ShardStore struct { done chan any } -func NewShardStore(svc *Agent, cli *s3.Client, bkt string, cfg cdssdk.S3ShardStorage, opt 
ShardStoreOption) (*ShardStore, error) { +func NewShardStore(detail stgmod.StorageDetail, cli *s3.Client, bkt string, cfg cdssdk.S3ShardStorage, opt ShardStoreOption) (*ShardStore, error) { return &ShardStore{ - svc: svc, + Detail: detail, cli: cli, - bucket: bkt, + Bucket: bkt, cfg: cfg, opt: opt, workingTempFiles: make(map[string]bool), @@ -99,8 +105,8 @@ func (s *ShardStore) removeUnusedTempFiles() { var marker *string for { resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{ - Bucket: aws.String(s.bucket), - Prefix: aws.String(utils.JoinKey(s.cfg.Root, TempDir, "/")), + Bucket: aws.String(s.Bucket), + Prefix: aws.String(JoinKey(s.cfg.Root, TempDir, "/")), Marker: marker, }) @@ -110,7 +116,7 @@ func (s *ShardStore) removeUnusedTempFiles() { } for _, obj := range resp.Contents { - objName := utils.BaseKey(*obj.Key) + objName := BaseKey(*obj.Key) if s.workingTempFiles[objName] { continue @@ -134,7 +140,7 @@ func (s *ShardStore) removeUnusedTempFiles() { } resp, err := s.cli.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{ - Bucket: aws.String(s.bucket), + Bucket: aws.String(s.Bucket), Delete: &s3types.Delete{ Objects: deletes, }, @@ -175,7 +181,7 @@ func (s *ShardStore) createWithAwsSha256(stream io.Reader) (types.FileInfo, erro counter := io2.NewCounter(stream) resp, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{ - Bucket: aws.String(s.bucket), + Bucket: aws.String(s.Bucket), Key: aws.String(key), Body: counter, ChecksumAlgorithm: s3types.ChecksumAlgorithmSha256, @@ -196,7 +202,7 @@ func (s *ShardStore) createWithAwsSha256(stream io.Reader) (types.FileInfo, erro return types.FileInfo{}, errors.New("SHA256 checksum not found in response") } - hash, err := utils.DecodeBase64Hash(*resp.ChecksumSHA256) + hash, err := DecodeBase64Hash(*resp.ChecksumSHA256) if err != nil { log.Warnf("decode SHA256 checksum %v: %v", *resp.ChecksumSHA256, err) s.onCreateFailed(key, fileName) @@ -215,7 +221,7 @@ func (s *ShardStore) createWithCalcSha256(stream io.Reader) (types.FileInfo, err counter := io2.NewCounter(hashStr) _, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{ - Bucket: aws.String(s.bucket), + Bucket: aws.String(s.Bucket), Key: aws.String(key), Body: counter, }) @@ -236,11 +242,11 @@ func (s *ShardStore) createTempFile() (string, string) { s.lock.Lock() defer s.lock.Unlock() - tmpDir := utils.JoinKey(s.cfg.Root, TempDir) + tmpDir := JoinKey(s.cfg.Root, TempDir) tmpName := os2.GenerateRandomFileName(20) s.workingTempFiles[tmpName] = true - return utils.JoinKey(tmpDir, tmpName), tmpName + return JoinKey(tmpDir, tmpName), tmpName } func (s *ShardStore) onCreateFinished(tempFilePath string, size int64, hash cdssdk.FileHash) (types.FileInfo, error) { @@ -250,7 +256,7 @@ func (s *ShardStore) onCreateFinished(tempFilePath string, size int64, hash cdss defer func() { // 不管是否成功。即使失败了也有定时清理机制去兜底 s.cli.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ - Bucket: aws.String(s.bucket), + Bucket: aws.String(s.Bucket), Key: aws.String(tempFilePath), }) }() @@ -259,12 +265,12 @@ func (s *ShardStore) onCreateFinished(tempFilePath string, size int64, hash cdss log.Debugf("write file %v finished, size: %v, hash: %v", tempFilePath, size, hash) - blockDir := s.getFileDirFromHash(hash) - newPath := utils.JoinKey(blockDir, string(hash)) + blockDir := s.GetFileDirFromHash(hash) + newPath := JoinKey(blockDir, string(hash)) _, err := s.cli.CopyObject(context.Background(), &s3.CopyObjectInput{ - Bucket: aws.String(s.bucket), - CopySource: 
aws.String(utils.JoinKey(s.bucket, tempFilePath)), + Bucket: aws.String(s.Bucket), + CopySource: aws.String(JoinKey(s.Bucket, tempFilePath)), Key: aws.String(newPath), }) if err != nil { @@ -282,7 +288,7 @@ func (s *ShardStore) onCreateFinished(tempFilePath string, size int64, hash cdss func (s *ShardStore) onCreateFailed(key string, fileName string) { // 不管是否成功。即使失败了也有定时清理机制去兜底 s.cli.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ - Bucket: aws.String(s.bucket), + Bucket: aws.String(s.Bucket), Key: aws.String(key), }) @@ -297,7 +303,7 @@ func (s *ShardStore) Open(opt types.OpenOption) (io.ReadCloser, error) { s.lock.Lock() defer s.lock.Unlock() - filePath := s.getFilePathFromHash(opt.FileHash) + filePath := s.GetFilePathFromHash(opt.FileHash) rngStr := fmt.Sprintf("bytes=%d-", opt.Offset) if opt.Length >= 0 { @@ -305,7 +311,7 @@ func (s *ShardStore) Open(opt types.OpenOption) (io.ReadCloser, error) { } resp, err := s.cli.GetObject(context.TODO(), &s3.GetObjectInput{ - Bucket: aws.String(s.bucket), + Bucket: aws.String(s.Bucket), Key: aws.String(filePath), Range: aws.String(rngStr), }) @@ -321,9 +327,9 @@ func (s *ShardStore) Info(hash cdssdk.FileHash) (types.FileInfo, error) { s.lock.Lock() defer s.lock.Unlock() - filePath := s.getFilePathFromHash(hash) + filePath := s.GetFilePathFromHash(hash) info, err := s.cli.HeadObject(context.TODO(), &s3.HeadObjectInput{ - Bucket: aws.String(s.bucket), + Bucket: aws.String(s.Bucket), Key: aws.String(filePath), }) if err != nil { @@ -344,12 +350,12 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { var infos []types.FileInfo - blockDir := utils.JoinKey(s.cfg.Root, BlocksDir) + blockDir := JoinKey(s.cfg.Root, BlocksDir) var marker *string for { resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{ - Bucket: aws.String(s.bucket), + Bucket: aws.String(s.Bucket), Prefix: aws.String(blockDir), Marker: marker, }) @@ -360,7 +366,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { } for _, obj := range resp.Contents { - key := utils.BaseKey(*obj.Key) + key := BaseKey(*obj.Key) fileHash, err := cdssdk.ParseHash(key) if err != nil { @@ -393,13 +399,13 @@ func (s *ShardStore) GC(avaiables []cdssdk.FileHash) error { avais[hash] = true } - blockDir := utils.JoinKey(s.cfg.Root, BlocksDir) + blockDir := JoinKey(s.cfg.Root, BlocksDir) var deletes []s3types.ObjectIdentifier var marker *string for { resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{ - Bucket: aws.String(s.bucket), + Bucket: aws.String(s.Bucket), Prefix: aws.String(blockDir), Marker: marker, }) @@ -410,7 +416,7 @@ func (s *ShardStore) GC(avaiables []cdssdk.FileHash) error { } for _, obj := range resp.Contents { - key := utils.BaseKey(*obj.Key) + key := BaseKey(*obj.Key) fileHash, err := cdssdk.ParseHash(key) if err != nil { continue @@ -433,7 +439,7 @@ func (s *ShardStore) GC(avaiables []cdssdk.FileHash) error { cnt := 0 if len(deletes) > 0 { resp, err := s.cli.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{ - Bucket: aws.String(s.bucket), + Bucket: aws.String(s.Bucket), Delete: &s3types.Delete{ Objects: deletes, }, @@ -459,21 +465,21 @@ func (s *ShardStore) Stats() types.Stats { } func (s *ShardStore) getLogger() logger.Logger { - return logger.WithField("ShardStore", "S3").WithField("Storage", s.svc.Detail.Storage.String()) + return logger.WithField("ShardStore", "S3").WithField("Storage", s.Detail.Storage.String()) } -func (s *ShardStore) getFileDirFromHash(hash cdssdk.FileHash) string { - return 
diff --git a/common/pkgs/storage/s3/utils/utils.go b/common/pkgs/storage/s3/utils.go
similarity index 98%
rename from common/pkgs/storage/s3/utils/utils.go
rename to common/pkgs/storage/s3/utils.go
index a1d2454..6bfe7c0 100644
--- a/common/pkgs/storage/s3/utils/utils.go
+++ b/common/pkgs/storage/s3/utils.go
@@ -1,4 +1,4 @@
-package utils
+package s3

 import (
 	"encoding/base64"
diff --git a/common/pkgs/storage/types/bypass.go b/common/pkgs/storage/types/bypass.go
index 9760559..ff19be0 100644
--- a/common/pkgs/storage/types/bypass.go
+++ b/common/pkgs/storage/types/bypass.go
@@ -4,16 +4,17 @@ import (
 	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
 )

-type BypassFileInfo struct {
-	TempFilePath string
-	FileHash     cdssdk.FileHash
-	Size         int64
+// 通过旁路上传后的文件的信息
+type BypassUploadedFile struct {
+	Path string
+	Hash cdssdk.FileHash
+	Size int64
 }

 // 不通过ShardStore上传文件,但上传完成后需要通知ShardStore。
 // 也可以用于共享存储。
 type BypassWrite interface {
-	BypassUploaded(info BypassFileInfo) error
+	BypassUploaded(info BypassUploadedFile) error
 }

 // 描述指定文件在分片存储中的路径。可以考虑设计成interface。
@@ -31,11 +32,12 @@ type BypassRead interface {
 // 能通过一个Http请求直接访问文件
 // 仅用于分片存储。
 type HTTPBypassRead interface {
-	HTTPBypassRead(fileHash cdssdk.FileHash) (HTTPReqeust, error)
+	HTTPBypassRead(fileHash cdssdk.FileHash) (HTTPRequest, error)
 }

-type HTTPReqeust struct {
-	SignedUrl string            `json:"signedUrl"`
-	Header    map[string]string `json:"header"`
-	Body      string            `json:"body"`
+type HTTPRequest struct {
+	URL    string            `json:"url"`
+	Method string            `json:"method"`
+	Header map[string]string `json:"header"`
+	Body   string            `json:"body"`
 }
diff --git a/common/pkgs/storage/types/ec_multiplier.go b/common/pkgs/storage/types/ec_multiplier.go
index c9d8d7b..7e16548 100644
--- a/common/pkgs/storage/types/ec_multiplier.go
+++ b/common/pkgs/storage/types/ec_multiplier.go
@@ -3,7 +3,7 @@ package types
 type ECMultiplier interface {
 	// 进行EC运算,coef * inputs。coef为编码矩阵,inputs为待编码数据,chunkSize为分块大小。
 	// 输出为每一个块文件的路径,数组长度 = len(coef)
-	Multiply(coef [][]byte, inputs []HTTPReqeust, chunkSize int64) ([]string, error)
+	Multiply(coef [][]byte, inputs []HTTPRequest, chunkSize int) ([]BypassUploadedFile, error)
 	// 完成计算
 	Complete()
 	// 取消计算。如果已经调用了Complete,则应该无任何影响
diff --git a/common/pkgs/storage/types/s3_client.go b/common/pkgs/storage/types/s3_client.go
index 4ec3f9c..ad20d82 100644
--- a/common/pkgs/storage/types/s3_client.go
+++ b/common/pkgs/storage/types/s3_client.go
@@ -17,7 +17,7 @@ type Multiparter interface {
 type MultipartTask interface {
 	InitState() MultipartInitState
 	// 所有分片上传完成后,合并分片
-	JoinParts(ctx context.Context, parts []UploadedPartInfo) (BypassFileInfo, error)
+	JoinParts(ctx context.Context, parts []UploadedPartInfo) (BypassUploadedFile, error)
 	// 合成之后的文件已被使用
 	Complete()
 	// 取消上传。如果在调用Complete之前调用,则应该删除合并后的文件。如果已经调用Complete,则应该不做任何事情。
diff --git a/go.mod b/go.mod
index 14b6185..00267d3 100644
--- a/go.mod
+++ b/go.mod
@@ -13,6 +13,7 @@ require (
 	github.com/gin-gonic/gin v1.7.7
 	github.com/go-sql-driver/mysql v1.7.1
 	github.com/hashicorp/golang-lru/v2 v2.0.5
+	github.com/huaweicloud/huaweicloud-sdk-go-obs v3.24.9+incompatible
 	github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.131
 	github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf
 	github.com/jedib0t/go-pretty/v6 v6.4.7
@@ -76,7 +77,7 @@ require (
 	github.com/sirupsen/logrus v1.9.2
 	github.com/smarty/assertions v1.15.0 // indirect
 	github.com/spf13/pflag v1.0.5 // indirect
-	github.com/streadway/amqp v1.1.0 // indirect
+	github.com/streadway/amqp v1.1.0
 	github.com/ugorji/go/codec v1.2.11 // indirect
 	github.com/zyedidia/generic v1.2.1 // indirect
 	go.etcd.io/etcd/api/v3 v3.5.9 // indirect
diff --git a/go.sum b/go.sum
index a3f7d3d..ab95866 100644
--- a/go.sum
+++ b/go.sum
@@ -96,6 +96,8 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l
 github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
 github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
 github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
+github.com/huaweicloud/huaweicloud-sdk-go-obs v3.24.9+incompatible h1:XQVXdk+WAJ4fSNB6mMRuYNvFWou7BZs6SZB925hPrnk=
+github.com/huaweicloud/huaweicloud-sdk-go-obs v3.24.9+incompatible/go.mod h1:l7VUhRbTKCzdOacdT4oWCwATKyvZqUOlOqr0Ous3k4s=
 github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.131 h1:34E2+lzM/yi0GlYAEQEUuf4/3mAoAadA+7oaq9q3Mys=
 github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.131/go.mod h1:JWz2ujO9X3oU5wb6kXp+DpR2UuDj2SldDbX8T0FSuhI=
 github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM=
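The HTTPRequest type above replaces the misspelled HTTPReqeust, which carried only a signed URL; adding an explicit Method (and keeping Header and Body) lets a bypass reader describe more than presigned-GET access. A consumer can replay such a descriptor with the standard library alone. A self-contained sketch follows; the struct is mirrored locally, buildRequest is a hypothetical helper, and the URL and header values are made up:

package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// HTTPRequest mirrors common/pkgs/storage/types.HTTPRequest for illustration.
type HTTPRequest struct {
	URL    string            `json:"url"`
	Method string            `json:"method"`
	Header map[string]string `json:"header"`
	Body   string            `json:"body"`
}

// buildRequest is a hypothetical helper that turns the descriptor into a
// standard *http.Request, ready to send with any http.Client.
func buildRequest(r HTTPRequest) (*http.Request, error) {
	req, err := http.NewRequest(r.Method, r.URL, strings.NewReader(r.Body))
	if err != nil {
		return nil, err
	}
	for k, v := range r.Header {
		req.Header.Set(k, v)
	}
	return req, nil
}

func main() {
	// A GET-style descriptor such as HTTPBypassRead might produce from a
	// presigned URL (all values here are fabricated for the example).
	desc := HTTPRequest{
		URL:    "https://bucket.example.com/cds/blocks/Fu/Full...?X-Amz-Signature=...",
		Method: http.MethodGet,
		Header: map[string]string{"Range": "bytes=0-1023"},
	}

	req, err := buildRequest(desc)
	if err != nil {
		panic(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		fmt.Println("request failed:", err) // expected here: the URL is fake
		return
	}
	defer resp.Body.Close()
	io.Copy(io.Discard, resp.Body)
}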
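For readers without the surrounding file: the ECMultiplier comments say that Multiply computes coef * inputs, where coef is the encoding matrix, inputs are the data to encode, and chunkSize is the block size, and that the result always has len(coef) entries — now returned as []BypassUploadedFile (which a target shard store's BypassUploaded can accept directly) instead of bare paths. The toy sketch below shows only the shape of that contract on in-memory byte slices; XOR stands in for general GF(2^8) arithmetic (it is exact only for 0/1 coefficients such as the systematic-plus-parity matrix used here), so this is not the efile implementation.

package main

import "fmt"

// multiplyXOR illustrates the Multiply contract shape: output row i is the
// combination of the inputs selected by coef[i], and the number of outputs
// equals len(coef). Real erasure coding multiplies in GF(2^8); here any
// nonzero coefficient simply XORs the input in.
func multiplyXOR(coef [][]byte, inputs [][]byte) [][]byte {
	outs := make([][]byte, len(coef)) // output count = len(coef)
	for i, row := range coef {
		out := make([]byte, len(inputs[0]))
		for j, c := range row {
			if c == 0 {
				continue
			}
			for k := range out {
				out[k] ^= inputs[j][k]
			}
		}
		outs[i] = out
	}
	return outs
}

func main() {
	// Two data blocks; the matrix produces three outputs (a systematic
	// layout plus one parity block).
	inputs := [][]byte{
		{1, 2, 3, 4},
		{5, 6, 7, 8},
	}
	coef := [][]byte{
		{1, 0}, // passthrough of block 0
		{0, 1}, // passthrough of block 1
		{1, 1}, // parity block: block 0 XOR block 1
	}
	for i, out := range multiplyXOR(coef, inputs) {
		fmt.Printf("block %d: %v\n", i, out)
	}
}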