|
- package spacesyncer
-
- import (
- "time"
-
- "gitlink.org.cn/cloudream/common/pkgs/logger"
- "gitlink.org.cn/cloudream/common/utils/sort2"
- jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types"
- )
-
- func triggerOnce(syncer *SpaceSyncer, task *task) {
- go func() {
- log := logger.WithField("Mod", logMod)
- execute(syncer, task)
-
- syncer.lock.Lock()
- defer syncer.lock.Unlock()
-
- tsk := syncer.tasks[task.Task.TaskID]
- if tsk == nil {
- return
- }
-
- tsk.CancelFn()
- delete(syncer.tasks, task.Task.TaskID)
-
- err := syncer.db.SpaceSyncTask().Delete(syncer.db.DefCtx(), task.Task.TaskID)
- if err != nil {
- log.Warnf("delete task %v from db: %v", task.Task.TaskID, err)
- }
- }()
- }
-
- func triggerInterval(syncer *SpaceSyncer, task *task, trigger *jcstypes.SpaceSyncTriggerInterval) {
- go func() {
- log := logger.WithField("Mod", logMod)
-
- ticker := time.NewTicker(time.Duration(trigger.Interval) * time.Second)
- defer ticker.Stop()
-
- loop:
- for {
- select {
- case <-ticker.C:
- execute(syncer, task)
- case <-task.Context.Done():
- break loop
- }
- }
-
- syncer.lock.Lock()
- defer syncer.lock.Unlock()
-
- tsk := syncer.tasks[task.Task.TaskID]
- if tsk == nil {
- return
- }
-
- tsk.CancelFn()
- delete(syncer.tasks, task.Task.TaskID)
-
- err := syncer.db.SpaceSyncTask().Delete(syncer.db.DefCtx(), task.Task.TaskID)
- if err != nil {
- log.Warnf("delete task %v from db: %v", task.Task.TaskID, err)
- }
- }()
- }
-
- func triggerAt(syncer *SpaceSyncer, task *task, trigger *jcstypes.SpaceSyncTriggerAt) {
- go func() {
- log := logger.WithField("Mod", logMod)
-
- atTimes := sort2.Sort(trigger.At, func(l, r time.Time) int {
- return l.Compare(r)
- })
-
- loop:
- for _, at := range atTimes {
- nowTime := time.Now()
- if nowTime.After(at) {
- continue
- }
-
- select {
- case <-time.After(at.Sub(nowTime)):
- execute(syncer, task)
-
- case <-task.Context.Done():
- break loop
- }
- }
-
- syncer.lock.Lock()
- defer syncer.lock.Unlock()
-
- tsk := syncer.tasks[task.Task.TaskID]
- if tsk == nil {
- return
- }
-
- tsk.CancelFn()
- delete(syncer.tasks, task.Task.TaskID)
-
- err := syncer.db.SpaceSyncTask().Delete(syncer.db.DefCtx(), task.Task.TaskID)
- if err != nil {
- log.Warnf("delete task %v from db: %v", task.Task.TaskID, err)
- }
- }()
- }
|