From 04da2d81320080a133d14a527b3bea4e3dd0fb9f Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 21 Apr 2025 11:21:20 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E6=A4=8D=E6=A3=80=E6=9F=A5Hub?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E7=9A=84=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/assets/confs/coordinator.config.json | 3 + coordinator/internal/cmd/serve.go | 91 ++++++++++++------- coordinator/internal/config/config.go | 8 +- coordinator/internal/repl/repl.go | 91 +++++++++++++++++++ coordinator/internal/repl/ticktock.go | 25 +++++ .../internal/ticktock/check_hub_state.go | 81 +++++++++++++++++ coordinator/internal/ticktock/config.go | 7 ++ coordinator/internal/ticktock/ticktock.go | 73 +++++++++++++++ 8 files changed, 345 insertions(+), 34 deletions(-) create mode 100644 coordinator/internal/repl/repl.go create mode 100644 coordinator/internal/repl/ticktock.go create mode 100644 coordinator/internal/ticktock/check_hub_state.go create mode 100644 coordinator/internal/ticktock/config.go create mode 100644 coordinator/internal/ticktock/ticktock.go diff --git a/common/assets/confs/coordinator.config.json b/common/assets/confs/coordinator.config.json index af60414..86bbbd0 100644 --- a/common/assets/confs/coordinator.config.json +++ b/common/assets/confs/coordinator.config.json @@ -20,5 +20,8 @@ "retryNum": 5, "retryInterval": 5000 } + }, + "tickTock": { + "hubUnavailableTime": "20s" } } \ No newline at end of file diff --git a/coordinator/internal/cmd/serve.go b/coordinator/internal/cmd/serve.go index d50be27..0a20081 100644 --- a/coordinator/internal/cmd/serve.go +++ b/coordinator/internal/cmd/serve.go @@ -1,18 +1,19 @@ package cmd import ( - "context" "fmt" "os" "github.com/spf13/cobra" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" + stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/config" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db" mymq "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/mq" + "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/repl" + "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/ticktock" ) func init() { @@ -42,6 +43,8 @@ func serve(configPath string) { os.Exit(1) } + stgglb.InitMQPool(config.Cfg().RabbitMQ) + db2, err := db.NewDB(&config.Cfg().DB) if err != nil { logger.Fatalf("new db2 failed, err: %s", err.Error()) @@ -67,45 +70,71 @@ func serve(configPath string) { // 启动服务 go serveCoorServer(coorSvr, config.Cfg().RabbitMQ) - forever := make(chan bool) - <-forever -} - -func servePublisher(evtPub *sysevent.Publisher) { - logger.Info("start serving sysevent publisher") + // 定时任务 + tktk := ticktock.New(config.Cfg().TickTock, db2) + tktk.Start() + defer tktk.Stop() - ch := evtPub.Start() + // 交互式命令行 + rep := repl.New(db2, tktk) + replCh := rep.Start() + /// 开始监听各个模块的事件 + replEvt := replCh.Receive() loop: for { - val, err := ch.Receive().Wait(context.Background()) - if err != nil { - logger.Errorf("sysevent publisher stopped with error: %s", err.Error()) - break - } - - switch val := val.(type) { - case sysevent.PublishError: - logger.Errorf("publishing event: %v", val) - - case sysevent.PublisherExited: - if val.Err != nil { - logger.Errorf("publisher exited with error: %v", val.Err) - } else { - logger.Info("publisher exited") + select { + case e := <-replEvt.Chan(): + if e.Err != nil { + logger.Errorf("receive repl event: %v", err) + break loop } - break loop - case sysevent.OtherError: - logger.Errorf("sysevent: %v", val) + switch e.Value.(type) { + case repl.ExitEvent: + logger.Info("exit by repl") + break loop + } + replEvt = replCh.Receive() } } - logger.Info("sysevent publisher stopped") - - // TODO 仅简单结束了程序 - os.Exit(1) } +// func servePublisher(evtPub *sysevent.Publisher) { +// logger.Info("start serving sysevent publisher") + +// ch := evtPub.Start() + +// loop: +// for { +// val, err := ch.Receive().Wait(context.Background()) +// if err != nil { +// logger.Errorf("sysevent publisher stopped with error: %s", err.Error()) +// break +// } + +// switch val := val.(type) { +// case sysevent.PublishError: +// logger.Errorf("publishing event: %v", val) + +// case sysevent.PublisherExited: +// if val.Err != nil { +// logger.Errorf("publisher exited with error: %v", val.Err) +// } else { +// logger.Info("publisher exited") +// } +// break loop + +// case sysevent.OtherError: +// logger.Errorf("sysevent: %v", val) +// } +// } +// logger.Info("sysevent publisher stopped") + +// // TODO 仅简单结束了程序 +// os.Exit(1) +// } + func serveCoorServer(server *coormq.Server, cfg mq.Config) { logger.Info("start serving command server") diff --git a/coordinator/internal/config/config.go b/coordinator/internal/config/config.go index 5608e3d..678ced6 100644 --- a/coordinator/internal/config/config.go +++ b/coordinator/internal/config/config.go @@ -5,12 +5,14 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" c "gitlink.org.cn/cloudream/common/utils/config" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db" + "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/ticktock" ) type Config struct { - Logger log.Config `json:"logger"` - DB db.Config `json:"db"` - RabbitMQ mq.Config `json:"rabbitMQ"` + Logger log.Config `json:"logger"` + DB db.Config `json:"db"` + RabbitMQ mq.Config `json:"rabbitMQ"` + TickTock ticktock.Config `json:"tickTock"` } var cfg Config diff --git a/coordinator/internal/repl/repl.go b/coordinator/internal/repl/repl.go new file mode 100644 index 0000000..691285d --- /dev/null +++ b/coordinator/internal/repl/repl.go @@ -0,0 +1,91 @@ +package repl + +import ( + "context" + "strings" + + "github.com/c-bata/go-prompt" + "github.com/spf13/cobra" + "gitlink.org.cn/cloudream/common/pkgs/async" + "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db" + "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/ticktock" +) + +var ( + cmdCtxKey = &CommandContext{} +) + +type ReplEventChan = async.UnboundChannel[ReplEvent] + +type ReplEvent interface { + IsReplEvent() bool +} + +type ExitEvent struct { + ReplEvent +} + +type Repl struct { + prompt *prompt.Prompt + evtCh *ReplEventChan + db *db.DB + tktk *ticktock.TickTock +} + +func New(db *db.DB, ticktock *ticktock.TickTock) *Repl { + r := &Repl{ + evtCh: async.NewUnboundChannel[ReplEvent](), + db: db, + tktk: ticktock, + } + r.prompt = prompt.New( + r.executor, + r.completer, + prompt.OptionPrefix(">>> "), + prompt.OptionTitle("JCS Coordinator REPL"), + prompt.OptionSetExitCheckerOnInput(r.exitChecker), + ) + return r +} + +func (r *Repl) Start() *ReplEventChan { + go func() { + r.prompt.Run() + r.evtCh.Send(ExitEvent{}) + }() + return r.evtCh +} + +func (r *Repl) completer(d prompt.Document) []prompt.Suggest { + return nil +} + +func (r *Repl) executor(input string) { + fields := strings.Fields(input) + if len(fields) == 0 { + return + } + + RootCmd.SetArgs(fields) + RootCmd.ExecuteContext(context.WithValue(context.Background(), cmdCtxKey, &CommandContext{ + repl: r, + })) +} + +func (r *Repl) exitChecker(input string, breakline bool) bool { + if breakline && input == "exit" { + return true + } + + return false +} + +var RootCmd = &cobra.Command{} + +type CommandContext struct { + repl *Repl +} + +func GetCmdCtx(cmd *cobra.Command) *CommandContext { + return cmd.Context().Value(cmdCtxKey).(*CommandContext) +} diff --git a/coordinator/internal/repl/ticktock.go b/coordinator/internal/repl/ticktock.go new file mode 100644 index 0000000..5a9c69f --- /dev/null +++ b/coordinator/internal/repl/ticktock.go @@ -0,0 +1,25 @@ +package repl + +import "github.com/spf13/cobra" + +func init() { + ttCmd := &cobra.Command{ + Use: "ticktock", + Short: "ticktock command", + } + RootCmd.AddCommand(ttCmd) + + runCmd := &cobra.Command{ + Use: "run [jobName]", + Short: "run job now", + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + tickTockRun(GetCmdCtx(cmd), args[0]) + }, + } + ttCmd.AddCommand(runCmd) +} + +func tickTockRun(ctx *CommandContext, jobName string) { + ctx.repl.tktk.RunNow(jobName) +} diff --git a/coordinator/internal/ticktock/check_hub_state.go b/coordinator/internal/ticktock/check_hub_state.go new file mode 100644 index 0000000..063d42c --- /dev/null +++ b/coordinator/internal/ticktock/check_hub_state.go @@ -0,0 +1,81 @@ +package ticktock + +import ( + "fmt" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + "gitlink.org.cn/cloudream/common/utils/reflect2" + "gitlink.org.cn/cloudream/jcs-pub/common/consts" + stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" + hubmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub" + cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" +) + +type CheckHubState struct { +} + +func (j *CheckHubState) Name() string { + return reflect2.TypeNameOf[CheckHubState]() +} + +func (j *CheckHubState) Execute(t *TickTock) { + log := logger.WithType[CheckHubState]("TickTock") + log.Debugf("job start") + startTime := time.Now() + defer func() { + log.Debugf("job end, time: %v", time.Since(startTime)) + }() + + hubs, err := t.db.Hub().GetAllHubs(t.db.DefCtx()) + if err != nil { + log.Warnf("get all hubs: %s", err.Error()) + return + } + + for _, hub := range hubs { + err := j.checkOne(t, hub) + if err != nil { + log.Warnf("check one hub %v: %s", hub, err.Error()) + } + } +} + +func (j *CheckHubState) checkOne(t *TickTock, hub cortypes.Hub) error { + log := logger.WithType[CheckHubState]("TickTock") + + agtCli, err := stgglb.HubMQPool.Acquire(hub.HubID) + if err != nil { + return fmt.Errorf("new hub mq client: %w", err) + } + defer stgglb.HubMQPool.Release(agtCli) + + _, err = agtCli.GetState(hubmq.NewGetState(), mq.RequestOption{Timeout: time.Second * 30}) + if err != nil { + if hub.LastReportTime != nil { + if time.Since(*hub.LastReportTime) > t.cfg.HubUnavailableTime { + err := t.db.Hub().UpdateState(t.db.DefCtx(), hub.HubID, consts.HubStateUnavailable) + if err != nil { + log.Warnf("set hub %v state: %s", hub, err.Error()) + } + } + + } else if hub.LastReportTime == nil { + err := t.db.Hub().UpdateState(t.db.DefCtx(), hub.HubID, consts.HubStateUnavailable) + if err != nil { + log.Warnf("set hub %v state: %s", hub, err.Error()) + } + } + + return fmt.Errorf("getting state: %w", err) + } + + // TODO 如果以后还有其他的状态,要判断哪些状态下能设置Normal + err = t.db.Hub().UpdateState(t.db.DefCtx(), hub.HubID, consts.HubStateNormal) + if err != nil { + log.Warnf("set hub %v state: %s", hub, err.Error()) + } + + return nil +} diff --git a/coordinator/internal/ticktock/config.go b/coordinator/internal/ticktock/config.go new file mode 100644 index 0000000..e46c2e9 --- /dev/null +++ b/coordinator/internal/ticktock/config.go @@ -0,0 +1,7 @@ +package ticktock + +import "time" + +type Config struct { + HubUnavailableTime time.Duration `json:"hubUnavailableTime"` +} diff --git a/coordinator/internal/ticktock/ticktock.go b/coordinator/internal/ticktock/ticktock.go new file mode 100644 index 0000000..f63fbf0 --- /dev/null +++ b/coordinator/internal/ticktock/ticktock.go @@ -0,0 +1,73 @@ +package ticktock + +import ( + "fmt" + "time" + + "github.com/go-co-op/gocron/v2" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db" +) + +type Job interface { + Name() string + Execute(t *TickTock) +} + +type cronJob struct { + cronJob gocron.Job + job Job +} + +type TickTock struct { + cfg Config + sch gocron.Scheduler + jobs map[string]cronJob + db *db.DB +} + +func New(cfg Config, db *db.DB) *TickTock { + sch, _ := gocron.NewScheduler() + t := &TickTock{ + cfg: cfg, + sch: sch, + jobs: map[string]cronJob{}, + db: db, + } + t.initJobs() + return t +} + +func (t *TickTock) Start() { + t.sch.Start() +} + +func (t *TickTock) Stop() { + t.sch.Shutdown() +} + +func (t *TickTock) RunNow(jobName string) { + j, ok := t.jobs[jobName] + if !ok { + logger.Warnf("job %s not found", jobName) + return + } + + j.cronJob.RunNow() +} + +func (t *TickTock) addJob(job Job, duration gocron.JobDefinition) { + j, err := t.sch.NewJob(duration, gocron.NewTask(job.Execute, t)) + if err != nil { + panic(fmt.Errorf("add job %s: %w", job.Name(), err)) + } + + t.jobs[job.Name()] = cronJob{ + cronJob: j, + job: job, + } +} + +func (t *TickTock) initJobs() { + t.addJob(&CheckHubState{}, gocron.DurationJob(time.Minute*5)) +}