| @@ -6,6 +6,7 @@ import ( | |||||
| "github.com/jmoiron/sqlx" | "github.com/jmoiron/sqlx" | ||||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | "gitlink.org.cn/cloudream/common/consts/errorcode" | ||||
| "gitlink.org.cn/cloudream/common/models" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | "gitlink.org.cn/cloudream/common/pkgs/logger" | ||||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | "gitlink.org.cn/cloudream/common/pkgs/mq" | ||||
| coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" | coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" | ||||
| @@ -187,3 +188,95 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack | |||||
| return mq.ReplyOK(coormq.NewDeletePackageResp()) | return mq.ReplyOK(coormq.NewDeletePackageResp()) | ||||
| } | } | ||||
| func (svc *Service) GetCacheNodesByPackage(msg *coormq.GetCacheNodesByPackage) (*coormq.GetCacheNodesByPackageResp, *mq.CodeMessage) { | |||||
| isAva, err := svc.db.Package().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.PackageID) | |||||
| if err != nil { | |||||
| logger.WithField("UserID", msg.UserID). | |||||
| WithField("PackageID", msg.PackageID). | |||||
| Warnf("check package available failed, err: %s", err.Error()) | |||||
| return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "check package available failed") | |||||
| } | |||||
| if !isAva { | |||||
| logger.WithField("UserID", msg.UserID). | |||||
| WithField("PackageID", msg.PackageID). | |||||
| Warnf("package is not available to the user") | |||||
| return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "package is not available to the user") | |||||
| } | |||||
| pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) | |||||
| if err != nil { | |||||
| logger.WithField("PackageID", msg.PackageID). | |||||
| Warnf("get package: %s", err.Error()) | |||||
| return nil, mq.Failed(errorcode.OperationFailed, "get package failed") | |||||
| } | |||||
| redunancyType := pkg.Redundancy.Type | |||||
| uniqueNodeIDs := make(map[int64]bool) | |||||
| var nodeIDs []int64 | |||||
| if redunancyType == models.RedundancyRep { | |||||
| // 备份方式为rep | |||||
| objectRepDatas, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) | |||||
| if err != nil { | |||||
| logger.WithField("PackageID", msg.PackageID). | |||||
| Warnf("get objectRepDatas by packageID failed, err: %s", err.Error()) | |||||
| return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "get objectRepDatas by packageID failed") | |||||
| } | |||||
| for _, data := range objectRepDatas { | |||||
| for _, nodeID := range data.NodeIDs { | |||||
| if !uniqueNodeIDs[nodeID] { | |||||
| uniqueNodeIDs[nodeID] = true | |||||
| nodeIDs = append(nodeIDs, nodeID) | |||||
| } | |||||
| } | |||||
| } | |||||
| } else if redunancyType == models.RedundancyEC { | |||||
| // 备份方式为ec | |||||
| objectECDatas, err := svc.db.ObjectBlock().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) | |||||
| if err != nil { | |||||
| logger.WithField("PackageID", msg.PackageID). | |||||
| Warnf("get objectECDatas by packageID failed, err: %s", err.Error()) | |||||
| return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "get objectECDatas by packageID failed") | |||||
| } | |||||
| for _, ecData := range objectECDatas { | |||||
| for _, block := range ecData.Blocks { | |||||
| for _, nodeID := range block.NodeIDs { | |||||
| if !uniqueNodeIDs[nodeID] { | |||||
| uniqueNodeIDs[nodeID] = true | |||||
| nodeIDs = append(nodeIDs, nodeID) | |||||
| } | |||||
| } | |||||
| } | |||||
| } | |||||
| } else { | |||||
| logger.WithField("PackageID", msg.PackageID). | |||||
| Warnf("Redundancy type %s is wrong", pkg.Redundancy.Type) | |||||
| return mq.ReplyFailed[coormq.GetCacheNodesByPackageResp](errorcode.OperationFailed, "redundancy type is wrong") | |||||
| } | |||||
| return mq.ReplyOK(coormq.NewGetCacheNodesByPackageResp(nodeIDs, redunancyType)) | |||||
| } | |||||
| func (svc *Service) GetStorageNodesByPackage(msg *coormq.GetStorageNodesByPackage) (*coormq.GetStorageNodesByPackageResp, *mq.CodeMessage) { | |||||
| storages, err := svc.db.StoragePackage().FindPackageStorages(svc.db.SQLCtx(), msg.PackageID) | |||||
| if err != nil { | |||||
| logger.WithField("PackageID", msg.PackageID). | |||||
| Warnf("get storages by packageID failed, err: %s", err.Error()) | |||||
| return mq.ReplyFailed[coormq.GetStorageNodesByPackageResp](errorcode.OperationFailed, "get storages by packageID failed") | |||||
| } | |||||
| uniqueNodeIDs := make(map[int64]bool) | |||||
| var nodeIDs []int64 | |||||
| for _, stg := range storages { | |||||
| if !uniqueNodeIDs[stg.NodeID] { | |||||
| uniqueNodeIDs[stg.NodeID] = true | |||||
| nodeIDs = append(nodeIDs, stg.NodeID) | |||||
| } | |||||
| } | |||||
| return mq.ReplyOK(coormq.NewGetStorageNodesByPackageResp(nodeIDs)) | |||||
| } | |||||