diff --git a/utils/sync2/bucket_pool.go b/utils/sync2/bucket_pool.go index 2551a42..f5fa633 100644 --- a/utils/sync2/bucket_pool.go +++ b/utils/sync2/bucket_pool.go @@ -7,6 +7,7 @@ type BucketPool[T any] struct { filled []T emptyCond *sync.Cond filledCond *sync.Cond + closed bool } func NewBucketPool[T any]() *BucketPool[T] { @@ -20,6 +21,11 @@ func (p *BucketPool[T]) GetEmpty() (T, bool) { p.emptyCond.L.Lock() defer p.emptyCond.L.Unlock() + if p.closed { + var t T + return t, false + } + if len(p.empty) == 0 { p.emptyCond.Wait() } @@ -46,6 +52,11 @@ func (p *BucketPool[T]) GetFilled() (T, bool) { p.filledCond.L.Lock() defer p.filledCond.L.Unlock() + if p.closed { + var t T + return t, false + } + if len(p.filled) == 0 { p.filledCond.Wait() } @@ -68,7 +79,17 @@ func (p *BucketPool[T]) PutFilled(t T) { p.filledCond.Signal() } -func (p *BucketPool[T]) WakeUpAll() { +func (p *BucketPool[T]) Close() { + // 在两个锁中分别关闭,防止变量可见性问题 + + p.emptyCond.L.Lock() + p.closed = true + p.emptyCond.L.Unlock() + + p.filledCond.L.Lock() + p.closed = true + p.filledCond.L.Unlock() + p.emptyCond.Broadcast() p.filledCond.Broadcast() }