You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

space_syncer.go 4.3 kB


  1. package spacesyncer
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "gitlink.org.cn/cloudream/common/pkgs/async"
  7. "gitlink.org.cn/cloudream/common/pkgs/logger"
  8. "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster"
  9. "gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  10. "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache"
  11. stgpool "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool"
  12. jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types"
  13. )
  14. const (
  15. logMod = "SpaceSyncer"
  16. )
  17. type SpaceSyncerEvent interface {
  18. IsSpaceSyncerEvent() bool
  19. }
  20. type SpaceSyncer struct {
  21. db *db.DB
  22. stgPool *stgpool.Pool
  23. spaceMeta *metacache.UserSpaceMeta
  24. cluster *cluster.Cluster
  25. lock sync.Mutex
  26. tasks map[jcstypes.SpaceSyncTaskID]*task
  27. }
  28. func New(db *db.DB, stgPool *stgpool.Pool, spaceMeta *metacache.UserSpaceMeta, cluster *cluster.Cluster) *SpaceSyncer {
  29. return &SpaceSyncer{
  30. db: db,
  31. stgPool: stgPool,
  32. spaceMeta: spaceMeta,
  33. cluster: cluster,
  34. tasks: make(map[jcstypes.SpaceSyncTaskID]*task),
  35. }
  36. }
  37. func (s *SpaceSyncer) Start() *async.UnboundChannel[SpaceSyncerEvent] {
  38. s.lock.Lock()
  39. defer s.lock.Unlock()
  40. log := logger.WithField("Mod", logMod)
  41. ch := async.NewUnboundChannel[SpaceSyncerEvent]()
  42. allTask, err := db.DoTx01(s.db, s.db.SpaceSyncTask().GetAll)
  43. if err != nil {
  44. log.Warnf("load task from db: %v", err)
  45. } else {
  46. var rms []jcstypes.SpaceSyncTaskID
  47. for _, t := range allTask {
  48. ctx, cancel := context.WithCancel(context.Background())
  49. tsk := task{
  50. Task: t,
  51. Context: ctx,
  52. CancelFn: cancel,
  53. }
  54. switch tr := t.Trigger.(type) {
  55. case *jcstypes.SpaceSyncTriggerOnce:
  56. // Once类型的任务没有执行完也不执行了
  57. rms = append(rms, t.TaskID)
  58. case *jcstypes.SpaceSyncTriggerInterval:
  59. triggerInterval(s, &tsk, tr)
  60. case *jcstypes.SpaceSyncTriggerAt:
  61. triggerAt(s, &tsk, tr)
  62. }
  63. log.Infof("load task %v from db", t.TaskID)
  64. }
  65. if len(rms) > 0 {
  66. err := s.db.SpaceSyncTask().BatchDelete(s.db.DefCtx(), rms)
  67. if err != nil {
  68. log.Warnf("batch delete task: %v", err)
  69. } else {
  70. log.Infof("%v once task deleted", len(rms))
  71. }
  72. }
  73. }
  74. return ch
  75. }
  76. func (s *SpaceSyncer) Stop() {
  77. s.lock.Lock()
  78. defer s.lock.Unlock()
  79. if !s.cluster.IsMaster() {
  80. return
  81. }
  82. for _, t := range s.tasks {
  83. t.CancelFn()
  84. }
  85. s.tasks = make(map[jcstypes.SpaceSyncTaskID]*task)
  86. }
  87. func (s *SpaceSyncer) CreateTask(t jcstypes.SpaceSyncTask) (*TaskInfo, error) {
  88. log := logger.WithField("Mod", logMod)
  89. if !s.cluster.IsMaster() {
  90. return nil, fmt.Errorf("not master, create task aborted")
  91. }
  92. d := s.db
  93. err := d.DoTx(func(tx db.SQLContext) error {
  94. err := d.SpaceSyncTask().Create(tx, &t)
  95. if err != nil {
  96. return err
  97. }
  98. return nil
  99. })
  100. if err != nil {
  101. return nil, fmt.Errorf("creating space sync task: %w", err)
  102. }
  103. ctx, cancel := context.WithCancel(context.Background())
  104. tsk := task{
  105. Task: t,
  106. Context: ctx,
  107. CancelFn: cancel,
  108. }
  109. s.lock.Lock()
  110. s.tasks[t.TaskID] = &tsk
  111. s.lock.Unlock()
  112. switch tr := t.Trigger.(type) {
  113. case *jcstypes.SpaceSyncTriggerOnce:
  114. triggerOnce(s, &tsk)
  115. case *jcstypes.SpaceSyncTriggerInterval:
  116. triggerInterval(s, &tsk, tr)
  117. case *jcstypes.SpaceSyncTriggerAt:
  118. triggerAt(s, &tsk, tr)
  119. }
  120. log.Infof("task %v created", t.TaskID)
  121. return &TaskInfo{
  122. Task: t,
  123. }, nil
  124. }
  125. func (s *SpaceSyncer) CancelTask(taskID jcstypes.SpaceSyncTaskID) {
  126. log := logger.WithField("Mod", logMod)
  127. if !s.cluster.IsMaster() {
  128. return
  129. }
  130. s.lock.Lock()
  131. defer s.lock.Unlock()
  132. t := s.tasks[taskID]
  133. if t == nil {
  134. log.Infof("task %v not found, cancel aborted", taskID)
  135. return
  136. }
  137. t.CancelFn()
  138. delete(s.tasks, taskID)
  139. err := s.db.SpaceSyncTask().Delete(s.db.DefCtx(), taskID)
  140. if err != nil {
  141. log.Warnf("delete task %v from db: %v", taskID, err)
  142. }
  143. log.Infof("task %v canceled", taskID)
  144. }
  145. func (s *SpaceSyncer) GetTask(taskID jcstypes.SpaceSyncTaskID) *jcstypes.SpaceSyncTask {
  146. s.lock.Lock()
  147. defer s.lock.Unlock()
  148. if !s.cluster.IsMaster() {
  149. return nil
  150. }
  151. tsk := s.tasks[taskID]
  152. if tsk == nil {
  153. return nil
  154. }
  155. // TODO 考虑复制一份状态,防止修改
  156. t := tsk.Task
  157. return &t
  158. }
  159. type TaskInfo struct {
  160. Task jcstypes.SpaceSyncTask
  161. }
  162. type task struct {
  163. Task jcstypes.SpaceSyncTask
  164. Context context.Context
  165. CancelFn func()
  166. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。