Browse Source

优化rabbitm相关代码,支持设置超时时间

gitlink
Sydonian 2 years ago
parent
commit
b4da1d9d08
5 changed files with 94 additions and 93 deletions
  1. +1
    -1
      internal/services/agent.go
  2. +19
    -19
      internal/services/bucket.go
  3. +3
    -2
      internal/services/command_service_ec.go
  4. +59
    -59
      internal/services/object.go
  5. +12
    -12
      internal/services/storage.go

+ 1
- 1
internal/services/agent.go View File

@@ -5,7 +5,7 @@ import (
) )


func (service *Service) TempCacheReport(msg *coormsg.TempCacheReport) { func (service *Service) TempCacheReport(msg *coormsg.TempCacheReport) {
service.db.BatchInsertOrUpdateCache(msg.Body.Hashes, msg.Body.NodeID)
service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.NodeID)
} }


func (service *Service) AgentStatusReport(msg *coormsg.AgentStatusReport) { func (service *Service) AgentStatusReport(msg *coormsg.AgentStatusReport) {


+ 19
- 19
internal/services/bucket.go View File

@@ -16,59 +16,59 @@ func (svc *Service) GetBucket(userID int, bucketID int) (model.Bucket, error) {
panic("not implement yet") panic("not implement yet")
} }


func (svc *Service) GetUserBuckets(msg *coormsg.GetUserBuckets) *coormsg.GetUserBucketsResp {
buckets, err := svc.db.Bucket().GetUserBuckets(svc.db.SQLCtx(), msg.Body.UserID)
func (svc *Service) GetUserBuckets(msg *coormsg.GetUserBuckets) (*coormsg.GetUserBucketsResp, *ramsg.CodeMessage) {
buckets, err := svc.db.Bucket().GetUserBuckets(svc.db.SQLCtx(), msg.UserID)


if err != nil { if err != nil {
log.WithField("UserID", msg.Body.UserID).
log.WithField("UserID", msg.UserID).
Warnf("get user buckets failed, err: %s", err.Error()) Warnf("get user buckets failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.GetUserBucketsResp](errorcode.OPERATION_FAILED, "get all buckets failed") return ramsg.ReplyFailed[coormsg.GetUserBucketsResp](errorcode.OPERATION_FAILED, "get all buckets failed")
} }


return ramsg.ReplyOK(coormsg.NewGetUserBucketsRespBody(buckets))
return ramsg.ReplyOK(coormsg.NewGetUserBucketsResp(buckets))
} }


func (svc *Service) GetBucketObjects(msg *coormsg.GetBucketObjects) *coormsg.GetBucketObjectsResp {
objects, err := svc.db.Object().GetBucketObjects(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.BucketID)
func (svc *Service) GetBucketObjects(msg *coormsg.GetBucketObjects) (*coormsg.GetBucketObjectsResp, *ramsg.CodeMessage) {
objects, err := svc.db.Object().GetBucketObjects(svc.db.SQLCtx(), msg.UserID, msg.BucketID)


if err != nil { if err != nil {
log.WithField("UserID", msg.Body.UserID).
WithField("BucketID", msg.Body.BucketID).
log.WithField("UserID", msg.UserID).
WithField("BucketID", msg.BucketID).
Warnf("get bucket objects failed, err: %s", err.Error()) Warnf("get bucket objects failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.GetBucketObjectsResp](errorcode.OPERATION_FAILED, "get bucket objects failed") return ramsg.ReplyFailed[coormsg.GetBucketObjectsResp](errorcode.OPERATION_FAILED, "get bucket objects failed")
} }


return ramsg.ReplyOK(coormsg.NewGetBucketObjectsRespBody(objects))
return ramsg.ReplyOK(coormsg.NewGetBucketObjectsResp(objects))
} }


func (svc *Service) CreateBucket(msg *coormsg.CreateBucket) *coormsg.CreateBucketResp {
func (svc *Service) CreateBucket(msg *coormsg.CreateBucket) (*coormsg.CreateBucketResp, *ramsg.CodeMessage) {
var bucketID int var bucketID int
var err error var err error
svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
// 这里用的是外部的err // 这里用的是外部的err
bucketID, err = svc.db.Bucket().Create(tx, msg.Body.UserID, msg.Body.BucketName)
bucketID, err = svc.db.Bucket().Create(tx, msg.UserID, msg.BucketName)
return err return err
}) })
if err != nil { if err != nil {
log.WithField("UserID", msg.Body.UserID).
WithField("BucketName", msg.Body.BucketName).
log.WithField("UserID", msg.UserID).
WithField("BucketName", msg.BucketName).
Warnf("create bucket failed, err: %s", err.Error()) Warnf("create bucket failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.CreateBucketResp](errorcode.OPERATION_FAILED, "create bucket failed") return ramsg.ReplyFailed[coormsg.CreateBucketResp](errorcode.OPERATION_FAILED, "create bucket failed")
} }


return ramsg.ReplyOK(coormsg.NewCreateBucketRespBody(bucketID))
return ramsg.ReplyOK(coormsg.NewCreateBucketResp(bucketID))
} }


