Browse Source

Hub增加定时任务模块

feature_gxh
Sydonian 6 months ago
parent
commit
e322f9c2c6
22 changed files with 312 additions and 116 deletions
  1. +2
    -2
      client/internal/services/user_space.go
  2. +2
    -2
      client/internal/ticktock/change_redundancy.go
  3. +6
    -6
      client/internal/ticktock/check_shardstore.go
  4. +3
    -3
      client/internal/ticktock/shardstore_gc.go
  5. +2
    -2
      client/internal/ticktock/update_package_access_stat_amount.go
  6. +1
    -1
      client/internal/uploader/user_space_upload.go
  7. +2
    -2
      common/assets/confs/hub.config.json
  8. +1
    -1
      common/globals/utils.go
  9. +1
    -1
      common/pkgs/ioswitch2/hub_worker.go
  10. +1
    -1
      common/pkgs/ioswitchlrc/hub_worker.go
  11. +22
    -17
      common/pkgs/rpc/coordinator/coordinator.pb.go
  12. +1
    -0
      common/pkgs/rpc/coordinator/coordinator.proto
  13. +41
    -4
      common/pkgs/rpc/coordinator/coordinator_grpc.pb.go
  14. +16
    -0
      common/pkgs/rpc/coordinator/hub.go
  15. +10
    -0
      coordinator/internal/rpc/hub.go
  16. +3
    -3
      coordinator/internal/ticktock/check_hub_state.go
  17. +1
    -1
      go.mod
  18. +4
    -68
      hub/internal/cmd/serve.go
  19. +2
    -2
      hub/internal/config/config.go
  20. +7
    -0
      hub/internal/ticktock/config.go
  21. +107
    -0
      hub/internal/ticktock/test_hub_connectivities.go
  22. +77
    -0
      hub/internal/ticktock/ticktock.go

+ 2
- 2
client/internal/services/user_space.go View File

@@ -129,7 +129,7 @@ func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPa
if !ok { if !ok {
return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source userspace %v has no grpc address", srcSpaceID) return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source userspace %v has no grpc address", srcSpaceID)
} }
srcSpaceCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(*srcSpace.MasterHub, *srcAddr))
srcSpaceCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(srcSpace.MasterHub, srcAddr))
defer srcSpaceCli.Release() defer srcSpaceCli.Release()


dstSpace := svc.UserSpaceMeta.Get(dstSpaceID) dstSpace := svc.UserSpaceMeta.Get(dstSpaceID)
@@ -143,7 +143,7 @@ func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPa
if !ok { if !ok {
return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination userspace %v has no grpc address", srcSpaceID) return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination userspace %v has no grpc address", srcSpaceID)
} }
dstSpaceCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(*dstSpace.MasterHub, *dstAddr))
dstSpaceCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(dstSpace.MasterHub, dstAddr))
defer dstSpaceCli.Release() defer dstSpaceCli.Release()


srcPath = strings.Trim(srcPath, cdssdk.ObjectPathSeparator) srcPath = strings.Trim(srcPath, cdssdk.ObjectPathSeparator)


+ 2
- 2
client/internal/ticktock/change_redundancy.go View File

