| @@ -1,8 +0,0 @@ | |||||
| # Scanner服务 | |||||
| ## 目录结构 | |||||
| - `internal`:服务源码。 | |||||
| - `config`:服务使用的配置文件结构定义。 | |||||
| - `event`:被投递到队列顺序执行的事件。 | |||||
| - `mq`:通过rabbitmq对外提供的接口。实现了`common\pkgs\mq\scanner`目录里文件定义的接口。 | |||||
| - `tickevent`:定时执行的事件。 | |||||
| @@ -1,29 +0,0 @@ | |||||
| package config | |||||
| import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/distlock" | |||||
| log "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||||
| c "gitlink.org.cn/cloudream/common/utils/config" | |||||
| db "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/db2/config" | |||||
| ) | |||||
| type Config struct { | |||||
| AccessStatHistoryAmount float64 `json:"accessStatHistoryAmount"` | |||||
| ECFileSizeThreshold int64 `json:"ecFileSizeThreshold"` | |||||
| HubUnavailableSeconds int `json:"hubUnavailableSeconds"` // 如果节点上次上报时间超过这个值,则认为节点已经不可用 | |||||
| Logger log.Config `json:"logger"` | |||||
| DB db.Config `json:"db"` | |||||
| RabbitMQ mq.Config `json:"rabbitMQ"` | |||||
| DistLock distlock.Config `json:"distlock"` | |||||
| } | |||||
| var cfg Config | |||||
| func Init() error { | |||||
| return c.DefaultLoad("scanner", &cfg) | |||||
| } | |||||
| func Cfg() *Config { | |||||
| return &cfg | |||||
| } | |||||
| @@ -1,81 +0,0 @@ | |||||
| package event | |||||
| import ( | |||||
| "database/sql" | |||||
| "time" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/common/consts" | |||||
| stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" | |||||
| agtmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub" | |||||
| scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/config" | |||||
| ) | |||||
| type HubCheckState struct { | |||||
| *scevt.HubCheckState | |||||
| } | |||||
| func NewHubCheckState(evt *scevt.HubCheckState) *HubCheckState { | |||||
| return &HubCheckState{ | |||||
| HubCheckState: evt, | |||||
| } | |||||
| } | |||||
| func (t *HubCheckState) TryMerge(other Event) bool { | |||||
| event, ok := other.(*HubCheckState) | |||||
| if !ok { | |||||
| return false | |||||
| } | |||||
| return t.HubID == event.HubID | |||||
| } | |||||
| func (t *HubCheckState) Execute(execCtx ExecuteContext) { | |||||
| log := logger.WithType[HubCheckState]("Event") | |||||
| log.Debugf("begin with %v", logger.FormatStruct(t.HubCheckState)) | |||||
| defer log.Debugf("end") | |||||
| hub, err := execCtx.Args.DB.Hub().GetByID(execCtx.Args.DB.DefCtx(), t.HubID) | |||||
| if err == sql.ErrNoRows { | |||||
| return | |||||
| } | |||||
| if err != nil { | |||||
| log.WithField("HubID", t.HubID).Warnf("get hub by id failed, err: %s", err.Error()) | |||||
| return | |||||
| } | |||||
| agtCli, err := stgglb.HubMQPool.Acquire(t.HubID) | |||||
| if err != nil { | |||||
| log.WithField("HubID", t.HubID).Warnf("create hub client failed, err: %s", err.Error()) | |||||
| return | |||||
| } | |||||
| defer stgglb.HubMQPool.Release(agtCli) | |||||
| _, err = agtCli.GetState(agtmq.NewGetState(), mq.RequestOption{Timeout: time.Second * 30}) | |||||
| if err != nil { | |||||
| log.WithField("HubID", t.HubID).Warnf("getting state: %s", err.Error()) | |||||
| // 检查上次上报时间,超时的设置为不可用 | |||||
| // TODO 没有上报过是否要特殊处理? | |||||
| if hub.LastReportTime != nil && time.Since(*hub.LastReportTime) > time.Duration(config.Cfg().HubUnavailableSeconds)*time.Second { | |||||
| err := execCtx.Args.DB.Hub().UpdateState(execCtx.Args.DB.DefCtx(), t.HubID, consts.HubStateUnavailable) | |||||
| if err != nil { | |||||
| log.WithField("HubID", t.HubID).Warnf("set hub state failed, err: %s", err.Error()) | |||||
| } | |||||
| } | |||||
| return | |||||
| } | |||||
| // TODO 如果以后还有其他的状态,要判断哪些状态下能设置Normal | |||||
| err = execCtx.Args.DB.Hub().UpdateState(execCtx.Args.DB.DefCtx(), t.HubID, consts.HubStateNormal) | |||||
| if err != nil { | |||||
| log.WithField("HubID", t.HubID).Warnf("change hub state failed, err: %s", err.Error()) | |||||
| } | |||||
| } | |||||
| func init() { | |||||
| RegisterMessageConvertor(NewHubCheckState) | |||||
| } | |||||
| @@ -1,55 +0,0 @@ | |||||
| package event | |||||
| import ( | |||||
| "fmt" | |||||
| "reflect" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/distlock" | |||||
| event "gitlink.org.cn/cloudream/common/pkgs/event" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/typedispatcher" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/db2" | |||||
| scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/agtpool" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" | |||||
| ) | |||||
| type ExecuteArgs struct { | |||||
| DB *db2.DB | |||||
| DistLock *distlock.Service | |||||
| StgMgr *agtpool.HubPool | |||||
| EvtPub *sysevent.Publisher | |||||
| } | |||||
| type Executor = event.Executor[ExecuteArgs] | |||||
| type ExecuteContext = event.ExecuteContext[ExecuteArgs] | |||||
| type Event = event.Event[ExecuteArgs] | |||||
| type ExecuteOption = event.ExecuteOption | |||||
| func NewExecutor(db *db2.DB, distLock *distlock.Service, stgAgts *agtpool.HubPool, evtPub *sysevent.Publisher) Executor { | |||||
| return event.NewExecutor(ExecuteArgs{ | |||||
| DB: db, | |||||
| DistLock: distLock, | |||||
| StgMgr: stgAgts, | |||||
| EvtPub: evtPub, | |||||
| }) | |||||
| } | |||||
| var msgDispatcher = typedispatcher.NewTypeDispatcher[Event]() | |||||
| func FromMessage(msg scevt.Event) (Event, error) { | |||||
| event, ok := msgDispatcher.Dispatch(msg) | |||||
| if !ok { | |||||
| return nil, fmt.Errorf("unknow event message type: %s", reflect.TypeOf(msg).String()) | |||||
| } | |||||
| return event, nil | |||||
| } | |||||
| func RegisterMessageConvertor[T any, TEvt Event](converter func(msg T) TEvt) { | |||||
| typedispatcher.Add(msgDispatcher, func(msg T) Event { | |||||
| return converter(msg) | |||||
| }) | |||||
| } | |||||
| @@ -1,77 +0,0 @@ | |||||
| package event | |||||
| /* | |||||
| import ( | |||||
| "testing" | |||||
| "github.com/samber/lo" | |||||
| . "github.com/smartystreets/goconvey/convey" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| ) | |||||
| func Test_chooseSoManyNodes(t *testing.T) { | |||||
| testcases := []struct { | |||||
| title string | |||||
| allNodes []*StorageLoadInfo | |||||
| count int | |||||
| expectedHubIDs []cdssdk.HubID | |||||
| }{ | |||||
| { | |||||
| title: "节点数量充足", | |||||
| allNodes: []*StorageLoadInfo{ | |||||
| {Storage: cdssdk.Node{HubID: cdssdk.HubID(1)}}, | |||||
| {Storage: cdssdk.Node{HubID: cdssdk.HubID(2)}}, | |||||
| }, | |||||
| count: 2, | |||||
| expectedHubIDs: []cdssdk.HubID{1, 2}, | |||||
| }, | |||||
| { | |||||
| title: "节点数量超过", | |||||
| allNodes: []*StorageLoadInfo{ | |||||
| {Storage: cdssdk.Node{HubID: cdssdk.HubID(1)}}, | |||||
| {Storage: cdssdk.Node{HubID: cdssdk.HubID(2)}}, | |||||
| {Storage: cdssdk.Node{HubID: cdssdk.HubID(3)}}, | |||||
| }, | |||||
| count: 2, | |||||
| expectedHubIDs: []cdssdk.HubID{1, 2}, | |||||
| }, | |||||
| { | |||||
| title: "只有一个节点,节点数量不够", | |||||
| allNodes: []*StorageLoadInfo{ | |||||
| {Storage: cdssdk.Node{HubID: cdssdk.HubID(1)}}, | |||||
| }, | |||||
| count: 3, | |||||
| expectedHubIDs: []cdssdk.HubID{1, 1, 1}, | |||||
| }, | |||||
| { | |||||
| title: "多个同地区节点,节点数量不够", | |||||
| allNodes: []*StorageLoadInfo{ | |||||
| {Storage: cdssdk.Node{HubID: cdssdk.HubID(1)}}, | |||||
| {Storage: cdssdk.Node{HubID: cdssdk.HubID(2)}}, | |||||
| }, | |||||
| count: 5, | |||||
| expectedHubIDs: []cdssdk.HubID{1, 1, 1, 2, 2}, | |||||
| }, | |||||
| { | |||||
| title: "节点数量不够,且在不同地区", | |||||
| allNodes: []*StorageLoadInfo{ | |||||
| {Storage: cdssdk.Node{HubID: cdssdk.HubID(1), LocationID: cdssdk.LocationID(1)}}, | |||||
| {Storage: cdssdk.Node{HubID: cdssdk.HubID(2), LocationID: cdssdk.LocationID(2)}}, | |||||
| }, | |||||
| count: 5, | |||||
| expectedHubIDs: []cdssdk.HubID{1, 2, 1, 2, 1}, | |||||
| }, | |||||
| } | |||||
| for _, test := range testcases { | |||||
| Convey(test.title, t, func() { | |||||
| var t CheckPackageRedundancy | |||||
| chosenNodes := t.chooseSoManyNodes(test.count, test.allNodes) | |||||
| chosenHubIDs := lo.Map(chosenNodes, func(item *StorageLoadInfo, idx int) cdssdk.HubID { return item.Storage.HubID }) | |||||
| So(chosenHubIDs, ShouldResemble, test.expectedHubIDs) | |||||
| }) | |||||
| } | |||||
| } | |||||
| */ | |||||
| @@ -1,20 +0,0 @@ | |||||
| package mq | |||||
| import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| scmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event" | |||||
| ) | |||||
| func (svc *Service) PostEvent(msg *scmq.PostEvent) { | |||||
| evt, err := event.FromMessage(msg.Event) | |||||
| if err != nil { | |||||
| logger.Warnf("create event from event message failed, err: %s", err.Error()) | |||||
| return | |||||
| } | |||||
| svc.eventExecutor.Post(evt, event.ExecuteOption{ | |||||
| IsEmergency: msg.IsEmergency, | |||||
| DontMerge: msg.DontMerge, | |||||
| }) | |||||
| } | |||||
| @@ -1,15 +0,0 @@ | |||||
| package mq | |||||
| import ( | |||||
| "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event" | |||||
| ) | |||||
| type Service struct { | |||||
| eventExecutor *event.Executor | |||||
| } | |||||
| func NewService(eventExecutor *event.Executor) *Service { | |||||
| return &Service{ | |||||
| eventExecutor: eventExecutor, | |||||
| } | |||||
| } | |||||
| @@ -1,33 +0,0 @@ | |||||
| package tickevent | |||||
| import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event" | |||||
| ) | |||||
| type CheckHubState struct { | |||||
| } | |||||
| func NewCheckHubState() *CheckHubState { | |||||
| return &CheckHubState{} | |||||
| } | |||||
| func (e *CheckHubState) Execute(ctx ExecuteContext) { | |||||
| log := logger.WithType[CheckHubState]("TickEvent") | |||||
| log.Debugf("begin") | |||||
| defer log.Debugf("end") | |||||
| hubs, err := ctx.Args.DB.Hub().GetAllHubs(ctx.Args.DB.DefCtx()) | |||||
| if err != nil { | |||||
| log.Warnf("get all hubs failed, err: %s", err.Error()) | |||||
| return | |||||
| } | |||||
| for _, hub := range hubs { | |||||
| ctx.Args.EventExecutor.Post(event.NewHubCheckState(scevt.NewHubCheckState(hub.HubID)), event.ExecuteOption{ | |||||
| IsEmergency: true, | |||||
| DontMerge: true, | |||||
| }) | |||||
| } | |||||
| } | |||||
| @@ -1,24 +0,0 @@ | |||||
| package tickevent | |||||
| import ( | |||||
| tickevent "gitlink.org.cn/cloudream/common/pkgs/tickevent" | |||||
| mydb "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/db2" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event" | |||||
| ) | |||||
| type ExecuteArgs struct { | |||||
| EventExecutor *event.Executor | |||||
| DB *mydb.DB | |||||
| } | |||||
| type StartOption = tickevent.StartOption | |||||
| type Executor = tickevent.Executor[ExecuteArgs] | |||||
| type ExecuteContext = tickevent.ExecuteContext[ExecuteArgs] | |||||
| type Event = tickevent.TickEvent[ExecuteArgs] | |||||
| func NewExecutor(args ExecuteArgs) Executor { | |||||
| return tickevent.NewExecutor(args) | |||||
| } | |||||
| @@ -1,199 +0,0 @@ | |||||
| package main | |||||
| import ( | |||||
| "context" | |||||
| "fmt" | |||||
| "os" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" | |||||
| stgmod "gitlink.org.cn/cloudream/jcs-pub/common/models" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/db2" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock" | |||||
| agtrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/grpc/hub" | |||||
| scmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/agtpool" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/config" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/mq" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/tickevent" | |||||
| ) | |||||
| func main() { | |||||
| err := config.Init() | |||||
| if err != nil { | |||||
| fmt.Printf("init config failed, err: %s", err.Error()) | |||||
| os.Exit(1) | |||||
| } | |||||
| err = logger.Init(&config.Cfg().Logger) | |||||
| if err != nil { | |||||
| fmt.Printf("init logger failed, err: %s", err.Error()) | |||||
| os.Exit(1) | |||||
| } | |||||
| db, err := db2.NewDB(&config.Cfg().DB) | |||||
| if err != nil { | |||||
| logger.Fatalf("new db failed, err: %s", err.Error()) | |||||
| } | |||||
| stgglb.InitMQPool(config.Cfg().RabbitMQ) | |||||
| stgglb.InitHubRPCPool(&agtrpc.PoolConfig{}) | |||||
| // 启动分布式锁服务 | |||||
| distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) | |||||
| if err != nil { | |||||
| logger.Warnf("new distlock service failed, err: %s", err.Error()) | |||||
| os.Exit(1) | |||||
| } | |||||
| go serveDistLock(distlockSvc) | |||||
| // 启动存储服务管理器 | |||||
| stgAgts := agtpool.NewPool() | |||||
| // 初始化系统事件发布器 | |||||
| evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &stgmod.SourceScanner{}) | |||||
| if err != nil { | |||||
| logger.Errorf("new sysevent publisher: %v", err) | |||||
| os.Exit(1) | |||||
| } | |||||
| go servePublisher(evtPub) | |||||
| // 启动事件执行器 | |||||
| eventExecutor := event.NewExecutor(db, distlockSvc, stgAgts, evtPub) | |||||
| go serveEventExecutor(&eventExecutor) | |||||
| agtSvr, err := scmq.NewServer(mq.NewService(&eventExecutor), config.Cfg().RabbitMQ) | |||||
| if err != nil { | |||||
| logger.Fatalf("new hub server failed, err: %s", err.Error()) | |||||
| } | |||||
| agtSvr.OnError(func(err error) { | |||||
| logger.Warnf("hub server err: %s", err.Error()) | |||||
| }) | |||||
| go serveScannerServer(agtSvr) | |||||
| tickExecutor := tickevent.NewExecutor(tickevent.ExecuteArgs{ | |||||
| EventExecutor: &eventExecutor, | |||||
| DB: db, | |||||
| }) | |||||
| startTickEvent(&tickExecutor) | |||||
| forever := make(chan struct{}) | |||||
| <-forever | |||||
| } | |||||
| func serveEventExecutor(executor *event.Executor) { | |||||
| logger.Info("start serving event executor") | |||||
| err := executor.Execute() | |||||
| if err != nil { | |||||
| logger.Errorf("event executor stopped with error: %s", err.Error()) | |||||
| } | |||||
| logger.Info("event executor stopped") | |||||
| // TODO 仅简单结束了程序 | |||||
| os.Exit(1) | |||||
| } | |||||
| func servePublisher(evtPub *sysevent.Publisher) { | |||||
| logger.Info("start serving sysevent publisher") | |||||
| ch := evtPub.Start() | |||||
| loop: | |||||
| for { | |||||
| val, err := ch.Receive().Wait(context.Background()) | |||||
| if err != nil { | |||||
| logger.Errorf("sysevent publisher stopped with error: %s", err.Error()) | |||||
| break | |||||
| } | |||||
| switch val := val.(type) { | |||||
| case sysevent.PublishError: | |||||
| logger.Errorf("publishing event: %v", val) | |||||
| case sysevent.PublisherExited: | |||||
| if val.Err != nil { | |||||
| logger.Errorf("publisher exited with error: %v", val.Err) | |||||
| } else { | |||||
| logger.Info("publisher exited") | |||||
| } | |||||
| break loop | |||||
| case sysevent.OtherError: | |||||
| logger.Errorf("sysevent: %v", val) | |||||
| } | |||||
| } | |||||
| logger.Info("sysevent publisher stopped") | |||||
| // TODO 仅简单结束了程序 | |||||
| os.Exit(1) | |||||
| } | |||||
| func serveScannerServer(server *scmq.Server) { | |||||
| logger.Info("start serving scanner server") | |||||
| ch := server.Start() | |||||
| if ch == nil { | |||||
| logger.Errorf("RabbitMQ logEvent is nil") | |||||
| os.Exit(1) | |||||
| } | |||||
| for { | |||||
| val, err := ch.Receive() | |||||
| if err != nil { | |||||
| logger.Errorf("command server stopped with error: %s", err.Error()) | |||||
| break | |||||
| } | |||||
| switch val := val.(type) { | |||||
| case error: | |||||
| logger.Errorf("rabbitmq connect with error: %v", val) | |||||
| case int: | |||||
| if val == 1 { | |||||
| break | |||||
| } | |||||
| } | |||||
| } | |||||
| logger.Info("command server stopped") | |||||
| // TODO 仅简单结束了程序 | |||||
| os.Exit(1) | |||||
| } | |||||
| func serveDistLock(svc *distlock.Service) { | |||||
| logger.Info("start serving distlock") | |||||
| err := svc.Serve() | |||||
| if err != nil { | |||||
| logger.Errorf("distlock stopped with error: %s", err.Error()) | |||||
| } | |||||
| logger.Info("distlock stopped") | |||||
| // TODO 仅简单结束了程序 | |||||
| os.Exit(1) | |||||
| } | |||||
| func startTickEvent(tickExecutor *tickevent.Executor) { | |||||
| // TODO 可以考虑增加配置文件,配置这些任务间隔时间 | |||||
| interval := 5 * 60 * 1000 | |||||
| tickExecutor.Start(tickevent.NewBatchAllHubCheckShardStore(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) | |||||
| tickExecutor.Start(tickevent.NewStorageGC(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) | |||||
| tickExecutor.Start(tickevent.NewCheckHubState(), 5*60*1000, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) | |||||
| tickExecutor.Start(tickevent.NewBatchCheckPackageRedundancy(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000}) | |||||
| tickExecutor.Start(tickevent.NewBatchCleanPinned(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000}) | |||||
| tickExecutor.Start(tickevent.NewUpdateAllPackageAccessStatAmount(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000}) | |||||
| } | |||||