func (svc *Service) DeleteBucket(msg *coormsg.DeleteBucket) *coormsg.DeleteBucketResp {
func (svc *Service) DeleteBucket(msg *coormsg.DeleteBucket) (*coormsg.DeleteBucketResp, *ramsg.CodeMessage) {
err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
return svc.db.Bucket().Delete(tx, msg.Body.BucketID)
return svc.db.Bucket().Delete(tx, msg.BucketID)
}) })
if err != nil { if err != nil {
log.WithField("UserID", msg.Body.UserID).
WithField("BucketID", msg.Body.BucketID).
log.WithField("UserID", msg.UserID).
WithField("BucketID", msg.BucketID).
Warnf("delete bucket failed, err: %s", err.Error()) Warnf("delete bucket failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.DeleteBucketResp](errorcode.OPERATION_FAILED, "delete bucket failed") return ramsg.ReplyFailed[coormsg.DeleteBucketResp](errorcode.OPERATION_FAILED, "delete bucket failed")
} }


return ramsg.ReplyOK(coormsg.NewDeleteBucketRespBody())
return ramsg.ReplyOK(coormsg.NewDeleteBucketResp())
} }

+ 3
- 2
internal/services/command_service_ec.go View File

@@ -1,10 +1,11 @@
package services package services


import ( import (
ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
) )


func (service *Service) ECWrite(msg *coormsg.ECWriteCommand) *coormsg.PreUploadResp {
func (service *Service) ECWrite(msg *coormsg.ECWriteCommand) (*coormsg.PreUploadResp, *ramsg.CodeMessage) {
panic("not implement yet!") panic("not implement yet!")


/* /*
@@ -56,7 +57,7 @@ func (service *Service) ECWrite(msg *coormsg.ECWriteCommand) *coormsg.PreUploadR
*/ */
} }


