diff --git a/agent/main.go b/agent/main.go index 22333df..e54f99a 100644 --- a/agent/main.go +++ b/agent/main.go @@ -148,12 +148,28 @@ func main() { func serveAgentServer(server *agtmq.Server) { logger.Info("start serving command server") - err := server.Serve() - - if err != nil { - logger.Errorf("command server stopped with error: %s", err.Error()) + ch := server.Start() + if ch == nil { + logger.Errorf("RabbitMQ logEvent is nil") + os.Exit(1) } + for { + val, err := ch.Receive() + if err != nil { + logger.Errorf("command server stopped with error: %s", err.Error()) + break + } + + switch val := val.(type) { + case error: + logger.Errorf("rabbitmq connect with error: %v", val) + case int: + if val == 1 { + break + } + } + } logger.Info("command server stopped") // TODO 仅简单结束了程序 diff --git a/common/pkgs/ioswitch2/ops2/multipart.go b/common/pkgs/ioswitch2/ops2/multipart.go index 0ef7da6..05d7ad4 100644 --- a/common/pkgs/ioswitch2/ops2/multipart.go +++ b/common/pkgs/ioswitch2/ops2/multipart.go @@ -15,8 +15,8 @@ func init() { type MultipartManage struct { Address cdssdk.StorageAddress `json:"address"` - UploadID *exec.StringVar `json:"uploadID"` - ObjectID *exec.StringVar `json:"objectID"` + UploadID *exec.VarID `json:"uploadID"` + ObjectID *exec.VarID `json:"objectID"` } func (o *MultipartManage) Execute(ctx *exec.ExecContext, e *exec.Executor) error { @@ -41,8 +41,6 @@ func (o *MultipartManage) Execute(ctx *exec.ExecContext, e *exec.Executor) error if err != nil { return err } - o.UploadID.Value = uploadID - e.PutVars(o.UploadID) objectID, err := client.CompleteMultipartUpload() if err != nil { @@ -79,7 +77,7 @@ func (t *MultipartManageNode) GenerateOp() (exec.Op, error) { type MultipartUpload struct { Address cdssdk.StorageAddress `json:"address"` - FileMD5 *exec.StringVar `json:"fileMD5"` + FileMD5 *exec.VarID `json:"fileMD5"` } func (o *MultipartUpload) Execute(ctx *exec.ExecContext, e *exec.Executor) error { diff --git a/common/pkgs/mq/agent/server.go b/common/pkgs/mq/agent/server.go index ece9016..deb9a6d 100644 --- a/common/pkgs/mq/agent/server.go +++ b/common/pkgs/mq/agent/server.go @@ -2,6 +2,7 @@ package agent import ( "gitlink.org.cn/cloudream/common/pkgs/mq" + "gitlink.org.cn/cloudream/common/utils/sync2" mymq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) @@ -29,6 +30,7 @@ func NewServer(svc Service, id int64, cfg *mymq.Config) (*Server, error) { func(msg *mq.Message) (*mq.Message, error) { return msgDispatcher.Handle(srv.service, msg) }, + cfg.RabbitMQParam, ) if err != nil { return nil, err @@ -43,8 +45,8 @@ func (s *Server) Stop() { s.rabbitSvr.Close() } -func (s *Server) Serve() error { - return s.rabbitSvr.Serve() +func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQLogEvent] { + return s.rabbitSvr.Start() } func (s *Server) OnError(callback func(error)) { diff --git a/common/pkgs/mq/config.go b/common/pkgs/mq/config.go index 07100af..4b4f4bb 100644 --- a/common/pkgs/mq/config.go +++ b/common/pkgs/mq/config.go @@ -1,12 +1,16 @@ package mq -import "fmt" +import ( + "fmt" + "gitlink.org.cn/cloudream/common/pkgs/mq" +) type Config struct { - Address string `json:"address"` - Account string `json:"account"` - Password string `json:"password"` - VHost string `json:"vhost"` + Address string `json:"address"` + Account string `json:"account"` + Password string `json:"password"` + VHost string `json:"vhost"` + RabbitMQParam mq.RabbitMQParam `json:"param"` } func (cfg *Config) MakeConnectingURL() string { diff --git a/common/pkgs/mq/coordinator/server.go b/common/pkgs/mq/coordinator/server.go index 8444c6f..7e10faf 100644 --- a/common/pkgs/mq/coordinator/server.go +++ b/common/pkgs/mq/coordinator/server.go @@ -2,6 +2,7 @@ package coordinator import ( "gitlink.org.cn/cloudream/common/pkgs/mq" + "gitlink.org.cn/cloudream/common/utils/sync2" mymq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) @@ -38,6 +39,7 @@ func NewServer(svc Service, cfg *mymq.Config) (*Server, error) { func(msg *mq.Message) (*mq.Message, error) { return msgDispatcher.Handle(srv.service, msg) }, + cfg.RabbitMQParam, ) if err != nil { return nil, err @@ -51,8 +53,8 @@ func (s *Server) Stop() { s.rabbitSvr.Close() } -func (s *Server) Serve() error { - return s.rabbitSvr.Serve() +func (s *Server) Start(cfg mymq.Config) *sync2.UnboundChannel[mq.RabbitMQLogEvent] { + return s.rabbitSvr.Start(cfg) } func (s *Server) OnError(callback func(error)) { diff --git a/common/pkgs/mq/scanner/server.go b/common/pkgs/mq/scanner/server.go index 8219519..b9362d7 100644 --- a/common/pkgs/mq/scanner/server.go +++ b/common/pkgs/mq/scanner/server.go @@ -2,6 +2,7 @@ package scanner import ( "gitlink.org.cn/cloudream/common/pkgs/mq" + "gitlink.org.cn/cloudream/common/utils/sync2" mymq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) @@ -25,6 +26,7 @@ func NewServer(svc Service, cfg *mymq.Config) (*Server, error) { func(msg *mq.Message) (*mq.Message, error) { return msgDispatcher.Handle(srv.service, msg) }, + cfg.RabbitMQParam, ) if err != nil { return nil, err @@ -39,8 +41,8 @@ func (s *Server) Stop() { s.rabbitSvr.Close() } -func (s *Server) Serve() error { - return s.rabbitSvr.Serve() +func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQLogEvent] { + return s.rabbitSvr.Start() } func (s *Server) OnError(callback func(error)) { diff --git a/coordinator/internal/cmd/serve.go b/coordinator/internal/cmd/serve.go index 7f08173..8eca5e5 100644 --- a/coordinator/internal/cmd/serve.go +++ b/coordinator/internal/cmd/serve.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" "os" "gitlink.org.cn/cloudream/common/pkgs/logger" @@ -45,20 +46,35 @@ func serve(configPath string) { }) // 启动服务 - go serveCoorServer(coorSvr) + go serveCoorServer(coorSvr, config.Cfg().RabbitMQ) forever := make(chan bool) <-forever } -func serveCoorServer(server *coormq.Server) { +func serveCoorServer(server *coormq.Server, cfg stgmq.Config) { logger.Info("start serving command server") - err := server.Serve() - if err != nil { - logger.Errorf("command server stopped with error: %s", err.Error()) + ch := server.Start(cfg) + if ch == nil { + logger.Errorf("RabbitMQ logEvent is nil") + os.Exit(1) } + for { + val, err := ch.Receive() + if err != nil { + logger.Errorf("command server stopped with error: %s", err.Error()) + break + } + + switch val := val.(type) { + case error: + logger.Errorf("rabbitmq connect with error: %v", val) + case int: + break + } + } logger.Info("command server stopped") // TODO 仅简单结束了程序 diff --git a/coordinator/internal/mq/temp.go b/coordinator/internal/mq/temp.go index abe21f7..5c2a561 100644 --- a/coordinator/internal/mq/temp.go +++ b/coordinator/internal/mq/temp.go @@ -1,10 +1,9 @@ package mq import ( - "database/sql" "fmt" + "gitlink.org.cn/cloudream/storage/common/pkgs/db2" - "github.com/jmoiron/sqlx" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" @@ -18,15 +17,15 @@ func (svc *Service) GetDatabaseAll(msg *coormq.GetDatabaseAll) (*coormq.GetDatab var pkgs []cdssdk.Package var objs []stgmod.ObjectDetail - err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + err := svc.db2.DoTx(func(tx db2.SQLContext) error { var err error - bkts, err = svc.db.Bucket().GetUserBuckets(tx, msg.UserID) + bkts, err = svc.db2.Bucket().GetUserBuckets(tx, msg.UserID) if err != nil { return fmt.Errorf("get user buckets: %w", err) } for _, bkt := range bkts { - ps, err := svc.db.Package().GetBucketPackages(tx, msg.UserID, bkt.BucketID) + ps, err := svc.db2.Package().GetBucketPackages(tx, msg.UserID, bkt.BucketID) if err != nil { return fmt.Errorf("get bucket packages: %w", err) } @@ -34,7 +33,7 @@ func (svc *Service) GetDatabaseAll(msg *coormq.GetDatabaseAll) (*coormq.GetDatab } for _, pkg := range pkgs { - os, err := svc.db.Object().GetPackageObjectDetails(tx, pkg.PackageID) + os, err := svc.db2.Object().GetPackageObjectDetails(tx, pkg.PackageID) if err != nil { return fmt.Errorf("get package object details: %w", err) } diff --git a/scanner/main.go b/scanner/main.go index 064430b..aa55320 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -85,13 +85,29 @@ func serveEventExecutor(executor *event.Executor) { func serveScannerServer(server *scmq.Server) { logger.Info("start serving scanner server") - err := server.Serve() - - if err != nil { - logger.Errorf("scanner server stopped with error: %s", err.Error()) + ch := server.Start() + if ch == nil { + logger.Errorf("RabbitMQ logEvent is nil") + os.Exit(1) } - logger.Info("scanner server stopped") + for { + val, err := ch.Receive() + if err != nil { + logger.Errorf("command server stopped with error: %s", err.Error()) + break + } + + switch val := val.(type) { + case error: + logger.Errorf("rabbitmq connect with error: %v", val) + case int: + if val == 1 { + break + } + } + } + logger.Info("command server stopped") // TODO 仅简单结束了程序 os.Exit(1)