| @@ -7,6 +7,7 @@ type BucketPool[T any] struct { | |||||
| filled []T | filled []T | ||||
| emptyCond *sync.Cond | emptyCond *sync.Cond | ||||
| filledCond *sync.Cond | filledCond *sync.Cond | ||||
| closed bool | |||||
| } | } | ||||
| func NewBucketPool[T any]() *BucketPool[T] { | func NewBucketPool[T any]() *BucketPool[T] { | ||||
| @@ -20,6 +21,11 @@ func (p *BucketPool[T]) GetEmpty() (T, bool) { | |||||
| p.emptyCond.L.Lock() | p.emptyCond.L.Lock() | ||||
| defer p.emptyCond.L.Unlock() | defer p.emptyCond.L.Unlock() | ||||
| if p.closed { | |||||
| var t T | |||||
| return t, false | |||||
| } | |||||
| if len(p.empty) == 0 { | if len(p.empty) == 0 { | ||||
| p.emptyCond.Wait() | p.emptyCond.Wait() | ||||
| } | } | ||||
| @@ -46,6 +52,11 @@ func (p *BucketPool[T]) GetFilled() (T, bool) { | |||||
| p.filledCond.L.Lock() | p.filledCond.L.Lock() | ||||
| defer p.filledCond.L.Unlock() | defer p.filledCond.L.Unlock() | ||||
| if p.closed { | |||||
| var t T | |||||
| return t, false | |||||
| } | |||||
| if len(p.filled) == 0 { | if len(p.filled) == 0 { | ||||
| p.filledCond.Wait() | p.filledCond.Wait() | ||||
| } | } | ||||
| @@ -68,7 +79,17 @@ func (p *BucketPool[T]) PutFilled(t T) { | |||||
| p.filledCond.Signal() | p.filledCond.Signal() | ||||
| } | } | ||||
| func (p *BucketPool[T]) WakeUpAll() { | |||||
| func (p *BucketPool[T]) Close() { | |||||
| // 在两个锁中分别关闭,防止变量可见性问题 | |||||
| p.emptyCond.L.Lock() | |||||
| p.closed = true | |||||
| p.emptyCond.L.Unlock() | |||||
| p.filledCond.L.Lock() | |||||
| p.closed = true | |||||
| p.filledCond.L.Unlock() | |||||
| p.emptyCond.Broadcast() | p.emptyCond.Broadcast() | ||||
| p.filledCond.Broadcast() | p.filledCond.Broadcast() | ||||
| } | } | ||||