func (service *Service) WriteECHash(msg *coormsg.WriteECHashCommand) *coormsg.CreateObjectResp {
func (service *Service) WriteECHash(msg *coormsg.WriteECHashCommand) (*coormsg.CreateObjectResp, *ramsg.CodeMessage) {
panic("not implement yet!") panic("not implement yet!")


/* /*


+ 59
- 59
internal/services/object.go View File

@@ -15,27 +15,27 @@ import (
scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
) )


func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.PreDownloadObjectResp {
func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) (*coormsg.PreDownloadObjectResp, *ramsg.CodeMessage) {


// 查询文件对象 // 查询文件对象
object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.ObjectID)
object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.UserID, msg.ObjectID)
if err != nil { if err != nil {
logger.WithField("ObjectID", msg.Body.ObjectID).
logger.WithField("ObjectID", msg.ObjectID).
Warnf("query Object failed, err: %s", err.Error()) Warnf("query Object failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query Object failed") return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query Object failed")
} }


// 查询客户端所属节点 // 查询客户端所属节点
foundBelongNode := true foundBelongNode := true
belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.Body.ClientExternalIP)
belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
foundBelongNode = false foundBelongNode = false
} else if err != nil { } else if err != nil {
logger.WithField("ClientExternalIP", msg.Body.ClientExternalIP).
logger.WithField("ClientExternalIP", msg.ClientExternalIP).
Warnf("query client belong node failed, err: %s", err.Error()) Warnf("query client belong node failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed") return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed")
} }
logger.Debugf("client address %s is at location %d", msg.Body.ClientExternalIP, belongNode.LocationID)
logger.Debugf("client address %s is at location %d", msg.ClientExternalIP, belongNode.LocationID)


//-若redundancy是rep,查询对象副本表, 获得FileHash //-若redundancy是rep,查询对象副本表, 获得FileHash
if object.Redundancy == consts.REDUNDANCY_REP { if object.Redundancy == consts.REDUNDANCY_REP {
@@ -47,7 +47,7 @@ func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.P
} }


// 注:由于采用了IPFS存储,因此每个备份文件的FileHash都是一样的 // 注:由于采用了IPFS存储,因此每个备份文件的FileHash都是一样的
nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.Body.UserID, objectRep.FileHash)
nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, objectRep.FileHash)
if err != nil { if err != nil {
logger.WithField("FileHash", objectRep.FileHash). logger.WithField("FileHash", objectRep.FileHash).
Warnf("query Cache failed, err: %s", err.Error()) Warnf("query Cache failed, err: %s", err.Error())
@@ -65,7 +65,7 @@ func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.P
)) ))
} }


return ramsg.ReplyOK(coormsg.NewPreDownloadObjectRespBody(
return ramsg.ReplyOK(coormsg.NewPreDownloadObjectResp(
object.Redundancy, object.Redundancy,
object.FileSize, object.FileSize,
ramsg.NewRespObjectRepInfo(objectRep.FileHash, respNodes), ramsg.NewRespObjectRepInfo(objectRep.FileHash, respNodes),
@@ -116,51 +116,51 @@ func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.P
} }
} }


func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg.PreUploadResp {
func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) (*coormsg.PreUploadResp, *ramsg.CodeMessage) {


// 判断同名对象是否存在。等到UploadRepObject时再判断一次。 // 判断同名对象是否存在。等到UploadRepObject时再判断一次。
// 此次的判断只作为参考,具体是否成功还是看UploadRepObject的结果 // 此次的判断只作为参考,具体是否成功还是看UploadRepObject的结果
isBucketAvai, err := svc.db.Bucket().IsAvailable(svc.db.SQLCtx(), msg.Body.BucketID, msg.Body.UserID)
isBucketAvai, err := svc.db.Bucket().IsAvailable(svc.db.SQLCtx(), msg.BucketID, msg.UserID)
if err != nil { if err != nil {
logger.WithField("BucketID", msg.Body.BucketID).
logger.WithField("BucketID", msg.BucketID).
Warnf("check bucket available failed, err: %s", err.Error()) Warnf("check bucket available failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "check bucket available failed") return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "check bucket available failed")
} }
if !isBucketAvai { if !isBucketAvai {
logger.WithField("BucketID", msg.Body.BucketID).
logger.WithField("BucketID", msg.BucketID).
Warnf("bucket is not available to user") Warnf("bucket is not available to user")
return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "bucket is not available to user") return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "bucket is not available to user")
} }


_, err = svc.db.Object().GetByName(svc.db.SQLCtx(), msg.Body.BucketID, msg.Body.ObjectName)
_, err = svc.db.Object().GetByName(svc.db.SQLCtx(), msg.BucketID, msg.ObjectName)
if err == nil { if err == nil {
logger.WithField("BucketID", msg.Body.BucketID).
WithField("ObjectName", msg.Body.ObjectName).
logger.WithField("BucketID", msg.BucketID).
WithField("ObjectName", msg.ObjectName).
Warnf("object with given Name and BucketID already exists") Warnf("object with given Name and BucketID already exists")
return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "object with given Name and BucketID already exists") return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "object with given Name and BucketID already exists")
} }
if !errors.Is(err, sql.ErrNoRows) { if !errors.Is(err, sql.ErrNoRows) {
logger.WithField("BucketID", msg.Body.BucketID).
WithField("ObjectName", msg.Body.ObjectName).
logger.WithField("BucketID", msg.BucketID).
WithField("ObjectName", msg.ObjectName).
Warnf("get object by name failed, err: %s", err.Error()) Warnf("get object by name failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "get object by name failed") return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "get object by name failed")
} }


//查询用户可用的节点IP //查询用户可用的节点IP
nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.Body.UserID)
nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID)
if err != nil { if err != nil {
logger.WithField("UserID", msg.Body.UserID).
logger.WithField("UserID", msg.UserID).
Warnf("query user nodes failed, err: %s", err.Error()) Warnf("query user nodes failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query user nodes failed") return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query user nodes failed")
} }


// 查询客户端所属节点 // 查询客户端所属节点
foundBelongNode := true foundBelongNode := true
belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.Body.ClientExternalIP)
belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
foundBelongNode = false foundBelongNode = false
} else if err != nil { } else if err != nil {
logger.WithField("ClientExternalIP", msg.Body.ClientExternalIP).
logger.WithField("ClientExternalIP", msg.ClientExternalIP).
Warnf("query client belong node failed, err: %s", err.Error()) Warnf("query client belong node failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query client belong node failed") return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query client belong node failed")
} }
@@ -176,74 +176,74 @@ func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg
)) ))
} }


return ramsg.ReplyOK(coormsg.NewPreUploadRespBody(respNodes))
return ramsg.ReplyOK(coormsg.NewPreUploadResp(respNodes))
} }


func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) *coormsg.CreateObjectResp {
func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) (*coormsg.CreateObjectResp, *ramsg.CodeMessage) {
err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
_, err := svc.db.Object().CreateRepObject(tx, msg.Body.BucketID, msg.Body.ObjectName, msg.Body.FileSize, msg.Body.RepCount, msg.Body.NodeIDs, msg.Body.FileHash)
_, err := svc.db.Object().CreateRepObject(tx, msg.BucketID, msg.ObjectName, msg.FileSize, msg.RepCount, msg.NodeIDs, msg.FileHash)
return err return err
}) })
if err != nil { if err != nil {
logger.WithField("BucketName", msg.Body.BucketID).
WithField("ObjectName", msg.Body.ObjectName).
logger.WithField("BucketName", msg.BucketID).
WithField("ObjectName", msg.ObjectName).
Warnf("create rep object failed, err: %s", err.Error()) Warnf("create rep object failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.CreateObjectResp](errorcode.OPERATION_FAILED, "create rep object failed") return ramsg.ReplyFailed[coormsg.CreateObjectResp](errorcode.OPERATION_FAILED, "create rep object failed")
} }


// 紧急任务 // 紧急任务
err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.Body.FileHash}), true, true)
err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.FileHash}), true, true)
if err != nil { if err != nil {
logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error()) logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error())
} }


return ramsg.ReplyOK(coormsg.NewCreateObjectRespBody())
return ramsg.ReplyOK(coormsg.NewCreateObjectResp())
} }


func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg.PreUpdateRepObjectResp {
func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) (*coormsg.PreUpdateRepObjectResp, *ramsg.CodeMessage) {
// TODO 检查用户是否有Object的权限 // TODO 检查用户是否有Object的权限
// 获取对象信息 // 获取对象信息
obj, err := svc.db.Object().GetByID(svc.db.SQLCtx(), msg.Body.ObjectID)
obj, err := svc.db.Object().GetByID(svc.db.SQLCtx(), msg.ObjectID)
if err != nil { if err != nil {
logger.WithField("ObjectID", msg.Body.ObjectID).
logger.WithField("ObjectID", msg.ObjectID).
Warnf("get object failed, err: %s", err.Error()) Warnf("get object failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object failed") return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object failed")
} }
if obj.Redundancy != consts.REDUNDANCY_REP { if obj.Redundancy != consts.REDUNDANCY_REP {
logger.WithField("ObjectID", msg.Body.ObjectID).
logger.WithField("ObjectID", msg.ObjectID).
Warnf("this object is not a rep object") Warnf("this object is not a rep object")
return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "this object is not a rep object") return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "this object is not a rep object")
} }


// 获取对象Rep信息 // 获取对象Rep信息
objRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), msg.Body.ObjectID)
objRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), msg.ObjectID)
if err != nil { if err != nil {
logger.WithField("ObjectID", msg.Body.ObjectID).
logger.WithField("ObjectID", msg.ObjectID).
Warnf("get object rep failed, err: %s", err.Error()) Warnf("get object rep failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object rep failed") return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object rep failed")
} }


//查询用户可用的节点IP //查询用户可用的节点IP
nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.Body.UserID)
nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID)
if err != nil { if err != nil {
logger.WithField("UserID", msg.Body.UserID).
logger.WithField("UserID", msg.UserID).
Warnf("query user nodes failed, err: %s", err.Error()) Warnf("query user nodes failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query user nodes failed") return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query user nodes failed")
} }


// 查询客户端所属节点 // 查询客户端所属节点
foundBelongNode := true foundBelongNode := true
belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.Body.ClientExternalIP)
belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
foundBelongNode = false foundBelongNode = false
} else if err != nil { } else if err != nil {
logger.WithField("ClientExternalIP", msg.Body.ClientExternalIP).
logger.WithField("ClientExternalIP", msg.ClientExternalIP).
Warnf("query client belong node failed, err: %s", err.Error()) Warnf("query client belong node failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed") return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed")
} }


// 查询保存了旧文件的节点信息 // 查询保存了旧文件的节点信息
cachingNodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.Body.UserID, objRep.FileHash)
cachingNodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, objRep.FileHash)
if err != nil { if err != nil {
logger.Warnf("find caching file user nodes failed, err: %s", err.Error()) logger.Warnf("find caching file user nodes failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "find caching file user nodes failed") return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "find caching file user nodes failed")
@@ -262,63 +262,63 @@ func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg
)) ))
} }


return ramsg.ReplyOK(coormsg.NewPreUpdateRepObjectRespBody(retNodes))
return ramsg.ReplyOK(coormsg.NewPreUpdateRepObjectResp(retNodes))
} }


func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) *coormsg.UpdateRepObjectResp {
func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) (*coormsg.UpdateRepObjectResp, *ramsg.CodeMessage) {
err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
return svc.db.Object().UpdateRepObject(tx, msg.Body.ObjectID, msg.Body.FileSize, msg.Body.NodeIDs, msg.Body.FileHash)
return svc.db.Object().UpdateRepObject(tx, msg.ObjectID, msg.FileSize, msg.NodeIDs, msg.FileHash)
}) })
if err != nil { if err != nil {
logger.WithField("ObjectID", msg.Body.ObjectID).
logger.WithField("ObjectID", msg.ObjectID).
Warnf("update rep object failed, err: %s", err.Error()) Warnf("update rep object failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.UpdateRepObjectResp](errorcode.OPERATION_FAILED, "update rep object failed") return ramsg.ReplyFailed[coormsg.UpdateRepObjectResp](errorcode.OPERATION_FAILED, "update rep object failed")
} }


// 紧急任务 // 紧急任务
err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.Body.FileHash}), true, true)
err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.FileHash}), true, true)
if err != nil { if err != nil {
logger.Warnf("post event to scanner failed, but this will not affect updating, err: %s", err.Error()) logger.Warnf("post event to scanner failed, but this will not affect updating, err: %s", err.Error())
} }


return ramsg.ReplyOK(coormsg.NewUpdateRepObjectRespBody())
return ramsg.ReplyOK(coormsg.NewUpdateRepObjectResp())
} }


func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) *coormsg.DeleteObjectResp {
isAva, err := svc.db.Object().IsAvailable(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.ObjectID)
func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) (*coormsg.DeleteObjectResp, *ramsg.CodeMessage) {
isAva, err := svc.db.Object().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.ObjectID)
if err != nil { if err != nil {
logger.WithField("UserID", msg.Body.UserID).
WithField("ObjectID", msg.Body.ObjectID).
logger.WithField("UserID", msg.UserID).
WithField("ObjectID", msg.ObjectID).
Warnf("check object available failed, err: %s", err.Error()) Warnf("check object available failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "check object available failed") return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "check object available failed")
} }
if !isAva { if !isAva {
logger.WithField("UserID", msg.Body.UserID).
WithField("ObjectID", msg.Body.ObjectID).
logger.WithField("UserID", msg.UserID).
WithField("ObjectID", msg.ObjectID).
Warnf("object is not available to the user") Warnf("object is not available to the user")
return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "object is not available to the user") return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "object is not available to the user")
} }


err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
return svc.db.Object().SoftDelete(tx, msg.Body.ObjectID)
return svc.db.Object().SoftDelete(tx, msg.ObjectID)
}) })
if err != nil { if err != nil {
logger.WithField("UserID", msg.Body.UserID).
WithField("ObjectID", msg.Body.ObjectID).
logger.WithField("UserID", msg.UserID).
WithField("ObjectID", msg.ObjectID).
Warnf("set object deleted failed, err: %s", err.Error()) Warnf("set object deleted failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "set object deleted failed") return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "set object deleted failed")
} }


stgs, err := svc.db.StorageObject().FindObjectStorages(svc.db.SQLCtx(), msg.Body.ObjectID)
stgs, err := svc.db.StorageObject().FindObjectStorages(svc.db.SQLCtx(), msg.ObjectID)
if err != nil { if err != nil {
logger.Warnf("find object storages failed, but this will not affect the deleting, err: %s", err.Error()) logger.Warnf("find object storages failed, but this will not affect the deleting, err: %s", err.Error())
return ramsg.ReplyOK(coormsg.NewDeleteObjectRespBody())
return ramsg.ReplyOK(coormsg.NewDeleteObjectResp())
} }


// 不追求及时、准确 // 不追求及时、准确
if len(stgs) == 0 { if len(stgs) == 0 {
// 如果没有被引用,直接投递CheckObject的任务 // 如果没有被引用,直接投递CheckObject的任务
err := svc.scanner.PostEvent(scevt.NewCheckObject([]int{msg.Body.ObjectID}), false, false)
err := svc.scanner.PostEvent(scevt.NewCheckObject([]int{msg.ObjectID}), false, false)
if err != nil { if err != nil {
logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
} }
@@ -327,7 +327,7 @@ func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) *coormsg.DeleteObjec
} else { } else {
// 有引用则让Agent去检查StorageObject // 有引用则让Agent去检查StorageObject
for _, stg := range stgs { for _, stg := range stgs {
err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int{msg.Body.ObjectID}), false, false)
err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int{msg.ObjectID}), false, false)
if err != nil { if err != nil {
logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
} }
@@ -335,5 +335,5 @@ func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) *coormsg.DeleteObjec
logger.Debugf("post agent check storage event") logger.Debugf("post agent check storage event")
} }


return ramsg.ReplyOK(coormsg.NewDeleteObjectRespBody())
return ramsg.ReplyOK(coormsg.NewDeleteObjectResp())
} }

+ 12
- 12
internal/services/storage.go View File

@@ -12,7 +12,7 @@ import (
coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
) )


func (svc *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStorage) *coormsg.PreMoveObjectToStorageResp {
func (svc *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStorage) (*coormsg.PreMoveObjectToStorageResp, *ramsg.CodeMessage) {
//查询数据库,获取冗余类型,冗余参数 //查询数据库,获取冗余类型,冗余参数
//jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSize //jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSize
//-若redundancy是rep,查询对象副本表, 获得repHash //-若redundancy是rep,查询对象副本表, 获得repHash
@@ -24,18 +24,18 @@ func (svc *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStorage)
//--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids //--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids


// 查询用户关联的存储服务 // 查询用户关联的存储服务
stg, err := svc.db.Storage().GetUserStorage(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.StorageID)
stg, err := svc.db.Storage().GetUserStorage(svc.db.SQLCtx(), msg.UserID, msg.StorageID)
if err != nil { if err != nil {
log.WithField("UserID", msg.Body.UserID).
WithField("StorageID", msg.Body.StorageID).
log.WithField("UserID", msg.UserID).
WithField("StorageID", msg.StorageID).
Warnf("get user Storage failed, err: %s", err.Error()) Warnf("get user Storage failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "get user Storage failed") return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "get user Storage failed")
} }


// 查询文件对象 // 查询文件对象
object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.ObjectID)
object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.UserID, msg.ObjectID)
if err != nil { if err != nil {
log.WithField("ObjectID", msg.Body.ObjectID).
log.WithField("ObjectID", msg.ObjectID).
Warnf("get user Object failed, err: %s", err.Error()) Warnf("get user Object failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "get user Object failed") return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "get user Object failed")
} }
@@ -100,17 +100,17 @@ func (svc *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStorage)
} }
} }


func (svc *Service) MoveObjectToStorage(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObjectToStorageResp {
func (svc *Service) MoveObjectToStorage(msg *coormsg.MoveObjectToStorage) (*coormsg.MoveObjectToStorageResp, *ramsg.CodeMessage) {
err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
return svc.db.StorageObject().MoveObjectTo(tx, msg.Body.ObjectID, msg.Body.StorageID, msg.Body.UserID)
return svc.db.StorageObject().MoveObjectTo(tx, msg.ObjectID, msg.StorageID, msg.UserID)
}) })
if err != nil { if err != nil {
log.WithField("UserID", msg.Body.UserID).
WithField("ObjectID", msg.Body.ObjectID).
WithField("StorageID", msg.Body.StorageID).
log.WithField("UserID", msg.UserID).
WithField("ObjectID", msg.ObjectID).
WithField("StorageID", msg.StorageID).
Warnf("user move object to storage failed, err: %s", err.Error()) Warnf("user move object to storage failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "user move object to storage failed") return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "user move object to storage failed")
} }


return ramsg.ReplyOK(coormsg.NewMoveObjectToStorageRespBody())
return ramsg.ReplyOK(coormsg.NewMoveObjectToStorageResp())
} }

Loading…
Cancel
Save