From eedc5a4e82b200eaf3d448b94977f5debfaf7de1 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 24 May 2023 14:57:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9C=A8Coordinator=E4=B8=AD=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E8=A7=A6=E5=8F=91=E4=BA=8B=E4=BB=B6=E7=9A=84=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/services/object.go | 26 +++++++++++++++++++++++++- internal/services/service.go | 13 +++++++++---- main.go | 8 +++++++- 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/internal/services/object.go b/internal/services/object.go index 8de10cb..b5a3a74 100644 --- a/internal/services/object.go +++ b/internal/services/object.go @@ -7,10 +7,13 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/consts" "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/utils/logger" log "gitlink.org.cn/cloudream/common/utils/logger" "gitlink.org.cn/cloudream/db/model" ramsg "gitlink.org.cn/cloudream/rabbitmq/message" coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" + scmsg "gitlink.org.cn/cloudream/rabbitmq/message/scanner" + scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" ) func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.PreDownloadObjectResp { @@ -180,7 +183,8 @@ func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) *coormsg.Creat return ramsg.ReplyFailed[coormsg.CreateObjectResp](errorcode.OPERATION_FAILED, "create rep object failed") } - // TODO 通知scanner检查备份数 + // 紧急任务 + svc.scanner.PostEvent(scmsg.NewPostEventBody(scevt.NewCheckRepCount([]string{msg.Body.FileHash}), true, true)) return ramsg.ReplyOK(coormsg.NewCreateObjectRespBody()) } @@ -255,6 +259,9 @@ func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) *coormsg.Updat } + // 紧急任务 + svc.scanner.PostEvent(scmsg.NewPostEventBody(scevt.NewCheckRepCount([]string{msg.Body.FileHash}), true, true)) + return ramsg.ReplyOK(coormsg.NewUpdateRepObjectRespBody()) } @@ -268,5 +275,22 @@ func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) *coormsg.DeleteObjec return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "set object deleted failed") } + stgs, err := svc.db.StorageObject().FindObjectStorages(svc.db.SQLCtx(), msg.Body.ObjectID) + if err != nil { + logger.Warnf("find object storages failed, but this will not affect the deleting, err: %s", err.Error()) + return ramsg.ReplyOK(coormsg.NewDeleteObjectRespBody()) + } + + // 不追求及时、准确 + if len(stgs) == 0 { + // 如果没有被引用,直接投递CheckObject的任务 + svc.scanner.PostEvent(scmsg.NewPostEventBody(scevt.NewCheckObject([]int{msg.Body.ObjectID}), false, false)) + } else { + // 有引用则让Agent去检查StorageObject + for _, stg := range stgs { + svc.scanner.PostEvent(scmsg.NewPostEventBody(scevt.NewAgentCheckStorage(stg.StorageID, []int{msg.Body.ObjectID}), false, false)) + } + } + return ramsg.ReplyOK(coormsg.NewDeleteObjectRespBody()) } diff --git a/internal/services/service.go b/internal/services/service.go index 9bc1d23..8dbda03 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -1,13 +1,18 @@ package services -import mydb "gitlink.org.cn/cloudream/db" +import ( + mydb "gitlink.org.cn/cloudream/db" + sccli "gitlink.org.cn/cloudream/rabbitmq/client/scanner" +) type Service struct { - db *mydb.DB + db *mydb.DB + scanner *sccli.ScannerClient } -func NewService(db *mydb.DB) *Service { +func NewService(db *mydb.DB, scanner *sccli.ScannerClient) *Service { return &Service{ - db: db, + db: db, + scanner: scanner, } } diff --git a/main.go b/main.go index 92ecb7a..96c69ae 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "gitlink.org.cn/cloudream/coordinator/internal/config" "gitlink.org.cn/cloudream/coordinator/internal/services" mydb "gitlink.org.cn/cloudream/db" + sccli "gitlink.org.cn/cloudream/rabbitmq/client/scanner" rasvr "gitlink.org.cn/cloudream/rabbitmq/server/coordinator" ) @@ -30,7 +31,12 @@ func main() { log.Fatalf("new db failed, err: %s", err.Error()) } - coorSvr, err := rasvr.NewCoordinatorServer(services.NewService(db), &config.Cfg().RabbitMQ) + scanner, err := sccli.NewScannerClient(&config.Cfg().RabbitMQ) + if err != nil { + log.Fatalf("new scanner client failed, err: %s", err.Error()) + } + + coorSvr, err := rasvr.NewCoordinatorServer(services.NewService(db, scanner), &config.Cfg().RabbitMQ) if err != nil { log.Fatalf("new coordinator server failed, err: %s", err.Error()) }