Browse Source

优化mq重试机制

gitlink
Sydonian 1 year ago
parent
commit
898a0b3d5f
2 changed files with 14 additions and 10 deletions
  1. +1
    -1
      pkgs/mq/mq_test.go
  2. +13
    -9
      pkgs/mq/server.go

+ 1
- 1
pkgs/mq/mq_test.go View File

@@ -24,7 +24,7 @@ func Test_ServerClient(t *testing.T) {
<-time.After(time.Second * 10) <-time.After(time.Second * 10)
reply := MakeAppDataMessage(&Msg{Data: 1}) reply := MakeAppDataMessage(&Msg{Data: 1})
return &reply, nil return &reply, nil
})
}, RabbitMQParam{})
So(err, ShouldBeNil) So(err, ShouldBeNil)


//go svr.Start() //go svr.Start()


+ 13
- 9
pkgs/mq/server.go View File

@@ -2,10 +2,11 @@ package mq


import ( import (
"fmt" "fmt"
"gitlink.org.cn/cloudream/common/utils/sync2"
"net" "net"
"time" "time"


"gitlink.org.cn/cloudream/common/utils/sync2"

"github.com/streadway/amqp" "github.com/streadway/amqp"
) )


@@ -65,6 +66,12 @@ func NewReplyError(err error) ReplyError {
} }
} }


type ServerExit struct {
Error error
}

type RabbitMQServerEvent interface{}

// 处理消息。会将第一个返回值作为响应回复给客户端,如果为nil,则不回复。 // 处理消息。会将第一个返回值作为响应回复给客户端,如果为nil,则不回复。
type MessageHandlerFn func(msg *Message) (*Message, error) type MessageHandlerFn func(msg *Message) (*Message, error)


@@ -115,10 +122,8 @@ func NewRabbitMQServer(url string, queueName string, onMessage MessageHandlerFn,
return srv, nil return srv, nil
} }


type RabbitMQLogEvent interface{}

func (s *RabbitMQServer) Start() *sync2.UnboundChannel[RabbitMQLogEvent] {
ch := sync2.NewUnboundChannel[RabbitMQLogEvent]()
func (s *RabbitMQServer) Start() *sync2.UnboundChannel[RabbitMQServerEvent] {
ch := sync2.NewUnboundChannel[RabbitMQServerEvent]()


channel := s.openChannel(ch) channel := s.openChannel(ch)
if channel == nil { if channel == nil {
@@ -133,8 +138,7 @@ func (s *RabbitMQServer) Start() *sync2.UnboundChannel[RabbitMQLogEvent] {
case rawReq, ok := <-channel: case rawReq, ok := <-channel:
if !ok { if !ok {
if retryNum > s.config.RetryNum { if retryNum > s.config.RetryNum {
ch.Send(fmt.Errorf("maximum number of retries exceeded"))
ch.Send(1)
ch.Send(ServerExit{Error: fmt.Errorf("maximum number of retries exceeded")})
return ch return ch
} }
retryNum++ retryNum++
@@ -158,7 +162,7 @@ func (s *RabbitMQServer) Start() *sync2.UnboundChannel[RabbitMQLogEvent] {
} }
} }


func (s *RabbitMQServer) openChannel(ch *sync2.UnboundChannel[RabbitMQLogEvent]) <-chan amqp.Delivery {
func (s *RabbitMQServer) openChannel(ch *sync2.UnboundChannel[RabbitMQServerEvent]) <-chan amqp.Delivery {
_, err := s.channel.QueueDeclare( _, err := s.channel.QueueDeclare(
s.queueName, s.queueName,
false, false,
@@ -190,7 +194,7 @@ func (s *RabbitMQServer) openChannel(ch *sync2.UnboundChannel[RabbitMQLogEvent])
return channel return channel
} }


func (s *RabbitMQServer) handleMessage(ch *sync2.UnboundChannel[RabbitMQLogEvent], reqMsg *Message, rawReq amqp.Delivery) {
func (s *RabbitMQServer) handleMessage(ch *sync2.UnboundChannel[RabbitMQServerEvent], reqMsg *Message, rawReq amqp.Delivery) {
replyed := make(chan bool) replyed := make(chan bool)
defer close(replyed) defer close(replyed)




Loading…
Cancel
Save