package cmd import ( "fmt" "os" "github.com/spf13/cobra" "gitlink.org.cn/cloudream/common/pkgs/logger" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/accesstoken" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/config" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/repl" myrpc "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/rpc" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/ticktock" ) func init() { var configPath string cmd := &cobra.Command{ Use: "serve", Short: "Start coordinator server", Long: `Start coordinator server`, Run: func(cmd *cobra.Command, args []string) { serve(configPath) }, } cmd.Flags().StringVarP(&configPath, "config", "c", "", "Path to config file") RootCmd.AddCommand(cmd) } func serve(configPath string) { err := config.Init(configPath) if err != nil { fmt.Printf("init config failed, err: %s", err.Error()) os.Exit(1) } err = logger.Init(&config.Cfg().Logger) if err != nil { fmt.Printf("init logger failed, err: %s", err.Error()) os.Exit(1) } hubRPCCfg, err := config.Cfg().HubRPC.Build(nil) if err != nil { logger.Errorf("build hub rpc config: %v", err) os.Exit(1) } stgglb.InitPools(hubRPCCfg, nil) db2, err := db.NewDB(&config.Cfg().DB) if err != nil { logger.Fatalf("new db2 failed, err: %s", err.Error()) } // 初始化系统事件发布器 // evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &cortypes.SourceCoordinator{}) // if err != nil { // logger.Errorf("new sysevent publisher: %v", err) // os.Exit(1) // } // go servePublisher(evtPub) // 客户端访问令牌缓存 accToken := accesstoken.New(db2) accTokenChan := accToken.Start() defer accToken.Stop() // RPC服务 rpcSvr := corrpc.NewServer(config.Cfg().RPC, myrpc.NewService(db2, accToken), accToken) rpcSvrChan := rpcSvr.Start() defer rpcSvr.Stop() // 定时任务 tktk := ticktock.New(config.Cfg().TickTock, db2, accToken) tktk.Start() defer tktk.Stop() // 交互式命令行 rep := repl.New(db2, tktk, accToken) replCh := rep.Start() /// 开始监听各个模块的事件 accTokenEvt := accTokenChan.Receive() replEvt := replCh.Receive() rpcEvt := rpcSvrChan.Receive() loop: for { select { case e := <-accTokenEvt.Chan(): if e.Err != nil { logger.Errorf("receive access token event: %v", e.Err) break loop } switch e := e.Value.(type) { case accesstoken.ExitEvent: if e.Err != nil { logger.Errorf("access token cache exited with error: %v", e.Err) } else { logger.Info("access token cache exited") } break loop } accTokenEvt = accTokenChan.Receive() case e := <-replEvt.Chan(): if e.Err != nil { logger.Errorf("receive repl event: %v", err) break loop } switch e.Value.(type) { case repl.ExitEvent: logger.Info("exit by repl") break loop } replEvt = replCh.Receive() case e := <-rpcEvt.Chan(): if e.Err != nil { logger.Errorf("receive rpc event: %v", e.Err) break loop } switch e := e.Value.(type) { case rpc.ExitEvent: if e.Err != nil { logger.Errorf("rpc server exited with error: %v", e.Err) } else { logger.Infof("rpc server exited") } break loop } rpcEvt = rpcSvrChan.Receive() } } } // func servePublisher(evtPub *sysevent.Publisher) { // logger.Info("start serving sysevent publisher") // ch := evtPub.Start() // loop: // for { // val, err := ch.Receive().Wait(context.Background()) // if err != nil { // logger.Errorf("sysevent publisher stopped with error: %s", err.Error()) // break // } // switch val := val.(type) { // case sysevent.PublishError: // logger.Errorf("publishing event: %v", val) // case sysevent.PublisherExited: // if val.Err != nil { // logger.Errorf("publisher exited with error: %v", val.Err) // } else { // logger.Info("publisher exited") // } // break loop // case sysevent.OtherError: // logger.Errorf("sysevent: %v", val) // } // } // logger.Info("sysevent publisher stopped") // // TODO 仅简单结束了程序 // os.Exit(1) // }