From c260c05f441fa2c30782c2d5b5f0a630d98f0a02 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 27 May 2025 10:26:07 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AE=A2=E6=88=B7=E7=AB=AF?= =?UTF-8?q?=E7=8B=AC=E7=AB=8B=E8=BF=90=E8=A1=8C=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/cmdline/serve.go | 3 + client/internal/downloader/iterator.go | 7 +- client/internal/metacache/storagemeta.go | 43 ++--- client/internal/repl/ticktock.go | 16 ++ client/internal/services/object.go | 6 +- client/internal/ticktock/ticktock.go | 5 + client/types/types.go | 2 +- common/pkgs/ioswitch2/ops2/base_store.go | 8 +- common/pkgs/ioswitch2/ops2/ec.go | 22 +-- common/pkgs/ioswitch2/ops2/ops.go | 4 + common/pkgs/ioswitch2/ops2/s2s.go | 15 +- common/pkgs/ioswitch2/ops2/shard_store.go | 4 +- common/pkgs/ioswitch2/parser/gen/generator.go | 8 +- common/pkgs/ioswitch2/parser/opt/ec.go | 2 - common/pkgs/ioswitch2/parser/opt/misc.go | 40 ++++- common/pkgs/ioswitch2/parser/opt/multipart.go | 1 - common/pkgs/ioswitch2/parser/opt/s2s.go | 155 +++++------------- common/pkgs/ioswitch2/parser/opt/utils.go | 4 +- common/pkgs/ioswitch2/parser/parser.go | 4 +- common/pkgs/ioswitch2/parser/state/state.go | 10 +- common/pkgs/ioswitch2/plans/utils.go | 4 +- common/pkgs/ioswitchlrc/ops2/base_store.go | 128 ++------------- common/pkgs/ioswitchlrc/parser/passes.go | 2 +- common/pkgs/ioswitchlrc/parser/utils.go | 4 +- common/pkgs/rpc/utils.go | 5 + common/pkgs/storage/local/s2s.go | 4 + common/pkgs/storage/local/shard_store.go | 38 +++-- coordinator/types/types.go | 2 +- 28 files changed, 221 insertions(+), 325 deletions(-) diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index c57ed57..6ab1f0d 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -43,6 +43,7 @@ func init() { cmd.Flags().StringVarP(&opt.HTTPListenAddr, "listen", "", "", "http listen address, will override config file") cmd.Flags().BoolVarP(&opt.DisableMount, "no-mount", "", false, "disable mount") cmd.Flags().StringVarP(&opt.MountPoint, "mount", "", "", "mount point, will override config file") + cmd.Flags().BoolVarP(&opt.Standalone, "standalone", "", false, "standalone mode") RootCmd.AddCommand(&cmd) } @@ -51,6 +52,7 @@ type serveHTTPOptions struct { HTTPListenAddr string DisableMount bool MountPoint string + Standalone bool } func serveHTTP(configPath string, opts serveHTTPOptions) { @@ -68,6 +70,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { stgglb.InitLocal(config.Cfg().Local) stgglb.InitPools(&config.Cfg().HubRPC, &config.Cfg().CoordinatorRPC) + stgglb.StandaloneMode = opts.Standalone // 数据库 db, err := db.NewDB(&config.Cfg().DB) diff --git a/client/internal/downloader/iterator.go b/client/internal/downloader/iterator.go index bd7f18f..564ffdc 100644 --- a/client/internal/downloader/iterator.go +++ b/client/internal/downloader/iterator.go @@ -131,7 +131,12 @@ func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strat exeCtx := exec.NewExecContext() exec.SetValueByType(exeCtx, i.downloader.stgPool) exec := plans.Execute(exeCtx) - go exec.Wait(context.TODO()) + go func() { + _, err := exec.Wait(context.TODO()) + if err != nil { + logger.Warnf("downloading object %v: %v", req.Raw.ObjectID, err) + } + }() return exec.BeginRead(strHandle) } diff --git a/client/internal/metacache/storagemeta.go b/client/internal/metacache/storagemeta.go index a833a7d..a05a3a2 100644 --- a/client/internal/metacache/storagemeta.go +++ b/client/internal/metacache/storagemeta.go @@ -62,34 +62,39 @@ func (s *UserSpaceMeta) load(keys []types.UserSpaceID) ([]types.UserSpaceDetail, return vs, oks } - coorCli := stgglb.CoordinatorRPCPool.Get() - defer coorCli.Release() - - stgs := make([]cortypes.StorageType, len(spaces)) + detailMap := make(map[types.UserSpaceID]*types.UserSpaceDetail) for i := range spaces { - stgs[i] = spaces[i].Storage + detailMap[spaces[i].UserSpaceID] = &types.UserSpaceDetail{ + UserID: stgglb.Local.UserID, + UserSpace: spaces[i], + } } - selectHubs, cerr := coorCli.SelectStorageHub(context.Background(), &corrpc.SelectStorageHub{ - Storages: stgs, - }) - if cerr != nil { - logger.Warnf("get storage details: %v", cerr) - return vs, oks - } + if !stgglb.StandaloneMode { + coorCli := stgglb.CoordinatorRPCPool.Get() + defer coorCli.Release() - detailMap := make(map[types.UserSpaceID]types.UserSpaceDetail) - for i := range spaces { - detailMap[spaces[i].UserSpaceID] = types.UserSpaceDetail{ - UserID: stgglb.Local.UserID, - UserSpace: spaces[i], - RecommendHub: selectHubs.Hubs[i], + stgs := make([]cortypes.StorageType, len(spaces)) + for i := range spaces { + stgs[i] = spaces[i].Storage + } + + selectHubs, cerr := coorCli.SelectStorageHub(context.Background(), &corrpc.SelectStorageHub{ + Storages: stgs, + }) + if cerr != nil { + logger.Warnf("get storage details: %v", cerr) + return vs, oks + } + + for i := range spaces { + detailMap[spaces[i].UserSpaceID].RecommendHub = selectHubs.Hubs[i] } } for i := range keys { if detail, ok := detailMap[keys[i]]; ok { - vs[i] = detail + vs[i] = *detail oks[i] = true } } diff --git a/client/internal/repl/ticktock.go b/client/internal/repl/ticktock.go index 5a9c69f..2ae40b2 100644 --- a/client/internal/repl/ticktock.go +++ b/client/internal/repl/ticktock.go @@ -9,6 +9,15 @@ func init() { } RootCmd.AddCommand(ttCmd) + lsCmd := &cobra.Command{ + Use: "ls", + Short: "list all jobs", + Run: func(cmd *cobra.Command, args []string) { + tickTockLs(GetCmdCtx(cmd)) + }, + } + ttCmd.AddCommand(lsCmd) + runCmd := &cobra.Command{ Use: "run [jobName]", Short: "run job now", @@ -20,6 +29,13 @@ func init() { ttCmd.AddCommand(runCmd) } +func tickTockLs(ctx *CommandContext) { + names := ctx.repl.tktk.GetJobNames() + for _, name := range names { + println(name) + } +} + func tickTockRun(ctx *CommandContext, jobName string) { ctx.repl.tktk.RunNow(jobName) } diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 2d480dc..8f9204a 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -255,12 +255,12 @@ func (svc *ObjectService) Download(req downloader.DownloadReqeust) (*downloader. // 初始化下载过程 downloading, err := iter.MoveNext() - if downloading == nil { - return nil, fmt.Errorf("object %v not found", req.ObjectID) - } if err != nil { return nil, err } + if downloading.Object == nil { + return nil, fmt.Errorf("object %v not found", req.ObjectID) + } return downloading, nil } diff --git a/client/internal/ticktock/ticktock.go b/client/internal/ticktock/ticktock.go index 7d7d61d..d0c726f 100644 --- a/client/internal/ticktock/ticktock.go +++ b/client/internal/ticktock/ticktock.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/go-co-op/gocron/v2" + "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" @@ -57,6 +58,10 @@ func (t *TickTock) Stop() { t.sch.Shutdown() } +func (t *TickTock) GetJobNames() []string { + return lo.Keys(t.jobs) +} + func (t *TickTock) RunNow(jobName string) { j, ok := t.jobs[jobName] if !ok { diff --git a/client/types/types.go b/client/types/types.go index 8d89f36..b4ef370 100644 --- a/client/types/types.go +++ b/client/types/types.go @@ -82,7 +82,7 @@ type UserSpace struct { ShardStore *cotypes.ShardStoreUserConfig `gorm:"column:ShardStore; type:json; serializer:json" json:"shardStore"` // 存储服务特性功能的配置 Features []cotypes.StorageFeature `json:"features" gorm:"column:Features; type:json; serializer:union"` - // 各种组件保存数据的根目录。组件工作过程中都会以这个目录为根。 + // 各种组件保存数据的根目录。组件工作过程中都会以这个目录为根(除了BaseStore)。 WorkingDir string `gorm:"column:WorkingDir; type:varchar(1024); not null" json:"workingDir"` // 工作目录在存储系统中的真实路径。当工作路径在挂载点内时,这个字段记录的是挂载背后的真实路径。部分直接与存储系统交互的组件需要知道真实路径。 // RealWorkingDir string `gorm:"column:RealWorkingDir; type:varchar(1024); not null" json:"realWorkingDir"` diff --git a/common/pkgs/ioswitch2/ops2/base_store.go b/common/pkgs/ioswitch2/ops2/base_store.go index 4f63b6f..844ec24 100644 --- a/common/pkgs/ioswitch2/ops2/base_store.go +++ b/common/pkgs/ioswitch2/ops2/base_store.go @@ -18,6 +18,7 @@ import ( func init() { exec.UseOp[*BaseWrite]() exec.UseOp[*BaseRead]() + exec.UseOp[*BaseReadDyn]() } type BaseRead struct { @@ -62,7 +63,7 @@ func (o *BaseRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } func (o *BaseRead) String() string { - return fmt.Sprintf("PublicRead %v:%v -> %v", o.UserSpace, o.Path, o.Output) + return fmt.Sprintf("BaseRead(opt=%v) %v:%v -> %v", o.Option, o.UserSpace, o.Path, o.Output) } type BaseReadDyn struct { @@ -97,6 +98,7 @@ func (o *BaseReadDyn) Execute(ctx *exec.ExecContext, e *exec.Executor) error { stream, err := store.Read(info.Path, o.Option) if err != nil { + logger.Warnf("reading file %v: %v", info.Path, err) return fmt.Errorf("reading object %v: %w", o.FileInfo, err) } @@ -112,7 +114,7 @@ func (o *BaseReadDyn) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } func (o *BaseReadDyn) String() string { - return fmt.Sprintf("BaseReadDyn %v:%v -> %v", o.UserSpace, o.FileInfo, o.Output) + return fmt.Sprintf("BaseReadDyn(opt=%v) %v:%v -> %v", o.Option, o.UserSpace, o.FileInfo, o.Output) } type BaseWrite struct { @@ -156,7 +158,7 @@ func (o *BaseWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } func (o *BaseWrite) String() string { - return fmt.Sprintf("PublicWrite %v -> %v:%v", o.Input, o.UserSpace, o.Path) + return fmt.Sprintf("BaseWrite %v -> %v:%v, %v", o.Input, o.UserSpace, o.Path, o.FileInfo) } type BaseReadNode struct { diff --git a/common/pkgs/ioswitch2/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go index 8dc2e2e..d98daa7 100644 --- a/common/pkgs/ioswitch2/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -9,7 +9,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/sync2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ec" @@ -50,9 +49,11 @@ func (o *ECMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { outputWrs[i] = wr } + /// !!! 缓冲区大小必须是ChunkSize大小,因为Chunk数据很有可能来自于一个被Split的完整文件,此时必须保证按顺序读取每一个Chunk的数据 !!! + inputChunks := make([][]byte, len(o.Inputs)) for i := range o.Inputs { - inputChunks[i] = make([]byte, math2.Min(o.ChunkSize, 64*1024)) + inputChunks[i] = make([]byte, o.ChunkSize) } // 输出用两个缓冲轮换 @@ -60,7 +61,7 @@ func (o *ECMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { for i := 0; i < 2; i++ { outputChunks := make([][]byte, len(o.Outputs)) for i := range o.Outputs { - outputChunks[i] = make([]byte, math2.Min(o.ChunkSize, 64*1024)) + outputChunks[i] = make([]byte, o.ChunkSize) } outputBufPool.PutEmpty(outputChunks) } @@ -69,22 +70,12 @@ func (o *ECMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { go func() { mul := ec.GaloisMultiplier().BuildGalois() defer outputBufPool.Close() - - readLens := math2.SplitLessThan(o.ChunkSize, 64*1024) - readLenIdx := 0 - for { - curReadLen := readLens[readLenIdx] - for i := range inputChunks { - inputChunks[i] = inputChunks[i][:curReadLen] - } - err := sync2.ParallelDo(inputs, func(s *exec.StreamValue, i int) error { _, err := io.ReadFull(s.Stream, inputChunks[i]) return err }) if err == io.EOF { - fut.SetVoid() return } if err != nil { @@ -96,9 +87,6 @@ func (o *ECMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { if !ok { return } - for i := range outputBuf { - outputBuf[i] = outputBuf[i][:curReadLen] - } err = mul.Multiply(o.Coef, inputChunks, outputBuf) if err != nil { @@ -107,7 +95,6 @@ func (o *ECMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } outputBufPool.PutFilled(outputBuf) - readLenIdx = (readLenIdx + 1) % len(readLens) } }() @@ -117,6 +104,7 @@ func (o *ECMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { for { outputChunks, ok := outputBufPool.GetFilled() if !ok { + fut.SetVoid() return } diff --git a/common/pkgs/ioswitch2/ops2/ops.go b/common/pkgs/ioswitch2/ops2/ops.go index d284a28..c402194 100644 --- a/common/pkgs/ioswitch2/ops2/ops.go +++ b/common/pkgs/ioswitch2/ops2/ops.go @@ -35,3 +35,7 @@ type FileInfoValue struct { func (v *FileInfoValue) Clone() exec.VarValue { return &FileInfoValue{v.FileInfo} } + +func init() { + exec.UseVarValue[*FileInfoValue]() +} diff --git a/common/pkgs/ioswitch2/ops2/s2s.go b/common/pkgs/ioswitch2/ops2/s2s.go index be14b55..aa58c26 100644 --- a/common/pkgs/ioswitch2/ops2/s2s.go +++ b/common/pkgs/ioswitch2/ops2/s2s.go @@ -11,6 +11,7 @@ import ( func init() { exec.UseOp[*S2STransfer]() + exec.UseOp[*S2STransferDyn]() } type S2STransfer struct { @@ -45,7 +46,12 @@ func (o *S2STransfer) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } func (o *S2STransfer) String() string { - return fmt.Sprintf("S2STransfer %v:%v -> %v:%v", o.SrcSpace.UserSpace.Storage.String(), o.SrcPath, o.DstSpace.UserSpace.Storage.String(), o.Output) + return fmt.Sprintf("S2STransfer %v:%v -> %v:%v, %v", + o.SrcSpace.UserSpace.Storage.String(), + o.SrcPath, + o.DstSpace.UserSpace.Storage.String(), + o.DstPath, + o.Output) } type S2STransferDyn struct { @@ -85,7 +91,12 @@ func (o *S2STransferDyn) Execute(ctx *exec.ExecContext, e *exec.Executor) error } func (o *S2STransferDyn) String() string { - return fmt.Sprintf("S2STransferDyn %v:%v -> %v:%v", o.SrcSpace.UserSpace.Storage.String(), o.SrcFileInfo, o.DstSpace.UserSpace.Storage.String(), o.Output) + return fmt.Sprintf("S2STransferDyn %v:%v -> %v:%v, %v", + o.SrcSpace.UserSpace.Storage.String(), + o.SrcFileInfo, + o.DstSpace.UserSpace.Storage.String(), + o.DstPath, + o.Output) } type S2STransferNode struct { diff --git a/common/pkgs/ioswitch2/ops2/shard_store.go b/common/pkgs/ioswitch2/ops2/shard_store.go index 92200c8..75779b3 100644 --- a/common/pkgs/ioswitch2/ops2/shard_store.go +++ b/common/pkgs/ioswitch2/ops2/shard_store.go @@ -42,7 +42,7 @@ func (o *GetShardInfo) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } func (o *GetShardInfo) String() string { - return fmt.Sprintf("GetShardInfo(%v)", o.FileHash) + return fmt.Sprintf("GetShardInfo: %v:%v-> %v", o.UserSpace, o.FileHash, o.ShardInfo) } type StoreShard struct { @@ -79,7 +79,7 @@ func (o *StoreShard) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } func (o *StoreShard) String() string { - return fmt.Sprintf("StoreShard: addInfo=%v, shardInfo=%v", o.FileInfo, o.ShardInfo) + return fmt.Sprintf("StoreShard(space=%v): %v -> %v", o.UserSpace, o.FileInfo, o.ShardInfo) } type GetShardInfoNode struct { diff --git a/common/pkgs/ioswitch2/parser/gen/generator.go b/common/pkgs/ioswitch2/parser/gen/generator.go index df7d4cb..be6567a 100644 --- a/common/pkgs/ioswitch2/parser/gen/generator.go +++ b/common/pkgs/ioswitch2/parser/gen/generator.go @@ -113,7 +113,6 @@ func Extend(ctx *state.GenerateState) error { if err != nil { return err } - ctx.FromNodes[fr] = frNode ctx.IndexedStreams = append(ctx.IndexedStreams, state.IndexedStream{ Stream: frNode.Output().Var(), @@ -229,7 +228,6 @@ func Extend(ctx *state.GenerateState) error { if err != nil { return err } - ctx.ToNodes[to] = toNode str := findOutputStream(ctx, to.GetStreamIndex()) if str == nil { @@ -266,6 +264,8 @@ func buildFromNode(ctx *state.GenerateState, f ioswitch2.From) (ops2.FromNode, e read := ctx.DAG.NewBaseReadDyn(f, f.UserSpace, types.DefaultOpen()) + getShard.FileInfoVar().ToSlot(read.FileInfoSlot()) + if f.StreamIndex.IsRaw() { read.Option.WithNullableLength(repRange.Offset, repRange.Length) } else if f.StreamIndex.IsEC() { @@ -393,10 +393,10 @@ func setEnvBySpace(n dag.Node, space *clitypes.UserSpaceDetail) error { switch addr := space.RecommendHub.Address.(type) { case *cortypes.HttpAddressInfo: - n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: *space.RecommendHub}, false) + n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: *space.RecommendHub}, true) case *cortypes.GRPCAddressInfo: - n.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: *space.RecommendHub, Address: *addr}, false) + n.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: *space.RecommendHub, Address: *addr}, true) default: return fmt.Errorf("unsupported node address type %T", addr) diff --git a/common/pkgs/ioswitch2/parser/opt/ec.go b/common/pkgs/ioswitch2/parser/opt/ec.go index 620112a..9c91f85 100644 --- a/common/pkgs/ioswitch2/parser/opt/ec.go +++ b/common/pkgs/ioswitch2/parser/opt/ec.go @@ -123,7 +123,6 @@ func UseECMultiplier(ctx *state.GenerateState) { // 只有完全没有输出的ShardReadNode才可以被删除 if brNode.Output().Var().Dst.Len() == 0 { ctx.DAG.RemoveNode(brNode) - delete(ctx.FromNodes, brNode.From) } hbr := ctx.DAG.NewGetShardHTTPRequest(brNode.UserSpace, fromShards[i].FileHash) @@ -133,7 +132,6 @@ func UseECMultiplier(ctx *state.GenerateState) { for i, bwNode := range bwNodes { ctx.DAG.RemoveNode(bwNode) - delete(ctx.ToNodes, bwNode.To) for _, dstSlot := range bwNode.FileInfoVar().ListDstSlots() { callMul.FileInfoVar(i).ToSlot(dstSlot) diff --git a/common/pkgs/ioswitch2/parser/opt/misc.go b/common/pkgs/ioswitch2/parser/opt/misc.go index b0dd50c..5be5581 100644 --- a/common/pkgs/ioswitch2/parser/opt/misc.go +++ b/common/pkgs/ioswitch2/parser/opt/misc.go @@ -7,16 +7,38 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser/state" ) -// 删除未使用的From流,不会删除FromDriver -func RemoveUnusedFromNode(ctx *state.GenerateState) { - dag.WalkOnlyType[ops2.FromNode](ctx.DAG.Graph, func(node ops2.FromNode) bool { - if _, ok := node.(*ops2.FromDriverNode); ok { - return true +// 删除未使用的BaseRead指令,如果是FromNode则会一并从FromNodes中删除。 +// 对于BaseReadDyn指令,如果它的上级是GetShardInfo,且只有一个输出,则会一并删除这个节点 +func RemoveUnusedBaseRead(ctx *state.GenerateState) { + dag.WalkOnlyType[*ops2.BaseReadNode](ctx.DAG.Graph, func(node *ops2.BaseReadNode) bool { + if node.Output().Var().Dst.Len() == 0 { + ctx.DAG.RemoveNode(node) } + return true + }) + dag.WalkOnlyType[*ops2.BaseReadDynNode](ctx.DAG.Graph, func(node *ops2.BaseReadDynNode) bool { if node.Output().Var().Dst.Len() == 0 { ctx.DAG.RemoveNode(node) + + srcVar := node.FileInfoSlot().Var() + if srcVar == nil { + return true + } + + srcVar.NotTo(node) + + // 暂时限定只处理GetShardInfo + _, ok := srcVar.Src.(*ops2.GetShardInfoNode) + if !ok { + return true + } + + if srcVar.Dst.Len() == 0 { + ctx.DAG.RemoveNode(srcVar.Src) + } } + return true }) } @@ -52,7 +74,9 @@ func StoreShardWriteResult(ctx *state.GenerateState) { // 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回 func GenerateRange(ctx *state.GenerateState) { - for to, toNode := range ctx.ToNodes { + dag.WalkOnlyType[ops2.ToNode](ctx.DAG.Graph, func(toNode ops2.ToNode) bool { + to := toNode.GetTo() + toStrIdx := to.GetStreamIndex() toRng := to.GetRange() @@ -103,7 +127,9 @@ func GenerateRange(ctx *state.GenerateState) { // toInput.Var().NotTo(toNode, toInput.Index) // toNode.SetInput(rnged) } - } + + return true + }) } // 生成Clone指令 diff --git a/common/pkgs/ioswitch2/parser/opt/multipart.go b/common/pkgs/ioswitch2/parser/opt/multipart.go index 128f49a..0a8b9a6 100644 --- a/common/pkgs/ioswitch2/parser/opt/multipart.go +++ b/common/pkgs/ioswitch2/parser/opt/multipart.go @@ -97,7 +97,6 @@ func UseMultipartUploadToShardStore(ctx *state.GenerateState) { // 最后删除Join指令和ToShardStore指令 ctx.DAG.RemoveNode(joinNode) ctx.DAG.RemoveNode(bwNode) - delete(ctx.ToNodes, bwNode.GetTo()) return true }) } diff --git a/common/pkgs/ioswitch2/parser/opt/s2s.go b/common/pkgs/ioswitch2/parser/opt/s2s.go index 31f79dc..a279c86 100644 --- a/common/pkgs/ioswitch2/parser/opt/s2s.go +++ b/common/pkgs/ioswitch2/parser/opt/s2s.go @@ -1,7 +1,7 @@ package opt import ( - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser/state" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory" @@ -14,133 +14,62 @@ func UseS2STransfer(ctx *state.GenerateState) { return } - for fr, frNode := range ctx.FromNodes { - switch fr := fr.(type) { - case *ioswitch2.FromShardStore: - s2sFromShardStore(ctx, fr, frNode) - case *ioswitch2.FromBaseStore: - s2sFromBaseStore(ctx, fr, frNode) + dag.WalkOnlyType[*ops2.BaseWriteNode](ctx.DAG.Graph, func(node *ops2.BaseWriteNode) bool { + inputVar := node.Input().Var() + if inputVar == nil { + return true } - } -} - -func s2sFromShardStore(ctx *state.GenerateState, fromShard *ioswitch2.FromShardStore, frNode ops2.FromNode) { - fromStgBld := factory.GetBuilder(&fromShard.UserSpace) - s2s, err := fromStgBld.CreateS2STransfer(true) - if err != nil { - return - } - - // 此输出流的所有目的地都要能支持S2S传输 - outVar := frNode.Output().Var() - if outVar.Dst.Len() == 0 { - return - } - - failed := false - var toBases []*ops2.BaseWriteNode - -loop: - for i := 0; i < outVar.Dst.Len(); i++ { - dstNode := outVar.Dst.Get(i) - - switch dstNode := dstNode.(type) { - case *ops2.BaseWriteNode: - if !s2s.CanTransfer(&fromShard.UserSpace, &dstNode.UserSpace) { - failed = true - break - } - - toBases = append(toBases, dstNode) - - default: - failed = true - break loop - } - } - if failed { - return - } - - for _, toBase := range toBases { - s2sNode := ctx.DAG.NewS2STransferDyn(fromShard.UserSpace, toBase.UserSpace, toBase.Path) - // 直传指令在目的地Hub上执行 - s2sNode.Env().CopyFrom(toBase.Env()) - // 先获取文件路径,送到S2S节点 - gsNode := ctx.DAG.NewGetShardInfo(fromShard.UserSpace, fromShard.FileHash) - gsNode.Env().ToEnvDriver(true) - gsNode.FileInfoVar().ToSlot(s2sNode.SrcFileInfoSlot()) - - // 原本BaseWriteNode的FileInfoVar被替换成S2SNode的FileInfoVar - for _, dstSlot := range toBase.FileInfoVar().ListDstSlots() { - s2sNode.FileInfoVar().ToSlot(dstSlot) + s2s, err := factory.GetBuilder(&node.UserSpace).CreateS2STransfer(true) + if err != nil { + return true } - // 从计划中删除目标节点 - ctx.DAG.RemoveNode(toBase) - delete(ctx.ToNodes, toBase.To) - } - - // 从计划中删除源节点 - ctx.DAG.RemoveNode(frNode) - delete(ctx.FromNodes, frNode.GetFrom()) -} + // 只有BaseRead->BaseWrite的情况才可以进行S2S传输 + switch inputNode := inputVar.Src.(type) { + case *ops2.BaseReadNode: + if !s2s.CanTransfer(&inputNode.UserSpace, &node.UserSpace) { + return true + } -func s2sFromBaseStore(ctx *state.GenerateState, fromBase *ioswitch2.FromBaseStore, frNode ops2.FromNode) { - fromStgBld := factory.GetBuilder(&fromBase.UserSpace) - s2s, err := fromStgBld.CreateS2STransfer(true) - if err != nil { - return - } + s2sNode := ctx.DAG.NewS2STransfer(inputNode.UserSpace, inputNode.Path, node.UserSpace, node.Path) + // 直传指令在目的地Hub上执行 + s2sNode.Env().CopyFrom(node.Env()) - // 此输出流的所有目的地都要能支持S2S传输 - outVar := frNode.Output().Var() - if outVar.Dst.Len() == 0 { - return - } + // 原本BaseWriteNode的FileInfoVar被替换成S2SNode的FileInfoVar + for _, dstSlot := range node.FileInfoVar().ListDstSlots() { + s2sNode.FileInfoVar().ToSlot(dstSlot) + } - failed := false - var toBases []*ops2.BaseWriteNode + case *ops2.BaseReadDynNode: + if !s2s.CanTransfer(&inputNode.UserSpace, &node.UserSpace) { + return true + } -loop: - for i := 0; i < outVar.Dst.Len(); i++ { - dstNode := outVar.Dst.Get(i) + s2sNode := ctx.DAG.NewS2STransferDyn(inputNode.UserSpace, node.UserSpace, node.Path) + // 直传指令在目的地Hub上执行 + s2sNode.Env().CopyFrom(node.Env()) - switch dstNode := dstNode.(type) { - case *ops2.BaseWriteNode: - if !s2s.CanTransfer(&fromBase.UserSpace, &dstNode.UserSpace) { - failed = true - break + // 原本BaseWriteNode的FileInfoVar被替换成S2SNode的FileInfoVar + for _, dstSlot := range node.FileInfoVar().ListDstSlots() { + s2sNode.FileInfoVar().ToSlot(dstSlot) } - toBases = append(toBases, dstNode) + // 传递给BaseReadDyn的FileInfo也给S2S一份 + srcFileInfoVar := inputNode.FileInfoSlot().Var() + if srcFileInfoVar != nil { + srcFileInfoVar.ToSlot(s2sNode.SrcFileInfoSlot()) + } default: - failed = true - break loop + return true } - } - if failed { - return - } - - for _, toBase := range toBases { - s2sNode := ctx.DAG.NewS2STransfer(fromBase.UserSpace, fromBase.Path, toBase.UserSpace, toBase.Path) - // 直传指令在目的地Hub上执行 - s2sNode.Env().CopyFrom(toBase.Env()) - // 原本BaseWriteNode的FileInfoVar被替换成S2SNode的FileInfoVar - for _, dstSlot := range toBase.FileInfoVar().ListDstSlots() { - s2sNode.FileInfoVar().ToSlot(dstSlot) - } + // 中断srcVar的流向 + inputVar.NotTo(node) // 从计划中删除目标节点 - ctx.DAG.RemoveNode(toBase) - delete(ctx.ToNodes, toBase.To) - } - - // 从计划中删除源节点 - ctx.DAG.RemoveNode(frNode) - delete(ctx.FromNodes, frNode.GetFrom()) + ctx.DAG.RemoveNode(node) + return true + }) } diff --git a/common/pkgs/ioswitch2/parser/opt/utils.go b/common/pkgs/ioswitch2/parser/opt/utils.go index c0674c0..e36dee4 100644 --- a/common/pkgs/ioswitch2/parser/opt/utils.go +++ b/common/pkgs/ioswitch2/parser/opt/utils.go @@ -17,10 +17,10 @@ func setEnvBySpace(n dag.Node, space *clitypes.UserSpaceDetail) error { switch addr := space.RecommendHub.Address.(type) { case *cortypes.HttpAddressInfo: - n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: *space.RecommendHub}, false) + n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: *space.RecommendHub}, true) case *cortypes.GRPCAddressInfo: - n.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: *space.RecommendHub, Address: *addr}, false) + n.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: *space.RecommendHub, Address: *addr}, true) default: return fmt.Errorf("unsupported node address type %T", addr) diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index 05d6fc7..fbc985f 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -76,10 +76,10 @@ func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error { } // 下面这些只需要执行一次,但需要按顺序 - opt.RemoveUnusedFromNode(state) - opt.UseECMultiplier(state) opt.UseS2STransfer(state) + opt.UseECMultiplier(state) opt.UseMultipartUploadToShardStore(state) + opt.RemoveUnusedBaseRead(state) opt.DropUnused(state) opt.StoreShardWriteResult(state) opt.GenerateRange(state) diff --git a/common/pkgs/ioswitch2/parser/state/state.go b/common/pkgs/ioswitch2/parser/state/state.go index 2fd63cc..725ee29 100644 --- a/common/pkgs/ioswitch2/parser/state/state.go +++ b/common/pkgs/ioswitch2/parser/state/state.go @@ -17,19 +17,15 @@ type GenerateState struct { DAG *ops2.GraphNodeBuilder // 为了产生所有To所需的数据范围,而需要From打开的范围。 // 这个范围是基于整个文件的,且上下界都取整到条带大小的整数倍,因此上界是有可能超过文件大小的。 - ToNodes map[ioswitch2.To]ops2.ToNode - FromNodes map[ioswitch2.From]ops2.FromNode - IndexedStreams []IndexedStream StreamRange math2.Range + IndexedStreams []IndexedStream UseEC bool // 是否使用纠删码 UseSegment bool // 是否使用分段 } func InitGenerateState(ft ioswitch2.FromTo) *GenerateState { return &GenerateState{ - Ft: ft, - DAG: ops2.NewGraphNodeBuilder(), - ToNodes: make(map[ioswitch2.To]ops2.ToNode), - FromNodes: make(map[ioswitch2.From]ops2.FromNode), + Ft: ft, + DAG: ops2.NewGraphNodeBuilder(), } } diff --git a/common/pkgs/ioswitch2/plans/utils.go b/common/pkgs/ioswitch2/plans/utils.go index c58ab7e..e160b77 100644 --- a/common/pkgs/ioswitch2/plans/utils.go +++ b/common/pkgs/ioswitch2/plans/utils.go @@ -17,10 +17,10 @@ func setEnvBySpace(n dag.Node, space *clitypes.UserSpaceDetail) error { switch addr := space.RecommendHub.Address.(type) { case *cortypes.HttpAddressInfo: - n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: *space.RecommendHub}, false) + n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: *space.RecommendHub}, true) case *cortypes.GRPCAddressInfo: - n.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: *space.RecommendHub, Address: *addr}, false) + n.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: *space.RecommendHub, Address: *addr}, true) default: return fmt.Errorf("unsupported node address type %T", addr) diff --git a/common/pkgs/ioswitchlrc/ops2/base_store.go b/common/pkgs/ioswitchlrc/ops2/base_store.go index 8a55c1b..394343f 100644 --- a/common/pkgs/ioswitchlrc/ops2/base_store.go +++ b/common/pkgs/ioswitchlrc/ops2/base_store.go @@ -18,6 +18,7 @@ import ( func init() { exec.UseOp[*BaseWrite]() exec.UseOp[*BaseRead]() + exec.UseOp[*BaseReadDyn]() } type BaseRead struct { @@ -65,14 +66,14 @@ func (o *BaseRead) String() string { return fmt.Sprintf("PublicRead %v:%v -> %v", o.UserSpace, o.Path, o.Output) } -type BaseReadPathVar struct { +type BaseReadDyn struct { UserSpace clitypes.UserSpaceDetail Output exec.VarID Path exec.VarID Option types.OpenOption } -func (o *BaseReadPathVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { +func (o *BaseReadDyn) Execute(ctx *exec.ExecContext, e *exec.Executor) error { logger. WithField("Output", o.Output). WithField("UserSpace", o.UserSpace). @@ -111,7 +112,7 @@ func (o *BaseReadPathVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error return fut.Wait(ctx.Context) } -func (o *BaseReadPathVar) String() string { +func (o *BaseReadDyn) String() string { return fmt.Sprintf("BaseReadPathVar %v:%v -> %v", o.UserSpace, o.Path, o.Output) } @@ -159,56 +160,6 @@ func (o *BaseWrite) String() string { return fmt.Sprintf("PublicWrite %v -> %v:%v", o.Input, o.UserSpace, o.Path) } -// type BaseWriteTemp struct { -// Input exec.VarID -// UserSpace clitypes.UserSpaceDetail -// Path string -// Signal exec.VarID -// WriteResult exec.VarID -// } - -// func (o *BaseWriteTemp) Execute(ctx *exec.ExecContext, e *exec.Executor) error { -// logger. -// WithField("Input", o.Input). -// Debugf("write file to base store") -// defer logger.Debugf("write file to base store finished") - -// stgPool, err := exec.GetValueByType[*pool.Pool](ctx) -// if err != nil { -// return fmt.Errorf("getting storage pool: %w", err) -// } - -// _, err = exec.BindVar[*exec.SignalValue](e, ctx.Context, o.Signal) -// if err != nil { -// return err -// } - -// store, err := stgPool.GetBaseStore(&o.UserSpace) -// if err != nil { -// return fmt.Errorf("getting base store of storage %v: %w", o.UserSpace, err) -// } - -// input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) -// if err != nil { -// return err -// } -// defer input.Stream.Close() - -// ret, err := store.Write(o.Path, input.Stream) -// if err != nil { -// return err -// } - -// e.PutVar(o.WriteResult, &WriteResultValue{ -// WriteResult: ret, -// }) -// return nil -// } - -// func (o *BaseWriteTemp) String() string { -// return fmt.Sprintf("PublicWriteTemp[signal: %v] %v -> %v:%v", o.Signal, o.Input, o.UserSpace, o.Path) -// } - type BaseReadNode struct { dag.NodeBase From ioswitchlrc.From @@ -250,15 +201,15 @@ func (t *BaseReadNode) GenerateOp() (exec.Op, error) { }, nil } -type BaseReadPathVarNode struct { +type BaseReadDynNode struct { dag.NodeBase From ioswitchlrc.From UserSpace clitypes.UserSpaceDetail Option types.OpenOption } -func (b *GraphNodeBuilder) NewBaseReadPathVar(from ioswitchlrc.From, userSpace clitypes.UserSpaceDetail, opt types.OpenOption) *BaseReadPathVarNode { - node := &BaseReadPathVarNode{ +func (b *GraphNodeBuilder) NewBaseReadDyn(from ioswitchlrc.From, userSpace clitypes.UserSpaceDetail, opt types.OpenOption) *BaseReadDynNode { + node := &BaseReadDynNode{ From: from, UserSpace: userSpace, Option: opt, @@ -270,26 +221,26 @@ func (b *GraphNodeBuilder) NewBaseReadPathVar(from ioswitchlrc.From, userSpace c return node } -func (t *BaseReadPathVarNode) GetFrom() ioswitchlrc.From { +func (t *BaseReadDynNode) GetFrom() ioswitchlrc.From { return t.From } -func (t *BaseReadPathVarNode) Output() dag.StreamOutputSlot { +func (t *BaseReadDynNode) Output() dag.StreamOutputSlot { return dag.StreamOutputSlot{ Node: t, Index: 0, } } -func (t *BaseReadPathVarNode) PathVar() dag.ValueInputSlot { +func (t *BaseReadDynNode) PathVar() dag.ValueInputSlot { return dag.ValueInputSlot{ Node: t, Index: 0, } } -func (t *BaseReadPathVarNode) GenerateOp() (exec.Op, error) { - return &BaseReadPathVar{ +func (t *BaseReadDynNode) GenerateOp() (exec.Op, error) { + return &BaseReadDyn{ UserSpace: t.UserSpace, Output: t.Output().Var().VarID, Path: t.PathVar().Var().VarID, @@ -343,58 +294,3 @@ func (t *BaseWriteNode) GenerateOp() (exec.Op, error) { WriteResult: t.FileInfoVar().Var().VarID, }, nil } - -// type BaseWriteTempNode struct { -// dag.NodeBase -// To ioswitchlrc.To -// UserSpace clitypes.UserSpaceDetail -// Path string -// } - -// func (b *GraphNodeBuilder) NewBaseWriteTemp(to ioswitchlrc.To, userSpace clitypes.UserSpaceDetail, path string) *BaseWriteTempNode { -// node := &BaseWriteTempNode{ -// To: to, -// UserSpace: userSpace, -// Path: path, -// } -// b.AddNode(node) - -// node.InputStreams().Init(1) -// node.InputValues().Init(1) -// return node -// } - -// func (t *BaseWriteTempNode) GetTo() ioswitchlrc.To { -// return t.To -// } - -// func (t *BaseWriteTempNode) Input() dag.StreamInputSlot { -// return dag.StreamInputSlot{ -// Node: t, -// Index: 0, -// } -// } - -// func (t *BaseWriteTempNode) TargetSignal() dag.ValueInputSlot { -// return dag.ValueInputSlot{ -// Node: t, -// Index: 0, -// } -// } - -// func (t *BaseWriteTempNode) FileInfoVar() dag.ValueOutputSlot { -// return dag.ValueOutputSlot{ -// Node: t, -// Index: 0, -// } -// } - -// func (t *BaseWriteTempNode) GenerateOp() (exec.Op, error) { -// return &BaseWriteTemp{ -// Input: t.Input().Var().VarID, -// UserSpace: t.UserSpace, -// Path: t.Path, -// Signal: t.TargetSignal().Var().VarID, -// WriteResult: t.FileInfoVar().Var().VarID, -// }, nil -// } diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index bca57c4..badb059 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -66,7 +66,7 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, err getShard := ctx.DAG.NewGetShardInfo(f.UserSpace, f.FileHash) getShard.Env().ToEnvDriver(true) - read := ctx.DAG.NewBaseReadPathVar(f, f.UserSpace, types.DefaultOpen()) + read := ctx.DAG.NewBaseReadDyn(f, f.UserSpace, types.DefaultOpen()) if f.DataIndex == -1 { read.Option.WithNullableLength(repRange.Offset, repRange.Length) diff --git a/common/pkgs/ioswitchlrc/parser/utils.go b/common/pkgs/ioswitchlrc/parser/utils.go index 7d06caa..5bb2c38 100644 --- a/common/pkgs/ioswitchlrc/parser/utils.go +++ b/common/pkgs/ioswitchlrc/parser/utils.go @@ -17,10 +17,10 @@ func setEnvBySpace(n dag.Node, space *clitypes.UserSpaceDetail) error { switch addr := space.RecommendHub.Address.(type) { case *cortypes.HttpAddressInfo: - n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: *space.RecommendHub}, false) + n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: *space.RecommendHub}, true) case *cortypes.GRPCAddressInfo: - n.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: *space.RecommendHub, Address: *addr}, false) + n.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: *space.RecommendHub, Address: *addr}, true) default: return fmt.Errorf("unsupported node address type %T", addr) diff --git a/common/pkgs/rpc/utils.go b/common/pkgs/rpc/utils.go index 98a0f1b..5766701 100644 --- a/common/pkgs/rpc/utils.go +++ b/common/pkgs/rpc/utils.go @@ -235,6 +235,11 @@ func DownloadStreamServer[Resp DownloadStreamResp, Req any, APIRet DownloadStrea return makeCodeError(errorcode.OperationFailed, err.Error()) } + err = cw.Finish() + if err != nil { + return makeCodeError(errorcode.OperationFailed, err.Error()) + } + return nil } diff --git a/common/pkgs/storage/local/s2s.go b/common/pkgs/storage/local/s2s.go index bbc68f4..b3a1ffb 100644 --- a/common/pkgs/storage/local/s2s.go +++ b/common/pkgs/storage/local/s2s.go @@ -18,6 +18,10 @@ type S2STransfer struct { // 只有同一个机器的存储之间才可以进行数据直传 func (*S2STransfer) CanTransfer(src, dst *clitypes.UserSpaceDetail) bool { + if types.FindFeature[*cortypes.S2STransferFeature](dst) == nil { + return false + } + _, ok := src.UserSpace.Storage.(*cortypes.LocalType) if !ok { return false diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index edb337e..ff54b4e 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -14,27 +14,29 @@ import ( ) type ShardStore struct { - detail *clitypes.UserSpaceDetail - absRoot string - lock sync.Mutex - done chan any + detail *clitypes.UserSpaceDetail + stgRoot string + storeAbsRoot string + lock sync.Mutex + done chan any } func NewShardStore(root string, detail *clitypes.UserSpaceDetail) (*ShardStore, error) { - absRoot, err := filepath.Abs(filepath.Join(root, detail.UserSpace.WorkingDir, types.ShardStoreWorkingDir)) + storeAbsRoot, err := filepath.Abs(filepath.Join(root, detail.UserSpace.WorkingDir, types.ShardStoreWorkingDir)) if err != nil { return nil, fmt.Errorf("get abs root: %w", err) } return &ShardStore{ - detail: detail, - absRoot: absRoot, - done: make(chan any, 1), + detail: detail, + stgRoot: root, + storeAbsRoot: storeAbsRoot, + done: make(chan any, 1), }, nil } func (s *ShardStore) Start(ch *types.StorageEventChan) { - s.getLogger().Infof("component start, root: %v, max size: %v", s.absRoot, s.detail.UserSpace.ShardStore.MaxSize) + s.getLogger().Infof("component start, root: %v, max size: %v", s.storeAbsRoot, s.detail.UserSpace.ShardStore.MaxSize) } func (s *ShardStore) Stop() { @@ -42,12 +44,14 @@ func (s *ShardStore) Stop() { } func (s *ShardStore) Store(path string, hash clitypes.FileHash, size int64) (types.FileInfo, error) { + fullTempPath := filepath.Join(s.stgRoot, path) + s.lock.Lock() defer s.lock.Unlock() log := s.getLogger() - log.Debugf("%v bypass uploaded, size: %v, hash: %v", path, size, hash) + log.Debugf("%v bypass uploaded, size: %v, hash: %v", fullTempPath, size, hash) blockDir := s.getFileDirFromHash(hash) err := os.MkdirAll(blockDir, 0755) @@ -59,9 +63,9 @@ func (s *ShardStore) Store(path string, hash clitypes.FileHash, size int64) (typ newPath := filepath.Join(blockDir, string(hash)) _, err = os.Stat(newPath) if os.IsNotExist(err) { - err = os.Rename(path, newPath) + err = os.Rename(fullTempPath, newPath) if err != nil { - log.Warnf("rename %v to %v: %v", path, newPath, err) + log.Warnf("rename %v to %v: %v", fullTempPath, newPath, err) return types.FileInfo{}, fmt.Errorf("rename file: %w", err) } @@ -90,7 +94,7 @@ func (s *ShardStore) Info(hash clitypes.FileHash) (types.FileInfo, error) { return types.FileInfo{ Hash: hash, Size: info.Size(), - Path: filePath, + Path: s.getSlashFilePathFromHash(hash), }, nil } @@ -100,7 +104,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { var infos []types.FileInfo - err := filepath.WalkDir(s.absRoot, func(path string, d fs.DirEntry, err error) error { + err := filepath.WalkDir(s.storeAbsRoot, func(path string, d fs.DirEntry, err error) error { if err != nil { return err } @@ -144,7 +148,7 @@ func (s *ShardStore) GC(avaiables []clitypes.FileHash) error { cnt := 0 - err := filepath.WalkDir(s.absRoot, func(path string, d fs.DirEntry, err error) error { + err := filepath.WalkDir(s.storeAbsRoot, func(path string, d fs.DirEntry, err error) error { if err != nil { return err } @@ -196,11 +200,11 @@ func (s *ShardStore) getLogger() logger.Logger { } func (s *ShardStore) getFileDirFromHash(hash clitypes.FileHash) string { - return filepath.Join(s.absRoot, hash.GetHashPrefix(2)) + return filepath.Join(s.storeAbsRoot, hash.GetHashPrefix(2)) } func (s *ShardStore) getFilePathFromHash(hash clitypes.FileHash) string { - return filepath.Join(s.absRoot, hash.GetHashPrefix(2), string(hash)) + return filepath.Join(s.storeAbsRoot, hash.GetHashPrefix(2), string(hash)) } func (s *ShardStore) getSlashFilePathFromHash(hash clitypes.FileHash) string { diff --git a/coordinator/types/types.go b/coordinator/types/types.go index 664cc1b..a83b2cb 100644 --- a/coordinator/types/types.go +++ b/coordinator/types/types.go @@ -78,7 +78,7 @@ func (User) TableName() string { } type HubLocation struct { - HubID HubID `gorm:"column:HubID; primaryKey; type:bigint" json:"hubID"` + HubID HubID `gorm:"column:HubID; type:bigint" json:"hubID"` StorageName string `gorm:"column:StorageName; type:varchar(255); not null" json:"storageName"` Location string `gorm:"column:Location; type:varchar(255); not null" json:"location"` }