package mqs import ( "context" "fmt" "github.com/pkg/errors" "github.com/redis/go-redis/v9" "github.com/zeromicro/go-zero/core/queue" "github.com/zeromicro/go-zero/core/service" "github.com/zeromicro/go-zero/core/threading" ) type ( ConsumeHandle func(v string) error ConsumeHandler interface { Consume(value string) error } redisQueues struct { queues []queue.MessageQueue group *service.ServiceGroup } redisQueue struct { topic string channel chan redis.Message client *redis.Client handler ConsumeHandler consumerRoutines *threading.RoutineGroup producerRoutines *threading.RoutineGroup } ) func (r *redisQueue) Start() { r.startConsumers() r.startProducers() r.producerRoutines.Wait() close(r.channel) r.consumerRoutines.Wait() } func (r *redisQueue) Stop() { } func (r redisQueues) Start() { for _, each := range r.queues { r.group.Add(each) } r.group.Start() } func (r redisQueues) Stop() { r.group.Stop() } func (r *redisQueue) startConsumers() { r.consumerRoutines.Run(func() { for message := range r.channel { if err := r.consumeOne(message.Payload); err != nil { fmt.Errorf("consume: %s, error: %v", message.Payload, err) } } }) } func (r *redisQueue) consumeOne(value string) error { err := r.handler.Consume(value) return err } func (r *redisQueue) startProducers() { r.producerRoutines.Run(func() { for { channel := r.client.Subscribe(context.Background(), r.topic).Channel() for msg := range channel { fmt.Println("生产者获取的值:", msg.Payload) r.channel <- *msg } } }) } func newRedisQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) queue.MessageQueue { return &redisQueue{ topic: topic, client: redisClient, channel: make(chan redis.Message), producerRoutines: threading.NewRoutineGroup(), consumerRoutines: threading.NewRoutineGroup(), handler: handler} } func MustNewQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) queue.MessageQueue { q, err := NewQueue(topic, redisClient, handler) if err != nil { fmt.Println("NewQueue报错") } return q } func NewQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) (queue.MessageQueue, error) { if len(topic) == 0 { return nil, errors.New("topic不能为空") } r := redisQueues{ group: service.NewServiceGroup(), } r.queues = append(r.queues, newRedisQueue(topic, redisClient, handler)) return r, nil }