You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

executor.go 2.1 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package event
  2. import (
  3. "sync"
  4. "github.com/zyedidia/generic/list"
  5. mysync "gitlink.org.cn/cloudream/common/utils/sync"
  6. )
  7. type ExecuteOption struct {
  8. IsEmergency bool
  9. DontMerge bool
  10. }
  11. type ExecuteContext[TArgs any] struct {
  12. Executor *Executor[TArgs]
  13. Option ExecuteOption
  14. Args TArgs
  15. }
  16. type postedEvent[TArgs any] struct {
  17. Event Event[TArgs]
  18. Option ExecuteOption
  19. }
  20. type Executor[TArgs any] struct {
  21. events *list.List[postedEvent[TArgs]]
  22. locker sync.Mutex
  23. eventCond *mysync.CounterCond
  24. execArgs TArgs
  25. }
  26. func NewExecutor[TArgs any](args TArgs) Executor[TArgs] {
  27. return Executor[TArgs]{
  28. events: list.New[postedEvent[TArgs]](),
  29. locker: sync.Mutex{},
  30. eventCond: mysync.NewCounterCond(0),
  31. execArgs: args,
  32. }
  33. }
  34. func (e *Executor[TArgs]) Post(event Event[TArgs], opts ...ExecuteOption) {
  35. opt := ExecuteOption{
  36. IsEmergency: false,
  37. DontMerge: false,
  38. }
  39. if len(opts) > 0 {
  40. opt = opts[0]
  41. }
  42. e.locker.Lock()
  43. defer e.locker.Unlock()
  44. // 紧急任务直接插入到队头,不进行合并
  45. if opt.IsEmergency {
  46. e.events.PushFront(postedEvent[TArgs]{
  47. Event: event,
  48. Option: opt,
  49. })
  50. e.eventCond.Release()
  51. return
  52. }
  53. // 合并任务
  54. if opt.DontMerge {
  55. ptr := e.events.Front
  56. for ptr != nil {
  57. // 只与非紧急任务,且允许合并的任务进行合并
  58. if !ptr.Value.Option.IsEmergency && !ptr.Value.Option.DontMerge {
  59. if ptr.Value.Event.TryMerge(event) {
  60. return
  61. }
  62. }
  63. ptr = ptr.Next
  64. }
  65. }
  66. e.events.PushBack(postedEvent[TArgs]{
  67. Event: event,
  68. Option: opt,
  69. })
  70. e.eventCond.Release()
  71. }
  72. // Execute 开始执行任务
  73. func (e *Executor[TArgs]) Execute() error {
  74. for {
  75. e.eventCond.Wait()
  76. event := e.popFrontEvent()
  77. if event == nil {
  78. continue
  79. }
  80. ctx := ExecuteContext[TArgs]{
  81. Executor: e,
  82. Option: event.Option,
  83. Args: e.execArgs,
  84. }
  85. event.Event.Execute(ctx)
  86. }
  87. }
  88. func (e *Executor[TArgs]) popFrontEvent() *postedEvent[TArgs] {
  89. e.locker.Lock()
  90. defer e.locker.Unlock()
  91. if e.events.Front == nil {
  92. return nil
  93. }
  94. return &e.events.Front.Value
  95. }

公共库

Contributors (1)