diff --git a/consts/task/task.go b/consts/event/event.go similarity index 90% rename from consts/task/task.go rename to consts/event/event.go index cc3d31c..d4ca515 100644 --- a/consts/task/task.go +++ b/consts/event/event.go @@ -1,4 +1,4 @@ -package task +package event const ( UPDATE_CACHE_OP_UNTEMP = "UnTemp" diff --git a/pkg/event/event.go b/pkg/event/event.go new file mode 100644 index 0000000..cc4d853 --- /dev/null +++ b/pkg/event/event.go @@ -0,0 +1,6 @@ +package event + +type Event[TArgs any] interface { + TryMerge(other Event[TArgs]) bool // 尝试将other任务与自身合并,如果成功返回true + Execute(ctx ExecuteContext[TArgs]) +} diff --git a/pkg/event/executor.go b/pkg/event/executor.go new file mode 100644 index 0000000..1245a1d --- /dev/null +++ b/pkg/event/executor.go @@ -0,0 +1,114 @@ +package event + +import ( + "sync" + + "github.com/zyedidia/generic/list" + mysync "gitlink.org.cn/cloudream/common/utils/sync" +) + +type ExecuteOption struct { + IsEmergency bool + DontMerge bool +} +type ExecuteContext[TArgs any] struct { + Executor *Executor[TArgs] + Option ExecuteOption + Args TArgs +} +type postedEvent[TArgs any] struct { + Event Event[TArgs] + Option ExecuteOption +} + +type Executor[TArgs any] struct { + events *list.List[postedEvent[TArgs]] + locker sync.Mutex + eventCond *mysync.CounterCond + execArgs TArgs +} + +func NewExecutor[TArgs any](args TArgs) Executor[TArgs] { + return Executor[TArgs]{ + events: list.New[postedEvent[TArgs]](), + locker: sync.Mutex{}, + eventCond: mysync.NewCounterCond(0), + execArgs: args, + } +} + +func (e *Executor[TArgs]) Post(event Event[TArgs], opts ...ExecuteOption) { + opt := ExecuteOption{ + IsEmergency: false, + DontMerge: false, + } + + if len(opts) > 0 { + opt = opts[0] + } + + e.locker.Lock() + defer e.locker.Unlock() + + // 紧急任务直接插入到队头,不进行合并 + if opt.IsEmergency { + e.events.PushFront(postedEvent[TArgs]{ + Event: event, + Option: opt, + }) + e.eventCond.Release() + return + } + + // 合并任务 + if opt.DontMerge { + ptr := e.events.Front + for ptr != nil { + // 只与非紧急任务,且允许合并的任务进行合并 + if !ptr.Value.Option.IsEmergency && !ptr.Value.Option.DontMerge { + if ptr.Value.Event.TryMerge(event) { + return + } + } + + ptr = ptr.Next + } + } + + e.events.PushBack(postedEvent[TArgs]{ + Event: event, + Option: opt, + }) + e.eventCond.Release() +} + +// Execute 开始执行任务 +func (e *Executor[TArgs]) Execute() error { + for { + e.eventCond.Wait() + + event := e.popFrontEvent() + if event == nil { + continue + } + + ctx := ExecuteContext[TArgs]{ + Executor: e, + Option: event.Option, + Args: e.execArgs, + } + + event.Event.Execute(ctx) + } +} + +func (e *Executor[TArgs]) popFrontEvent() *postedEvent[TArgs] { + e.locker.Lock() + defer e.locker.Unlock() + + if e.events.Front == nil { + return nil + } + + return &e.events.Front.Value +} diff --git a/utils/sync/counter_cond.go b/utils/sync/counter_cond.go new file mode 100644 index 0000000..1500b0e --- /dev/null +++ b/utils/sync/counter_cond.go @@ -0,0 +1,32 @@ +package sync + +import "sync" + +type CounterCond struct { + count int + cond *sync.Cond +} + +func NewCounterCond(initCount int) *CounterCond { + return &CounterCond{ + count: initCount, + cond: sync.NewCond(&sync.Mutex{}), + } +} + +func (c *CounterCond) Wait() { + c.cond.L.Lock() + + for c.count == 0 { + c.cond.Wait() + } + + c.count-- + + c.cond.L.Unlock() +} + +func (c *CounterCond) Release() { + c.count++ + c.cond.Signal() +}