You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue.go 4.2 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package mqs
  13. import (
  14. "fmt"
  15. "github.com/pkg/errors"
  16. "github.com/zeromicro/go-zero/core/queue"
  17. "github.com/zeromicro/go-zero/core/service"
  18. "github.com/zeromicro/go-zero/core/threading"
  19. "k8s.io/apimachinery/pkg/util/json"
  20. "sync"
  21. )
  22. var InsQueue *workQueue
  23. type (
  24. ConsumeHandle func(v string) error
  25. ConsumeHandler interface {
  26. Consume(value string) error
  27. }
  28. workQueues struct {
  29. queues []queue.MessageQueue
  30. group *service.ServiceGroup
  31. }
  32. workQueue struct {
  33. topic string
  34. Beta *Beta
  35. handler ConsumeHandler
  36. consumerRoutines *threading.RoutineGroup
  37. }
  38. Beta struct {
  39. queue []t
  40. // dirty defines all of the items that need to be processed.
  41. dirty set
  42. // Things that are currently being processed are in the processing set.
  43. // These things may be simultaneously in the dirty set. When we finish
  44. // processing something and remove it from this set, we'll check if
  45. // it's in the dirty set, and if so, add it to the queue.
  46. processing set
  47. cond *sync.Cond
  48. }
  49. empty struct{}
  50. t interface{}
  51. set map[t]empty
  52. )
  53. func (s set) has(item t) bool {
  54. _, exists := s[item]
  55. return exists
  56. }
  57. func (s set) insert(item t) {
  58. s[item] = empty{}
  59. }
  60. func (s set) delete(item t) {
  61. delete(s, item)
  62. }
  63. func (s set) len() int {
  64. return len(s)
  65. }
  66. func (b *Beta) Get() (item interface{}) {
  67. b.cond.L.Lock()
  68. defer b.cond.L.Unlock()
  69. for len(b.queue) == 0 {
  70. b.cond.Wait()
  71. }
  72. if len(b.queue) == 0 {
  73. // We must be shutting down.
  74. return nil
  75. }
  76. item = b.queue[0]
  77. // The underlying array still exists and reference this object, so the object will not be garbage collected.
  78. b.queue[0] = nil
  79. b.queue = b.queue[1:]
  80. b.processing.insert(item)
  81. b.dirty.delete(item)
  82. return item
  83. }
  84. func (b *Beta) Add(item interface{}) {
  85. b.cond.L.Lock()
  86. defer b.cond.L.Unlock()
  87. if b.dirty.has(item) {
  88. return
  89. }
  90. b.dirty.insert(item)
  91. if b.processing.has(item) {
  92. return
  93. }
  94. b.queue = append(b.queue, item)
  95. b.cond.Signal()
  96. }
  97. func (b *Beta) Done(item interface{}) {
  98. b.cond.L.Lock()
  99. defer b.cond.L.Unlock()
  100. b.processing.delete(item)
  101. if b.dirty.has(item) {
  102. b.queue = append(b.queue, item)
  103. b.cond.Signal()
  104. } else if b.processing.len() == 0 {
  105. b.cond.Signal()
  106. }
  107. }
  108. func (w *workQueue) Start() {
  109. w.startConsumers()
  110. w.consumerRoutines.Wait()
  111. }
  112. func (w *workQueue) Stop() {
  113. }
  114. func (w workQueues) Start() {
  115. for _, each := range w.queues {
  116. w.group.Add(each)
  117. }
  118. w.group.Start()
  119. }
  120. func (w workQueues) Stop() {
  121. w.group.Stop()
  122. }
  123. func (w *workQueue) startConsumers() {
  124. w.consumerRoutines.Run(func() {
  125. for {
  126. item := w.Beta.Get()
  127. if item != nil {
  128. bytes, err := json.Marshal(item)
  129. if err != nil {
  130. return
  131. }
  132. w.consumeOne(string(bytes))
  133. }
  134. w.Beta.Done(item)
  135. }
  136. })
  137. }
  138. func (w *workQueue) consumeOne(value string) error {
  139. err := w.handler.Consume(value)
  140. return err
  141. }
  142. func newWorkQueue(topic string, handler ConsumeHandler) queue.MessageQueue {
  143. wq := &workQueue{
  144. topic: topic,
  145. Beta: &Beta{
  146. dirty: set{},
  147. processing: set{},
  148. cond: sync.NewCond(&sync.Mutex{}),
  149. },
  150. consumerRoutines: threading.NewRoutineGroup(),
  151. handler: handler}
  152. InsQueue = wq
  153. return wq
  154. }
  155. func MustNewQueue(topic string, handler ConsumeHandler) queue.MessageQueue {
  156. q, err := NewQueue(topic, handler)
  157. if err != nil {
  158. fmt.Println("NewQueue报错")
  159. }
  160. return q
  161. }
  162. func NewQueue(topic string, handler ConsumeHandler) (queue.MessageQueue, error) {
  163. if len(topic) == 0 {
  164. return nil, errors.New("topic不能为空")
  165. }
  166. r := workQueues{
  167. group: service.NewServiceGroup(),
  168. }
  169. r.queues = append(r.queues, newWorkQueue(topic, handler))
  170. return r, nil
  171. }

PCM is positioned as Software stack over Cloud, aiming to build the standards and ecology of heterogeneous cloud collaboration for JCC in a non intrusive and autonomous peer-to-peer manner.