From 898a0b3d5fefeaa32bb79b724f8fa89c7a0d6b3d Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 4 Nov 2024 10:45:05 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96mq=E9=87=8D=E8=AF=95=E6=9C=BA?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/mq/mq_test.go | 2 +- pkgs/mq/server.go | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/pkgs/mq/mq_test.go b/pkgs/mq/mq_test.go index 45357a1..93549ab 100644 --- a/pkgs/mq/mq_test.go +++ b/pkgs/mq/mq_test.go @@ -24,7 +24,7 @@ func Test_ServerClient(t *testing.T) { <-time.After(time.Second * 10) reply := MakeAppDataMessage(&Msg{Data: 1}) return &reply, nil - }) + }, RabbitMQParam{}) So(err, ShouldBeNil) //go svr.Start() diff --git a/pkgs/mq/server.go b/pkgs/mq/server.go index 01fcdb8..7c65e6b 100644 --- a/pkgs/mq/server.go +++ b/pkgs/mq/server.go @@ -2,10 +2,11 @@ package mq import ( "fmt" - "gitlink.org.cn/cloudream/common/utils/sync2" "net" "time" + "gitlink.org.cn/cloudream/common/utils/sync2" + "github.com/streadway/amqp" ) @@ -65,6 +66,12 @@ func NewReplyError(err error) ReplyError { } } +type ServerExit struct { + Error error +} + +type RabbitMQServerEvent interface{} + // 处理消息。会将第一个返回值作为响应回复给客户端,如果为nil,则不回复。 type MessageHandlerFn func(msg *Message) (*Message, error) @@ -115,10 +122,8 @@ func NewRabbitMQServer(url string, queueName string, onMessage MessageHandlerFn, return srv, nil } -type RabbitMQLogEvent interface{} - -func (s *RabbitMQServer) Start() *sync2.UnboundChannel[RabbitMQLogEvent] { - ch := sync2.NewUnboundChannel[RabbitMQLogEvent]() +func (s *RabbitMQServer) Start() *sync2.UnboundChannel[RabbitMQServerEvent] { + ch := sync2.NewUnboundChannel[RabbitMQServerEvent]() channel := s.openChannel(ch) if channel == nil { @@ -133,8 +138,7 @@ func (s *RabbitMQServer) Start() *sync2.UnboundChannel[RabbitMQLogEvent] { case rawReq, ok := <-channel: if !ok { if retryNum > s.config.RetryNum { - ch.Send(fmt.Errorf("maximum number of retries exceeded")) - ch.Send(1) + ch.Send(ServerExit{Error: fmt.Errorf("maximum number of retries exceeded")}) return ch } retryNum++ @@ -158,7 +162,7 @@ func (s *RabbitMQServer) Start() *sync2.UnboundChannel[RabbitMQLogEvent] { } } -func (s *RabbitMQServer) openChannel(ch *sync2.UnboundChannel[RabbitMQLogEvent]) <-chan amqp.Delivery { +func (s *RabbitMQServer) openChannel(ch *sync2.UnboundChannel[RabbitMQServerEvent]) <-chan amqp.Delivery { _, err := s.channel.QueueDeclare( s.queueName, false, @@ -190,7 +194,7 @@ func (s *RabbitMQServer) openChannel(ch *sync2.UnboundChannel[RabbitMQLogEvent]) return channel } -func (s *RabbitMQServer) handleMessage(ch *sync2.UnboundChannel[RabbitMQLogEvent], reqMsg *Message, rawReq amqp.Delivery) { +func (s *RabbitMQServer) handleMessage(ch *sync2.UnboundChannel[RabbitMQServerEvent], reqMsg *Message, rawReq amqp.Delivery) { replyed := make(chan bool) defer close(replyed)