Browse Source

移植检查Hub状态的定时任务

feature_gxh
Sydonian 7 months ago
parent
commit
04da2d8132
8 changed files with 345 additions and 34 deletions
  1. +3
    -0
      common/assets/confs/coordinator.config.json
  2. +60
    -31
      coordinator/internal/cmd/serve.go
  3. +5
    -3
      coordinator/internal/config/config.go
  4. +91
    -0
      coordinator/internal/repl/repl.go
  5. +25
    -0
      coordinator/internal/repl/ticktock.go
  6. +81
    -0
      coordinator/internal/ticktock/check_hub_state.go
  7. +7
    -0
      coordinator/internal/ticktock/config.go
  8. +73
    -0
      coordinator/internal/ticktock/ticktock.go

+ 3
- 0
common/assets/confs/coordinator.config.json View File

@@ -20,5 +20,8 @@
"retryNum": 5, "retryNum": 5,
"retryInterval": 5000 "retryInterval": 5000
} }
},
"tickTock": {
"hubUnavailableTime": "20s"
} }
} }

+ 60
- 31
coordinator/internal/cmd/serve.go View File

@@ -1,18 +1,19 @@
package cmd package cmd


import ( import (
"context"
"fmt" "fmt"
"os" "os"


"github.com/spf13/cobra" "github.com/spf13/cobra"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent"
"gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/config" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/config"
"gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db"
mymq "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/mq" mymq "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/mq"
"gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/repl"
"gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/ticktock"
) )


