From f47ddcee23b365ed983e6f4b34fe3b1c6d9a9d7b Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 8 Apr 2025 17:21:43 +0800 Subject: [PATCH] =?UTF-8?q?agent=E9=87=8D=E5=91=BD=E5=90=8D=E4=B8=BAhub?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .devops/基于gitlink构建.yml | 8 +- Dockerfile | 10 +- README.md | 6 +- client/internal/cmdline/mount.go | 4 +- client/internal/cmdline/serve.go | 2 +- client/internal/config/config.go | 4 +- client/internal/services/cache.go | 20 +- client/internal/services/storage.go | 8 +- common/assets/confs/agent.config.json | 4 +- common/assets/confs/client.config.json | 2 +- common/globals/pools.go | 18 +- common/magefiles/main.go | 2 +- common/pkgs/connectivity/collector.go | 8 +- common/pkgs/grpc/agent/agent.pb.go | 178 ++++++++-------- common/pkgs/grpc/agent/agent.proto | 4 +- common/pkgs/grpc/agent/agent_grpc.pb.go | 190 +++++++++--------- common/pkgs/grpc/agent/client.go | 8 +- common/pkgs/grpc/agent/pool.go | 4 +- common/pkgs/ioswitch2/agent_worker.go | 34 ++-- common/pkgs/ioswitch2/http_hub_worker.go | 2 +- common/pkgs/ioswitch2/ops2/faas.go | 2 +- common/pkgs/ioswitch2/parser/gen/generator.go | 4 +- common/pkgs/ioswitch2/parser/opt/ec.go | 2 +- common/pkgs/ioswitch2/plans/utils.go | 2 +- common/pkgs/ioswitchlrc/agent_worker.go | 34 ++-- common/pkgs/ioswitchlrc/parser/passes.go | 4 +- common/pkgs/mq/agent/agent.go | 6 +- common/pkgs/mq/agent/client.go | 4 +- common/pkgs/mq/agent/server.go | 6 +- common/pkgs/mq/agent/storage.go | 2 +- common/pkgs/mq/consts.go | 4 +- .../scanner/event/agent_check_shardstore.go | 8 +- .../mq/scanner/event/agent_check_state.go | 8 +- .../mq/scanner/event/agent_check_storage.go | 8 +- .../mq/scanner/event/agent_shardstore_gc.go | 8 +- .../pkgs/mq/scanner/event/agent_storage_gc.go | 8 +- hub/internal/cmd/serve.go | 26 +-- hub/internal/config/config.go | 2 +- hub/internal/grpc/io.go | 6 +- hub/internal/grpc/ping.go | 2 +- hub/internal/grpc/service.go | 4 +- hub/internal/mq/agent.go | 2 +- hub/internal/mq/storage.go | 2 +- hub/internal/task/task.go | 4 +- hub/internal/tickevent/report_hub_stats.go | 2 +- .../tickevent/report_storage_stats.go | 4 +- hub/main.go | 2 +- magefiles/main.go | 22 +- .../internal/event/agent_check_shardstore.go | 48 ++--- scanner/internal/event/agent_check_state.go | 30 +-- scanner/internal/event/agent_shardstore_gc.go | 38 ++-- .../event/check_package_redundancy.go | 6 +- scanner/internal/event/event.go | 4 +- .../batch_all_agent_check_shardstore.go | 16 +- .../internal/tickevent/check_agent_state.go | 12 +- scanner/internal/tickevent/storage_gc.go | 2 +- scanner/main.go | 12 +- 57 files changed, 430 insertions(+), 442 deletions(-) diff --git a/.devops/基于gitlink构建.yml b/.devops/基于gitlink构建.yml index 29dbf30..8b7b028 100644 --- a/.devops/基于gitlink构建.yml +++ b/.devops/基于gitlink构建.yml @@ -100,7 +100,7 @@ workflow: - ssh_cmd_0 - dockerBuild_0 - ref: docker_image_build_0 - name: agent镜像构建 + name: hub镜像构建 cache: GOCACHE: /root/.cache/go-build GOMODCACHE: /go/pkg/mod @@ -108,14 +108,14 @@ workflow: input: docker_username: ((docker_registry.registry_user)) docker_password: ((docker_registry.registry_password)) - image_name: '"112.95.163.90:5010/agentservice-x86"' + image_name: '"112.95.163.90:5010/hubservice-x86"' image_tag: '"latest"' registry_address: '"123.60.146.162:5010"' docker_file: '"Dockerfile"' docker_build_path: '"."' workspace: git_clone_0.git_path image_push: true - build_args: '"TARGET=agentimage"' + build_args: '"TARGET=hubimage"' needs: - shell_0 - ref: git_clone_1 @@ -145,7 +145,7 @@ workflow: context: git_clone_0.git_path dockerfile: '"Dockerfile"' docker_build_tags: '"latest"' - docker_build_args: '"TARGET=agentimage"' + docker_build_args: '"TARGET=hubimage"' docker_login_registry: '""' docker_login_username: ((docker_registry.registry_user)) docker_login_password: ((docker_registry.registry_password)) diff --git a/Dockerfile b/Dockerfile index c48175a..60c5c8a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,11 +23,11 @@ RUN apk add --no-cache tzdata ENV TZ=Asia/Shanghai RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone -# agent -FROM base AS agentimage -COPY --from=builder /app/storage/build/agent . -RUN chmod +x agent/agent -ENTRYPOINT ["./agent/agent"] +# hub +FROM base AS hubimage +COPY --from=builder /app/storage/build/hub . +RUN chmod +x hub/hub +ENTRYPOINT ["./hub/hub"] # coordinator FROM base AS coorimage diff --git a/README.md b/README.md index 193d132..6320899 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ ## 目录结构 此仓库是一个go module,但包含了多个服务的源码,你可以在每个服务的目录中找到main.go。可以通过编译脚本的参数来指定生成哪一个服务。 -- `agent`:Agent服务的源码。 +- `hub`:Hub服务的源码。 - `client`:Client服务的源码。 - `common`:存放在几个服务之间共享的代码以及一些数据结构定义。 - `coordinator`:Coordinator服务的源码。 @@ -17,12 +17,12 @@ ## 编译 运行编译脚本需要使用mage工具,此处是[仓库链接](https://github.com/magefile/mage)。 -安装好mage工具之后,进入到仓库根目录,使用`mage agent`即可编译Agent服务。与此相同的还有`mage client`、`mage coodinator`、`mage scanner`。可以同时指定多个参数来编译多个服务,如果要一次性编译所有服务,可以使用`mage bin`。 +安装好mage工具之后,进入到仓库根目录,使用`mage hub`即可编译Hub服务。与此相同的还有`mage client`、`mage coodinator`、`mage scanner`。可以同时指定多个参数来编译多个服务,如果要一次性编译所有服务,可以使用`mage bin`。 使用`mage confs`命令可以将`common/assets/confs`的配置文件拷贝到输出目录,使用`mage scripts`将`scripts`目录里的脚本拷贝到输出目录。 使用`mage all`可以一次性完成编译、拷贝工作。 -可以通过增加额外的参数来指定编译目标平台,比如`mage win amd64 agent`。支持的操作系统参数有`win`、`linux`,支持的CPU架构参数有`amd64`、`arm64`。这些参数同样可以和`bin`、`all`参数一起使用。 +可以通过增加额外的参数来指定编译目标平台,比如`mage win amd64 hub`。支持的操作系统参数有`win`、`linux`,支持的CPU架构参数有`amd64`、`arm64`。这些参数同样可以和`bin`、`all`参数一起使用。 注意:编译目标平台参数必须在编译二进制参数之前。 diff --git a/client/internal/cmdline/mount.go b/client/internal/cmdline/mount.go index 166a7aa..39e8a1c 100644 --- a/client/internal/cmdline/mount.go +++ b/client/internal/cmdline/mount.go @@ -18,7 +18,7 @@ import ( stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" + agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" agtpool "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" ) @@ -52,7 +52,7 @@ func mountCmd(mountPoint string, configPath string) { stgglb.InitLocal(config.Cfg().Local) stgglb.InitMQPool(config.Cfg().RabbitMQ) - stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) + stgglb.InitHubRPCPool(&agtrpc.PoolConfig{}) // stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID) // stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID) diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index 21d5072..474a607 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -55,7 +55,7 @@ func serveHTTP(configPath string, listenAddr string) { stgglb.InitLocal(config.Cfg().Local) stgglb.InitMQPool(config.Cfg().RabbitMQ) - stgglb.InitAgentRPCPool(&config.Cfg().AgentGRPC) + stgglb.InitHubRPCPool(&config.Cfg().HubGRPC) // 数据库 db, err := db.NewDB(&config.Cfg().DB) diff --git a/client/internal/config/config.go b/client/internal/config/config.go index 09048d0..07fb281 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -11,12 +11,12 @@ import ( clitypes "gitlink.org.cn/cloudream/storage2/client/types" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" + agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" ) type Config struct { Local stgglb.LocalMachineInfo `json:"local"` - AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` + HubGRPC agtrpc.PoolConfig `json:"hubGRPC"` Logger logger.Config `json:"logger"` DB db.Config `json:"db"` RabbitMQ mq.Config `json:"rabbitMQ"` diff --git a/client/internal/services/cache.go b/client/internal/services/cache.go index 7f94375..1c62521 100644 --- a/client/internal/services/cache.go +++ b/client/internal/services/cache.go @@ -8,7 +8,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/agent" + agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory" ) @@ -37,13 +37,13 @@ func (svc *CacheService) StartCacheMovePackage(userID cdssdk.UserID, packageID c return 0, "", fmt.Errorf("shard storage is not enabled") } - agentCli, err := stgglb.AgentMQPool.Acquire(getStg.Storages[0].MasterHub.HubID) + hubCli, err := stgglb.HubMQPool.Acquire(getStg.Storages[0].MasterHub.HubID) if err != nil { - return 0, "", fmt.Errorf("new agent client: %w", err) + return 0, "", fmt.Errorf("new hub client: %w", err) } - defer stgglb.AgentMQPool.Release(agentCli) + defer stgglb.HubMQPool.Release(hubCli) - startResp, err := agentCli.StartCacheMovePackage(agtmq.NewStartCacheMovePackage(userID, packageID, stgID)) + startResp, err := hubCli.StartCacheMovePackage(agtmq.NewStartCacheMovePackage(userID, packageID, stgID)) if err != nil { return 0, "", fmt.Errorf("start cache move package: %w", err) } @@ -52,13 +52,13 @@ func (svc *CacheService) StartCacheMovePackage(userID cdssdk.UserID, packageID c } func (svc *CacheService) WaitCacheMovePackage(hubID cdssdk.HubID, taskID string, waitTimeout time.Duration) (bool, error) { - agentCli, err := stgglb.AgentMQPool.Acquire(hubID) + hubCli, err := stgglb.HubMQPool.Acquire(hubID) if err != nil { - return true, fmt.Errorf("new agent client: %w", err) + return true, fmt.Errorf("new hub client: %w", err) } - defer stgglb.AgentMQPool.Release(agentCli) + defer stgglb.HubMQPool.Release(hubCli) - waitResp, err := agentCli.WaitCacheMovePackage(agtmq.NewWaitCacheMovePackage(taskID, waitTimeout.Milliseconds())) + waitResp, err := hubCli.WaitCacheMovePackage(agtmq.NewWaitCacheMovePackage(taskID, waitTimeout.Milliseconds())) if err != nil { return true, fmt.Errorf("wait cache move package: %w", err) } @@ -77,7 +77,7 @@ func (svc *CacheService) WaitCacheMovePackage(hubID cdssdk.HubID, taskID string, func (svc *CacheService) CacheRemovePackage(packageID cdssdk.PackageID, stgID cdssdk.StorageID) error { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { - return fmt.Errorf("new agent client: %w", err) + return fmt.Errorf("new hub client: %w", err) } defer stgglb.CoordinatorMQPool.Release(coorCli) diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index 3297bbf..1dc5bbe 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -138,13 +138,13 @@ func (svc *UserSpaceService) UserSpaceCreatePackage(bucketID cdssdk.BucketID, na // return cdssdk.Package{}, fmt.Errorf("shard userspace is not enabled") // } - // agentCli, err := stgglb.AgentMQPool.Acquire(spaceDetail.MasterHub.HubID) + // hubCli, err := stgglb.HubMQPool.Acquire(spaceDetail.MasterHub.HubID) // if err != nil { - // return cdssdk.Package{}, fmt.Errorf("new agent client: %w", err) + // return cdssdk.Package{}, fmt.Errorf("new hub client: %w", err) // } - // defer stgglb.AgentMQPool.Release(agentCli) + // defer stgglb.HubMQPool.Release(hubCli) - // createResp, err := agentCli.UserSpaceCreatePackage(agtmq.ReqUserSpaceCreatePackage(bucketID, name, userspaceID, path, userspaceAffinity)) + // createResp, err := hubCli.UserSpaceCreatePackage(agtmq.ReqUserSpaceCreatePackage(bucketID, name, userspaceID, path, userspaceAffinity)) // if err != nil { // return cdssdk.Package{}, err // } diff --git a/common/assets/confs/agent.config.json b/common/assets/confs/agent.config.json index 21c08b9..5d002a9 100644 --- a/common/assets/confs/agent.config.json +++ b/common/assets/confs/agent.config.json @@ -11,7 +11,7 @@ }, "logger": { "output": "file", - "outputFileName": "agent", + "outputFileName": "hub", "outputDirectory": "log", "level": "debug" }, @@ -31,7 +31,7 @@ "etcdPassword": "", "etcdLockLeaseTimeSec": 5, "randomReleasingDelayMs": 3000, - "serviceDescription": "I am a agent" + "serviceDescription": "I am a hub" }, "connectivity": { "testInterval": 300 diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json index 570b47e..2f9e2c5 100644 --- a/common/assets/confs/client.config.json +++ b/common/assets/confs/client.config.json @@ -5,7 +5,7 @@ "externalIP": "127.0.0.1", "locationID": 1 }, - "agentGRPC": { + "hubGRPC": { "port": 5010 }, "logger": { diff --git a/common/globals/pools.go b/common/globals/pools.go index 7b7f264..6524859 100644 --- a/common/globals/pools.go +++ b/common/globals/pools.go @@ -2,13 +2,13 @@ package stgglb import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" - agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/agent" + agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" + agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" scmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/scanner" ) -var AgentMQPool agtmq.Pool +var HubMQPool agtmq.Pool var CoordinatorMQPool coormq.Pool @@ -19,7 +19,7 @@ var ScannerMQPool scmq.Pool // @Description: 初始化MQ连接池 // @param cfg func InitMQPool(cfg mq.Config) { - AgentMQPool = agtmq.NewPool(cfg) + HubMQPool = agtmq.NewPool(cfg) CoordinatorMQPool = coormq.NewPool(cfg) @@ -27,12 +27,12 @@ func InitMQPool(cfg mq.Config) { } -var AgentRPCPool *agtrpc.Pool +var HubRPCPool *agtrpc.Pool -// InitAgentRPCPool +// InitHubRPCPool // -// @Description: 初始化AgentRPC连接池 +// @Description: 初始化HubRPC连接池 // @param cfg -func InitAgentRPCPool(cfg *agtrpc.PoolConfig) { - AgentRPCPool = agtrpc.NewPool(cfg) +func InitHubRPCPool(cfg *agtrpc.PoolConfig) { + HubRPCPool = agtrpc.NewPool(cfg) } diff --git a/common/magefiles/main.go b/common/magefiles/main.go index b97191c..c036f3c 100644 --- a/common/magefiles/main.go +++ b/common/magefiles/main.go @@ -9,7 +9,7 @@ import ( ) func Protos() error { - return proto("pkgs/grpc/agent", "agent.proto") + return proto("pkgs/grpc/hub", "hub.proto") } func proto(dir string, fileName string) error { diff --git a/common/pkgs/connectivity/collector.go b/common/pkgs/connectivity/collector.go index ac33363..e551b74 100644 --- a/common/pkgs/connectivity/collector.go +++ b/common/pkgs/connectivity/collector.go @@ -173,7 +173,7 @@ func (r *Collector) ping(hub cortypes.Hub) Connectivity { port = addr.ExternalGRPCPort } default: - // TODO 增加对HTTP模式的agent的支持 + // TODO 增加对HTTP模式的hub的支持 log.Warnf("unsupported address type: %v", addr) @@ -184,16 +184,16 @@ func (r *Collector) ping(hub cortypes.Hub) Connectivity { } } - agtCli, err := stgglb.AgentRPCPool.Acquire(ip, port) + agtCli, err := stgglb.HubRPCPool.Acquire(ip, port) if err != nil { - log.Warnf("new agent %v:%v rpc client: %w", ip, port, err) + log.Warnf("new hub %v:%v rpc client: %w", ip, port, err) return Connectivity{ ToHubID: hub.HubID, Latency: nil, TestTime: time.Now(), } } - defer stgglb.AgentRPCPool.Release(agtCli) + defer stgglb.HubRPCPool.Release(agtCli) // 第一次ping保证网络连接建立成功 err = agtCli.Ping() diff --git a/common/pkgs/grpc/agent/agent.pb.go b/common/pkgs/grpc/agent/agent.pb.go index ce50847..65ae49f 100644 --- a/common/pkgs/grpc/agent/agent.pb.go +++ b/common/pkgs/grpc/agent/agent.pb.go @@ -4,9 +4,9 @@ // versions: // protoc-gen-go v1.34.2 // protoc v4.22.3 -// source: pkgs/grpc/agent/agent.proto +// source: pkgs/grpc/hub/hub.proto -package agent +package hub import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" @@ -55,11 +55,11 @@ func (x StreamDataPacketType) String() string { } func (StreamDataPacketType) Descriptor() protoreflect.EnumDescriptor { - return file_pkgs_grpc_agent_agent_proto_enumTypes[0].Descriptor() + return file_pkgs_grpc_hub_hub_proto_enumTypes[0].Descriptor() } func (StreamDataPacketType) Type() protoreflect.EnumType { - return &file_pkgs_grpc_agent_agent_proto_enumTypes[0] + return &file_pkgs_grpc_hub_hub_proto_enumTypes[0] } func (x StreamDataPacketType) Number() protoreflect.EnumNumber { @@ -68,7 +68,7 @@ func (x StreamDataPacketType) Number() protoreflect.EnumNumber { // Deprecated: Use StreamDataPacketType.Descriptor instead. func (StreamDataPacketType) EnumDescriptor() ([]byte, []int) { - return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{0} + return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{0} } type ExecuteIOPlanReq struct { @@ -82,7 +82,7 @@ type ExecuteIOPlanReq struct { func (x *ExecuteIOPlanReq) Reset() { *x = ExecuteIOPlanReq{} if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[0] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -95,7 +95,7 @@ func (x *ExecuteIOPlanReq) String() string { func (*ExecuteIOPlanReq) ProtoMessage() {} func (x *ExecuteIOPlanReq) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[0] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -108,7 +108,7 @@ func (x *ExecuteIOPlanReq) ProtoReflect() protoreflect.Message { // Deprecated: Use ExecuteIOPlanReq.ProtoReflect.Descriptor instead. func (*ExecuteIOPlanReq) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{0} + return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{0} } func (x *ExecuteIOPlanReq) GetPlan() string { @@ -127,7 +127,7 @@ type ExecuteIOPlanResp struct { func (x *ExecuteIOPlanResp) Reset() { *x = ExecuteIOPlanResp{} if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[1] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -140,7 +140,7 @@ func (x *ExecuteIOPlanResp) String() string { func (*ExecuteIOPlanResp) ProtoMessage() {} func (x *ExecuteIOPlanResp) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[1] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -153,7 +153,7 @@ func (x *ExecuteIOPlanResp) ProtoReflect() protoreflect.Message { // Deprecated: Use ExecuteIOPlanResp.ProtoReflect.Descriptor instead. func (*ExecuteIOPlanResp) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{1} + return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{1} } // 文件数据。注意:只在Type为Data或EOF的时候,Data字段才能有数据 @@ -169,7 +169,7 @@ type FileDataPacket struct { func (x *FileDataPacket) Reset() { *x = FileDataPacket{} if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[2] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -182,7 +182,7 @@ func (x *FileDataPacket) String() string { func (*FileDataPacket) ProtoMessage() {} func (x *FileDataPacket) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[2] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -195,7 +195,7 @@ func (x *FileDataPacket) ProtoReflect() protoreflect.Message { // Deprecated: Use FileDataPacket.ProtoReflect.Descriptor instead. func (*FileDataPacket) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{2} + return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{2} } func (x *FileDataPacket) GetType() StreamDataPacketType { @@ -227,7 +227,7 @@ type StreamDataPacket struct { func (x *StreamDataPacket) Reset() { *x = StreamDataPacket{} if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[3] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -240,7 +240,7 @@ func (x *StreamDataPacket) String() string { func (*StreamDataPacket) ProtoMessage() {} func (x *StreamDataPacket) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[3] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -253,7 +253,7 @@ func (x *StreamDataPacket) ProtoReflect() protoreflect.Message { // Deprecated: Use StreamDataPacket.ProtoReflect.Descriptor instead. func (*StreamDataPacket) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{3} + return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{3} } func (x *StreamDataPacket) GetType() StreamDataPacketType { @@ -293,7 +293,7 @@ type SendStreamResp struct { func (x *SendStreamResp) Reset() { *x = SendStreamResp{} if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[4] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -306,7 +306,7 @@ func (x *SendStreamResp) String() string { func (*SendStreamResp) ProtoMessage() {} func (x *SendStreamResp) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[4] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -319,7 +319,7 @@ func (x *SendStreamResp) ProtoReflect() protoreflect.Message { // Deprecated: Use SendStreamResp.ProtoReflect.Descriptor instead. func (*SendStreamResp) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{4} + return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{4} } type GetStreamReq struct { @@ -336,7 +336,7 @@ type GetStreamReq struct { func (x *GetStreamReq) Reset() { *x = GetStreamReq{} if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[5] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -349,7 +349,7 @@ func (x *GetStreamReq) String() string { func (*GetStreamReq) ProtoMessage() {} func (x *GetStreamReq) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[5] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -362,7 +362,7 @@ func (x *GetStreamReq) ProtoReflect() protoreflect.Message { // Deprecated: Use GetStreamReq.ProtoReflect.Descriptor instead. func (*GetStreamReq) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{5} + return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{5} } func (x *GetStreamReq) GetPlanID() string { @@ -406,7 +406,7 @@ type SendVarReq struct { func (x *SendVarReq) Reset() { *x = SendVarReq{} if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[6] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -419,7 +419,7 @@ func (x *SendVarReq) String() string { func (*SendVarReq) ProtoMessage() {} func (x *SendVarReq) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[6] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -432,7 +432,7 @@ func (x *SendVarReq) ProtoReflect() protoreflect.Message { // Deprecated: Use SendVarReq.ProtoReflect.Descriptor instead. func (*SendVarReq) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{6} + return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{6} } func (x *SendVarReq) GetPlanID() string { @@ -465,7 +465,7 @@ type SendVarResp struct { func (x *SendVarResp) Reset() { *x = SendVarResp{} if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[7] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -478,7 +478,7 @@ func (x *SendVarResp) String() string { func (*SendVarResp) ProtoMessage() {} func (x *SendVarResp) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[7] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -491,7 +491,7 @@ func (x *SendVarResp) ProtoReflect() protoreflect.Message { // Deprecated: Use SendVarResp.ProtoReflect.Descriptor instead. func (*SendVarResp) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{7} + return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{7} } type GetVarReq struct { @@ -508,7 +508,7 @@ type GetVarReq struct { func (x *GetVarReq) Reset() { *x = GetVarReq{} if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[8] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -521,7 +521,7 @@ func (x *GetVarReq) String() string { func (*GetVarReq) ProtoMessage() {} func (x *GetVarReq) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[8] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -534,7 +534,7 @@ func (x *GetVarReq) ProtoReflect() protoreflect.Message { // Deprecated: Use GetVarReq.ProtoReflect.Descriptor instead. func (*GetVarReq) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{8} + return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{8} } func (x *GetVarReq) GetPlanID() string { @@ -576,7 +576,7 @@ type GetVarResp struct { func (x *GetVarResp) Reset() { *x = GetVarResp{} if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[9] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -589,7 +589,7 @@ func (x *GetVarResp) String() string { func (*GetVarResp) ProtoMessage() {} func (x *GetVarResp) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[9] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -602,7 +602,7 @@ func (x *GetVarResp) ProtoReflect() protoreflect.Message { // Deprecated: Use GetVarResp.ProtoReflect.Descriptor instead. func (*GetVarResp) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{9} + return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{9} } func (x *GetVarResp) GetVar() string { @@ -621,7 +621,7 @@ type PingReq struct { func (x *PingReq) Reset() { *x = PingReq{} if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[10] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -634,7 +634,7 @@ func (x *PingReq) String() string { func (*PingReq) ProtoMessage() {} func (x *PingReq) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[10] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -647,7 +647,7 @@ func (x *PingReq) ProtoReflect() protoreflect.Message { // Deprecated: Use PingReq.ProtoReflect.Descriptor instead. func (*PingReq) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{10} + return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{10} } type PingResp struct { @@ -659,7 +659,7 @@ type PingResp struct { func (x *PingResp) Reset() { *x = PingResp{} if protoimpl.UnsafeEnabled { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[11] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -672,7 +672,7 @@ func (x *PingResp) String() string { func (*PingResp) ProtoMessage() {} func (x *PingResp) ProtoReflect() protoreflect.Message { - mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[11] + mi := &file_pkgs_grpc_hub_hub_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -685,12 +685,12 @@ func (x *PingResp) ProtoReflect() protoreflect.Message { // Deprecated: Use PingResp.ProtoReflect.Descriptor instead. func (*PingResp) Descriptor() ([]byte, []int) { - return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{11} + return file_pkgs_grpc_hub_hub_proto_rawDescGZIP(), []int{11} } -var File_pkgs_grpc_agent_agent_proto protoreflect.FileDescriptor +var File_pkgs_grpc_hub_hub_proto protoreflect.FileDescriptor -var file_pkgs_grpc_agent_agent_proto_rawDesc = []byte{ +var file_pkgs_grpc_hub_hub_proto_rawDesc = []byte{ 0x0a, 0x1b, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x26, 0x0a, 0x10, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x49, 0x4f, 0x50, 0x6c, 0x61, 0x6e, 0x52, 0x65, @@ -761,20 +761,20 @@ var file_pkgs_grpc_agent_agent_proto_rawDesc = []byte{ } var ( - file_pkgs_grpc_agent_agent_proto_rawDescOnce sync.Once - file_pkgs_grpc_agent_agent_proto_rawDescData = file_pkgs_grpc_agent_agent_proto_rawDesc + file_pkgs_grpc_hub_hub_proto_rawDescOnce sync.Once + file_pkgs_grpc_hub_hub_proto_rawDescData = file_pkgs_grpc_hub_hub_proto_rawDesc ) -func file_pkgs_grpc_agent_agent_proto_rawDescGZIP() []byte { - file_pkgs_grpc_agent_agent_proto_rawDescOnce.Do(func() { - file_pkgs_grpc_agent_agent_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkgs_grpc_agent_agent_proto_rawDescData) +func file_pkgs_grpc_hub_hub_proto_rawDescGZIP() []byte { + file_pkgs_grpc_hub_hub_proto_rawDescOnce.Do(func() { + file_pkgs_grpc_hub_hub_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkgs_grpc_hub_hub_proto_rawDescData) }) - return file_pkgs_grpc_agent_agent_proto_rawDescData + return file_pkgs_grpc_hub_hub_proto_rawDescData } -var file_pkgs_grpc_agent_agent_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_pkgs_grpc_agent_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 12) -var file_pkgs_grpc_agent_agent_proto_goTypes = []any{ +var file_pkgs_grpc_hub_hub_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_pkgs_grpc_hub_hub_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_pkgs_grpc_hub_hub_proto_goTypes = []any{ (StreamDataPacketType)(0), // 0: StreamDataPacketType (*ExecuteIOPlanReq)(nil), // 1: ExecuteIOPlanReq (*ExecuteIOPlanResp)(nil), // 2: ExecuteIOPlanResp @@ -789,21 +789,21 @@ var file_pkgs_grpc_agent_agent_proto_goTypes = []any{ (*PingReq)(nil), // 11: PingReq (*PingResp)(nil), // 12: PingResp } -var file_pkgs_grpc_agent_agent_proto_depIdxs = []int32{ +var file_pkgs_grpc_hub_hub_proto_depIdxs = []int32{ 0, // 0: FileDataPacket.Type:type_name -> StreamDataPacketType 0, // 1: StreamDataPacket.Type:type_name -> StreamDataPacketType - 1, // 2: Agent.ExecuteIOPlan:input_type -> ExecuteIOPlanReq - 4, // 3: Agent.SendStream:input_type -> StreamDataPacket - 6, // 4: Agent.GetStream:input_type -> GetStreamReq - 7, // 5: Agent.SendVar:input_type -> SendVarReq - 9, // 6: Agent.GetVar:input_type -> GetVarReq - 11, // 7: Agent.Ping:input_type -> PingReq - 2, // 8: Agent.ExecuteIOPlan:output_type -> ExecuteIOPlanResp - 5, // 9: Agent.SendStream:output_type -> SendStreamResp - 4, // 10: Agent.GetStream:output_type -> StreamDataPacket - 8, // 11: Agent.SendVar:output_type -> SendVarResp - 10, // 12: Agent.GetVar:output_type -> GetVarResp - 12, // 13: Agent.Ping:output_type -> PingResp + 1, // 2: Hub.ExecuteIOPlan:input_type -> ExecuteIOPlanReq + 4, // 3: Hub.SendStream:input_type -> StreamDataPacket + 6, // 4: Hub.GetStream:input_type -> GetStreamReq + 7, // 5: Hub.SendVar:input_type -> SendVarReq + 9, // 6: Hub.GetVar:input_type -> GetVarReq + 11, // 7: Hub.Ping:input_type -> PingReq + 2, // 8: Hub.ExecuteIOPlan:output_type -> ExecuteIOPlanResp + 5, // 9: Hub.SendStream:output_type -> SendStreamResp + 4, // 10: Hub.GetStream:output_type -> StreamDataPacket + 8, // 11: Hub.SendVar:output_type -> SendVarResp + 10, // 12: Hub.GetVar:output_type -> GetVarResp + 12, // 13: Hub.Ping:output_type -> PingResp 8, // [8:14] is the sub-list for method output_type 2, // [2:8] is the sub-list for method input_type 2, // [2:2] is the sub-list for extension type_name @@ -811,13 +811,13 @@ var file_pkgs_grpc_agent_agent_proto_depIdxs = []int32{ 0, // [0:2] is the sub-list for field type_name } -func init() { file_pkgs_grpc_agent_agent_proto_init() } -func file_pkgs_grpc_agent_agent_proto_init() { - if File_pkgs_grpc_agent_agent_proto != nil { +func init() { file_pkgs_grpc_hub_hub_proto_init() } +func file_pkgs_grpc_hub_hub_proto_init() { + if File_pkgs_grpc_hub_hub_proto != nil { return } if !protoimpl.UnsafeEnabled { - file_pkgs_grpc_agent_agent_proto_msgTypes[0].Exporter = func(v any, i int) any { + file_pkgs_grpc_hub_hub_proto_msgTypes[0].Exporter = func(v any, i int) any { switch v := v.(*ExecuteIOPlanReq); i { case 0: return &v.state @@ -829,7 +829,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[1].Exporter = func(v any, i int) any { + file_pkgs_grpc_hub_hub_proto_msgTypes[1].Exporter = func(v any, i int) any { switch v := v.(*ExecuteIOPlanResp); i { case 0: return &v.state @@ -841,7 +841,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[2].Exporter = func(v any, i int) any { + file_pkgs_grpc_hub_hub_proto_msgTypes[2].Exporter = func(v any, i int) any { switch v := v.(*FileDataPacket); i { case 0: return &v.state @@ -853,7 +853,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[3].Exporter = func(v any, i int) any { + file_pkgs_grpc_hub_hub_proto_msgTypes[3].Exporter = func(v any, i int) any { switch v := v.(*StreamDataPacket); i { case 0: return &v.state @@ -865,7 +865,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[4].Exporter = func(v any, i int) any { + file_pkgs_grpc_hub_hub_proto_msgTypes[4].Exporter = func(v any, i int) any { switch v := v.(*SendStreamResp); i { case 0: return &v.state @@ -877,7 +877,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[5].Exporter = func(v any, i int) any { + file_pkgs_grpc_hub_hub_proto_msgTypes[5].Exporter = func(v any, i int) any { switch v := v.(*GetStreamReq); i { case 0: return &v.state @@ -889,7 +889,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[6].Exporter = func(v any, i int) any { + file_pkgs_grpc_hub_hub_proto_msgTypes[6].Exporter = func(v any, i int) any { switch v := v.(*SendVarReq); i { case 0: return &v.state @@ -901,7 +901,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[7].Exporter = func(v any, i int) any { + file_pkgs_grpc_hub_hub_proto_msgTypes[7].Exporter = func(v any, i int) any { switch v := v.(*SendVarResp); i { case 0: return &v.state @@ -913,7 +913,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[8].Exporter = func(v any, i int) any { + file_pkgs_grpc_hub_hub_proto_msgTypes[8].Exporter = func(v any, i int) any { switch v := v.(*GetVarReq); i { case 0: return &v.state @@ -925,7 +925,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[9].Exporter = func(v any, i int) any { + file_pkgs_grpc_hub_hub_proto_msgTypes[9].Exporter = func(v any, i int) any { switch v := v.(*GetVarResp); i { case 0: return &v.state @@ -937,7 +937,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[10].Exporter = func(v any, i int) any { + file_pkgs_grpc_hub_hub_proto_msgTypes[10].Exporter = func(v any, i int) any { switch v := v.(*PingReq); i { case 0: return &v.state @@ -949,7 +949,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[11].Exporter = func(v any, i int) any { + file_pkgs_grpc_hub_hub_proto_msgTypes[11].Exporter = func(v any, i int) any { switch v := v.(*PingResp); i { case 0: return &v.state @@ -966,19 +966,19 @@ func file_pkgs_grpc_agent_agent_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_pkgs_grpc_agent_agent_proto_rawDesc, + RawDescriptor: file_pkgs_grpc_hub_hub_proto_rawDesc, NumEnums: 1, NumMessages: 12, NumExtensions: 0, NumServices: 1, }, - GoTypes: file_pkgs_grpc_agent_agent_proto_goTypes, - DependencyIndexes: file_pkgs_grpc_agent_agent_proto_depIdxs, - EnumInfos: file_pkgs_grpc_agent_agent_proto_enumTypes, - MessageInfos: file_pkgs_grpc_agent_agent_proto_msgTypes, + GoTypes: file_pkgs_grpc_hub_hub_proto_goTypes, + DependencyIndexes: file_pkgs_grpc_hub_hub_proto_depIdxs, + EnumInfos: file_pkgs_grpc_hub_hub_proto_enumTypes, + MessageInfos: file_pkgs_grpc_hub_hub_proto_msgTypes, }.Build() - File_pkgs_grpc_agent_agent_proto = out.File - file_pkgs_grpc_agent_agent_proto_rawDesc = nil - file_pkgs_grpc_agent_agent_proto_goTypes = nil - file_pkgs_grpc_agent_agent_proto_depIdxs = nil + File_pkgs_grpc_hub_hub_proto = out.File + file_pkgs_grpc_hub_hub_proto_rawDesc = nil + file_pkgs_grpc_hub_hub_proto_goTypes = nil + file_pkgs_grpc_hub_hub_proto_depIdxs = nil } diff --git a/common/pkgs/grpc/agent/agent.proto b/common/pkgs/grpc/agent/agent.proto index 3233dee..c943357 100644 --- a/common/pkgs/grpc/agent/agent.proto +++ b/common/pkgs/grpc/agent/agent.proto @@ -2,7 +2,7 @@ syntax = "proto3"; // 生成的go文件包 -option go_package = ".;agent";//grpc这里生效了 +option go_package = ".;hub";//grpc这里生效了 @@ -61,7 +61,7 @@ message GetVarResp { message PingReq {} message PingResp {} -service Agent { +service Hub { rpc ExecuteIOPlan(ExecuteIOPlanReq) returns(ExecuteIOPlanResp){} rpc SendStream(stream StreamDataPacket)returns(SendStreamResp){} diff --git a/common/pkgs/grpc/agent/agent_grpc.pb.go b/common/pkgs/grpc/agent/agent_grpc.pb.go index 6d0a338..af595c9 100644 --- a/common/pkgs/grpc/agent/agent_grpc.pb.go +++ b/common/pkgs/grpc/agent/agent_grpc.pb.go @@ -4,9 +4,9 @@ // versions: // - protoc-gen-go-grpc v1.3.0 // - protoc v4.22.3 -// source: pkgs/grpc/agent/agent.proto +// source: pkgs/grpc/hub/hub.proto -package agent +package hub import ( context "context" @@ -21,67 +21,67 @@ import ( const _ = grpc.SupportPackageIsVersion7 const ( - Agent_ExecuteIOPlan_FullMethodName = "/Agent/ExecuteIOPlan" - Agent_SendStream_FullMethodName = "/Agent/SendStream" - Agent_GetStream_FullMethodName = "/Agent/GetStream" - Agent_SendVar_FullMethodName = "/Agent/SendVar" - Agent_GetVar_FullMethodName = "/Agent/GetVar" - Agent_Ping_FullMethodName = "/Agent/Ping" + Hub_ExecuteIOPlan_FullMethodName = "/Hub/ExecuteIOPlan" + Hub_SendStream_FullMethodName = "/Hub/SendStream" + Hub_GetStream_FullMethodName = "/Hub/GetStream" + Hub_SendVar_FullMethodName = "/Hub/SendVar" + Hub_GetVar_FullMethodName = "/Hub/GetVar" + Hub_Ping_FullMethodName = "/Hub/Ping" ) -// AgentClient is the client API for Agent service. +// HubClient is the client API for Hub service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type AgentClient interface { +type HubClient interface { ExecuteIOPlan(ctx context.Context, in *ExecuteIOPlanReq, opts ...grpc.CallOption) (*ExecuteIOPlanResp, error) - SendStream(ctx context.Context, opts ...grpc.CallOption) (Agent_SendStreamClient, error) - GetStream(ctx context.Context, in *GetStreamReq, opts ...grpc.CallOption) (Agent_GetStreamClient, error) + SendStream(ctx context.Context, opts ...grpc.CallOption) (Hub_SendStreamClient, error) + GetStream(ctx context.Context, in *GetStreamReq, opts ...grpc.CallOption) (Hub_GetStreamClient, error) SendVar(ctx context.Context, in *SendVarReq, opts ...grpc.CallOption) (*SendVarResp, error) GetVar(ctx context.Context, in *GetVarReq, opts ...grpc.CallOption) (*GetVarResp, error) Ping(ctx context.Context, in *PingReq, opts ...grpc.CallOption) (*PingResp, error) } -type agentClient struct { +type hubClient struct { cc grpc.ClientConnInterface } -func NewAgentClient(cc grpc.ClientConnInterface) AgentClient { - return &agentClient{cc} +func NewHubClient(cc grpc.ClientConnInterface) HubClient { + return &hubClient{cc} } -func (c *agentClient) ExecuteIOPlan(ctx context.Context, in *ExecuteIOPlanReq, opts ...grpc.CallOption) (*ExecuteIOPlanResp, error) { +func (c *hubClient) ExecuteIOPlan(ctx context.Context, in *ExecuteIOPlanReq, opts ...grpc.CallOption) (*ExecuteIOPlanResp, error) { out := new(ExecuteIOPlanResp) - err := c.cc.Invoke(ctx, Agent_ExecuteIOPlan_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, Hub_ExecuteIOPlan_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *agentClient) SendStream(ctx context.Context, opts ...grpc.CallOption) (Agent_SendStreamClient, error) { - stream, err := c.cc.NewStream(ctx, &Agent_ServiceDesc.Streams[0], Agent_SendStream_FullMethodName, opts...) +func (c *hubClient) SendStream(ctx context.Context, opts ...grpc.CallOption) (Hub_SendStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &Hub_ServiceDesc.Streams[0], Hub_SendStream_FullMethodName, opts...) if err != nil { return nil, err } - x := &agentSendStreamClient{stream} + x := &hubSendStreamClient{stream} return x, nil } -type Agent_SendStreamClient interface { +type Hub_SendStreamClient interface { Send(*StreamDataPacket) error CloseAndRecv() (*SendStreamResp, error) grpc.ClientStream } -type agentSendStreamClient struct { +type hubSendStreamClient struct { grpc.ClientStream } -func (x *agentSendStreamClient) Send(m *StreamDataPacket) error { +func (x *hubSendStreamClient) Send(m *StreamDataPacket) error { return x.ClientStream.SendMsg(m) } -func (x *agentSendStreamClient) CloseAndRecv() (*SendStreamResp, error) { +func (x *hubSendStreamClient) CloseAndRecv() (*SendStreamResp, error) { if err := x.ClientStream.CloseSend(); err != nil { return nil, err } @@ -92,12 +92,12 @@ func (x *agentSendStreamClient) CloseAndRecv() (*SendStreamResp, error) { return m, nil } -func (c *agentClient) GetStream(ctx context.Context, in *GetStreamReq, opts ...grpc.CallOption) (Agent_GetStreamClient, error) { - stream, err := c.cc.NewStream(ctx, &Agent_ServiceDesc.Streams[1], Agent_GetStream_FullMethodName, opts...) +func (c *hubClient) GetStream(ctx context.Context, in *GetStreamReq, opts ...grpc.CallOption) (Hub_GetStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &Hub_ServiceDesc.Streams[1], Hub_GetStream_FullMethodName, opts...) if err != nil { return nil, err } - x := &agentGetStreamClient{stream} + x := &hubGetStreamClient{stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } @@ -107,16 +107,16 @@ func (c *agentClient) GetStream(ctx context.Context, in *GetStreamReq, opts ...g return x, nil } -type Agent_GetStreamClient interface { +type Hub_GetStreamClient interface { Recv() (*StreamDataPacket, error) grpc.ClientStream } -type agentGetStreamClient struct { +type hubGetStreamClient struct { grpc.ClientStream } -func (x *agentGetStreamClient) Recv() (*StreamDataPacket, error) { +func (x *hubGetStreamClient) Recv() (*StreamDataPacket, error) { m := new(StreamDataPacket) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err @@ -124,118 +124,118 @@ func (x *agentGetStreamClient) Recv() (*StreamDataPacket, error) { return m, nil } -func (c *agentClient) SendVar(ctx context.Context, in *SendVarReq, opts ...grpc.CallOption) (*SendVarResp, error) { +func (c *hubClient) SendVar(ctx context.Context, in *SendVarReq, opts ...grpc.CallOption) (*SendVarResp, error) { out := new(SendVarResp) - err := c.cc.Invoke(ctx, Agent_SendVar_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, Hub_SendVar_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *agentClient) GetVar(ctx context.Context, in *GetVarReq, opts ...grpc.CallOption) (*GetVarResp, error) { +func (c *hubClient) GetVar(ctx context.Context, in *GetVarReq, opts ...grpc.CallOption) (*GetVarResp, error) { out := new(GetVarResp) - err := c.cc.Invoke(ctx, Agent_GetVar_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, Hub_GetVar_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *agentClient) Ping(ctx context.Context, in *PingReq, opts ...grpc.CallOption) (*PingResp, error) { +func (c *hubClient) Ping(ctx context.Context, in *PingReq, opts ...grpc.CallOption) (*PingResp, error) { out := new(PingResp) - err := c.cc.Invoke(ctx, Agent_Ping_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, Hub_Ping_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } -// AgentServer is the server API for Agent service. -// All implementations must embed UnimplementedAgentServer +// HubServer is the server API for Hub service. +// All implementations must embed UnimplementedHubServer // for forward compatibility -type AgentServer interface { +type HubServer interface { ExecuteIOPlan(context.Context, *ExecuteIOPlanReq) (*ExecuteIOPlanResp, error) - SendStream(Agent_SendStreamServer) error - GetStream(*GetStreamReq, Agent_GetStreamServer) error + SendStream(Hub_SendStreamServer) error + GetStream(*GetStreamReq, Hub_GetStreamServer) error SendVar(context.Context, *SendVarReq) (*SendVarResp, error) GetVar(context.Context, *GetVarReq) (*GetVarResp, error) Ping(context.Context, *PingReq) (*PingResp, error) - mustEmbedUnimplementedAgentServer() + mustEmbedUnimplementedHubServer() } -// UnimplementedAgentServer must be embedded to have forward compatible implementations. -type UnimplementedAgentServer struct { +// UnimplementedHubServer must be embedded to have forward compatible implementations. +type UnimplementedHubServer struct { } -func (UnimplementedAgentServer) ExecuteIOPlan(context.Context, *ExecuteIOPlanReq) (*ExecuteIOPlanResp, error) { +func (UnimplementedHubServer) ExecuteIOPlan(context.Context, *ExecuteIOPlanReq) (*ExecuteIOPlanResp, error) { return nil, status.Errorf(codes.Unimplemented, "method ExecuteIOPlan not implemented") } -func (UnimplementedAgentServer) SendStream(Agent_SendStreamServer) error { +func (UnimplementedHubServer) SendStream(Hub_SendStreamServer) error { return status.Errorf(codes.Unimplemented, "method SendStream not implemented") } -func (UnimplementedAgentServer) GetStream(*GetStreamReq, Agent_GetStreamServer) error { +func (UnimplementedHubServer) GetStream(*GetStreamReq, Hub_GetStreamServer) error { return status.Errorf(codes.Unimplemented, "method GetStream not implemented") } -func (UnimplementedAgentServer) SendVar(context.Context, *SendVarReq) (*SendVarResp, error) { +func (UnimplementedHubServer) SendVar(context.Context, *SendVarReq) (*SendVarResp, error) { return nil, status.Errorf(codes.Unimplemented, "method SendVar not implemented") } -func (UnimplementedAgentServer) GetVar(context.Context, *GetVarReq) (*GetVarResp, error) { +func (UnimplementedHubServer) GetVar(context.Context, *GetVarReq) (*GetVarResp, error) { return nil, status.Errorf(codes.Unimplemented, "method GetVar not implemented") } -func (UnimplementedAgentServer) Ping(context.Context, *PingReq) (*PingResp, error) { +func (UnimplementedHubServer) Ping(context.Context, *PingReq) (*PingResp, error) { return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") } -func (UnimplementedAgentServer) mustEmbedUnimplementedAgentServer() {} +func (UnimplementedHubServer) mustEmbedUnimplementedHubServer() {} -// UnsafeAgentServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to AgentServer will +// UnsafeHubServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to HubServer will // result in compilation errors. -type UnsafeAgentServer interface { - mustEmbedUnimplementedAgentServer() +type UnsafeHubServer interface { + mustEmbedUnimplementedHubServer() } -func RegisterAgentServer(s grpc.ServiceRegistrar, srv AgentServer) { - s.RegisterService(&Agent_ServiceDesc, srv) +func RegisterHubServer(s grpc.ServiceRegistrar, srv HubServer) { + s.RegisterService(&Hub_ServiceDesc, srv) } -func _Agent_ExecuteIOPlan_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _Hub_ExecuteIOPlan_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(ExecuteIOPlanReq) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(AgentServer).ExecuteIOPlan(ctx, in) + return srv.(HubServer).ExecuteIOPlan(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: Agent_ExecuteIOPlan_FullMethodName, + FullMethod: Hub_ExecuteIOPlan_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(AgentServer).ExecuteIOPlan(ctx, req.(*ExecuteIOPlanReq)) + return srv.(HubServer).ExecuteIOPlan(ctx, req.(*ExecuteIOPlanReq)) } return interceptor(ctx, in, info, handler) } -func _Agent_SendStream_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(AgentServer).SendStream(&agentSendStreamServer{stream}) +func _Hub_SendStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(HubServer).SendStream(&hubSendStreamServer{stream}) } -type Agent_SendStreamServer interface { +type Hub_SendStreamServer interface { SendAndClose(*SendStreamResp) error Recv() (*StreamDataPacket, error) grpc.ServerStream } -type agentSendStreamServer struct { +type hubSendStreamServer struct { grpc.ServerStream } -func (x *agentSendStreamServer) SendAndClose(m *SendStreamResp) error { +func (x *hubSendStreamServer) SendAndClose(m *SendStreamResp) error { return x.ServerStream.SendMsg(m) } -func (x *agentSendStreamServer) Recv() (*StreamDataPacket, error) { +func (x *hubSendStreamServer) Recv() (*StreamDataPacket, error) { m := new(StreamDataPacket) if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err @@ -243,116 +243,116 @@ func (x *agentSendStreamServer) Recv() (*StreamDataPacket, error) { return m, nil } -func _Agent_GetStream_Handler(srv interface{}, stream grpc.ServerStream) error { +func _Hub_GetStream_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(GetStreamReq) if err := stream.RecvMsg(m); err != nil { return err } - return srv.(AgentServer).GetStream(m, &agentGetStreamServer{stream}) + return srv.(HubServer).GetStream(m, &hubGetStreamServer{stream}) } -type Agent_GetStreamServer interface { +type Hub_GetStreamServer interface { Send(*StreamDataPacket) error grpc.ServerStream } -type agentGetStreamServer struct { +type hubGetStreamServer struct { grpc.ServerStream } -func (x *agentGetStreamServer) Send(m *StreamDataPacket) error { +func (x *hubGetStreamServer) Send(m *StreamDataPacket) error { return x.ServerStream.SendMsg(m) } -func _Agent_SendVar_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _Hub_SendVar_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(SendVarReq) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(AgentServer).SendVar(ctx, in) + return srv.(HubServer).SendVar(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: Agent_SendVar_FullMethodName, + FullMethod: Hub_SendVar_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(AgentServer).SendVar(ctx, req.(*SendVarReq)) + return srv.(HubServer).SendVar(ctx, req.(*SendVarReq)) } return interceptor(ctx, in, info, handler) } -func _Agent_GetVar_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _Hub_GetVar_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(GetVarReq) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(AgentServer).GetVar(ctx, in) + return srv.(HubServer).GetVar(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: Agent_GetVar_FullMethodName, + FullMethod: Hub_GetVar_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(AgentServer).GetVar(ctx, req.(*GetVarReq)) + return srv.(HubServer).GetVar(ctx, req.(*GetVarReq)) } return interceptor(ctx, in, info, handler) } -func _Agent_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _Hub_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(PingReq) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(AgentServer).Ping(ctx, in) + return srv.(HubServer).Ping(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: Agent_Ping_FullMethodName, + FullMethod: Hub_Ping_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(AgentServer).Ping(ctx, req.(*PingReq)) + return srv.(HubServer).Ping(ctx, req.(*PingReq)) } return interceptor(ctx, in, info, handler) } -// Agent_ServiceDesc is the grpc.ServiceDesc for Agent service. +// Hub_ServiceDesc is the grpc.ServiceDesc for Hub service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) -var Agent_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "Agent", - HandlerType: (*AgentServer)(nil), +var Hub_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "Hub", + HandlerType: (*HubServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "ExecuteIOPlan", - Handler: _Agent_ExecuteIOPlan_Handler, + Handler: _Hub_ExecuteIOPlan_Handler, }, { MethodName: "SendVar", - Handler: _Agent_SendVar_Handler, + Handler: _Hub_SendVar_Handler, }, { MethodName: "GetVar", - Handler: _Agent_GetVar_Handler, + Handler: _Hub_GetVar_Handler, }, { MethodName: "Ping", - Handler: _Agent_Ping_Handler, + Handler: _Hub_Ping_Handler, }, }, Streams: []grpc.StreamDesc{ { StreamName: "SendStream", - Handler: _Agent_SendStream_Handler, + Handler: _Hub_SendStream_Handler, ClientStreams: true, }, { StreamName: "GetStream", - Handler: _Agent_GetStream_Handler, + Handler: _Hub_GetStream_Handler, ServerStreams: true, }, }, - Metadata: "pkgs/grpc/agent/agent.proto", + Metadata: "pkgs/grpc/hub/hub.proto", } diff --git a/common/pkgs/grpc/agent/client.go b/common/pkgs/grpc/agent/client.go index 3588002..0bfd25e 100644 --- a/common/pkgs/grpc/agent/client.go +++ b/common/pkgs/grpc/agent/client.go @@ -1,4 +1,4 @@ -package agent +package hub import ( "context" @@ -13,7 +13,7 @@ import ( type Client struct { con *grpc.ClientConn - cli AgentClient + cli HubClient } func NewClient(addr string) (*Client, error) { @@ -24,7 +24,7 @@ func NewClient(addr string) (*Client, error) { return &Client{ con: con, - cli: NewAgentClient(con), + cli: NewHubClient(con), }, nil } @@ -42,7 +42,7 @@ func (c *Client) ExecuteIOPlan(ctx context.Context, plan exec.Plan) error { type grpcStreamReadCloser struct { io.ReadCloser - stream Agent_GetStreamClient + stream Hub_GetStreamClient cancelFn context.CancelFunc readingData []byte recvEOF bool diff --git a/common/pkgs/grpc/agent/pool.go b/common/pkgs/grpc/agent/pool.go index 3b06ec9..5831124 100644 --- a/common/pkgs/grpc/agent/pool.go +++ b/common/pkgs/grpc/agent/pool.go @@ -1,4 +1,4 @@ -package agent +package hub import ( "fmt" @@ -30,7 +30,7 @@ func NewPool(grpcCfg *PoolConfig) *Pool { } } -// 获取一个GRPC客户端。由于事先不能知道所有agent的GRPC配置信息,所以只能让调用者把建立连接所需的配置都传递进来, +// 获取一个GRPC客户端。由于事先不能知道所有hub的GRPC配置信息,所以只能让调用者把建立连接所需的配置都传递进来, // Pool来决定要不要新建客户端。 func (p *Pool) Acquire(ip string, port int) (*PoolClient, error) { addr := fmt.Sprintf("%s:%d", ip, port) diff --git a/common/pkgs/ioswitch2/agent_worker.go b/common/pkgs/ioswitch2/agent_worker.go index 9846d35..db812b9 100644 --- a/common/pkgs/ioswitch2/agent_worker.go +++ b/common/pkgs/ioswitch2/agent_worker.go @@ -9,35 +9,35 @@ import ( "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/serder" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" + agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.WorkerInfo]( - (*AgentWorker)(nil), + (*HubWorker)(nil), (*HttpHubWorker)(nil), ))) -type AgentWorker struct { +type HubWorker struct { Hub cortypes.Hub Address cortypes.GRPCAddressInfo } -func (w *AgentWorker) NewClient() (exec.WorkerClient, error) { - cli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(w.Hub, w.Address)) +func (w *HubWorker) NewClient() (exec.WorkerClient, error) { + cli, err := stgglb.HubRPCPool.Acquire(stgglb.SelectGRPCAddress(w.Hub, w.Address)) if err != nil { return nil, err } - return &AgentWorkerClient{hubID: w.Hub.HubID, cli: cli}, nil + return &HubWorkerClient{hubID: w.Hub.HubID, cli: cli}, nil } -func (w *AgentWorker) String() string { +func (w *HubWorker) String() string { return w.Hub.String() } -func (w *AgentWorker) Equals(worker exec.WorkerInfo) bool { - aw, ok := worker.(*AgentWorker) +func (w *HubWorker) Equals(worker exec.WorkerInfo) bool { + aw, ok := worker.(*HubWorker) if !ok { return false } @@ -45,25 +45,25 @@ func (w *AgentWorker) Equals(worker exec.WorkerInfo) bool { return w.Hub.HubID == aw.Hub.HubID } -type AgentWorkerClient struct { +type HubWorkerClient struct { hubID cortypes.HubID cli *agtrpc.PoolClient } -func (c *AgentWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error { +func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error { return c.cli.ExecuteIOPlan(ctx, plan) } -func (c *AgentWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error { +func (c *HubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error { return c.cli.SendStream(ctx, planID, id, io2.CounterCloser(stream, func(cnt int64, err error) { if stgglb.Stats.HubTransfer != nil { stgglb.Stats.HubTransfer.RecordOutput(c.hubID, cnt, err == nil || err == io.EOF) } })) } -func (c *AgentWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, id exec.VarID, value exec.VarValue) error { +func (c *HubWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, id exec.VarID, value exec.VarValue) error { return c.cli.SendVar(ctx, planID, id, value) } -func (c *AgentWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, streamID exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) { +func (c *HubWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, streamID exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) { str, err := c.cli.GetStream(ctx, planID, streamID, signalID, signal) if err != nil { return nil, err @@ -75,10 +75,10 @@ func (c *AgentWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, s } }), nil } -func (c *AgentWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, varID exec.VarID, signalID exec.VarID, signal exec.VarValue) (exec.VarValue, error) { +func (c *HubWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, varID exec.VarID, signalID exec.VarID, signal exec.VarValue) (exec.VarValue, error) { return c.cli.GetVar(ctx, planID, varID, signalID, signal) } -func (c *AgentWorkerClient) Close() error { - stgglb.AgentRPCPool.Release(c.cli) +func (c *HubWorkerClient) Close() error { + stgglb.HubRPCPool.Release(c.cli) return nil } diff --git a/common/pkgs/ioswitch2/http_hub_worker.go b/common/pkgs/ioswitch2/http_hub_worker.go index 2e1447a..ab64acc 100644 --- a/common/pkgs/ioswitch2/http_hub_worker.go +++ b/common/pkgs/ioswitch2/http_hub_worker.go @@ -106,6 +106,6 @@ func (c *HttpHubWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, va return resp.Value, err } func (c *HttpHubWorkerClient) Close() error { - //stgglb.AgentRPCPool.Release(c.cli) + //stgglb.HubRPCPool.Release(c.cli) return nil } diff --git a/common/pkgs/ioswitch2/ops2/faas.go b/common/pkgs/ioswitch2/ops2/faas.go index b4f4f47..a9da667 100644 --- a/common/pkgs/ioswitch2/ops2/faas.go +++ b/common/pkgs/ioswitch2/ops2/faas.go @@ -20,7 +20,7 @@ type InternalFaaSGalMultiply struct { } func (o *InternalFaaSGalMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) + stgAgts, err := exec.GetValueByType[*agtpool.HubPool](ctx) if err != nil { return err } diff --git a/common/pkgs/ioswitch2/parser/gen/generator.go b/common/pkgs/ioswitch2/parser/gen/generator.go index 90453e2..3701586 100644 --- a/common/pkgs/ioswitch2/parser/gen/generator.go +++ b/common/pkgs/ioswitch2/parser/gen/generator.go @@ -292,7 +292,7 @@ func buildFromNode(ctx *state.GenerateState, f ioswitch2.From) (ops2.FromNode, e t.Env().Pinned = true case *cortypes.GRPCAddressInfo: - t.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: f.Hub, Address: *addr}) + t.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: f.Hub, Address: *addr}) t.Env().Pinned = true default: @@ -383,7 +383,7 @@ func setEnvByAddress(n dag.Node, hub cortypes.Hub, addr cortypes.HubAddressInfo) n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: hub}) case *cortypes.GRPCAddressInfo: - n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: hub, Address: *addr}) + n.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: hub, Address: *addr}) default: return fmt.Errorf("unsupported node address type %T", addr) diff --git a/common/pkgs/ioswitch2/parser/opt/ec.go b/common/pkgs/ioswitch2/parser/opt/ec.go index 60569f0..cbb43b3 100644 --- a/common/pkgs/ioswitch2/parser/opt/ec.go +++ b/common/pkgs/ioswitch2/parser/opt/ec.go @@ -103,7 +103,7 @@ func UseECMultiplier(ctx *state.GenerateState) { callMul.Env().Pinned = true case *cortypes.GRPCAddressInfo: - callMul.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: to.Hub, Address: *addr}) + callMul.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: to.Hub, Address: *addr}) callMul.Env().Pinned = true default: diff --git a/common/pkgs/ioswitch2/plans/utils.go b/common/pkgs/ioswitch2/plans/utils.go index 5522b2b..25f2285 100644 --- a/common/pkgs/ioswitch2/plans/utils.go +++ b/common/pkgs/ioswitch2/plans/utils.go @@ -12,7 +12,7 @@ func getWorkerInfo(hub cortypes.Hub) exec.WorkerInfo { return &ioswitch2.HttpHubWorker{Hub: hub} case *cortypes.GRPCAddressInfo: - return &ioswitch2.AgentWorker{Hub: hub, Address: *addr} + return &ioswitch2.HubWorker{Hub: hub, Address: *addr} default: return nil diff --git a/common/pkgs/ioswitchlrc/agent_worker.go b/common/pkgs/ioswitchlrc/agent_worker.go index d0de079..3638e9c 100644 --- a/common/pkgs/ioswitchlrc/agent_worker.go +++ b/common/pkgs/ioswitchlrc/agent_worker.go @@ -6,34 +6,34 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" + agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) // var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.WorkerInfo]( -// (*AgentWorker)(nil), +// (*HubWorker)(nil), // ))) -type AgentWorker struct { +type HubWorker struct { Hub cortypes.Hub Address cortypes.GRPCAddressInfo } -func (w *AgentWorker) NewClient() (exec.WorkerClient, error) { - cli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(w.Hub, w.Address)) +func (w *HubWorker) NewClient() (exec.WorkerClient, error) { + cli, err := stgglb.HubRPCPool.Acquire(stgglb.SelectGRPCAddress(w.Hub, w.Address)) if err != nil { return nil, err } - return &AgentWorkerClient{cli: cli}, nil + return &HubWorkerClient{cli: cli}, nil } -func (w *AgentWorker) String() string { +func (w *HubWorker) String() string { return w.Hub.String() } -func (w *AgentWorker) Equals(worker exec.WorkerInfo) bool { - aw, ok := worker.(*AgentWorker) +func (w *HubWorker) Equals(worker exec.WorkerInfo) bool { + aw, ok := worker.(*HubWorker) if !ok { return false } @@ -41,26 +41,26 @@ func (w *AgentWorker) Equals(worker exec.WorkerInfo) bool { return w.Hub.HubID == aw.Hub.HubID } -type AgentWorkerClient struct { +type HubWorkerClient struct { cli *agtrpc.PoolClient } -func (c *AgentWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error { +func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error { return c.cli.ExecuteIOPlan(ctx, plan) } -func (c *AgentWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error { +func (c *HubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error { return c.cli.SendStream(ctx, planID, id, stream) } -func (c *AgentWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, id exec.VarID, value exec.VarValue) error { +func (c *HubWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, id exec.VarID, value exec.VarValue) error { return c.cli.SendVar(ctx, planID, id, value) } -func (c *AgentWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, streamID exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) { +func (c *HubWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, streamID exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) { return c.cli.GetStream(ctx, planID, streamID, signalID, signal) } -func (c *AgentWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, varID exec.VarID, signalID exec.VarID, signal exec.VarValue) (exec.VarValue, error) { +func (c *HubWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, varID exec.VarID, signalID exec.VarID, signal exec.VarValue) (exec.VarValue, error) { return c.cli.GetVar(ctx, planID, varID, signalID, signal) } -func (c *AgentWorkerClient) Close() error { - stgglb.AgentRPCPool.Release(c.cli) +func (c *HubWorkerClient) Close() error { + stgglb.HubRPCPool.Release(c.cli) return nil } diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index c9b194a..5587c67 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -72,7 +72,7 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, err } // TODO2 支持HTTP协议 - t.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Hub: f.Hub, Address: *f.Hub.Address.(*cortypes.GRPCAddressInfo)}) + t.Env().ToEnvWorker(&ioswitchlrc.HubWorker{Hub: f.Hub, Address: *f.Hub.Address.(*cortypes.GRPCAddressInfo)}) t.Env().Pinned = true return t, nil @@ -106,7 +106,7 @@ func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (ops2.ToNode, error) { // n.Env().ToEnvWorker(&ioswitchlrc.HttpHubWorker{Node: t.Hub}) // TODO2 支持HTTP协议 case *cortypes.GRPCAddressInfo: - n.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Hub: t.Hub, Address: *addr}) + n.Env().ToEnvWorker(&ioswitchlrc.HubWorker{Hub: t.Hub, Address: *addr}) default: return nil, fmt.Errorf("unsupported node address type %T", addr) diff --git a/common/pkgs/mq/agent/agent.go b/common/pkgs/mq/agent/agent.go index 3109663..9007335 100644 --- a/common/pkgs/mq/agent/agent.go +++ b/common/pkgs/mq/agent/agent.go @@ -1,14 +1,14 @@ -package agent +package hub import ( "gitlink.org.cn/cloudream/common/pkgs/mq" ) -type AgentService interface { +type HubService interface { GetState(msg *GetState) (*GetStateResp, *mq.CodeMessage) } -// 获取agent状态 +// 获取hub状态 var _ = Register(Service.GetState) type GetState struct { diff --git a/common/pkgs/mq/agent/client.go b/common/pkgs/mq/agent/client.go index 3a72bf3..7a30149 100644 --- a/common/pkgs/mq/agent/client.go +++ b/common/pkgs/mq/agent/client.go @@ -1,4 +1,4 @@ -package agent +package hub import ( "sync" @@ -14,7 +14,7 @@ type Client struct { } func NewClient(id cdssdk.HubID, cfg mq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQTransport(cfg, stgmq.MakeAgentQueueName(int64(id)), "") + rabbitCli, err := mq.NewRabbitMQTransport(cfg, stgmq.MakeHubQueueName(int64(id)), "") if err != nil { return nil, err } diff --git a/common/pkgs/mq/agent/server.go b/common/pkgs/mq/agent/server.go index 9a8b090..2e1c735 100644 --- a/common/pkgs/mq/agent/server.go +++ b/common/pkgs/mq/agent/server.go @@ -1,4 +1,4 @@ -package agent +package hub import ( "gitlink.org.cn/cloudream/common/pkgs/mq" @@ -12,7 +12,7 @@ type Service interface { // CacheService - AgentService + HubService } type Server struct { @@ -27,7 +27,7 @@ func NewServer(svc Service, id cdssdk.HubID, cfg mq.Config) (*Server, error) { rabbitSvr, err := mq.NewRabbitMQServer( cfg, - mymq.MakeAgentQueueName(int64(id)), + mymq.MakeHubQueueName(int64(id)), func(msg *mq.Message) (*mq.Message, error) { return msgDispatcher.Handle(srv.service, msg) }, diff --git a/common/pkgs/mq/agent/storage.go b/common/pkgs/mq/agent/storage.go index 88ee3ab..71aadcd 100644 --- a/common/pkgs/mq/agent/storage.go +++ b/common/pkgs/mq/agent/storage.go @@ -1,4 +1,4 @@ -package agent +package hub /* import ( diff --git a/common/pkgs/mq/consts.go b/common/pkgs/mq/consts.go index e7085b0..d7562d4 100644 --- a/common/pkgs/mq/consts.go +++ b/common/pkgs/mq/consts.go @@ -8,6 +8,6 @@ const ( DATAMAP_QUEUE_NAME = "DataMap" ) -func MakeAgentQueueName(id int64) string { - return fmt.Sprintf("Agent@%d", id) +func MakeHubQueueName(id int64) string { + return fmt.Sprintf("Hub@%d", id) } diff --git a/common/pkgs/mq/scanner/event/agent_check_shardstore.go b/common/pkgs/mq/scanner/event/agent_check_shardstore.go index 6494b4e..a7ff49c 100644 --- a/common/pkgs/mq/scanner/event/agent_check_shardstore.go +++ b/common/pkgs/mq/scanner/event/agent_check_shardstore.go @@ -2,17 +2,17 @@ package event import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -type AgentCheckShardStore struct { +type HubCheckShardStore struct { EventBase StorageID cdssdk.StorageID `json:"storageID"` } -func NewAgentCheckShardStore(stgID cdssdk.StorageID) *AgentCheckShardStore { - return &AgentCheckShardStore{ +func NewHubCheckShardStore(stgID cdssdk.StorageID) *HubCheckShardStore { + return &HubCheckShardStore{ StorageID: stgID, } } func init() { - Register[*AgentCheckShardStore]() + Register[*HubCheckShardStore]() } diff --git a/common/pkgs/mq/scanner/event/agent_check_state.go b/common/pkgs/mq/scanner/event/agent_check_state.go index bbc1fb3..529e0cb 100644 --- a/common/pkgs/mq/scanner/event/agent_check_state.go +++ b/common/pkgs/mq/scanner/event/agent_check_state.go @@ -2,17 +2,17 @@ package event import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -type AgentCheckState struct { +type HubCheckState struct { EventBase HubID cdssdk.HubID `json:"hubID"` } -func NewAgentCheckState(hubID cdssdk.HubID) *AgentCheckState { - return &AgentCheckState{ +func NewHubCheckState(hubID cdssdk.HubID) *HubCheckState { + return &HubCheckState{ HubID: hubID, } } func init() { - Register[*AgentCheckState]() + Register[*HubCheckState]() } diff --git a/common/pkgs/mq/scanner/event/agent_check_storage.go b/common/pkgs/mq/scanner/event/agent_check_storage.go index 078e55c..d3f8c0f 100644 --- a/common/pkgs/mq/scanner/event/agent_check_storage.go +++ b/common/pkgs/mq/scanner/event/agent_check_storage.go @@ -2,17 +2,17 @@ package event import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -type AgentCheckStorage struct { +type HubCheckStorage struct { EventBase StorageID cdssdk.StorageID `json:"storageID"` } -func NewAgentCheckStorage(storageID cdssdk.StorageID) *AgentCheckStorage { - return &AgentCheckStorage{ +func NewHubCheckStorage(storageID cdssdk.StorageID) *HubCheckStorage { + return &HubCheckStorage{ StorageID: storageID, } } func init() { - Register[*AgentCheckStorage]() + Register[*HubCheckStorage]() } diff --git a/common/pkgs/mq/scanner/event/agent_shardstore_gc.go b/common/pkgs/mq/scanner/event/agent_shardstore_gc.go index 868b8d0..e4a7f62 100644 --- a/common/pkgs/mq/scanner/event/agent_shardstore_gc.go +++ b/common/pkgs/mq/scanner/event/agent_shardstore_gc.go @@ -2,17 +2,17 @@ package event import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -type AgentShardStoreGC struct { +type HubShardStoreGC struct { EventBase StorageID cdssdk.StorageID `json:"storageID"` } -func NewAgentShardStoreGC(stgID cdssdk.StorageID) *AgentShardStoreGC { - return &AgentShardStoreGC{ +func NewHubShardStoreGC(stgID cdssdk.StorageID) *HubShardStoreGC { + return &HubShardStoreGC{ StorageID: stgID, } } func init() { - Register[*AgentShardStoreGC]() + Register[*HubShardStoreGC]() } diff --git a/common/pkgs/mq/scanner/event/agent_storage_gc.go b/common/pkgs/mq/scanner/event/agent_storage_gc.go index ee4c373..683c325 100644 --- a/common/pkgs/mq/scanner/event/agent_storage_gc.go +++ b/common/pkgs/mq/scanner/event/agent_storage_gc.go @@ -2,17 +2,17 @@ package event import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -type AgentStorageGC struct { +type HubStorageGC struct { EventBase StorageID cdssdk.StorageID `json:"storageID"` } -func NewAgentStorageGC(storageID cdssdk.StorageID) *AgentStorageGC { - return &AgentStorageGC{ +func NewHubStorageGC(storageID cdssdk.StorageID) *HubStorageGC { + return &HubStorageGC{ StorageID: storageID, } } func init() { - Register[*AgentStorageGC]() + Register[*HubStorageGC]() } diff --git a/hub/internal/cmd/serve.go b/hub/internal/cmd/serve.go index c5d11ce..1ea8a85 100644 --- a/hub/internal/cmd/serve.go +++ b/hub/internal/cmd/serve.go @@ -8,27 +8,27 @@ import ( "github.com/go-co-op/gocron/v2" "github.com/spf13/cobra" - "gitlink.org.cn/cloudream/storage2/agent/internal/http" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" + "gitlink.org.cn/cloudream/storage2/hub/internal/http" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" - "gitlink.org.cn/cloudream/storage2/agent/internal/config" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/models/datamap" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" + agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" "gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent" cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" + "gitlink.org.cn/cloudream/storage2/hub/internal/config" "google.golang.org/grpc" - agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" + agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" - grpcsvc "gitlink.org.cn/cloudream/storage2/agent/internal/grpc" - cmdsvc "gitlink.org.cn/cloudream/storage2/agent/internal/mq" + grpcsvc "gitlink.org.cn/cloudream/storage2/hub/internal/grpc" + cmdsvc "gitlink.org.cn/cloudream/storage2/hub/internal/mq" ) func init() { @@ -60,7 +60,7 @@ func serve(configPath string, httpAddr string) { stgglb.InitLocal(config.Cfg().Local) stgglb.InitMQPool(config.Cfg().RabbitMQ) - stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) + stgglb.InitHubRPCPool(&agtrpc.PoolConfig{}) // stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID) // stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID) // 获取Hub配置 @@ -151,15 +151,15 @@ func serve(configPath string, httpAddr string) { defer sch.Shutdown() // 启动命令服务器 - // TODO 需要设计AgentID持久化机制 + // TODO 需要设计HubID持久化机制 agtSvr, err := agtmq.NewServer(cmdsvc.NewService(stgPool), config.Cfg().ID, config.Cfg().RabbitMQ) if err != nil { - logger.Fatalf("new agent server failed, err: %s", err.Error()) + logger.Fatalf("new hub server failed, err: %s", err.Error()) } agtSvr.OnError(func(err error) { - logger.Warnf("agent server err: %s", err.Error()) + logger.Warnf("hub server err: %s", err.Error()) }) - go serveAgentServer(agtSvr) + go serveHubServer(agtSvr) // 启动GRPC服务 listenAddr := config.Cfg().GRPC.MakeListenAddress() @@ -168,7 +168,7 @@ func serve(configPath string, httpAddr string) { logger.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error()) } s := grpc.NewServer() - agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&worker, stgPool)) + agtrpc.RegisterHubServer(s, grpcsvc.NewService(&worker, stgPool)) go serveGRPC(s, lis) go serveDistLock(distlock) @@ -251,7 +251,7 @@ func setupTickTask(agtPool *pool.Pool, evtPub *sysevent.Publisher) gocron.Schedu return sch } -func serveAgentServer(server *agtmq.Server) { +func serveHubServer(server *agtmq.Server) { logger.Info("start serving command server") ch := server.Start() diff --git a/hub/internal/config/config.go b/hub/internal/config/config.go index 0556984..d3ee692 100644 --- a/hub/internal/config/config.go +++ b/hub/internal/config/config.go @@ -25,7 +25,7 @@ var cfg Config func Init(path string) error { if path == "" { - return c.DefaultLoad("agent", &cfg) + return c.DefaultLoad("hub", &cfg) } return c.Load(path, &cfg) diff --git a/hub/internal/grpc/io.go b/hub/internal/grpc/io.go index 81a4a20..be4dbd4 100644 --- a/hub/internal/grpc/io.go +++ b/hub/internal/grpc/io.go @@ -11,7 +11,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/serder" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" + agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" ) func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanReq) (*agtrpc.ExecuteIOPlanResp, error) { @@ -40,7 +40,7 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe return &agtrpc.ExecuteIOPlanResp{}, nil } -func (s *Service) SendStream(server agtrpc.Agent_SendStreamServer) error { +func (s *Service) SendStream(server agtrpc.Hub_SendStreamServer) error { msg, err := server.Recv() if err != nil { return fmt.Errorf("recving stream id packet: %w", err) @@ -114,7 +114,7 @@ func (s *Service) SendStream(server agtrpc.Agent_SendStreamServer) error { } } -func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Agent_GetStreamServer) error { +func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Hub_GetStreamServer) error { logger. WithField("PlanID", req.PlanID). WithField("VarID", req.VarID). diff --git a/hub/internal/grpc/ping.go b/hub/internal/grpc/ping.go index b344339..0e1313a 100644 --- a/hub/internal/grpc/ping.go +++ b/hub/internal/grpc/ping.go @@ -3,7 +3,7 @@ package grpc import ( "context" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" + agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" ) // Ping 是一个RPC方法,用于验证服务的可用性。 diff --git a/hub/internal/grpc/service.go b/hub/internal/grpc/service.go index 3cee6f8..7d6ef1d 100644 --- a/hub/internal/grpc/service.go +++ b/hub/internal/grpc/service.go @@ -2,12 +2,12 @@ package grpc import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - agentserver "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" + hubserver "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" ) type Service struct { - agentserver.AgentServer + hubserver.HubServer swWorker *exec.Worker stgAgts *pool.Pool } diff --git a/hub/internal/mq/agent.go b/hub/internal/mq/agent.go index 73f4c77..5f46a1b 100644 --- a/hub/internal/mq/agent.go +++ b/hub/internal/mq/agent.go @@ -2,7 +2,7 @@ package mq import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/agent" + agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" ) func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) { diff --git a/hub/internal/mq/storage.go b/hub/internal/mq/storage.go index 908bd08..f566645 100644 --- a/hub/internal/mq/storage.go +++ b/hub/internal/mq/storage.go @@ -6,7 +6,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/agent" + agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" ) diff --git a/hub/internal/task/task.go b/hub/internal/task/task.go index 379346e..b1c4bdf 100644 --- a/hub/internal/task/task.go +++ b/hub/internal/task/task.go @@ -17,7 +17,7 @@ type TaskContext struct { connectivity *connectivity.Collector downloader *downloader.Downloader accessStat *accessstat.AccessStat - stgAgts *agtpool.AgentPool + stgAgts *agtpool.HubPool uploader *uploader.Uploader } @@ -36,7 +36,7 @@ type Task = task.Task[TaskContext] // CompleteOption 类型定义了任务完成时的选项,可用于定制化任务完成的处理方式 type CompleteOption = task.CompleteOption -func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgAgts *agtpool.AgentPool, uploader *uploader.Uploader) Manager { +func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgAgts *agtpool.HubPool, uploader *uploader.Uploader) Manager { return task.NewManager(TaskContext{ distlock: distlock, connectivity: connectivity, diff --git a/hub/internal/tickevent/report_hub_stats.go b/hub/internal/tickevent/report_hub_stats.go index 60d67d3..66a4339 100644 --- a/hub/internal/tickevent/report_hub_stats.go +++ b/hub/internal/tickevent/report_hub_stats.go @@ -35,7 +35,7 @@ func ReportHubTransferStats(evtPub *sysevent.Publisher) { } } -func ReportHubStorageTransferStats(stgAgts *agtpool.AgentPool, evtPub *sysevent.Publisher) { +func ReportHubStorageTransferStats(stgAgts *agtpool.HubPool, evtPub *sysevent.Publisher) { if stgglb.Stats.HubStorageTransfer == nil { return } diff --git a/hub/internal/tickevent/report_storage_stats.go b/hub/internal/tickevent/report_storage_stats.go index 1ea1974..6858e0e 100644 --- a/hub/internal/tickevent/report_storage_stats.go +++ b/hub/internal/tickevent/report_storage_stats.go @@ -7,8 +7,8 @@ import ( "gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent" ) -func ReportStorageStats(agtPool *agtpool.AgentPool, evtPub *sysevent.Publisher) { - stgs := agtPool.GetAllAgents() +func ReportStorageStats(agtPool *agtpool.HubPool, evtPub *sysevent.Publisher) { + stgs := agtPool.GetAllHubs() for _, stg := range stgs { shard, err := stg.GetShardStore() if err != nil { diff --git a/hub/main.go b/hub/main.go index 79d58c0..02f6831 100644 --- a/hub/main.go +++ b/hub/main.go @@ -1,6 +1,6 @@ package main -import "gitlink.org.cn/cloudream/storage2/agent/internal/cmd" +import "gitlink.org.cn/cloudream/storage2/hub/internal/cmd" func main() { cmd.RootCmd.Execute() diff --git a/magefiles/main.go b/magefiles/main.go index 2ee4010..fb34405 100644 --- a/magefiles/main.go +++ b/magefiles/main.go @@ -37,7 +37,7 @@ func All() error { } func Bin() error { - if err := Agent(); err != nil { + if err := Hub(); err != nil { return err } if err := Client(); err != nil { @@ -46,9 +46,6 @@ func Bin() error { if err := Coordinator(); err != nil { return err } - if err := Scanner(); err != nil { - return err - } return nil } @@ -99,12 +96,12 @@ func Confs() error { return cp.Copy(confDir, fullDirPath) } -func Agent() error { +func Hub() error { return magefiles.Build(magefiles.BuildArgs{ - OutputName: "agent", - OutputDir: "agent", + OutputName: "hub", + OutputDir: "hub", AssetsDir: "assets", - EntryFile: "agent/main.go", + EntryFile: "hub/main.go", }) } @@ -125,12 +122,3 @@ func Coordinator() error { EntryFile: "coordinator/main.go", }) } - -func Scanner() error { - return magefiles.Build(magefiles.BuildArgs{ - OutputName: "scanner", - OutputDir: "scanner", - AssetsDir: "assets", - EntryFile: "scanner/main.go", - }) -} diff --git a/scanner/internal/event/agent_check_shardstore.go b/scanner/internal/event/agent_check_shardstore.go index 17a7e9c..d3694bf 100644 --- a/scanner/internal/event/agent_check_shardstore.go +++ b/scanner/internal/event/agent_check_shardstore.go @@ -10,26 +10,26 @@ import ( stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" - agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/agent" + agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" scevt "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/scanner/event" ) -// AgentCheckShardStore 代表一个用于处理代理缓存检查事件的结构体 -type AgentCheckShardStore struct { - *scevt.AgentCheckShardStore +// HubCheckShardStore 代表一个用于处理代理缓存检查事件的结构体 +type HubCheckShardStore struct { + *scevt.HubCheckShardStore } -// NewAgentCheckShardStore 创建一个新的 AgentCheckCache 实例 -func NewAgentCheckShardStore(evt *scevt.AgentCheckShardStore) *AgentCheckShardStore { - return &AgentCheckShardStore{ - AgentCheckShardStore: evt, +// NewHubCheckShardStore 创建一个新的 HubCheckCache 实例 +func NewHubCheckShardStore(evt *scevt.HubCheckShardStore) *HubCheckShardStore { + return &HubCheckShardStore{ + HubCheckShardStore: evt, } } // TryMerge 尝试合并当前事件与另一个事件 // 如果另一个事件类型不匹配或节点ID不同,则不进行合并 -func (t *AgentCheckShardStore) TryMerge(other Event) bool { - event, ok := other.(*AgentCheckShardStore) +func (t *HubCheckShardStore) TryMerge(other Event) bool { + event, ok := other.(*HubCheckShardStore) if !ok { return false } @@ -42,10 +42,10 @@ func (t *AgentCheckShardStore) TryMerge(other Event) bool { } // Execute 执行缓存检查操作,对比本地缓存与代理返回的缓存信息,更新数据库中的缓存记录 -func (t *AgentCheckShardStore) Execute(execCtx ExecuteContext) { - log := logger.WithType[AgentCheckShardStore]("Event") +func (t *HubCheckShardStore) Execute(execCtx ExecuteContext) { + log := logger.WithType[HubCheckShardStore]("Event") startTime := time.Now() - log.Debugf("begin with %v", logger.FormatStruct(t.AgentCheckShardStore)) + log.Debugf("begin with %v", logger.FormatStruct(t.HubCheckShardStore)) defer func() { log.Debugf("end, time: %v", time.Since(startTime)) }() @@ -56,12 +56,12 @@ func (t *AgentCheckShardStore) Execute(execCtx ExecuteContext) { return } - agtCli, err := stgglb.AgentMQPool.Acquire(stg.MasterHub) + agtCli, err := stgglb.HubMQPool.Acquire(stg.MasterHub) if err != nil { - log.WithField("StorageID", t.StorageID).Warnf("create agent client failed, err: %s", err.Error()) + log.WithField("StorageID", t.StorageID).Warnf("create hub client failed, err: %s", err.Error()) return } - defer stgglb.AgentMQPool.Release(agtCli) + defer stgglb.HubMQPool.Release(agtCli) checkResp, err := agtCli.CheckCache(agtmq.NewCheckCache(t.StorageID), mq.RequestOption{Timeout: time.Minute}) if err != nil { @@ -83,8 +83,8 @@ func (t *AgentCheckShardStore) Execute(execCtx ExecuteContext) { } // checkCache 对比Cache表中的记录,根据实际存在的文件哈希值,进行增加或删除操作 -func (t *AgentCheckShardStore) checkCache(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) { - log := logger.WithType[AgentCheckShardStore]("Event") +func (t *HubCheckShardStore) checkCache(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) { + log := logger.WithType[HubCheckShardStore]("Event") caches, err := execCtx.Args.DB.Cache().GetByStorageID(tx, t.StorageID) if err != nil { @@ -123,8 +123,8 @@ func (t *AgentCheckShardStore) checkCache(execCtx ExecuteContext, tx db2.SQLCont } // checkPinnedObject 对比PinnedObject表,若实际文件不存在,则进行删除操作 -func (t *AgentCheckShardStore) checkPinnedObject(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) { - log := logger.WithType[AgentCheckShardStore]("Event") +func (t *HubCheckShardStore) checkPinnedObject(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) { + log := logger.WithType[HubCheckShardStore]("Event") objs, err := execCtx.Args.DB.PinnedObject().GetObjectsByStorageID(tx, t.StorageID) if err != nil { @@ -149,8 +149,8 @@ func (t *AgentCheckShardStore) checkPinnedObject(execCtx ExecuteContext, tx db2. } // checkObjectBlock 对比ObjectBlock表,若实际文件不存在,则进行删除操作 -func (t *AgentCheckShardStore) checkObjectBlock(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) { - log := logger.WithType[AgentCheckShardStore]("Event") +func (t *HubCheckShardStore) checkObjectBlock(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) { + log := logger.WithType[HubCheckShardStore]("Event") blocks, err := execCtx.Args.DB.ObjectBlock().GetByStorageID(tx, t.StorageID) if err != nil { @@ -174,7 +174,7 @@ func (t *AgentCheckShardStore) checkObjectBlock(execCtx ExecuteContext, tx db2.S } } -// init 注册AgentCheckCache消息转换器 +// init 注册HubCheckCache消息转换器 func init() { - RegisterMessageConvertor(NewAgentCheckShardStore) + RegisterMessageConvertor(NewHubCheckShardStore) } diff --git a/scanner/internal/event/agent_check_state.go b/scanner/internal/event/agent_check_state.go index 6e8d628..9a5f87d 100644 --- a/scanner/internal/event/agent_check_state.go +++ b/scanner/internal/event/agent_check_state.go @@ -8,23 +8,23 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage2/common/consts" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/agent" + agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" scevt "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/scanner/event" "gitlink.org.cn/cloudream/storage2/scanner/internal/config" ) -type AgentCheckState struct { - *scevt.AgentCheckState +type HubCheckState struct { + *scevt.HubCheckState } -func NewAgentCheckState(evt *scevt.AgentCheckState) *AgentCheckState { - return &AgentCheckState{ - AgentCheckState: evt, +func NewHubCheckState(evt *scevt.HubCheckState) *HubCheckState { + return &HubCheckState{ + HubCheckState: evt, } } -func (t *AgentCheckState) TryMerge(other Event) bool { - event, ok := other.(*AgentCheckState) +func (t *HubCheckState) TryMerge(other Event) bool { + event, ok := other.(*HubCheckState) if !ok { return false } @@ -32,9 +32,9 @@ func (t *AgentCheckState) TryMerge(other Event) bool { return t.HubID == event.HubID } -func (t *AgentCheckState) Execute(execCtx ExecuteContext) { - log := logger.WithType[AgentCheckState]("Event") - log.Debugf("begin with %v", logger.FormatStruct(t.AgentCheckState)) +func (t *HubCheckState) Execute(execCtx ExecuteContext) { + log := logger.WithType[HubCheckState]("Event") + log.Debugf("begin with %v", logger.FormatStruct(t.HubCheckState)) defer log.Debugf("end") hub, err := execCtx.Args.DB.Hub().GetByID(execCtx.Args.DB.DefCtx(), t.HubID) @@ -47,12 +47,12 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { return } - agtCli, err := stgglb.AgentMQPool.Acquire(t.HubID) + agtCli, err := stgglb.HubMQPool.Acquire(t.HubID) if err != nil { - log.WithField("HubID", t.HubID).Warnf("create agent client failed, err: %s", err.Error()) + log.WithField("HubID", t.HubID).Warnf("create hub client failed, err: %s", err.Error()) return } - defer stgglb.AgentMQPool.Release(agtCli) + defer stgglb.HubMQPool.Release(agtCli) _, err = agtCli.GetState(agtmq.NewGetState(), mq.RequestOption{Timeout: time.Second * 30}) if err != nil { @@ -77,5 +77,5 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { } func init() { - RegisterMessageConvertor(NewAgentCheckState) + RegisterMessageConvertor(NewHubCheckState) } diff --git a/scanner/internal/event/agent_shardstore_gc.go b/scanner/internal/event/agent_shardstore_gc.go index 9fe331c..290e364 100644 --- a/scanner/internal/event/agent_shardstore_gc.go +++ b/scanner/internal/event/agent_shardstore_gc.go @@ -11,28 +11,28 @@ import ( "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock/reqbuilder" - agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/agent" + agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" scevt "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/scanner/event" ) -// AgentShardStoreGC 类封装了扫描器事件中的AgentShardStoreGC结构。 -type AgentShardStoreGC struct { - *scevt.AgentShardStoreGC +// HubShardStoreGC 类封装了扫描器事件中的HubShardStoreGC结构。 +type HubShardStoreGC struct { + *scevt.HubShardStoreGC } -// NewAgentShardStoreGC 创建一个新的AgentCacheGC实例。 -// evt: 传入的扫描器事件中的AgentCacheGC实例。 -func NewAgentShardStoreGC(evt *scevt.AgentShardStoreGC) *AgentShardStoreGC { - return &AgentShardStoreGC{ - AgentShardStoreGC: evt, +// NewHubShardStoreGC 创建一个新的HubCacheGC实例。 +// evt: 传入的扫描器事件中的HubCacheGC实例。 +func NewHubShardStoreGC(evt *scevt.HubShardStoreGC) *HubShardStoreGC { + return &HubShardStoreGC{ + HubShardStoreGC: evt, } } // TryMerge 尝试合并当前事件与另一个事件。 // other: 待合并的另一个事件。 // 返回值表示是否成功合并。 -func (t *AgentShardStoreGC) TryMerge(other Event) bool { - event, ok := other.(*AgentShardStoreGC) +func (t *HubShardStoreGC) TryMerge(other Event) bool { + event, ok := other.(*HubShardStoreGC) if !ok { return false } @@ -46,10 +46,10 @@ func (t *AgentShardStoreGC) TryMerge(other Event) bool { // Execute 执行垃圾回收操作。 // execCtx: 执行上下文,包含执行所需的各种参数和环境。 -func (t *AgentShardStoreGC) Execute(execCtx ExecuteContext) { - log := logger.WithType[AgentShardStoreGC]("Event") +func (t *HubShardStoreGC) Execute(execCtx ExecuteContext) { + log := logger.WithType[HubShardStoreGC]("Event") startTime := time.Now() - log.Debugf("begin with %v", logger.FormatStruct(t.AgentShardStoreGC)) + log.Debugf("begin with %v", logger.FormatStruct(t.HubShardStoreGC)) defer func() { log.Debugf("end, time: %v", time.Since(startTime)) }() @@ -103,12 +103,12 @@ func (t *AgentShardStoreGC) Execute(execCtx ExecuteContext) { } // 获取与节点通信的代理客户端 - agtCli, err := stgglb.AgentMQPool.Acquire(masterHub.HubID) + agtCli, err := stgglb.HubMQPool.Acquire(masterHub.HubID) if err != nil { - log.WithField("HubID", t.StorageID).Warnf("create agent client failed, err: %s", err.Error()) + log.WithField("HubID", t.StorageID).Warnf("create hub client failed, err: %s", err.Error()) return } - defer stgglb.AgentMQPool.Release(agtCli) + defer stgglb.HubMQPool.Release(agtCli) // 向代理发送垃圾回收请求 _, err = agtCli.CacheGC(agtmq.ReqCacheGC(t.StorageID, allFileHashes), mq.RequestOption{Timeout: time.Minute}) @@ -118,7 +118,7 @@ func (t *AgentShardStoreGC) Execute(execCtx ExecuteContext) { } } -// 注册消息转换器,使系统能够处理AgentCacheGC消息。 +// 注册消息转换器,使系统能够处理HubCacheGC消息。 func init() { - RegisterMessageConvertor(NewAgentShardStoreGC) + RegisterMessageConvertor(NewHubShardStoreGC) } diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index 2b35301..9923608 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -1356,11 +1356,11 @@ func (t *CheckPackageRedundancy) reconstructLRC(ctx ExecuteContext, obj stgmod.O } // func (t *CheckPackageRedundancy) pinObject(hubID cdssdk.HubID, fileHash string) error { -// agtCli, err := stgglb.AgentMQPool.Acquire(hubID) +// agtCli, err := stgglb.HubMQPool.Acquire(hubID) // if err != nil { -// return fmt.Errorf("new agent client: %w", err) +// return fmt.Errorf("new hub client: %w", err) // } -// defer stgglb.AgentMQPool.Release(agtCli) +// defer stgglb.HubMQPool.Release(agtCli) // _, err = agtCli.PinObject(agtmq.ReqPinObject([]string{fileHash}, false)) // if err != nil { diff --git a/scanner/internal/event/event.go b/scanner/internal/event/event.go index 293cd95..97da54e 100644 --- a/scanner/internal/event/event.go +++ b/scanner/internal/event/event.go @@ -16,7 +16,7 @@ import ( type ExecuteArgs struct { DB *db2.DB DistLock *distlock.Service - StgMgr *agtpool.AgentPool + StgMgr *agtpool.HubPool EvtPub *sysevent.Publisher } @@ -28,7 +28,7 @@ type Event = event.Event[ExecuteArgs] type ExecuteOption = event.ExecuteOption -func NewExecutor(db *db2.DB, distLock *distlock.Service, stgAgts *agtpool.AgentPool, evtPub *sysevent.Publisher) Executor { +func NewExecutor(db *db2.DB, distLock *distlock.Service, stgAgts *agtpool.HubPool, evtPub *sysevent.Publisher) Executor { return event.NewExecutor(ExecuteArgs{ DB: db, DistLock: distLock, diff --git a/scanner/internal/tickevent/batch_all_agent_check_shardstore.go b/scanner/internal/tickevent/batch_all_agent_check_shardstore.go index f60dcc4..3c4322a 100644 --- a/scanner/internal/tickevent/batch_all_agent_check_shardstore.go +++ b/scanner/internal/tickevent/batch_all_agent_check_shardstore.go @@ -7,18 +7,18 @@ import ( "gitlink.org.cn/cloudream/storage2/scanner/internal/event" ) -const AGENT_CHECK_CACHE_BATCH_SIZE = 2 +const HUB_CHECK_CACHE_BATCH_SIZE = 2 -type BatchAllAgentCheckShardStore struct { +type BatchAllHubCheckShardStore struct { stgIDs []cdssdk.StorageID } -func NewBatchAllAgentCheckShardStore() *BatchAllAgentCheckShardStore { - return &BatchAllAgentCheckShardStore{} +func NewBatchAllHubCheckShardStore() *BatchAllHubCheckShardStore { + return &BatchAllHubCheckShardStore{} } -func (e *BatchAllAgentCheckShardStore) Execute(ctx ExecuteContext) { - log := logger.WithType[BatchAllAgentCheckShardStore]("TickEvent") +func (e *BatchAllHubCheckShardStore) Execute(ctx ExecuteContext) { + log := logger.WithType[BatchAllHubCheckShardStore]("TickEvent") log.Debugf("begin") defer log.Debugf("end") @@ -34,9 +34,9 @@ func (e *BatchAllAgentCheckShardStore) Execute(ctx ExecuteContext) { } checkedCnt := 0 - for ; checkedCnt < len(e.stgIDs) && checkedCnt < AGENT_CHECK_CACHE_BATCH_SIZE; checkedCnt++ { + for ; checkedCnt < len(e.stgIDs) && checkedCnt < HUB_CHECK_CACHE_BATCH_SIZE; checkedCnt++ { // nil代表进行全量检查 - ctx.Args.EventExecutor.Post(event.NewAgentCheckShardStore(scevt.NewAgentCheckShardStore(e.stgIDs[checkedCnt]))) + ctx.Args.EventExecutor.Post(event.NewHubCheckShardStore(scevt.NewHubCheckShardStore(e.stgIDs[checkedCnt]))) } e.stgIDs = e.stgIDs[checkedCnt:] } diff --git a/scanner/internal/tickevent/check_agent_state.go b/scanner/internal/tickevent/check_agent_state.go index 2108af0..2195f1b 100644 --- a/scanner/internal/tickevent/check_agent_state.go +++ b/scanner/internal/tickevent/check_agent_state.go @@ -6,15 +6,15 @@ import ( "gitlink.org.cn/cloudream/storage2/scanner/internal/event" ) -type CheckAgentState struct { +type CheckHubState struct { } -func NewCheckAgentState() *CheckAgentState { - return &CheckAgentState{} +func NewCheckHubState() *CheckHubState { + return &CheckHubState{} } -func (e *CheckAgentState) Execute(ctx ExecuteContext) { - log := logger.WithType[CheckAgentState]("TickEvent") +func (e *CheckHubState) Execute(ctx ExecuteContext) { + log := logger.WithType[CheckHubState]("TickEvent") log.Debugf("begin") defer log.Debugf("end") @@ -25,7 +25,7 @@ func (e *CheckAgentState) Execute(ctx ExecuteContext) { } for _, hub := range hubs { - ctx.Args.EventExecutor.Post(event.NewAgentCheckState(scevt.NewAgentCheckState(hub.HubID)), event.ExecuteOption{ + ctx.Args.EventExecutor.Post(event.NewHubCheckState(scevt.NewHubCheckState(hub.HubID)), event.ExecuteOption{ IsEmergency: true, DontMerge: true, }) diff --git a/scanner/internal/tickevent/storage_gc.go b/scanner/internal/tickevent/storage_gc.go index 7dd5379..80ae417 100644 --- a/scanner/internal/tickevent/storage_gc.go +++ b/scanner/internal/tickevent/storage_gc.go @@ -40,6 +40,6 @@ func (e *StorageGC) Execute(ctx ExecuteContext) { return } - ctx.Args.EventExecutor.Post(event.NewAgentShardStoreGC(scevt.NewAgentShardStoreGC(e.storageIDs[0]))) + ctx.Args.EventExecutor.Post(event.NewHubShardStoreGC(scevt.NewHubShardStoreGC(e.storageIDs[0]))) e.storageIDs = e.storageIDs[1:] } diff --git a/scanner/main.go b/scanner/main.go index be42611..7155b3a 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -10,7 +10,7 @@ import ( stgmod "gitlink.org.cn/cloudream/storage2/common/models" "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" + agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" scmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/scanner" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" "gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent" @@ -40,7 +40,7 @@ func main() { stgglb.InitMQPool(config.Cfg().RabbitMQ) - stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) + stgglb.InitHubRPCPool(&agtrpc.PoolConfig{}) // 启动分布式锁服务 distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) @@ -67,10 +67,10 @@ func main() { agtSvr, err := scmq.NewServer(mq.NewService(&eventExecutor), config.Cfg().RabbitMQ) if err != nil { - logger.Fatalf("new agent server failed, err: %s", err.Error()) + logger.Fatalf("new hub server failed, err: %s", err.Error()) } agtSvr.OnError(func(err error) { - logger.Warnf("agent server err: %s", err.Error()) + logger.Warnf("hub server err: %s", err.Error()) }) go serveScannerServer(agtSvr) @@ -185,11 +185,11 @@ func startTickEvent(tickExecutor *tickevent.Executor) { interval := 5 * 60 * 1000 - tickExecutor.Start(tickevent.NewBatchAllAgentCheckShardStore(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) + tickExecutor.Start(tickevent.NewBatchAllHubCheckShardStore(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) tickExecutor.Start(tickevent.NewStorageGC(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) - tickExecutor.Start(tickevent.NewCheckAgentState(), 5*60*1000, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) + tickExecutor.Start(tickevent.NewCheckHubState(), 5*60*1000, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) tickExecutor.Start(tickevent.NewBatchCheckPackageRedundancy(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000})