diff --git a/internal/services/cmd/agent.go b/internal/services/cmd/agent.go index c9bda5a..8d21db9 100644 --- a/internal/services/cmd/agent.go +++ b/internal/services/cmd/agent.go @@ -6,7 +6,7 @@ import ( agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" ) -func (svc *Service) GetState(msg *agtmsg.GetState) *agtmsg.GetStateResp { +func (svc *Service) GetState(msg *agtmsg.GetState) (*agtmsg.GetStateResp, *ramsg.CodeMessage) { var ipfsState string if svc.ipfs.IsUp() { diff --git a/internal/services/cmd/ipfs.go b/internal/services/cmd/ipfs.go index 07bb8dc..ade8741 100644 --- a/internal/services/cmd/ipfs.go +++ b/internal/services/cmd/ipfs.go @@ -13,7 +13,7 @@ import ( agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" ) -func (svc *Service) CheckIPFS(msg *agtmsg.CheckIPFS) *agtmsg.CheckIPFSResp { +func (svc *Service) CheckIPFS(msg *agtmsg.CheckIPFS) (*agtmsg.CheckIPFSResp, *ramsg.CodeMessage) { filesMap, err := svc.ipfs.GetPinnedFiles() if err != nil { logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error()) @@ -21,16 +21,16 @@ func (svc *Service) CheckIPFS(msg *agtmsg.CheckIPFS) *agtmsg.CheckIPFSResp { } // TODO 根据锁定清单过滤被锁定的文件的记录 - if msg.Body.IsComplete { + if msg.IsComplete { return svc.checkComplete(msg, filesMap) } else { return svc.checkIncrement(msg, filesMap) } } -func (svc *Service) checkIncrement(msg *agtmsg.CheckIPFS, filesMap map[string]shell.PinInfo) *agtmsg.CheckIPFSResp { +func (svc *Service) checkIncrement(msg *agtmsg.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmsg.CheckIPFSResp, *ramsg.CodeMessage) { var entries []agtmsg.CheckIPFSRespEntry - for _, cache := range msg.Body.Caches { + for _, cache := range msg.Caches { _, ok := filesMap[cache.FileHash] if ok { if cache.State == consts.CACHE_STATE_PINNED { @@ -59,12 +59,12 @@ func (svc *Service) checkIncrement(msg *agtmsg.CheckIPFS, filesMap map[string]sh // 增量情况下,不需要对filesMap中没检查的记录进行处理 - return ramsg.ReplyOK(agtmsg.NewCheckIPFSRespBody(entries)) + return ramsg.ReplyOK(agtmsg.NewCheckIPFSResp(entries)) } -func (svc *Service) checkComplete(msg *agtmsg.CheckIPFS, filesMap map[string]shell.PinInfo) *agtmsg.CheckIPFSResp { +func (svc *Service) checkComplete(msg *agtmsg.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmsg.CheckIPFSResp, *ramsg.CodeMessage) { var entries []agtmsg.CheckIPFSRespEntry - for _, cache := range msg.Body.Caches { + for _, cache := range msg.Caches { _, ok := filesMap[cache.FileHash] if ok { if cache.State == consts.CACHE_STATE_PINNED { @@ -100,5 +100,5 @@ func (svc *Service) checkComplete(msg *agtmsg.CheckIPFS, filesMap map[string]she entries = append(entries, agtmsg.NewCheckIPFSRespEntry(hash, agtmsg.CHECK_IPFS_RESP_OP_CREATE_TEMP)) } - return ramsg.ReplyOK(agtmsg.NewCheckIPFSRespBody(entries)) + return ramsg.ReplyOK(agtmsg.NewCheckIPFSResp(entries)) } diff --git a/internal/services/cmd/object.go b/internal/services/cmd/object.go index 17d2faa..31ae03a 100644 --- a/internal/services/cmd/object.go +++ b/internal/services/cmd/object.go @@ -10,29 +10,29 @@ import ( agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" ) -func (svc *Service) StartPinningObject(msg *agtmsg.StartPinningObject) *agtmsg.StartPinningObjectResp { - log.WithField("FileHash", msg.Body.FileHash).Debugf("pin object") +func (svc *Service) StartPinningObject(msg *agtmsg.StartPinningObject) (*agtmsg.StartPinningObjectResp, *ramsg.CodeMessage) { + log.WithField("FileHash", msg.FileHash).Debugf("pin object") - tsk := svc.taskManager.StartComparable(task.NewIPFSPin(msg.Body.FileHash)) + tsk := svc.taskManager.StartComparable(task.NewIPFSPin(msg.FileHash)) if tsk.Error() != nil { - log.WithField("FileHash", msg.Body.FileHash). + log.WithField("FileHash", msg.FileHash). Warnf("pin object failed, err: %s", tsk.Error().Error()) return ramsg.ReplyFailed[agtmsg.StartPinningObjectResp](errorcode.OPERATION_FAILED, "pin object failed") } - return ramsg.ReplyOK(agtmsg.NewStartPinningObjectRespBody(tsk.ID())) + return ramsg.ReplyOK(agtmsg.NewStartPinningObjectResp(tsk.ID())) } -func (svc *Service) WaitPinningObject(msg *agtmsg.WaitPinningObject) *agtmsg.WaitPinningObjectResp { - log.WithField("TaskID", msg.Body.TaskID).Debugf("wait pinning object") +func (svc *Service) WaitPinningObject(msg *agtmsg.WaitPinningObject) (*agtmsg.WaitPinningObjectResp, *ramsg.CodeMessage) { + log.WithField("TaskID", msg.TaskID).Debugf("wait pinning object") - tsk := svc.taskManager.FindByID(msg.Body.TaskID) + tsk := svc.taskManager.FindByID(msg.TaskID) if tsk == nil { return ramsg.ReplyFailed[agtmsg.WaitPinningObjectResp](errorcode.TASK_NOT_FOUND, "task not found") } - if msg.Body.WaitTimeoutMs == 0 { + if msg.WaitTimeoutMs == 0 { tsk.Wait() errMsg := "" @@ -40,19 +40,19 @@ func (svc *Service) WaitPinningObject(msg *agtmsg.WaitPinningObject) *agtmsg.Wai errMsg = tsk.Error().Error() } - return ramsg.ReplyOK(agtmsg.NewWaitPinningObjectRespBody(true, errMsg)) + return ramsg.ReplyOK(agtmsg.NewWaitPinningObjectResp(true, errMsg)) } else { - if tsk.WaitTimeout(time.Duration(msg.Body.WaitTimeoutMs)) { + if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { errMsg := "" if tsk.Error() != nil { errMsg = tsk.Error().Error() } - return ramsg.ReplyOK(agtmsg.NewWaitPinningObjectRespBody(true, errMsg)) + return ramsg.ReplyOK(agtmsg.NewWaitPinningObjectResp(true, errMsg)) } - return ramsg.ReplyOK(agtmsg.NewWaitPinningObjectRespBody(false, "")) + return ramsg.ReplyOK(agtmsg.NewWaitPinningObjectResp(false, "")) } } diff --git a/internal/services/cmd/storage.go b/internal/services/cmd/storage.go index 1138c95..0ab3bf0 100644 --- a/internal/services/cmd/storage.go +++ b/internal/services/cmd/storage.go @@ -24,18 +24,18 @@ import ( agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" ) -func (service *Service) StartMovingObjectToStorage(msg *agtmsg.StartMovingObjectToStorage) *agtmsg.StartMovingObjectToStorageResp { - outFileName := utils.MakeMoveOperationFileName(msg.Body.ObjectID, msg.Body.UserID) - outFilePath := filepath.Join(config.Cfg().StorageBaseDir, msg.Body.Directory, outFileName) +func (service *Service) StartMovingObjectToStorage(msg *agtmsg.StartMovingObjectToStorage) (*agtmsg.StartMovingObjectToStorageResp, *ramsg.CodeMessage) { + outFileName := utils.MakeMoveOperationFileName(msg.ObjectID, msg.UserID) + outFilePath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory, outFileName) - if msg.Body.Redundancy == consts.REDUNDANCY_REP { + if msg.Redundancy == consts.REDUNDANCY_REP { taskID, err := service.moveRepObject(msg, outFilePath) if err != nil { logger.Warnf("move rep object as %s failed, err: %s", outFilePath, err.Error()) return ramsg.ReplyFailed[agtmsg.StartMovingObjectToStorageResp](errorcode.OPERATION_FAILED, "move rep object failed") } - return ramsg.ReplyOK(agtmsg.NewStartMovingObjectToStorageRespBody(taskID)) + return ramsg.ReplyOK(agtmsg.NewStartMovingObjectToStorageResp(taskID)) } else { // TODO 处理其他备份类型 @@ -46,7 +46,7 @@ func (service *Service) StartMovingObjectToStorage(msg *agtmsg.StartMovingObject func (svc *Service) moveRepObject(msg *agtmsg.StartMovingObjectToStorage, outFilePath string) (string, error) { var repInfo ramsg.ObjectRepInfo - err := serder.MapToObject(msg.Body.RedundancyData.(map[string]any), &repInfo) + err := serder.MapToObject(msg.RedundancyData.(map[string]any), &repInfo) if err != nil { return "", fmt.Errorf("redundancy data to rep info failed, err: %w", err) } @@ -55,15 +55,15 @@ func (svc *Service) moveRepObject(msg *agtmsg.StartMovingObjectToStorage, outFil return tsk.ID(), nil } -func (svc *Service) WaitMovingObject(msg *agtmsg.WaitMovingObject) *agtmsg.WaitMovingObjectResp { - log.WithField("TaskID", msg.Body.TaskID).Debugf("wait moving object") +func (svc *Service) WaitMovingObject(msg *agtmsg.WaitMovingObject) (*agtmsg.WaitMovingObjectResp, *ramsg.CodeMessage) { + log.WithField("TaskID", msg.TaskID).Debugf("wait moving object") - tsk := svc.taskManager.FindByID(msg.Body.TaskID) + tsk := svc.taskManager.FindByID(msg.TaskID) if tsk == nil { return ramsg.ReplyFailed[agtmsg.WaitMovingObjectResp](errorcode.TASK_NOT_FOUND, "task not found") } - if msg.Body.WaitTimeoutMs == 0 { + if msg.WaitTimeoutMs == 0 { tsk.Wait() errMsg := "" @@ -71,30 +71,30 @@ func (svc *Service) WaitMovingObject(msg *agtmsg.WaitMovingObject) *agtmsg.WaitM errMsg = tsk.Error().Error() } - return ramsg.ReplyOK(agtmsg.NewWaitMovingObjectRespBody(true, errMsg)) + return ramsg.ReplyOK(agtmsg.NewWaitMovingObjectResp(true, errMsg)) } else { - if tsk.WaitTimeout(time.Duration(msg.Body.WaitTimeoutMs)) { + if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { errMsg := "" if tsk.Error() != nil { errMsg = tsk.Error().Error() } - return ramsg.ReplyOK(agtmsg.NewWaitMovingObjectRespBody(true, errMsg)) + return ramsg.ReplyOK(agtmsg.NewWaitMovingObjectResp(true, errMsg)) } - return ramsg.ReplyOK(agtmsg.NewWaitMovingObjectRespBody(false, "")) + return ramsg.ReplyOK(agtmsg.NewWaitMovingObjectResp(false, "")) } } -func (svc *Service) CheckStorage(msg *agtmsg.CheckStorage) *agtmsg.CheckStorageResp { - dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Body.Directory) +func (svc *Service) CheckStorage(msg *agtmsg.CheckStorage) (*agtmsg.CheckStorageResp, *ramsg.CodeMessage) { + dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory) infos, err := ioutil.ReadDir(dirFullPath) if err != nil { logger.Warnf("list storage directory failed, err: %s", err.Error()) - return ramsg.ReplyOK(agtmsg.NewCheckStorageRespBody( + return ramsg.ReplyOK(agtmsg.NewCheckStorageResp( err.Error(), nil, )) @@ -102,21 +102,21 @@ func (svc *Service) CheckStorage(msg *agtmsg.CheckStorage) *agtmsg.CheckStorageR fileInfos := lo.Filter(infos, func(info fs.FileInfo, index int) bool { return !info.IsDir() }) - if msg.Body.IsComplete { + if msg.IsComplete { return svc.checkStorageComplete(msg, fileInfos) } else { return svc.checkStorageIncrement(msg, fileInfos) } } -func (svc *Service) checkStorageIncrement(msg *agtmsg.CheckStorage, fileInfos []fs.FileInfo) *agtmsg.CheckStorageResp { +func (svc *Service) checkStorageIncrement(msg *agtmsg.CheckStorage, fileInfos []fs.FileInfo) (*agtmsg.CheckStorageResp, *ramsg.CodeMessage) { infosMap := make(map[string]fs.FileInfo) for _, info := range fileInfos { infosMap[info.Name()] = info } var entries []agtmsg.CheckStorageRespEntry - for _, obj := range msg.Body.Objects { + for _, obj := range msg.Objects { fileName := utils.MakeMoveOperationFileName(obj.ObjectID, obj.UserID) _, ok := infosMap[fileName] @@ -133,10 +133,10 @@ func (svc *Service) checkStorageIncrement(msg *agtmsg.CheckStorage, fileInfos [] // 增量情况下,不需要对infosMap中没检查的记录进行处理 - return ramsg.ReplyOK(agtmsg.NewCheckStorageRespBody(consts.STORAGE_DIRECTORY_STATE_OK, entries)) + return ramsg.ReplyOK(agtmsg.NewCheckStorageResp(consts.STORAGE_DIRECTORY_STATE_OK, entries)) } -func (svc *Service) checkStorageComplete(msg *agtmsg.CheckStorage, fileInfos []fs.FileInfo) *agtmsg.CheckStorageResp { +func (svc *Service) checkStorageComplete(msg *agtmsg.CheckStorage, fileInfos []fs.FileInfo) (*agtmsg.CheckStorageResp, *ramsg.CodeMessage) { infosMap := make(map[string]fs.FileInfo) for _, info := range fileInfos { @@ -144,7 +144,7 @@ func (svc *Service) checkStorageComplete(msg *agtmsg.CheckStorage, fileInfos []f } var entries []agtmsg.CheckStorageRespEntry - for _, obj := range msg.Body.Objects { + for _, obj := range msg.Objects { fileName := utils.MakeMoveOperationFileName(obj.ObjectID, obj.UserID) _, ok := infosMap[fileName] @@ -161,7 +161,7 @@ func (svc *Service) checkStorageComplete(msg *agtmsg.CheckStorage, fileInfos []f // Storage中多出来的文件不做处理 - return ramsg.ReplyOK(agtmsg.NewCheckStorageRespBody(consts.STORAGE_DIRECTORY_STATE_OK, entries)) + return ramsg.ReplyOK(agtmsg.NewCheckStorageResp(consts.STORAGE_DIRECTORY_STATE_OK, entries)) } /*