From 2bd342c863792268b11783a9c41e0f353bc47649 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Wed, 30 Oct 2024 17:16:09 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0rabbitmq=E6=96=AD?= =?UTF-8?q?=E8=BF=9E=E9=87=8D=E8=AF=95=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/mq/server.go | 87 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 60 insertions(+), 27 deletions(-) diff --git a/pkgs/mq/server.go b/pkgs/mq/server.go index 6106753..614b73f 100644 --- a/pkgs/mq/server.go +++ b/pkgs/mq/server.go @@ -2,6 +2,7 @@ package mq import ( "fmt" + "gitlink.org.cn/cloudream/common/utils/sync2" "net" "time" @@ -72,12 +73,18 @@ type RabbitMQServer struct { connection *amqp.Connection channel *amqp.Channel closed chan any + config RabbitMQParam OnMessage MessageHandlerFn OnError func(err error) } -func NewRabbitMQServer(url string, queueName string, onMessage MessageHandlerFn) (*RabbitMQServer, error) { +type RabbitMQParam struct { + RetryNum int `json:"retryNum"` + RetryInterval int `json:"retryInterval"` +} + +func NewRabbitMQServer(url string, queueName string, onMessage MessageHandlerFn, cfg RabbitMQParam) (*RabbitMQServer, error) { config := amqp.Config{ Dial: func(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, 60*time.Second) // 设置连接超时时间为 60 秒 @@ -102,12 +109,55 @@ func NewRabbitMQServer(url string, queueName string, onMessage MessageHandlerFn) queueName: queueName, closed: make(chan any), OnMessage: onMessage, + config: cfg, } return srv, nil } -func (s *RabbitMQServer) Serve() error { +type RabbitMQLogEvent interface{} + +func (s *RabbitMQServer) Start() *sync2.UnboundChannel[RabbitMQLogEvent] { + ch := sync2.NewUnboundChannel[RabbitMQLogEvent]() + + channel := s.openChannel(ch) + if channel == nil { + ch.Send(1) + return ch + } + + retryNum := 0 + + for { + select { + case rawReq, ok := <-channel: + if !ok { + if retryNum > s.config.RetryNum { + ch.Send(fmt.Errorf("maximum number of retries exceeded")) + ch.Send(1) + return ch + } + retryNum++ + + time.Sleep(time.Duration(s.config.RetryInterval) * time.Millisecond) + channel = s.openChannel(ch) + } + + reqMsg, err := Deserialize(rawReq.Body) + if err != nil { + ch.Send(NewDeserializeError(err)) + continue + } + + go s.handleMessage(ch, reqMsg, rawReq) + + case <-s.closed: + return nil + } + } +} + +func (s *RabbitMQServer) openChannel(ch *sync2.UnboundChannel[RabbitMQLogEvent]) <-chan amqp.Delivery { _, err := s.channel.QueueDeclare( s.queueName, false, @@ -117,7 +167,8 @@ func (s *RabbitMQServer) Serve() error { nil, ) if err != nil { - return fmt.Errorf("declare queue failed, err: %w", err) + ch.Send(fmt.Errorf("declare queue failed, err: %w", err)) + return nil } channel, err := s.channel.Consume( @@ -131,32 +182,14 @@ func (s *RabbitMQServer) Serve() error { ) if err != nil { - return fmt.Errorf("open consume channel failed, err: %w", err) + ch.Send(fmt.Errorf("get rabbitmq channel failed, err: %w", err)) + return nil } - for { - select { - case rawReq, ok := <-channel: - if !ok { - s.onError(NewDeserializeError(fmt.Errorf("channel is closed"))) - return NewReceiveMessageError(fmt.Errorf("channel is closed")) - } - - reqMsg, err := Deserialize(rawReq.Body) - if err != nil { - s.onError(NewDeserializeError(err)) - continue - } - - go s.handleMessage(reqMsg, rawReq) - - case <-s.closed: - return nil - } - } + return channel } -func (s *RabbitMQServer) handleMessage(reqMsg *Message, rawReq amqp.Delivery) { +func (s *RabbitMQServer) handleMessage(ch *sync2.UnboundChannel[RabbitMQLogEvent], reqMsg *Message, rawReq amqp.Delivery) { replyed := make(chan bool) defer close(replyed) @@ -167,7 +200,7 @@ func (s *RabbitMQServer) handleMessage(reqMsg *Message, rawReq amqp.Delivery) { reply, err := s.OnMessage(reqMsg) if err != nil { - s.onError(NewDispatchError(err)) + ch.Send(NewDispatchError(err)) return } @@ -175,7 +208,7 @@ func (s *RabbitMQServer) handleMessage(reqMsg *Message, rawReq amqp.Delivery) { reply.SetRequestID(reqMsg.GetRequestID()) err := s.replyToClient(*reply, &rawReq) if err != nil { - s.onError(NewReplyError(err)) + ch.Send(NewReplyError(err)) } } } From b760aa072f494a07ebefad438cf173c3a03c21d5 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Thu, 31 Oct 2024 16:49:23 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E8=B0=83=E6=95=B4s3=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E7=9B=AE=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/mq/mq_test.go | 2 +- pkgs/mq/server.go | 1 + sdks/cloudstorage/client.go | 25 ----------------- sdks/cloudstorage/models.go | 16 ----------- sdks/cloudstorage/obs.go | 24 ---------------- sdks/cloudstorage/oss.go | 55 ------------------------------------- 6 files changed, 2 insertions(+), 121 deletions(-) delete mode 100644 sdks/cloudstorage/client.go delete mode 100644 sdks/cloudstorage/models.go delete mode 100644 sdks/cloudstorage/obs.go delete mode 100644 sdks/cloudstorage/oss.go diff --git a/pkgs/mq/mq_test.go b/pkgs/mq/mq_test.go index 209bd33..45357a1 100644 --- a/pkgs/mq/mq_test.go +++ b/pkgs/mq/mq_test.go @@ -27,7 +27,7 @@ func Test_ServerClient(t *testing.T) { }) So(err, ShouldBeNil) - go svr.Serve() + //go svr.Start() cli, err := NewRabbitMQTransport(rabbitURL, testQueue, "") So(err, ShouldBeNil) diff --git a/pkgs/mq/server.go b/pkgs/mq/server.go index 614b73f..01fcdb8 100644 --- a/pkgs/mq/server.go +++ b/pkgs/mq/server.go @@ -141,6 +141,7 @@ func (s *RabbitMQServer) Start() *sync2.UnboundChannel[RabbitMQLogEvent] { time.Sleep(time.Duration(s.config.RetryInterval) * time.Millisecond) channel = s.openChannel(ch) + continue } reqMsg, err := Deserialize(rawReq.Body) diff --git a/sdks/cloudstorage/client.go b/sdks/cloudstorage/client.go deleted file mode 100644 index 878b474..0000000 --- a/sdks/cloudstorage/client.go +++ /dev/null @@ -1,25 +0,0 @@ -package cloudstorage - -import "fmt" - -//type ObjectStorageInfo interface { -// NewClient() (ObjectStorageClient, error) -//} - -type ObjectStorageClient interface { - InitiateMultipartUpload(objectName string) (string, error) - UploadPart() - CompleteMultipartUpload() (string, error) - AbortMultipartUpload() - Close() -} - -func NewObjectStorageClient(info ObjectStorage) (ObjectStorageClient, error) { - switch info.Manufacturer { - case AliCloud: - return NewOSSClient(info), nil - case HuaweiCloud: - return &OBSClient{}, nil - } - return nil, fmt.Errorf("unknown cloud storage manufacturer %s", info.Manufacturer) -} diff --git a/sdks/cloudstorage/models.go b/sdks/cloudstorage/models.go deleted file mode 100644 index ecba552..0000000 --- a/sdks/cloudstorage/models.go +++ /dev/null @@ -1,16 +0,0 @@ -package cloudstorage - -type ObjectStorage struct { - Manufacturer string `json:"manufacturer"` - Region string `json:"region"` - AK string `json:"access_key_id"` - SK string `json:"secret_access_key"` - Endpoint string `json:"endpoint"` - Bucket string `json:"bucket"` -} - -const ( - HuaweiCloud = "HuaweiCloud" - AliCloud = "AliCloud" - SugonCloud = "SugonCloud" -) diff --git a/sdks/cloudstorage/obs.go b/sdks/cloudstorage/obs.go deleted file mode 100644 index 7f38e2d..0000000 --- a/sdks/cloudstorage/obs.go +++ /dev/null @@ -1,24 +0,0 @@ -package cloudstorage - -type OBSClient struct { -} - -func (c *OBSClient) InitiateMultipartUpload(objectName string) (string, error) { - return "", nil -} - -func (c *OBSClient) UploadPart() { - -} - -func (c *OBSClient) CompleteMultipartUpload() (string, error) { - return "", nil -} - -func (c *OBSClient) AbortMultipartUpload() { - -} - -func (c *OBSClient) Close() { - -} diff --git a/sdks/cloudstorage/oss.go b/sdks/cloudstorage/oss.go deleted file mode 100644 index cfe5e14..0000000 --- a/sdks/cloudstorage/oss.go +++ /dev/null @@ -1,55 +0,0 @@ -package cloudstorage - -import ( - "fmt" - "github.com/aliyun/aliyun-oss-go-sdk/oss" - "log" -) - -type OSSClient struct { - client *oss.Client - bucket *oss.Bucket -} - -func (c *OSSClient) InitiateMultipartUpload(objectName string) (string, error) { - imur, err := c.bucket.InitiateMultipartUpload(objectName) - if err != nil { - return "", fmt.Errorf("failed to initiate multipart upload: %w", err) - } - return imur.UploadID, nil -} - -func NewOSSClient(obs ObjectStorage) *OSSClient { - // 创建OSSClient实例。 - client, err := oss.New(obs.Endpoint, obs.AK, obs.SK) - if err != nil { - log.Fatalf("Error: %v", err) - } - - bucket, err := client.Bucket(obs.Bucket) - if err != nil { - log.Fatalf("Error: %v", err) - } - - return &OSSClient{ - client: client, - bucket: bucket, - } -} - -func (c *OSSClient) UploadPart() { - -} - -func (c *OSSClient) CompleteMultipartUpload() (string, error) { - return "", nil -} - -func (c *OSSClient) AbortMultipartUpload() { - -} - -func (c *OSSClient) Close() { - // 关闭client - -} From 266857b4ac58e491b7bb76d9c8f5acc4f7b77313 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Fri, 1 Nov 2024 17:41:24 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E7=A7=BB=E9=99=A4UploadPartOutput=E7=BB=93?= =?UTF-8?q?=E6=9E=84=E4=BD=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/storage/storage.go | 95 +++++++++++++++++++++++++++++++---------- 1 file changed, 73 insertions(+), 22 deletions(-) diff --git a/sdks/storage/storage.go b/sdks/storage/storage.go index 72a150c..5914cfe 100644 --- a/sdks/storage/storage.go +++ b/sdks/storage/storage.go @@ -7,6 +7,38 @@ import ( "gitlink.org.cn/cloudream/common/utils/serder" ) +type Storage struct { + StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey; autoIncrement;"` + Name string `json:"name" gorm:"column:Name; not null"` + // 完全管理此存储服务的Hub的ID + MasterHub NodeID `json:"masterHub" gorm:"column:MasterHub; not null"` + // 存储服务的地址,包含鉴权所需数据 + Address StorageAddress `json:"address" gorm:"column:Address; type:json; not null; serializer:union"` + // 存储服务拥有的特别功能 + Features []StorageFeature `json:"features" gorm:"column:Features; type:json; serializer:union"` +} + +func (Storage) TableName() string { + return "Storage" +} + +func (s *Storage) String() string { + return fmt.Sprintf("%v(%v)", s.Name, s.StorageID) +} + +// 共享存储服务的配置数据 +type SharedStorage struct { + StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey"` + // 调度文件时保存文件的根路径 + LoadBase string `json:"loadBase" gorm:"column:LoadBase; not null"` + // 回源数据时数据存放位置的根路径 + DataReturnBase string `json:"dataReturnBase" gorm:"column:DataReturnBase; not null"` +} + +func (SharedStorage) TableName() string { + return "SharedStorage" +} + // 存储服务地址 type StorageAddress interface { GetType() string @@ -30,34 +62,53 @@ func (a *LocalStorageAddress) String() string { return "Local" } -type Storage struct { - StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey; autoIncrement;"` - Name string `json:"name" gorm:"column:Name; not null"` - // 完全管理此存储服务的Hub的ID - MasterHub NodeID `json:"masterHub" gorm:"column:MasterHub; not null"` - // 存储服务的地址,包含鉴权所需数据 - Address StorageAddress `json:"address" gorm:"column:Address; type:json; not null; serializer:union"` - // 存储服务拥有的特别功能 - Features []StorageFeature `json:"features" gorm:"column:Features; type:json; serializer:union"` +type OSSAddress struct { + serder.Metadata `union:"Local"` + Region string `json:"region"` + AK string `json:"accessKeyId"` + SK string `json:"secretAccessKey"` + Endpoint string `json:"endpoint"` + Bucket string `json:"bucket"` } -func (Storage) TableName() string { - return "Storage" +func (a *OSSAddress) GetType() string { + return "OSSAddress" } -func (s *Storage) String() string { - return fmt.Sprintf("%v(%v)", s.Name, s.StorageID) +func (a *OSSAddress) String() string { + return "OSSAddress" } -// 共享存储服务的配置数据 -type SharedStorage struct { - StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey"` - // 调度文件时保存文件的根路径 - LoadBase string `json:"loadBase" gorm:"column:LoadBase; not null"` - // 回源数据时数据存放位置的根路径 - DataReturnBase string `json:"dataReturnBase" gorm:"column:DataReturnBase; not null"` +type OBSAddress struct { + serder.Metadata `union:"Local"` + Region string `json:"region"` + AK string `json:"accessKeyId"` + SK string `json:"secretAccessKey"` + Endpoint string `json:"endpoint"` + Bucket string `json:"bucket"` } -func (SharedStorage) TableName() string { - return "SharedStorage" +func (a *OBSAddress) GetType() string { + return "OBSAddress" +} + +func (a *OBSAddress) String() string { + return "OBSAddress" +} + +type COSAddress struct { + serder.Metadata `union:"Local"` + Region string `json:"region"` + AK string `json:"accessKeyId"` + SK string `json:"secretAccessKey"` + Endpoint string `json:"endpoint"` + Bucket string `json:"bucket"` +} + +func (a *COSAddress) GetType() string { + return "COSAddress" +} + +func (a *COSAddress) String() string { + return "COSAddress" }