| @@ -22,7 +22,6 @@ import ( | |||||
| "github.com/zeromicro/go-zero/core/threading" | "github.com/zeromicro/go-zero/core/threading" | ||||
| "k8s.io/apimachinery/pkg/util/json" | "k8s.io/apimachinery/pkg/util/json" | ||||
| "sync" | "sync" | ||||
| "time" | |||||
| ) | ) | ||||
| var InsQueue *workQueue | var InsQueue *workQueue | ||||
| @@ -117,6 +116,20 @@ func (b *Beta) Add(item interface{}) { | |||||
| b.queue = append(b.queue, item) | b.queue = append(b.queue, item) | ||||
| b.cond.Signal() | b.cond.Signal() | ||||
| } | } | ||||
| func (b *Beta) Done(item interface{}) { | |||||
| b.cond.L.Lock() | |||||
| defer b.cond.L.Unlock() | |||||
| b.processing.delete(item) | |||||
| if b.dirty.has(item) { | |||||
| b.queue = append(b.queue, item) | |||||
| b.cond.Signal() | |||||
| } else if b.processing.len() == 0 { | |||||
| b.cond.Signal() | |||||
| } | |||||
| } | |||||
| func (w *workQueue) Start() { | func (w *workQueue) Start() { | ||||
| w.startConsumers() | w.startConsumers() | ||||
| w.consumerRoutines.Wait() | w.consumerRoutines.Wait() | ||||
| @@ -140,16 +153,14 @@ func (w *workQueue) startConsumers() { | |||||
| w.consumerRoutines.Run(func() { | w.consumerRoutines.Run(func() { | ||||
| for { | for { | ||||
| item := w.Beta.Get() | item := w.Beta.Get() | ||||
| println("开始消费 ") | |||||
| if item != nil { | if item != nil { | ||||
| bytes, err := json.Marshal(item) | bytes, err := json.Marshal(item) | ||||
| if err != nil { | if err != nil { | ||||
| return | return | ||||
| } | } | ||||
| w.consumeOne(string(bytes)) | w.consumeOne(string(bytes)) | ||||
| println("开始消费3") | |||||
| } | } | ||||
| time.Sleep(1 * time.Second) | |||||
| w.Beta.Done(item) | |||||
| } | } | ||||
| }) | }) | ||||