package cmd import ( "context" "fmt" "os" "time" "github.com/spf13/cobra" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/jcs-pub/hub/internal/http" myrpc "gitlink.org.cn/cloudream/jcs-pub/hub/internal/rpc" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap" hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" "gitlink.org.cn/cloudream/jcs-pub/hub/internal/config" "gitlink.org.cn/cloudream/jcs-pub/hub/internal/ticktock" coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" ) func init() { var configPath string var opt serveOptions cmd := cobra.Command{ Use: "serve", Short: "start serving hub", Run: func(cmd *cobra.Command, args []string) { serve(configPath, opt) }, } cmd.Flags().StringVarP(&configPath, "config", "c", "", "config file path") cmd.Flags().BoolVarP(&opt.DisableHTTP, "no-http", "", false, "disable http server") cmd.Flags().StringVarP(&opt.HTTPListenAddr, "http", "", "", "http listen address, will override config file") RootCmd.AddCommand(&cmd) } type serveOptions struct { DisableHTTP bool HTTPListenAddr string } func serve(configPath string, opts serveOptions) { err := config.Init(configPath) if err != nil { fmt.Printf("init config failed, err: %s", err.Error()) os.Exit(1) } err = logger.Init(&config.Cfg().Logger) if err != nil { fmt.Printf("init logger failed, err: %s", err.Error()) os.Exit(1) } stgglb.InitLocal(config.Cfg().Local) stgglb.InitPools(&hubrpc.PoolConfig{}, &config.Cfg().CoordinatorRPC) // stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID) // stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID) // 获取Hub配置 hubCfg := downloadHubConfig() // 初始化存储服务管理器 stgPool := pool.NewPool() // 初始化执行器 worker := exec.NewWorker() // HTTP接口 httpCfg := config.Cfg().HTTP if !opts.DisableHTTP && httpCfg != nil && httpCfg.Enabled { if opts.HTTPListenAddr != "" { httpCfg.Listen = opts.HTTPListenAddr } } else { httpCfg = nil } httpSvr := http.NewServer(httpCfg, http.NewService(&worker, stgPool)) httpChan := httpSvr.Start() defer httpSvr.Stop() // 启动访问统计服务 // acStat := accessstat.NewAccessStat(accessstat.Config{ // // TODO 考虑放到配置里 // ReportInterval: time.Second * 10, // }) // go serveAccessStat(acStat) // 初始化系统事件发布器 evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &datamap.SourceHub{ HubID: hubCfg.Hub.HubID, HubName: hubCfg.Hub.Name, }) if err != nil { logger.Errorf("new sysevent publisher: %v", err) os.Exit(1) } evtPubChan := evtPub.Start() defer evtPub.Stop() // 初始化定时任务执行器 tktk := ticktock.New(config.Cfg().TickTock, config.Cfg().ID, stgPool) tktk.Start() defer tktk.Stop() // RPC服务 rpcSvr := hubrpc.NewServer(config.Cfg().RPC, myrpc.NewService(&worker, stgPool)) rpcSvrChan := rpcSvr.Start() defer rpcSvr.Stop() /// 开始监听各个模块的事件 evtPubEvt := evtPubChan.Receive() rpcEvt := rpcSvrChan.Receive() httpEvt := httpChan.Receive() loop: for { select { case e := <-evtPubEvt.Chan(): if e.Err != nil { logger.Errorf("receive publisher event: %v", err) break loop } switch val := e.Value.(type) { case sysevent.PublishError: logger.Errorf("publishing event: %v", val) case sysevent.PublisherExited: if val.Err != nil { logger.Errorf("publisher exited with error: %v", val.Err) } else { logger.Info("publisher exited") } break loop case sysevent.OtherError: logger.Errorf("sysevent: %v", val) } evtPubEvt = evtPubChan.Receive() case e := <-rpcEvt.Chan(): if e.Err != nil { logger.Errorf("receive rpc event: %v", e.Err) break loop } switch e := e.Value.(type) { case rpc.ExitEvent: if e.Err != nil { logger.Errorf("rpc server exited with error: %v", e.Err) } else { logger.Infof("rpc server exited") } break loop } rpcEvt = rpcSvrChan.Receive() case e := <-httpEvt.Chan(): if e.Err != nil { logger.Errorf("receive http event: %v", err) break loop } switch e := e.Value.(type) { case http.ExitEvent: logger.Infof("http server exited, err: %v", e.Err) break loop } httpEvt = httpChan.Receive() } } } func downloadHubConfig() coormq.GetHubConfigResp { coorCli := stgglb.CoordinatorRPCPool.Get() defer coorCli.Release() ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() cfgResp, cerr := coorCli.GetHubConfig(ctx, coormq.ReqGetHubConfig(cortypes.HubID(config.Cfg().ID))) if cerr != nil { logger.Errorf("getting hub config: %v", cerr) os.Exit(1) } return *cfgResp }