From 18cd88060bb25e46eddc377db406dd22914ab2ad Mon Sep 17 00:00:00 2001 From: songjc <969378911@qq.com> Date: Thu, 24 Aug 2023 10:43:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=20=E6=A0=B9=E6=8D=AEpackagei?= =?UTF-8?q?d=E6=9F=A5=E8=AF=A2node=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/services/package.go | 93 ++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/internal/services/package.go b/internal/services/package.go index f2c79fd..be6fe90 100644 --- a/internal/services/package.go +++ b/internal/services/package.go @@ -6,6 +6,7 @@ import ( "github.com/jmoiron/sqlx" "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" @@ -187,3 +188,95 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack return mq.ReplyOK(coormq.NewDeletePackageResp()) } + +func (svc *Service) GetCacheNodesByPackage(msg *coormq.GetCacheNodesByPackage) (*coormq.GetCacheNodesByPackageResp, *mq.CodeMessage) { + isAva, err := svc.db.Package().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.PackageID) + if err != nil { + logger.WithField("UserID", msg.UserID). + WithField("PackageID", msg.PackageID). + Warnf("check package available failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "check package available failed") + } + if !isAva { + logger.WithField("UserID", msg.UserID). + WithField("PackageID", msg.PackageID). + Warnf("package is not available to the user") + return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "package is not available to the user") + } + + pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("get package: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "get package failed") + } + + redunancyType := pkg.Redundancy.Type + + uniqueNodeIDs := make(map[int64]bool) + var nodeIDs []int64 + if redunancyType == models.RedundancyRep { + // 备份方式为rep + objectRepDatas, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("get objectRepDatas by packageID failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "get objectRepDatas by packageID failed") + } + + for _, data := range objectRepDatas { + for _, nodeID := range data.NodeIDs { + if !uniqueNodeIDs[nodeID] { + uniqueNodeIDs[nodeID] = true + nodeIDs = append(nodeIDs, nodeID) + } + } + } + } else if redunancyType == models.RedundancyEC { + // 备份方式为ec + objectECDatas, err := svc.db.ObjectBlock().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("get objectECDatas by packageID failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "get objectECDatas by packageID failed") + } + + for _, ecData := range objectECDatas { + for _, block := range ecData.Blocks { + for _, nodeID := range block.NodeIDs { + if !uniqueNodeIDs[nodeID] { + uniqueNodeIDs[nodeID] = true + nodeIDs = append(nodeIDs, nodeID) + } + } + } + } + } else { + logger.WithField("PackageID", msg.PackageID). + Warnf("Redundancy type %s is wrong", pkg.Redundancy.Type) + return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "redundancy type is wrong") + } + + return mq.ReplyOK(coormq.NewGetCacheNodesByPackageResp(nodeIDs, redunancyType)) +} + +func (svc *Service) GetStorageNodesByPackage(msg *coormq.GetStorageNodesByPackage) (*coormq.GetStorageNodesByPackageResp, *mq.CodeMessage) { + storages, err := svc.db.StoragePackage().FindPackageStorages(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("get storages by packageID failed, err: %s", err.Error()) + return mq.ReplyFailed[coormq.GetStorageNodesByPackageResp](errorcode.OperationFailed, "get storages by packageID failed") + } + + uniqueNodeIDs := make(map[int64]bool) + var nodeIDs []int64 + for _, stg := range storages { + if !uniqueNodeIDs[stg.NodeID] { + uniqueNodeIDs[stg.NodeID] = true + nodeIDs = append(nodeIDs, stg.NodeID) + } + } + + return mq.ReplyOK(coormq.NewGetStorageNodesByPackageResp(nodeIDs)) +}