diff --git a/agent/internal/services/mq/cache.go b/agent/internal/services/mq/cache.go index 45d2691..cbc298b 100644 --- a/agent/internal/services/mq/cache.go +++ b/agent/internal/services/mq/cache.go @@ -20,14 +20,14 @@ func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *m ipfsCli, err := globals.IPFSPool.Acquire() if err != nil { logger.Warnf("new ipfs client: %s", err.Error()) - return mq.ReplyFailed[agtmq.CheckCacheResp](errorcode.OperationFailed, "new ipfs client failed") + return nil, mq.Failed(errorcode.OperationFailed, "new ipfs client failed") } defer ipfsCli.Close() filesMap, err := ipfsCli.GetPinnedFiles() if err != nil { logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error()) - return mq.ReplyFailed[agtmq.CheckCacheResp](errorcode.OperationFailed, "get pinned files from ipfs failed") + return nil, mq.Failed(errorcode.OperationFailed, "get pinned files from ipfs failed") } // TODO 根据锁定清单过滤被锁定的文件的记录 @@ -124,7 +124,7 @@ func (svc *Service) StartCacheMovePackage(msg *agtmq.StartCacheMovePackage) (*ag func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtmq.WaitCacheMovePackageResp, *mq.CodeMessage) { tsk := svc.taskManager.FindByID(msg.TaskID) if tsk == nil { - return mq.ReplyFailed[agtmq.WaitCacheMovePackageResp](errorcode.TaskNotFound, "task not found") + return nil, mq.Failed(errorcode.TaskNotFound, "task not found") } if msg.WaitTimeoutMs == 0 { diff --git a/agent/internal/services/mq/object.go b/agent/internal/services/mq/object.go index 451adf1..5331b84 100644 --- a/agent/internal/services/mq/object.go +++ b/agent/internal/services/mq/object.go @@ -18,7 +18,7 @@ func (svc *Service) StartPinningObject(msg *agtmq.StartPinningObject) (*agtmq.St if tsk.Error() != nil { log.WithField("FileHash", msg.FileHash). Warnf("pin object failed, err: %s", tsk.Error().Error()) - return mq.ReplyFailed[agtmq.StartPinningObjectResp](errorcode.OperationFailed, "pin object failed") + return nil, mq.Failed(errorcode.OperationFailed, "pin object failed") } return mq.ReplyOK(agtmq.NewStartPinningObjectResp(tsk.ID())) @@ -29,7 +29,7 @@ func (svc *Service) WaitPinningObject(msg *agtmq.WaitPinningObject) (*agtmq.Wait tsk := svc.taskManager.FindByID(msg.TaskID) if tsk == nil { - return mq.ReplyFailed[agtmq.WaitPinningObjectResp](errorcode.TaskNotFound, "task not found") + return nil, mq.Failed(errorcode.TaskNotFound, "task not found") } if msg.WaitTimeoutMs == 0 { diff --git a/agent/internal/services/mq/storage.go b/agent/internal/services/mq/storage.go index df845f9..6498a97 100644 --- a/agent/internal/services/mq/storage.go +++ b/agent/internal/services/mq/storage.go @@ -54,7 +54,7 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (* tsk := svc.taskManager.FindByID(msg.TaskID) if tsk == nil { - return mq.ReplyFailed[agtmq.WaitStorageLoadPackageResp](errorcode.TaskNotFound, "task not found") + return nil, mq.Failed(errorcode.TaskNotFound, "task not found") } if msg.WaitTimeoutMs == 0 { diff --git a/common/pkgs/mq/agent/agent.go b/common/pkgs/mq/agent/agent.go index b4149c5..287d9b6 100644 --- a/common/pkgs/mq/agent/agent.go +++ b/common/pkgs/mq/agent/agent.go @@ -9,22 +9,24 @@ type AgentService interface { } // 获取agent状态 -var _ = Register(AgentService.GetState) +var _ = Register(Service.GetState) type GetState struct { + mq.MessageBodyBase } type GetStateResp struct { + mq.MessageBodyBase IPFSState string `json:"ipfsState"` } -func NewGetState() GetState { - return GetState{} +func NewGetState() *GetState { + return &GetState{} } -func NewGetStateResp(ipfsState string) GetStateResp { - return GetStateResp{ +func NewGetStateResp(ipfsState string) *GetStateResp { + return &GetStateResp{ IPFSState: ipfsState, } } -func (client *Client) GetState(msg GetState, opts ...mq.RequestOption) (*GetStateResp, error) { - return mq.Request[GetStateResp](client.rabbitCli, msg, opts...) +func (client *Client) GetState(msg *GetState, opts ...mq.RequestOption) (*GetStateResp, error) { + return mq.Request(Service.GetState, client.rabbitCli, msg, opts...) } diff --git a/common/pkgs/mq/agent/cache.go b/common/pkgs/mq/agent/cache.go index a1b1a90..4dbf7c0 100644 --- a/common/pkgs/mq/agent/cache.go +++ b/common/pkgs/mq/agent/cache.go @@ -13,7 +13,7 @@ type CacheService interface { } // 检查节点上的IPFS -var _ = Register(CacheService.CheckCache) +var _ = Register(Service.CheckCache) const ( CHECK_IPFS_RESP_OP_DELETE_TEMP = "DeleteTemp" @@ -21,10 +21,12 @@ const ( ) type CheckCache struct { + mq.MessageBodyBase IsComplete bool `json:"isComplete"` Caches []model.Cache `json:"caches"` } type CheckCacheResp struct { + mq.MessageBodyBase Entries []CheckIPFSRespEntry `json:"entries"` } type CheckIPFSRespEntry struct { @@ -32,14 +34,14 @@ type CheckIPFSRespEntry struct { Operation string `json:"operation"` } -func NewCheckCache(isComplete bool, caches []model.Cache) CheckCache { - return CheckCache{ +func NewCheckCache(isComplete bool, caches []model.Cache) *CheckCache { + return &CheckCache{ IsComplete: isComplete, Caches: caches, } } -func NewCheckCacheResp(entries []CheckIPFSRespEntry) CheckCacheResp { - return CheckCacheResp{ +func NewCheckCacheResp(entries []CheckIPFSRespEntry) *CheckCacheResp { + return &CheckCacheResp{ Entries: entries, } } @@ -49,60 +51,64 @@ func NewCheckCacheRespEntry(fileHash string, op string) CheckIPFSRespEntry { Operation: op, } } -func (client *Client) CheckCache(msg CheckCache, opts ...mq.RequestOption) (*CheckCacheResp, error) { - return mq.Request[CheckCacheResp](client.rabbitCli, msg, opts...) +func (client *Client) CheckCache(msg *CheckCache, opts ...mq.RequestOption) (*CheckCacheResp, error) { + return mq.Request(Service.CheckCache, client.rabbitCli, msg, opts...) } // 将Package的缓存移动到这个节点 -var _ = Register(CacheService.StartCacheMovePackage) +var _ = Register(Service.StartCacheMovePackage) type StartCacheMovePackage struct { + mq.MessageBodyBase UserID int64 `json:"userID"` PackageID int64 `json:"packageID"` } type StartCacheMovePackageResp struct { + mq.MessageBodyBase TaskID string `json:"taskID"` } -func NewStartCacheMovePackage(userID int64, packageID int64) StartCacheMovePackage { - return StartCacheMovePackage{ +func NewStartCacheMovePackage(userID int64, packageID int64) *StartCacheMovePackage { + return &StartCacheMovePackage{ UserID: userID, PackageID: packageID, } } -func NewStartCacheMovePackageResp(taskID string) StartCacheMovePackageResp { - return StartCacheMovePackageResp{ +func NewStartCacheMovePackageResp(taskID string) *StartCacheMovePackageResp { + return &StartCacheMovePackageResp{ TaskID: taskID, } } -func (client *Client) StartCacheMovePackage(msg StartCacheMovePackage, opts ...mq.RequestOption) (*StartCacheMovePackageResp, error) { - return mq.Request[StartCacheMovePackageResp](client.rabbitCli, msg, opts...) +func (client *Client) StartCacheMovePackage(msg *StartCacheMovePackage, opts ...mq.RequestOption) (*StartCacheMovePackageResp, error) { + return mq.Request(Service.StartCacheMovePackage, client.rabbitCli, msg, opts...) } // 将Package的缓存移动到这个节点 -var _ = Register(CacheService.WaitCacheMovePackage) +var _ = Register(Service.WaitCacheMovePackage) type WaitCacheMovePackage struct { + mq.MessageBodyBase TaskID string `json:"taskID"` WaitTimeoutMs int64 `json:"waitTimeout"` } type WaitCacheMovePackageResp struct { + mq.MessageBodyBase IsComplete bool `json:"isComplete"` Error string `json:"error"` } -func NewWaitCacheMovePackage(taskID string, waitTimeoutMs int64) WaitCacheMovePackage { - return WaitCacheMovePackage{ +func NewWaitCacheMovePackage(taskID string, waitTimeoutMs int64) *WaitCacheMovePackage { + return &WaitCacheMovePackage{ TaskID: taskID, WaitTimeoutMs: waitTimeoutMs, } } -func NewWaitCacheMovePackageResp(isComplete bool, err string) WaitCacheMovePackageResp { - return WaitCacheMovePackageResp{ +func NewWaitCacheMovePackageResp(isComplete bool, err string) *WaitCacheMovePackageResp { + return &WaitCacheMovePackageResp{ IsComplete: isComplete, Error: err, } } -func (client *Client) WaitCacheMovePackage(msg WaitCacheMovePackage, opts ...mq.RequestOption) (*WaitCacheMovePackageResp, error) { - return mq.Request[WaitCacheMovePackageResp](client.rabbitCli, msg, opts...) +func (client *Client) WaitCacheMovePackage(msg *WaitCacheMovePackage, opts ...mq.RequestOption) (*WaitCacheMovePackageResp, error) { + return mq.Request(Service.WaitCacheMovePackage, client.rabbitCli, msg, opts...) } diff --git a/common/pkgs/mq/agent/object.go b/common/pkgs/mq/agent/object.go index 94f56ac..8fc5907 100644 --- a/common/pkgs/mq/agent/object.go +++ b/common/pkgs/mq/agent/object.go @@ -8,53 +8,57 @@ type ObjectService interface { } // 启动Pin对象的任务 -var _ = Register(ObjectService.StartPinningObject) +var _ = Register(Service.StartPinningObject) type StartPinningObject struct { + mq.MessageBodyBase FileHash string `json:"fileHash"` } type StartPinningObjectResp struct { + mq.MessageBodyBase TaskID string `json:"taskID"` } -func NewStartPinningObject(fileHash string) StartPinningObject { - return StartPinningObject{ +func NewStartPinningObject(fileHash string) *StartPinningObject { + return &StartPinningObject{ FileHash: fileHash, } } -func NewStartPinningObjectResp(taskID string) StartPinningObjectResp { - return StartPinningObjectResp{ +func NewStartPinningObjectResp(taskID string) *StartPinningObjectResp { + return &StartPinningObjectResp{ TaskID: taskID, } } -func (client *Client) StartPinningObject(msg StartPinningObject, opts ...mq.RequestOption) (*StartPinningObjectResp, error) { - return mq.Request[StartPinningObjectResp](client.rabbitCli, msg, opts...) +func (client *Client) StartPinningObject(msg *StartPinningObject, opts ...mq.RequestOption) (*StartPinningObjectResp, error) { + return mq.Request(Service.StartPinningObject, client.rabbitCli, msg, opts...) } // 等待Pin对象的任务 -var _ = Register(ObjectService.WaitPinningObject) +var _ = Register(Service.WaitPinningObject) type WaitPinningObject struct { + mq.MessageBodyBase TaskID string `json:"taskID"` WaitTimeoutMs int64 `json:"waitTimeout"` } type WaitPinningObjectResp struct { + mq.MessageBodyBase IsComplete bool `json:"isComplete"` Error string `json:"error"` } -func NewWaitPinningObject(taskID string, waitTimeoutMs int64) WaitPinningObject { - return WaitPinningObject{ +func NewWaitPinningObject(taskID string, waitTimeoutMs int64) *WaitPinningObject { + return &WaitPinningObject{ TaskID: taskID, WaitTimeoutMs: waitTimeoutMs, } } -func NewWaitPinningObjectResp(isComplete bool, err string) WaitPinningObjectResp { - return WaitPinningObjectResp{ +func NewWaitPinningObjectResp(isComplete bool, err string) *WaitPinningObjectResp { + return &WaitPinningObjectResp{ IsComplete: isComplete, Error: err, } } -func (client *Client) WaitPinningObject(msg WaitPinningObject, opts ...mq.RequestOption) (*WaitPinningObjectResp, error) { - return mq.Request[WaitPinningObjectResp](client.rabbitCli, msg, opts...) +func (client *Client) WaitPinningObject(msg *WaitPinningObject, opts ...mq.RequestOption) (*WaitPinningObjectResp, error) { + return mq.Request(Service.WaitPinningObject, client.rabbitCli, msg, opts...) } diff --git a/common/pkgs/mq/agent/server.go b/common/pkgs/mq/agent/server.go index 91dedb3..b2e6dd0 100644 --- a/common/pkgs/mq/agent/server.go +++ b/common/pkgs/mq/agent/server.go @@ -55,7 +55,7 @@ var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher() // Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 // TODO 需要约束:Service实现了TSvc接口 -func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*TResp, *mq.CodeMessage)) any { +func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any { mq.AddServiceFn(&msgDispatcher, svcFn) mq.RegisterMessage[TReq]() mq.RegisterMessage[TResp]() @@ -65,7 +65,7 @@ func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*T // RegisterNoReply 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 // TODO 需要约束:Service实现了TSvc接口 -func RegisterNoReply[TSvc any, TReq any](svcFn func(svc TSvc, msg *TReq)) any { +func RegisterNoReply[TReq mq.MessageBody](svcFn func(svc Service, msg TReq)) any { mq.AddNoRespServiceFn(&msgDispatcher, svcFn) mq.RegisterMessage[TReq]() diff --git a/common/pkgs/mq/agent/storage.go b/common/pkgs/mq/agent/storage.go index d2b96be..9929b14 100644 --- a/common/pkgs/mq/agent/storage.go +++ b/common/pkgs/mq/agent/storage.go @@ -19,63 +19,67 @@ type StorageService interface { } // 启动调度Package的任务 -var _ = Register(StorageService.StartStorageLoadPackage) +var _ = Register(Service.StartStorageLoadPackage) type StartStorageLoadPackage struct { + mq.MessageBodyBase UserID int64 `json:"userID"` PackageID int64 `json:"packageID"` StorageID int64 `json:"storageID"` } type StartStorageLoadPackageResp struct { + mq.MessageBodyBase TaskID string `json:"taskID"` } -func NewStartStorageLoadPackage(userID int64, packageID int64, storageID int64) StartStorageLoadPackage { - return StartStorageLoadPackage{ +func NewStartStorageLoadPackage(userID int64, packageID int64, storageID int64) *StartStorageLoadPackage { + return &StartStorageLoadPackage{ UserID: userID, PackageID: packageID, StorageID: storageID, } } -func NewStartStorageLoadPackageResp(taskID string) StartStorageLoadPackageResp { - return StartStorageLoadPackageResp{ +func NewStartStorageLoadPackageResp(taskID string) *StartStorageLoadPackageResp { + return &StartStorageLoadPackageResp{ TaskID: taskID, } } -func (client *Client) StartStorageLoadPackage(msg StartStorageLoadPackage, opts ...mq.RequestOption) (*StartStorageLoadPackageResp, error) { - return mq.Request[StartStorageLoadPackageResp](client.rabbitCli, msg, opts...) +func (client *Client) StartStorageLoadPackage(msg *StartStorageLoadPackage, opts ...mq.RequestOption) (*StartStorageLoadPackageResp, error) { + return mq.Request(Service.StartStorageLoadPackage, client.rabbitCli, msg, opts...) } // 等待调度Package的任务 -var _ = Register(StorageService.WaitStorageLoadPackage) +var _ = Register(Service.WaitStorageLoadPackage) type WaitStorageLoadPackage struct { + mq.MessageBodyBase TaskID string `json:"taskID"` WaitTimeoutMs int64 `json:"waitTimeout"` } type WaitStorageLoadPackageResp struct { + mq.MessageBodyBase IsComplete bool `json:"isComplete"` Error string `json:"error"` } -func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) WaitStorageLoadPackage { - return WaitStorageLoadPackage{ +func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) *WaitStorageLoadPackage { + return &WaitStorageLoadPackage{ TaskID: taskID, WaitTimeoutMs: waitTimeoutMs, } } -func NewWaitStorageLoadPackageResp(isComplete bool, err string) WaitStorageLoadPackageResp { - return WaitStorageLoadPackageResp{ +func NewWaitStorageLoadPackageResp(isComplete bool, err string) *WaitStorageLoadPackageResp { + return &WaitStorageLoadPackageResp{ IsComplete: isComplete, Error: err, } } -func (client *Client) WaitStorageLoadPackage(msg WaitStorageLoadPackage, opts ...mq.RequestOption) (*WaitStorageLoadPackageResp, error) { - return mq.Request[WaitStorageLoadPackageResp](client.rabbitCli, msg, opts...) +func (client *Client) WaitStorageLoadPackage(msg *WaitStorageLoadPackage, opts ...mq.RequestOption) (*WaitStorageLoadPackageResp, error) { + return mq.Request(Service.WaitStorageLoadPackage, client.rabbitCli, msg, opts...) } // 检查Storage -var _ = Register(StorageService.StorageCheck) +var _ = Register(Service.StorageCheck) const ( CHECK_STORAGE_RESP_OP_DELETE = "Delete" @@ -83,12 +87,14 @@ const ( ) type StorageCheck struct { + mq.MessageBodyBase StorageID int64 `json:"storageID"` Directory string `json:"directory"` IsComplete bool `json:"isComplete"` Packages []model.StoragePackage `json:"packages"` } type StorageCheckResp struct { + mq.MessageBodyBase DirectoryState string `json:"directoryState"` Entries []StorageCheckRespEntry `json:"entries"` } @@ -98,16 +104,16 @@ type StorageCheckRespEntry struct { Operation string `json:"operation"` } -func NewStorageCheck(storageID int64, directory string, isComplete bool, packages []model.StoragePackage) StorageCheck { - return StorageCheck{ +func NewStorageCheck(storageID int64, directory string, isComplete bool, packages []model.StoragePackage) *StorageCheck { + return &StorageCheck{ StorageID: storageID, Directory: directory, IsComplete: isComplete, Packages: packages, } } -func NewStorageCheckResp(dirState string, entries []StorageCheckRespEntry) StorageCheckResp { - return StorageCheckResp{ +func NewStorageCheckResp(dirState string, entries []StorageCheckRespEntry) *StorageCheckResp { + return &StorageCheckResp{ DirectoryState: dirState, Entries: entries, } @@ -119,14 +125,15 @@ func NewStorageCheckRespEntry(packageID int64, userID int64, op string) StorageC Operation: op, } } -func (client *Client) StorageCheck(msg StorageCheck, opts ...mq.RequestOption) (*StorageCheckResp, error) { - return mq.Request[StorageCheckResp](client.rabbitCli, msg, opts...) +func (client *Client) StorageCheck(msg *StorageCheck, opts ...mq.RequestOption) (*StorageCheckResp, error) { + return mq.Request(Service.StorageCheck, client.rabbitCli, msg, opts...) } // 启动从Storage上传Package的任务 -var _ = Register(StorageService.StartStorageCreatePackage) +var _ = Register(Service.StartStorageCreatePackage) type StartStorageCreatePackage struct { + mq.MessageBodyBase UserID int64 `json:"userID"` BucketID int64 `json:"bucketID"` Name string `json:"name"` @@ -136,11 +143,12 @@ type StartStorageCreatePackage struct { NodeAffinity *int64 `json:"nodeAffinity"` } type StartStorageCreatePackageResp struct { + mq.MessageBodyBase TaskID string `json:"taskID"` } -func NewStartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo, nodeAffinity *int64) StartStorageCreatePackage { - return StartStorageCreatePackage{ +func NewStartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo, nodeAffinity *int64) *StartStorageCreatePackage { + return &StartStorageCreatePackage{ UserID: userID, BucketID: bucketID, Name: name, @@ -150,41 +158,43 @@ func NewStartStorageCreatePackage(userID int64, bucketID int64, name string, sto NodeAffinity: nodeAffinity, } } -func NewStartStorageCreatePackageResp(taskID string) StartStorageCreatePackageResp { - return StartStorageCreatePackageResp{ +func NewStartStorageCreatePackageResp(taskID string) *StartStorageCreatePackageResp { + return &StartStorageCreatePackageResp{ TaskID: taskID, } } -func (client *Client) StartStorageCreatePackage(msg StartStorageCreatePackage, opts ...mq.RequestOption) (*StartStorageCreatePackageResp, error) { - return mq.Request[StartStorageCreatePackageResp](client.rabbitCli, msg, opts...) +func (client *Client) StartStorageCreatePackage(msg *StartStorageCreatePackage, opts ...mq.RequestOption) (*StartStorageCreatePackageResp, error) { + return mq.Request(Service.StartStorageCreatePackage, client.rabbitCli, msg, opts...) } // 等待从Storage上传Package的任务 -var _ = Register(StorageService.WaitStorageCreatePackage) +var _ = Register(Service.WaitStorageCreatePackage) type WaitStorageCreatePackage struct { + mq.MessageBodyBase TaskID string `json:"taskID"` WaitTimeoutMs int64 `json:"waitTimeout"` } type WaitStorageCreatePackageResp struct { + mq.MessageBodyBase IsComplete bool `json:"isComplete"` Error string `json:"error"` PackageID int64 `json:"packageID"` } -func NewWaitStorageCreatePackage(taskID string, waitTimeoutMs int64) WaitStorageCreatePackage { - return WaitStorageCreatePackage{ +func NewWaitStorageCreatePackage(taskID string, waitTimeoutMs int64) *WaitStorageCreatePackage { + return &WaitStorageCreatePackage{ TaskID: taskID, WaitTimeoutMs: waitTimeoutMs, } } -func NewWaitStorageCreatePackageResp(isComplete bool, err string, packageID int64) WaitStorageCreatePackageResp { - return WaitStorageCreatePackageResp{ +func NewWaitStorageCreatePackageResp(isComplete bool, err string, packageID int64) *WaitStorageCreatePackageResp { + return &WaitStorageCreatePackageResp{ IsComplete: isComplete, Error: err, PackageID: packageID, } } -func (client *Client) WaitStorageCreatePackage(msg WaitStorageCreatePackage, opts ...mq.RequestOption) (*WaitStorageCreatePackageResp, error) { - return mq.Request[WaitStorageCreatePackageResp](client.rabbitCli, msg, opts...) +func (client *Client) WaitStorageCreatePackage(msg *WaitStorageCreatePackage, opts ...mq.RequestOption) (*WaitStorageCreatePackageResp, error) { + return mq.Request(Service.WaitStorageCreatePackage, client.rabbitCli, msg, opts...) } diff --git a/common/pkgs/mq/coordinator/agent.go b/common/pkgs/mq/coordinator/agent.go index 731f332..1de6f52 100644 --- a/common/pkgs/mq/coordinator/agent.go +++ b/common/pkgs/mq/coordinator/agent.go @@ -9,27 +9,29 @@ type AgentService interface { } // 代理端发给协调端,告知临时缓存的数据 -var _ = RegisterNoReply(AgentService.TempCacheReport) +var _ = RegisterNoReply(Service.TempCacheReport) type TempCacheReport struct { + mq.MessageBodyBase NodeID int64 `json:"nodeID"` Hashes []string `json:"hashes"` } -func NewTempCacheReportBody(nodeID int64, hashes []string) TempCacheReport { - return TempCacheReport{ +func NewTempCacheReportBody(nodeID int64, hashes []string) *TempCacheReport { + return &TempCacheReport{ NodeID: nodeID, Hashes: hashes, } } -func (client *Client) TempCacheReport(msg TempCacheReport) error { - return mq.Send(client.rabbitCli, msg) +func (client *Client) TempCacheReport(msg *TempCacheReport) error { + return mq.Send(AgentService.TempCacheReport, client.rabbitCli, msg) } // 代理端发给协调端,告知延迟、ipfs和资源目录的可达性 -var _ = RegisterNoReply(AgentService.AgentStatusReport) +var _ = RegisterNoReply(Service.AgentStatusReport) type AgentStatusReport struct { + mq.MessageBodyBase NodeID int64 `json:"nodeID"` NodeDelayIDs []int64 `json:"nodeDelayIDs"` NodeDelays []int `json:"nodeDelays"` @@ -37,8 +39,8 @@ type AgentStatusReport struct { LocalDirStatus string `json:"localDirStatus"` } -func NewAgentStatusReportBody(nodeID int64, nodeDelayIDs []int64, nodeDelays []int, ipfsStatus string, localDirStatus string) AgentStatusReport { - return AgentStatusReport{ +func NewAgentStatusReportBody(nodeID int64, nodeDelayIDs []int64, nodeDelays []int, ipfsStatus string, localDirStatus string) *AgentStatusReport { + return &AgentStatusReport{ NodeID: nodeID, NodeDelayIDs: nodeDelayIDs, NodeDelays: nodeDelays, @@ -46,6 +48,6 @@ func NewAgentStatusReportBody(nodeID int64, nodeDelayIDs []int64, nodeDelays []i LocalDirStatus: localDirStatus, } } -func (client *Client) AgentStatusReport(msg AgentStatusReport) error { - return mq.Send(client.rabbitCli, msg) +func (client *Client) AgentStatusReport(msg *AgentStatusReport) error { + return mq.Send(AgentService.AgentStatusReport, client.rabbitCli, msg) } diff --git a/common/pkgs/mq/coordinator/bucket.go b/common/pkgs/mq/coordinator/bucket.go index e1df696..08828b0 100644 --- a/common/pkgs/mq/coordinator/bucket.go +++ b/common/pkgs/mq/coordinator/bucket.go @@ -16,99 +16,108 @@ type BucketService interface { } // 获取用户所有的桶 -var _ = Register(BucketService.GetUserBuckets) +var _ = Register(Service.GetUserBuckets) type GetUserBuckets struct { + mq.MessageBodyBase UserID int64 `json:"userID"` } type GetUserBucketsResp struct { + mq.MessageBodyBase Buckets []model.Bucket `json:"buckets"` } -func NewGetUserBuckets(userID int64) GetUserBuckets { - return GetUserBuckets{ +func NewGetUserBuckets(userID int64) *GetUserBuckets { + return &GetUserBuckets{ UserID: userID, } } -func NewGetUserBucketsResp(buckets []model.Bucket) GetUserBucketsResp { - return GetUserBucketsResp{ +func NewGetUserBucketsResp(buckets []model.Bucket) *GetUserBucketsResp { + return &GetUserBucketsResp{ Buckets: buckets, } } -func (client *Client) GetUserBuckets(msg GetUserBuckets) (*GetUserBucketsResp, error) { - return mq.Request[GetUserBucketsResp](client.rabbitCli, msg) +func (client *Client) GetUserBuckets(msg *GetUserBuckets) (*GetUserBucketsResp, error) { + return mq.Request(Service.GetUserBuckets, client.rabbitCli, msg) } // 获取桶中的所有Package -var _ = Register(BucketService.GetBucketPackages) +var _ = Register(Service.GetBucketPackages) type GetBucketPackages struct { + mq.MessageBodyBase UserID int64 `json:"userID"` BucketID int64 `json:"bucketID"` } type GetBucketPackagesResp struct { + mq.MessageBodyBase Packages []model.Package `json:"packages"` } -func NewGetBucketPackages(userID int64, bucketID int64) GetBucketPackages { - return GetBucketPackages{ +func NewGetBucketPackages(userID int64, bucketID int64) *GetBucketPackages { + return &GetBucketPackages{ UserID: userID, BucketID: bucketID, } } -func NewGetBucketPackagesResp(packages []model.Package) GetBucketPackagesResp { - return GetBucketPackagesResp{ +func NewGetBucketPackagesResp(packages []model.Package) *GetBucketPackagesResp { + return &GetBucketPackagesResp{ Packages: packages, } } -func (client *Client) GetBucketPackages(msg GetBucketPackages) (*GetBucketPackagesResp, error) { - return mq.Request[GetBucketPackagesResp](client.rabbitCli, msg) +func (client *Client) GetBucketPackages(msg *GetBucketPackages) (*GetBucketPackagesResp, error) { + return mq.Request(Service.GetBucketPackages, client.rabbitCli, msg) } // 创建桶 -var _ = Register(BucketService.CreateBucket) +var _ = Register(Service.CreateBucket) type CreateBucket struct { + mq.MessageBodyBase UserID int64 `json:"userID"` BucketName string `json:"bucketName"` } type CreateBucketResp struct { + mq.MessageBodyBase BucketID int64 `json:"bucketID"` } -func NewCreateBucket(userID int64, bucketName string) CreateBucket { - return CreateBucket{ +func NewCreateBucket(userID int64, bucketName string) *CreateBucket { + return &CreateBucket{ UserID: userID, BucketName: bucketName, } } -func NewCreateBucketResp(bucketID int64) CreateBucketResp { - return CreateBucketResp{ +func NewCreateBucketResp(bucketID int64) *CreateBucketResp { + return &CreateBucketResp{ BucketID: bucketID, } } -func (client *Client) CreateBucket(msg CreateBucket) (*CreateBucketResp, error) { - return mq.Request[CreateBucketResp](client.rabbitCli, msg) +func (client *Client) CreateBucket(msg *CreateBucket) (*CreateBucketResp, error) { + return mq.Request(Service.CreateBucket, client.rabbitCli, msg) } // 删除桶 -var _ = Register(BucketService.DeleteBucket) +var _ = Register(Service.DeleteBucket) type DeleteBucket struct { + mq.MessageBodyBase UserID int64 `json:"userID"` BucketID int64 `json:"bucketID"` } -type DeleteBucketResp struct{} +type DeleteBucketResp struct { + mq.MessageBodyBase +} -func NewDeleteBucket(userID int64, bucketID int64) DeleteBucket { - return DeleteBucket{ +func NewDeleteBucket(userID int64, bucketID int64) *DeleteBucket { + return &DeleteBucket{ UserID: userID, BucketID: bucketID, } } -func NewDeleteBucketResp() DeleteBucketResp { - return DeleteBucketResp{} +func NewDeleteBucketResp() *DeleteBucketResp { + return &DeleteBucketResp{} } -func (client *Client) DeleteBucket(msg DeleteBucket) (*DeleteBucketResp, error) { - return mq.Request[DeleteBucketResp](client.rabbitCli, msg) +func (client *Client) DeleteBucket(msg *DeleteBucket) (*DeleteBucketResp, error) { + return mq.Request(Service.DeleteBucket, client.rabbitCli, msg) } diff --git a/common/pkgs/mq/coordinator/cache.go b/common/pkgs/mq/coordinator/cache.go index dfbf944..4a68a53 100644 --- a/common/pkgs/mq/coordinator/cache.go +++ b/common/pkgs/mq/coordinator/cache.go @@ -9,25 +9,28 @@ type CacheService interface { } // Package的Object移动到了节点的Cache中 -var _ = Register(CacheService.CachePackageMoved) +var _ = Register(Service.CachePackageMoved) type CachePackageMoved struct { + mq.MessageBodyBase PackageID int64 `json:"packageID"` NodeID int64 `json:"nodeID"` FileHashes []string `json:"fileHashes"` } -type CachePackageMovedResp struct{} +type CachePackageMovedResp struct { + mq.MessageBodyBase +} -func NewCachePackageMoved(packageID int64, nodeID int64, fileHashes []string) CachePackageMoved { - return CachePackageMoved{ +func NewCachePackageMoved(packageID int64, nodeID int64, fileHashes []string) *CachePackageMoved { + return &CachePackageMoved{ PackageID: packageID, NodeID: nodeID, FileHashes: fileHashes, } } -func NewCachePackageMovedResp() CachePackageMovedResp { - return CachePackageMovedResp{} +func NewCachePackageMovedResp() *CachePackageMovedResp { + return &CachePackageMovedResp{} } -func (client *Client) CachePackageMoved(msg CachePackageMoved) (*CachePackageMovedResp, error) { - return mq.Request[CachePackageMovedResp](client.rabbitCli, msg) +func (client *Client) CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, error) { + return mq.Request(Service.CachePackageMoved, client.rabbitCli, msg) } diff --git a/common/pkgs/mq/coordinator/common.go b/common/pkgs/mq/coordinator/common.go index 7d3ceca..f0a60b9 100644 --- a/common/pkgs/mq/coordinator/common.go +++ b/common/pkgs/mq/coordinator/common.go @@ -12,49 +12,53 @@ type CommonService interface { } // 查询指定IP所属的地域 -var _ = Register(CommonService.FindClientLocation) +var _ = Register(Service.FindClientLocation) type FindClientLocation struct { + mq.MessageBodyBase IP string `json:"ip"` } type FindClientLocationResp struct { + mq.MessageBodyBase Location model.Location `json:"location"` } -func NewFindClientLocation(ip string) FindClientLocation { - return FindClientLocation{ +func NewFindClientLocation(ip string) *FindClientLocation { + return &FindClientLocation{ IP: ip, } } -func NewFindClientLocationResp(location model.Location) FindClientLocationResp { - return FindClientLocationResp{ +func NewFindClientLocationResp(location model.Location) *FindClientLocationResp { + return &FindClientLocationResp{ Location: location, } } -func (client *Client) FindClientLocation(msg FindClientLocation) (*FindClientLocationResp, error) { - return mq.Request[FindClientLocationResp](client.rabbitCli, msg) +func (client *Client) FindClientLocation(msg *FindClientLocation) (*FindClientLocationResp, error) { + return mq.Request(Service.FindClientLocation, client.rabbitCli, msg) } // 获取EC具体配置 -var _ = Register(CommonService.GetECConfig) +var _ = Register(Service.GetECConfig) type GetECConfig struct { + mq.MessageBodyBase ECName string `json:"ecName"` } type GetECConfigResp struct { + mq.MessageBodyBase Config model.Ec `json:"config"` } -func NewGetECConfig(ecName string) GetECConfig { - return GetECConfig{ +func NewGetECConfig(ecName string) *GetECConfig { + return &GetECConfig{ ECName: ecName, } } -func NewGetECConfigResp(config model.Ec) GetECConfigResp { - return GetECConfigResp{ +func NewGetECConfigResp(config model.Ec) *GetECConfigResp { + return &GetECConfigResp{ Config: config, } } -func (client *Client) GetECConfig(msg GetECConfig) (*GetECConfigResp, error) { - return mq.Request[GetECConfigResp](client.rabbitCli, msg) +func (client *Client) GetECConfig(msg *GetECConfig) (*GetECConfigResp, error) { + return mq.Request(Service.GetECConfig, client.rabbitCli, msg) } diff --git a/common/pkgs/mq/coordinator/node.go b/common/pkgs/mq/coordinator/node.go index 066f1e4..530341f 100644 --- a/common/pkgs/mq/coordinator/node.go +++ b/common/pkgs/mq/coordinator/node.go @@ -12,49 +12,53 @@ type NodeService interface { } // 查询用户可用的节点 -var _ = Register(NodeService.GetUserNodes) +var _ = Register(Service.GetUserNodes) type GetUserNodes struct { + mq.MessageBodyBase UserID int64 `json:"userID"` } type GetUserNodesResp struct { + mq.MessageBodyBase Nodes []model.Node `json:"nodes"` } -func NewGetUserNodes(userID int64) GetUserNodes { - return GetUserNodes{ +func NewGetUserNodes(userID int64) *GetUserNodes { + return &GetUserNodes{ UserID: userID, } } -func NewGetUserNodesResp(nodes []model.Node) GetUserNodesResp { - return GetUserNodesResp{ +func NewGetUserNodesResp(nodes []model.Node) *GetUserNodesResp { + return &GetUserNodesResp{ Nodes: nodes, } } -func (client *Client) GetUserNodes(msg GetUserNodes) (*GetUserNodesResp, error) { - return mq.Request[GetUserNodesResp](client.rabbitCli, msg) +func (client *Client) GetUserNodes(msg *GetUserNodes) (*GetUserNodesResp, error) { + return mq.Request(Service.GetUserNodes, client.rabbitCli, msg) } // 获取指定节点的信息 -var _ = Register(NodeService.GetNodes) +var _ = Register(Service.GetNodes) type GetNodes struct { + mq.MessageBodyBase NodeIDs []int64 `json:"nodeIDs"` } type GetNodesResp struct { + mq.MessageBodyBase Nodes []model.Node `json:"nodes"` } -func NewGetNodes(nodeIDs []int64) GetNodes { - return GetNodes{ +func NewGetNodes(nodeIDs []int64) *GetNodes { + return &GetNodes{ NodeIDs: nodeIDs, } } -func NewGetNodesResp(nodes []model.Node) GetNodesResp { - return GetNodesResp{ +func NewGetNodesResp(nodes []model.Node) *GetNodesResp { + return &GetNodesResp{ Nodes: nodes, } } -func (client *Client) GetNodes(msg GetNodes) (*GetNodesResp, error) { - return mq.Request[GetNodesResp](client.rabbitCli, msg) +func (client *Client) GetNodes(msg *GetNodes) (*GetNodesResp, error) { + return mq.Request(Service.GetNodes, client.rabbitCli, msg) } diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index c8d5008..047d6d5 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -12,49 +12,53 @@ type ObjectService interface { } // 获取指定Object的Rep数据,返回的Objects会按照ObjectID升序 -var _ = Register(ObjectService.GetPackageObjectRepData) +var _ = Register(Service.GetPackageObjectRepData) type GetPackageObjectRepData struct { + mq.MessageBodyBase PackageID int64 `json:"packageID"` } type GetPackageObjectRepDataResp struct { + mq.MessageBodyBase Data []models.ObjectRepData `json:"data"` } -func NewGetPackageObjectRepData(packageID int64) GetPackageObjectRepData { - return GetPackageObjectRepData{ +func NewGetPackageObjectRepData(packageID int64) *GetPackageObjectRepData { + return &GetPackageObjectRepData{ PackageID: packageID, } } -func NewGetPackageObjectRepDataResp(data []models.ObjectRepData) GetPackageObjectRepDataResp { - return GetPackageObjectRepDataResp{ +func NewGetPackageObjectRepDataResp(data []models.ObjectRepData) *GetPackageObjectRepDataResp { + return &GetPackageObjectRepDataResp{ Data: data, } } -func (client *Client) GetPackageObjectRepData(msg GetPackageObjectRepData) (*GetPackageObjectRepDataResp, error) { - return mq.Request[GetPackageObjectRepDataResp](client.rabbitCli, msg) +func (client *Client) GetPackageObjectRepData(msg *GetPackageObjectRepData) (*GetPackageObjectRepDataResp, error) { + return mq.Request(Service.GetPackageObjectRepData, client.rabbitCli, msg) } // 获取指定Object的EC数据,返回的Objects会按照ObjectID升序 -var _ = Register(ObjectService.GetPackageObjectECData) +var _ = Register(Service.GetPackageObjectECData) type GetPackageObjectECData struct { + mq.MessageBodyBase PackageID int64 `json:"packageID"` } type GetPackageObjectECDataResp struct { + mq.MessageBodyBase Data []models.ObjectECData `json:"data"` } -func NewGetPackageObjectECData(packageID int64) GetPackageObjectECData { - return GetPackageObjectECData{ +func NewGetPackageObjectECData(packageID int64) *GetPackageObjectECData { + return &GetPackageObjectECData{ PackageID: packageID, } } -func NewGetPackageObjectECDataResp(data []models.ObjectECData) GetPackageObjectECDataResp { - return GetPackageObjectECDataResp{ +func NewGetPackageObjectECDataResp(data []models.ObjectECData) *GetPackageObjectECDataResp { + return &GetPackageObjectECDataResp{ Data: data, } } -func (client *Client) GetPackageObjectECData(msg GetPackageObjectECData) (*GetPackageObjectECDataResp, error) { - return mq.Request[GetPackageObjectECDataResp](client.rabbitCli, msg) +func (client *Client) GetPackageObjectECData(msg *GetPackageObjectECData) (*GetPackageObjectECDataResp, error) { + return mq.Request(Service.GetPackageObjectECData, client.rabbitCli, msg) } diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index d16e661..69771c9 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -25,96 +25,105 @@ type PackageService interface { } // 获取Package基本信息 -var _ = Register(PackageService.GetPackage) +var _ = Register(Service.GetPackage) type GetPackage struct { + mq.MessageBodyBase UserID int64 `json:"userID"` PackageID int64 `json:"packageID"` } type GetPackageResp struct { + mq.MessageBodyBase model.Package } -func NewGetPackage(userID int64, packageID int64) GetPackage { - return GetPackage{ +func NewGetPackage(userID int64, packageID int64) *GetPackage { + return &GetPackage{ UserID: userID, PackageID: packageID, } } -func NewGetPackageResp(pkg model.Package) GetPackageResp { - return GetPackageResp{ +func NewGetPackageResp(pkg model.Package) *GetPackageResp { + return &GetPackageResp{ Package: pkg, } } -func (client *Client) GetPackage(msg GetPackage) (*GetPackageResp, error) { - return mq.Request[GetPackageResp](client.rabbitCli, msg) +func (client *Client) GetPackage(msg *GetPackage) (*GetPackageResp, error) { + return mq.Request(Service.GetPackage, client.rabbitCli, msg) } // 查询Package中的所有Object,返回的Objects会按照ObjectID升序 -var _ = Register(PackageService.GetPackageObjects) +var _ = Register(Service.GetPackageObjects) type GetPackageObjects struct { + mq.MessageBodyBase UserID int64 `json:"userID"` PackageID int64 `json:"packageID"` } type GetPackageObjectsResp struct { + mq.MessageBodyBase Objects []model.Object `json:"objects"` } -func NewGetPackageObjects(userID int64, packageID int64) GetPackageObjects { - return GetPackageObjects{ +func NewGetPackageObjects(userID int64, packageID int64) *GetPackageObjects { + return &GetPackageObjects{ UserID: userID, PackageID: packageID, } } -func NewGetPackageObjectsResp(objects []model.Object) GetPackageObjectsResp { - return GetPackageObjectsResp{ +func NewGetPackageObjectsResp(objects []model.Object) *GetPackageObjectsResp { + return &GetPackageObjectsResp{ Objects: objects, } } -func (client *Client) GetPackageObjects(msg GetPackageObjects) (*GetPackageObjectsResp, error) { - return mq.Request[GetPackageObjectsResp](client.rabbitCli, msg) +func (client *Client) GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, error) { + return mq.Request(Service.GetPackageObjects, client.rabbitCli, msg) } // 创建一个Package -var _ = Register(PackageService.CreatePackage) +var _ = Register(Service.CreatePackage) type CreatePackage struct { + mq.MessageBodyBase UserID int64 `json:"userID"` BucketID int64 `json:"bucketID"` Name string `json:"name"` Redundancy models.TypedRedundancyInfo `json:"redundancy"` } type CreatePackageResp struct { + mq.MessageBodyBase PackageID int64 `json:"packageID"` } -func NewCreatePackage(userID int64, bucketID int64, name string, redundancy models.TypedRedundancyInfo) CreatePackage { - return CreatePackage{ +func NewCreatePackage(userID int64, bucketID int64, name string, redundancy models.TypedRedundancyInfo) *CreatePackage { + return &CreatePackage{ UserID: userID, BucketID: bucketID, Name: name, Redundancy: redundancy, } } -func NewCreatePackageResp(packageID int64) CreatePackageResp { - return CreatePackageResp{ +func NewCreatePackageResp(packageID int64) *CreatePackageResp { + return &CreatePackageResp{ PackageID: packageID, } } -func (client *Client) CreatePackage(msg CreatePackage) (*CreatePackageResp, error) { - return mq.Request[CreatePackageResp](client.rabbitCli, msg) +func (client *Client) CreatePackage(msg *CreatePackage) (*CreatePackageResp, error) { + return mq.Request(Service.CreatePackage, client.rabbitCli, msg) } // 更新Rep备份模式的Package -var _ = Register(PackageService.UpdateRepPackage) +var _ = Register(Service.UpdateRepPackage) type UpdateRepPackage struct { + mq.MessageBodyBase PackageID int64 `json:"packageID"` Adds []AddRepObjectInfo `json:"objects"` Deletes []int64 `json:"deletes"` } -type UpdateRepPackageResp struct{} +type UpdateRepPackageResp struct { + mq.MessageBodyBase +} type AddRepObjectInfo struct { Path string `json:"path"` Size int64 `json:"size,string"` @@ -122,15 +131,15 @@ type AddRepObjectInfo struct { NodeIDs []int64 `json:"nodeIDs"` } -func NewUpdateRepPackage(packageID int64, adds []AddRepObjectInfo, deletes []int64) UpdateRepPackage { - return UpdateRepPackage{ +func NewUpdateRepPackage(packageID int64, adds []AddRepObjectInfo, deletes []int64) *UpdateRepPackage { + return &UpdateRepPackage{ PackageID: packageID, Adds: adds, Deletes: deletes, } } -func NewUpdateRepPackageResp() UpdateRepPackageResp { - return UpdateRepPackageResp{} +func NewUpdateRepPackageResp() *UpdateRepPackageResp { + return &UpdateRepPackageResp{} } func NewAddRepObjectInfo(path string, size int64, fileHash string, nodeIDs []int64) AddRepObjectInfo { return AddRepObjectInfo{ @@ -140,19 +149,22 @@ func NewAddRepObjectInfo(path string, size int64, fileHash string, nodeIDs []int NodeIDs: nodeIDs, } } -func (client *Client) UpdateRepPackage(msg UpdateRepPackage) (*UpdateRepPackageResp, error) { - return mq.Request[UpdateRepPackageResp](client.rabbitCli, msg) +func (client *Client) UpdateRepPackage(msg *UpdateRepPackage) (*UpdateRepPackageResp, error) { + return mq.Request(Service.UpdateRepPackage, client.rabbitCli, msg) } // 更新EC备份模式的Package -var _ = Register(PackageService.UpdateECPackage) +var _ = Register(Service.UpdateECPackage) type UpdateECPackage struct { + mq.MessageBodyBase PackageID int64 `json:"packageID"` Adds []AddECObjectInfo `json:"objects"` Deletes []int64 `json:"deletes"` } -type UpdateECPackageResp struct{} +type UpdateECPackageResp struct { + mq.MessageBodyBase +} type AddECObjectInfo struct { Path string `json:"path"` Size int64 `json:"size,string"` @@ -160,15 +172,15 @@ type AddECObjectInfo struct { NodeIDs []int64 `json:"nodeIDs"` } -func NewUpdateECPackage(packageID int64, adds []AddECObjectInfo, deletes []int64) UpdateECPackage { - return UpdateECPackage{ +func NewUpdateECPackage(packageID int64, adds []AddECObjectInfo, deletes []int64) *UpdateECPackage { + return &UpdateECPackage{ PackageID: packageID, Adds: adds, Deletes: deletes, } } -func NewUpdateECPackageResp() UpdateECPackageResp { - return UpdateECPackageResp{} +func NewUpdateECPackageResp() *UpdateECPackageResp { + return &UpdateECPackageResp{} } func NewAddECObjectInfo(path string, size int64, fileHashes []string, nodeIDs []int64) AddECObjectInfo { return AddECObjectInfo{ @@ -178,36 +190,40 @@ func NewAddECObjectInfo(path string, size int64, fileHashes []string, nodeIDs [] NodeIDs: nodeIDs, } } -func (client *Client) UpdateECPackage(msg UpdateECPackage) (*UpdateECPackageResp, error) { - return mq.Request[UpdateECPackageResp](client.rabbitCli, msg) +func (client *Client) UpdateECPackage(msg *UpdateECPackage) (*UpdateECPackageResp, error) { + return mq.Request(Service.UpdateECPackage, client.rabbitCli, msg) } // 删除对象 -var _ = Register(PackageService.DeletePackage) +var _ = Register(Service.DeletePackage) type DeletePackage struct { + mq.MessageBodyBase UserID int64 `db:"userID"` PackageID int64 `db:"packageID"` } -type DeletePackageResp struct{} +type DeletePackageResp struct { + mq.MessageBodyBase +} -func NewDeletePackage(userID int64, packageID int64) DeletePackage { - return DeletePackage{ +func NewDeletePackage(userID int64, packageID int64) *DeletePackage { + return &DeletePackage{ UserID: userID, PackageID: packageID, } } -func NewDeletePackageResp() DeletePackageResp { - return DeletePackageResp{} +func NewDeletePackageResp() *DeletePackageResp { + return &DeletePackageResp{} } -func (client *Client) DeletePackage(msg DeletePackage) (*DeletePackageResp, error) { - return mq.Request[DeletePackageResp](client.rabbitCli, msg) +func (client *Client) DeletePackage(msg *DeletePackage) (*DeletePackageResp, error) { + return mq.Request(Service.DeletePackage, client.rabbitCli, msg) } // 根据PackageID获取object分布情况 -var _ = Register(PackageService.GetPackageCachedNodes) +var _ = Register(Service.GetPackageCachedNodes) type GetPackageCachedNodes struct { + mq.MessageBodyBase UserID int64 `json:"userID"` PackageID int64 `json:"packageID"` } @@ -219,18 +235,19 @@ type PackageCachedNodeInfo struct { } type GetPackageCachedNodesResp struct { + mq.MessageBodyBase models.PackageCachingInfo } -func NewGetPackageCachedNodes(userID int64, packageID int64) GetPackageCachedNodes { - return GetPackageCachedNodes{ +func NewGetPackageCachedNodes(userID int64, packageID int64) *GetPackageCachedNodes { + return &GetPackageCachedNodes{ UserID: userID, PackageID: packageID, } } -func NewGetPackageCachedNodesResp(nodeInfos []models.NodePackageCachingInfo, packageSize int64, redunancyType string) GetPackageCachedNodesResp { - return GetPackageCachedNodesResp{ +func NewGetPackageCachedNodesResp(nodeInfos []models.NodePackageCachingInfo, packageSize int64, redunancyType string) *GetPackageCachedNodesResp { + return &GetPackageCachedNodesResp{ PackageCachingInfo: models.PackageCachingInfo{ NodeInfos: nodeInfos, PackageSize: packageSize, @@ -239,35 +256,37 @@ func NewGetPackageCachedNodesResp(nodeInfos []models.NodePackageCachingInfo, pac } } -func (client *Client) GetPackageCachedNodes(msg GetPackageCachedNodes) (*GetPackageCachedNodesResp, error) { - return mq.Request[GetPackageCachedNodesResp](client.rabbitCli, msg) +func (client *Client) GetPackageCachedNodes(msg *GetPackageCachedNodes) (*GetPackageCachedNodesResp, error) { + return mq.Request(Service.GetPackageCachedNodes, client.rabbitCli, msg) } // 根据PackageID获取storage分布情况 -var _ = Register(PackageService.GetPackageLoadedNodes) +var _ = Register(Service.GetPackageLoadedNodes) type GetPackageLoadedNodes struct { + mq.MessageBodyBase UserID int64 `json:"userID"` PackageID int64 `json:"packageID"` } type GetPackageLoadedNodesResp struct { + mq.MessageBodyBase NodeIDs []int64 `json:"nodeIDs"` } -func NewGetPackageLoadedNodes(userID int64, packageID int64) GetPackageLoadedNodes { - return GetPackageLoadedNodes{ +func NewGetPackageLoadedNodes(userID int64, packageID int64) *GetPackageLoadedNodes { + return &GetPackageLoadedNodes{ UserID: userID, PackageID: packageID, } } -func NewGetPackageLoadedNodesResp(nodeIDs []int64) GetPackageLoadedNodesResp { - return GetPackageLoadedNodesResp{ +func NewGetPackageLoadedNodesResp(nodeIDs []int64) *GetPackageLoadedNodesResp { + return &GetPackageLoadedNodesResp{ NodeIDs: nodeIDs, } } -func (client *Client) GetPackageLoadedNodes(msg GetPackageLoadedNodes) (*GetPackageLoadedNodesResp, error) { - return mq.Request[GetPackageLoadedNodesResp](client.rabbitCli, msg) +func (client *Client) GetPackageLoadedNodes(msg *GetPackageLoadedNodes) (*GetPackageLoadedNodesResp, error) { + return mq.Request(Service.GetPackageLoadedNodes, client.rabbitCli, msg) } diff --git a/common/pkgs/mq/coordinator/server.go b/common/pkgs/mq/coordinator/server.go index d221f3b..437a3f9 100644 --- a/common/pkgs/mq/coordinator/server.go +++ b/common/pkgs/mq/coordinator/server.go @@ -63,7 +63,7 @@ var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher() // Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 // TODO 需要约束:Service实现了TSvc接口 -func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*TResp, *mq.CodeMessage)) any { +func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any { mq.AddServiceFn(&msgDispatcher, svcFn) mq.RegisterMessage[TReq]() mq.RegisterMessage[TResp]() @@ -73,7 +73,7 @@ func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*T // RegisterNoReply 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 // TODO 需要约束:Service实现了TSvc接口 -func RegisterNoReply[TSvc any, TReq any](svcFn func(svc TSvc, msg *TReq)) any { +func RegisterNoReply[TReq mq.MessageBody](svcFn func(svc Service, msg TReq)) any { mq.AddNoRespServiceFn(&msgDispatcher, svcFn) mq.RegisterMessage[TReq]() diff --git a/common/pkgs/mq/coordinator/storage.go b/common/pkgs/mq/coordinator/storage.go index 04281f2..d36f164 100644 --- a/common/pkgs/mq/coordinator/storage.go +++ b/common/pkgs/mq/coordinator/storage.go @@ -12,25 +12,27 @@ type StorageService interface { } // 获取Storage信息 -var _ = Register(StorageService.GetStorageInfo) +var _ = Register(Service.GetStorageInfo) type GetStorageInfo struct { + mq.MessageBodyBase UserID int64 `json:"userID"` StorageID int64 `json:"storageID"` } type GetStorageInfoResp struct { + mq.MessageBodyBase model.Storage } -func NewGetStorageInfo(userID int64, storageID int64) GetStorageInfo { - return GetStorageInfo{ +func NewGetStorageInfo(userID int64, storageID int64) *GetStorageInfo { + return &GetStorageInfo{ UserID: userID, StorageID: storageID, } } -func NewGetStorageInfoResp(storageID int64, name string, nodeID int64, dir string, state string) GetStorageInfoResp { - return GetStorageInfoResp{ - model.Storage{ +func NewGetStorageInfoResp(storageID int64, name string, nodeID int64, dir string, state string) *GetStorageInfoResp { + return &GetStorageInfoResp{ + Storage: model.Storage{ StorageID: storageID, Name: name, NodeID: nodeID, @@ -39,30 +41,33 @@ func NewGetStorageInfoResp(storageID int64, name string, nodeID int64, dir strin }, } } -func (client *Client) GetStorageInfo(msg GetStorageInfo) (*GetStorageInfoResp, error) { - return mq.Request[GetStorageInfoResp](client.rabbitCli, msg) +func (client *Client) GetStorageInfo(msg *GetStorageInfo) (*GetStorageInfoResp, error) { + return mq.Request(Service.GetStorageInfo, client.rabbitCli, msg) } // 提交调度记录 -var _ = Register(StorageService.StoragePackageLoaded) +var _ = Register(Service.StoragePackageLoaded) type StoragePackageLoaded struct { + mq.MessageBodyBase UserID int64 `json:"userID"` PackageID int64 `json:"packageID"` StorageID int64 `json:"storageID"` } -type StoragePackageLoadedResp struct{} +type StoragePackageLoadedResp struct { + mq.MessageBodyBase +} -func NewStoragePackageLoaded(userID int64, packageID int64, stgID int64) StoragePackageLoaded { - return StoragePackageLoaded{ +func NewStoragePackageLoaded(userID int64, packageID int64, stgID int64) *StoragePackageLoaded { + return &StoragePackageLoaded{ UserID: userID, PackageID: packageID, StorageID: stgID, } } -func NewStoragePackageLoadedResp() StoragePackageLoadedResp { - return StoragePackageLoadedResp{} +func NewStoragePackageLoadedResp() *StoragePackageLoadedResp { + return &StoragePackageLoadedResp{} } -func (client *Client) StoragePackageLoaded(msg StoragePackageLoaded) (*StoragePackageLoadedResp, error) { - return mq.Request[StoragePackageLoadedResp](client.rabbitCli, msg) +func (client *Client) StoragePackageLoaded(msg *StoragePackageLoaded) (*StoragePackageLoadedResp, error) { + return mq.Request(Service.StoragePackageLoaded, client.rabbitCli, msg) } diff --git a/common/pkgs/mq/scanner/event.go b/common/pkgs/mq/scanner/event.go index 6062d01..09b0ba0 100644 --- a/common/pkgs/mq/scanner/event.go +++ b/common/pkgs/mq/scanner/event.go @@ -10,23 +10,24 @@ type EventService interface { } // 投递Event -var _ = RegisterNoReply(EventService.PostEvent) +var _ = RegisterNoReply(Service.PostEvent) type PostEvent struct { + mq.MessageBodyBase Event scevt.Event `json:"event"` IsEmergency bool `json:"isEmergency"` // 重要消息,优先处理 DontMerge bool `json:"dontMerge"` // 不可合并此消息 } -func NewPostEvent(event scevt.Event, isEmergency bool, dontMerge bool) PostEvent { - return PostEvent{ +func NewPostEvent(event scevt.Event, isEmergency bool, dontMerge bool) *PostEvent { + return &PostEvent{ Event: event, IsEmergency: isEmergency, DontMerge: dontMerge, } } -func (client *Client) PostEvent(msg PostEvent) error { - return mq.Send[PostEvent](client.rabbitCli, msg) +func (client *Client) PostEvent(msg *PostEvent) error { + return mq.Send(Service.PostEvent, client.rabbitCli, msg) } func init() { diff --git a/common/pkgs/mq/scanner/event/agent_check_cache.go b/common/pkgs/mq/scanner/event/agent_check_cache.go index 9bbca67..cd0496e 100644 --- a/common/pkgs/mq/scanner/event/agent_check_cache.go +++ b/common/pkgs/mq/scanner/event/agent_check_cache.go @@ -5,8 +5,8 @@ type AgentCheckCache struct { FileHashes []string `json:"fileHashes"` // 需要检查的FileHash列表,如果为nil(不是为空),则代表进行全量检查 } -func NewAgentCheckCache(nodeID int64, fileHashes []string) AgentCheckCache { - return AgentCheckCache{ +func NewAgentCheckCache(nodeID int64, fileHashes []string) *AgentCheckCache { + return &AgentCheckCache{ NodeID: nodeID, FileHashes: fileHashes, } diff --git a/common/pkgs/mq/scanner/event/agent_check_state.go b/common/pkgs/mq/scanner/event/agent_check_state.go index 753ccea..5f19ab7 100644 --- a/common/pkgs/mq/scanner/event/agent_check_state.go +++ b/common/pkgs/mq/scanner/event/agent_check_state.go @@ -4,8 +4,8 @@ type AgentCheckState struct { NodeID int64 `json:"nodeID"` } -func NewAgentCheckState(nodeID int64) AgentCheckState { - return AgentCheckState{ +func NewAgentCheckState(nodeID int64) *AgentCheckState { + return &AgentCheckState{ NodeID: nodeID, } } diff --git a/common/pkgs/mq/scanner/event/agent_check_storage.go b/common/pkgs/mq/scanner/event/agent_check_storage.go index 38d4f37..d50df77 100644 --- a/common/pkgs/mq/scanner/event/agent_check_storage.go +++ b/common/pkgs/mq/scanner/event/agent_check_storage.go @@ -5,8 +5,8 @@ type AgentCheckStorage struct { PackageIDs []int64 `json:"packageIDs"` // 需要检查的Package文件列表,如果为nil(不是为空),则代表进行全量检查 } -func NewAgentCheckStorage(storageID int64, packageIDs []int64) AgentCheckStorage { - return AgentCheckStorage{ +func NewAgentCheckStorage(storageID int64, packageIDs []int64) *AgentCheckStorage { + return &AgentCheckStorage{ StorageID: storageID, PackageIDs: packageIDs, } diff --git a/common/pkgs/mq/scanner/event/check_cache.go b/common/pkgs/mq/scanner/event/check_cache.go index 536fc20..36a306c 100644 --- a/common/pkgs/mq/scanner/event/check_cache.go +++ b/common/pkgs/mq/scanner/event/check_cache.go @@ -4,8 +4,8 @@ type CheckCache struct { NodeID int64 `json:"nodeID"` } -func NewCheckCache(nodeID int64) CheckCache { - return CheckCache{ +func NewCheckCache(nodeID int64) *CheckCache { + return &CheckCache{ NodeID: nodeID, } } diff --git a/common/pkgs/mq/scanner/event/check_package.go b/common/pkgs/mq/scanner/event/check_package.go index 3d5fd5c..4ba71d2 100644 --- a/common/pkgs/mq/scanner/event/check_package.go +++ b/common/pkgs/mq/scanner/event/check_package.go @@ -4,8 +4,8 @@ type CheckPackage struct { PackageIDs []int64 `json:"packageIDs"` } -func NewCheckPackage(packageIDs []int64) CheckPackage { - return CheckPackage{ +func NewCheckPackage(packageIDs []int64) *CheckPackage { + return &CheckPackage{ PackageIDs: packageIDs, } } diff --git a/common/pkgs/mq/scanner/event/check_rep_count.go b/common/pkgs/mq/scanner/event/check_rep_count.go index 56b076b..ce639a7 100644 --- a/common/pkgs/mq/scanner/event/check_rep_count.go +++ b/common/pkgs/mq/scanner/event/check_rep_count.go @@ -4,8 +4,8 @@ type CheckRepCount struct { FileHashes []string `json:"fileHashes"` } -func NewCheckRepCount(fileHashes []string) CheckRepCount { - return CheckRepCount{ +func NewCheckRepCount(fileHashes []string) *CheckRepCount { + return &CheckRepCount{ FileHashes: fileHashes, } } diff --git a/common/pkgs/mq/scanner/server.go b/common/pkgs/mq/scanner/server.go index eb04bb4..73fd65f 100644 --- a/common/pkgs/mq/scanner/server.go +++ b/common/pkgs/mq/scanner/server.go @@ -49,7 +49,7 @@ var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher() // Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 // TODO 需要约束:Service实现了TSvc接口 -func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*TResp, *mq.CodeMessage)) any { +func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any { mq.AddServiceFn(&msgDispatcher, svcFn) mq.RegisterMessage[TReq]() mq.RegisterMessage[TResp]() @@ -59,7 +59,7 @@ func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*T // RegisterNoReply 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 // TODO 需要约束:Service实现了TSvc接口 -func RegisterNoReply[TSvc any, TReq any](svcFn func(svc TSvc, msg *TReq)) any { +func RegisterNoReply[TReq mq.MessageBody](svcFn func(svc Service, msg TReq)) any { mq.AddNoRespServiceFn(&msgDispatcher, svcFn) mq.RegisterMessage[TReq]() diff --git a/coordinator/internal/services/bucket.go b/coordinator/internal/services/bucket.go index 7659c99..ba9f13d 100644 --- a/coordinator/internal/services/bucket.go +++ b/coordinator/internal/services/bucket.go @@ -22,7 +22,7 @@ func (svc *Service) GetUserBuckets(msg *coormq.GetUserBuckets) (*coormq.GetUserB if err != nil { logger.WithField("UserID", msg.UserID). Warnf("get user buckets failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.GetUserBucketsResp](errorcode.OperationFailed, "get all buckets failed") + return nil, mq.Failed(errorcode.OperationFailed, "get all buckets failed") } return mq.ReplyOK(coormq.NewGetUserBucketsResp(buckets)) @@ -35,7 +35,7 @@ func (svc *Service) GetBucketPackages(msg *coormq.GetBucketPackages) (*coormq.Ge logger.WithField("UserID", msg.UserID). WithField("BucketID", msg.BucketID). Warnf("get bucket packages failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.GetBucketPackagesResp](errorcode.OperationFailed, "get bucket packages failed") + return nil, mq.Failed(errorcode.OperationFailed, "get bucket packages failed") } return mq.ReplyOK(coormq.NewGetBucketPackagesResp(packages)) @@ -53,7 +53,7 @@ func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucket logger.WithField("UserID", msg.UserID). WithField("BucketName", msg.BucketName). Warnf("create bucket failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.CreateBucketResp](errorcode.OperationFailed, "create bucket failed") + return nil, mq.Failed(errorcode.OperationFailed, "create bucket failed") } return mq.ReplyOK(coormq.NewCreateBucketResp(bucketID)) @@ -67,7 +67,7 @@ func (svc *Service) DeleteBucket(msg *coormq.DeleteBucket) (*coormq.DeleteBucket logger.WithField("UserID", msg.UserID). WithField("BucketID", msg.BucketID). Warnf("delete bucket failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.DeleteBucketResp](errorcode.OperationFailed, "delete bucket failed") + return nil, mq.Failed(errorcode.OperationFailed, "delete bucket failed") } return mq.ReplyOK(coormq.NewDeleteBucketResp()) diff --git a/coordinator/internal/services/package.go b/coordinator/internal/services/package.go index 4098dd5..9da79c3 100644 --- a/coordinator/internal/services/package.go +++ b/coordinator/internal/services/package.go @@ -143,13 +143,13 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack logger.WithField("UserID", msg.UserID). WithField("PackageID", msg.PackageID). Warnf("check package available failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "check package available failed") + return nil, mq.Failed(errorcode.OperationFailed, "check package available failed") } if !isAva { logger.WithField("UserID", msg.UserID). WithField("PackageID", msg.PackageID). Warnf("package is not available to the user") - return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "package is not available to the user") + return nil, mq.Failed(errorcode.OperationFailed, "package is not available to the user") } err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { @@ -159,7 +159,7 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack logger.WithField("UserID", msg.UserID). WithField("PackageID", msg.PackageID). Warnf("set package deleted failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "set package deleted failed") + return nil, mq.Failed(errorcode.OperationFailed, "set package deleted failed") } stgs, err := svc.db.StoragePackage().FindPackageStorages(svc.db.SQLCtx(), msg.PackageID) @@ -197,13 +197,13 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c logger.WithField("UserID", msg.UserID). WithField("PackageID", msg.PackageID). Warnf("check package available failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "check package available failed") + return nil, mq.Failed(errorcode.OperationFailed, "check package available failed") } if !isAva { logger.WithField("UserID", msg.UserID). WithField("PackageID", msg.PackageID). Warnf("package is not available to the user") - return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "package is not available to the user") + return nil, mq.Failed(errorcode.OperationFailed, "package is not available to the user") } pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) @@ -222,7 +222,7 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c if err != nil { logger.WithField("PackageID", msg.PackageID). Warnf("get objectRepDatas by packageID failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "get objectRepDatas by packageID failed") + return nil, mq.Failed(errorcode.OperationFailed, "get objectRepDatas by packageID failed") } for _, data := range objectRepDatas { @@ -249,7 +249,7 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c if err != nil { logger.WithField("PackageID", msg.PackageID). Warnf("get objectECDatas by packageID failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "get objectECDatas by packageID failed") + return nil, mq.Failed(errorcode.OperationFailed, "get objectECDatas by packageID failed") } for _, ecData := range objectECDatas { @@ -275,7 +275,7 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c } else { logger.WithField("PackageID", msg.PackageID). Warnf("Redundancy type %s is wrong", pkg.Redundancy.Type) - return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "redundancy type is wrong") + return nil, mq.Failed(errorcode.OperationFailed, "redundancy type is wrong") } var nodeInfos []models.NodePackageCachingInfo @@ -294,7 +294,7 @@ func (svc *Service) GetPackageLoadedNodes(msg *coormq.GetPackageLoadedNodes) (*c if err != nil { logger.WithField("PackageID", msg.PackageID). Warnf("get storages by packageID failed, err: %s", err.Error()) - return mq.ReplyFailed[coormq.GetPackageLoadedNodesResp](errorcode.OperationFailed, "get storages by packageID failed") + return nil, mq.Failed(errorcode.OperationFailed, "get storages by packageID failed") } uniqueNodeIDs := make(map[int64]bool) diff --git a/scanner/internal/event/agent_check_cache.go b/scanner/internal/event/agent_check_cache.go index 7cff9df..b003141 100644 --- a/scanner/internal/event/agent_check_cache.go +++ b/scanner/internal/event/agent_check_cache.go @@ -16,7 +16,7 @@ import ( ) type AgentCheckCache struct { - scevt.AgentCheckCache + *scevt.AgentCheckCache } func NewAgentCheckCache(nodeID int64, fileHashes []string) *AgentCheckCache { diff --git a/scanner/internal/event/agent_check_state.go b/scanner/internal/event/agent_check_state.go index f65a00a..2f66425 100644 --- a/scanner/internal/event/agent_check_state.go +++ b/scanner/internal/event/agent_check_state.go @@ -17,7 +17,7 @@ import ( ) type AgentCheckState struct { - scevt.AgentCheckState + *scevt.AgentCheckState } func NewAgentCheckState(nodeID int64) *AgentCheckState { diff --git a/scanner/internal/event/agent_check_storage.go b/scanner/internal/event/agent_check_storage.go index 54f89d6..3f37ce2 100644 --- a/scanner/internal/event/agent_check_storage.go +++ b/scanner/internal/event/agent_check_storage.go @@ -16,7 +16,7 @@ import ( ) type AgentCheckStorage struct { - scevt.AgentCheckStorage + *scevt.AgentCheckStorage } func NewAgentCheckStorage(storageID int64, packageIDs []int64) *AgentCheckStorage { diff --git a/scanner/internal/event/check_cache.go b/scanner/internal/event/check_cache.go index 61cbbed..89dbec5 100644 --- a/scanner/internal/event/check_cache.go +++ b/scanner/internal/event/check_cache.go @@ -12,7 +12,7 @@ import ( ) type CheckCache struct { - scevt.CheckCache + *scevt.CheckCache } func NewCheckCache(nodeID int64) *CheckCache { diff --git a/scanner/internal/event/check_package.go b/scanner/internal/event/check_package.go index 962a847..0a5bc91 100644 --- a/scanner/internal/event/check_package.go +++ b/scanner/internal/event/check_package.go @@ -8,7 +8,7 @@ import ( ) type CheckPackage struct { - scevt.CheckPackage + *scevt.CheckPackage } func NewCheckPackage(objIDs []int64) *CheckPackage { diff --git a/scanner/internal/event/check_rep_count.go b/scanner/internal/event/check_rep_count.go index 2b2d049..78fb874 100644 --- a/scanner/internal/event/check_rep_count.go +++ b/scanner/internal/event/check_rep_count.go @@ -17,7 +17,7 @@ import ( ) type CheckRepCount struct { - scevt.CheckRepCount + *scevt.CheckRepCount } func NewCheckRepCount(fileHashes []string) *CheckRepCount {