You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

main.go 3.3 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. "sync"
  6. distlocksvc "gitlink.org.cn/cloudream/common/pkg/distlock/service"
  7. log "gitlink.org.cn/cloudream/common/pkg/logger"
  8. "gitlink.org.cn/cloudream/db"
  9. scsvr "gitlink.org.cn/cloudream/rabbitmq/server/scanner"
  10. "gitlink.org.cn/cloudream/scanner/internal/config"
  11. "gitlink.org.cn/cloudream/scanner/internal/event"
  12. "gitlink.org.cn/cloudream/scanner/internal/services"
  13. "gitlink.org.cn/cloudream/scanner/internal/tickevent"
  14. )
  15. func main() {
  16. err := config.Init()
  17. if err != nil {
  18. fmt.Printf("init config failed, err: %s", err.Error())
  19. os.Exit(1)
  20. }
  21. err = log.Init(&config.Cfg().Logger)
  22. if err != nil {
  23. fmt.Printf("init logger failed, err: %s", err.Error())
  24. os.Exit(1)
  25. }
  26. db, err := db.NewDB(&config.Cfg().DB)
  27. if err != nil {
  28. log.Fatalf("new db failed, err: %s", err.Error())
  29. }
  30. wg := sync.WaitGroup{}
  31. wg.Add(3)
  32. distlockSvc, err := distlocksvc.NewService(&config.Cfg().DistLock)
  33. if err != nil {
  34. log.Warnf("new distlock service failed, err: %s", err.Error())
  35. os.Exit(1)
  36. }
  37. go serveDistLock(distlockSvc, &wg)
  38. eventExecutor := event.NewExecutor(db, distlockSvc)
  39. go serveEventExecutor(&eventExecutor, &wg)
  40. agtSvr, err := scsvr.NewServer(services.NewService(&eventExecutor), &config.Cfg().RabbitMQ)
  41. if err != nil {
  42. log.Fatalf("new agent server failed, err: %s", err.Error())
  43. }
  44. agtSvr.OnError = func(err error) {
  45. log.Warnf("agent server err: %s", err.Error())
  46. }
  47. go serveScannerServer(agtSvr, &wg)
  48. tickExecutor := tickevent.NewExecutor(tickevent.ExecuteArgs{
  49. EventExecutor: &eventExecutor,
  50. DB: db,
  51. })
  52. startTickEvent(&tickExecutor)
  53. wg.Wait()
  54. }
  55. func serveEventExecutor(executor *event.Executor, wg *sync.WaitGroup) {
  56. log.Info("start serving event executor")
  57. err := executor.Execute()
  58. if err != nil {
  59. log.Errorf("event executor stopped with error: %s", err.Error())
  60. }
  61. log.Info("event executor stopped")
  62. wg.Done()
  63. }
  64. func serveScannerServer(server *scsvr.Server, wg *sync.WaitGroup) {
  65. log.Info("start serving scanner server")
  66. err := server.Serve()
  67. if err != nil {
  68. log.Errorf("scanner server stopped with error: %s", err.Error())
  69. }
  70. log.Info("scanner server stopped")
  71. wg.Done()
  72. }
  73. func serveDistLock(svc *distlocksvc.Service, wg *sync.WaitGroup) {
  74. log.Info("start serving distlock")
  75. err := svc.Serve()
  76. if err != nil {
  77. log.Errorf("distlock stopped with error: %s", err.Error())
  78. }
  79. log.Info("distlock stopped")
  80. wg.Done()
  81. }
  82. func startTickEvent(tickExecutor *tickevent.Executor) {
  83. // TODO 可以考虑增加配置文件,配置这些任务间隔时间
  84. interval := 5 * 60 * 1000
  85. tickExecutor.Start(tickevent.NewBatchAllAgentCheckCache(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000})
  86. tickExecutor.Start(tickevent.NewBatchCheckAllObject(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000})
  87. tickExecutor.Start(tickevent.NewBatchCheckAllRepCount(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000})
  88. tickExecutor.Start(tickevent.NewBatchCheckAllStorage(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000})
  89. tickExecutor.Start(tickevent.NewCheckAgentState(), 5*60*1000, tickevent.StartOption{RandomStartDelayMs: 60 * 1000})
  90. tickExecutor.Start(tickevent.NewCheckCache(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000})
  91. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。