You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

executor.go 1.7 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package tickevent
  2. import (
  3. "math/rand"
  4. "sync/atomic"
  5. "time"
  6. )
// StartOption carries optional settings for Executor.Start.
type StartOption struct {
	// RandomStartDelayMs adds a delay of random length before the task is
	// started for the first time; this value is the maximum delay in
	// milliseconds. If it is 0, no delay is applied.
	RandomStartDelayMs int
}
// ExecuteContext is the value Executor.Start passes to the event's Execute
// method on every tick.
type ExecuteContext[TArgs any] struct {
	Executor *Executor[TArgs]    // executor whose Start call launched the ticker
	Self     *EventTicker[TArgs] // ticker that is running the event
	Args     TArgs               // the executor's execArgs, read at the time of the call
}
// EventTicker describes one periodic task created by Executor.Start and is the
// handle handed back to Executor.Stop.
//
// NOTE(review): Start returns and Stop accepts this struct by value, so each
// copy carries its own done flag; only doneChan (a channel) is shared between
// copies. Copying an atomic.Bool is also reported by `go vet` (copylocks).
type EventTicker[TArgs any] struct {
	event      TickEvent[TArgs] // event executed on every tick
	intervalMs int              // tick period in milliseconds
	doneChan   chan int         // closed by Stop to end the ticking goroutine
	done       atomic.Bool      // stop flag checked by the goroutine before each execution
}
// Executor starts and stops periodic events and holds the arguments that are
// handed to every event it runs.
type Executor[TArgs any] struct {
	execArgs TArgs // exposed to events as ExecuteContext.Args
}
  24. func NewExecutor[TArgs any](args TArgs) Executor[TArgs] {
  25. return Executor[TArgs]{
  26. execArgs: args,
  27. }
  28. }
  29. func (e *Executor[TArgs]) Start(event TickEvent[TArgs], intervalMs int, opts ...StartOption) EventTicker[TArgs] {
  30. opt := StartOption{}
  31. if len(opts) > 0 {
  32. opt = opts[0]
  33. }
  34. ticker := EventTicker[TArgs]{
  35. event: event,
  36. intervalMs: intervalMs,
  37. doneChan: make(chan int),
  38. done: atomic.Bool{},
  39. }
  40. ticker.done.Store(false)
  41. go func() {
  42. if opt.RandomStartDelayMs > 0 {
  43. <-time.After(time.Duration(rand.Intn(opt.RandomStartDelayMs)) * time.Millisecond)
  44. }
  45. timeTicker := time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
  46. loop:
  47. for {
  48. select {
  49. case <-timeTicker.C:
  50. if ticker.done.Load() {
  51. break loop
  52. }
  53. execCtx := ExecuteContext[TArgs]{
  54. Executor: e,
  55. Self: &ticker,
  56. Args: e.execArgs,
  57. }
  58. event.Execute(execCtx)
  59. case <-ticker.doneChan:
  60. break loop
  61. }
  62. }
  63. timeTicker.Stop()
  64. }()
  65. return ticker
  66. }
  67. func (e *Executor[TArgs]) Stop(ticker EventTicker[TArgs]) {
  68. ticker.done.Store(true)
  69. close(ticker.doneChan)
  70. // 保证在调用此函数结束后,事件不会再被调用
  71. }

公共库

Contributors (1)