You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

executor.go 1.7 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package task
  2. import (
  3. "context"
  4. "sync"
  5. "github.com/zyedidia/generic/list"
  6. mydb "gitlink.org.cn/cloudream/db"
  7. "golang.org/x/sync/semaphore"
  8. )
  9. type ExecuteOption struct {
  10. IsEmergency bool
  11. DontMerge bool
  12. }
  13. type ExecuteContext struct {
  14. Executor *Executor
  15. DB *mydb.DB
  16. }
  17. type postedTask struct {
  18. Task Task
  19. Option ExecuteOption
  20. }
  21. type Executor struct {
  22. tasks list.List[postedTask]
  23. locker sync.Mutex
  24. taskSema semaphore.Weighted
  25. execCtx *ExecuteContext
  26. }
  27. func (e *Executor) Post(task Task, opts ...ExecuteOption) {
  28. opt := ExecuteOption{
  29. IsEmergency: false,
  30. DontMerge: false,
  31. }
  32. if len(opts) > 0 {
  33. opt = opts[0]
  34. }
  35. e.locker.Lock()
  36. defer e.locker.Unlock()
  37. // 紧急任务直接插入到队头,不进行合并
  38. if opt.IsEmergency {
  39. e.tasks.PushFront(postedTask{
  40. Task: task,
  41. Option: opt,
  42. })
  43. e.taskSema.Release(1)
  44. return
  45. }
  46. // 合并任务
  47. if opt.DontMerge {
  48. ptr := e.tasks.Front
  49. for ptr != nil {
  50. // 只与非紧急任务,且允许合并的任务进行合并
  51. if !ptr.Value.Option.IsEmergency && !ptr.Value.Option.DontMerge {
  52. if ptr.Value.Task.TryMerge(task) {
  53. return
  54. }
  55. }
  56. ptr = ptr.Next
  57. }
  58. }
  59. e.tasks.PushBack(postedTask{
  60. Task: task,
  61. Option: opt,
  62. })
  63. e.taskSema.Release(1)
  64. }
  65. // Execute 开始执行任务
  66. func (e *Executor) Execute() error {
  67. for {
  68. // TODO 打印错误日志
  69. e.taskSema.Acquire(context.Background(), 1)
  70. task := e.popFrontTask()
  71. if task == nil {
  72. continue
  73. }
  74. task.Task.Execute(e.execCtx, task.Option)
  75. }
  76. }
  77. func (e *Executor) popFrontTask() *postedTask {
  78. e.locker.Lock()
  79. defer e.locker.Unlock()
  80. if e.tasks.Front == nil {
  81. return nil
  82. }
  83. return &e.tasks.Front.Value
  84. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。