You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue.go 4.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package mqs
  13. import (
  14. "fmt"
  15. "github.com/pkg/errors"
  16. "github.com/zeromicro/go-zero/core/queue"
  17. "github.com/zeromicro/go-zero/core/service"
  18. "github.com/zeromicro/go-zero/core/threading"
  19. "k8s.io/apimachinery/pkg/util/json"
  20. "sync"
  21. "time"
  22. )
  23. var InsQueue *workQueue
  24. type (
  25. ConsumeHandle func(v string) error
  26. ConsumeHandler interface {
  27. Consume(value string) error
  28. }
  29. workQueues struct {
  30. queues []queue.MessageQueue
  31. group *service.ServiceGroup
  32. }
  33. workQueue struct {
  34. topic string
  35. Beta *Beta
  36. handler ConsumeHandler
  37. consumerRoutines *threading.RoutineGroup
  38. }
  39. Beta struct {
  40. queue []t
  41. // dirty defines all of the items that need to be processed.
  42. dirty set
  43. // Things that are currently being processed are in the processing set.
  44. // These things may be simultaneously in the dirty set. When we finish
  45. // processing something and remove it from this set, we'll check if
  46. // it's in the dirty set, and if so, add it to the queue.
  47. processing set
  48. cond *sync.Cond
  49. }
  50. empty struct{}
  51. t interface{}
  52. set map[t]empty
  53. )
  54. func (s set) has(item t) bool {
  55. _, exists := s[item]
  56. return exists
  57. }
  58. func (s set) insert(item t) {
  59. s[item] = empty{}
  60. }
  61. func (s set) delete(item t) {
  62. delete(s, item)
  63. }
  64. func (s set) len() int {
  65. return len(s)
  66. }
  67. func (b *Beta) Get() (item interface{}) {
  68. b.cond.L.Lock()
  69. defer b.cond.L.Unlock()
  70. for len(b.queue) == 0 {
  71. b.cond.Wait()
  72. }
  73. if len(b.queue) == 0 {
  74. // We must be shutting down.
  75. return nil
  76. }
  77. item = b.queue[0]
  78. // The underlying array still exists and reference this object, so the object will not be garbage collected.
  79. b.queue[0] = nil
  80. b.queue = b.queue[1:]
  81. b.processing.insert(item)
  82. b.dirty.delete(item)
  83. return item
  84. }
  85. func (b *Beta) Add(item interface{}) {
  86. b.cond.L.Lock()
  87. defer b.cond.L.Unlock()
  88. if b.dirty.has(item) {
  89. return
  90. }
  91. b.dirty.insert(item)
  92. if b.processing.has(item) {
  93. return
  94. }
  95. b.queue = append(b.queue, item)
  96. b.cond.Signal()
  97. }
  98. func (w *workQueue) Start() {
  99. w.startConsumers()
  100. w.consumerRoutines.Wait()
  101. }
  102. func (w *workQueue) Stop() {
  103. }
  104. func (w workQueues) Start() {
  105. for _, each := range w.queues {
  106. w.group.Add(each)
  107. }
  108. w.group.Start()
  109. }
  110. func (w workQueues) Stop() {
  111. w.group.Stop()
  112. }
  113. func (w *workQueue) startConsumers() {
  114. w.consumerRoutines.Run(func() {
  115. for {
  116. item := w.Beta.Get()
  117. println("开始消费 ")
  118. if item != nil {
  119. bytes, err := json.Marshal(item)
  120. if err != nil {
  121. return
  122. }
  123. w.consumeOne(string(bytes))
  124. println("开始消费3")
  125. }
  126. time.Sleep(1 * time.Second)
  127. }
  128. })
  129. }
  130. func (w *workQueue) consumeOne(value string) error {
  131. err := w.handler.Consume(value)
  132. return err
  133. }
  134. func newWorkQueue(topic string, handler ConsumeHandler) queue.MessageQueue {
  135. wq := &workQueue{
  136. topic: topic,
  137. Beta: &Beta{
  138. dirty: set{},
  139. processing: set{},
  140. cond: sync.NewCond(&sync.Mutex{}),
  141. },
  142. consumerRoutines: threading.NewRoutineGroup(),
  143. handler: handler}
  144. InsQueue = wq
  145. return wq
  146. }
  147. func MustNewQueue(topic string, handler ConsumeHandler) queue.MessageQueue {
  148. q, err := NewQueue(topic, handler)
  149. if err != nil {
  150. fmt.Println("NewQueue报错")
  151. }
  152. return q
  153. }
  154. func NewQueue(topic string, handler ConsumeHandler) (queue.MessageQueue, error) {
  155. if len(topic) == 0 {
  156. return nil, errors.New("topic不能为空")
  157. }
  158. r := workQueues{
  159. group: service.NewServiceGroup(),
  160. }
  161. r.queues = append(r.queues, newWorkQueue(topic, handler))
  162. return r, nil
  163. }

PCM is positioned as Software stack over Cloud, aiming to build the standards and ecology of heterogeneous cloud collaboration for JCC in a non intrusive and autonomous peer-to-peer manner.