Sydonian 1 year ago
parent
commit
3212bff6a9
7 changed files with 135 additions and 170 deletions
  1. +1
    -1
      pkgs/mq/mq_test.go
  2. +61
    -27
      pkgs/mq/server.go
  3. +0
    -25
      sdks/cloudstorage/client.go
  4. +0
    -16
      sdks/cloudstorage/models.go
  5. +0
    -24
      sdks/cloudstorage/obs.go
  6. +0
    -55
      sdks/cloudstorage/oss.go
  7. +73
    -22
      sdks/storage/storage.go

+ 1
- 1
pkgs/mq/mq_test.go View File

@@ -27,7 +27,7 @@ func Test_ServerClient(t *testing.T) {
})
So(err, ShouldBeNil)

go svr.Serve()
//go svr.Start()

cli, err := NewRabbitMQTransport(rabbitURL, testQueue, "")
So(err, ShouldBeNil)


+ 61
- 27
pkgs/mq/server.go View File

@@ -2,6 +2,7 @@ package mq

import (
"fmt"
"gitlink.org.cn/cloudream/common/utils/sync2"
"net"
"time"

@@ -72,12 +73,18 @@ type RabbitMQServer struct {
connection *amqp.Connection
channel *amqp.Channel
closed chan any
config RabbitMQParam

OnMessage MessageHandlerFn
OnError func(err error)
}

func NewRabbitMQServer(url string, queueName string, onMessage MessageHandlerFn) (*RabbitMQServer, error) {
type RabbitMQParam struct {
RetryNum int `json:"retryNum"`
RetryInterval int `json:"retryInterval"`
}

func NewRabbitMQServer(url string, queueName string, onMessage MessageHandlerFn, cfg RabbitMQParam) (*RabbitMQServer, error) {
config := amqp.Config{
Dial: func(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, 60*time.Second) // 设置连接超时时间为 60 秒
@@ -102,12 +109,56 @@ func NewRabbitMQServer(url string, queueName string, onMessage MessageHandlerFn)
queueName: queueName,
closed: make(chan any),
OnMessage: onMessage,
config: cfg,
}

return srv, nil
}

func (s *RabbitMQServer) Serve() error {
type RabbitMQLogEvent interface{}

func (s *RabbitMQServer) Start() *sync2.UnboundChannel[RabbitMQLogEvent] {
ch := sync2.NewUnboundChannel[RabbitMQLogEvent]()

channel := s.openChannel(ch)
if channel == nil {
ch.Send(1)
return ch
}

retryNum := 0

for {
select {
case rawReq, ok := <-channel:
if !ok {
if retryNum > s.config.RetryNum {
ch.Send(fmt.Errorf("maximum number of retries exceeded"))
ch.Send(1)
return ch
}
retryNum++

time.Sleep(time.Duration(s.config.RetryInterval) * time.Millisecond)
channel = s.openChannel(ch)
continue
}

reqMsg, err := Deserialize(rawReq.Body)
if err != nil {
ch.Send(NewDeserializeError(err))
continue
}

go s.handleMessage(ch, reqMsg, rawReq)

case <-s.closed:
return nil
}
}
}

func (s *RabbitMQServer) openChannel(ch *sync2.UnboundChannel[RabbitMQLogEvent]) <-chan amqp.Delivery {
_, err := s.channel.QueueDeclare(
s.queueName,
false,
@@ -117,7 +168,8 @@ func (s *RabbitMQServer) Serve() error {
nil,
)
if err != nil {
return fmt.Errorf("declare queue failed, err: %w", err)
ch.Send(fmt.Errorf("declare queue failed, err: %w", err))
return nil
}

channel, err := s.channel.Consume(
@@ -131,32 +183,14 @@ func (s *RabbitMQServer) Serve() error {
)

if err != nil {
return fmt.Errorf("open consume channel failed, err: %w", err)
ch.Send(fmt.Errorf("get rabbitmq channel failed, err: %w", err))
return nil
}

for {
select {
case rawReq, ok := <-channel:
if !ok {
s.onError(NewDeserializeError(fmt.Errorf("channel is closed")))
return NewReceiveMessageError(fmt.Errorf("channel is closed"))
}

reqMsg, err := Deserialize(rawReq.Body)
if err != nil {
s.onError(NewDeserializeError(err))
continue
}

go s.handleMessage(reqMsg, rawReq)

case <-s.closed:
return nil
}
}
return channel
}

func (s *RabbitMQServer) handleMessage(reqMsg *Message, rawReq amqp.Delivery) {
func (s *RabbitMQServer) handleMessage(ch *sync2.UnboundChannel[RabbitMQLogEvent], reqMsg *Message, rawReq amqp.Delivery) {
replyed := make(chan bool)
defer close(replyed)

@@ -167,7 +201,7 @@ func (s *RabbitMQServer) handleMessage(reqMsg *Message, rawReq amqp.Delivery) {

reply, err := s.OnMessage(reqMsg)
if err != nil {
s.onError(NewDispatchError(err))
ch.Send(NewDispatchError(err))
return
}

@@ -175,7 +209,7 @@ func (s *RabbitMQServer) handleMessage(reqMsg *Message, rawReq amqp.Delivery) {
reply.SetRequestID(reqMsg.GetRequestID())
err := s.replyToClient(*reply, &rawReq)
if err != nil {
s.onError(NewReplyError(err))
ch.Send(NewReplyError(err))
}
}
}


+ 0
- 25
sdks/cloudstorage/client.go View File

@@ -1,25 +0,0 @@
package cloudstorage

import "fmt"

//type ObjectStorageInfo interface {
// NewClient() (ObjectStorageClient, error)
//}

type ObjectStorageClient interface {
InitiateMultipartUpload(objectName string) (string, error)
UploadPart()
CompleteMultipartUpload() (string, error)
AbortMultipartUpload()
Close()
}

func NewObjectStorageClient(info ObjectStorage) (ObjectStorageClient, error) {
switch info.Manufacturer {
case AliCloud:
return NewOSSClient(info), nil
case HuaweiCloud:
return &OBSClient{}, nil
}
return nil, fmt.Errorf("unknown cloud storage manufacturer %s", info.Manufacturer)
}

+ 0
- 16
sdks/cloudstorage/models.go View File

@@ -1,16 +0,0 @@
package cloudstorage

type ObjectStorage struct {
Manufacturer string `json:"manufacturer"`
Region string `json:"region"`
AK string `json:"access_key_id"`
SK string `json:"secret_access_key"`
Endpoint string `json:"endpoint"`
Bucket string `json:"bucket"`
}

const (
HuaweiCloud = "HuaweiCloud"
AliCloud = "AliCloud"
SugonCloud = "SugonCloud"
)

+ 0
- 24
sdks/cloudstorage/obs.go View File

@@ -1,24 +0,0 @@
package cloudstorage

type OBSClient struct {
}

func (c *OBSClient) InitiateMultipartUpload(objectName string) (string, error) {
return "", nil
}

func (c *OBSClient) UploadPart() {

}

func (c *OBSClient) CompleteMultipartUpload() (string, error) {
return "", nil
}

func (c *OBSClient) AbortMultipartUpload() {

}

func (c *OBSClient) Close() {

}

+ 0
- 55
sdks/cloudstorage/oss.go View File

@@ -1,55 +0,0 @@
package cloudstorage

import (
"fmt"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"log"
)

type OSSClient struct {
client *oss.Client
bucket *oss.Bucket
}

func (c *OSSClient) InitiateMultipartUpload(objectName string) (string, error) {
imur, err := c.bucket.InitiateMultipartUpload(objectName)
if err != nil {
return "", fmt.Errorf("failed to initiate multipart upload: %w", err)
}
return imur.UploadID, nil
}

func NewOSSClient(obs ObjectStorage) *OSSClient {
// 创建OSSClient实例。
client, err := oss.New(obs.Endpoint, obs.AK, obs.SK)
if err != nil {
log.Fatalf("Error: %v", err)
}

bucket, err := client.Bucket(obs.Bucket)
if err != nil {
log.Fatalf("Error: %v", err)
}

return &OSSClient{
client: client,
bucket: bucket,
}
}

func (c *OSSClient) UploadPart() {

}

func (c *OSSClient) CompleteMultipartUpload() (string, error) {
return "", nil
}

func (c *OSSClient) AbortMultipartUpload() {

}

func (c *OSSClient) Close() {
// 关闭client

}

+ 73
- 22
sdks/storage/storage.go View File

@@ -7,6 +7,38 @@ import (
"gitlink.org.cn/cloudream/common/utils/serder"
)

type Storage struct {
StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey; type:bigint; autoIncrement;"`
Name string `json:"name" gorm:"column:Name; type:varchar(256); not null"`
// 完全管理此存储服务的Hub的ID
MasterHub NodeID `json:"masterHub" gorm:"column:MasterHub; type:bigint; not null"`
// 存储服务的地址,包含鉴权所需数据
Address StorageAddress `json:"address" gorm:"column:Address; type:json; not null; serializer:union"`
// 存储服务拥有的特别功能
Features []StorageFeature `json:"features" gorm:"column:Features; type:json; serializer:union"`
}

func (Storage) TableName() string {
return "Storage"
}

func (s *Storage) String() string {
return fmt.Sprintf("%v(%v)", s.Name, s.StorageID)
}

// 共享存储服务的配置数据
type SharedStorage struct {
StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey; type:bigint"`
// 调度文件时保存文件的根路径
LoadBase string `json:"loadBase" gorm:"column:LoadBase; type:varchar(1024); not null"`
// 回源数据时数据存放位置的根路径
DataReturnBase string `json:"dataReturnBase" gorm:"column:DataReturnBase; type:varchar(1024); not null"`
}

func (SharedStorage) TableName() string {
return "SharedStorage"
}

// 存储服务地址
type StorageAddress interface {
GetType() string
@@ -31,34 +63,53 @@ func (a *LocalStorageAddress) String() string {
return "Local"
}

type Storage struct {
StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey; type:bigint; autoIncrement;"`
Name string `json:"name" gorm:"column:Name; type:varchar(256); not null"`
// 完全管理此存储服务的Hub的ID
MasterHub NodeID `json:"masterHub" gorm:"column:MasterHub; type:bigint; not null"`
// 存储服务的地址,包含鉴权所需数据
Address StorageAddress `json:"address" gorm:"column:Address; type:json; not null; serializer:union"`
// 存储服务拥有的特别功能
Features []StorageFeature `json:"features" gorm:"column:Features; type:json; serializer:union"`
type OSSAddress struct {
serder.Metadata `union:"Local"`
Region string `json:"region"`
AK string `json:"accessKeyId"`
SK string `json:"secretAccessKey"`
Endpoint string `json:"endpoint"`
Bucket string `json:"bucket"`
}

func (Storage) TableName() string {
return "Storage"
func (a *OSSAddress) GetType() string {
return "OSSAddress"
}

func (s *Storage) String() string {
return fmt.Sprintf("%v(%v)", s.Name, s.StorageID)
func (a *OSSAddress) String() string {
return "OSSAddress"
}

// 共享存储服务的配置数据
type SharedStorage struct {
StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey; type:bigint"`
// 调度文件时保存文件的根路径
LoadBase string `json:"loadBase" gorm:"column:LoadBase; type:varchar(1024); not null"`
// 回源数据时数据存放位置的根路径
DataReturnBase string `json:"dataReturnBase" gorm:"column:DataReturnBase; type:varchar(1024); not null"`
type OBSAddress struct {
serder.Metadata `union:"Local"`
Region string `json:"region"`
AK string `json:"accessKeyId"`
SK string `json:"secretAccessKey"`
Endpoint string `json:"endpoint"`
Bucket string `json:"bucket"`
}

func (SharedStorage) TableName() string {
return "SharedStorage"
func (a *OBSAddress) GetType() string {
return "OBSAddress"
}

func (a *OBSAddress) String() string {
return "OBSAddress"
}

type COSAddress struct {
serder.Metadata `union:"Local"`
Region string `json:"region"`
AK string `json:"accessKeyId"`
SK string `json:"secretAccessKey"`
Endpoint string `json:"endpoint"`
Bucket string `json:"bucket"`
}

func (a *COSAddress) GetType() string {
return "COSAddress"
}

func (a *COSAddress) String() string {
return "COSAddress"
}

Loading…
Cancel
Save