Browse Source

优化ClientPool定义,方便测试

pull/25/head
Sydonian 2 years ago
parent
commit
dff14c39cf
6 changed files with 76 additions and 93 deletions
  1. +33
    -26
      pkgs/mq/client.go
  2. +1
    -1
      pkgs/mq/mq_test.go
  3. +9
    -16
      sdks/pcm/client.go
  4. +12
    -17
      sdks/scheduler/client.go
  5. +12
    -17
      sdks/storage/client.go
  6. +9
    -16
      sdks/unifyops/client.go

+ 33
- 26
pkgs/mq/client.go View File

@@ -31,7 +31,7 @@ func (e *CodeMessageError) Error() string {
}

type SendOption struct {
// 等待响应的超时时间,为0代表不设置超时时间
// 发送消息的超时时间,为0代表不设置超时时间
Timeout time.Duration
}

@@ -43,6 +43,12 @@ type RequestOption struct {
KeepAlive bool
}

type RoundTripper interface {
Send(msg Message, opt SendOption) error
Request(req Message, opt RequestOption) (*Message, error)
Close() error
}

type requesting struct {
RequestID string
Receiving chan *Message
@@ -51,7 +57,7 @@ type requesting struct {
Option RequestOption
}

type RabbitMQClient struct {
type RabbitMQTransport struct {
connection *amqp.Connection
channel *amqp.Channel
exchange string
@@ -63,7 +69,7 @@ type RabbitMQClient struct {
closed chan any
}

func NewRabbitMQClient(url string, key string, exchange string) (*RabbitMQClient, error) {
func NewRabbitMQTransport(url string, key string, exchange string) (*RabbitMQTransport, error) {
connection, err := amqp.Dial(url)
if err != nil {
return nil, fmt.Errorf("connecting to %s: %w", url, err)
@@ -75,7 +81,7 @@ func NewRabbitMQClient(url string, key string, exchange string) (*RabbitMQClient
return nil, fmt.Errorf("openning channel on connection: %w", err)
}

cli := &RabbitMQClient{
cli := &RabbitMQTransport{
connection: connection,
channel: channel,
exchange: exchange,
@@ -113,11 +119,7 @@ func NewRabbitMQClient(url string, key string, exchange string) (*RabbitMQClient
return cli, nil
}

func (cli *RabbitMQClient) Request(req Message, opts ...RequestOption) (*Message, error) {
opt := RequestOption{Timeout: time.Second * 15}
if len(opts) > 0 {
opt = opts[0]
}
func (cli *RabbitMQTransport) Request(req Message, opt RequestOption) (*Message, error) {
// 如果没有设置timeout,却设置了keepalive,那么默认心跳间隔为15秒
if opt.KeepAlive && opt.Timeout == 0 {
opt.Timeout = time.Second * 15
@@ -154,7 +156,7 @@ func (cli *RabbitMQClient) Request(req Message, opts ...RequestOption) (*Message
return cli.receiveNoTimeout(reqing)
}

func (cli *RabbitMQClient) receiveWithTimeout(reqing *requesting) (*Message, error) {
func (cli *RabbitMQTransport) receiveWithTimeout(reqing *requesting) (*Message, error) {
ticker := time.NewTicker(reqing.Option.Timeout)
defer ticker.Stop()

@@ -182,7 +184,7 @@ func (cli *RabbitMQClient) receiveWithTimeout(reqing *requesting) (*Message, err
}
}

func (cli *RabbitMQClient) receiveNoTimeout(reqing *requesting) (*Message, error) {
func (cli *RabbitMQTransport) receiveNoTimeout(reqing *requesting) (*Message, error) {
for {
msg := <-reqing.Receiving
if msg.Type != MessageTypeAppData {
@@ -193,13 +195,13 @@ func (cli *RabbitMQClient) receiveNoTimeout(reqing *requesting) (*Message, error
}
}

func (cli *RabbitMQClient) startRequesting(reqing *requesting) {
func (cli *RabbitMQTransport) startRequesting(reqing *requesting) {
cli.requestingsLock.Lock()
cli.requestings[reqing.RequestID] = reqing
cli.requestingsLock.Unlock()
}

func (cli *RabbitMQClient) cancelRequsting(reqing *requesting) {
func (cli *RabbitMQTransport) cancelRequsting(reqing *requesting) {
cli.requestingsLock.Lock()
delete(cli.requestings, reqing.RequestID)
cli.requestingsLock.Unlock()
@@ -208,19 +210,14 @@ func (cli *RabbitMQClient) cancelRequsting(reqing *requesting) {
close(reqing.ReceiveStopped)
}

func (c *RabbitMQClient) findReuqesting(reqID string) *requesting {
func (c *RabbitMQTransport) findReuqesting(reqID string) *requesting {
c.requestingsLock.Lock()
reqing := c.requestings[reqID]
c.requestingsLock.Unlock()
return reqing
}

func (c *RabbitMQClient) Send(msg Message, opts ...SendOption) error {
opt := SendOption{}
if len(opts) > 0 {
opt = opts[0]
}

func (c *RabbitMQTransport) Send(msg Message, opt SendOption) error {
data, err := Serialize(msg)
if err != nil {
return fmt.Errorf("serialize message failed: %w", err)
@@ -250,7 +247,7 @@ func (c *RabbitMQClient) Send(msg Message, opts ...SendOption) error {
return nil
}

func (c *RabbitMQClient) serve(recvChan <-chan amqp.Delivery) error {
func (c *RabbitMQTransport) serve(recvChan <-chan amqp.Delivery) error {
for {
select {
case rawMsg, ok := <-recvChan:
@@ -283,7 +280,7 @@ func (c *RabbitMQClient) serve(recvChan <-chan amqp.Delivery) error {
}
}

func (c *RabbitMQClient) Close() error {
func (c *RabbitMQTransport) Close() error {
var retErr error

close(c.closed)
@@ -302,10 +299,15 @@ func (c *RabbitMQClient) Close() error {
}

// 发送消息并等待回应。因为无法自动推断出TResp的类型,所以将其放在第一个手工填写,之后的TBody可以自动推断出来
func Request[TSvc any, TReq MessageBody, TResp MessageBody](_ func(svc TSvc, msg TReq) (TResp, *CodeMessage), cli *RabbitMQClient, req TReq, opts ...RequestOption) (TResp, error) {
func Request[TSvc any, TReq MessageBody, TResp MessageBody](_ func(svc TSvc, msg TReq) (TResp, *CodeMessage), cli RoundTripper, req TReq, opts ...RequestOption) (TResp, error) {
opt := RequestOption{Timeout: time.Second * 15}
if len(opts) > 0 {
opt = opts[0]
}

var defRet TResp

resp, err := cli.Request(MakeAppDataMessage(req), opts...)
resp, err := cli.Request(MakeAppDataMessage(req), opt)
if err != nil {
return defRet, fmt.Errorf("requesting: %w", err)
}
@@ -329,10 +331,15 @@ func Request[TSvc any, TReq MessageBody, TResp MessageBody](_ func(svc TSvc, msg
}

// 发送消息,不等待回应
func Send[TSvc any, TReq MessageBody](_ func(svc TSvc, msg TReq), cli *RabbitMQClient, msg TReq, opts ...SendOption) error {
func Send[TSvc any, TReq MessageBody](_ func(svc TSvc, msg TReq), cli RoundTripper, msg TReq, opts ...SendOption) error {
opt := SendOption{Timeout: time.Second * 15}
if len(opts) > 0 {
opt = opts[0]
}

req := MakeAppDataMessage(msg)

err := cli.Send(req, opts...)
err := cli.Send(req, opt)
if err != nil {
return fmt.Errorf("sending: %w", err)
}


+ 1
- 1
pkgs/mq/mq_test.go View File

@@ -29,7 +29,7 @@ func Test_ServerClient(t *testing.T) {

go svr.Serve()

cli, err := NewRabbitMQClient(rabbitURL, testQueue, "")
cli, err := NewRabbitMQTransport(rabbitURL, testQueue, "")
So(err, ShouldBeNil)

_, err = cli.Request(MakeAppDataMessage(&Msg{}), RequestOption{


+ 9
- 16
sdks/pcm/client.go View File

@@ -29,32 +29,25 @@ func NewClient(cfg *Config) *Client {
}
}

type PoolClient struct {
*Client
owner *Pool
type Pool interface {
Acquire() (*Client, error)
Release(cli *Client)
}

func (c *PoolClient) Close() {
c.owner.Release(c)
}

type Pool struct {
type pool struct {
cfg *Config
}

func NewPool(cfg *Config) *Pool {
return &Pool{
func NewPool(cfg *Config) Pool {
return &pool{
cfg: cfg,
}
}
func (p *Pool) Acquire() (*PoolClient, error) {
func (p *pool) Acquire() (*Client, error) {
cli := NewClient(p.cfg)
return &PoolClient{
Client: cli,
owner: p,
}, nil
return cli, nil
}

func (p *Pool) Release(cli *PoolClient) {
func (p *pool) Release(cli *Client) {

}

+ 12
- 17
sdks/scheduler/client.go View File

@@ -1,6 +1,8 @@
package schsdk

import "gitlink.org.cn/cloudream/common/sdks"
import (
"gitlink.org.cn/cloudream/common/sdks"
)

type response[T any] struct {
Code string `json:"code"`
@@ -25,32 +27,25 @@ func NewClient(cfg *Config) *Client {
}
}

type PoolClient struct {
*Client
owner *Pool
type Pool interface {
Acquire() (*Client, error)
Release(cli *Client)
}

func (c *PoolClient) Close() {
c.owner.Release(c)
}

type Pool struct {
type pool struct {
cfg *Config
}

func NewPool(cfg *Config) *Pool {
return &Pool{
func NewPool(cfg *Config) Pool {
return &pool{
cfg: cfg,
}
}
func (p *Pool) Acquire() (*PoolClient, error) {
func (p *pool) Acquire() (*Client, error) {
cli := NewClient(p.cfg)
return &PoolClient{
Client: cli,
owner: p,
}, nil
return cli, nil
}

func (p *Pool) Release(cli *PoolClient) {
func (p *pool) Release(cli *Client) {

}

+ 12
- 17
sdks/storage/client.go View File

@@ -1,6 +1,8 @@
package stgsdk

import "gitlink.org.cn/cloudream/common/sdks"
import (
"gitlink.org.cn/cloudream/common/sdks"
)

type response[T any] struct {
Code string `json:"code"`
@@ -25,32 +27,25 @@ func NewClient(cfg *Config) *Client {
}
}

type PoolClient struct {
*Client
owner *Pool
type Pool interface {
Acquire() (*Client, error)
Release(cli *Client)
}

func (c *PoolClient) Close() {
c.owner.Release(c)
}

type Pool struct {
type pool struct {
cfg *Config
}

func NewPool(cfg *Config) *Pool {
return &Pool{
func NewPool(cfg *Config) Pool {
return &pool{
cfg: cfg,
}
}
func (p *Pool) Acquire() (*PoolClient, error) {
func (p *pool) Acquire() (*Client, error) {
cli := NewClient(p.cfg)
return &PoolClient{
Client: cli,
owner: p,
}, nil
return cli, nil
}

func (p *Pool) Release(cli *PoolClient) {
func (p *pool) Release(cli *Client) {

}

+ 9
- 16
sdks/unifyops/client.go View File

@@ -29,32 +29,25 @@ func NewClient(cfg *Config) *Client {
}
}

type PoolClient struct {
*Client
owner *Pool
type Pool interface {
Acquire() (*Client, error)
Release(cli *Client)
}

func (c *PoolClient) Close() {
c.owner.Release(c)
}

type Pool struct {
type pool struct {
cfg *Config
}

func NewPool(cfg *Config) *Pool {
return &Pool{
func NewPool(cfg *Config) Pool {
return &pool{
cfg: cfg,
}
}
func (p *Pool) Acquire() (*PoolClient, error) {
func (p *pool) Acquire() (*Client, error) {
cli := NewClient(p.cfg)
return &PoolClient{
Client: cli,
owner: p,
}, nil
return cli, nil
}

func (p *Pool) Release(cli *PoolClient) {
func (p *pool) Release(cli *Client) {

}

Loading…
Cancel
Save