|
- package mqs
-
- import (
- "context"
- "fmt"
- "github.com/pkg/errors"
- "github.com/redis/go-redis/v9"
- "github.com/zeromicro/go-zero/core/queue"
- "github.com/zeromicro/go-zero/core/service"
- "github.com/zeromicro/go-zero/core/threading"
- )
-
- type (
- ConsumeHandle func(v string) error
- ConsumeHandler interface {
- Consume(value string) error
- }
- redisQueues struct {
- queues []queue.MessageQueue
- group *service.ServiceGroup
- }
- redisQueue struct {
- topic string
- channel chan redis.Message
- client *redis.Client
- handler ConsumeHandler
- consumerRoutines *threading.RoutineGroup
- producerRoutines *threading.RoutineGroup
- }
- )
-
- func (r *redisQueue) Start() {
- r.startConsumers()
- r.startProducers()
-
- r.producerRoutines.Wait()
- close(r.channel)
- r.consumerRoutines.Wait()
- }
-
- func (r *redisQueue) Stop() {
- }
-
- func (r redisQueues) Start() {
- for _, each := range r.queues {
- r.group.Add(each)
- }
- r.group.Start()
- }
-
- func (r redisQueues) Stop() {
- r.group.Stop()
- }
-
- func (r *redisQueue) startConsumers() {
- r.consumerRoutines.Run(func() {
- for message := range r.channel {
- if err := r.consumeOne(message.Payload); err != nil {
- fmt.Errorf("consume: %s, error: %v", message.Payload, err)
- }
- }
- })
-
- }
-
- func (r *redisQueue) consumeOne(value string) error {
- err := r.handler.Consume(value)
- return err
- }
-
- func (r *redisQueue) startProducers() {
- r.producerRoutines.Run(func() {
- for {
- channel := r.client.Subscribe(context.Background(), r.topic).Channel()
- for msg := range channel {
- fmt.Println("生产者获取的值:", msg.Payload)
- r.channel <- *msg
- }
-
- }
- })
-
- }
-
- func newRedisQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) queue.MessageQueue {
- return &redisQueue{
- topic: topic,
- client: redisClient,
- channel: make(chan redis.Message),
- producerRoutines: threading.NewRoutineGroup(),
- consumerRoutines: threading.NewRoutineGroup(),
- handler: handler}
- }
-
- func MustNewQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) queue.MessageQueue {
- q, err := NewQueue(topic, redisClient, handler)
- if err != nil {
- fmt.Println("NewQueue报错")
- }
-
- return q
- }
-
- func NewQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) (queue.MessageQueue, error) {
- if len(topic) == 0 {
- return nil, errors.New("topic不能为空")
- }
-
- r := redisQueues{
- group: service.NewServiceGroup(),
- }
- r.queues = append(r.queues, newRedisQueue(topic, redisClient, handler))
-
- return r, nil
- }
|