|
- package sync2
-
- import "sync"
-
- type BucketPool[T any] struct {
- empty []T
- filled []T
- emptyCond *sync.Cond
- filledCond *sync.Cond
- closed bool
- }
-
- func NewBucketPool[T any]() *BucketPool[T] {
- return &BucketPool[T]{
- emptyCond: sync.NewCond(&sync.Mutex{}),
- filledCond: sync.NewCond(&sync.Mutex{}),
- }
- }
-
- func (p *BucketPool[T]) GetEmpty() (T, bool) {
- p.emptyCond.L.Lock()
- defer p.emptyCond.L.Unlock()
-
- if p.closed {
- var t T
- return t, false
- }
-
- if len(p.empty) == 0 {
- p.emptyCond.Wait()
- }
-
- if len(p.empty) == 0 {
- var t T
- return t, false
- }
-
- t := p.empty[0]
- p.empty = p.empty[1:]
- return t, true
- }
-
- func (p *BucketPool[T]) PutEmpty(t T) {
- p.emptyCond.L.Lock()
- defer p.emptyCond.L.Unlock()
-
- p.empty = append(p.empty, t)
- p.emptyCond.Signal()
- }
-
- func (p *BucketPool[T]) GetFilled() (T, bool) {
- p.filledCond.L.Lock()
- defer p.filledCond.L.Unlock()
-
- if len(p.filled) == 0 {
- if p.closed {
- var t T
- return t, false
- }
-
- p.filledCond.Wait()
- }
-
- if len(p.filled) == 0 {
- var t T
- return t, false
- }
-
- t := p.filled[0]
- p.filled = p.filled[1:]
- return t, true
- }
-
- func (p *BucketPool[T]) PutFilled(t T) {
- p.filledCond.L.Lock()
- defer p.filledCond.L.Unlock()
-
- p.filled = append(p.filled, t)
- p.filledCond.Signal()
- }
-
- func (p *BucketPool[T]) Close() {
- // 在两个锁中分别关闭,防止变量可见性问题
-
- p.emptyCond.L.Lock()
- p.closed = true
- p.emptyCond.L.Unlock()
-
- p.filledCond.L.Lock()
- p.closed = true
- p.filledCond.L.Unlock()
-
- p.emptyCond.Broadcast()
- p.filledCond.Broadcast()
- }
|