From ac4efa7c19bf0b11126833a8eb1c73dd2b0ab7b9 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 19 Dec 2024 09:23:37 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96mq=E9=85=8D=E7=BD=AE=E6=96=B9?= =?UTF-8?q?=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/cmd/serve.go | 4 +- agent/internal/config/config.go | 4 +- client/internal/config/config.go | 4 +- client/main.go | 2 +- common/globals/pools.go | 4 +- common/pkgs/mq/agent/client.go | 8 +-- common/pkgs/mq/agent/server.go | 5 +- common/pkgs/mq/config.go | 19 ------ common/pkgs/mq/coordinator/client.go | 8 +-- common/pkgs/mq/coordinator/server.go | 7 +- common/pkgs/mq/scanner/client.go | 8 +-- common/pkgs/mq/scanner/server.go | 5 +- common/pkgs/storage/s3/s3_test.go | 95 ++++++++++++++++++++------- coordinator/internal/cmd/serve.go | 6 +- coordinator/internal/config/config.go | 8 +-- scanner/internal/config/config.go | 4 +- scanner/main.go | 4 +- 17 files changed, 110 insertions(+), 85 deletions(-) delete mode 100644 common/pkgs/mq/config.go diff --git a/agent/internal/cmd/serve.go b/agent/internal/cmd/serve.go index 83d9433..720c3f6 100644 --- a/agent/internal/cmd/serve.go +++ b/agent/internal/cmd/serve.go @@ -46,7 +46,7 @@ func serve(configPath string) { } stgglb.InitLocal(&config.Cfg().Local) - stgglb.InitMQPool(&config.Cfg().RabbitMQ) + stgglb.InitMQPool(config.Cfg().RabbitMQ) stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) // 获取Hub配置 @@ -131,7 +131,7 @@ func serve(configPath string) { // 启动命令服务器 // TODO 需要设计AgentID持久化机制 - agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, stgMgr), config.Cfg().ID, &config.Cfg().RabbitMQ) + agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, stgMgr), config.Cfg().ID, config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new agent server failed, err: %s", err.Error()) } diff --git a/agent/internal/config/config.go b/agent/internal/config/config.go index e16235a..172564e 100644 --- a/agent/internal/config/config.go +++ b/agent/internal/config/config.go @@ -3,13 +3,13 @@ package config import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" log "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" c "gitlink.org.cn/cloudream/common/utils/config" stgmodels "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" "gitlink.org.cn/cloudream/storage/common/pkgs/grpc" - stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) type Config struct { @@ -18,7 +18,7 @@ type Config struct { Local stgmodels.LocalMachineInfo `json:"local"` GRPC *grpc.Config `json:"grpc"` Logger log.Config `json:"logger"` - RabbitMQ stgmq.Config `json:"rabbitMQ"` + RabbitMQ mq.Config `json:"rabbitMQ"` DistLock distlock.Config `json:"distlock"` Connectivity connectivity.Config `json:"connectivity"` Downloader downloader.Config `json:"downloader"` diff --git a/client/internal/config/config.go b/client/internal/config/config.go index b0cbb24..5523bbf 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -3,20 +3,20 @@ package config import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/config" stgmodels "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" - stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) type Config struct { Local stgmodels.LocalMachineInfo `json:"local"` AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` Logger logger.Config `json:"logger"` - RabbitMQ stgmq.Config `json:"rabbitMQ"` + RabbitMQ mq.Config `json:"rabbitMQ"` DistLock distlock.Config `json:"distlock"` Connectivity connectivity.Config `json:"connectivity"` Downloader downloader.Config `json:"downloader"` diff --git a/client/main.go b/client/main.go index 158dca7..4412541 100644 --- a/client/main.go +++ b/client/main.go @@ -37,7 +37,7 @@ func main() { } stgglb.InitLocal(&config.Cfg().Local) - stgglb.InitMQPool(&config.Cfg().RabbitMQ) + stgglb.InitMQPool(config.Cfg().RabbitMQ) stgglb.InitAgentRPCPool(&config.Cfg().AgentGRPC) // 连接性信息收集 diff --git a/common/globals/pools.go b/common/globals/pools.go index 71eca60..75b975e 100644 --- a/common/globals/pools.go +++ b/common/globals/pools.go @@ -1,8 +1,8 @@ package stgglb import ( + "gitlink.org.cn/cloudream/common/pkgs/mq" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" - stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" @@ -18,7 +18,7 @@ var ScannerMQPool scmq.Pool // // @Description: 初始化MQ连接池 // @param cfg -func InitMQPool(cfg *stgmq.Config) { +func InitMQPool(cfg mq.Config) { AgentMQPool = agtmq.NewPool(cfg) CoordinatorMQPool = coormq.NewPool(cfg) diff --git a/common/pkgs/mq/agent/client.go b/common/pkgs/mq/agent/client.go index 5debb8a..4a84ed7 100644 --- a/common/pkgs/mq/agent/client.go +++ b/common/pkgs/mq/agent/client.go @@ -13,8 +13,8 @@ type Client struct { id cdssdk.HubID } -func NewClient(id cdssdk.HubID, cfg *stgmq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQTransport(cfg.MakeConnectingURL(), stgmq.MakeAgentQueueName(int64(id)), "") +func NewClient(id cdssdk.HubID, cfg mq.Config) (*Client, error) { + rabbitCli, err := mq.NewRabbitMQTransport(cfg, stgmq.MakeAgentQueueName(int64(id)), "") if err != nil { return nil, err } @@ -35,12 +35,12 @@ type Pool interface { } type pool struct { - mqcfg *stgmq.Config + mqcfg mq.Config shareds map[cdssdk.HubID]*Client lock sync.Mutex } -func NewPool(mqcfg *stgmq.Config) Pool { +func NewPool(mqcfg mq.Config) Pool { return &pool{ mqcfg: mqcfg, shareds: make(map[cdssdk.HubID]*Client), diff --git a/common/pkgs/mq/agent/server.go b/common/pkgs/mq/agent/server.go index eeeccfc..ec55be5 100644 --- a/common/pkgs/mq/agent/server.go +++ b/common/pkgs/mq/agent/server.go @@ -20,18 +20,17 @@ type Server struct { rabbitSvr mq.RabbitMQServer } -func NewServer(svc Service, id cdssdk.HubID, cfg *mymq.Config) (*Server, error) { +func NewServer(svc Service, id cdssdk.HubID, cfg mq.Config) (*Server, error) { srv := &Server{ service: svc, } rabbitSvr, err := mq.NewRabbitMQServer( - cfg.MakeConnectingURL(), + cfg, mymq.MakeAgentQueueName(int64(id)), func(msg *mq.Message) (*mq.Message, error) { return msgDispatcher.Handle(srv.service, msg) }, - cfg.Param, ) if err != nil { return nil, err diff --git a/common/pkgs/mq/config.go b/common/pkgs/mq/config.go deleted file mode 100644 index f572d53..0000000 --- a/common/pkgs/mq/config.go +++ /dev/null @@ -1,19 +0,0 @@ -package mq - -import ( - "fmt" - - "gitlink.org.cn/cloudream/common/pkgs/mq" -) - -type Config struct { - Address string `json:"address"` - Account string `json:"account"` - Password string `json:"password"` - VHost string `json:"vhost"` - Param mq.RabbitMQParam `json:"param"` -} - -func (cfg *Config) MakeConnectingURL() string { - return fmt.Sprintf("amqp://%s:%s@%s%s", cfg.Account, cfg.Password, cfg.Address, cfg.VHost) -} diff --git a/common/pkgs/mq/coordinator/client.go b/common/pkgs/mq/coordinator/client.go index 84a4f28..ad7c1cb 100644 --- a/common/pkgs/mq/coordinator/client.go +++ b/common/pkgs/mq/coordinator/client.go @@ -11,8 +11,8 @@ type Client struct { rabbitCli *mq.RabbitMQTransport } -func NewClient(cfg *stgmq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQTransport(cfg.MakeConnectingURL(), stgmq.COORDINATOR_QUEUE_NAME, "") +func NewClient(cfg mq.Config) (*Client, error) { + rabbitCli, err := mq.NewRabbitMQTransport(cfg, stgmq.COORDINATOR_QUEUE_NAME, "") if err != nil { return nil, err } @@ -32,12 +32,12 @@ type Pool interface { } type pool struct { - mqcfg *stgmq.Config + mqcfg mq.Config shared *Client lock sync.Mutex } -func NewPool(mqcfg *stgmq.Config) Pool { +func NewPool(mqcfg mq.Config) Pool { return &pool{ mqcfg: mqcfg, } diff --git a/common/pkgs/mq/coordinator/server.go b/common/pkgs/mq/coordinator/server.go index 49d0319..d286b2f 100644 --- a/common/pkgs/mq/coordinator/server.go +++ b/common/pkgs/mq/coordinator/server.go @@ -28,18 +28,17 @@ type Server struct { rabbitSvr mq.RabbitMQServer } -func NewServer(svc Service, cfg *mymq.Config) (*Server, error) { +func NewServer(svc Service, cfg mq.Config) (*Server, error) { srv := &Server{ service: svc, } rabbitSvr, err := mq.NewRabbitMQServer( - cfg.MakeConnectingURL(), + cfg, mymq.COORDINATOR_QUEUE_NAME, func(msg *mq.Message) (*mq.Message, error) { return msgDispatcher.Handle(srv.service, msg) }, - cfg.Param, ) if err != nil { return nil, err @@ -53,7 +52,7 @@ func (s *Server) Stop() { s.rabbitSvr.Close() } -func (s *Server) Start(cfg mymq.Config) *sync2.UnboundChannel[mq.RabbitMQServerEvent] { +func (s *Server) Start(cfg mq.Config) *sync2.UnboundChannel[mq.RabbitMQServerEvent] { return s.rabbitSvr.Start() } diff --git a/common/pkgs/mq/scanner/client.go b/common/pkgs/mq/scanner/client.go index 156a16d..0970d53 100644 --- a/common/pkgs/mq/scanner/client.go +++ b/common/pkgs/mq/scanner/client.go @@ -11,8 +11,8 @@ type Client struct { rabbitCli *mq.RabbitMQTransport } -func NewClient(cfg *stgmq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQTransport(cfg.MakeConnectingURL(), stgmq.SCANNER_QUEUE_NAME, "") +func NewClient(cfg mq.Config) (*Client, error) { + rabbitCli, err := mq.NewRabbitMQTransport(cfg, stgmq.SCANNER_QUEUE_NAME, "") if err != nil { return nil, err } @@ -32,12 +32,12 @@ type Pool interface { } type pool struct { - mqcfg *stgmq.Config + mqcfg mq.Config shared *Client lock sync.Mutex } -func NewPool(mqcfg *stgmq.Config) Pool { +func NewPool(mqcfg mq.Config) Pool { return &pool{ mqcfg: mqcfg, } diff --git a/common/pkgs/mq/scanner/server.go b/common/pkgs/mq/scanner/server.go index 5b11565..2ca0c64 100644 --- a/common/pkgs/mq/scanner/server.go +++ b/common/pkgs/mq/scanner/server.go @@ -15,18 +15,17 @@ type Server struct { rabbitSvr mq.RabbitMQServer } -func NewServer(svc Service, cfg *mymq.Config) (*Server, error) { +func NewServer(svc Service, cfg mq.Config) (*Server, error) { srv := &Server{ service: svc, } rabbitSvr, err := mq.NewRabbitMQServer( - cfg.MakeConnectingURL(), + cfg, mymq.SCANNER_QUEUE_NAME, func(msg *mq.Message) (*mq.Message, error) { return msgDispatcher.Handle(srv.service, msg) }, - cfg.Param, ) if err != nil { return nil, err diff --git a/common/pkgs/storage/s3/s3_test.go b/common/pkgs/storage/s3/s3_test.go index a0765a1..7c3f2cd 100644 --- a/common/pkgs/storage/s3/s3_test.go +++ b/common/pkgs/storage/s3/s3_test.go @@ -1,8 +1,12 @@ package s3 import ( + "bytes" "context" "fmt" + "os" + "path/filepath" + "strings" "testing" "github.com/aws/aws-sdk-go-v2/aws" @@ -14,34 +18,79 @@ import ( func Test_S3(t *testing.T) { Convey("OBS", t, func() { cli, bkt, err := createS3Client(&cdssdk.OBSType{ - Region: "0", - AK: "0", - SK: "0", - Endpoint: "0", - Bucket: "0", + Region: "cn-north-4", + AK: "CANMDYKXIWRDR0IYDB32", + SK: "V67yEYpu7ol2NT8nhLlNF1g9k5hq2VwIP5N5jIoQ", + Endpoint: "https://obs.cn-north-4.myhuaweicloud.com", + Bucket: "pcm3-bucket3", }) So(err, ShouldEqual, nil) - var marker *string - for { - resp, err := cli.ListObjects(context.Background(), &s3.ListObjectsInput{ - Bucket: aws.String(bkt), - Prefix: aws.String("cds"), - MaxKeys: aws.Int32(5), - Marker: marker, - }) - So(err, ShouldEqual, nil) - - fmt.Printf("\n") - for _, obj := range resp.Contents { - fmt.Printf("%v, %v\n", *obj.Key, *obj.LastModified) + // file, err := os.Open("./sky") + So(err, ShouldEqual, nil) + // defer file.Close() + + _, err = cli.PutObject(context.Background(), &s3.PutObjectInput{ + Bucket: aws.String(bkt), + Key: aws.String("sky2"), + Body: bytes.NewReader([]byte("hello world")), + // ChecksumAlgorithm: s3types.ChecksumAlgorithmSha256, + // ContentType: aws.String("application/octet-stream"), + ContentLength: aws.Int64(11), + // ContentEncoding: aws.String("identity"), + }) + + So(err, ShouldEqual, nil) + + // var marker *string + // for { + // resp, err := cli.ListObjects(context.Background(), &s3.ListObjectsInput{ + // Bucket: aws.String(bkt), + // Prefix: aws.String("cds"), + // MaxKeys: aws.Int32(5), + // Marker: marker, + // }) + // So(err, ShouldEqual, nil) + + // fmt.Printf("\n") + // for _, obj := range resp.Contents { + // fmt.Printf("%v, %v\n", *obj.Key, *obj.LastModified) + // } + + // if *resp.IsTruncated { + // marker = resp.NextMarker + // } else { + // break + // } + // } + + }) +} + +func Test_2(t *testing.T) { + Convey("OBS", t, func() { + dir := "d:\\Projects\\cloudream\\workspace\\storage\\common\\pkgs\\storage\\s3" + filepath.WalkDir(dir, func(fname string, d os.DirEntry, err error) error { + if err != nil { + return nil } - if *resp.IsTruncated { - marker = resp.NextMarker - } else { - break + info, err := d.Info() + if err != nil { + return nil } - } + + if info.IsDir() { + return nil + } + + path := strings.TrimPrefix(fname, dir+string(os.PathSeparator)) + // path := fname + comps := strings.Split(filepath.ToSlash(path), "/") + fmt.Println(path) + fmt.Println(comps) + // s.fs.syncer.SyncObject(append([]string{userName}, comps...), info.Size()) + return nil + }) }) } diff --git a/coordinator/internal/cmd/serve.go b/coordinator/internal/cmd/serve.go index 30c07e3..59057d5 100644 --- a/coordinator/internal/cmd/serve.go +++ b/coordinator/internal/cmd/serve.go @@ -4,8 +4,6 @@ import ( "fmt" "os" - stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" - "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" @@ -32,7 +30,7 @@ func serve(configPath string) { logger.Fatalf("new db2 failed, err: %s", err.Error()) } - coorSvr, err := coormq.NewServer(mymq.NewService(db2), &config.Cfg().RabbitMQ) + coorSvr, err := coormq.NewServer(mymq.NewService(db2), config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new coordinator server failed, err: %s", err.Error()) } @@ -48,7 +46,7 @@ func serve(configPath string) { <-forever } -func serveCoorServer(server *coormq.Server, cfg stgmq.Config) { +func serveCoorServer(server *coormq.Server, cfg mq.Config) { logger.Info("start serving command server") ch := server.Start(cfg) diff --git a/coordinator/internal/config/config.go b/coordinator/internal/config/config.go index 49ae494..76789e7 100644 --- a/coordinator/internal/config/config.go +++ b/coordinator/internal/config/config.go @@ -2,15 +2,15 @@ package config import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" c "gitlink.org.cn/cloudream/common/utils/config" db "gitlink.org.cn/cloudream/storage/common/pkgs/db2/config" - stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) type Config struct { - Logger log.Config `json:"logger"` - DB db.Config `json:"db"` - RabbitMQ stgmq.Config `json:"rabbitMQ"` + Logger log.Config `json:"logger"` + DB db.Config `json:"db"` + RabbitMQ mq.Config `json:"rabbitMQ"` } var cfg Config diff --git a/scanner/internal/config/config.go b/scanner/internal/config/config.go index 281db97..46cd5f2 100644 --- a/scanner/internal/config/config.go +++ b/scanner/internal/config/config.go @@ -3,9 +3,9 @@ package config import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" log "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" c "gitlink.org.cn/cloudream/common/utils/config" db "gitlink.org.cn/cloudream/storage/common/pkgs/db2/config" - stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) type Config struct { @@ -14,7 +14,7 @@ type Config struct { HubUnavailableSeconds int `json:"hubUnavailableSeconds"` // 如果节点上次上报时间超过这个值,则认为节点已经不可用 Logger log.Config `json:"logger"` DB db.Config `json:"db"` - RabbitMQ stgmq.Config `json:"rabbitMQ"` + RabbitMQ mq.Config `json:"rabbitMQ"` DistLock distlock.Config `json:"distlock"` } diff --git a/scanner/main.go b/scanner/main.go index b227ee7..573e209 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -35,7 +35,7 @@ func main() { logger.Fatalf("new db failed, err: %s", err.Error()) } - stgglb.InitMQPool(&config.Cfg().RabbitMQ) + stgglb.InitMQPool(config.Cfg().RabbitMQ) stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) @@ -54,7 +54,7 @@ func main() { eventExecutor := event.NewExecutor(db, distlockSvc, stgMgr) go serveEventExecutor(&eventExecutor) - agtSvr, err := scmq.NewServer(mq.NewService(&eventExecutor), &config.Cfg().RabbitMQ) + agtSvr, err := scmq.NewServer(mq.NewService(&eventExecutor), config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new agent server failed, err: %s", err.Error()) }