From b4da1d9d08f416991133fdf6086b1612a8f29831 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 21 Jul 2023 10:43:08 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96rabbitm=E7=9B=B8=E5=85=B3?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=EF=BC=8C=E6=94=AF=E6=8C=81=E8=AE=BE=E7=BD=AE?= =?UTF-8?q?=E8=B6=85=E6=97=B6=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/services/agent.go | 2 +- internal/services/bucket.go | 38 ++++---- internal/services/command_service_ec.go | 5 +- internal/services/object.go | 118 ++++++++++++------------ internal/services/storage.go | 24 ++--- 5 files changed, 94 insertions(+), 93 deletions(-) diff --git a/internal/services/agent.go b/internal/services/agent.go index ade07e1..3dcab0b 100644 --- a/internal/services/agent.go +++ b/internal/services/agent.go @@ -5,7 +5,7 @@ import ( ) func (service *Service) TempCacheReport(msg *coormsg.TempCacheReport) { - service.db.BatchInsertOrUpdateCache(msg.Body.Hashes, msg.Body.NodeID) + service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.NodeID) } func (service *Service) AgentStatusReport(msg *coormsg.AgentStatusReport) { diff --git a/internal/services/bucket.go b/internal/services/bucket.go index 2459370..3255da9 100644 --- a/internal/services/bucket.go +++ b/internal/services/bucket.go @@ -16,59 +16,59 @@ func (svc *Service) GetBucket(userID int, bucketID int) (model.Bucket, error) { panic("not implement yet") } -func (svc *Service) GetUserBuckets(msg *coormsg.GetUserBuckets) *coormsg.GetUserBucketsResp { - buckets, err := svc.db.Bucket().GetUserBuckets(svc.db.SQLCtx(), msg.Body.UserID) +func (svc *Service) GetUserBuckets(msg *coormsg.GetUserBuckets) (*coormsg.GetUserBucketsResp, *ramsg.CodeMessage) { + buckets, err := svc.db.Bucket().GetUserBuckets(svc.db.SQLCtx(), msg.UserID) if err != nil { - log.WithField("UserID", msg.Body.UserID). + log.WithField("UserID", msg.UserID). Warnf("get user buckets failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.GetUserBucketsResp](errorcode.OPERATION_FAILED, "get all buckets failed") } - return ramsg.ReplyOK(coormsg.NewGetUserBucketsRespBody(buckets)) + return ramsg.ReplyOK(coormsg.NewGetUserBucketsResp(buckets)) } -func (svc *Service) GetBucketObjects(msg *coormsg.GetBucketObjects) *coormsg.GetBucketObjectsResp { - objects, err := svc.db.Object().GetBucketObjects(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.BucketID) +func (svc *Service) GetBucketObjects(msg *coormsg.GetBucketObjects) (*coormsg.GetBucketObjectsResp, *ramsg.CodeMessage) { + objects, err := svc.db.Object().GetBucketObjects(svc.db.SQLCtx(), msg.UserID, msg.BucketID) if err != nil { - log.WithField("UserID", msg.Body.UserID). - WithField("BucketID", msg.Body.BucketID). + log.WithField("UserID", msg.UserID). + WithField("BucketID", msg.BucketID). Warnf("get bucket objects failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.GetBucketObjectsResp](errorcode.OPERATION_FAILED, "get bucket objects failed") } - return ramsg.ReplyOK(coormsg.NewGetBucketObjectsRespBody(objects)) + return ramsg.ReplyOK(coormsg.NewGetBucketObjectsResp(objects)) } -func (svc *Service) CreateBucket(msg *coormsg.CreateBucket) *coormsg.CreateBucketResp { +func (svc *Service) CreateBucket(msg *coormsg.CreateBucket) (*coormsg.CreateBucketResp, *ramsg.CodeMessage) { var bucketID int var err error svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { // 这里用的是外部的err - bucketID, err = svc.db.Bucket().Create(tx, msg.Body.UserID, msg.Body.BucketName) + bucketID, err = svc.db.Bucket().Create(tx, msg.UserID, msg.BucketName) return err }) if err != nil { - log.WithField("UserID", msg.Body.UserID). - WithField("BucketName", msg.Body.BucketName). + log.WithField("UserID", msg.UserID). + WithField("BucketName", msg.BucketName). Warnf("create bucket failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.CreateBucketResp](errorcode.OPERATION_FAILED, "create bucket failed") } - return ramsg.ReplyOK(coormsg.NewCreateBucketRespBody(bucketID)) + return ramsg.ReplyOK(coormsg.NewCreateBucketResp(bucketID)) } -func (svc *Service) DeleteBucket(msg *coormsg.DeleteBucket) *coormsg.DeleteBucketResp { +func (svc *Service) DeleteBucket(msg *coormsg.DeleteBucket) (*coormsg.DeleteBucketResp, *ramsg.CodeMessage) { err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { - return svc.db.Bucket().Delete(tx, msg.Body.BucketID) + return svc.db.Bucket().Delete(tx, msg.BucketID) }) if err != nil { - log.WithField("UserID", msg.Body.UserID). - WithField("BucketID", msg.Body.BucketID). + log.WithField("UserID", msg.UserID). + WithField("BucketID", msg.BucketID). Warnf("delete bucket failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.DeleteBucketResp](errorcode.OPERATION_FAILED, "delete bucket failed") } - return ramsg.ReplyOK(coormsg.NewDeleteBucketRespBody()) + return ramsg.ReplyOK(coormsg.NewDeleteBucketResp()) } diff --git a/internal/services/command_service_ec.go b/internal/services/command_service_ec.go index 06262d8..376a174 100644 --- a/internal/services/command_service_ec.go +++ b/internal/services/command_service_ec.go @@ -1,10 +1,11 @@ package services import ( + ramsg "gitlink.org.cn/cloudream/rabbitmq/message" coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" ) -func (service *Service) ECWrite(msg *coormsg.ECWriteCommand) *coormsg.PreUploadResp { +func (service *Service) ECWrite(msg *coormsg.ECWriteCommand) (*coormsg.PreUploadResp, *ramsg.CodeMessage) { panic("not implement yet!") /* @@ -56,7 +57,7 @@ func (service *Service) ECWrite(msg *coormsg.ECWriteCommand) *coormsg.PreUploadR */ } -func (service *Service) WriteECHash(msg *coormsg.WriteECHashCommand) *coormsg.CreateObjectResp { +func (service *Service) WriteECHash(msg *coormsg.WriteECHashCommand) (*coormsg.CreateObjectResp, *ramsg.CodeMessage) { panic("not implement yet!") /* diff --git a/internal/services/object.go b/internal/services/object.go index 2411749..a736cbc 100644 --- a/internal/services/object.go +++ b/internal/services/object.go @@ -15,27 +15,27 @@ import ( scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" ) -func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.PreDownloadObjectResp { +func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) (*coormsg.PreDownloadObjectResp, *ramsg.CodeMessage) { // 查询文件对象 - object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.ObjectID) + object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.UserID, msg.ObjectID) if err != nil { - logger.WithField("ObjectID", msg.Body.ObjectID). + logger.WithField("ObjectID", msg.ObjectID). Warnf("query Object failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query Object failed") } // 查询客户端所属节点 foundBelongNode := true - belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.Body.ClientExternalIP) + belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP) if err == sql.ErrNoRows { foundBelongNode = false } else if err != nil { - logger.WithField("ClientExternalIP", msg.Body.ClientExternalIP). + logger.WithField("ClientExternalIP", msg.ClientExternalIP). Warnf("query client belong node failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed") } - logger.Debugf("client address %s is at location %d", msg.Body.ClientExternalIP, belongNode.LocationID) + logger.Debugf("client address %s is at location %d", msg.ClientExternalIP, belongNode.LocationID) //-若redundancy是rep,查询对象副本表, 获得FileHash if object.Redundancy == consts.REDUNDANCY_REP { @@ -47,7 +47,7 @@ func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.P } // 注:由于采用了IPFS存储,因此每个备份文件的FileHash都是一样的 - nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.Body.UserID, objectRep.FileHash) + nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, objectRep.FileHash) if err != nil { logger.WithField("FileHash", objectRep.FileHash). Warnf("query Cache failed, err: %s", err.Error()) @@ -65,7 +65,7 @@ func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.P )) } - return ramsg.ReplyOK(coormsg.NewPreDownloadObjectRespBody( + return ramsg.ReplyOK(coormsg.NewPreDownloadObjectResp( object.Redundancy, object.FileSize, ramsg.NewRespObjectRepInfo(objectRep.FileHash, respNodes), @@ -116,51 +116,51 @@ func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.P } } -func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg.PreUploadResp { +func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) (*coormsg.PreUploadResp, *ramsg.CodeMessage) { // 判断同名对象是否存在。等到UploadRepObject时再判断一次。 // 此次的判断只作为参考,具体是否成功还是看UploadRepObject的结果 - isBucketAvai, err := svc.db.Bucket().IsAvailable(svc.db.SQLCtx(), msg.Body.BucketID, msg.Body.UserID) + isBucketAvai, err := svc.db.Bucket().IsAvailable(svc.db.SQLCtx(), msg.BucketID, msg.UserID) if err != nil { - logger.WithField("BucketID", msg.Body.BucketID). + logger.WithField("BucketID", msg.BucketID). Warnf("check bucket available failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "check bucket available failed") } if !isBucketAvai { - logger.WithField("BucketID", msg.Body.BucketID). + logger.WithField("BucketID", msg.BucketID). Warnf("bucket is not available to user") return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "bucket is not available to user") } - _, err = svc.db.Object().GetByName(svc.db.SQLCtx(), msg.Body.BucketID, msg.Body.ObjectName) + _, err = svc.db.Object().GetByName(svc.db.SQLCtx(), msg.BucketID, msg.ObjectName) if err == nil { - logger.WithField("BucketID", msg.Body.BucketID). - WithField("ObjectName", msg.Body.ObjectName). + logger.WithField("BucketID", msg.BucketID). + WithField("ObjectName", msg.ObjectName). Warnf("object with given Name and BucketID already exists") return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "object with given Name and BucketID already exists") } if !errors.Is(err, sql.ErrNoRows) { - logger.WithField("BucketID", msg.Body.BucketID). - WithField("ObjectName", msg.Body.ObjectName). + logger.WithField("BucketID", msg.BucketID). + WithField("ObjectName", msg.ObjectName). Warnf("get object by name failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "get object by name failed") } //查询用户可用的节点IP - nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.Body.UserID) + nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID) if err != nil { - logger.WithField("UserID", msg.Body.UserID). + logger.WithField("UserID", msg.UserID). Warnf("query user nodes failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query user nodes failed") } // 查询客户端所属节点 foundBelongNode := true - belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.Body.ClientExternalIP) + belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP) if err == sql.ErrNoRows { foundBelongNode = false } else if err != nil { - logger.WithField("ClientExternalIP", msg.Body.ClientExternalIP). + logger.WithField("ClientExternalIP", msg.ClientExternalIP). Warnf("query client belong node failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query client belong node failed") } @@ -176,74 +176,74 @@ func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg )) } - return ramsg.ReplyOK(coormsg.NewPreUploadRespBody(respNodes)) + return ramsg.ReplyOK(coormsg.NewPreUploadResp(respNodes)) } -func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) *coormsg.CreateObjectResp { +func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) (*coormsg.CreateObjectResp, *ramsg.CodeMessage) { err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { - _, err := svc.db.Object().CreateRepObject(tx, msg.Body.BucketID, msg.Body.ObjectName, msg.Body.FileSize, msg.Body.RepCount, msg.Body.NodeIDs, msg.Body.FileHash) + _, err := svc.db.Object().CreateRepObject(tx, msg.BucketID, msg.ObjectName, msg.FileSize, msg.RepCount, msg.NodeIDs, msg.FileHash) return err }) if err != nil { - logger.WithField("BucketName", msg.Body.BucketID). - WithField("ObjectName", msg.Body.ObjectName). + logger.WithField("BucketName", msg.BucketID). + WithField("ObjectName", msg.ObjectName). Warnf("create rep object failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.CreateObjectResp](errorcode.OPERATION_FAILED, "create rep object failed") } // 紧急任务 - err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.Body.FileHash}), true, true) + err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.FileHash}), true, true) if err != nil { logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error()) } - return ramsg.ReplyOK(coormsg.NewCreateObjectRespBody()) + return ramsg.ReplyOK(coormsg.NewCreateObjectResp()) } -func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg.PreUpdateRepObjectResp { +func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) (*coormsg.PreUpdateRepObjectResp, *ramsg.CodeMessage) { // TODO 检查用户是否有Object的权限 // 获取对象信息 - obj, err := svc.db.Object().GetByID(svc.db.SQLCtx(), msg.Body.ObjectID) + obj, err := svc.db.Object().GetByID(svc.db.SQLCtx(), msg.ObjectID) if err != nil { - logger.WithField("ObjectID", msg.Body.ObjectID). + logger.WithField("ObjectID", msg.ObjectID). Warnf("get object failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object failed") } if obj.Redundancy != consts.REDUNDANCY_REP { - logger.WithField("ObjectID", msg.Body.ObjectID). + logger.WithField("ObjectID", msg.ObjectID). Warnf("this object is not a rep object") return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "this object is not a rep object") } // 获取对象Rep信息 - objRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), msg.Body.ObjectID) + objRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), msg.ObjectID) if err != nil { - logger.WithField("ObjectID", msg.Body.ObjectID). + logger.WithField("ObjectID", msg.ObjectID). Warnf("get object rep failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object rep failed") } //查询用户可用的节点IP - nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.Body.UserID) + nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID) if err != nil { - logger.WithField("UserID", msg.Body.UserID). + logger.WithField("UserID", msg.UserID). Warnf("query user nodes failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query user nodes failed") } // 查询客户端所属节点 foundBelongNode := true - belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.Body.ClientExternalIP) + belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP) if err == sql.ErrNoRows { foundBelongNode = false } else if err != nil { - logger.WithField("ClientExternalIP", msg.Body.ClientExternalIP). + logger.WithField("ClientExternalIP", msg.ClientExternalIP). Warnf("query client belong node failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed") } // 查询保存了旧文件的节点信息 - cachingNodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.Body.UserID, objRep.FileHash) + cachingNodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, objRep.FileHash) if err != nil { logger.Warnf("find caching file user nodes failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "find caching file user nodes failed") @@ -262,63 +262,63 @@ func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg )) } - return ramsg.ReplyOK(coormsg.NewPreUpdateRepObjectRespBody(retNodes)) + return ramsg.ReplyOK(coormsg.NewPreUpdateRepObjectResp(retNodes)) } -func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) *coormsg.UpdateRepObjectResp { +func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) (*coormsg.UpdateRepObjectResp, *ramsg.CodeMessage) { err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { - return svc.db.Object().UpdateRepObject(tx, msg.Body.ObjectID, msg.Body.FileSize, msg.Body.NodeIDs, msg.Body.FileHash) + return svc.db.Object().UpdateRepObject(tx, msg.ObjectID, msg.FileSize, msg.NodeIDs, msg.FileHash) }) if err != nil { - logger.WithField("ObjectID", msg.Body.ObjectID). + logger.WithField("ObjectID", msg.ObjectID). Warnf("update rep object failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.UpdateRepObjectResp](errorcode.OPERATION_FAILED, "update rep object failed") } // 紧急任务 - err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.Body.FileHash}), true, true) + err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.FileHash}), true, true) if err != nil { logger.Warnf("post event to scanner failed, but this will not affect updating, err: %s", err.Error()) } - return ramsg.ReplyOK(coormsg.NewUpdateRepObjectRespBody()) + return ramsg.ReplyOK(coormsg.NewUpdateRepObjectResp()) } -func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) *coormsg.DeleteObjectResp { - isAva, err := svc.db.Object().IsAvailable(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.ObjectID) +func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) (*coormsg.DeleteObjectResp, *ramsg.CodeMessage) { + isAva, err := svc.db.Object().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.ObjectID) if err != nil { - logger.WithField("UserID", msg.Body.UserID). - WithField("ObjectID", msg.Body.ObjectID). + logger.WithField("UserID", msg.UserID). + WithField("ObjectID", msg.ObjectID). Warnf("check object available failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "check object available failed") } if !isAva { - logger.WithField("UserID", msg.Body.UserID). - WithField("ObjectID", msg.Body.ObjectID). + logger.WithField("UserID", msg.UserID). + WithField("ObjectID", msg.ObjectID). Warnf("object is not available to the user") return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "object is not available to the user") } err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { - return svc.db.Object().SoftDelete(tx, msg.Body.ObjectID) + return svc.db.Object().SoftDelete(tx, msg.ObjectID) }) if err != nil { - logger.WithField("UserID", msg.Body.UserID). - WithField("ObjectID", msg.Body.ObjectID). + logger.WithField("UserID", msg.UserID). + WithField("ObjectID", msg.ObjectID). Warnf("set object deleted failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "set object deleted failed") } - stgs, err := svc.db.StorageObject().FindObjectStorages(svc.db.SQLCtx(), msg.Body.ObjectID) + stgs, err := svc.db.StorageObject().FindObjectStorages(svc.db.SQLCtx(), msg.ObjectID) if err != nil { logger.Warnf("find object storages failed, but this will not affect the deleting, err: %s", err.Error()) - return ramsg.ReplyOK(coormsg.NewDeleteObjectRespBody()) + return ramsg.ReplyOK(coormsg.NewDeleteObjectResp()) } // 不追求及时、准确 if len(stgs) == 0 { // 如果没有被引用,直接投递CheckObject的任务 - err := svc.scanner.PostEvent(scevt.NewCheckObject([]int{msg.Body.ObjectID}), false, false) + err := svc.scanner.PostEvent(scevt.NewCheckObject([]int{msg.ObjectID}), false, false) if err != nil { logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) } @@ -327,7 +327,7 @@ func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) *coormsg.DeleteObjec } else { // 有引用则让Agent去检查StorageObject for _, stg := range stgs { - err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int{msg.Body.ObjectID}), false, false) + err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int{msg.ObjectID}), false, false) if err != nil { logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) } @@ -335,5 +335,5 @@ func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) *coormsg.DeleteObjec logger.Debugf("post agent check storage event") } - return ramsg.ReplyOK(coormsg.NewDeleteObjectRespBody()) + return ramsg.ReplyOK(coormsg.NewDeleteObjectResp()) } diff --git a/internal/services/storage.go b/internal/services/storage.go index c6581d8..dba21b3 100644 --- a/internal/services/storage.go +++ b/internal/services/storage.go @@ -12,7 +12,7 @@ import ( coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" ) -func (svc *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStorage) *coormsg.PreMoveObjectToStorageResp { +func (svc *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStorage) (*coormsg.PreMoveObjectToStorageResp, *ramsg.CodeMessage) { //查询数据库,获取冗余类型,冗余参数 //jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSize //-若redundancy是rep,查询对象副本表, 获得repHash @@ -24,18 +24,18 @@ func (svc *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStorage) //--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids // 查询用户关联的存储服务 - stg, err := svc.db.Storage().GetUserStorage(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.StorageID) + stg, err := svc.db.Storage().GetUserStorage(svc.db.SQLCtx(), msg.UserID, msg.StorageID) if err != nil { - log.WithField("UserID", msg.Body.UserID). - WithField("StorageID", msg.Body.StorageID). + log.WithField("UserID", msg.UserID). + WithField("StorageID", msg.StorageID). Warnf("get user Storage failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "get user Storage failed") } // 查询文件对象 - object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.ObjectID) + object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.UserID, msg.ObjectID) if err != nil { - log.WithField("ObjectID", msg.Body.ObjectID). + log.WithField("ObjectID", msg.ObjectID). Warnf("get user Object failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "get user Object failed") } @@ -100,17 +100,17 @@ func (svc *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStorage) } } -func (svc *Service) MoveObjectToStorage(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObjectToStorageResp { +func (svc *Service) MoveObjectToStorage(msg *coormsg.MoveObjectToStorage) (*coormsg.MoveObjectToStorageResp, *ramsg.CodeMessage) { err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { - return svc.db.StorageObject().MoveObjectTo(tx, msg.Body.ObjectID, msg.Body.StorageID, msg.Body.UserID) + return svc.db.StorageObject().MoveObjectTo(tx, msg.ObjectID, msg.StorageID, msg.UserID) }) if err != nil { - log.WithField("UserID", msg.Body.UserID). - WithField("ObjectID", msg.Body.ObjectID). - WithField("StorageID", msg.Body.StorageID). + log.WithField("UserID", msg.UserID). + WithField("ObjectID", msg.ObjectID). + WithField("StorageID", msg.StorageID). Warnf("user move object to storage failed, err: %s", err.Error()) return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "user move object to storage failed") } - return ramsg.ReplyOK(coormsg.NewMoveObjectToStorageRespBody()) + return ramsg.ReplyOK(coormsg.NewMoveObjectToStorageResp()) }