diff --git a/internal/event/agent_check_cache.go b/internal/event/agent_check_cache.go index 38e9c2b..de84815 100644 --- a/internal/event/agent_check_cache.go +++ b/internal/event/agent_check_cache.go @@ -2,11 +2,13 @@ package event import ( "database/sql" + "time" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder" "gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/db/model" + "gitlink.org.cn/cloudream/rabbitmq" "gitlink.org.cn/cloudream/scanner/internal/config" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" @@ -132,18 +134,14 @@ func (t *AgentCheckCache) startCheck(execCtx ExecuteContext, isComplete bool, ca } defer agentClient.Close() - checkResp, err := agentClient.CheckIPFS(agtmsg.NewCheckIPFSBody(isComplete, caches)) + checkResp, err := agentClient.CheckIPFS(agtmsg.NewCheckIPFS(isComplete, caches), rabbitmq.RequestOption{Timeout: time.Minute}) if err != nil { - log.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error()) - return - } - if checkResp.IsFailed() { - log.WithField("NodeID", t.NodeID).Warnf("agent operation failed, err: %s", err.Error()) + log.WithField("NodeID", t.NodeID).Warnf("checking ipfs: %s", err.Error()) return } // 根据返回结果修改数据库 - for _, entry := range checkResp.Body.Entries { + for _, entry := range checkResp.Entries { switch entry.Operation { case agtmsg.CHECK_IPFS_RESP_OP_DELETE_TEMP: err := execCtx.Args.DB.Cache().DeleteTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) diff --git a/internal/event/agent_check_state.go b/internal/event/agent_check_state.go index 1b17d4b..5738f89 100644 --- a/internal/event/agent_check_state.go +++ b/internal/event/agent_check_state.go @@ -9,6 +9,7 @@ import ( "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder" "gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/db/model" + "gitlink.org.cn/cloudream/rabbitmq" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" @@ -91,18 +92,14 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { } defer agentClient.Close() - getResp, err := agentClient.GetState(agtmsg.NewGetStateBody()) + getResp, err := agentClient.GetState(agtmsg.NewGetState(), rabbitmq.RequestOption{Timeout: time.Minute}) if err != nil { - log.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error()) - } - if getResp.IsFailed() { - log.WithField("NodeID", t.NodeID).Warnf("agent operation failed, err: %s", err.Error()) - return + log.WithField("NodeID", t.NodeID).Warnf("getting state: %s", err.Error()) } // 根据返回结果修改节点状态 - if getResp.Body.IPFSState != consts.IPFS_STATE_OK { - log.WithField("NodeID", t.NodeID).Warnf("IPFS status is %s, set node state unavailable", getResp.Body.IPFSState) + if getResp.IPFSState != consts.IPFS_STATE_OK { + log.WithField("NodeID", t.NodeID).Warnf("IPFS status is %s, set node state unavailable", getResp.IPFSState) err := execCtx.Args.DB.Node().UpdateState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE) if err != nil { diff --git a/internal/event/agent_check_storage.go b/internal/event/agent_check_storage.go index 1a6e5e4..9f981ec 100644 --- a/internal/event/agent_check_storage.go +++ b/internal/event/agent_check_storage.go @@ -2,12 +2,14 @@ package event import ( "database/sql" + "time" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/consts" "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder" "gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/db/model" + "gitlink.org.cn/cloudream/rabbitmq" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" @@ -149,18 +151,14 @@ func (t *AgentCheckStorage) startCheck(execCtx ExecuteContext, stg model.Storage } defer agentClient.Close() - checkResp, err := agentClient.CheckStorage(agtmsg.NewCheckStorageBody(stg.StorageID, stg.Directory, isComplete, objects)) + checkResp, err := agentClient.CheckStorage(agtmsg.NewCheckStorage(stg.StorageID, stg.Directory, isComplete, objects), rabbitmq.RequestOption{Timeout: time.Minute}) if err != nil { - log.WithField("NodeID", stg.NodeID).Warnf("request to agent failed, err: %s", err.Error()) - } - if checkResp.IsFailed() { - log.WithField("NodeID", stg.NodeID).Warnf("agent operation failed, err: %s", err.Error()) - return + log.WithField("NodeID", stg.NodeID).Warnf("checking storage: %s", err.Error()) } // 根据返回结果修改数据库 var chkObjIDs []int - for _, entry := range checkResp.Body.Entries { + for _, entry := range checkResp.Entries { switch entry.Operation { case agtmsg.CHECK_STORAGE_RESP_OP_DELETE: err := execCtx.Args.DB.StorageObject().Delete(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID) diff --git a/internal/services/event.go b/internal/services/event.go index 5790c20..730d69b 100644 --- a/internal/services/event.go +++ b/internal/services/event.go @@ -9,7 +9,7 @@ import ( func (svc *Service) PostEvent(msg *scmsg.PostEvent) { - evtMsg, err := scevt.MapToMessage(msg.Body.Event) + evtMsg, err := scevt.MapToMessage(msg.Event) if err != nil { logger.Warnf("convert map to event message failed, err: %s", err.Error()) return @@ -22,7 +22,7 @@ func (svc *Service) PostEvent(msg *scmsg.PostEvent) { } svc.eventExecutor.Post(evt, event.ExecuteOption{ - IsEmergency: msg.Body.IsEmergency, - DontMerge: msg.Body.DontMerge, + IsEmergency: msg.IsEmergency, + DontMerge: msg.DontMerge, }) }