@@ -27,9 +27,9 @@ func (j *ChangeRedundancy) Name() string {
func (j *ChangeRedundancy) Execute(t *TickTock) { func (j *ChangeRedundancy) Execute(t *TickTock) {
log := logger.WithType[ChangeRedundancy]("TickTock") log := logger.WithType[ChangeRedundancy]("TickTock")
startTime := time.Now() startTime := time.Now()
log.Debugf("job start")
log.Infof("job start")
defer func() { defer func() {
log.Debugf("job end, time: %v", time.Since(startTime))
log.Infof("job end, time: %v", time.Since(startTime))
}() }()


ctx := &changeRedundancyContext{ ctx := &changeRedundancyContext{


+ 6
- 6
client/internal/ticktock/check_shardstore.go View File

@@ -28,9 +28,9 @@ func (j *CheckShardStore) Name() string {
func (j *CheckShardStore) Execute(t *TickTock) { func (j *CheckShardStore) Execute(t *TickTock) {
log := logger.WithType[CheckShardStore]("TickTock") log := logger.WithType[CheckShardStore]("TickTock")
startTime := time.Now() startTime := time.Now()
log.Debugf("job start")
log.Infof("job start")
defer func() { defer func() {
log.Debugf("job end, time: %v", time.Since(startTime))
log.Infof("job end, time: %v", time.Since(startTime))
}() }()


db2 := t.db db2 := t.db
@@ -67,17 +67,17 @@ func (j *CheckShardStore) checkOne(t *TickTock, space *clitypes.UserSpaceDetail)
if !ok { if !ok {
return fmt.Errorf("master of user space %v has no grpc address", space.UserSpace) return fmt.Errorf("master of user space %v has no grpc address", space.UserSpace)
} }
agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(*space.MasterHub, *addr))
agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(space.MasterHub, addr))
defer agtCli.Release() defer agtCli.Release()


ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
defer cancel() defer cancel()


checkResp, err := agtCli.CheckCache(ctx, &hubrpc.CheckCache{
checkResp, cerr := agtCli.CheckCache(ctx, &hubrpc.CheckCache{
UserSpace: *space, UserSpace: *space,
}) })
if err != nil {
return fmt.Errorf("request to check cache: %w", err)
if cerr != nil {
return fmt.Errorf("request to check cache: %w", cerr.ToError())
} }


realFileHashes := lo.SliceToMap(checkResp.FileHashes, func(hash clitypes.FileHash) (clitypes.FileHash, bool) { return hash, true }) realFileHashes := lo.SliceToMap(checkResp.FileHashes, func(hash clitypes.FileHash) (clitypes.FileHash, bool) { return hash, true })


+ 3
- 3
client/internal/ticktock/shardstore_gc.go View File

@@ -27,9 +27,9 @@ func (j *ShardStoreGC) Name() string {
func (j *ShardStoreGC) Execute(t *TickTock) { func (j *ShardStoreGC) Execute(t *TickTock) {
log := logger.WithType[ShardStoreGC]("Event") log := logger.WithType[ShardStoreGC]("Event")
startTime := time.Now() startTime := time.Now()
log.Debugf("job start")
log.Infof("job start")
defer func() { defer func() {
log.Debugf("job end, time: %v", time.Since(startTime))
log.Infof("job end, time: %v", time.Since(startTime))
}() }()


spaceIDs, err := t.db.UserSpace().GetAllIDs(t.db.DefCtx()) spaceIDs, err := t.db.UserSpace().GetAllIDs(t.db.DefCtx())
@@ -91,7 +91,7 @@ func (j *ShardStoreGC) gcOne(t *TickTock, space *types.UserSpaceDetail) error {
if !ok { if !ok {
return fmt.Errorf("master of user space %v has no grpc address", space.UserSpace) return fmt.Errorf("master of user space %v has no grpc address", space.UserSpace)
} }
agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(*space.MasterHub, *addr))
agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(space.MasterHub, addr))
defer agtCli.Release() defer agtCli.Release()


// 向代理发送垃圾回收请求 // 向代理发送垃圾回收请求


+ 2
- 2
client/internal/ticktock/update_package_access_stat_amount.go View File

@@ -17,9 +17,9 @@ func (j *UpdatePackageAccessStatAmount) Name() string {
func (j *UpdatePackageAccessStatAmount) Execute(t *TickTock) { func (j *UpdatePackageAccessStatAmount) Execute(t *TickTock) {
log := logger.WithType[UpdatePackageAccessStatAmount]("TickTock") log := logger.WithType[UpdatePackageAccessStatAmount]("TickTock")
startTime := time.Now() startTime := time.Now()
log.Debugf("job start")
log.Infof("job start")
defer func() { defer func() {
log.Debugf("job end, time: %v", time.Since(startTime))
log.Infof("job end, time: %v", time.Since(startTime))
}() }()


err := t.db.PackageAccessStat().UpdateAllAmount(t.db.DefCtx(), t.cfg.AccessStatHistoryWeight) err := t.db.PackageAccessStat().UpdateAllAmount(t.db.DefCtx(), t.cfg.AccessStatHistoryWeight)


+ 1
- 1
client/internal/uploader/user_space_upload.go View File

@@ -102,7 +102,7 @@ func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath st
delPkg() delPkg()
return nil, fmt.Errorf("master of user space %v has no grpc address", srcSpace.UserSpace) return nil, fmt.Errorf("master of user space %v has no grpc address", srcSpace.UserSpace)
} }
srcHubCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(*srcSpace.MasterHub, *addr))
srcHubCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(srcSpace.MasterHub, addr))
defer srcHubCli.Release() defer srcHubCli.Release()


listAllResp, cerr := srcHubCli.PublicStoreListAll(context.Background(), &hubrpc.PublicStoreListAll{ listAllResp, cerr := srcHubCli.PublicStoreListAll(context.Background(), &hubrpc.PublicStoreListAll{


+ 2
- 2
common/assets/confs/hub.config.json View File

@@ -27,7 +27,7 @@
"retryInterval": 5000 "retryInterval": 5000
} }
}, },
"connectivity": {
"testInterval": 300
"tickTock": {
"testHubConnectivitiesInterval": "5m"
} }
} }

+ 1
- 1
common/globals/utils.go View File

@@ -3,7 +3,7 @@ package stgglb
import cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" import cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"


// 根据当前节点与目标地址的距离关系,选择合适的地址 // 根据当前节点与目标地址的距离关系,选择合适的地址
func SelectGRPCAddress(hub cortypes.Hub, addr cortypes.GRPCAddressInfo) (string, int) {
func SelectGRPCAddress(hub *cortypes.Hub, addr *cortypes.GRPCAddressInfo) (string, int) {
if Local != nil && Local.LocationID == hub.LocationID { if Local != nil && Local.LocationID == hub.LocationID {
return addr.LocalIP, addr.LocalGRPCPort return addr.LocalIP, addr.LocalGRPCPort
} }


+ 1
- 1
common/pkgs/ioswitch2/hub_worker.go View File

@@ -24,7 +24,7 @@ type HubWorker struct {
} }


func (w *HubWorker) NewClient() (exec.WorkerClient, error) { func (w *HubWorker) NewClient() (exec.WorkerClient, error) {
cli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(w.Hub, w.Address))
cli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&w.Hub, &w.Address))
return &HubWorkerClient{hubID: w.Hub.HubID, cli: cli}, nil return &HubWorkerClient{hubID: w.Hub.HubID, cli: cli}, nil
} }




+ 1
- 1
common/pkgs/ioswitchlrc/hub_worker.go View File

@@ -20,7 +20,7 @@ type HubWorker struct {
} }


func (w *HubWorker) NewClient() (exec.WorkerClient, error) { func (w *HubWorker) NewClient() (exec.WorkerClient, error) {
cli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(w.Hub, w.Address))
cli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&w.Hub, &w.Address))
return &HubWorkerClient{cli: cli}, nil return &HubWorkerClient{cli: cli}, nil
} }




