| @@ -0,0 +1,78 @@ | |||||
| package cluster | |||||
| import ( | |||||
| "context" | |||||
| "time" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" | |||||
| ) | |||||
| type Cluster struct { | |||||
| cfg Config | |||||
| cliPool *clirpc.Pool | |||||
| } | |||||
| func New(cfg *Config) *Cluster { | |||||
| c := Config{} | |||||
| if cfg != nil { | |||||
| c = *cfg | |||||
| } else { | |||||
| c.IsMaster = true | |||||
| } | |||||
| return &Cluster{ | |||||
| cfg: c, | |||||
| } | |||||
| } | |||||
| func (c *Cluster) Start() error { | |||||
| log := logger.WithField("Mod", "Cluster") | |||||
| if c.cfg.IsMaster { | |||||
| log.Infof("cluster start as master") | |||||
| return nil | |||||
| } | |||||
| poolCfgJSON := clirpc.PoolConfigJSON{ | |||||
| Address: c.cfg.MasterAddress, | |||||
| RootCA: c.cfg.RootCA, | |||||
| ClientCert: c.cfg.ClientCert, | |||||
| ClientKey: c.cfg.ClientKey, | |||||
| } | |||||
| poolCfg, err := poolCfgJSON.Build() | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| c.cliPool = clirpc.NewPool(*poolCfg) | |||||
| for { | |||||
| cli := c.cliPool.Get() | |||||
| ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) | |||||
| resp, cerr := cli.GetClusterMasterInfo(ctx, &clirpc.GetClusterMasterInfo{}) | |||||
| cancelFn() | |||||
| if cerr != nil { | |||||
| log.Warnf("first report: %v, will retry after 3 seconds", err) | |||||
| time.Sleep(3 * time.Second) | |||||
| continue | |||||
| } | |||||
| log.Infof("cluster start as slave, master is: %v", resp.Name) | |||||
| break | |||||
| } | |||||
| return nil | |||||
| } | |||||
| func (c *Cluster) IsMaster() bool { | |||||
| return c.cfg.IsMaster | |||||
| } | |||||
| func (c *Cluster) Name() string { | |||||
| return c.cfg.Name | |||||
| } | |||||
| func (c *Cluster) MasterClient() *clirpc.Pool { | |||||
| return c.cliPool | |||||
| } | |||||
| @@ -0,0 +1,10 @@ | |||||
| package cluster | |||||
| type Config struct { | |||||
| IsMaster bool `json:"isMaster"` | |||||
| Name string `json:"name"` | |||||
| MasterAddress string `json:"masterAddress"` | |||||
| RootCA string `json:"rootCA"` | |||||
| ClientCert string `json:"clientCert"` | |||||
| ClientKey string `json:"clientKey"` | |||||
| } | |||||
| @@ -9,6 +9,7 @@ import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | "gitlink.org.cn/cloudream/common/pkgs/logger" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/accessstat" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/accessstat" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/accesstoken" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/accesstoken" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/config" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/config" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" | ||||
| @@ -76,6 +77,14 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { | |||||
| stgglb.InitLocal(config.Cfg().Local) | stgglb.InitLocal(config.Cfg().Local) | ||||
| // 集群模式 | |||||
| clster := cluster.New(config.Cfg().Cluster) | |||||
| err = clster.Start() | |||||
| if err != nil { | |||||
| logger.Errorf("start cluster failed, err: %v", err) | |||||
| os.Exit(1) | |||||
| } | |||||
| // 数据库 | // 数据库 | ||||
| db, err := db.NewDB(&config.Cfg().DB) | db, err := db.NewDB(&config.Cfg().DB) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -195,12 +204,12 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { | |||||
| uploader := uploader.NewUploader(publock, conCol, stgPool, spaceMeta, db) | uploader := uploader.NewUploader(publock, conCol, stgPool, spaceMeta, db) | ||||
| // 定时任务 | // 定时任务 | ||||
| tktk := ticktock.New(config.Cfg().TickTock, db, spaceMeta, stgPool, evtPub, publock, spdStats) | |||||
| tktk := ticktock.New(config.Cfg().TickTock, db, spaceMeta, stgPool, evtPub, publock, spdStats, clster) | |||||
| tktk.Start() | tktk.Start() | ||||
| defer tktk.Stop() | defer tktk.Stop() | ||||
| // 用户空间同步功能 | // 用户空间同步功能 | ||||
| spaceSync := spacesyncer.New(db, stgPool, spaceMeta) | |||||
| spaceSync := spacesyncer.New(db, stgPool, spaceMeta, clster) | |||||
| spaceSyncChan := spaceSync.Start() | spaceSyncChan := spaceSync.Start() | ||||
| defer spaceSync.Stop() | defer spaceSync.Stop() | ||||
| @@ -244,7 +253,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { | |||||
| defer httpSvr.Stop() | defer httpSvr.Stop() | ||||
| // RPC接口 | // RPC接口 | ||||
| rpcSvr := clirpc.NewServer(config.Cfg().RPC, myrpc.NewService(publock), nil) | |||||
| rpcSvr := clirpc.NewServer(config.Cfg().RPC, myrpc.NewService(publock, clster), nil) | |||||
| rpcChan := rpcSvr.Start() | rpcChan := rpcSvr.Start() | ||||
| defer rpcSvr.Stop() | defer rpcSvr.Stop() | ||||
| @@ -10,6 +10,7 @@ import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | "gitlink.org.cn/cloudream/common/pkgs/logger" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/accessstat" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/accessstat" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/accesstoken" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/accesstoken" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/config" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/config" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" | ||||
| @@ -87,6 +88,14 @@ func test(configPath string) { | |||||
| stgglb.StandaloneMode = config.Cfg().AccessToken == nil | stgglb.StandaloneMode = config.Cfg().AccessToken == nil | ||||
| // 集群模式 | |||||
| clster := cluster.New(config.Cfg().Cluster) | |||||
| err = clster.Start() | |||||
| if err != nil { | |||||
| logger.Errorf("start cluster failed, err: %v", err) | |||||
| os.Exit(1) | |||||
| } | |||||
| // 数据库 | // 数据库 | ||||
| db, err := db.NewDB(&config.Cfg().DB) | db, err := db.NewDB(&config.Cfg().DB) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -203,7 +212,7 @@ func test(configPath string) { | |||||
| uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db) | uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db) | ||||
| // 用户空间同步功能 | // 用户空间同步功能 | ||||
| spaceSync := spacesyncer.New(db, stgPool, stgMeta) | |||||
| spaceSync := spacesyncer.New(db, stgPool, stgMeta, clster) | |||||
| spaceSyncChan := spaceSync.Start() | spaceSyncChan := spaceSync.Start() | ||||
| defer spaceSync.Stop() | defer spaceSync.Stop() | ||||
| @@ -10,6 +10,7 @@ import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | "gitlink.org.cn/cloudream/common/pkgs/logger" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/accessstat" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/accessstat" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/accesstoken" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/accesstoken" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/config" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/config" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" | ||||
| @@ -67,6 +68,14 @@ func vfsTest(configPath string, opts serveHTTPOptions) { | |||||
| stgglb.StandaloneMode = opts.Standalone || config.Cfg().AccessToken == nil | stgglb.StandaloneMode = opts.Standalone || config.Cfg().AccessToken == nil | ||||
| // 集群模式 | |||||
| clster := cluster.New(config.Cfg().Cluster) | |||||
| err = clster.Start() | |||||
| if err != nil { | |||||
| logger.Errorf("start cluster failed, err: %v", err) | |||||
| os.Exit(1) | |||||
| } | |||||
| // 数据库 | // 数据库 | ||||
| db, err := db.NewDB(&config.Cfg().DB) | db, err := db.NewDB(&config.Cfg().DB) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -183,7 +192,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) { | |||||
| uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db) | uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db) | ||||
| // 用户空间同步功能 | // 用户空间同步功能 | ||||
| spaceSync := spacesyncer.New(db, stgPool, stgMeta) | |||||
| spaceSync := spacesyncer.New(db, stgPool, stgMeta, clster) | |||||
| spaceSyncChan := spaceSync.Start() | spaceSyncChan := spaceSync.Start() | ||||
| defer spaceSync.Stop() | defer spaceSync.Stop() | ||||
| @@ -4,6 +4,7 @@ import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | "gitlink.org.cn/cloudream/common/pkgs/logger" | ||||
| "gitlink.org.cn/cloudream/common/utils/config" | "gitlink.org.cn/cloudream/common/utils/config" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/accesstoken" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/accesstoken" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy" | ||||
| @@ -33,6 +34,7 @@ type Config struct { | |||||
| RPC rpc.Config `json:"rpc"` | RPC rpc.Config `json:"rpc"` | ||||
| Mount *mntcfg.Config `json:"mount"` | Mount *mntcfg.Config `json:"mount"` | ||||
| AccessToken *accesstoken.Config `json:"accessToken"` | AccessToken *accesstoken.Config `json:"accessToken"` | ||||
| Cluster *cluster.Config `json:"cluster"` | |||||
| } | } | ||||
| var cfg Config | var cfg Config | ||||
| @@ -0,0 +1,12 @@ | |||||
| package rpc | |||||
| import ( | |||||
| "context" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" | |||||
| clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" | |||||
| ) | |||||
| func (svc *Service) GetClusterMasterInfo(ctx context.Context, msg *clirpc.GetClusterMasterInfo) (*clirpc.GetClusterMasterInfoResp, *rpc.CodeError) { | |||||
| return &clirpc.GetClusterMasterInfoResp{Name: svc.cluster.Name()}, nil | |||||
| } | |||||
| @@ -1,17 +1,20 @@ | |||||
| package rpc | package rpc | ||||
| import ( | import ( | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock" | ||||
| clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" | clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" | ||||
| ) | ) | ||||
| type Service struct { | type Service struct { | ||||
| pubLock *publock.PubLock | pubLock *publock.PubLock | ||||
| cluster *cluster.Cluster | |||||
| } | } | ||||
| func NewService(pubLock *publock.PubLock) *Service { | |||||
| func NewService(pubLock *publock.PubLock, cluster *cluster.Cluster) *Service { | |||||
| return &Service{ | return &Service{ | ||||
| pubLock: pubLock, | pubLock: pubLock, | ||||
| cluster: cluster, | |||||
| } | } | ||||
| } | } | ||||
| @@ -111,7 +111,7 @@ func (svc *UserSpaceService) Update(req cliapi.UserSpaceUpdate) (*cliapi.UserSpa | |||||
| // 通知元数据缓存无效 | // 通知元数据缓存无效 | ||||
| svc.UserSpaceMeta.Drop([]jcstypes.UserSpaceID{req.UserSpaceID}) | svc.UserSpaceMeta.Drop([]jcstypes.UserSpaceID{req.UserSpaceID}) | ||||
| // 通知存储服务组件池停止组件。TODO 对于在Hub上运行的组件,需要一个机制去定时清理 | |||||
| // 通知存储服务组件池停止组件。TODO 对于在Hub上运行的组件,需要一个机制去定时清理。还有集群模式 | |||||
| svc.StgPool.Drop(stgglb.UserID, space.UserSpaceID) | svc.StgPool.Drop(stgglb.UserID, space.UserSpaceID) | ||||
| // TODO 考虑加锁再进行操作 | // TODO 考虑加锁再进行操作 | ||||
| @@ -7,6 +7,7 @@ import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/async" | "gitlink.org.cn/cloudream/common/pkgs/async" | ||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | "gitlink.org.cn/cloudream/common/pkgs/logger" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" | ||||
| stgpool "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" | stgpool "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" | ||||
| @@ -25,15 +26,17 @@ type SpaceSyncer struct { | |||||
| db *db.DB | db *db.DB | ||||
| stgPool *stgpool.Pool | stgPool *stgpool.Pool | ||||
| spaceMeta *metacache.UserSpaceMeta | spaceMeta *metacache.UserSpaceMeta | ||||
| cluster *cluster.Cluster | |||||
| lock sync.Mutex | lock sync.Mutex | ||||
| tasks map[jcstypes.SpaceSyncTaskID]*task | tasks map[jcstypes.SpaceSyncTaskID]*task | ||||
| } | } | ||||
| func New(db *db.DB, stgPool *stgpool.Pool, spaceMeta *metacache.UserSpaceMeta) *SpaceSyncer { | |||||
| func New(db *db.DB, stgPool *stgpool.Pool, spaceMeta *metacache.UserSpaceMeta, cluster *cluster.Cluster) *SpaceSyncer { | |||||
| return &SpaceSyncer{ | return &SpaceSyncer{ | ||||
| db: db, | db: db, | ||||
| stgPool: stgPool, | stgPool: stgPool, | ||||
| spaceMeta: spaceMeta, | spaceMeta: spaceMeta, | ||||
| cluster: cluster, | |||||
| tasks: make(map[jcstypes.SpaceSyncTaskID]*task), | tasks: make(map[jcstypes.SpaceSyncTaskID]*task), | ||||
| } | } | ||||
| } | } | ||||
| @@ -46,6 +49,11 @@ func (s *SpaceSyncer) Start() *async.UnboundChannel[SpaceSyncerEvent] { | |||||
| ch := async.NewUnboundChannel[SpaceSyncerEvent]() | ch := async.NewUnboundChannel[SpaceSyncerEvent]() | ||||
| if !s.cluster.IsMaster() { | |||||
| log.Infof("not master, skip start space syncer") | |||||
| return ch | |||||
| } | |||||
| allTask, err := db.DoTx01(s.db, s.db.SpaceSyncTask().GetAll) | allTask, err := db.DoTx01(s.db, s.db.SpaceSyncTask().GetAll) | ||||
| if err != nil { | if err != nil { | ||||
| log.Warnf("load task from db: %v", err) | log.Warnf("load task from db: %v", err) | ||||
| @@ -91,6 +99,10 @@ func (s *SpaceSyncer) Stop() { | |||||
| s.lock.Lock() | s.lock.Lock() | ||||
| defer s.lock.Unlock() | defer s.lock.Unlock() | ||||
| if !s.cluster.IsMaster() { | |||||
| return | |||||
| } | |||||
| for _, t := range s.tasks { | for _, t := range s.tasks { | ||||
| t.CancelFn() | t.CancelFn() | ||||
| } | } | ||||
| @@ -101,6 +113,10 @@ func (s *SpaceSyncer) Stop() { | |||||
| func (s *SpaceSyncer) CreateTask(t jcstypes.SpaceSyncTask) (*TaskInfo, error) { | func (s *SpaceSyncer) CreateTask(t jcstypes.SpaceSyncTask) (*TaskInfo, error) { | ||||
| log := logger.WithField("Mod", logMod) | log := logger.WithField("Mod", logMod) | ||||
| if !s.cluster.IsMaster() { | |||||
| return nil, fmt.Errorf("not master, create task aborted") | |||||
| } | |||||
| d := s.db | d := s.db | ||||
| err := d.DoTx(func(tx db.SQLContext) error { | err := d.DoTx(func(tx db.SQLContext) error { | ||||
| err := d.SpaceSyncTask().Create(tx, &t) | err := d.SpaceSyncTask().Create(tx, &t) | ||||
| @@ -146,6 +162,10 @@ func (s *SpaceSyncer) CreateTask(t jcstypes.SpaceSyncTask) (*TaskInfo, error) { | |||||
| func (s *SpaceSyncer) CancelTask(taskID jcstypes.SpaceSyncTaskID) { | func (s *SpaceSyncer) CancelTask(taskID jcstypes.SpaceSyncTaskID) { | ||||
| log := logger.WithField("Mod", logMod) | log := logger.WithField("Mod", logMod) | ||||
| if !s.cluster.IsMaster() { | |||||
| return | |||||
| } | |||||
| s.lock.Lock() | s.lock.Lock() | ||||
| defer s.lock.Unlock() | defer s.lock.Unlock() | ||||
| @@ -170,6 +190,10 @@ func (s *SpaceSyncer) GetTask(taskID jcstypes.SpaceSyncTaskID) *jcstypes.SpaceSy | |||||
| s.lock.Lock() | s.lock.Lock() | ||||
| defer s.lock.Unlock() | defer s.lock.Unlock() | ||||
| if !s.cluster.IsMaster() { | |||||
| return nil | |||||
| } | |||||
| tsk := s.tasks[taskID] | tsk := s.tasks[taskID] | ||||
| if tsk == nil { | if tsk == nil { | ||||
| return nil | return nil | ||||
| @@ -32,6 +32,11 @@ func (j *ChangeRedundancy) Execute(t *TickTock) { | |||||
| log.Infof("job end, time: %v", time.Since(startTime)) | log.Infof("job end, time: %v", time.Since(startTime)) | ||||
| }() | }() | ||||
| if !t.cluster.IsMaster() { | |||||
| log.Infof("not master, skip") | |||||
| return | |||||
| } | |||||
| ctx := &changeRedundancyContext{ | ctx := &changeRedundancyContext{ | ||||
| ticktock: t, | ticktock: t, | ||||
| allUserSpaces: make(map[jcstypes.UserSpaceID]*userSpaceUsageInfo), | allUserSpaces: make(map[jcstypes.UserSpaceID]*userSpaceUsageInfo), | ||||
| @@ -30,6 +30,11 @@ func (j *CheckShardStore) Execute(t *TickTock) { | |||||
| log.Infof("job end, time: %v", time.Since(startTime)) | log.Infof("job end, time: %v", time.Since(startTime)) | ||||
| }() | }() | ||||
| if !t.cluster.IsMaster() { | |||||
| log.Infof("not master, skip") | |||||
| return | |||||
| } | |||||
| db2 := t.db | db2 := t.db | ||||
| spaceIDs, err := db2.UserSpace().GetAllIDs(db2.DefCtx()) | spaceIDs, err := db2.UserSpace().GetAllIDs(db2.DefCtx()) | ||||
| @@ -6,6 +6,7 @@ import ( | |||||
| "github.com/go-co-op/gocron/v2" | "github.com/go-co-op/gocron/v2" | ||||
| "github.com/samber/lo" | "github.com/samber/lo" | ||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | "gitlink.org.cn/cloudream/common/pkgs/logger" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock" | ||||
| @@ -34,9 +35,10 @@ type TickTock struct { | |||||
| evtPub *sysevent.Publisher | evtPub *sysevent.Publisher | ||||
| pubLock *publock.PubLock | pubLock *publock.PubLock | ||||
| speedStats *speedstats.SpeedStats | speedStats *speedstats.SpeedStats | ||||
| cluster *cluster.Cluster | |||||
| } | } | ||||
| func New(cfg Config, db *db.DB, spaceMeta *metacache.UserSpaceMeta, stgPool *pool.Pool, evtPub *sysevent.Publisher, pubLock *publock.PubLock, speedStats *speedstats.SpeedStats) *TickTock { | |||||
| func New(cfg Config, db *db.DB, spaceMeta *metacache.UserSpaceMeta, stgPool *pool.Pool, evtPub *sysevent.Publisher, pubLock *publock.PubLock, speedStats *speedstats.SpeedStats, cluster *cluster.Cluster) *TickTock { | |||||
| sch, _ := gocron.NewScheduler() | sch, _ := gocron.NewScheduler() | ||||
| t := &TickTock{ | t := &TickTock{ | ||||
| cfg: cfg, | cfg: cfg, | ||||
| @@ -48,6 +50,7 @@ func New(cfg Config, db *db.DB, spaceMeta *metacache.UserSpaceMeta, stgPool *poo | |||||
| evtPub: evtPub, | evtPub: evtPub, | ||||
| pubLock: pubLock, | pubLock: pubLock, | ||||
| speedStats: speedStats, | speedStats: speedStats, | ||||
| cluster: cluster, | |||||
| } | } | ||||
| t.initJobs() | t.initJobs() | ||||
| return t | return t | ||||
| @@ -22,6 +22,11 @@ func (j *UpdatePackageAccessStatAmount) Execute(t *TickTock) { | |||||
| log.Infof("job end, time: %v", time.Since(startTime)) | log.Infof("job end, time: %v", time.Since(startTime)) | ||||
| }() | }() | ||||
| if !t.cluster.IsMaster() { | |||||
| log.Infof("not master, skip") | |||||
| return | |||||
| } | |||||
| err := t.db.PackageAccessStat().UpdateAllAmount(t.db.DefCtx(), t.cfg.AccessStatHistoryWeight) | err := t.db.PackageAccessStat().UpdateAllAmount(t.db.DefCtx(), t.cfg.AccessStatHistoryWeight) | ||||
| if err != nil { | if err != nil { | ||||
| log.Warnf("update all package access stat amount: %v", err) | log.Warnf("update all package access stat amount: %v", err) | ||||
| @@ -26,6 +26,11 @@ func (j *UserSpaceGC) Execute(t *TickTock) { | |||||
| log.Infof("job end, time: %v", time.Since(startTime)) | log.Infof("job end, time: %v", time.Since(startTime)) | ||||
| }() | }() | ||||
| if !t.cluster.IsMaster() { | |||||
| log.Infof("not master, skip") | |||||
| return | |||||
| } | |||||
| spaceIDs, err := t.db.UserSpace().GetAllIDs(t.db.DefCtx()) | spaceIDs, err := t.db.UserSpace().GetAllIDs(t.db.DefCtx()) | ||||
| if err != nil { | if err != nil { | ||||
| log.Warnf("getting user space ids: %v", err) | log.Warnf("getting user space ids: %v", err) | ||||
| @@ -26,16 +26,19 @@ var file_pkgs_rpc_client_client_proto_rawDesc = []byte{ | |||||
| 0x0a, 0x1c, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, | 0x0a, 0x1c, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, | ||||
| 0x74, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, | 0x74, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, | ||||
| 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x1a, 0x12, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, | 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x1a, 0x12, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, | ||||
| 0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x3b, 0x0a, 0x06, 0x43, 0x6c, | |||||
| 0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x70, 0x0a, 0x06, 0x43, 0x6c, | |||||
| 0x69, 0x65, 0x6e, 0x74, 0x12, 0x31, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x4c, 0x6f, 0x63, 0x6b, 0x43, | 0x69, 0x65, 0x6e, 0x74, 0x12, 0x31, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x4c, 0x6f, 0x63, 0x6b, 0x43, | ||||
| 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, | 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, | ||||
| 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, | 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, | ||||
| 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x6c, 0x69, | |||||
| 0x6e, 0x6b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x63, 0x6e, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x72, | |||||
| 0x65, 0x61, 0x6d, 0x2f, 0x6a, 0x63, 0x73, 0x2d, 0x70, 0x75, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, | |||||
| 0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6c, 0x69, 0x72, | |||||
| 0x70, 0x63, 0x3b, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, | |||||
| 0x33, | |||||
| 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x33, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x43, 0x6c, | |||||
| 0x75, 0x73, 0x74, 0x65, 0x72, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, | |||||
| 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, | |||||
| 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x40, 0x5a, 0x3e, | |||||
| 0x67, 0x69, 0x74, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x63, 0x6e, 0x2f, 0x63, | |||||
| 0x6c, 0x6f, 0x75, 0x64, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x6a, 0x63, 0x73, 0x2d, 0x70, 0x75, 0x62, | |||||
| 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, | |||||
| 0x2f, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x3b, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x62, 0x06, | |||||
| 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, | |||||
| } | } | ||||
| var file_pkgs_rpc_client_client_proto_goTypes = []any{ | var file_pkgs_rpc_client_client_proto_goTypes = []any{ | ||||
| @@ -44,9 +47,11 @@ var file_pkgs_rpc_client_client_proto_goTypes = []any{ | |||||
| } | } | ||||
| var file_pkgs_rpc_client_client_proto_depIdxs = []int32{ | var file_pkgs_rpc_client_client_proto_depIdxs = []int32{ | ||||
| 0, // 0: clirpc.Client.PubLockChannel:input_type -> rpc.Request | 0, // 0: clirpc.Client.PubLockChannel:input_type -> rpc.Request | ||||
| 1, // 1: clirpc.Client.PubLockChannel:output_type -> rpc.Response | |||||
| 1, // [1:2] is the sub-list for method output_type | |||||
| 0, // [0:1] is the sub-list for method input_type | |||||
| 0, // 1: clirpc.Client.GetClusterMasterInfo:input_type -> rpc.Request | |||||
| 1, // 2: clirpc.Client.PubLockChannel:output_type -> rpc.Response | |||||
| 1, // 3: clirpc.Client.GetClusterMasterInfo:output_type -> rpc.Response | |||||
| 2, // [2:4] is the sub-list for method output_type | |||||
| 0, // [0:2] is the sub-list for method input_type | |||||
| 0, // [0:0] is the sub-list for extension type_name | 0, // [0:0] is the sub-list for extension type_name | ||||
| 0, // [0:0] is the sub-list for extension extendee | 0, // [0:0] is the sub-list for extension extendee | ||||
| 0, // [0:0] is the sub-list for field type_name | 0, // [0:0] is the sub-list for field type_name | ||||
| @@ -8,4 +8,6 @@ option go_package = "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/clirpc;cli | |||||
| service Client { | service Client { | ||||
| rpc PubLockChannel(stream rpc.Request) returns(stream rpc.Response); | rpc PubLockChannel(stream rpc.Request) returns(stream rpc.Response); | ||||
| rpc GetClusterMasterInfo(rpc.Request) returns(rpc.Response); | |||||
| } | } | ||||
| @@ -20,7 +20,8 @@ import ( | |||||
| const _ = grpc.SupportPackageIsVersion7 | const _ = grpc.SupportPackageIsVersion7 | ||||
| const ( | const ( | ||||
| Client_PubLockChannel_FullMethodName = "/clirpc.Client/PubLockChannel" | |||||
| Client_PubLockChannel_FullMethodName = "/clirpc.Client/PubLockChannel" | |||||
| Client_GetClusterMasterInfo_FullMethodName = "/clirpc.Client/GetClusterMasterInfo" | |||||
| ) | ) | ||||
| // ClientClient is the client API for Client service. | // ClientClient is the client API for Client service. | ||||
| @@ -28,6 +29,7 @@ const ( | |||||
| // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. | // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. | ||||
| type ClientClient interface { | type ClientClient interface { | ||||
| PubLockChannel(ctx context.Context, opts ...grpc.CallOption) (Client_PubLockChannelClient, error) | PubLockChannel(ctx context.Context, opts ...grpc.CallOption) (Client_PubLockChannelClient, error) | ||||
| GetClusterMasterInfo(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) | |||||
| } | } | ||||
| type clientClient struct { | type clientClient struct { | ||||
| @@ -69,11 +71,21 @@ func (x *clientPubLockChannelClient) Recv() (*rpc.Response, error) { | |||||
| return m, nil | return m, nil | ||||
| } | } | ||||
| func (c *clientClient) GetClusterMasterInfo(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) { | |||||
| out := new(rpc.Response) | |||||
| err := c.cc.Invoke(ctx, Client_GetClusterMasterInfo_FullMethodName, in, out, opts...) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| return out, nil | |||||
| } | |||||
| // ClientServer is the server API for Client service. | // ClientServer is the server API for Client service. | ||||
| // All implementations must embed UnimplementedClientServer | // All implementations must embed UnimplementedClientServer | ||||
| // for forward compatibility | // for forward compatibility | ||||
| type ClientServer interface { | type ClientServer interface { | ||||
| PubLockChannel(Client_PubLockChannelServer) error | PubLockChannel(Client_PubLockChannelServer) error | ||||
| GetClusterMasterInfo(context.Context, *rpc.Request) (*rpc.Response, error) | |||||
| mustEmbedUnimplementedClientServer() | mustEmbedUnimplementedClientServer() | ||||
| } | } | ||||
| @@ -84,6 +96,9 @@ type UnimplementedClientServer struct { | |||||
| func (UnimplementedClientServer) PubLockChannel(Client_PubLockChannelServer) error { | func (UnimplementedClientServer) PubLockChannel(Client_PubLockChannelServer) error { | ||||
| return status.Errorf(codes.Unimplemented, "method PubLockChannel not implemented") | return status.Errorf(codes.Unimplemented, "method PubLockChannel not implemented") | ||||
| } | } | ||||
| func (UnimplementedClientServer) GetClusterMasterInfo(context.Context, *rpc.Request) (*rpc.Response, error) { | |||||
| return nil, status.Errorf(codes.Unimplemented, "method GetClusterMasterInfo not implemented") | |||||
| } | |||||
| func (UnimplementedClientServer) mustEmbedUnimplementedClientServer() {} | func (UnimplementedClientServer) mustEmbedUnimplementedClientServer() {} | ||||
| // UnsafeClientServer may be embedded to opt out of forward compatibility for this service. | // UnsafeClientServer may be embedded to opt out of forward compatibility for this service. | ||||
| @@ -123,13 +138,36 @@ func (x *clientPubLockChannelServer) Recv() (*rpc.Request, error) { | |||||
| return m, nil | return m, nil | ||||
| } | } | ||||
| func _Client_GetClusterMasterInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { | |||||
| in := new(rpc.Request) | |||||
| if err := dec(in); err != nil { | |||||
| return nil, err | |||||
| } | |||||
| if interceptor == nil { | |||||
| return srv.(ClientServer).GetClusterMasterInfo(ctx, in) | |||||
| } | |||||
| info := &grpc.UnaryServerInfo{ | |||||
| Server: srv, | |||||
| FullMethod: Client_GetClusterMasterInfo_FullMethodName, | |||||
| } | |||||
| handler := func(ctx context.Context, req interface{}) (interface{}, error) { | |||||
| return srv.(ClientServer).GetClusterMasterInfo(ctx, req.(*rpc.Request)) | |||||
| } | |||||
| return interceptor(ctx, in, info, handler) | |||||
| } | |||||
| // Client_ServiceDesc is the grpc.ServiceDesc for Client service. | // Client_ServiceDesc is the grpc.ServiceDesc for Client service. | ||||
| // It's only intended for direct use with grpc.RegisterService, | // It's only intended for direct use with grpc.RegisterService, | ||||
| // and not to be introspected or modified (even as a copy) | // and not to be introspected or modified (even as a copy) | ||||
| var Client_ServiceDesc = grpc.ServiceDesc{ | var Client_ServiceDesc = grpc.ServiceDesc{ | ||||
| ServiceName: "clirpc.Client", | ServiceName: "clirpc.Client", | ||||
| HandlerType: (*ClientServer)(nil), | HandlerType: (*ClientServer)(nil), | ||||
| Methods: []grpc.MethodDesc{}, | |||||
| Methods: []grpc.MethodDesc{ | |||||
| { | |||||
| MethodName: "GetClusterMasterInfo", | |||||
| Handler: _Client_GetClusterMasterInfo_Handler, | |||||
| }, | |||||
| }, | |||||
| Streams: []grpc.StreamDesc{ | Streams: []grpc.StreamDesc{ | ||||
| { | { | ||||
| StreamName: "PubLockChannel", | StreamName: "PubLockChannel", | ||||
| @@ -0,0 +1,29 @@ | |||||
| package clirpc | |||||
| import ( | |||||
| "context" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" | |||||
| ) | |||||
| type ClusterService interface { | |||||
| GetClusterMasterInfo(ctx context.Context, req *GetClusterMasterInfo) (*GetClusterMasterInfoResp, *rpc.CodeError) | |||||
| } | |||||
| type GetClusterMasterInfo struct { | |||||
| } | |||||
| type GetClusterMasterInfoResp struct { | |||||
| Name string | |||||
| } | |||||
| func (c *Client) GetClusterMasterInfo(ctx context.Context, msg *GetClusterMasterInfo) (*GetClusterMasterInfoResp, *rpc.CodeError) { | |||||
| if c.fusedErr != nil { | |||||
| return nil, c.fusedErr | |||||
| } | |||||
| return rpc.UnaryClient[*GetClusterMasterInfoResp](c.cli.GetClusterMasterInfo, ctx, msg) | |||||
| } | |||||
| func (s *Server) GetClusterMasterInfo(ctx context.Context, msg *rpc.Request) (*rpc.Response, error) { | |||||
| return rpc.UnaryServer(s.svrImpl.GetClusterMasterInfo, ctx, msg) | |||||
| } | |||||
| @@ -24,11 +24,10 @@ type PoolConfigJSON struct { | |||||
| ClientKey string `json:"clientKey"` | ClientKey string `json:"clientKey"` | ||||
| } | } | ||||
| func (c *PoolConfigJSON) Build(tokenProv rpc.AccessTokenProvider) (*PoolConfig, error) { | |||||
| func (c *PoolConfigJSON) Build() (*PoolConfig, error) { | |||||
| pc := &PoolConfig{ | pc := &PoolConfig{ | ||||
| Address: c.Address, | Address: c.Address, | ||||
| } | } | ||||
| pc.Conn.AccessTokenProvider = tokenProv | |||||
| rootCA, err := os.ReadFile(c.RootCA) | rootCA, err := os.ReadFile(c.RootCA) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -45,8 +44,6 @@ func (c *PoolConfigJSON) Build(tokenProv rpc.AccessTokenProvider) (*PoolConfig, | |||||
| return nil, fmt.Errorf("load client cert: %v", err) | return nil, fmt.Errorf("load client cert: %v", err) | ||||
| } | } | ||||
| pc.Conn.ClientCert = &cert | pc.Conn.ClientCert = &cert | ||||
| } else if tokenProv == nil { | |||||
| return nil, fmt.Errorf("must provide client cert or access token provider") | |||||
| } | } | ||||
| return pc, nil | return pc, nil | ||||
| @@ -5,6 +5,7 @@ import ( | |||||
| ) | ) | ||||
| type ClientAPI interface { | type ClientAPI interface { | ||||
| ClusterService | |||||
| PubLockService | PubLockService | ||||
| } | } | ||||