From dff14c39cfdcaa1c3ced4b69f450b3aa85a5be1d Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 26 Sep 2023 09:56:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96ClientPool=E5=AE=9A=E4=B9=89?= =?UTF-8?q?=EF=BC=8C=E6=96=B9=E4=BE=BF=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/mq/client.go | 59 ++++++++++++++++++++++------------------ pkgs/mq/mq_test.go | 2 +- sdks/pcm/client.go | 25 ++++++----------- sdks/scheduler/client.go | 29 ++++++++------------ sdks/storage/client.go | 29 ++++++++------------ sdks/unifyops/client.go | 25 ++++++----------- 6 files changed, 76 insertions(+), 93 deletions(-) diff --git a/pkgs/mq/client.go b/pkgs/mq/client.go index 0fb02e8..078cdc2 100644 --- a/pkgs/mq/client.go +++ b/pkgs/mq/client.go @@ -31,7 +31,7 @@ func (e *CodeMessageError) Error() string { } type SendOption struct { - // 等待响应的超时时间,为0代表不设置超时时间 + // 发送消息的超时时间,为0代表不设置超时时间 Timeout time.Duration } @@ -43,6 +43,12 @@ type RequestOption struct { KeepAlive bool } +type RoundTripper interface { + Send(msg Message, opt SendOption) error + Request(req Message, opt RequestOption) (*Message, error) + Close() error +} + type requesting struct { RequestID string Receiving chan *Message @@ -51,7 +57,7 @@ type requesting struct { Option RequestOption } -type RabbitMQClient struct { +type RabbitMQTransport struct { connection *amqp.Connection channel *amqp.Channel exchange string @@ -63,7 +69,7 @@ type RabbitMQClient struct { closed chan any } -func NewRabbitMQClient(url string, key string, exchange string) (*RabbitMQClient, error) { +func NewRabbitMQTransport(url string, key string, exchange string) (*RabbitMQTransport, error) { connection, err := amqp.Dial(url) if err != nil { return nil, fmt.Errorf("connecting to %s: %w", url, err) @@ -75,7 +81,7 @@ func NewRabbitMQClient(url string, key string, exchange string) (*RabbitMQClient return nil, fmt.Errorf("openning channel on connection: %w", err) } - cli := &RabbitMQClient{ + cli := &RabbitMQTransport{ connection: connection, channel: channel, exchange: exchange, @@ -113,11 +119,7 @@ func NewRabbitMQClient(url string, key string, exchange string) (*RabbitMQClient return cli, nil } -func (cli *RabbitMQClient) Request(req Message, opts ...RequestOption) (*Message, error) { - opt := RequestOption{Timeout: time.Second * 15} - if len(opts) > 0 { - opt = opts[0] - } +func (cli *RabbitMQTransport) Request(req Message, opt RequestOption) (*Message, error) { // 如果没有设置timeout,却设置了keepalive,那么默认心跳间隔为15秒 if opt.KeepAlive && opt.Timeout == 0 { opt.Timeout = time.Second * 15 @@ -154,7 +156,7 @@ func (cli *RabbitMQClient) Request(req Message, opts ...RequestOption) (*Message return cli.receiveNoTimeout(reqing) } -func (cli *RabbitMQClient) receiveWithTimeout(reqing *requesting) (*Message, error) { +func (cli *RabbitMQTransport) receiveWithTimeout(reqing *requesting) (*Message, error) { ticker := time.NewTicker(reqing.Option.Timeout) defer ticker.Stop() @@ -182,7 +184,7 @@ func (cli *RabbitMQClient) receiveWithTimeout(reqing *requesting) (*Message, err } } -func (cli *RabbitMQClient) receiveNoTimeout(reqing *requesting) (*Message, error) { +func (cli *RabbitMQTransport) receiveNoTimeout(reqing *requesting) (*Message, error) { for { msg := <-reqing.Receiving if msg.Type != MessageTypeAppData { @@ -193,13 +195,13 @@ func (cli *RabbitMQClient) receiveNoTimeout(reqing *requesting) (*Message, error } } -func (cli *RabbitMQClient) startRequesting(reqing *requesting) { +func (cli *RabbitMQTransport) startRequesting(reqing *requesting) { cli.requestingsLock.Lock() cli.requestings[reqing.RequestID] = reqing cli.requestingsLock.Unlock() } -func (cli *RabbitMQClient) cancelRequsting(reqing *requesting) { +func (cli *RabbitMQTransport) cancelRequsting(reqing *requesting) { cli.requestingsLock.Lock() delete(cli.requestings, reqing.RequestID) cli.requestingsLock.Unlock() @@ -208,19 +210,14 @@ func (cli *RabbitMQClient) cancelRequsting(reqing *requesting) { close(reqing.ReceiveStopped) } -func (c *RabbitMQClient) findReuqesting(reqID string) *requesting { +func (c *RabbitMQTransport) findReuqesting(reqID string) *requesting { c.requestingsLock.Lock() reqing := c.requestings[reqID] c.requestingsLock.Unlock() return reqing } -func (c *RabbitMQClient) Send(msg Message, opts ...SendOption) error { - opt := SendOption{} - if len(opts) > 0 { - opt = opts[0] - } - +func (c *RabbitMQTransport) Send(msg Message, opt SendOption) error { data, err := Serialize(msg) if err != nil { return fmt.Errorf("serialize message failed: %w", err) @@ -250,7 +247,7 @@ func (c *RabbitMQClient) Send(msg Message, opts ...SendOption) error { return nil } -func (c *RabbitMQClient) serve(recvChan <-chan amqp.Delivery) error { +func (c *RabbitMQTransport) serve(recvChan <-chan amqp.Delivery) error { for { select { case rawMsg, ok := <-recvChan: @@ -283,7 +280,7 @@ func (c *RabbitMQClient) serve(recvChan <-chan amqp.Delivery) error { } } -func (c *RabbitMQClient) Close() error { +func (c *RabbitMQTransport) Close() error { var retErr error close(c.closed) @@ -302,10 +299,15 @@ func (c *RabbitMQClient) Close() error { } // 发送消息并等待回应。因为无法自动推断出TResp的类型,所以将其放在第一个手工填写,之后的TBody可以自动推断出来 -func Request[TSvc any, TReq MessageBody, TResp MessageBody](_ func(svc TSvc, msg TReq) (TResp, *CodeMessage), cli *RabbitMQClient, req TReq, opts ...RequestOption) (TResp, error) { +func Request[TSvc any, TReq MessageBody, TResp MessageBody](_ func(svc TSvc, msg TReq) (TResp, *CodeMessage), cli RoundTripper, req TReq, opts ...RequestOption) (TResp, error) { + opt := RequestOption{Timeout: time.Second * 15} + if len(opts) > 0 { + opt = opts[0] + } + var defRet TResp - resp, err := cli.Request(MakeAppDataMessage(req), opts...) + resp, err := cli.Request(MakeAppDataMessage(req), opt) if err != nil { return defRet, fmt.Errorf("requesting: %w", err) } @@ -329,10 +331,15 @@ func Request[TSvc any, TReq MessageBody, TResp MessageBody](_ func(svc TSvc, msg } // 发送消息,不等待回应 -func Send[TSvc any, TReq MessageBody](_ func(svc TSvc, msg TReq), cli *RabbitMQClient, msg TReq, opts ...SendOption) error { +func Send[TSvc any, TReq MessageBody](_ func(svc TSvc, msg TReq), cli RoundTripper, msg TReq, opts ...SendOption) error { + opt := SendOption{Timeout: time.Second * 15} + if len(opts) > 0 { + opt = opts[0] + } + req := MakeAppDataMessage(msg) - err := cli.Send(req, opts...) + err := cli.Send(req, opt) if err != nil { return fmt.Errorf("sending: %w", err) } diff --git a/pkgs/mq/mq_test.go b/pkgs/mq/mq_test.go index f818601..bb4a95e 100644 --- a/pkgs/mq/mq_test.go +++ b/pkgs/mq/mq_test.go @@ -29,7 +29,7 @@ func Test_ServerClient(t *testing.T) { go svr.Serve() - cli, err := NewRabbitMQClient(rabbitURL, testQueue, "") + cli, err := NewRabbitMQTransport(rabbitURL, testQueue, "") So(err, ShouldBeNil) _, err = cli.Request(MakeAppDataMessage(&Msg{}), RequestOption{ diff --git a/sdks/pcm/client.go b/sdks/pcm/client.go index df15fa1..ba38c88 100644 --- a/sdks/pcm/client.go +++ b/sdks/pcm/client.go @@ -29,32 +29,25 @@ func NewClient(cfg *Config) *Client { } } -type PoolClient struct { - *Client - owner *Pool +type Pool interface { + Acquire() (*Client, error) + Release(cli *Client) } -func (c *PoolClient) Close() { - c.owner.Release(c) -} - -type Pool struct { +type pool struct { cfg *Config } -func NewPool(cfg *Config) *Pool { - return &Pool{ +func NewPool(cfg *Config) Pool { + return &pool{ cfg: cfg, } } -func (p *Pool) Acquire() (*PoolClient, error) { +func (p *pool) Acquire() (*Client, error) { cli := NewClient(p.cfg) - return &PoolClient{ - Client: cli, - owner: p, - }, nil + return cli, nil } -func (p *Pool) Release(cli *PoolClient) { +func (p *pool) Release(cli *Client) { } diff --git a/sdks/scheduler/client.go b/sdks/scheduler/client.go index c4ad708..f301503 100644 --- a/sdks/scheduler/client.go +++ b/sdks/scheduler/client.go @@ -1,6 +1,8 @@ package schsdk -import "gitlink.org.cn/cloudream/common/sdks" +import ( + "gitlink.org.cn/cloudream/common/sdks" +) type response[T any] struct { Code string `json:"code"` @@ -25,32 +27,25 @@ func NewClient(cfg *Config) *Client { } } -type PoolClient struct { - *Client - owner *Pool +type Pool interface { + Acquire() (*Client, error) + Release(cli *Client) } -func (c *PoolClient) Close() { - c.owner.Release(c) -} - -type Pool struct { +type pool struct { cfg *Config } -func NewPool(cfg *Config) *Pool { - return &Pool{ +func NewPool(cfg *Config) Pool { + return &pool{ cfg: cfg, } } -func (p *Pool) Acquire() (*PoolClient, error) { +func (p *pool) Acquire() (*Client, error) { cli := NewClient(p.cfg) - return &PoolClient{ - Client: cli, - owner: p, - }, nil + return cli, nil } -func (p *Pool) Release(cli *PoolClient) { +func (p *pool) Release(cli *Client) { } diff --git a/sdks/storage/client.go b/sdks/storage/client.go index b6fec23..d3e2be6 100644 --- a/sdks/storage/client.go +++ b/sdks/storage/client.go @@ -1,6 +1,8 @@ package stgsdk -import "gitlink.org.cn/cloudream/common/sdks" +import ( + "gitlink.org.cn/cloudream/common/sdks" +) type response[T any] struct { Code string `json:"code"` @@ -25,32 +27,25 @@ func NewClient(cfg *Config) *Client { } } -type PoolClient struct { - *Client - owner *Pool +type Pool interface { + Acquire() (*Client, error) + Release(cli *Client) } -func (c *PoolClient) Close() { - c.owner.Release(c) -} - -type Pool struct { +type pool struct { cfg *Config } -func NewPool(cfg *Config) *Pool { - return &Pool{ +func NewPool(cfg *Config) Pool { + return &pool{ cfg: cfg, } } -func (p *Pool) Acquire() (*PoolClient, error) { +func (p *pool) Acquire() (*Client, error) { cli := NewClient(p.cfg) - return &PoolClient{ - Client: cli, - owner: p, - }, nil + return cli, nil } -func (p *Pool) Release(cli *PoolClient) { +func (p *pool) Release(cli *Client) { } diff --git a/sdks/unifyops/client.go b/sdks/unifyops/client.go index b4b9dd0..92dca4f 100644 --- a/sdks/unifyops/client.go +++ b/sdks/unifyops/client.go @@ -29,32 +29,25 @@ func NewClient(cfg *Config) *Client { } } -type PoolClient struct { - *Client - owner *Pool +type Pool interface { + Acquire() (*Client, error) + Release(cli *Client) } -func (c *PoolClient) Close() { - c.owner.Release(c) -} - -type Pool struct { +type pool struct { cfg *Config } -func NewPool(cfg *Config) *Pool { - return &Pool{ +func NewPool(cfg *Config) Pool { + return &pool{ cfg: cfg, } } -func (p *Pool) Acquire() (*PoolClient, error) { +func (p *pool) Acquire() (*Client, error) { cli := NewClient(p.cfg) - return &PoolClient{ - Client: cli, - owner: p, - }, nil + return cli, nil } -func (p *Pool) Release(cli *PoolClient) { +func (p *pool) Release(cli *Client) { }