+ 22
- 17
common/pkgs/rpc/coordinator/coordinator.pb.go View File

@@ -27,7 +27,7 @@ var file_pkgs_rpc_coordinator_coordinator_proto_rawDesc = []byte{
0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2f, 0x63, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2f, 0x63, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74,
0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x63, 0x6f, 0x72, 0x72, 0x70, 0x63, 0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x63, 0x6f, 0x72, 0x72, 0x70, 0x63,
0x1a, 0x12, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x1a, 0x12, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x32, 0xc9, 0x01, 0x0a, 0x0b, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e,
0x72, 0x6f, 0x74, 0x6f, 0x32, 0xff, 0x01, 0x0a, 0x0b, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e,
0x61, 0x74, 0x6f, 0x72, 0x12, 0x2b, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x2b, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f,
0x6e, 0x66, 0x69, 0x67, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
@@ -36,15 +36,18 @@ var file_pkgs_rpc_coordinator_coordinator_proto_rawDesc = []byte{
0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x33, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x33, 0x0a, 0x14, 0x47, 0x65, 0x74,
0x48, 0x75, 0x62, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x69, 0x65, 0x48, 0x75, 0x62, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x69, 0x65,
0x73, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x73, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x30,
0x0a, 0x11, 0x47, 0x65, 0x74, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x44, 0x65, 0x74, 0x61,
0x69, 0x6c, 0x73, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x6f, 0x72, 0x67, 0x2e,
0x63, 0x6e, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x6a, 0x63, 0x73,
0x2d, 0x70, 0x75, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x73,
0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6f, 0x72, 0x72, 0x70, 0x63, 0x3b, 0x63, 0x6f, 0x72, 0x72,
0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x34,
0x0a, 0x15, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f, 0x6e, 0x6e, 0x65,
0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x30, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x53, 0x74, 0x6f, 0x72, 0x61,
0x67, 0x65, 0x44, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x6c, 0x69, 0x6e,
0x6b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x63, 0x6e, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x72, 0x65,
0x61, 0x6d, 0x2f, 0x6a, 0x63, 0x73, 0x2d, 0x70, 0x75, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f,
0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6f, 0x72, 0x72, 0x70,
0x63, 0x3b, 0x63, 0x6f, 0x72, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }


var file_pkgs_rpc_coordinator_coordinator_proto_goTypes = []any{ var file_pkgs_rpc_coordinator_coordinator_proto_goTypes = []any{
@@ -55,13 +58,15 @@ var file_pkgs_rpc_coordinator_coordinator_proto_depIdxs = []int32{
0, // 0: corrpc.Coordinator.GetHubConfig:input_type -> rpc.Request 0, // 0: corrpc.Coordinator.GetHubConfig:input_type -> rpc.Request
0, // 1: corrpc.Coordinator.GetHubs:input_type -> rpc.Request 0, // 1: corrpc.Coordinator.GetHubs:input_type -> rpc.Request
0, // 2: corrpc.Coordinator.GetHubConnectivities:input_type -> rpc.Request 0, // 2: corrpc.Coordinator.GetHubConnectivities:input_type -> rpc.Request
0, // 3: corrpc.Coordinator.GetStorageDetails:input_type -> rpc.Request
1, // 4: corrpc.Coordinator.GetHubConfig:output_type -> rpc.Response
1, // 5: corrpc.Coordinator.GetHubs:output_type -> rpc.Response
1, // 6: corrpc.Coordinator.GetHubConnectivities:output_type -> rpc.Response
1, // 7: corrpc.Coordinator.GetStorageDetails:output_type -> rpc.Response
4, // [4:8] is the sub-list for method output_type
0, // [0:4] is the sub-list for method input_type
0, // 3: corrpc.Coordinator.ReportHubConnectivity:input_type -> rpc.Request
0, // 4: corrpc.Coordinator.GetStorageDetails:input_type -> rpc.Request
1, // 5: corrpc.Coordinator.GetHubConfig:output_type -> rpc.Response
1, // 6: corrpc.Coordinator.GetHubs:output_type -> rpc.Response
1, // 7: corrpc.Coordinator.GetHubConnectivities:output_type -> rpc.Response
1, // 8: corrpc.Coordinator.ReportHubConnectivity:output_type -> rpc.Response
1, // 9: corrpc.Coordinator.GetStorageDetails:output_type -> rpc.Response
5, // [5:10] is the sub-list for method output_type
0, // [0:5] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name 0, // [0:0] is the sub-list for field type_name


+ 1
- 0
common/pkgs/rpc/coordinator/coordinator.proto View File

@@ -11,6 +11,7 @@ service Coordinator {
rpc GetHubConfig(rpc.Request) returns(rpc.Response); rpc GetHubConfig(rpc.Request) returns(rpc.Response);
rpc GetHubs(rpc.Request) returns(rpc.Response); rpc GetHubs(rpc.Request) returns(rpc.Response);
rpc GetHubConnectivities(rpc.Request) returns(rpc.Response); rpc GetHubConnectivities(rpc.Request) returns(rpc.Response);
rpc ReportHubConnectivity(rpc.Request) returns(rpc.Response);


rpc GetStorageDetails(rpc.Request) returns(rpc.Response); rpc GetStorageDetails(rpc.Request) returns(rpc.Response);
} }

+ 41
- 4
common/pkgs/rpc/coordinator/coordinator_grpc.pb.go View File

@@ -20,10 +20,11 @@ import (
const _ = grpc.SupportPackageIsVersion7 const _ = grpc.SupportPackageIsVersion7


const ( const (
Coordinator_GetHubConfig_FullMethodName = "/corrpc.Coordinator/GetHubConfig"
Coordinator_GetHubs_FullMethodName = "/corrpc.Coordinator/GetHubs"
Coordinator_GetHubConnectivities_FullMethodName = "/corrpc.Coordinator/GetHubConnectivities"
Coordinator_GetStorageDetails_FullMethodName = "/corrpc.Coordinator/GetStorageDetails"
Coordinator_GetHubConfig_FullMethodName = "/corrpc.Coordinator/GetHubConfig"
Coordinator_GetHubs_FullMethodName = "/corrpc.Coordinator/GetHubs"
Coordinator_GetHubConnectivities_FullMethodName = "/corrpc.Coordinator/GetHubConnectivities"
Coordinator_ReportHubConnectivity_FullMethodName = "/corrpc.Coordinator/ReportHubConnectivity"
Coordinator_GetStorageDetails_FullMethodName = "/corrpc.Coordinator/GetStorageDetails"
) )


// CoordinatorClient is the client API for Coordinator service. // CoordinatorClient is the client API for Coordinator service.
@@ -33,6 +34,7 @@ type CoordinatorClient interface {
GetHubConfig(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) GetHubConfig(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error)
GetHubs(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) GetHubs(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error)
GetHubConnectivities(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) GetHubConnectivities(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error)
ReportHubConnectivity(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error)
GetStorageDetails(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) GetStorageDetails(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error)
} }


@@ -71,6 +73,15 @@ func (c *coordinatorClient) GetHubConnectivities(ctx context.Context, in *rpc.Re
return out, nil return out, nil
} }


func (c *coordinatorClient) ReportHubConnectivity(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) {
out := new(rpc.Response)
err := c.cc.Invoke(ctx, Coordinator_ReportHubConnectivity_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}

func (c *coordinatorClient) GetStorageDetails(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) { func (c *coordinatorClient) GetStorageDetails(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) {
out := new(rpc.Response) out := new(rpc.Response)
err := c.cc.Invoke(ctx, Coordinator_GetStorageDetails_FullMethodName, in, out, opts...) err := c.cc.Invoke(ctx, Coordinator_GetStorageDetails_FullMethodName, in, out, opts...)
@@ -87,6 +98,7 @@ type CoordinatorServer interface {
GetHubConfig(context.Context, *rpc.Request) (*rpc.Response, error) GetHubConfig(context.Context, *rpc.Request) (*rpc.Response, error)
GetHubs(context.Context, *rpc.Request) (*rpc.Response, error) GetHubs(context.Context, *rpc.Request) (*rpc.Response, error)
GetHubConnectivities(context.Context, *rpc.Request) (*rpc.Response, error) GetHubConnectivities(context.Context, *rpc.Request) (*rpc.Response, error)
ReportHubConnectivity(context.Context, *rpc.Request) (*rpc.Response, error)
GetStorageDetails(context.Context, *rpc.Request) (*rpc.Response, error) GetStorageDetails(context.Context, *rpc.Request) (*rpc.Response, error)
mustEmbedUnimplementedCoordinatorServer() mustEmbedUnimplementedCoordinatorServer()
} }
@@ -104,6 +116,9 @@ func (UnimplementedCoordinatorServer) GetHubs(context.Context, *rpc.Request) (*r
func (UnimplementedCoordinatorServer) GetHubConnectivities(context.Context, *rpc.Request) (*rpc.Response, error) { func (UnimplementedCoordinatorServer) GetHubConnectivities(context.Context, *rpc.Request) (*rpc.Response, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetHubConnectivities not implemented") return nil, status.Errorf(codes.Unimplemented, "method GetHubConnectivities not implemented")
} }
func (UnimplementedCoordinatorServer) ReportHubConnectivity(context.Context, *rpc.Request) (*rpc.Response, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReportHubConnectivity not implemented")
}
func (UnimplementedCoordinatorServer) GetStorageDetails(context.Context, *rpc.Request) (*rpc.Response, error) { func (UnimplementedCoordinatorServer) GetStorageDetails(context.Context, *rpc.Request) (*rpc.Response, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetStorageDetails not implemented") return nil, status.Errorf(codes.Unimplemented, "method GetStorageDetails not implemented")
} }
@@ -174,6 +189,24 @@ func _Coordinator_GetHubConnectivities_Handler(srv interface{}, ctx context.Cont
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }


func _Coordinator_ReportHubConnectivity_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(rpc.Request)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(CoordinatorServer).ReportHubConnectivity(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Coordinator_ReportHubConnectivity_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(CoordinatorServer).ReportHubConnectivity(ctx, req.(*rpc.Request))
}
return interceptor(ctx, in, info, handler)
}

func _Coordinator_GetStorageDetails_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _Coordinator_GetStorageDetails_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(rpc.Request) in := new(rpc.Request)
if err := dec(in); err != nil { if err := dec(in); err != nil {
@@ -211,6 +244,10 @@ var Coordinator_ServiceDesc = grpc.ServiceDesc{
MethodName: "GetHubConnectivities", MethodName: "GetHubConnectivities",
Handler: _Coordinator_GetHubConnectivities_Handler, Handler: _Coordinator_GetHubConnectivities_Handler,
}, },
{
MethodName: "ReportHubConnectivity",
Handler: _Coordinator_ReportHubConnectivity_Handler,
},
{ {
MethodName: "GetStorageDetails", MethodName: "GetStorageDetails",
Handler: _Coordinator_GetStorageDetails_Handler, Handler: _Coordinator_GetStorageDetails_Handler,


+ 16
- 0
common/pkgs/rpc/coordinator/hub.go View File

@@ -13,6 +13,8 @@ type HubService interface {
GetHubs(ctx context.Context, msg *GetHubs) (*GetHubsResp, *rpc.CodeError) GetHubs(ctx context.Context, msg *GetHubs) (*GetHubsResp, *rpc.CodeError)


GetHubConnectivities(ctx context.Context, msg *GetHubConnectivities) (*GetHubConnectivitiesResp, *rpc.CodeError) GetHubConnectivities(ctx context.Context, msg *GetHubConnectivities) (*GetHubConnectivitiesResp, *rpc.CodeError)

ReportHubConnectivity(ctx context.Context, msg *ReportHubConnectivity) (*ReportHubConnectivityResp, *rpc.CodeError)
} }


type GetHubConfig struct { type GetHubConfig struct {
@@ -98,3 +100,17 @@ func (c *Client) GetHubConnectivities(ctx context.Context, msg *GetHubConnectivi
func (s *Server) GetHubConnectivities(ctx context.Context, req *rpc.Request) (*rpc.Response, error) { func (s *Server) GetHubConnectivities(ctx context.Context, req *rpc.Request) (*rpc.Response, error) {
return rpc.UnaryServer(s.svrImpl.GetHubConnectivities, ctx, req) return rpc.UnaryServer(s.svrImpl.GetHubConnectivities, ctx, req)
} }

// 上报节点连通性信息
type ReportHubConnectivity struct {
Connecttivities []cortypes.HubConnectivity
}
type ReportHubConnectivityResp struct {
}

func (c *Client) ReportHubConnectivity(ctx context.Context, msg *ReportHubConnectivity) (*ReportHubConnectivityResp, *rpc.CodeError) {
return rpc.UnaryClient[*ReportHubConnectivityResp](c.cli.ReportHubConnectivity, ctx, msg)
}
func (s *Server) ReportHubConnectivity(ctx context.Context, req *rpc.Request) (*rpc.Response, error) {
return rpc.UnaryServer(s.svrImpl.ReportHubConnectivity, ctx, req)
}

+ 10
- 0
coordinator/internal/rpc/hub.go View File

@@ -72,3 +72,13 @@ func (svc *Service) GetHubConnectivities(ctx context.Context, msg *corrpc.GetHub


return corrpc.RespGetHubConnectivities(cons), nil return corrpc.RespGetHubConnectivities(cons), nil
} }

func (svc *Service) ReportHubConnectivity(ctx context.Context, msg *corrpc.ReportHubConnectivity) (*corrpc.ReportHubConnectivityResp, *rpc.CodeError) {
err := svc.db.HubConnectivity().BatchUpdateOrCreate(svc.db.DefCtx(), msg.Connecttivities)
if err != nil {
logger.Warnf("batch update or create hub connectivities: %v", err)
return nil, rpc.Failed(errorcode.OperationFailed, "batch update or create hub connectivities: %v", err)
}

return &corrpc.ReportHubConnectivityResp{}, nil
}

+ 3
- 3
coordinator/internal/ticktock/check_hub_state.go View File

@@ -22,10 +22,10 @@ func (j *CheckHubState) Name() string {


func (j *CheckHubState) Execute(t *TickTock) { func (j *CheckHubState) Execute(t *TickTock) {
log := logger.WithType[CheckHubState]("TickTock") log := logger.WithType[CheckHubState]("TickTock")
log.Debugf("job start")
log.Infof("job start")
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
log.Debugf("job end, time: %v", time.Since(startTime))
log.Infof("job end, time: %v", time.Since(startTime))
}() }()


hubs, err := t.db.Hub().GetAllHubs(t.db.DefCtx()) hubs, err := t.db.Hub().GetAllHubs(t.db.DefCtx())
@@ -50,7 +50,7 @@ func (j *CheckHubState) checkOne(t *TickTock, hub cortypes.Hub) error {
return fmt.Errorf("hub has no grpc address") return fmt.Errorf("hub has no grpc address")
} }


agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(hub, *addr))
agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&hub, addr))
defer agtCli.Release() defer agtCli.Release()


ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)


+ 1
- 1
go.mod View File

@@ -28,6 +28,7 @@ require (
github.com/spf13/cobra v1.8.1 github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.10.0 github.com/stretchr/testify v1.10.0
gitlink.org.cn/cloudream/common v0.0.0 gitlink.org.cn/cloudream/common v0.0.0
golang.org/x/net v0.35.0
golang.org/x/sync v0.13.0 golang.org/x/sync v0.13.0
golang.org/x/sys v0.32.0 golang.org/x/sys v0.32.0
golang.org/x/term v0.31.0 golang.org/x/term v0.31.0
@@ -71,7 +72,6 @@ require (
golang.org/x/arch v0.8.0 // indirect golang.org/x/arch v0.8.0 // indirect
golang.org/x/crypto v0.37.0 // indirect golang.org/x/crypto v0.37.0 // indirect
golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa // indirect golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa // indirect
golang.org/x/net v0.35.0 // indirect
golang.org/x/text v0.24.0 // indirect golang.org/x/text v0.24.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect


+ 4
- 68
hub/internal/cmd/serve.go View File

@@ -6,7 +6,6 @@ import (
"os" "os"
"time" "time"


"github.com/go-co-op/gocron/v2"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool"
@@ -21,6 +20,7 @@ import (
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent"
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
"gitlink.org.cn/cloudream/jcs-pub/hub/internal/config" "gitlink.org.cn/cloudream/jcs-pub/hub/internal/config"
"gitlink.org.cn/cloudream/jcs-pub/hub/internal/ticktock"


coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator"
) )
@@ -74,48 +74,6 @@ func serve(configPath string, httpAddr string) {
} }
go serveHTTP(httpSvr) go serveHTTP(httpSvr)


// 启动网络连通性检测,并就地检测一次
// conCol := connectivity.NewCollector(&config.Cfg().Connectivity, func(collector *connectivity.Collector) {
// log := logger.WithField("Connectivity", "")

// coorCli, err := stgglb.CoordinatorMQPool.Acquire()
// if err != nil {
// log.Warnf("acquire coordinator mq failed, err: %s", err.Error())
// return
// }
// defer stgglb.CoordinatorMQPool.Release(coorCli)

// cons := collector.GetAll()
// hubCons := make([]cortypes.HubConnectivity, 0, len(cons))
// for _, con := range cons {
// var delay *float32
// if con.Latency != nil {
// v := float32(con.Latency.Microseconds()) / 1000
// delay = &v
// }

// hubCons = append(hubCons, cortypes.HubConnectivity{
// FromHubID: *stgglb.Local.HubID,
// ToHubID: con.ToHubID,
// Latency: delay,
// TestTime: con.TestTime,
// })
// }

// _, err = coorCli.UpdateHubConnectivities(coormq.ReqUpdateHubConnectivities(hubCons))
// if err != nil {
// log.Warnf("update hub connectivities: %v", err)
// }
// })
// conCol.CollectInPlace()

// 初始化元数据缓存服务
// metacacheHost := metacache.NewHost()
// go metacacheHost.Serve()
// stgMeta := metacacheHost.AddStorageMeta()
// hubMeta := metacacheHost.AddHubMeta()
// conMeta := metacacheHost.AddConnectivity()

// 启动访问统计服务 // 启动访问统计服务
// acStat := accessstat.NewAccessStat(accessstat.Config{ // acStat := accessstat.NewAccessStat(accessstat.Config{
// // TODO 考虑放到配置里 // // TODO 考虑放到配置里
@@ -135,9 +93,9 @@ func serve(configPath string, httpAddr string) {
go servePublisher(evtPub) go servePublisher(evtPub)


// 初始化定时任务执行器 // 初始化定时任务执行器
sch := setupTickTask(stgPool, evtPub)
sch.Start()
defer sch.Shutdown()
tktk := ticktock.New(config.Cfg().TickTock, config.Cfg().ID, stgPool)
tktk.Start()
defer tktk.Stop()


// RPC服务 // RPC服务
rpcSvr := hubrpc.NewServer(config.Cfg().RPC, myrpc.NewService(&worker, stgPool)) rpcSvr := hubrpc.NewServer(config.Cfg().RPC, myrpc.NewService(&worker, stgPool))
@@ -222,28 +180,6 @@ loop:
os.Exit(1) os.Exit(1)
} }


func setupTickTask(hubPool *pool.Pool, evtPub *sysevent.Publisher) gocron.Scheduler {
sch, err := gocron.NewScheduler()
if err != nil {
logger.Errorf("new cron scheduler: %s", err.Error())
os.Exit(1)
}

// sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(
// gocron.NewAtTime(0, 0, 0),
// )), gocron.NewTask(tickevent.ReportStorageStats, hubPool, evtPub))

// sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(
// gocron.NewAtTime(0, 0, 1),
// )), gocron.NewTask(tickevent.ReportHubTransferStats, evtPub))

// sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(
// gocron.NewAtTime(0, 0, 2),
// )), gocron.NewTask(tickevent.ReportHubStorageTransferStats, evtPub))

return sch
}

func serveHTTP(server *http.Server) { func serveHTTP(server *http.Server) {
logger.Info("start serving http") logger.Info("start serving http")




+ 2
- 2
hub/internal/config/config.go View File

@@ -5,10 +5,10 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
c "gitlink.org.cn/cloudream/common/utils/config" c "gitlink.org.cn/cloudream/common/utils/config"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc"
corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator"
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
"gitlink.org.cn/cloudream/jcs-pub/hub/internal/ticktock"
) )


type Config struct { type Config struct {
@@ -18,7 +18,7 @@ type Config struct {
CoordinatorRPC corrpc.PoolConfig `json:"coordinatorRPC"` CoordinatorRPC corrpc.PoolConfig `json:"coordinatorRPC"`
Logger log.Config `json:"logger"` Logger log.Config `json:"logger"`
RabbitMQ mq.Config `json:"rabbitMQ"` RabbitMQ mq.Config `json:"rabbitMQ"`
Connectivity connectivity.Config `json:"connectivity"`
TickTock ticktock.Config `json:"tickTock"`
} }


var cfg Config var cfg Config


+ 7
- 0
hub/internal/ticktock/config.go View File

@@ -0,0 +1,7 @@
package ticktock

import "time"

type Config struct {
TestHubConnectivitiesInterval time.Duration `json:"testHubConnectivitiesInterval"`
}

+ 107
- 0
hub/internal/ticktock/test_hub_connectivities.go View File

@@ -0,0 +1,107 @@
package ticktock

import (
"context"
"sync"
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/reflect2"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator"
hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub"
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
)

type TestHubConnectivities struct {
myHubID cortypes.HubID
}

func (t *TestHubConnectivities) Name() string {
return reflect2.TypeNameOf[TestHubConnectivities]()
}
func (j *TestHubConnectivities) Execute(t *TickTock) {
log := logger.WithType[TestHubConnectivities]("TickTock")
startTime := time.Now()
log.Infof("job start")
defer func() {
log.Infof("job end, time: %v", time.Since(startTime))
}()

coorCli := stgglb.CoordinatorRPCPool.Get()
defer coorCli.Release()

getHubs, cerr := coorCli.GetHubs(context.Background(), corrpc.NewGetHubs(nil))
if cerr != nil {
log.Warnf("get all hubs: %v", cerr)
return
}

tests := make([]cortypes.HubConnectivity, len(getHubs.Hubs))

wg := sync.WaitGroup{}
for i, hub := range getHubs.Hubs {
wg.Add(1)

go func(hub *cortypes.Hub, i int) {
defer wg.Done()

tests[i] = j.testOne(hub)
}(hub, i)
}
wg.Wait()

_, cerr = coorCli.ReportHubConnectivity(context.Background(), &corrpc.ReportHubConnectivity{
Connecttivities: tests,
})
if cerr != nil {
log.Warnf("report hub connectivities: %v", cerr)
}
}

func (j *TestHubConnectivities) testOne(hub *cortypes.Hub) cortypes.HubConnectivity {
log := logger.WithType[TestHubConnectivities]("TickTock")

rpcAddr, ok := hub.Address.(*cortypes.GRPCAddressInfo)
if !ok {
return cortypes.HubConnectivity{
FromHubID: j.myHubID,
ToHubID: hub.HubID,
Latency: nil,
TestTime: time.Now(),
}
}
hubCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(hub, rpcAddr))
defer hubCli.Release()

// 后几次ping计算延迟
var avgLatency time.Duration
for i := 0; i < 3; i++ {
start := time.Now()
_, cerr := hubCli.Ping(context.Background(), &hubrpc.Ping{})
if cerr != nil {
log.Warnf("ping %v: %v", hub.String(), cerr)
return cortypes.HubConnectivity{
FromHubID: j.myHubID,
ToHubID: hub.HubID,
Latency: nil,
TestTime: time.Now(),
}
}

latency := time.Since(start)
avgLatency += latency

// 每次ping之间间隔1秒
<-time.After(time.Second)
}
latency := avgLatency / 3
latencyMs := float32(latency.Microseconds()) / 1000

return cortypes.HubConnectivity{
FromHubID: j.myHubID,
ToHubID: hub.HubID,
Latency: &latencyMs,
TestTime: time.Now(),
}
}

+ 77
- 0
hub/internal/ticktock/ticktock.go View File

@@ -0,0 +1,77 @@
package ticktock

import (
"fmt"

"github.com/go-co-op/gocron/v2"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool"
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
)

type Job interface {
Name() string
Execute(t *TickTock)
}

type cronJob struct {
cronJob gocron.Job
job Job
}

type TickTock struct {
cfg Config
sch gocron.Scheduler
jobs map[string]cronJob

myHubID cortypes.HubID
stgPool *pool.Pool
}

func New(cfg Config, myHubID cortypes.HubID, stgPool *pool.Pool) *TickTock {
sch, _ := gocron.NewScheduler()
t := &TickTock{
cfg: cfg,
sch: sch,
jobs: map[string]cronJob{},

myHubID: myHubID,
stgPool: stgPool,
}
t.initJobs()
return t
}

func (t *TickTock) Start() {
t.sch.Start()
}

func (t *TickTock) Stop() {
t.sch.Shutdown()
}

func (t *TickTock) RunNow(jobName string) {
j, ok := t.jobs[jobName]
if !ok {
logger.Warnf("job %s not found", jobName)
return
}

j.cronJob.RunNow()
}

func (t *TickTock) addJob(job Job, duration gocron.JobDefinition) {
j, err := t.sch.NewJob(duration, gocron.NewTask(job.Execute, t))
if err != nil {
panic(fmt.Errorf("add job %s: %w", job.Name(), err))
}

t.jobs[job.Name()] = cronJob{
cronJob: j,
job: job,
}
}

func (t *TickTock) initJobs() {
t.addJob(&TestHubConnectivities{myHubID: t.myHubID}, gocron.DurationJob(t.cfg.TestHubConnectivitiesInterval))
}

Loading…
Cancel
Save