func init() { func init() {
@@ -42,6 +43,8 @@ func serve(configPath string) {
os.Exit(1) os.Exit(1)
} }


stgglb.InitMQPool(config.Cfg().RabbitMQ)

db2, err := db.NewDB(&config.Cfg().DB) db2, err := db.NewDB(&config.Cfg().DB)
if err != nil { if err != nil {
logger.Fatalf("new db2 failed, err: %s", err.Error()) logger.Fatalf("new db2 failed, err: %s", err.Error())
@@ -67,45 +70,71 @@ func serve(configPath string) {
// 启动服务 // 启动服务
go serveCoorServer(coorSvr, config.Cfg().RabbitMQ) go serveCoorServer(coorSvr, config.Cfg().RabbitMQ)


forever := make(chan bool)
<-forever
}

func servePublisher(evtPub *sysevent.Publisher) {
logger.Info("start serving sysevent publisher")
// 定时任务
tktk := ticktock.New(config.Cfg().TickTock, db2)
tktk.Start()
defer tktk.Stop()


ch := evtPub.Start()
// 交互式命令行
rep := repl.New(db2, tktk)
replCh := rep.Start()


/// 开始监听各个模块的事件
replEvt := replCh.Receive()
loop: loop:
for { for {
val, err := ch.Receive().Wait(context.Background())
if err != nil {
logger.Errorf("sysevent publisher stopped with error: %s", err.Error())
break
}

switch val := val.(type) {
case sysevent.PublishError:
logger.Errorf("publishing event: %v", val)

case sysevent.PublisherExited:
if val.Err != nil {
logger.Errorf("publisher exited with error: %v", val.Err)
} else {
logger.Info("publisher exited")
select {
case e := <-replEvt.Chan():
if e.Err != nil {
logger.Errorf("receive repl event: %v", err)
break loop
} }
break loop


case sysevent.OtherError:
logger.Errorf("sysevent: %v", val)
switch e.Value.(type) {
case repl.ExitEvent:
logger.Info("exit by repl")
break loop
}
replEvt = replCh.Receive()
} }
} }
logger.Info("sysevent publisher stopped")

// TODO 仅简单结束了程序
os.Exit(1)
} }


// func servePublisher(evtPub *sysevent.Publisher) {
// logger.Info("start serving sysevent publisher")

// ch := evtPub.Start()

// loop:
// for {
// val, err := ch.Receive().Wait(context.Background())
// if err != nil {
// logger.Errorf("sysevent publisher stopped with error: %s", err.Error())
// break
// }

// switch val := val.(type) {
// case sysevent.PublishError:
// logger.Errorf("publishing event: %v", val)

// case sysevent.PublisherExited:
// if val.Err != nil {
// logger.Errorf("publisher exited with error: %v", val.Err)
// } else {
// logger.Info("publisher exited")
// }
// break loop

// case sysevent.OtherError:
// logger.Errorf("sysevent: %v", val)
// }
// }
// logger.Info("sysevent publisher stopped")

// // TODO 仅简单结束了程序
// os.Exit(1)
// }

func serveCoorServer(server *coormq.Server, cfg mq.Config) { func serveCoorServer(server *coormq.Server, cfg mq.Config) {
logger.Info("start serving command server") logger.Info("start serving command server")




+ 5
- 3
coordinator/internal/config/config.go View File

@@ -5,12 +5,14 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
c "gitlink.org.cn/cloudream/common/utils/config" c "gitlink.org.cn/cloudream/common/utils/config"
"gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db"
"gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/ticktock"
) )


type Config struct { type Config struct {
Logger log.Config `json:"logger"`
DB db.Config `json:"db"`
RabbitMQ mq.Config `json:"rabbitMQ"`
Logger log.Config `json:"logger"`
DB db.Config `json:"db"`
RabbitMQ mq.Config `json:"rabbitMQ"`
TickTock ticktock.Config `json:"tickTock"`
} }


var cfg Config var cfg Config


+ 91
- 0
coordinator/internal/repl/repl.go View File

@@ -0,0 +1,91 @@
package repl

import (
"context"
"strings"

"github.com/c-bata/go-prompt"
"github.com/spf13/cobra"
"gitlink.org.cn/cloudream/common/pkgs/async"
"gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db"
"gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/ticktock"
)

var (
cmdCtxKey = &CommandContext{}
)

type ReplEventChan = async.UnboundChannel[ReplEvent]

type ReplEvent interface {
IsReplEvent() bool
}

type ExitEvent struct {
ReplEvent
}

type Repl struct {
prompt *prompt.Prompt
evtCh *ReplEventChan
db *db.DB
tktk *ticktock.TickTock
}

func New(db *db.DB, ticktock *ticktock.TickTock) *Repl {
r := &Repl{
evtCh: async.NewUnboundChannel[ReplEvent](),
db: db,
tktk: ticktock,
}
r.prompt = prompt.New(
r.executor,
r.completer,
prompt.OptionPrefix(">>> "),
prompt.OptionTitle("JCS Coordinator REPL"),
prompt.OptionSetExitCheckerOnInput(r.exitChecker),
)
return r
}

func (r *Repl) Start() *ReplEventChan {
go func() {
r.prompt.Run()
r.evtCh.Send(ExitEvent{})
}()
return r.evtCh
}

func (r *Repl) completer(d prompt.Document) []prompt.Suggest {
return nil
}

func (r *Repl) executor(input string) {
fields := strings.Fields(input)
if len(fields) == 0 {
return
}

RootCmd.SetArgs(fields)
RootCmd.ExecuteContext(context.WithValue(context.Background(), cmdCtxKey, &CommandContext{
repl: r,
}))
}

func (r *Repl) exitChecker(input string, breakline bool) bool {
if breakline && input == "exit" {
return true
}

return false
}

var RootCmd = &cobra.Command{}

type CommandContext struct {
repl *Repl
}

func GetCmdCtx(cmd *cobra.Command) *CommandContext {
return cmd.Context().Value(cmdCtxKey).(*CommandContext)
}

+ 25
- 0
coordinator/internal/repl/ticktock.go View File

@@ -0,0 +1,25 @@
package repl

import "github.com/spf13/cobra"

func init() {
ttCmd := &cobra.Command{
Use: "ticktock",
Short: "ticktock command",
}
RootCmd.AddCommand(ttCmd)

runCmd := &cobra.Command{
Use: "run [jobName]",
Short: "run job now",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
tickTockRun(GetCmdCtx(cmd), args[0])
},
}
ttCmd.AddCommand(runCmd)
}

func tickTockRun(ctx *CommandContext, jobName string) {
ctx.repl.tktk.RunNow(jobName)
}

+ 81
- 0
coordinator/internal/ticktock/check_hub_state.go View File

@@ -0,0 +1,81 @@
package ticktock

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/utils/reflect2"
"gitlink.org.cn/cloudream/jcs-pub/common/consts"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
hubmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub"
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
)

