Browse Source

增加rabbitmq断连重试机制

gitlink
JeshuaRen 1 year ago
parent
commit
d0ef067a92
9 changed files with 91 additions and 36 deletions
  1. +20
    -4
      agent/main.go
  2. +3
    -5
      common/pkgs/ioswitch2/ops2/multipart.go
  3. +4
    -2
      common/pkgs/mq/agent/server.go
  4. +9
    -5
      common/pkgs/mq/config.go
  5. +4
    -2
      common/pkgs/mq/coordinator/server.go
  6. +4
    -2
      common/pkgs/mq/scanner/server.go
  7. +21
    -5
      coordinator/internal/cmd/serve.go
  8. +5
    -6
      coordinator/internal/mq/temp.go
  9. +21
    -5
      scanner/main.go

+ 20
- 4
agent/main.go View File

@@ -148,12 +148,28 @@ func main() {
func serveAgentServer(server *agtmq.Server) {
logger.Info("start serving command server")

err := server.Serve()
if err != nil {
logger.Errorf("command server stopped with error: %s", err.Error())
ch := server.Start()
if ch == nil {
logger.Errorf("RabbitMQ logEvent is nil")
os.Exit(1)
}

for {
val, err := ch.Receive()
if err != nil {
logger.Errorf("command server stopped with error: %s", err.Error())
break
}

switch val := val.(type) {
case error:
logger.Errorf("rabbitmq connect with error: %v", val)
case int:
if val == 1 {
break
}
}
}
logger.Info("command server stopped")

// TODO 仅简单结束了程序


+ 3
- 5
common/pkgs/ioswitch2/ops2/multipart.go View File

@@ -15,8 +15,8 @@ func init() {

type MultipartManage struct {
Address cdssdk.StorageAddress `json:"address"`
UploadID *exec.StringVar `json:"uploadID"`
ObjectID *exec.StringVar `json:"objectID"`
UploadID *exec.VarID `json:"uploadID"`
ObjectID *exec.VarID `json:"objectID"`
}

func (o *MultipartManage) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
@@ -41,8 +41,6 @@ func (o *MultipartManage) Execute(ctx *exec.ExecContext, e *exec.Executor) error
if err != nil {
return err
}
o.UploadID.Value = uploadID
e.PutVars(o.UploadID)

objectID, err := client.CompleteMultipartUpload()
if err != nil {
@@ -79,7 +77,7 @@ func (t *MultipartManageNode) GenerateOp() (exec.Op, error) {

type MultipartUpload struct {
Address cdssdk.StorageAddress `json:"address"`
FileMD5 *exec.StringVar `json:"fileMD5"`
FileMD5 *exec.VarID `json:"fileMD5"`
}

func (o *MultipartUpload) Execute(ctx *exec.ExecContext, e *exec.Executor) error {


+ 4
- 2
common/pkgs/mq/agent/server.go View File

@@ -2,6 +2,7 @@ package agent

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/utils/sync2"
mymq "gitlink.org.cn/cloudream/storage/common/pkgs/mq"
)

@@ -29,6 +30,7 @@ func NewServer(svc Service, id int64, cfg *mymq.Config) (*Server, error) {
func(msg *mq.Message) (*mq.Message, error) {
return msgDispatcher.Handle(srv.service, msg)
},
cfg.RabbitMQParam,
)
if err != nil {
return nil, err
@@ -43,8 +45,8 @@ func (s *Server) Stop() {
s.rabbitSvr.Close()
}

func (s *Server) Serve() error {
return s.rabbitSvr.Serve()
func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQLogEvent] {
return s.rabbitSvr.Start()
}

func (s *Server) OnError(callback func(error)) {


+ 9
- 5
common/pkgs/mq/config.go View File

@@ -1,12 +1,16 @@
package mq

import "fmt"
import (
"fmt"
"gitlink.org.cn/cloudream/common/pkgs/mq"
)

type Config struct {
Address string `json:"address"`
Account string `json:"account"`
Password string `json:"password"`
VHost string `json:"vhost"`
Address string `json:"address"`
Account string `json:"account"`
Password string `json:"password"`
VHost string `json:"vhost"`
RabbitMQParam mq.RabbitMQParam `json:"param"`
}

func (cfg *Config) MakeConnectingURL() string {


+ 4
- 2
common/pkgs/mq/coordinator/server.go View File

@@ -2,6 +2,7 @@ package coordinator

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/utils/sync2"
mymq "gitlink.org.cn/cloudream/storage/common/pkgs/mq"
)

@@ -38,6 +39,7 @@ func NewServer(svc Service, cfg *mymq.Config) (*Server, error) {
func(msg *mq.Message) (*mq.Message, error) {
return msgDispatcher.Handle(srv.service, msg)
},
cfg.RabbitMQParam,
)
if err != nil {
return nil, err
@@ -51,8 +53,8 @@ func (s *Server) Stop() {
s.rabbitSvr.Close()
}

func (s *Server) Serve() error {
return s.rabbitSvr.Serve()
func (s *Server) Start(cfg mymq.Config) *sync2.UnboundChannel[mq.RabbitMQLogEvent] {
return s.rabbitSvr.Start(cfg)
}

func (s *Server) OnError(callback func(error)) {


+ 4
- 2
common/pkgs/mq/scanner/server.go View File

@@ -2,6 +2,7 @@ package scanner

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/utils/sync2"
mymq "gitlink.org.cn/cloudream/storage/common/pkgs/mq"
)

@@ -25,6 +26,7 @@ func NewServer(svc Service, cfg *mymq.Config) (*Server, error) {
func(msg *mq.Message) (*mq.Message, error) {
return msgDispatcher.Handle(srv.service, msg)
},
cfg.RabbitMQParam,
)
if err != nil {
return nil, err
@@ -39,8 +41,8 @@ func (s *Server) Stop() {
s.rabbitSvr.Close()
}

func (s *Server) Serve() error {
return s.rabbitSvr.Serve()
func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQLogEvent] {
return s.rabbitSvr.Start()
}

func (s *Server) OnError(callback func(error)) {


+ 21
- 5
coordinator/internal/cmd/serve.go View File

@@ -2,6 +2,7 @@ package cmd

import (
"fmt"
stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq"
"os"

"gitlink.org.cn/cloudream/common/pkgs/logger"
@@ -45,20 +46,35 @@ func serve(configPath string) {
})

// 启动服务
go serveCoorServer(coorSvr)
go serveCoorServer(coorSvr, config.Cfg().RabbitMQ)

forever := make(chan bool)
<-forever
}

func serveCoorServer(server *coormq.Server) {
func serveCoorServer(server *coormq.Server, cfg stgmq.Config) {
logger.Info("start serving command server")

err := server.Serve()
if err != nil {
logger.Errorf("command server stopped with error: %s", err.Error())
ch := server.Start(cfg)
if ch == nil {
logger.Errorf("RabbitMQ logEvent is nil")
os.Exit(1)
}

for {
val, err := ch.Receive()
if err != nil {
logger.Errorf("command server stopped with error: %s", err.Error())
break
}

switch val := val.(type) {
case error:
logger.Errorf("rabbitmq connect with error: %v", val)
case int:
break
}
}
logger.Info("command server stopped")

// TODO 仅简单结束了程序


+ 5
- 6
coordinator/internal/mq/temp.go View File

@@ -1,10 +1,9 @@
package mq

import (
"database/sql"
"fmt"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
@@ -18,15 +17,15 @@ func (svc *Service) GetDatabaseAll(msg *coormq.GetDatabaseAll) (*coormq.GetDatab
var pkgs []cdssdk.Package
var objs []stgmod.ObjectDetail

err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
var err error
bkts, err = svc.db.Bucket().GetUserBuckets(tx, msg.UserID)
bkts, err = svc.db2.Bucket().GetUserBuckets(tx, msg.UserID)
if err != nil {
return fmt.Errorf("get user buckets: %w", err)
}

for _, bkt := range bkts {
ps, err := svc.db.Package().GetBucketPackages(tx, msg.UserID, bkt.BucketID)
ps, err := svc.db2.Package().GetBucketPackages(tx, msg.UserID, bkt.BucketID)
if err != nil {
return fmt.Errorf("get bucket packages: %w", err)
}
@@ -34,7 +33,7 @@ func (svc *Service) GetDatabaseAll(msg *coormq.GetDatabaseAll) (*coormq.GetDatab
}

for _, pkg := range pkgs {
os, err := svc.db.Object().GetPackageObjectDetails(tx, pkg.PackageID)
os, err := svc.db2.Object().GetPackageObjectDetails(tx, pkg.PackageID)
if err != nil {
return fmt.Errorf("get package object details: %w", err)
}


+ 21
- 5
scanner/main.go View File

@@ -85,13 +85,29 @@ func serveEventExecutor(executor *event.Executor) {
func serveScannerServer(server *scmq.Server) {
logger.Info("start serving scanner server")

err := server.Serve()
if err != nil {
logger.Errorf("scanner server stopped with error: %s", err.Error())
ch := server.Start()
if ch == nil {
logger.Errorf("RabbitMQ logEvent is nil")
os.Exit(1)
}

logger.Info("scanner server stopped")
for {
val, err := ch.Receive()
if err != nil {
logger.Errorf("command server stopped with error: %s", err.Error())
break
}

switch val := val.(type) {
case error:
logger.Errorf("rabbitmq connect with error: %v", val)
case int:
if val == 1 {
break
}
}
}
logger.Info("command server stopped")

// TODO 仅简单结束了程序
os.Exit(1)


Loading…
Cancel
Save