From ccc86b647a6cba856c993324e2a1842d6cad9916 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 23 May 2025 10:20:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=96=E6=B6=88Hub=E4=B8=8E=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E7=9B=B8=E5=85=B3=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/cmdline/serve.go | 2 +- client/internal/cmdline/test.go | 2 +- client/internal/cmdline/vfstest.go | 2 +- client/internal/services/object.go | 1 + client/internal/services/service.go | 4 + client/internal/services/user_space.go | 85 +++++---- client/internal/ticktock/check_shardstore.go | 45 +++-- client/internal/ticktock/shardstore_gc.go | 48 +++--- client/internal/uploader/user_space_upload.go | 38 ++-- common/pkgs/ioswitch2/ops2/file.go | 145 ---------------- common/pkgs/ioswitch2/parser/gen/generator.go | 22 +-- common/pkgs/ioswitch2/parser/opt/ec.go | 8 +- common/pkgs/ioswitch2/parser/opt/misc.go | 2 +- common/pkgs/ioswitch2/parser/opt/s2s.go | 2 +- .../ioswitch2/plans/complete_multipart.go | 17 +- common/pkgs/ioswitchlrc/parser/passes.go | 19 +- common/pkgs/rpc/hub/cache.go | 2 + common/pkgs/rpc/hub/hub.pb.go | 78 ++++----- common/pkgs/rpc/hub/hub.proto | 6 - common/pkgs/rpc/hub/hub_grpc.pb.go | 162 +----------------- common/pkgs/rpc/hub/server.go | 4 +- common/pkgs/rpc/hub/user_space.go | 2 + hub/internal/rpc/cache.go | 2 + hub/internal/rpc/user_space.go | 2 + 24 files changed, 208 insertions(+), 492 deletions(-) delete mode 100644 common/pkgs/ioswitch2/ops2/file.go diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index 6d45ab1..c57ed57 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -142,7 +142,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { mntChan := mnt.Start() defer mnt.Stop() - svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, mnt) + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, mnt, stgPool) // HTTP接口 httpCfg := config.Cfg().HTTP diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index f2af776..12e0f10 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -133,7 +133,7 @@ func test(configPath string) { // 上传器 uploader := uploader.NewUploader(publock, &conCol, stgPool, stgMeta, db) - svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, nil) + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, nil, stgPool) go func() { doTest(svc) diff --git a/client/internal/cmdline/vfstest.go b/client/internal/cmdline/vfstest.go index 607a877..0bc0ab0 100644 --- a/client/internal/cmdline/vfstest.go +++ b/client/internal/cmdline/vfstest.go @@ -126,7 +126,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) { mntChan := mnt.Start() defer mnt.Stop() - svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, mnt) + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, mnt, stgPool) // HTTP接口 httpCfg := config.Cfg().HTTP diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 8789b53..2d480dc 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -709,6 +709,7 @@ func (svc *ObjectService) CompleteMultipartUpload(objectID types.ObjectID, index } exeCtx := exec.NewExecContext() + exec.SetValueByType(exeCtx, svc.StgPool) ret, err := bld.Execute(exeCtx).Wait(context.Background()) if err != nil { return types.Object{}, err diff --git a/client/internal/services/service.go b/client/internal/services/service.go index 164f9e8..65eade3 100644 --- a/client/internal/services/service.go +++ b/client/internal/services/service.go @@ -9,6 +9,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount" "gitlink.org.cn/cloudream/jcs-pub/client/internal/uploader" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" ) @@ -23,6 +24,7 @@ type Service struct { DB *db.DB EvtPub *sysevent.Publisher Mount *mount.Mount + StgPool *pool.Pool } func NewService( @@ -35,6 +37,7 @@ func NewService( db *db.DB, evtPub *sysevent.Publisher, mount *mount.Mount, + stgPool *pool.Pool, ) *Service { return &Service{ PubLock: publock, @@ -46,5 +49,6 @@ func NewService( DB: db, EvtPub: evtPub, Mount: mount, + StgPool: stgPool, } } diff --git a/client/internal/services/user_space.go b/client/internal/services/user_space.go index 9ac9011..5680d3f 100644 --- a/client/internal/services/user_space.go +++ b/client/internal/services/user_space.go @@ -11,7 +11,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/trie" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" - cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy" @@ -19,7 +18,6 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" - hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) @@ -103,8 +101,9 @@ func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, users for _, obj := range details { svc.AccessStat.AddAccessCounter(obj.Object.ObjectID, packageID, userspaceID, 1) } - - drv := plans.Execute(exec.NewExecContext()) + exeCtx := exec.NewExecContext() + exec.SetValueByType(exeCtx, svc.StgPool) + drv := plans.Execute(exeCtx) _, err = drv.Wait(context.Background()) if err != nil { return err @@ -119,24 +118,32 @@ func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPa return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source userspace not found: %d", srcSpaceID) } - srcAddr, ok := srcSpace.RecommendHub.Address.(*cortypes.GRPCAddressInfo) - if !ok { - return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source userspace %v has no grpc address", srcSpaceID) + srcStore, err := svc.StgPool.GetBaseStore(srcSpace) + if err != nil { + return clitypes.SpaceToSpaceResult{}, fmt.Errorf("get source userspace store: %w", err) } - srcSpaceCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&srcSpace.RecommendHub, srcAddr)) - defer srcSpaceCli.Release() + // srcAddr, ok := srcSpace.RecommendHub.Address.(*cortypes.GRPCAddressInfo) + // if !ok { + // return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source userspace %v has no grpc address", srcSpaceID) + // } + // srcSpaceCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&srcSpace.RecommendHub, srcAddr)) + // defer srcSpaceCli.Release() dstSpace := svc.UserSpaceMeta.Get(dstSpaceID) if dstSpace == nil { return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination userspace not found: %d", dstSpaceID) } - dstAddr, ok := dstSpace.RecommendHub.Address.(*cortypes.GRPCAddressInfo) - if !ok { - return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination userspace %v has no grpc address", srcSpaceID) + dstStore, err := svc.StgPool.GetBaseStore(dstSpace) + if err != nil { + return clitypes.SpaceToSpaceResult{}, fmt.Errorf("get destination userspace store: %w", err) } - dstSpaceCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&dstSpace.RecommendHub, dstAddr)) - defer dstSpaceCli.Release() + // dstAddr, ok := dstSpace.RecommendHub.Address.(*cortypes.GRPCAddressInfo) + // if !ok { + // return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination userspace %v has no grpc address", srcSpaceID) + // } + // dstSpaceCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&dstSpace.RecommendHub, dstAddr)) + // defer dstSpaceCli.Release() srcPath = strings.Trim(srcPath, cdssdk.ObjectPathSeparator) dstPath = strings.Trim(dstPath, cdssdk.ObjectPathSeparator) @@ -149,19 +156,16 @@ func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPa return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination path is empty") } - listAllResp, cerr := srcSpaceCli.BaseStoreListAll(context.Background(), &hubrpc.BaseStoreListAll{ - UserSpace: *srcSpace, - Path: srcPath, - }) + entries, cerr := srcStore.ListAll(srcPath) if cerr != nil { - return clitypes.SpaceToSpaceResult{}, fmt.Errorf("list all from source userspace: %w", cerr.ToError()) + return clitypes.SpaceToSpaceResult{}, fmt.Errorf("list all from source userspace: %w", cerr) } srcPathComps := clitypes.SplitObjectPath(srcPath) srcDirCompLen := len(srcPathComps) - 1 entryTree := trie.NewTrie[*types.ListEntry]() - for _, e := range listAllResp.Entries { + for _, e := range entries { pa, ok := strings.CutSuffix(e.Path, clitypes.ObjectPathSeparator) comps := clitypes.SplitObjectPath(pa) e.Path = pa @@ -221,8 +225,9 @@ func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPa logger.Warnf("s2s: parse plan of file %v: %v", f, err) continue } - - _, cerr := plans.Execute(exec.NewExecContext()).Wait(context.Background()) + exeCtx := exec.NewExecContext() + exec.SetValueByType(exeCtx, svc.StgPool) + _, cerr := plans.Execute(exeCtx).Wait(context.Background()) if cerr != nil { failed = append(failed, f) logger.Warnf("s2s: execute plan of file %v: %v", f, cerr) @@ -237,22 +242,30 @@ func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPa newDirPathes = append(newDirPathes, strings.Replace(dirPathes[i], srcPath, dstPath, 1)) } - mkdirResp, err := dstSpaceCli.BaseStoreMkdirs(context.Background(), &hubrpc.BaseStoreMkdirs{ - UserSpace: *dstSpace, - Pathes: newDirPathes, - }) - if err != nil { - failed = append(failed, dirPathes...) - logger.Warnf("s2s: mkdirs to destination userspace: %v", err) - } else { - for i := range dirPathes { - if mkdirResp.Successes[i] { - success = append(success, dirPathes[i]) - } else { - failed = append(failed, dirPathes[i]) - } + for _, d := range newDirPathes { + // mkdirResp, err := dstStore.Mkdir(context.Background(), &hubrpc.BaseStoreMkdirs{ + // UserSpace: *dstSpace, + // Pathes: newDirPathes, + // }) + err := dstStore.Mkdir(d) + if err != nil { + failed = append(failed, d) + } else { + success = append(success, d) } } + // if err != nil { + // failed = append(failed, dirPathes...) + // logger.Warnf("s2s: mkdirs to destination userspace: %v", err) + // } else { + // for i := range dirPathes { + // if mkdirResp.Successes[i] { + // success = append(success, dirPathes[i]) + // } else { + // failed = append(failed, dirPathes[i]) + // } + // } + // } return clitypes.SpaceToSpaceResult{ Success: success, diff --git a/client/internal/ticktock/check_shardstore.go b/client/internal/ticktock/check_shardstore.go index b56cc0c..b99b7e4 100644 --- a/client/internal/ticktock/check_shardstore.go +++ b/client/internal/ticktock/check_shardstore.go @@ -1,7 +1,6 @@ package ticktock import ( - "context" "fmt" "time" @@ -11,9 +10,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" - stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" - hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" - cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" + stgtypes "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) // CheckShardStore 代表一个用于处理代理缓存检查事件的结构体 @@ -56,24 +53,36 @@ func (j *CheckShardStore) Execute(t *TickTock) { } func (j *CheckShardStore) checkOne(t *TickTock, space *clitypes.UserSpaceDetail) error { - addr, ok := space.RecommendHub.Address.(*cortypes.GRPCAddressInfo) - if !ok { - return fmt.Errorf("master of user space %v has no grpc address", space.UserSpace) + // addr, ok := space.RecommendHub.Address.(*cortypes.GRPCAddressInfo) + // if !ok { + // return fmt.Errorf("master of user space %v has no grpc address", space.UserSpace) + // } + // agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&space.RecommendHub, addr)) + // defer agtCli.Release() + + // ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) + // defer cancel() + + // checkResp, cerr := agtCli.CheckCache(ctx, &hubrpc.CheckCache{ + // UserSpace: *space, + // }) + // if cerr != nil { + // return fmt.Errorf("request to check cache: %w", cerr.ToError()) + // } + + store, err := t.stgPool.GetShardStore(space) + if err != nil { + return fmt.Errorf("getting shard store: %w", err) } - agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&space.RecommendHub, addr)) - defer agtCli.Release() - - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) - defer cancel() - checkResp, cerr := agtCli.CheckCache(ctx, &hubrpc.CheckCache{ - UserSpace: *space, - }) - if cerr != nil { - return fmt.Errorf("request to check cache: %w", cerr.ToError()) + infos, err := store.ListAll() + if err != nil { + return fmt.Errorf("listing all files: %w", err) } - realFileHashes := lo.SliceToMap(checkResp.FileHashes, func(hash clitypes.FileHash) (clitypes.FileHash, bool) { return hash, true }) + fileHashes := lo.Map(infos, func(info stgtypes.FileInfo, _ int) clitypes.FileHash { return info.Hash }) + + realFileHashes := lo.SliceToMap(fileHashes, func(hash clitypes.FileHash) (clitypes.FileHash, bool) { return hash, true }) // 在事务中执行缓存更新操作 t.db.DoTx(func(tx db.SQLContext) error { diff --git a/client/internal/ticktock/shardstore_gc.go b/client/internal/ticktock/shardstore_gc.go index 5ab4c04..31441b6 100644 --- a/client/internal/ticktock/shardstore_gc.go +++ b/client/internal/ticktock/shardstore_gc.go @@ -1,7 +1,6 @@ package ticktock import ( - "context" "fmt" "time" @@ -9,11 +8,8 @@ import ( "gitlink.org.cn/cloudream/common/utils/reflect2" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/types" - stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" - cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder" - hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" ) type ShardStoreGC struct { @@ -86,24 +82,36 @@ func (j *ShardStoreGC) gcOne(t *TickTock, space *types.UserSpaceDetail) error { return err } - // 获取与节点通信的代理客户端 - addr, ok := space.RecommendHub.Address.(*cortypes.GRPCAddressInfo) - if !ok { - return fmt.Errorf("master of user space %v has no grpc address", space.UserSpace) + store, err := t.stgPool.GetShardStore(space) + if err != nil { + return fmt.Errorf("getting shard store: %w", err) } - agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&space.RecommendHub, addr)) - defer agtCli.Release() - - // 向代理发送垃圾回收请求 - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) - defer cancel() - _, cerr := agtCli.CacheGC(ctx, &hubrpc.CacheGC{ - UserSpace: *space, - Availables: allFileHashes, - }) - if cerr != nil { - return fmt.Errorf("request to cache gc: %w", cerr.ToError()) + err = store.GC(allFileHashes) + if err != nil { + return fmt.Errorf("gc shard store: %w", err) } + return nil + + // // 获取与节点通信的代理客户端 + // addr, ok := space.RecommendHub.Address.(*cortypes.GRPCAddressInfo) + // if !ok { + // return fmt.Errorf("master of user space %v has no grpc address", space.UserSpace) + // } + // agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&space.RecommendHub, addr)) + // defer agtCli.Release() + + // // 向代理发送垃圾回收请求 + // ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) + // defer cancel() + + // _, cerr := agtCli.CacheGC(ctx, &hubrpc.CacheGC{ + // UserSpace: *space, + // Availables: allFileHashes, + // }) + // if cerr != nil { + // return fmt.Errorf("request to cache gc: %w", cerr.ToError()) + // } + // return nil } diff --git a/client/internal/uploader/user_space_upload.go b/client/internal/uploader/user_space_upload.go index c75954d..f3115c6 100644 --- a/client/internal/uploader/user_space_upload.go +++ b/client/internal/uploader/user_space_upload.go @@ -16,7 +16,6 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" - hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) @@ -90,24 +89,35 @@ func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath st targetSapce := u.chooseUploadStorage(uploadSpaces, uploadAffinity) - addr, ok := srcSpace.RecommendHub.Address.(*cortypes.GRPCAddressInfo) - if !ok { + // addr, ok := srcSpace.RecommendHub.Address.(*cortypes.GRPCAddressInfo) + // if !ok { + // delPkg() + // return nil, fmt.Errorf("master of user space %v has no grpc address", srcSpace.UserSpace) + // } + // srcHubCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&srcSpace.RecommendHub, addr)) + // defer srcHubCli.Release() + + // listAllResp, cerr := srcHubCli.BaseStoreListAll(context.Background(), &hubrpc.BaseStoreListAll{ + // UserSpace: *srcSpace, + // Path: rootPath, + // }) + // if cerr != nil { + // delPkg() + // return nil, fmt.Errorf("listing base store: %w", cerr.ToError()) + // } + + store, err := u.stgPool.GetBaseStore(srcSpace) + if err != nil { delPkg() - return nil, fmt.Errorf("master of user space %v has no grpc address", srcSpace.UserSpace) + return nil, fmt.Errorf("getting base store: %w", err) } - srcHubCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&srcSpace.RecommendHub, addr)) - defer srcHubCli.Release() - - listAllResp, cerr := srcHubCli.BaseStoreListAll(context.Background(), &hubrpc.BaseStoreListAll{ - UserSpace: *srcSpace, - Path: rootPath, - }) - if cerr != nil { + entries, err := store.ListAll(rootPath) + if err != nil { delPkg() - return nil, fmt.Errorf("listing base store: %w", cerr.ToError()) + return nil, fmt.Errorf("listing base store: %w", err) } - adds, err := u.uploadFromBaseStore(srcSpace, &targetSapce.Space, listAllResp.Entries, rootPath) + adds, err := u.uploadFromBaseStore(srcSpace, &targetSapce.Space, entries, rootPath) if err != nil { delPkg() return nil, fmt.Errorf("uploading from base store: %w", err) diff --git a/common/pkgs/ioswitch2/ops2/file.go b/common/pkgs/ioswitch2/ops2/file.go deleted file mode 100644 index edd1fff..0000000 --- a/common/pkgs/ioswitch2/ops2/file.go +++ /dev/null @@ -1,145 +0,0 @@ -package ops2 - -import ( - "fmt" - "io" - "os" - "path" - - "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/utils/io2" -) - -func init() { - exec.UseOp[*FileRead]() - exec.UseOp[*FileWrite]() -} - -type FileWrite struct { - Input exec.VarID `json:"input"` - FilePath string `json:"filePath"` -} - -func (o *FileWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) - if err != nil { - return err - } - defer input.Stream.Close() - - dir := path.Dir(o.FilePath) - err = os.MkdirAll(dir, 0777) - if err != nil { - return fmt.Errorf("mkdir: %w", err) - } - - file, err := os.Create(o.FilePath) - if err != nil { - return fmt.Errorf("opening file: %w", err) - } - defer file.Close() - - _, err = io.Copy(file, input.Stream) - if err != nil { - return fmt.Errorf("copying data to file: %w", err) - } - - return nil -} - -func (o *FileWrite) String() string { - return fmt.Sprintf("FileWrite %v -> %s", o.Input, o.FilePath) -} - -type FileRead struct { - Output exec.VarID `json:"output"` - FilePath string `json:"filePath"` -} - -func (o *FileRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - file, err := os.Open(o.FilePath) - if err != nil { - return fmt.Errorf("opening file: %w", err) - } - - fut := future.NewSetVoid() - e.PutVar(o.Output, &exec.StreamValue{ - Stream: io2.AfterReadClosed(file, func(closer io.ReadCloser) { - fut.SetVoid() - }), - }) - fut.Wait(ctx.Context) - - return nil -} - -func (o *FileRead) String() string { - return fmt.Sprintf("FileRead %s -> %v", o.FilePath, o.Output) -} - -type FileReadNode struct { - dag.NodeBase - FilePath string -} - -func (b *GraphNodeBuilder) NewFileRead(filePath string) *FileReadNode { - node := &FileReadNode{ - FilePath: filePath, - } - b.AddNode(node) - node.OutputStreams().Init(node, 1) - return node -} - -func (t *FileReadNode) Output() dag.StreamOutputSlot { - return dag.StreamOutputSlot{ - Node: t, - Index: 0, - } -} - -func (t *FileReadNode) GenerateOp() (exec.Op, error) { - return &FileRead{ - Output: t.OutputStreams().Get(0).VarID, - FilePath: t.FilePath, - }, nil -} - -// func (t *FileReadType) String() string { -// return fmt.Sprintf("FileRead[%s]%v%v", t.FilePath, formatStreamIO(node), formatValueIO(node)) -// } - -type FileWriteNode struct { - dag.NodeBase - FilePath string -} - -func (b *GraphNodeBuilder) NewFileWrite(filePath string) *FileWriteNode { - node := &FileWriteNode{ - FilePath: filePath, - } - b.AddNode(node) - - node.InputStreams().Init(1) - return node -} - -func (t *FileWriteNode) Input() dag.StreamOutputSlot { - return dag.StreamOutputSlot{ - Node: t, - Index: 0, - } -} - -func (t *FileWriteNode) SetInput(str *dag.StreamVar) { - str.To(t, 0) -} - -func (t *FileWriteNode) GenerateOp() (exec.Op, error) { - return &FileWrite{ - Input: t.InputStreams().Get(0).VarID, - FilePath: t.FilePath, - }, nil -} diff --git a/common/pkgs/ioswitch2/parser/gen/generator.go b/common/pkgs/ioswitch2/parser/gen/generator.go index 01e2b1f..77d25e6 100644 --- a/common/pkgs/ioswitch2/parser/gen/generator.go +++ b/common/pkgs/ioswitch2/parser/gen/generator.go @@ -261,8 +261,7 @@ func buildFromNode(ctx *state.GenerateState, f ioswitch2.From) (ops2.FromNode, e switch f := f.(type) { case *ioswitch2.FromShardStore: getShard := ctx.DAG.NewGetShardInfo(f.UserSpace, f.FileHash) - getShard.Env().ToEnvDriver() - getShard.Env().Pinned = true + getShard.Env().ToEnvDriver(true) read := ctx.DAG.NewBaseReadDyn(f, f.UserSpace, types.DefaultOpen()) @@ -293,12 +292,10 @@ func buildFromNode(ctx *state.GenerateState, f ioswitch2.From) (ops2.FromNode, e switch addr := f.UserSpace.RecommendHub.Address.(type) { case *cortypes.HttpAddressInfo: - read.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: f.UserSpace.RecommendHub}) - read.Env().Pinned = true + read.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: f.UserSpace.RecommendHub}, false) case *cortypes.GRPCAddressInfo: - read.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: f.UserSpace.RecommendHub, Address: *addr}) - read.Env().Pinned = true + read.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: f.UserSpace.RecommendHub, Address: *addr}, false) default: return nil, fmt.Errorf("unsupported node address type %T", addr) @@ -308,8 +305,7 @@ func buildFromNode(ctx *state.GenerateState, f ioswitch2.From) (ops2.FromNode, e case *ioswitch2.FromDriver: n := ctx.DAG.NewFromDriver(f, f.Handle) - n.Env().ToEnvDriver() - n.Env().Pinned = true + n.Env().ToEnvDriver(true) if f.StreamIndex.IsRaw() { f.Handle.RangeHint.Offset = repRange.Offset @@ -367,8 +363,7 @@ func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error) write.Env().Pinned = true add := ctx.DAG.NewStoreShard(t.Space, t.ResultStoreKey) - add.Env().ToEnvDriver() - add.Env().Pinned = true + add.Env().ToEnvDriver(true) write.FileInfoVar().ToSlot(add.FileInfoSlot()) @@ -376,8 +371,7 @@ func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error) case *ioswitch2.ToDriver: n := ctx.DAG.NewToDriver(t, t.Handle) - n.Env().ToEnvDriver() - n.Env().Pinned = true + n.Env().ToEnvDriver(true) return n, nil @@ -400,10 +394,10 @@ func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error) func setEnvByAddress(n dag.Node, hub cortypes.Hub, addr cortypes.HubAddressInfo) error { switch addr := addr.(type) { case *cortypes.HttpAddressInfo: - n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: hub}) + n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: hub}, false) case *cortypes.GRPCAddressInfo: - n.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: hub, Address: *addr}) + n.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: hub, Address: *addr}, false) default: return fmt.Errorf("unsupported node address type %T", addr) diff --git a/common/pkgs/ioswitch2/parser/opt/ec.go b/common/pkgs/ioswitch2/parser/opt/ec.go index 85a0c78..06d3e89 100644 --- a/common/pkgs/ioswitch2/parser/opt/ec.go +++ b/common/pkgs/ioswitch2/parser/opt/ec.go @@ -116,12 +116,10 @@ func UseECMultiplier(ctx *state.GenerateState) { callMul := ctx.DAG.NewCallECMultiplier(to.Space) switch addr := to.Space.RecommendHub.Address.(type) { case *cortypes.HttpAddressInfo: - callMul.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: to.Space.RecommendHub}) - callMul.Env().Pinned = true + callMul.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: to.Space.RecommendHub}, false) case *cortypes.GRPCAddressInfo: - callMul.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: to.Space.RecommendHub, Address: *addr}) - callMul.Env().Pinned = true + callMul.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: to.Space.RecommendHub, Address: *addr}, false) default: return true @@ -137,7 +135,7 @@ func UseECMultiplier(ctx *state.GenerateState) { } hbr := ctx.DAG.NewGetShardHTTPRequest(brNode.UserSpace, fromShards[i].FileHash) - hbr.Env().CopyFrom(brNode.Env()) + hbr.Env().ToEnvDriver(true) hbr.HTTPRequestVar().ToSlot(callMul.HTTPRequestSlot(i)) } diff --git a/common/pkgs/ioswitch2/parser/opt/misc.go b/common/pkgs/ioswitch2/parser/opt/misc.go index 1860500..b0dd50c 100644 --- a/common/pkgs/ioswitch2/parser/opt/misc.go +++ b/common/pkgs/ioswitch2/parser/opt/misc.go @@ -43,7 +43,7 @@ func StoreShardWriteResult(ctx *state.GenerateState) { } storeNode := ctx.DAG.NewStore() - storeNode.Env().ToEnvDriver() + storeNode.Env().ToEnvDriver(true) storeNode.Store(n.ShardInfoKey, n.ShardInfoVar().Var()) return true diff --git a/common/pkgs/ioswitch2/parser/opt/s2s.go b/common/pkgs/ioswitch2/parser/opt/s2s.go index 7b20b0c..31f79dc 100644 --- a/common/pkgs/ioswitch2/parser/opt/s2s.go +++ b/common/pkgs/ioswitch2/parser/opt/s2s.go @@ -69,7 +69,7 @@ loop: // 先获取文件路径,送到S2S节点 gsNode := ctx.DAG.NewGetShardInfo(fromShard.UserSpace, fromShard.FileHash) - gsNode.Env().CopyFrom(toBase.Env()) + gsNode.Env().ToEnvDriver(true) gsNode.FileInfoVar().ToSlot(s2sNode.SrcFileInfoSlot()) // 原本BaseWriteNode的FileInfoVar被替换成S2SNode的FileInfoVar diff --git a/common/pkgs/ioswitch2/plans/complete_multipart.go b/common/pkgs/ioswitch2/plans/complete_multipart.go index be5b404..2860a0b 100644 --- a/common/pkgs/ioswitch2/plans/complete_multipart.go +++ b/common/pkgs/ioswitch2/plans/complete_multipart.go @@ -17,17 +17,14 @@ func CompleteMultipart(blocks []clitypes.ObjectBlock, blockSpaces []clitypes.Use sizes[i] = blk.Size } joinNode := da.NewSegmentJoin(sizes) - joinNode.Env().ToEnvWorker(getWorkerInfo(targetSpace.RecommendHub)) - joinNode.Env().Pinned = true + joinNode.Env().ToEnvWorker(getWorkerInfo(targetSpace.RecommendHub), true) for i, blk := range blocks { gs := da.NewGetShardInfo(blockSpaces[i], blk.FileHash) - gs.Env().ToEnvDriver() - gs.Env().Pinned = true + gs.Env().ToEnvDriver(true) br := da.NewBaseReadDyn(nil, blockSpaces[i], types.DefaultOpen()) - br.Env().ToEnvWorker(getWorkerInfo(blockSpaces[i].RecommendHub)) - br.Env().Pinned = true + br.Env().ToEnvWorker(getWorkerInfo(blockSpaces[i].RecommendHub), true) gs.FileInfoVar().ToSlot(br.FileInfoSlot()) @@ -36,18 +33,16 @@ func CompleteMultipart(blocks []clitypes.ObjectBlock, blockSpaces []clitypes.Use // TODO 应该采取更合理的方式同时支持Parser和直接生成DAG br := da.NewBaseWrite(nil, targetSpace, os2.GenerateRandomFileName(20)) - br.Env().ToEnvWorker(getWorkerInfo(targetSpace.RecommendHub)) - br.Env().Pinned = true + br.Env().ToEnvWorker(getWorkerInfo(targetSpace.RecommendHub), true) as := da.NewStoreShard(targetSpace, shardInfoKey) - as.Env().ToEnvDriver() - as.Env().Pinned = true + as.Env().ToEnvDriver(true) joinNode.Joined().ToSlot(br.Input()) if shardInfoKey != "" { store := da.NewStore() - store.Env().ToEnvDriver() + store.Env().ToEnvDriver(true) store.Store(shardInfoKey, as.ShardInfoVar().Var()) } diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index 034054f..65bb909 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -65,8 +65,7 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, err switch f := f.(type) { case *ioswitchlrc.FromNode: getShard := ctx.DAG.NewGetShardInfo(f.Space, f.FileHash) - getShard.Env().ToEnvDriver() - getShard.Env().Pinned = true + getShard.Env().ToEnvDriver(true) read := ctx.DAG.NewBaseReadPathVar(f, f.Space, types.DefaultOpen()) @@ -77,15 +76,13 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, err } // TODO2 支持HTTP协议 - getShard.Env().ToEnvWorker(&ioswitchlrc.HubWorker{Hub: f.Space.RecommendHub, Address: *f.Space.RecommendHub.Address.(*cortypes.GRPCAddressInfo)}) - getShard.Env().Pinned = true + getShard.Env().ToEnvWorker(&ioswitchlrc.HubWorker{Hub: f.Space.RecommendHub, Address: *f.Space.RecommendHub.Address.(*cortypes.GRPCAddressInfo)}, false) return read, nil case *ioswitchlrc.FromDriver: n := ctx.DAG.NewFromDriver(f.Handle) - n.Env().ToEnvDriver() - n.Env().Pinned = true + n.Env().ToEnvDriver(true) if f.DataIndex == -1 { f.Handle.RangeHint.Offset = repRange.Offset @@ -113,7 +110,7 @@ func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (ops2.ToNode, error) { // n.Env().ToEnvWorker(&ioswitchlrc.HttpHubWorker{Node: t.Hub}) // TODO2 支持HTTP协议 case *cortypes.GRPCAddressInfo: - write.Env().ToEnvWorker(&ioswitchlrc.HubWorker{Hub: t.Space.RecommendHub, Address: *addr}) + write.Env().ToEnvWorker(&ioswitchlrc.HubWorker{Hub: t.Space.RecommendHub, Address: *addr}, false) default: return nil, fmt.Errorf("unsupported node address type %T", addr) @@ -122,8 +119,7 @@ func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (ops2.ToNode, error) { write.Env().Pinned = true add := ctx.DAG.NewStoreShard(t.Space, t.FileHashStoreKey) - add.Env().ToEnvDriver() - add.Env().Pinned = true + add.Env().ToEnvDriver(true) write.FileInfoVar().ToSlot(add.FileInfoSlot()) @@ -131,8 +127,7 @@ func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (ops2.ToNode, error) { case *ioswitchlrc.ToDriver: n := ctx.DAG.NewToDriver(t.Handle) - n.Env().ToEnvDriver() - n.Env().Pinned = true + n.Env().ToEnvDriver(true) return n, nil @@ -226,7 +221,7 @@ func storeIPFSWriteResult(ctx *GenerateContext) { } storeNode := ctx.DAG.NewStore() - storeNode.Env().ToEnvDriver() + storeNode.Env().ToEnvDriver(true) storeNode.Store(n.ShardInfoKey, n.ShardInfoVar().Var()) return true diff --git a/common/pkgs/rpc/hub/cache.go b/common/pkgs/rpc/hub/cache.go index d74281f..c301e21 100644 --- a/common/pkgs/rpc/hub/cache.go +++ b/common/pkgs/rpc/hub/cache.go @@ -1,5 +1,6 @@ package hubrpc +/* import ( "context" @@ -46,3 +47,4 @@ func (c *Client) CacheGC(ctx context.Context, req *CacheGC) (*CacheGCResp, *rpc. func (s *Server) CacheGC(ctx context.Context, req *rpc.Request) (*rpc.Response, error) { return rpc.UnaryServer(s.svrImpl.CacheGC, ctx, req) } +*/ diff --git a/common/pkgs/rpc/hub/hub.pb.go b/common/pkgs/rpc/hub/hub.pb.go index 6d2921e..9f92568 100644 --- a/common/pkgs/rpc/hub/hub.pb.go +++ b/common/pkgs/rpc/hub/hub.pb.go @@ -26,7 +26,7 @@ var file_pkgs_rpc_hub_hub_proto_rawDesc = []byte{ 0x0a, 0x16, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x68, 0x75, 0x62, 0x2f, 0x68, 0x75, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x68, 0x75, 0x62, 0x72, 0x70, 0x63, 0x1a, 0x12, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x32, 0xf0, 0x03, 0x0a, 0x03, 0x48, 0x75, 0x62, 0x12, 0x2c, 0x0a, 0x0d, + 0x72, 0x6f, 0x74, 0x6f, 0x32, 0xb8, 0x02, 0x0a, 0x03, 0x48, 0x75, 0x62, 0x12, 0x2c, 0x0a, 0x0d, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x49, 0x4f, 0x50, 0x6c, 0x61, 0x6e, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x31, 0x0a, 0x0c, 0x53, 0x65, @@ -41,28 +41,16 @@ var file_pkgs_rpc_hub_hub_proto_rawDesc = []byte{ 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x27, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x49, 0x4f, 0x56, 0x61, 0x72, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x31, 0x0a, 0x12, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x53, 0x74, 0x6f, 0x72, 0x65, - 0x4c, 0x69, 0x73, 0x74, 0x41, 0x6c, 0x6c, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x30, 0x0a, 0x11, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x53, 0x74, - 0x6f, 0x72, 0x65, 0x4d, 0x6b, 0x64, 0x69, 0x72, 0x73, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, + 0x65, 0x12, 0x23, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x29, 0x0a, 0x0a, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x43, - 0x61, 0x63, 0x68, 0x65, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x26, 0x0a, 0x07, 0x43, 0x61, 0x63, 0x68, 0x65, 0x47, 0x43, 0x12, 0x0c, 0x2e, 0x72, - 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, - 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, 0x04, 0x50, 0x69, 0x6e, - 0x67, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x27, - 0x0a, 0x08, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, - 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x6c, 0x69, - 0x6e, 0x6b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x63, 0x6e, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x72, - 0x65, 0x61, 0x6d, 0x2f, 0x6a, 0x63, 0x73, 0x2d, 0x70, 0x75, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, - 0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x68, 0x75, 0x62, 0x72, - 0x70, 0x63, 0x3b, 0x68, 0x75, 0x62, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x27, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, + 0x74, 0x65, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, + 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x63, + 0x6e, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x6a, 0x63, 0x73, 0x2d, + 0x70, 0x75, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x73, 0x2f, + 0x72, 0x70, 0x63, 0x2f, 0x68, 0x75, 0x62, 0x72, 0x70, 0x63, 0x3b, 0x68, 0x75, 0x62, 0x72, 0x70, + 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var file_pkgs_rpc_hub_hub_proto_goTypes = []any{ @@ -71,33 +59,25 @@ var file_pkgs_rpc_hub_hub_proto_goTypes = []any{ (*rpc.Response)(nil), // 2: rpc.Response } var file_pkgs_rpc_hub_hub_proto_depIdxs = []int32{ - 0, // 0: hubrpc.Hub.ExecuteIOPlan:input_type -> rpc.Request - 1, // 1: hubrpc.Hub.SendIOStream:input_type -> rpc.ChunkedData - 0, // 2: hubrpc.Hub.GetIOStream:input_type -> rpc.Request - 0, // 3: hubrpc.Hub.SendIOVar:input_type -> rpc.Request - 0, // 4: hubrpc.Hub.GetIOVar:input_type -> rpc.Request - 0, // 5: hubrpc.Hub.BaseStoreListAll:input_type -> rpc.Request - 0, // 6: hubrpc.Hub.BaseStoreMkdirs:input_type -> rpc.Request - 0, // 7: hubrpc.Hub.CheckCache:input_type -> rpc.Request - 0, // 8: hubrpc.Hub.CacheGC:input_type -> rpc.Request - 0, // 9: hubrpc.Hub.Ping:input_type -> rpc.Request - 0, // 10: hubrpc.Hub.GetState:input_type -> rpc.Request - 2, // 11: hubrpc.Hub.ExecuteIOPlan:output_type -> rpc.Response - 2, // 12: hubrpc.Hub.SendIOStream:output_type -> rpc.Response - 1, // 13: hubrpc.Hub.GetIOStream:output_type -> rpc.ChunkedData - 2, // 14: hubrpc.Hub.SendIOVar:output_type -> rpc.Response - 2, // 15: hubrpc.Hub.GetIOVar:output_type -> rpc.Response - 2, // 16: hubrpc.Hub.BaseStoreListAll:output_type -> rpc.Response - 2, // 17: hubrpc.Hub.BaseStoreMkdirs:output_type -> rpc.Response - 2, // 18: hubrpc.Hub.CheckCache:output_type -> rpc.Response - 2, // 19: hubrpc.Hub.CacheGC:output_type -> rpc.Response - 2, // 20: hubrpc.Hub.Ping:output_type -> rpc.Response - 2, // 21: hubrpc.Hub.GetState:output_type -> rpc.Response - 11, // [11:22] is the sub-list for method output_type - 0, // [0:11] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 0, // 0: hubrpc.Hub.ExecuteIOPlan:input_type -> rpc.Request + 1, // 1: hubrpc.Hub.SendIOStream:input_type -> rpc.ChunkedData + 0, // 2: hubrpc.Hub.GetIOStream:input_type -> rpc.Request + 0, // 3: hubrpc.Hub.SendIOVar:input_type -> rpc.Request + 0, // 4: hubrpc.Hub.GetIOVar:input_type -> rpc.Request + 0, // 5: hubrpc.Hub.Ping:input_type -> rpc.Request + 0, // 6: hubrpc.Hub.GetState:input_type -> rpc.Request + 2, // 7: hubrpc.Hub.ExecuteIOPlan:output_type -> rpc.Response + 2, // 8: hubrpc.Hub.SendIOStream:output_type -> rpc.Response + 1, // 9: hubrpc.Hub.GetIOStream:output_type -> rpc.ChunkedData + 2, // 10: hubrpc.Hub.SendIOVar:output_type -> rpc.Response + 2, // 11: hubrpc.Hub.GetIOVar:output_type -> rpc.Response + 2, // 12: hubrpc.Hub.Ping:output_type -> rpc.Response + 2, // 13: hubrpc.Hub.GetState:output_type -> rpc.Response + 7, // [7:14] is the sub-list for method output_type + 0, // [0:7] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name } func init() { file_pkgs_rpc_hub_hub_proto_init() } diff --git a/common/pkgs/rpc/hub/hub.proto b/common/pkgs/rpc/hub/hub.proto index 58d86c5..2d5f5c4 100644 --- a/common/pkgs/rpc/hub/hub.proto +++ b/common/pkgs/rpc/hub/hub.proto @@ -14,12 +14,6 @@ service Hub { rpc SendIOVar(rpc.Request)returns(rpc.Response); rpc GetIOVar(rpc.Request)returns(rpc.Response); - rpc BaseStoreListAll(rpc.Request) returns(rpc.Response); - rpc BaseStoreMkdirs(rpc.Request) returns(rpc.Response); - - rpc CheckCache(rpc.Request) returns(rpc.Response); - rpc CacheGC(rpc.Request) returns(rpc.Response); - rpc Ping(rpc.Request) returns(rpc.Response); rpc GetState(rpc.Request) returns(rpc.Response); } \ No newline at end of file diff --git a/common/pkgs/rpc/hub/hub_grpc.pb.go b/common/pkgs/rpc/hub/hub_grpc.pb.go index 075d899..177a722 100644 --- a/common/pkgs/rpc/hub/hub_grpc.pb.go +++ b/common/pkgs/rpc/hub/hub_grpc.pb.go @@ -20,17 +20,13 @@ import ( const _ = grpc.SupportPackageIsVersion7 const ( - Hub_ExecuteIOPlan_FullMethodName = "/hubrpc.Hub/ExecuteIOPlan" - Hub_SendIOStream_FullMethodName = "/hubrpc.Hub/SendIOStream" - Hub_GetIOStream_FullMethodName = "/hubrpc.Hub/GetIOStream" - Hub_SendIOVar_FullMethodName = "/hubrpc.Hub/SendIOVar" - Hub_GetIOVar_FullMethodName = "/hubrpc.Hub/GetIOVar" - Hub_BaseStoreListAll_FullMethodName = "/hubrpc.Hub/BaseStoreListAll" - Hub_BaseStoreMkdirs_FullMethodName = "/hubrpc.Hub/BaseStoreMkdirs" - Hub_CheckCache_FullMethodName = "/hubrpc.Hub/CheckCache" - Hub_CacheGC_FullMethodName = "/hubrpc.Hub/CacheGC" - Hub_Ping_FullMethodName = "/hubrpc.Hub/Ping" - Hub_GetState_FullMethodName = "/hubrpc.Hub/GetState" + Hub_ExecuteIOPlan_FullMethodName = "/hubrpc.Hub/ExecuteIOPlan" + Hub_SendIOStream_FullMethodName = "/hubrpc.Hub/SendIOStream" + Hub_GetIOStream_FullMethodName = "/hubrpc.Hub/GetIOStream" + Hub_SendIOVar_FullMethodName = "/hubrpc.Hub/SendIOVar" + Hub_GetIOVar_FullMethodName = "/hubrpc.Hub/GetIOVar" + Hub_Ping_FullMethodName = "/hubrpc.Hub/Ping" + Hub_GetState_FullMethodName = "/hubrpc.Hub/GetState" ) // HubClient is the client API for Hub service. @@ -42,10 +38,6 @@ type HubClient interface { GetIOStream(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (Hub_GetIOStreamClient, error) SendIOVar(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) GetIOVar(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) - BaseStoreListAll(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) - BaseStoreMkdirs(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) - CheckCache(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) - CacheGC(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) Ping(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) GetState(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) } @@ -151,42 +143,6 @@ func (c *hubClient) GetIOVar(ctx context.Context, in *rpc.Request, opts ...grpc. return out, nil } -func (c *hubClient) BaseStoreListAll(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) { - out := new(rpc.Response) - err := c.cc.Invoke(ctx, Hub_BaseStoreListAll_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *hubClient) BaseStoreMkdirs(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) { - out := new(rpc.Response) - err := c.cc.Invoke(ctx, Hub_BaseStoreMkdirs_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *hubClient) CheckCache(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) { - out := new(rpc.Response) - err := c.cc.Invoke(ctx, Hub_CheckCache_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *hubClient) CacheGC(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) { - out := new(rpc.Response) - err := c.cc.Invoke(ctx, Hub_CacheGC_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *hubClient) Ping(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) { out := new(rpc.Response) err := c.cc.Invoke(ctx, Hub_Ping_FullMethodName, in, out, opts...) @@ -214,10 +170,6 @@ type HubServer interface { GetIOStream(*rpc.Request, Hub_GetIOStreamServer) error SendIOVar(context.Context, *rpc.Request) (*rpc.Response, error) GetIOVar(context.Context, *rpc.Request) (*rpc.Response, error) - BaseStoreListAll(context.Context, *rpc.Request) (*rpc.Response, error) - BaseStoreMkdirs(context.Context, *rpc.Request) (*rpc.Response, error) - CheckCache(context.Context, *rpc.Request) (*rpc.Response, error) - CacheGC(context.Context, *rpc.Request) (*rpc.Response, error) Ping(context.Context, *rpc.Request) (*rpc.Response, error) GetState(context.Context, *rpc.Request) (*rpc.Response, error) mustEmbedUnimplementedHubServer() @@ -242,18 +194,6 @@ func (UnimplementedHubServer) SendIOVar(context.Context, *rpc.Request) (*rpc.Res func (UnimplementedHubServer) GetIOVar(context.Context, *rpc.Request) (*rpc.Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetIOVar not implemented") } -func (UnimplementedHubServer) BaseStoreListAll(context.Context, *rpc.Request) (*rpc.Response, error) { - return nil, status.Errorf(codes.Unimplemented, "method BaseStoreListAll not implemented") -} -func (UnimplementedHubServer) BaseStoreMkdirs(context.Context, *rpc.Request) (*rpc.Response, error) { - return nil, status.Errorf(codes.Unimplemented, "method BaseStoreMkdirs not implemented") -} -func (UnimplementedHubServer) CheckCache(context.Context, *rpc.Request) (*rpc.Response, error) { - return nil, status.Errorf(codes.Unimplemented, "method CheckCache not implemented") -} -func (UnimplementedHubServer) CacheGC(context.Context, *rpc.Request) (*rpc.Response, error) { - return nil, status.Errorf(codes.Unimplemented, "method CacheGC not implemented") -} func (UnimplementedHubServer) Ping(context.Context, *rpc.Request) (*rpc.Response, error) { return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") } @@ -374,78 +314,6 @@ func _Hub_GetIOVar_Handler(srv interface{}, ctx context.Context, dec func(interf return interceptor(ctx, in, info, handler) } -func _Hub_BaseStoreListAll_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(rpc.Request) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(HubServer).BaseStoreListAll(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Hub_BaseStoreListAll_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(HubServer).BaseStoreListAll(ctx, req.(*rpc.Request)) - } - return interceptor(ctx, in, info, handler) -} - -func _Hub_BaseStoreMkdirs_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(rpc.Request) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(HubServer).BaseStoreMkdirs(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Hub_BaseStoreMkdirs_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(HubServer).BaseStoreMkdirs(ctx, req.(*rpc.Request)) - } - return interceptor(ctx, in, info, handler) -} - -func _Hub_CheckCache_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(rpc.Request) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(HubServer).CheckCache(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Hub_CheckCache_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(HubServer).CheckCache(ctx, req.(*rpc.Request)) - } - return interceptor(ctx, in, info, handler) -} - -func _Hub_CacheGC_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(rpc.Request) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(HubServer).CacheGC(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Hub_CacheGC_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(HubServer).CacheGC(ctx, req.(*rpc.Request)) - } - return interceptor(ctx, in, info, handler) -} - func _Hub_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(rpc.Request) if err := dec(in); err != nil { @@ -501,22 +369,6 @@ var Hub_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetIOVar", Handler: _Hub_GetIOVar_Handler, }, - { - MethodName: "BaseStoreListAll", - Handler: _Hub_BaseStoreListAll_Handler, - }, - { - MethodName: "BaseStoreMkdirs", - Handler: _Hub_BaseStoreMkdirs_Handler, - }, - { - MethodName: "CheckCache", - Handler: _Hub_CheckCache_Handler, - }, - { - MethodName: "CacheGC", - Handler: _Hub_CacheGC_Handler, - }, { MethodName: "Ping", Handler: _Hub_Ping_Handler, diff --git a/common/pkgs/rpc/hub/server.go b/common/pkgs/rpc/hub/server.go index 549e692..a741388 100644 --- a/common/pkgs/rpc/hub/server.go +++ b/common/pkgs/rpc/hub/server.go @@ -5,10 +5,10 @@ import ( ) type HubAPI interface { - CacheSvc + // CacheSvc IOSwitchSvc MicsSvc - UserSpaceSvc + // UserSpaceSvc } type Server struct { diff --git a/common/pkgs/rpc/hub/user_space.go b/common/pkgs/rpc/hub/user_space.go index 346d4a8..8852c34 100644 --- a/common/pkgs/rpc/hub/user_space.go +++ b/common/pkgs/rpc/hub/user_space.go @@ -1,5 +1,6 @@ package hubrpc +/* import ( "context" @@ -51,3 +52,4 @@ func (c *Client) BaseStoreMkdirs(ctx context.Context, req *BaseStoreMkdirs) (*Ba func (s *Server) BaseStoreMkdirs(ctx context.Context, req *rpc.Request) (*rpc.Response, error) { return rpc.UnaryServer(s.svrImpl.BaseStoreMkdirs, ctx, req) } +*/ diff --git a/hub/internal/rpc/cache.go b/hub/internal/rpc/cache.go index e557f89..c10861a 100644 --- a/hub/internal/rpc/cache.go +++ b/hub/internal/rpc/cache.go @@ -1,5 +1,6 @@ package rpc +/* import ( "context" "fmt" @@ -42,3 +43,4 @@ func (svc *Service) CacheGC(context context.Context, msg *hubrpc.CacheGC) (*hubr return &hubrpc.CacheGCResp{}, nil } +*/ diff --git a/hub/internal/rpc/user_space.go b/hub/internal/rpc/user_space.go index 473d134..f86f222 100644 --- a/hub/internal/rpc/user_space.go +++ b/hub/internal/rpc/user_space.go @@ -1,5 +1,6 @@ package rpc +/* import ( "context" @@ -45,3 +46,4 @@ func (svc *Service) BaseStoreMkdirs(context context.Context, msg *hubrpc.BaseSto Successes: suc, }, nil } +*/