diff --git a/client/internal/services/user_space.go b/client/internal/services/user_space.go index 246bad2..56b81ee 100644 --- a/client/internal/services/user_space.go +++ b/client/internal/services/user_space.go @@ -129,7 +129,7 @@ func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPa if !ok { return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source userspace %v has no grpc address", srcSpaceID) } - srcSpaceCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(*srcSpace.MasterHub, *srcAddr)) + srcSpaceCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(srcSpace.MasterHub, srcAddr)) defer srcSpaceCli.Release() dstSpace := svc.UserSpaceMeta.Get(dstSpaceID) @@ -143,7 +143,7 @@ func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPa if !ok { return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination userspace %v has no grpc address", srcSpaceID) } - dstSpaceCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(*dstSpace.MasterHub, *dstAddr)) + dstSpaceCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(dstSpace.MasterHub, dstAddr)) defer dstSpaceCli.Release() srcPath = strings.Trim(srcPath, cdssdk.ObjectPathSeparator) diff --git a/client/internal/ticktock/change_redundancy.go b/client/internal/ticktock/change_redundancy.go index 305169c..5c7fc49 100644 --- a/client/internal/ticktock/change_redundancy.go +++ b/client/internal/ticktock/change_redundancy.go @@ -27,9 +27,9 @@ func (j *ChangeRedundancy) Name() string { func (j *ChangeRedundancy) Execute(t *TickTock) { log := logger.WithType[ChangeRedundancy]("TickTock") startTime := time.Now() - log.Debugf("job start") + log.Infof("job start") defer func() { - log.Debugf("job end, time: %v", time.Since(startTime)) + log.Infof("job end, time: %v", time.Since(startTime)) }() ctx := &changeRedundancyContext{ diff --git a/client/internal/ticktock/check_shardstore.go b/client/internal/ticktock/check_shardstore.go index 57f4170..a7a1d59 100644 --- a/client/internal/ticktock/check_shardstore.go +++ b/client/internal/ticktock/check_shardstore.go @@ -28,9 +28,9 @@ func (j *CheckShardStore) Name() string { func (j *CheckShardStore) Execute(t *TickTock) { log := logger.WithType[CheckShardStore]("TickTock") startTime := time.Now() - log.Debugf("job start") + log.Infof("job start") defer func() { - log.Debugf("job end, time: %v", time.Since(startTime)) + log.Infof("job end, time: %v", time.Since(startTime)) }() db2 := t.db @@ -67,17 +67,17 @@ func (j *CheckShardStore) checkOne(t *TickTock, space *clitypes.UserSpaceDetail) if !ok { return fmt.Errorf("master of user space %v has no grpc address", space.UserSpace) } - agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(*space.MasterHub, *addr)) + agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(space.MasterHub, addr)) defer agtCli.Release() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) defer cancel() - checkResp, err := agtCli.CheckCache(ctx, &hubrpc.CheckCache{ + checkResp, cerr := agtCli.CheckCache(ctx, &hubrpc.CheckCache{ UserSpace: *space, }) - if err != nil { - return fmt.Errorf("request to check cache: %w", err) + if cerr != nil { + return fmt.Errorf("request to check cache: %w", cerr.ToError()) } realFileHashes := lo.SliceToMap(checkResp.FileHashes, func(hash clitypes.FileHash) (clitypes.FileHash, bool) { return hash, true }) diff --git a/client/internal/ticktock/shardstore_gc.go b/client/internal/ticktock/shardstore_gc.go index ac35fc2..0934fe5 100644 --- a/client/internal/ticktock/shardstore_gc.go +++ b/client/internal/ticktock/shardstore_gc.go @@ -27,9 +27,9 @@ func (j *ShardStoreGC) Name() string { func (j *ShardStoreGC) Execute(t *TickTock) { log := logger.WithType[ShardStoreGC]("Event") startTime := time.Now() - log.Debugf("job start") + log.Infof("job start") defer func() { - log.Debugf("job end, time: %v", time.Since(startTime)) + log.Infof("job end, time: %v", time.Since(startTime)) }() spaceIDs, err := t.db.UserSpace().GetAllIDs(t.db.DefCtx()) @@ -91,7 +91,7 @@ func (j *ShardStoreGC) gcOne(t *TickTock, space *types.UserSpaceDetail) error { if !ok { return fmt.Errorf("master of user space %v has no grpc address", space.UserSpace) } - agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(*space.MasterHub, *addr)) + agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(space.MasterHub, addr)) defer agtCli.Release() // 向代理发送垃圾回收请求 diff --git a/client/internal/ticktock/update_package_access_stat_amount.go b/client/internal/ticktock/update_package_access_stat_amount.go index 1fd6fea..01f86e8 100644 --- a/client/internal/ticktock/update_package_access_stat_amount.go +++ b/client/internal/ticktock/update_package_access_stat_amount.go @@ -17,9 +17,9 @@ func (j *UpdatePackageAccessStatAmount) Name() string { func (j *UpdatePackageAccessStatAmount) Execute(t *TickTock) { log := logger.WithType[UpdatePackageAccessStatAmount]("TickTock") startTime := time.Now() - log.Debugf("job start") + log.Infof("job start") defer func() { - log.Debugf("job end, time: %v", time.Since(startTime)) + log.Infof("job end, time: %v", time.Since(startTime)) }() err := t.db.PackageAccessStat().UpdateAllAmount(t.db.DefCtx(), t.cfg.AccessStatHistoryWeight) diff --git a/client/internal/uploader/user_space_upload.go b/client/internal/uploader/user_space_upload.go index e7b6dc3..78e2e78 100644 --- a/client/internal/uploader/user_space_upload.go +++ b/client/internal/uploader/user_space_upload.go @@ -102,7 +102,7 @@ func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath st delPkg() return nil, fmt.Errorf("master of user space %v has no grpc address", srcSpace.UserSpace) } - srcHubCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(*srcSpace.MasterHub, *addr)) + srcHubCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(srcSpace.MasterHub, addr)) defer srcHubCli.Release() listAllResp, cerr := srcHubCli.PublicStoreListAll(context.Background(), &hubrpc.PublicStoreListAll{ diff --git a/common/assets/confs/hub.config.json b/common/assets/confs/hub.config.json index 2c40e0d..a15c7ff 100644 --- a/common/assets/confs/hub.config.json +++ b/common/assets/confs/hub.config.json @@ -27,7 +27,7 @@ "retryInterval": 5000 } }, - "connectivity": { - "testInterval": 300 + "tickTock": { + "testHubConnectivitiesInterval": "5m" } } \ No newline at end of file diff --git a/common/globals/utils.go b/common/globals/utils.go index 1d9061b..81a8418 100644 --- a/common/globals/utils.go +++ b/common/globals/utils.go @@ -3,7 +3,7 @@ package stgglb import cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" // 根据当前节点与目标地址的距离关系,选择合适的地址 -func SelectGRPCAddress(hub cortypes.Hub, addr cortypes.GRPCAddressInfo) (string, int) { +func SelectGRPCAddress(hub *cortypes.Hub, addr *cortypes.GRPCAddressInfo) (string, int) { if Local != nil && Local.LocationID == hub.LocationID { return addr.LocalIP, addr.LocalGRPCPort } diff --git a/common/pkgs/ioswitch2/hub_worker.go b/common/pkgs/ioswitch2/hub_worker.go index a6f2c97..1370b97 100644 --- a/common/pkgs/ioswitch2/hub_worker.go +++ b/common/pkgs/ioswitch2/hub_worker.go @@ -24,7 +24,7 @@ type HubWorker struct { } func (w *HubWorker) NewClient() (exec.WorkerClient, error) { - cli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(w.Hub, w.Address)) + cli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&w.Hub, &w.Address)) return &HubWorkerClient{hubID: w.Hub.HubID, cli: cli}, nil } diff --git a/common/pkgs/ioswitchlrc/hub_worker.go b/common/pkgs/ioswitchlrc/hub_worker.go index 63a7c98..febfeab 100644 --- a/common/pkgs/ioswitchlrc/hub_worker.go +++ b/common/pkgs/ioswitchlrc/hub_worker.go @@ -20,7 +20,7 @@ type HubWorker struct { } func (w *HubWorker) NewClient() (exec.WorkerClient, error) { - cli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(w.Hub, w.Address)) + cli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&w.Hub, &w.Address)) return &HubWorkerClient{cli: cli}, nil } diff --git a/common/pkgs/rpc/coordinator/coordinator.pb.go b/common/pkgs/rpc/coordinator/coordinator.pb.go index 710bfd8..4767db5 100644 --- a/common/pkgs/rpc/coordinator/coordinator.pb.go +++ b/common/pkgs/rpc/coordinator/coordinator.pb.go @@ -27,7 +27,7 @@ var file_pkgs_rpc_coordinator_coordinator_proto_rawDesc = []byte{ 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2f, 0x63, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x63, 0x6f, 0x72, 0x72, 0x70, 0x63, 0x1a, 0x12, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x32, 0xc9, 0x01, 0x0a, 0x0b, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, + 0x72, 0x6f, 0x74, 0x6f, 0x32, 0xff, 0x01, 0x0a, 0x0b, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x2b, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, @@ -36,15 +36,18 @@ var file_pkgs_rpc_coordinator_coordinator_proto_rawDesc = []byte{ 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x33, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x69, 0x65, 0x73, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x30, - 0x0a, 0x11, 0x47, 0x65, 0x74, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x44, 0x65, 0x74, 0x61, - 0x69, 0x6c, 0x73, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, - 0x63, 0x6e, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x6a, 0x63, 0x73, - 0x2d, 0x70, 0x75, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x73, - 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6f, 0x72, 0x72, 0x70, 0x63, 0x3b, 0x63, 0x6f, 0x72, 0x72, - 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x34, + 0x0a, 0x15, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f, 0x6e, 0x6e, 0x65, + 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x30, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x53, 0x74, 0x6f, 0x72, 0x61, + 0x67, 0x65, 0x44, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x6c, 0x69, 0x6e, + 0x6b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x63, 0x6e, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x72, 0x65, + 0x61, 0x6d, 0x2f, 0x6a, 0x63, 0x73, 0x2d, 0x70, 0x75, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, + 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6f, 0x72, 0x72, 0x70, + 0x63, 0x3b, 0x63, 0x6f, 0x72, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var file_pkgs_rpc_coordinator_coordinator_proto_goTypes = []any{ @@ -55,13 +58,15 @@ var file_pkgs_rpc_coordinator_coordinator_proto_depIdxs = []int32{ 0, // 0: corrpc.Coordinator.GetHubConfig:input_type -> rpc.Request 0, // 1: corrpc.Coordinator.GetHubs:input_type -> rpc.Request 0, // 2: corrpc.Coordinator.GetHubConnectivities:input_type -> rpc.Request - 0, // 3: corrpc.Coordinator.GetStorageDetails:input_type -> rpc.Request - 1, // 4: corrpc.Coordinator.GetHubConfig:output_type -> rpc.Response - 1, // 5: corrpc.Coordinator.GetHubs:output_type -> rpc.Response - 1, // 6: corrpc.Coordinator.GetHubConnectivities:output_type -> rpc.Response - 1, // 7: corrpc.Coordinator.GetStorageDetails:output_type -> rpc.Response - 4, // [4:8] is the sub-list for method output_type - 0, // [0:4] is the sub-list for method input_type + 0, // 3: corrpc.Coordinator.ReportHubConnectivity:input_type -> rpc.Request + 0, // 4: corrpc.Coordinator.GetStorageDetails:input_type -> rpc.Request + 1, // 5: corrpc.Coordinator.GetHubConfig:output_type -> rpc.Response + 1, // 6: corrpc.Coordinator.GetHubs:output_type -> rpc.Response + 1, // 7: corrpc.Coordinator.GetHubConnectivities:output_type -> rpc.Response + 1, // 8: corrpc.Coordinator.ReportHubConnectivity:output_type -> rpc.Response + 1, // 9: corrpc.Coordinator.GetStorageDetails:output_type -> rpc.Response + 5, // [5:10] is the sub-list for method output_type + 0, // [0:5] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/common/pkgs/rpc/coordinator/coordinator.proto b/common/pkgs/rpc/coordinator/coordinator.proto index 1a3d06f..2910f7e 100644 --- a/common/pkgs/rpc/coordinator/coordinator.proto +++ b/common/pkgs/rpc/coordinator/coordinator.proto @@ -11,6 +11,7 @@ service Coordinator { rpc GetHubConfig(rpc.Request) returns(rpc.Response); rpc GetHubs(rpc.Request) returns(rpc.Response); rpc GetHubConnectivities(rpc.Request) returns(rpc.Response); + rpc ReportHubConnectivity(rpc.Request) returns(rpc.Response); rpc GetStorageDetails(rpc.Request) returns(rpc.Response); } \ No newline at end of file diff --git a/common/pkgs/rpc/coordinator/coordinator_grpc.pb.go b/common/pkgs/rpc/coordinator/coordinator_grpc.pb.go index 14044cf..e89ef19 100644 --- a/common/pkgs/rpc/coordinator/coordinator_grpc.pb.go +++ b/common/pkgs/rpc/coordinator/coordinator_grpc.pb.go @@ -20,10 +20,11 @@ import ( const _ = grpc.SupportPackageIsVersion7 const ( - Coordinator_GetHubConfig_FullMethodName = "/corrpc.Coordinator/GetHubConfig" - Coordinator_GetHubs_FullMethodName = "/corrpc.Coordinator/GetHubs" - Coordinator_GetHubConnectivities_FullMethodName = "/corrpc.Coordinator/GetHubConnectivities" - Coordinator_GetStorageDetails_FullMethodName = "/corrpc.Coordinator/GetStorageDetails" + Coordinator_GetHubConfig_FullMethodName = "/corrpc.Coordinator/GetHubConfig" + Coordinator_GetHubs_FullMethodName = "/corrpc.Coordinator/GetHubs" + Coordinator_GetHubConnectivities_FullMethodName = "/corrpc.Coordinator/GetHubConnectivities" + Coordinator_ReportHubConnectivity_FullMethodName = "/corrpc.Coordinator/ReportHubConnectivity" + Coordinator_GetStorageDetails_FullMethodName = "/corrpc.Coordinator/GetStorageDetails" ) // CoordinatorClient is the client API for Coordinator service. @@ -33,6 +34,7 @@ type CoordinatorClient interface { GetHubConfig(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) GetHubs(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) GetHubConnectivities(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) + ReportHubConnectivity(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) GetStorageDetails(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) } @@ -71,6 +73,15 @@ func (c *coordinatorClient) GetHubConnectivities(ctx context.Context, in *rpc.Re return out, nil } +func (c *coordinatorClient) ReportHubConnectivity(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) { + out := new(rpc.Response) + err := c.cc.Invoke(ctx, Coordinator_ReportHubConnectivity_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *coordinatorClient) GetStorageDetails(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) { out := new(rpc.Response) err := c.cc.Invoke(ctx, Coordinator_GetStorageDetails_FullMethodName, in, out, opts...) @@ -87,6 +98,7 @@ type CoordinatorServer interface { GetHubConfig(context.Context, *rpc.Request) (*rpc.Response, error) GetHubs(context.Context, *rpc.Request) (*rpc.Response, error) GetHubConnectivities(context.Context, *rpc.Request) (*rpc.Response, error) + ReportHubConnectivity(context.Context, *rpc.Request) (*rpc.Response, error) GetStorageDetails(context.Context, *rpc.Request) (*rpc.Response, error) mustEmbedUnimplementedCoordinatorServer() } @@ -104,6 +116,9 @@ func (UnimplementedCoordinatorServer) GetHubs(context.Context, *rpc.Request) (*r func (UnimplementedCoordinatorServer) GetHubConnectivities(context.Context, *rpc.Request) (*rpc.Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetHubConnectivities not implemented") } +func (UnimplementedCoordinatorServer) ReportHubConnectivity(context.Context, *rpc.Request) (*rpc.Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReportHubConnectivity not implemented") +} func (UnimplementedCoordinatorServer) GetStorageDetails(context.Context, *rpc.Request) (*rpc.Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetStorageDetails not implemented") } @@ -174,6 +189,24 @@ func _Coordinator_GetHubConnectivities_Handler(srv interface{}, ctx context.Cont return interceptor(ctx, in, info, handler) } +func _Coordinator_ReportHubConnectivity_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(rpc.Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CoordinatorServer).ReportHubConnectivity(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Coordinator_ReportHubConnectivity_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CoordinatorServer).ReportHubConnectivity(ctx, req.(*rpc.Request)) + } + return interceptor(ctx, in, info, handler) +} + func _Coordinator_GetStorageDetails_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(rpc.Request) if err := dec(in); err != nil { @@ -211,6 +244,10 @@ var Coordinator_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetHubConnectivities", Handler: _Coordinator_GetHubConnectivities_Handler, }, + { + MethodName: "ReportHubConnectivity", + Handler: _Coordinator_ReportHubConnectivity_Handler, + }, { MethodName: "GetStorageDetails", Handler: _Coordinator_GetStorageDetails_Handler, diff --git a/common/pkgs/rpc/coordinator/hub.go b/common/pkgs/rpc/coordinator/hub.go index 0254bcf..9b26bca 100644 --- a/common/pkgs/rpc/coordinator/hub.go +++ b/common/pkgs/rpc/coordinator/hub.go @@ -13,6 +13,8 @@ type HubService interface { GetHubs(ctx context.Context, msg *GetHubs) (*GetHubsResp, *rpc.CodeError) GetHubConnectivities(ctx context.Context, msg *GetHubConnectivities) (*GetHubConnectivitiesResp, *rpc.CodeError) + + ReportHubConnectivity(ctx context.Context, msg *ReportHubConnectivity) (*ReportHubConnectivityResp, *rpc.CodeError) } type GetHubConfig struct { @@ -98,3 +100,17 @@ func (c *Client) GetHubConnectivities(ctx context.Context, msg *GetHubConnectivi func (s *Server) GetHubConnectivities(ctx context.Context, req *rpc.Request) (*rpc.Response, error) { return rpc.UnaryServer(s.svrImpl.GetHubConnectivities, ctx, req) } + +// 上报节点连通性信息 +type ReportHubConnectivity struct { + Connecttivities []cortypes.HubConnectivity +} +type ReportHubConnectivityResp struct { +} + +func (c *Client) ReportHubConnectivity(ctx context.Context, msg *ReportHubConnectivity) (*ReportHubConnectivityResp, *rpc.CodeError) { + return rpc.UnaryClient[*ReportHubConnectivityResp](c.cli.ReportHubConnectivity, ctx, msg) +} +func (s *Server) ReportHubConnectivity(ctx context.Context, req *rpc.Request) (*rpc.Response, error) { + return rpc.UnaryServer(s.svrImpl.ReportHubConnectivity, ctx, req) +} diff --git a/coordinator/internal/rpc/hub.go b/coordinator/internal/rpc/hub.go index 8bbec3e..813d089 100644 --- a/coordinator/internal/rpc/hub.go +++ b/coordinator/internal/rpc/hub.go @@ -72,3 +72,13 @@ func (svc *Service) GetHubConnectivities(ctx context.Context, msg *corrpc.GetHub return corrpc.RespGetHubConnectivities(cons), nil } + +func (svc *Service) ReportHubConnectivity(ctx context.Context, msg *corrpc.ReportHubConnectivity) (*corrpc.ReportHubConnectivityResp, *rpc.CodeError) { + err := svc.db.HubConnectivity().BatchUpdateOrCreate(svc.db.DefCtx(), msg.Connecttivities) + if err != nil { + logger.Warnf("batch update or create hub connectivities: %v", err) + return nil, rpc.Failed(errorcode.OperationFailed, "batch update or create hub connectivities: %v", err) + } + + return &corrpc.ReportHubConnectivityResp{}, nil +} diff --git a/coordinator/internal/ticktock/check_hub_state.go b/coordinator/internal/ticktock/check_hub_state.go index b2ad2db..dea4f79 100644 --- a/coordinator/internal/ticktock/check_hub_state.go +++ b/coordinator/internal/ticktock/check_hub_state.go @@ -22,10 +22,10 @@ func (j *CheckHubState) Name() string { func (j *CheckHubState) Execute(t *TickTock) { log := logger.WithType[CheckHubState]("TickTock") - log.Debugf("job start") + log.Infof("job start") startTime := time.Now() defer func() { - log.Debugf("job end, time: %v", time.Since(startTime)) + log.Infof("job end, time: %v", time.Since(startTime)) }() hubs, err := t.db.Hub().GetAllHubs(t.db.DefCtx()) @@ -50,7 +50,7 @@ func (j *CheckHubState) checkOne(t *TickTock, hub cortypes.Hub) error { return fmt.Errorf("hub has no grpc address") } - agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(hub, *addr)) + agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&hub, addr)) defer agtCli.Release() ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) diff --git a/go.mod b/go.mod index 0f9fbb9..e78cd4d 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.10.0 gitlink.org.cn/cloudream/common v0.0.0 + golang.org/x/net v0.35.0 golang.org/x/sync v0.13.0 golang.org/x/sys v0.32.0 golang.org/x/term v0.31.0 @@ -71,7 +72,6 @@ require ( golang.org/x/arch v0.8.0 // indirect golang.org/x/crypto v0.37.0 // indirect golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa // indirect - golang.org/x/net v0.35.0 // indirect golang.org/x/text v0.24.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/hub/internal/cmd/serve.go b/hub/internal/cmd/serve.go index 9382f54..764761a 100644 --- a/hub/internal/cmd/serve.go +++ b/hub/internal/cmd/serve.go @@ -6,7 +6,6 @@ import ( "os" "time" - "github.com/go-co-op/gocron/v2" "github.com/spf13/cobra" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" @@ -21,6 +20,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" "gitlink.org.cn/cloudream/jcs-pub/hub/internal/config" + "gitlink.org.cn/cloudream/jcs-pub/hub/internal/ticktock" coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" ) @@ -74,48 +74,6 @@ func serve(configPath string, httpAddr string) { } go serveHTTP(httpSvr) - // 启动网络连通性检测,并就地检测一次 - // conCol := connectivity.NewCollector(&config.Cfg().Connectivity, func(collector *connectivity.Collector) { - // log := logger.WithField("Connectivity", "") - - // coorCli, err := stgglb.CoordinatorMQPool.Acquire() - // if err != nil { - // log.Warnf("acquire coordinator mq failed, err: %s", err.Error()) - // return - // } - // defer stgglb.CoordinatorMQPool.Release(coorCli) - - // cons := collector.GetAll() - // hubCons := make([]cortypes.HubConnectivity, 0, len(cons)) - // for _, con := range cons { - // var delay *float32 - // if con.Latency != nil { - // v := float32(con.Latency.Microseconds()) / 1000 - // delay = &v - // } - - // hubCons = append(hubCons, cortypes.HubConnectivity{ - // FromHubID: *stgglb.Local.HubID, - // ToHubID: con.ToHubID, - // Latency: delay, - // TestTime: con.TestTime, - // }) - // } - - // _, err = coorCli.UpdateHubConnectivities(coormq.ReqUpdateHubConnectivities(hubCons)) - // if err != nil { - // log.Warnf("update hub connectivities: %v", err) - // } - // }) - // conCol.CollectInPlace() - - // 初始化元数据缓存服务 - // metacacheHost := metacache.NewHost() - // go metacacheHost.Serve() - // stgMeta := metacacheHost.AddStorageMeta() - // hubMeta := metacacheHost.AddHubMeta() - // conMeta := metacacheHost.AddConnectivity() - // 启动访问统计服务 // acStat := accessstat.NewAccessStat(accessstat.Config{ // // TODO 考虑放到配置里 @@ -135,9 +93,9 @@ func serve(configPath string, httpAddr string) { go servePublisher(evtPub) // 初始化定时任务执行器 - sch := setupTickTask(stgPool, evtPub) - sch.Start() - defer sch.Shutdown() + tktk := ticktock.New(config.Cfg().TickTock, config.Cfg().ID, stgPool) + tktk.Start() + defer tktk.Stop() // RPC服务 rpcSvr := hubrpc.NewServer(config.Cfg().RPC, myrpc.NewService(&worker, stgPool)) @@ -222,28 +180,6 @@ loop: os.Exit(1) } -func setupTickTask(hubPool *pool.Pool, evtPub *sysevent.Publisher) gocron.Scheduler { - sch, err := gocron.NewScheduler() - if err != nil { - logger.Errorf("new cron scheduler: %s", err.Error()) - os.Exit(1) - } - - // sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( - // gocron.NewAtTime(0, 0, 0), - // )), gocron.NewTask(tickevent.ReportStorageStats, hubPool, evtPub)) - - // sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( - // gocron.NewAtTime(0, 0, 1), - // )), gocron.NewTask(tickevent.ReportHubTransferStats, evtPub)) - - // sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( - // gocron.NewAtTime(0, 0, 2), - // )), gocron.NewTask(tickevent.ReportHubStorageTransferStats, evtPub)) - - return sch -} - func serveHTTP(server *http.Server) { logger.Info("start serving http") diff --git a/hub/internal/config/config.go b/hub/internal/config/config.go index e72a354..6ec80ae 100644 --- a/hub/internal/config/config.go +++ b/hub/internal/config/config.go @@ -5,10 +5,10 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" c "gitlink.org.cn/cloudream/common/utils/config" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" + "gitlink.org.cn/cloudream/jcs-pub/hub/internal/ticktock" ) type Config struct { @@ -18,7 +18,7 @@ type Config struct { CoordinatorRPC corrpc.PoolConfig `json:"coordinatorRPC"` Logger log.Config `json:"logger"` RabbitMQ mq.Config `json:"rabbitMQ"` - Connectivity connectivity.Config `json:"connectivity"` + TickTock ticktock.Config `json:"tickTock"` } var cfg Config diff --git a/hub/internal/ticktock/config.go b/hub/internal/ticktock/config.go new file mode 100644 index 0000000..e25f710 --- /dev/null +++ b/hub/internal/ticktock/config.go @@ -0,0 +1,7 @@ +package ticktock + +import "time" + +type Config struct { + TestHubConnectivitiesInterval time.Duration `json:"testHubConnectivitiesInterval"` +} diff --git a/hub/internal/ticktock/test_hub_connectivities.go b/hub/internal/ticktock/test_hub_connectivities.go new file mode 100644 index 0000000..3ed1baf --- /dev/null +++ b/hub/internal/ticktock/test_hub_connectivities.go @@ -0,0 +1,107 @@ +package ticktock + +import ( + "context" + "sync" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/reflect2" + stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" + corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" + hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" + cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" +) + +type TestHubConnectivities struct { + myHubID cortypes.HubID +} + +func (t *TestHubConnectivities) Name() string { + return reflect2.TypeNameOf[TestHubConnectivities]() +} +func (j *TestHubConnectivities) Execute(t *TickTock) { + log := logger.WithType[TestHubConnectivities]("TickTock") + startTime := time.Now() + log.Infof("job start") + defer func() { + log.Infof("job end, time: %v", time.Since(startTime)) + }() + + coorCli := stgglb.CoordinatorRPCPool.Get() + defer coorCli.Release() + + getHubs, cerr := coorCli.GetHubs(context.Background(), corrpc.NewGetHubs(nil)) + if cerr != nil { + log.Warnf("get all hubs: %v", cerr) + return + } + + tests := make([]cortypes.HubConnectivity, len(getHubs.Hubs)) + + wg := sync.WaitGroup{} + for i, hub := range getHubs.Hubs { + wg.Add(1) + + go func(hub *cortypes.Hub, i int) { + defer wg.Done() + + tests[i] = j.testOne(hub) + }(hub, i) + } + wg.Wait() + + _, cerr = coorCli.ReportHubConnectivity(context.Background(), &corrpc.ReportHubConnectivity{ + Connecttivities: tests, + }) + if cerr != nil { + log.Warnf("report hub connectivities: %v", cerr) + } +} + +func (j *TestHubConnectivities) testOne(hub *cortypes.Hub) cortypes.HubConnectivity { + log := logger.WithType[TestHubConnectivities]("TickTock") + + rpcAddr, ok := hub.Address.(*cortypes.GRPCAddressInfo) + if !ok { + return cortypes.HubConnectivity{ + FromHubID: j.myHubID, + ToHubID: hub.HubID, + Latency: nil, + TestTime: time.Now(), + } + } + hubCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(hub, rpcAddr)) + defer hubCli.Release() + + // 后几次ping计算延迟 + var avgLatency time.Duration + for i := 0; i < 3; i++ { + start := time.Now() + _, cerr := hubCli.Ping(context.Background(), &hubrpc.Ping{}) + if cerr != nil { + log.Warnf("ping %v: %v", hub.String(), cerr) + return cortypes.HubConnectivity{ + FromHubID: j.myHubID, + ToHubID: hub.HubID, + Latency: nil, + TestTime: time.Now(), + } + } + + latency := time.Since(start) + avgLatency += latency + + // 每次ping之间间隔1秒 + <-time.After(time.Second) + } + latency := avgLatency / 3 + latencyMs := float32(latency.Microseconds()) / 1000 + + return cortypes.HubConnectivity{ + FromHubID: j.myHubID, + ToHubID: hub.HubID, + Latency: &latencyMs, + TestTime: time.Now(), + } +} diff --git a/hub/internal/ticktock/ticktock.go b/hub/internal/ticktock/ticktock.go new file mode 100644 index 0000000..8fdb39a --- /dev/null +++ b/hub/internal/ticktock/ticktock.go @@ -0,0 +1,77 @@ +package ticktock + +import ( + "fmt" + + "github.com/go-co-op/gocron/v2" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" + cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" +) + +type Job interface { + Name() string + Execute(t *TickTock) +} + +type cronJob struct { + cronJob gocron.Job + job Job +} + +type TickTock struct { + cfg Config + sch gocron.Scheduler + jobs map[string]cronJob + + myHubID cortypes.HubID + stgPool *pool.Pool +} + +func New(cfg Config, myHubID cortypes.HubID, stgPool *pool.Pool) *TickTock { + sch, _ := gocron.NewScheduler() + t := &TickTock{ + cfg: cfg, + sch: sch, + jobs: map[string]cronJob{}, + + myHubID: myHubID, + stgPool: stgPool, + } + t.initJobs() + return t +} + +func (t *TickTock) Start() { + t.sch.Start() +} + +func (t *TickTock) Stop() { + t.sch.Shutdown() +} + +func (t *TickTock) RunNow(jobName string) { + j, ok := t.jobs[jobName] + if !ok { + logger.Warnf("job %s not found", jobName) + return + } + + j.cronJob.RunNow() +} + +func (t *TickTock) addJob(job Job, duration gocron.JobDefinition) { + j, err := t.sch.NewJob(duration, gocron.NewTask(job.Execute, t)) + if err != nil { + panic(fmt.Errorf("add job %s: %w", job.Name(), err)) + } + + t.jobs[job.Name()] = cronJob{ + cronJob: j, + job: job, + } +} + +func (t *TickTock) initJobs() { + t.addJob(&TestHubConnectivities{myHubID: t.myHubID}, gocron.DurationJob(t.cfg.TestHubConnectivitiesInterval)) +}