From a17da12dfe22b305a2835be54e1514ed33ed1bd0 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 21 Jul 2023 10:43:08 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96rabbitm=E7=9B=B8=E5=85=B3?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=EF=BC=8C=E6=94=AF=E6=8C=81=E8=AE=BE=E7=BD=AE?= =?UTF-8?q?=E8=B6=85=E6=97=B6=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/event/agent_check_cache.go | 12 +++++------- internal/event/agent_check_state.go | 13 +++++-------- internal/event/agent_check_storage.go | 12 +++++------- internal/services/event.go | 6 +++--- 4 files changed, 18 insertions(+), 25 deletions(-) diff --git a/internal/event/agent_check_cache.go b/internal/event/agent_check_cache.go index 38e9c2b..de84815 100644 --- a/internal/event/agent_check_cache.go +++ b/internal/event/agent_check_cache.go @@ -2,11 +2,13 @@ package event import ( "database/sql" + "time" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder" "gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/db/model" + "gitlink.org.cn/cloudream/rabbitmq" "gitlink.org.cn/cloudream/scanner/internal/config" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" @@ -132,18 +134,14 @@ func (t *AgentCheckCache) startCheck(execCtx ExecuteContext, isComplete bool, ca } defer agentClient.Close() - checkResp, err := agentClient.CheckIPFS(agtmsg.NewCheckIPFSBody(isComplete, caches)) + checkResp, err := agentClient.CheckIPFS(agtmsg.NewCheckIPFS(isComplete, caches), rabbitmq.RequestOption{Timeout: time.Minute}) if err != nil { - log.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error()) - return - } - if checkResp.IsFailed() { - log.WithField("NodeID", t.NodeID).Warnf("agent operation failed, err: %s", err.Error()) + log.WithField("NodeID", t.NodeID).Warnf("checking ipfs: %s", err.Error()) return } // 根据返回结果修改数据库 - for _, entry := range checkResp.Body.Entries { + for _, entry := range checkResp.Entries { switch entry.Operation { case agtmsg.CHECK_IPFS_RESP_OP_DELETE_TEMP: err := execCtx.Args.DB.Cache().DeleteTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) diff --git a/internal/event/agent_check_state.go b/internal/event/agent_check_state.go index 1b17d4b..5738f89 100644 --- a/internal/event/agent_check_state.go +++ b/internal/event/agent_check_state.go @@ -9,6 +9,7 @@ import ( "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder" "gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/db/model" + "gitlink.org.cn/cloudream/rabbitmq" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" @@ -91,18 +92,14 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { } defer agentClient.Close() - getResp, err := agentClient.GetState(agtmsg.NewGetStateBody()) + getResp, err := agentClient.GetState(agtmsg.NewGetState(), rabbitmq.RequestOption{Timeout: time.Minute}) if err != nil { - log.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error()) - } - if getResp.IsFailed() { - log.WithField("NodeID", t.NodeID).Warnf("agent operation failed, err: %s", err.Error()) - return + log.WithField("NodeID", t.NodeID).Warnf("getting state: %s", err.Error()) } // 根据返回结果修改节点状态 - if getResp.Body.IPFSState != consts.IPFS_STATE_OK { - log.WithField("NodeID", t.NodeID).Warnf("IPFS status is %s, set node state unavailable", getResp.Body.IPFSState) + if getResp.IPFSState != consts.IPFS_STATE_OK { + log.WithField("NodeID", t.NodeID).Warnf("IPFS status is %s, set node state unavailable", getResp.IPFSState) err := execCtx.Args.DB.Node().UpdateState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE) if err != nil { diff --git a/internal/event/agent_check_storage.go b/internal/event/agent_check_storage.go index 1a6e5e4..9f981ec 100644 --- a/internal/event/agent_check_storage.go +++ b/internal/event/agent_check_storage.go @@ -2,12 +2,14 @@ package event import ( "database/sql" + "time" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/consts" "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder" "gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/db/model" + "gitlink.org.cn/cloudream/rabbitmq" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" @@ -149,18 +151,14 @@ func (t *AgentCheckStorage) startCheck(execCtx ExecuteContext, stg model.Storage } defer agentClient.Close() - checkResp, err := agentClient.CheckStorage(agtmsg.NewCheckStorageBody(stg.StorageID, stg.Directory, isComplete, objects)) + checkResp, err := agentClient.CheckStorage(agtmsg.NewCheckStorage(stg.StorageID, stg.Directory, isComplete, objects), rabbitmq.RequestOption{Timeout: time.Minute}) if err != nil { - log.WithField("NodeID", stg.NodeID).Warnf("request to agent failed, err: %s", err.Error()) - } - if checkResp.IsFailed() { - log.WithField("NodeID", stg.NodeID).Warnf("agent operation failed, err: %s", err.Error()) - return + log.WithField("NodeID", stg.NodeID).Warnf("checking storage: %s", err.Error()) } // 根据返回结果修改数据库 var chkObjIDs []int - for _, entry := range checkResp.Body.Entries { + for _, entry := range checkResp.Entries { switch entry.Operation { case agtmsg.CHECK_STORAGE_RESP_OP_DELETE: err := execCtx.Args.DB.StorageObject().Delete(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID) diff --git a/internal/services/event.go b/internal/services/event.go index 5790c20..730d69b 100644 --- a/internal/services/event.go +++ b/internal/services/event.go @@ -9,7 +9,7 @@ import ( func (svc *Service) PostEvent(msg *scmsg.PostEvent) { - evtMsg, err := scevt.MapToMessage(msg.Body.Event) + evtMsg, err := scevt.MapToMessage(msg.Event) if err != nil { logger.Warnf("convert map to event message failed, err: %s", err.Error()) return @@ -22,7 +22,7 @@ func (svc *Service) PostEvent(msg *scmsg.PostEvent) { } svc.eventExecutor.Post(evt, event.ExecuteOption{ - IsEmergency: msg.Body.IsEmergency, - DontMerge: msg.Body.DontMerge, + IsEmergency: msg.IsEmergency, + DontMerge: msg.DontMerge, }) }