diff --git a/internal/event/agent_check_cache.go b/internal/event/agent_check_cache.go index 11865d4..27891fc 100644 --- a/internal/event/agent_check_cache.go +++ b/internal/event/agent_check_cache.go @@ -9,7 +9,7 @@ import ( "gitlink.org.cn/cloudream/scanner/internal/config" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" - agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event" + agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" ) @@ -80,21 +80,51 @@ func (t *AgentCheckCache) Execute(execCtx ExecuteContext) { } // 然后向代理端发送移动文件的请求 - agentClient, err := agtcli.NewAgentClient(t.NodeID, &config.Cfg().RabbitMQ) + agentClient, err := agtcli.NewClient(t.NodeID, &config.Cfg().RabbitMQ) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error()) return } defer agentClient.Close() - err = agentClient.PostEvent( - agtevt.NewCheckCache(isComplete, caches), - execCtx.Option.IsEmergency, // 继承本任务的执行选项 - execCtx.Option.DontMerge) + checkResp, err := agentClient.CheckIPFS(agtmsg.NewCheckIPFSBody(isComplete, caches)) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error()) return } + if checkResp.IsFailed() { + log.WithField("NodeID", t.NodeID).Warnf("agent operation failed, err: %s", err.Error()) + return + } + + // 根据返回结果修改数据库 + for _, entry := range checkResp.Body.Entries { + switch entry.Operation { + case agtmsg.CHECK_IPFS_RESP_OP_DELETE_TEMP: + err := execCtx.Args.DB.Cache().DeleteTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) + if err != nil { + log.WithField("FileHash", entry.FileHash). + WithField("NodeID", t.NodeID). + Warnf("delete temp cache failed, err: %s", err.Error()) + } + + log.WithField("FileHash", entry.FileHash). + WithField("NodeID", t.NodeID). + Debugf("delete temp cache") + + case agtmsg.CHECK_IPFS_RESP_OP_CREATE_TEMP: + err := execCtx.Args.DB.Cache().CreateTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) + if err != nil { + log.WithField("FileHash", entry.FileHash). + WithField("NodeID", t.NodeID). + Warnf("create temp cache failed, err: %s", err.Error()) + } + + log.WithField("FileHash", entry.FileHash). + WithField("NodeID", t.NodeID). + Debugf("create temp cache") + } + } } func init() { RegisterMessageConvertor(func(msg scevt.AgentCheckCache) Event { return NewAgentCheckCache(msg.NodeID, msg.FileHashes) }) diff --git a/internal/event/agent_check_state.go b/internal/event/agent_check_state.go index 6fa20fe..1ea3d04 100644 --- a/internal/event/agent_check_state.go +++ b/internal/event/agent_check_state.go @@ -10,7 +10,7 @@ import ( "gitlink.org.cn/cloudream/db/model" mysql "gitlink.org.cn/cloudream/db/sql" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" - agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event" + agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" "gitlink.org.cn/cloudream/scanner/internal/config" ) @@ -72,7 +72,7 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { return } - agentClient, err := agtcli.NewAgentClient(t.NodeID, &config.Cfg().RabbitMQ) + agentClient, err := agtcli.NewClient(t.NodeID, &config.Cfg().RabbitMQ) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error()) return @@ -80,10 +80,30 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { defer agentClient.Close() // 紧急任务 - err = agentClient.PostEvent(agtevt.NewCheckState(), true, true) + getResp, err := agentClient.GetState(agtmsg.NewGetStateBody()) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error()) } + if getResp.IsFailed() { + log.WithField("NodeID", t.NodeID).Warnf("agent operation failed, err: %s", err.Error()) + return + } + + if getResp.Body.IPFSState != consts.IPFS_STATE_OK { + log.WithField("NodeID", t.NodeID).Warnf("IPFS status is %s, set node state unavailable", getResp.Body.IPFSState) + + err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE) + if err != nil { + log.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error()) + } + return + } + + // TODO 如果以后还有其他的状态,要判断哪些状态下能设置Normal + err = mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_NORMAL) + if err != nil { + log.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error()) + } } func init() { diff --git a/internal/event/agent_check_storage.go b/internal/event/agent_check_storage.go index ea01a5e..4b19d6c 100644 --- a/internal/event/agent_check_storage.go +++ b/internal/event/agent_check_storage.go @@ -9,7 +9,7 @@ import ( "gitlink.org.cn/cloudream/db/model" mysql "gitlink.org.cn/cloudream/db/sql" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" - agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event" + agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" "gitlink.org.cn/cloudream/scanner/internal/config" ) @@ -96,20 +96,57 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) { } // 投递任务 - agentClient, err := agtcli.NewAgentClient(stg.NodeID, &config.Cfg().RabbitMQ) + agentClient, err := agtcli.NewClient(stg.NodeID, &config.Cfg().RabbitMQ) if err != nil { log.WithField("NodeID", stg.NodeID).Warnf("create agent client failed, err: %s", err.Error()) return } defer agentClient.Close() - err = agentClient.PostEvent( - agtevt.NewCheckStorage(stg.StorageID, stg.Directory, isComplete, objects), - execCtx.Option.IsEmergency, // 继承本任务的执行选项 - execCtx.Option.DontMerge) + checkResp, err := agentClient.CheckStorage(agtmsg.NewCheckStorageBody(stg.StorageID, stg.Directory, isComplete, objects)) if err != nil { log.WithField("NodeID", stg.NodeID).Warnf("request to agent failed, err: %s", err.Error()) } + if checkResp.IsFailed() { + log.WithField("NodeID", stg.NodeID).Warnf("agent operation failed, err: %s", err.Error()) + return + } + + var chkObjIDs []int + for _, entry := range checkResp.Body.Entries { + switch entry.Operation { + case agtmsg.CHECK_STORAGE_RESP_OP_DELETE: + err := mysql.StorageObject.Delete(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID) + if err != nil { + log.WithField("StorageID", t.StorageID). + WithField("ObjectID", entry.ObjectID). + Warnf("delete storage object failed, err: %s", err.Error()) + } + chkObjIDs = append(chkObjIDs, entry.ObjectID) + + log.WithField("StorageID", t.StorageID). + WithField("ObjectID", entry.ObjectID). + WithField("UserID", entry.UserID). + Debugf("delete storage object") + + case agtmsg.CHECK_STORAGE_RESP_OP_SET_NORMAL: + err := mysql.StorageObject.SetStateNormal(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID) + if err != nil { + log.WithField("StorageID", t.StorageID). + WithField("ObjectID", entry.ObjectID). + Warnf("change storage object state failed, err: %s", err.Error()) + } + + log.WithField("StorageID", t.StorageID). + WithField("ObjectID", entry.ObjectID). + WithField("UserID", entry.UserID). + Debugf("set storage object normal") + } + } + + if len(chkObjIDs) > 0 { + execCtx.Executor.Post(NewCheckObject(chkObjIDs)) + } } func init() { diff --git a/internal/event/update_agent_state.go b/internal/event/update_agent_state.go deleted file mode 100644 index 1669738..0000000 --- a/internal/event/update_agent_state.go +++ /dev/null @@ -1,47 +0,0 @@ -package event - -import ( - "gitlink.org.cn/cloudream/common/consts" - "gitlink.org.cn/cloudream/common/pkg/logger" - mysql "gitlink.org.cn/cloudream/db/sql" - scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" -) - -type UpdateAgentState struct { - scevt.UpdateAgentState -} - -func NewUpdateAgentState(nodeID int, ipfsState string) *UpdateAgentState { - return &UpdateAgentState{ - UpdateAgentState: scevt.NewUpdateAgentState(nodeID, ipfsState), - } -} - -func (t *UpdateAgentState) TryMerge(other Event) bool { - return false -} - -func (t *UpdateAgentState) Execute(execCtx ExecuteContext) { - log := logger.WithType[UpdateAgentState]("Event") - log.Debugf("begin with %v", logger.FormatStruct(t)) - - if t.IPFSState != consts.IPFS_STATUS_OK { - log.WithField("NodeID", t.NodeID).Warnf("IPFS status is %s, set node state unavailable", t.IPFSState) - - err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE) - if err != nil { - log.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error()) - } - return - } - - // TODO 如果以后还有其他的状态,要判断哪些状态下能设置Normal - err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_NORMAL) - if err != nil { - log.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error()) - } -} - -func init() { - RegisterMessageConvertor(func(msg scevt.UpdateAgentState) Event { return NewUpdateAgentState(msg.NodeID, msg.IPFSState) }) -} diff --git a/internal/event/update_cache.go b/internal/event/update_cache.go deleted file mode 100644 index 173838c..0000000 --- a/internal/event/update_cache.go +++ /dev/null @@ -1,70 +0,0 @@ -package event - -import ( - evtcst "gitlink.org.cn/cloudream/common/consts/event" - "gitlink.org.cn/cloudream/common/pkg/logger" - scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" -) - -type UpdateCacheEntry = scevt.UpdateCacheEntry - -type UpdateCache struct { - scevt.UpdateCache -} - -func NewUpdateCache(nodeID int, entries []scevt.UpdateCacheEntry) *UpdateCache { - return &UpdateCache{ - UpdateCache: scevt.NewUpdateCache(nodeID, entries), - } -} - -func (t *UpdateCache) TryMerge(other Event) bool { - event, ok := other.(*UpdateCache) - if !ok { - return false - } - if event.NodeID != t.NodeID { - return false - } - - // TODO 可以考虑合并同FileHash和NodeID的记录 - t.Entries = append(t.Entries, event.Entries...) - return true -} - -func (t *UpdateCache) Execute(execCtx ExecuteContext) { - log := logger.WithType[UpdateCache]("Event") - log.Debugf("begin with %v", logger.FormatStruct(t)) - - for _, entry := range t.Entries { - switch entry.Operation { - case evtcst.UPDATE_CACHE_DELETE_TEMP: - err := execCtx.Args.DB.Cache().DeleteTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) - if err != nil { - log.WithField("FileHash", entry.FileHash). - WithField("NodeID", t.NodeID). - Warnf("delete temp cache failed, err: %s", err.Error()) - } - - log.WithField("FileHash", entry.FileHash). - WithField("NodeID", t.NodeID). - Debugf("delete temp cache") - - case evtcst.UPDATE_CACHE_CREATE_TEMP: - err := execCtx.Args.DB.Cache().CreateTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) - if err != nil { - log.WithField("FileHash", entry.FileHash). - WithField("NodeID", t.NodeID). - Warnf("create temp cache failed, err: %s", err.Error()) - } - - log.WithField("FileHash", entry.FileHash). - WithField("NodeID", t.NodeID). - Debugf("create temp cache") - } - } -} - -func init() { - RegisterMessageConvertor(func(msg scevt.UpdateCache) Event { return NewUpdateCache(msg.NodeID, msg.Entries) }) -} diff --git a/internal/event/update_storage.go b/internal/event/update_storage.go deleted file mode 100644 index 61e9851..0000000 --- a/internal/event/update_storage.go +++ /dev/null @@ -1,87 +0,0 @@ -package event - -import ( - tskcst "gitlink.org.cn/cloudream/common/consts/event" - "gitlink.org.cn/cloudream/common/pkg/logger" - mysql "gitlink.org.cn/cloudream/db/sql" - scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" -) - -type UpdateStorageEntry = scevt.UpdateStorageEntry -type UpdateStorage struct { - scevt.UpdateStorage -} - -func NewUpdateStorage(storageID int, dirState string, entries []UpdateStorageEntry) *UpdateStorage { - return &UpdateStorage{ - UpdateStorage: scevt.NewUpdateStorage(storageID, dirState, entries), - } -} - -func (t *UpdateStorage) TryMerge(other Event) bool { - event, ok := other.(*UpdateStorage) - if !ok { - return false - } - if event.StorageID != t.StorageID { - return false - } - - // 后投递的任务的状态更新一些 - t.DirectoryState = event.DirectoryState - // TODO 可以考虑合并同FileHash和NodeID的记录 - t.Entries = append(t.Entries, event.Entries...) - return true -} - -func (t *UpdateStorage) Execute(execCtx ExecuteContext) { - log := logger.WithType[UpdateStorage]("Event") - log.Debugf("begin with %v", logger.FormatStruct(t)) - - err := mysql.Storage.ChangeState(execCtx.Args.DB.SQLCtx(), t.StorageID, t.DirectoryState) - if err != nil { - log.WithField("StorageID", t.StorageID).Warnf("change storage state failed, err: %s", err.Error()) - } - - var chkObjIDs []int - for _, entry := range t.Entries { - switch entry.Operation { - case tskcst.UPDATE_STORAGE_DELETE: - err := mysql.StorageObject.Delete(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID) - if err != nil { - log.WithField("StorageID", t.StorageID). - WithField("ObjectID", entry.ObjectID). - Warnf("delete storage object failed, err: %s", err.Error()) - } - chkObjIDs = append(chkObjIDs, entry.ObjectID) - - log.WithField("StorageID", t.StorageID). - WithField("ObjectID", entry.ObjectID). - WithField("UserID", entry.UserID). - Debugf("delete storage object") - - case tskcst.UPDATE_STORAGE_SET_NORMAL: - err := mysql.StorageObject.SetStateNormal(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID) - if err != nil { - log.WithField("StorageID", t.StorageID). - WithField("ObjectID", entry.ObjectID). - Warnf("change storage object state failed, err: %s", err.Error()) - } - - log.WithField("StorageID", t.StorageID). - WithField("ObjectID", entry.ObjectID). - WithField("UserID", entry.UserID). - Debugf("set storage object normal") - } - } - - if len(chkObjIDs) > 0 { - execCtx.Executor.Post(NewCheckObject(chkObjIDs)) - } -} - -func init() { - RegisterMessageConvertor(func(msg scevt.UpdateStorage) Event { - return NewUpdateStorage(msg.StorageID, msg.DirectoryState, msg.Entries) - }) -} diff --git a/main.go b/main.go index c40ad05..624e537 100644 --- a/main.go +++ b/main.go @@ -38,7 +38,7 @@ func main() { eventExecutor := event.NewExecutor(db) go serveEventExecutor(&eventExecutor, &wg) - agtSvr, err := scsvr.NewScannerServer(services.NewService(&eventExecutor), &config.Cfg().RabbitMQ) + agtSvr, err := scsvr.NewServer(services.NewService(&eventExecutor), &config.Cfg().RabbitMQ) if err != nil { log.Fatalf("new agent server failed, err: %s", err.Error()) } @@ -70,7 +70,7 @@ func serveEventExecutor(executor *event.Executor, wg *sync.WaitGroup) { wg.Done() } -func serveScannerServer(server *scsvr.ScannerServer, wg *sync.WaitGroup) { +func serveScannerServer(server *scsvr.Server, wg *sync.WaitGroup) { log.Info("start serving scanner server") err := server.Serve()