type CheckHubState struct {
}

func (j *CheckHubState) Name() string {
return reflect2.TypeNameOf[CheckHubState]()
}

func (j *CheckHubState) Execute(t *TickTock) {
log := logger.WithType[CheckHubState]("TickTock")
log.Debugf("job start")
startTime := time.Now()
defer func() {
log.Debugf("job end, time: %v", time.Since(startTime))
}()

hubs, err := t.db.Hub().GetAllHubs(t.db.DefCtx())
if err != nil {
log.Warnf("get all hubs: %s", err.Error())
return
}

for _, hub := range hubs {
err := j.checkOne(t, hub)
if err != nil {
log.Warnf("check one hub %v: %s", hub, err.Error())
}
}
}

func (j *CheckHubState) checkOne(t *TickTock, hub cortypes.Hub) error {
log := logger.WithType[CheckHubState]("TickTock")

agtCli, err := stgglb.HubMQPool.Acquire(hub.HubID)
if err != nil {
return fmt.Errorf("new hub mq client: %w", err)
}
defer stgglb.HubMQPool.Release(agtCli)

_, err = agtCli.GetState(hubmq.NewGetState(), mq.RequestOption{Timeout: time.Second * 30})
if err != nil {
if hub.LastReportTime != nil {
if time.Since(*hub.LastReportTime) > t.cfg.HubUnavailableTime {
err := t.db.Hub().UpdateState(t.db.DefCtx(), hub.HubID, consts.HubStateUnavailable)
if err != nil {
log.Warnf("set hub %v state: %s", hub, err.Error())
}
}

} else if hub.LastReportTime == nil {
err := t.db.Hub().UpdateState(t.db.DefCtx(), hub.HubID, consts.HubStateUnavailable)
if err != nil {
log.Warnf("set hub %v state: %s", hub, err.Error())
}
}

return fmt.Errorf("getting state: %w", err)
}

// TODO 如果以后还有其他的状态,要判断哪些状态下能设置Normal
err = t.db.Hub().UpdateState(t.db.DefCtx(), hub.HubID, consts.HubStateNormal)
if err != nil {
log.Warnf("set hub %v state: %s", hub, err.Error())
}

return nil
}

+ 7
- 0
coordinator/internal/ticktock/config.go View File

@@ -0,0 +1,7 @@
package ticktock

import "time"

type Config struct {
HubUnavailableTime time.Duration `json:"hubUnavailableTime"`
}

+ 73
- 0
coordinator/internal/ticktock/ticktock.go View File

@@ -0,0 +1,73 @@
package ticktock

import (
"fmt"
"time"

"github.com/go-co-op/gocron/v2"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db"
)

type Job interface {
Name() string
Execute(t *TickTock)
}

type cronJob struct {
cronJob gocron.Job
job Job
}

type TickTock struct {
cfg Config
sch gocron.Scheduler
jobs map[string]cronJob
db *db.DB
}

func New(cfg Config, db *db.DB) *TickTock {
sch, _ := gocron.NewScheduler()
t := &TickTock{
cfg: cfg,
sch: sch,
jobs: map[string]cronJob{},
db: db,
}
t.initJobs()
return t
}

func (t *TickTock) Start() {
t.sch.Start()
}

func (t *TickTock) Stop() {
t.sch.Shutdown()
}

func (t *TickTock) RunNow(jobName string) {
j, ok := t.jobs[jobName]
if !ok {
logger.Warnf("job %s not found", jobName)
return
}

j.cronJob.RunNow()
}

func (t *TickTock) addJob(job Job, duration gocron.JobDefinition) {
j, err := t.sch.NewJob(duration, gocron.NewTask(job.Execute, t))
if err != nil {
panic(fmt.Errorf("add job %s: %w", job.Name(), err))
}

t.jobs[job.Name()] = cronJob{
cronJob: j,
job: job,
}
}

func (t *TickTock) initJobs() {
t.addJob(&CheckHubState{}, gocron.DurationJob(time.Minute*5))
}

Loading…
Cancel
Save