From fa0dbbc9b682192eee806da5fde38f05cc4ebee0 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 18 Aug 2023 16:34:36 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=9F=E4=B8=80=E5=8D=95=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E5=92=8C=E5=A4=9A=E6=96=87=E4=BB=B6=E9=80=BB=E8=BE=91=EF=BC=9B?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=A8=8B=E5=BA=8F=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/config/config.go | 4 +- internal/services/agent.go | 8 +- internal/services/bucket.go | 40 +-- internal/services/command_service_ec.go | 100 ------- internal/services/conmmon.go | 30 ++ internal/services/node.go | 36 +++ internal/services/object.go | 352 +----------------------- internal/services/package.go | 189 +++++++++++++ internal/services/service.go | 6 +- internal/services/storage.go | 84 +----- main.go | 25 +- 11 files changed, 318 insertions(+), 556 deletions(-) delete mode 100644 internal/services/command_service_ec.go create mode 100644 internal/services/conmmon.go create mode 100644 internal/services/node.go create mode 100644 internal/services/package.go diff --git a/internal/config/config.go b/internal/config/config.go index 245820b..864295f 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -4,13 +4,13 @@ import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" c "gitlink.org.cn/cloudream/common/utils/config" db "gitlink.org.cn/cloudream/storage-common/pkgs/db/config" - racfg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/config" + stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" ) type Config struct { Logger log.Config `json:"logger"` DB db.Config `json:"db"` - RabbitMQ racfg.Config `json:"rabbitMQ"` + RabbitMQ stgmq.Config `json:"rabbitMQ"` } var cfg Config diff --git a/internal/services/agent.go b/internal/services/agent.go index a4d002f..8a31e1c 100644 --- a/internal/services/agent.go +++ b/internal/services/agent.go @@ -1,14 +1,14 @@ package services import ( - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) -func (service *Service) TempCacheReport(msg *coormsg.TempCacheReport) { - service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.NodeID) +func (service *Service) TempCacheReport(msg *coormq.TempCacheReport) { + //service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.NodeID) } -func (service *Service) AgentStatusReport(msg *coormsg.AgentStatusReport) { +func (service *Service) AgentStatusReport(msg *coormq.AgentStatusReport) { //jh:根据command中的Ip,插入节点延迟表,和节点表的NodeStatus //根据command中的Ip,插入节点延迟表 diff --git a/internal/services/bucket.go b/internal/services/bucket.go index 83aeaa5..5ba7526 100644 --- a/internal/services/bucket.go +++ b/internal/services/bucket.go @@ -5,10 +5,10 @@ import ( "github.com/jmoiron/sqlx" "gitlink.org.cn/cloudream/common/consts/errorcode" - log "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) func (svc *Service) GetBucket(userID int, bucketID int) (model.Bucket, error) { @@ -16,32 +16,32 @@ func (svc *Service) GetBucket(userID int, bucketID int) (model.Bucket, error) { panic("not implement yet") } -func (svc *Service) GetUserBuckets(msg *coormsg.GetUserBuckets) (*coormsg.GetUserBucketsResp, *mq.CodeMessage) { +func (svc *Service) GetUserBuckets(msg *coormq.GetUserBuckets) (*coormq.GetUserBucketsResp, *mq.CodeMessage) { buckets, err := svc.db.Bucket().GetUserBuckets(svc.db.SQLCtx(), msg.UserID) if err != nil { - log.WithField("UserID", msg.UserID). + logger.WithField("UserID", msg.UserID). Warnf("get user buckets failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.GetUserBucketsResp](errorcode.OperationFailed, "get all buckets failed") + return mq.ReplyFailed[coormq.GetUserBucketsResp](errorcode.OperationFailed, "get all buckets failed") } - return mq.ReplyOK(coormsg.NewGetUserBucketsResp(buckets)) + return mq.ReplyOK(coormq.NewGetUserBucketsResp(buckets)) } -func (svc *Service) GetBucketObjects(msg *coormsg.GetBucketObjects) (*coormsg.GetBucketObjectsResp, *mq.CodeMessage) { - objects, err := svc.db.Object().GetBucketObjects(svc.db.SQLCtx(), msg.UserID, msg.BucketID) +func (svc *Service) GetBucketPackages(msg *coormq.GetBucketPackages) (*coormq.GetBucketPackagesResp, *mq.CodeMessage) { + packages, err := svc.db.Package().GetBucketPackages(svc.db.SQLCtx(), msg.UserID, msg.BucketID) if err != nil { - log.WithField("UserID", msg.UserID). + logger.WithField("UserID", msg.UserID). WithField("BucketID", msg.BucketID). - Warnf("get bucket objects failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.GetBucketObjectsResp](errorcode.OperationFailed, "get bucket objects failed") + Warnf("get bucket packages failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.GetBucketPackagesResp](errorcode.OperationFailed, "get bucket packages failed") } - return mq.ReplyOK(coormsg.NewGetBucketObjectsResp(objects)) + return mq.ReplyOK(coormq.NewGetBucketPackagesResp(packages)) } -func (svc *Service) CreateBucket(msg *coormsg.CreateBucket) (*coormsg.CreateBucketResp, *mq.CodeMessage) { +func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucketResp, *mq.CodeMessage) { var bucketID int64 var err error svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { @@ -50,25 +50,25 @@ func (svc *Service) CreateBucket(msg *coormsg.CreateBucket) (*coormsg.CreateBuck return err }) if err != nil { - log.WithField("UserID", msg.UserID). + logger.WithField("UserID", msg.UserID). WithField("BucketName", msg.BucketName). Warnf("create bucket failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.CreateBucketResp](errorcode.OperationFailed, "create bucket failed") + return mq.ReplyFailed[coormq.CreateBucketResp](errorcode.OperationFailed, "create bucket failed") } - return mq.ReplyOK(coormsg.NewCreateBucketResp(bucketID)) + return mq.ReplyOK(coormq.NewCreateBucketResp(bucketID)) } -func (svc *Service) DeleteBucket(msg *coormsg.DeleteBucket) (*coormsg.DeleteBucketResp, *mq.CodeMessage) { +func (svc *Service) DeleteBucket(msg *coormq.DeleteBucket) (*coormq.DeleteBucketResp, *mq.CodeMessage) { err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { return svc.db.Bucket().Delete(tx, msg.BucketID) }) if err != nil { - log.WithField("UserID", msg.UserID). + logger.WithField("UserID", msg.UserID). WithField("BucketID", msg.BucketID). Warnf("delete bucket failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.DeleteBucketResp](errorcode.OperationFailed, "delete bucket failed") + return mq.ReplyFailed[coormq.DeleteBucketResp](errorcode.OperationFailed, "delete bucket failed") } - return mq.ReplyOK(coormsg.NewDeleteBucketResp()) + return mq.ReplyOK(coormq.NewDeleteBucketResp()) } diff --git a/internal/services/command_service_ec.go b/internal/services/command_service_ec.go deleted file mode 100644 index 49a9b82..0000000 --- a/internal/services/command_service_ec.go +++ /dev/null @@ -1,100 +0,0 @@ -package services - -import ( - "database/sql" - "errors" - - "github.com/jmoiron/sqlx" - "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/mq" - mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message" - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" -) - -func (svc *Service) PreUploadEcObject(msg *coormsg.PreUploadEcObject) (*coormsg.PreUploadEcResp, *mq.CodeMessage) { - - // 判断同名对象是否存在。等到UploadRepObject时再判断一次。 - // 此次的判断只作为参考,具体是否成功还是看UploadRepObject的结果 - isBucketAvai, err := svc.db.Bucket().IsAvailable(svc.db.SQLCtx(), msg.BucketID, msg.UserID) - if err != nil { - logger.WithField("BucketID", msg.BucketID). - Warnf("check bucket available failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreUploadEcResp](errorcode.OperationFailed, "check bucket available failed") - } - if !isBucketAvai { - logger.WithField("BucketID", msg.BucketID). - Warnf("bucket is not available to user") - return mq.ReplyFailed[coormsg.PreUploadEcResp](errorcode.OperationFailed, "bucket is not available to user") - } - - _, err = svc.db.Object().GetByName(svc.db.SQLCtx(), msg.BucketID, msg.ObjectName) - if err == nil { - logger.WithField("BucketID", msg.BucketID). - WithField("ObjectName", msg.ObjectName). - Warnf("object with given Name and BucketID already exists") - return mq.ReplyFailed[coormsg.PreUploadEcResp](errorcode.OperationFailed, "object with given Name and BucketID already exists") - } - if !errors.Is(err, sql.ErrNoRows) { - logger.WithField("BucketID", msg.BucketID). - WithField("ObjectName", msg.ObjectName). - Warnf("get object by name failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreUploadEcResp](errorcode.OperationFailed, "get object by name failed") - } - - //查询用户可用的节点IP - nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID) - if err != nil { - logger.WithField("UserID", msg.UserID). - Warnf("query user nodes failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreUploadEcResp](errorcode.OperationFailed, "query user nodes failed") - } - - // 查询客户端所属节点 - foundBelongNode := true - belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP) - if err == sql.ErrNoRows { - foundBelongNode = false - } else if err != nil { - logger.WithField("ClientExternalIP", msg.ClientExternalIP). - Warnf("query client belong node failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreUploadEcResp](errorcode.OperationFailed, "query client belong node failed") - } - - var respNodes []mymq.RespNode - for _, node := range nodes { - respNodes = append(respNodes, mymq.NewRespNode( - node.NodeID, - node.ExternalIP, - node.LocalIP, - // LocationID 相同则认为是在同一个地域 - foundBelongNode && belongNode.LocationID == node.LocationID, - )) - } - //查询纠删码参数 - ec, err := svc.db.Ec().GetEc(svc.db.SQLCtx(), msg.EcName) - if err != nil { - logger.WithField("Ec", msg.EcName). - Warnf("check ec type failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreUploadEcResp](errorcode.OperationFailed, "check bucket available failed") - } - ecc := mymq.NewEc(ec.EcID, ec.Name, ec.EcK, ec.EcN) - return mq.ReplyOK(coormsg.NewPreUploadEcResp(respNodes, ecc)) -} - -func (svc *Service) CreateEcObject(msg *coormsg.CreateEcObject) (*coormsg.CreateObjectResp, *mq.CodeMessage) { - var objID int64 - err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { - var err error - objID, err = svc.db.Object().CreateEcObject(tx, msg.BucketID, msg.ObjectName, msg.FileSize, msg.UserID, msg.NodeIDs, msg.Hashes, msg.EcName, msg.DirName) - return err - }) - if err != nil { - logger.WithField("BucketName", msg.BucketID). - WithField("ObjectName", msg.ObjectName). - Warnf("create rep object failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.CreateObjectResp](errorcode.OperationFailed, "create rep object failed") - } - - return mq.ReplyOK(coormsg.NewCreateObjectResp(objID)) -} diff --git a/internal/services/conmmon.go b/internal/services/conmmon.go new file mode 100644 index 0000000..928679e --- /dev/null +++ b/internal/services/conmmon.go @@ -0,0 +1,30 @@ +package services + +import ( + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" +) + +func (svc *Service) FindClientLocation(msg *coormq.FindClientLocation) (*coormq.FindClientLocationResp, *mq.CodeMessage) { + location, err := svc.db.Location().FindLocationByExternalIP(svc.db.SQLCtx(), msg.IP) + if err != nil { + logger.WithField("IP", msg.IP). + Warnf("query client location failed, err: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "query client location failed") + } + + return mq.ReplyOK(coormq.NewFindClientLocationResp(location)) +} + +func (svc *Service) GetECConfig(msg *coormq.GetECConfig) (*coormq.GetECConfigResp, *mq.CodeMessage) { + ec, err := svc.db.Ec().GetEc(svc.db.SQLCtx(), msg.ECName) + if err != nil { + logger.WithField("ECName", msg.ECName). + Warnf("query ec failed, err: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "query ec failed") + } + + return mq.ReplyOK(coormq.NewGetECConfigResp(ec)) +} diff --git a/internal/services/node.go b/internal/services/node.go new file mode 100644 index 0000000..9a12708 --- /dev/null +++ b/internal/services/node.go @@ -0,0 +1,36 @@ +package services + +import ( + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" +) + +func (svc *Service) GetUserNodes(msg *coormq.GetUserNodes) (*coormq.GetUserNodesResp, *mq.CodeMessage) { + nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID) + if err != nil { + logger.WithField("UserID", msg.UserID). + Warnf("query user nodes failed, err: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "query user nodes failed") + } + + return mq.ReplyOK(coormq.NewGetUserNodesResp(nodes)) +} + +func (svc *Service) GetNodes(msg *coormq.GetNodes) (*coormq.GetNodesResp, *mq.CodeMessage) { + var nodes []model.Node + for _, id := range msg.NodeIDs { + node, err := svc.db.Node().GetByID(svc.db.SQLCtx(), id) + if err != nil { + logger.WithField("NodeID", id). + Warnf("query node failed, err: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "query node failed") + } + + nodes = append(nodes, node) + } + + return mq.ReplyOK(coormq.NewGetNodesResp(nodes)) +} diff --git a/internal/services/object.go b/internal/services/object.go index 810631f..68272d3 100644 --- a/internal/services/object.go +++ b/internal/services/object.go @@ -1,358 +1,32 @@ package services import ( - "database/sql" - "errors" - - "github.com/jmoiron/sqlx" - "github.com/samber/lo" "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" - "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" - mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message" - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" - scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/scanner/event" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) -func (svc *Service) GetObjectsByDirName(msg *coormsg.GetObjectsByDirName) (*coormsg.GetObjectsResp, *mq.CodeMessage) { - //查询dirName下所有文件 - objects, err := svc.db.Object().GetByDirName(svc.db.SQLCtx(), msg.DirName) - if err != nil { - logger.WithField("DirName", msg.DirName). - Warnf("query dirname failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.GetObjectsResp](errorcode.OperationFailed, "get objects failed") - } - - return mq.ReplyOK(coormsg.NewGetObjectsResp(objects)) -} - -func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) (*coormsg.PreDownloadObjectResp, *mq.CodeMessage) { - - // 查询文件对象 - object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.UserID, msg.ObjectID) - if err != nil { - logger.WithField("ObjectID", msg.ObjectID). - Warnf("query Object failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Object failed") - } - - // 查询客户端所属节点 - foundBelongNode := true - belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP) - if err == sql.ErrNoRows { - foundBelongNode = false - } else if err != nil { - logger.WithField("ClientExternalIP", msg.ClientExternalIP). - Warnf("query client belong node failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query client belong node failed") - } - logger.Debugf("client address %s is at location %d", msg.ClientExternalIP, belongNode.LocationID) - - //-若redundancy是rep,查询对象副本表, 获得FileHash - if object.Redundancy == models.RedundancyRep { - objectRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), object.ObjectID) - if err != nil { - logger.WithField("ObjectID", object.ObjectID). - Warnf("get ObjectRep failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query ObjectRep failed") - } - - // 注:由于采用了IPFS存储,因此每个备份文件的FileHash都是一样的 - nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, objectRep.FileHash) - if err != nil { - logger.WithField("FileHash", objectRep.FileHash). - Warnf("query Cache failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Cache failed") - } - - var respNodes []mymq.RespNode - for _, node := range nodes { - respNodes = append(respNodes, mymq.NewRespNode( - node.NodeID, - node.ExternalIP, - node.LocalIP, - // LocationID 相同则认为是在同一个地域 - foundBelongNode && belongNode.LocationID == node.LocationID, - )) - } - - return mq.ReplyOK(coormsg.NewPreDownloadObjectResp( - object.FileSize, - mymq.NewRespRepRedundancyData(objectRep.FileHash, respNodes), - )) - - } else { - // TODO 参考上面进行重写 - ecName := object.Redundancy - blocks, err := svc.db.QueryObjectBlock(object.ObjectID) - if err != nil { - logger.WithField("ObjectID", object.ObjectID). - Warnf("query Blocks failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Blocks failed") - } - logger.Debugf(blocks[4].BlockHash) - //查询纠删码参数 - ec, err := svc.db.Ec().GetEc(svc.db.SQLCtx(), ecName) - ecc := mymq.NewEc(ec.EcID, ec.Name, ec.EcK, ec.EcN) - //查询每个编码块存放的所有节点 - respNodes := make([][]mymq.RespNode, len(blocks)) - for i := 0; i < len(blocks); i++ { - nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, blocks[i].BlockHash) - if err != nil { - logger.WithField("FileHash", blocks[i].BlockHash). - Warnf("query Cache failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Cache failed") - } - var nd []mymq.RespNode - for _, node := range nodes { - nd = append(nd, mymq.NewRespNode( - node.NodeID, - node.ExternalIP, - node.LocalIP, - // LocationID 相同则认为是在同一个地域 - foundBelongNode && belongNode.LocationID == node.LocationID, - )) - } - respNodes[i] = nd - logger.Debugf("##%d\n", i) - } - - var blockss []mymq.RespObjectBlock - for i := 0; i < len(blocks); i++ { - blockss = append(blockss, mymq.NewRespObjectBlock( - blocks[i].InnerID, - blocks[i].BlockHash, - )) - } - return mq.ReplyOK(coormsg.NewPreDownloadObjectResp( - object.FileSize, - mymq.NewRespEcRedundancyData(ecc, blockss, respNodes), - )) - } -} - -func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) (*coormsg.PreUploadResp, *mq.CodeMessage) { - - // 判断同名对象是否存在。等到UploadRepObject时再判断一次。 - // 此次的判断只作为参考,具体是否成功还是看UploadRepObject的结果 - isBucketAvai, err := svc.db.Bucket().IsAvailable(svc.db.SQLCtx(), msg.BucketID, msg.UserID) - if err != nil { - logger.WithField("BucketID", msg.BucketID). - Warnf("check bucket available failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "check bucket available failed") - } - if !isBucketAvai { - logger.WithField("BucketID", msg.BucketID). - Warnf("bucket is not available to user") - return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "bucket is not available to user") - } - - _, err = svc.db.Object().GetByName(svc.db.SQLCtx(), msg.BucketID, msg.ObjectName) - if err == nil { - logger.WithField("BucketID", msg.BucketID). - WithField("ObjectName", msg.ObjectName). - Warnf("object with given Name and BucketID already exists") - return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "object with given Name and BucketID already exists") - } - if !errors.Is(err, sql.ErrNoRows) { - logger.WithField("BucketID", msg.BucketID). - WithField("ObjectName", msg.ObjectName). - Warnf("get object by name failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "get object by name failed") - } - - //查询用户可用的节点IP - nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID) +func (svc *Service) GetPackageObjectRepData(msg *coormq.GetPackageObjectRepData) (*coormq.GetPackageObjectRepDataResp, *mq.CodeMessage) { + data, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) if err != nil { - logger.WithField("UserID", msg.UserID). - Warnf("query user nodes failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "query user nodes failed") - } + logger.WithField("PackageID", msg.PackageID). + Warnf("query object rep and node id in package: %s", err.Error()) - // 查询客户端所属节点 - foundBelongNode := true - belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP) - if err == sql.ErrNoRows { - foundBelongNode = false - } else if err != nil { - logger.WithField("ClientExternalIP", msg.ClientExternalIP). - Warnf("query client belong node failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "query client belong node failed") + return nil, mq.Failed(errorcode.OperationFailed, "query object rep and node id in package failed") } - var respNodes []mymq.RespNode - for _, node := range nodes { - respNodes = append(respNodes, mymq.NewRespNode( - node.NodeID, - node.ExternalIP, - node.LocalIP, - // LocationID 相同则认为是在同一个地域 - foundBelongNode && belongNode.LocationID == node.LocationID, - )) - } - - return mq.ReplyOK(coormsg.NewPreUploadResp(respNodes)) + return mq.ReplyOK(coormq.NewGetPackageObjectRepDataResp(data)) } -func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) (*coormsg.CreateObjectResp, *mq.CodeMessage) { - var objID int64 - err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { - var err error - objID, err = svc.db.Object().CreateRepObject(tx, msg.BucketID, msg.ObjectName, msg.FileSize, msg.RepCount, msg.NodeIDs, msg.FileHash, msg.DirName) - return err - }) - if err != nil { - logger.WithField("BucketName", msg.BucketID). - WithField("ObjectName", msg.ObjectName). - Warnf("create rep object failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.CreateObjectResp](errorcode.OperationFailed, "create rep object failed") - } - - // 紧急任务 - err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.FileHash}), true, true) +func (svc *Service) GetPackageObjectECData(msg *coormq.GetPackageObjectECData) (*coormq.GetPackageObjectECDataResp, *mq.CodeMessage) { + data, err := svc.db.ObjectBlock().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) if err != nil { - logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error()) - } - - return mq.ReplyOK(coormsg.NewCreateObjectResp(objID)) -} - -func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) (*coormsg.PreUpdateRepObjectResp, *mq.CodeMessage) { - // TODO 检查用户是否有Object的权限 - // 获取对象信息 - obj, err := svc.db.Object().GetByID(svc.db.SQLCtx(), msg.ObjectID) - if err != nil { - logger.WithField("ObjectID", msg.ObjectID). - Warnf("get object failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "get object failed") - } - if obj.Redundancy != models.RedundancyRep { - logger.WithField("ObjectID", msg.ObjectID). - Warnf("this object is not a rep object") - return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "this object is not a rep object") - } - - // 获取对象Rep信息 - objRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), msg.ObjectID) - if err != nil { - logger.WithField("ObjectID", msg.ObjectID). - Warnf("get object rep failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "get object rep failed") - } - - //查询用户可用的节点IP - nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID) - if err != nil { - logger.WithField("UserID", msg.UserID). - Warnf("query user nodes failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "query user nodes failed") - } - - // 查询客户端所属节点 - foundBelongNode := true - belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP) - if err == sql.ErrNoRows { - foundBelongNode = false - } else if err != nil { - logger.WithField("ClientExternalIP", msg.ClientExternalIP). - Warnf("query client belong node failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "query client belong node failed") - } - - // 查询保存了旧文件的节点信息 - cachingNodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, objRep.FileHash) - if err != nil { - logger.Warnf("find caching file user nodes failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "find caching file user nodes failed") - } - - var retNodes []coormsg.PreUpdateRepObjectRespNode - for _, node := range nodes { - retNodes = append(retNodes, coormsg.NewPreUpdateRepObjectRespNode( - node.NodeID, - node.ExternalIP, - node.LocalIP, - // LocationID 相同则认为是在同一个地域 - foundBelongNode && belongNode.LocationID == node.LocationID, - // 此节点存储了对象旧文件 - lo.ContainsBy(cachingNodes, func(n model.Node) bool { return n.NodeID == node.NodeID }), - )) - } - - return mq.ReplyOK(coormsg.NewPreUpdateRepObjectResp(retNodes)) -} - -func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) (*coormsg.UpdateRepObjectResp, *mq.CodeMessage) { - err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { - return svc.db.Object().UpdateRepObject(tx, msg.ObjectID, msg.FileSize, msg.NodeIDs, msg.FileHash) - }) - if err != nil { - logger.WithField("ObjectID", msg.ObjectID). - Warnf("update rep object failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.UpdateRepObjectResp](errorcode.OperationFailed, "update rep object failed") - } - - // 紧急任务 - err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.FileHash}), true, true) - if err != nil { - logger.Warnf("post event to scanner failed, but this will not affect updating, err: %s", err.Error()) - } - - return mq.ReplyOK(coormsg.NewUpdateRepObjectResp()) -} - -func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) (*coormsg.DeleteObjectResp, *mq.CodeMessage) { - isAva, err := svc.db.Object().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.ObjectID) - if err != nil { - logger.WithField("UserID", msg.UserID). - WithField("ObjectID", msg.ObjectID). - Warnf("check object available failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OperationFailed, "check object available failed") - } - if !isAva { - logger.WithField("UserID", msg.UserID). - WithField("ObjectID", msg.ObjectID). - Warnf("object is not available to the user") - return mq.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OperationFailed, "object is not available to the user") - } - - err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { - return svc.db.Object().SoftDelete(tx, msg.ObjectID) - }) - if err != nil { - logger.WithField("UserID", msg.UserID). - WithField("ObjectID", msg.ObjectID). - Warnf("set object deleted failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OperationFailed, "set object deleted failed") - } - - stgs, err := svc.db.StorageObject().FindObjectStorages(svc.db.SQLCtx(), msg.ObjectID) - if err != nil { - logger.Warnf("find object storages failed, but this will not affect the deleting, err: %s", err.Error()) - return mq.ReplyOK(coormsg.NewDeleteObjectResp()) - } - - // 不追求及时、准确 - if len(stgs) == 0 { - // 如果没有被引用,直接投递CheckObject的任务 - err := svc.scanner.PostEvent(scevt.NewCheckObject([]int64{msg.ObjectID}), false, false) - if err != nil { - logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) - } - logger.Debugf("post check object event") + logger.WithField("PackageID", msg.PackageID). + Warnf("query object ec and node id in package: %s", err.Error()) - } else { - // 有引用则让Agent去检查StorageObject - for _, stg := range stgs { - err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.ObjectID}), false, false) - if err != nil { - logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) - } - } - logger.Debugf("post agent check storage event") + return nil, mq.Failed(errorcode.OperationFailed, "query object ec and node id in package failed") } - return mq.ReplyOK(coormsg.NewDeleteObjectResp()) + return mq.ReplyOK(coormq.NewGetPackageObjectECDataResp(data)) } diff --git a/internal/services/package.go b/internal/services/package.go new file mode 100644 index 0000000..f2c79fd --- /dev/null +++ b/internal/services/package.go @@ -0,0 +1,189 @@ +package services + +import ( + "database/sql" + "fmt" + + "github.com/jmoiron/sqlx" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" + scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event" +) + +func (svc *Service) GetPackage(msg *coormq.GetPackage) (*coormq.GetPackageResp, *mq.CodeMessage) { + pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("get package: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "get package failed") + } + + return mq.ReplyOK(coormq.NewGetPackageResp(pkg)) +} + +func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) { + // TODO 检查用户是否有权限 + objs, err := svc.db.Object().GetPackageObjects(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("get package objects: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "get package objects failed") + } + + return mq.ReplyOK(coormq.NewGetPackageObjectsResp(objs)) +} + +func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePackageResp, *mq.CodeMessage) { + var pkgID int64 + err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { + var err error + pkgID, err = svc.db.Package().Create(svc.db.SQLCtx(), msg.BucketID, msg.Name, msg.Redundancy) + return err + }) + if err != nil { + logger.WithField("BucketID", msg.BucketID). + WithField("Name", msg.Name). + Warnf("creating package: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "creating package failed") + } + + return mq.ReplyOK(coormq.NewCreatePackageResp(pkgID)) +} + +func (svc *Service) UpdateRepPackage(msg *coormq.UpdateRepPackage) (*coormq.UpdateRepPackageResp, *mq.CodeMessage) { + _, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("get package: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "get package failed") + } + + err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { + // 先执行删除操作 + if len(msg.Deletes) > 0 { + if err := svc.db.Object().BatchDelete(tx, msg.Deletes); err != nil { + return fmt.Errorf("deleting objects: %w", err) + } + } + + // 再执行添加操作 + if len(msg.Adds) > 0 { + if _, err := svc.db.Object().BatchAddRep(tx, msg.PackageID, msg.Adds); err != nil { + return fmt.Errorf("adding objects: %w", err) + } + } + + return nil + }) + if err != nil { + logger.Warn(err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "update rep package failed") + } + + // 紧急任务 + var affectFileHashes []string + for _, add := range msg.Adds { + affectFileHashes = append(affectFileHashes, add.FileHash) + } + + err = svc.scanner.PostEvent(scevt.NewCheckRepCount(affectFileHashes), true, true) + if err != nil { + logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error()) + } + + return mq.ReplyOK(coormq.NewUpdateRepPackageResp()) +} + +func (svc *Service) UpdateECPackage(msg *coormq.UpdateECPackage) (*coormq.UpdateECPackageResp, *mq.CodeMessage) { + _, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("get package: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "get package failed") + } + + err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { + // 先执行删除操作 + if len(msg.Deletes) > 0 { + if err := svc.db.Object().BatchDelete(tx, msg.Deletes); err != nil { + return fmt.Errorf("deleting objects: %w", err) + } + } + + // 再执行添加操作 + if len(msg.Adds) > 0 { + if _, err := svc.db.Object().BatchAddEC(tx, msg.PackageID, msg.Adds); err != nil { + return fmt.Errorf("adding objects: %w", err) + } + } + + return nil + }) + if err != nil { + logger.Warn(err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "update ec package failed") + } + + return mq.ReplyOK(coormq.NewUpdateECPackageResp()) +} + +func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePackageResp, *mq.CodeMessage) { + isAva, err := svc.db.Package().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.PackageID) + if err != nil { + logger.WithField("UserID", msg.UserID). + WithField("PackageID", msg.PackageID). + Warnf("check package available failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "check package available failed") + } + if !isAva { + logger.WithField("UserID", msg.UserID). + WithField("PackageID", msg.PackageID). + Warnf("package is not available to the user") + return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "package is not available to the user") + } + + err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { + return svc.db.Package().SoftDelete(tx, msg.PackageID) + }) + if err != nil { + logger.WithField("UserID", msg.UserID). + WithField("PackageID", msg.PackageID). + Warnf("set package deleted failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "set package deleted failed") + } + + stgs, err := svc.db.StoragePackage().FindPackageStorages(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.Warnf("find package storages failed, but this will not affect the deleting, err: %s", err.Error()) + return mq.ReplyOK(coormq.NewDeletePackageResp()) + } + + // 不追求及时、准确 + if len(stgs) == 0 { + // 如果没有被引用,直接投递CheckPackage的任务 + err := svc.scanner.PostEvent(scevt.NewCheckPackage([]int64{msg.PackageID}), false, false) + if err != nil { + logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) + } + logger.Debugf("post check package event") + + } else { + // 有引用则让Agent去检查StoragePackage + for _, stg := range stgs { + err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.PackageID}), false, false) + if err != nil { + logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) + } + } + logger.Debugf("post agent check storage event") + } + + return mq.ReplyOK(coormq.NewDeletePackageResp()) +} diff --git a/internal/services/service.go b/internal/services/service.go index a40da25..dbc4f8a 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -2,15 +2,15 @@ package services import ( mydb "gitlink.org.cn/cloudream/storage-common/pkgs/db" - sccli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/scanner" + scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner" ) type Service struct { db *mydb.DB - scanner *sccli.Client + scanner *scmq.Client } -func NewService(db *mydb.DB, scanner *sccli.Client) *Service { +func NewService(db *mydb.DB, scanner *scmq.Client) *Service { return &Service{ db: db, scanner: scanner, diff --git a/internal/services/storage.go b/internal/services/storage.go index fffe18b..3771440 100644 --- a/internal/services/storage.go +++ b/internal/services/storage.go @@ -5,100 +5,34 @@ import ( "github.com/jmoiron/sqlx" "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" - mymodels "gitlink.org.cn/cloudream/storage-common/models" "gitlink.org.cn/cloudream/common/pkgs/mq" - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) -func (svc *Service) GetStorageInfo(msg *coormsg.GetStorageInfo) (*coormsg.GetStorageInfoResp, *mq.CodeMessage) { +func (svc *Service) GetStorageInfo(msg *coormq.GetStorageInfo) (*coormq.GetStorageInfoResp, *mq.CodeMessage) { stg, err := svc.db.Storage().GetUserStorage(svc.db.SQLCtx(), msg.UserID, msg.StorageID) if err != nil { logger.Warnf("getting user storage: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed") } - return mq.ReplyOK(coormsg.NewGetStorageInfoResp(stg.StorageID, stg.Name, stg.NodeID, stg.Directory, stg.State)) + return mq.ReplyOK(coormq.NewGetStorageInfoResp(stg.StorageID, stg.Name, stg.NodeID, stg.Directory, stg.State)) } -func (svc *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStorage) (*coormsg.PreMoveObjectToStorageResp, *mq.CodeMessage) { - // 查询用户关联的存储服务 - stg, err := svc.db.Storage().GetUserStorage(svc.db.SQLCtx(), msg.UserID, msg.StorageID) - if err != nil { - logger.WithField("UserID", msg.UserID). - WithField("StorageID", msg.StorageID). - Warnf("get user Storage failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OperationFailed, "get user Storage failed") - } - - // 查询文件对象 - object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.UserID, msg.ObjectID) - if err != nil { - logger.WithField("ObjectID", msg.ObjectID). - Warnf("get user Object failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OperationFailed, "get user Object failed") - } - - //-若redundancy是rep,查询对象副本表, 获得FileHash - if object.Redundancy == models.RedundancyRep { - objectRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), object.ObjectID) - if err != nil { - logger.Warnf("get ObjectRep failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OperationFailed, "get ObjectRep failed") - } - - return mq.ReplyOK(coormsg.NewPreMoveObjectToStorageRespBody( - stg.NodeID, - stg.Directory, - object, - mymodels.NewRedundancyRepData(objectRep.FileHash), - )) - - } else { - // TODO 以EC_开头的Redundancy才是EC策略 - ecName := object.Redundancy - blocks, err := svc.db.QueryObjectBlock(object.ObjectID) - if err != nil { - logger.WithField("ObjectID", object.ObjectID). - Warnf("query Blocks failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OperationFailed, "query Blocks failed") - } - //查询纠删码参数 - ec, err := svc.db.Ec().GetEc(svc.db.SQLCtx(), ecName) - // TODO zkx 异常处理 - ecc := mymodels.NewEc(ec.EcID, ec.Name, ec.EcK, ec.EcN) - - blockss := make([]mymodels.ObjectBlock, len(blocks)) - for i := 0; i < len(blocks); i++ { - blockss[i] = mymodels.NewObjectBlock( - blocks[i].InnerID, - blocks[i].BlockHash, - ) - } - - return mq.ReplyOK(coormsg.NewPreMoveObjectToStorageRespBody( - stg.NodeID, - stg.Directory, - object, - mymodels.NewRedundancyEcData(ecc, blockss), - )) - } -} - -func (svc *Service) MoveObjectToStorage(msg *coormsg.MoveObjectToStorage) (*coormsg.MoveObjectToStorageResp, *mq.CodeMessage) { +func (svc *Service) PackageMovedToStorage(msg *coormq.PackageMovedToStorage) (*coormq.PackageMovedToStorageResp, *mq.CodeMessage) { // TODO: 对于的storage中已经存在的文件,直接覆盖已有文件 err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { - return svc.db.StorageObject().MoveObjectTo(tx, msg.ObjectID, msg.StorageID, msg.UserID) + return svc.db.StoragePackage().MovePackageTo(tx, msg.PackageID, msg.StorageID, msg.UserID) }) if err != nil { logger.WithField("UserID", msg.UserID). - WithField("ObjectID", msg.ObjectID). + WithField("PackageID", msg.PackageID). WithField("StorageID", msg.StorageID). - Warnf("user move object to storage failed, err: %s", err.Error()) - return mq.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OperationFailed, "user move object to storage failed") + Warnf("user move package to storage failed, err: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "user move package to storage failed") } - return mq.ReplyOK(coormsg.NewMoveObjectToStorageResp()) + return mq.ReplyOK(coormq.NewPackageMovedToStorageResp()) } diff --git a/main.go b/main.go index a284362..f08b964 100644 --- a/main.go +++ b/main.go @@ -5,10 +5,9 @@ import ( "os" "gitlink.org.cn/cloudream/common/pkgs/logger" - log "gitlink.org.cn/cloudream/common/pkgs/logger" mydb "gitlink.org.cn/cloudream/storage-common/pkgs/db" - sccli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/scanner" - rasvr "gitlink.org.cn/cloudream/storage-common/pkgs/mq/server/coordinator" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" + scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner" "gitlink.org.cn/cloudream/storage-coordinator/internal/config" "gitlink.org.cn/cloudream/storage-coordinator/internal/services" ) @@ -28,21 +27,21 @@ func main() { db, err := mydb.NewDB(&config.Cfg().DB) if err != nil { - log.Fatalf("new db failed, err: %s", err.Error()) + logger.Fatalf("new db failed, err: %s", err.Error()) } - scanner, err := sccli.NewClient(&config.Cfg().RabbitMQ) + scanner, err := scmq.NewClient(&config.Cfg().RabbitMQ) if err != nil { - log.Fatalf("new scanner client failed, err: %s", err.Error()) + logger.Fatalf("new scanner client failed, err: %s", err.Error()) } - coorSvr, err := rasvr.NewServer(services.NewService(db, scanner), &config.Cfg().RabbitMQ) + coorSvr, err := coormq.NewServer(services.NewService(db, scanner), &config.Cfg().RabbitMQ) if err != nil { - log.Fatalf("new coordinator server failed, err: %s", err.Error()) + logger.Fatalf("new coordinator server failed, err: %s", err.Error()) } coorSvr.OnError = func(err error) { - log.Warnf("coordinator server err: %s", err.Error()) + logger.Warnf("coordinator server err: %s", err.Error()) } // 启动服务 @@ -52,13 +51,13 @@ func main() { <-forever } -func serveCoorServer(server *rasvr.Server) { - log.Info("start serving command server") +func serveCoorServer(server *coormq.Server) { + logger.Info("start serving command server") err := server.Serve() if err != nil { - log.Errorf("command server stopped with error: %s", err.Error()) + logger.Errorf("command server stopped with error: %s", err.Error()) } - log.Info("command server stopped") + logger.Info("command server stopped") }