| @@ -273,6 +273,25 @@ func PackageDeletePackage(ctx CommandContext, packageID int64) error { | |||
| return nil | |||
| } | |||
| func PackageGetCacheNodesByPackage(ctx CommandContext, packageID int64, userID int64) error { | |||
| nodeIDs, redunancyType, err := ctx.Cmdline.Svc.PackageSvc().GetCacheNodesByPackage(userID, packageID) | |||
| fmt.Printf("nodeIDs: %v\n", nodeIDs) | |||
| fmt.Printf("redunancyType: %v\n", redunancyType) | |||
| if err != nil { | |||
| return fmt.Errorf("get cache nodes by packageID %d failed, err: %w", packageID, err) | |||
| } | |||
| return nil | |||
| } | |||
| func PackageGetStorageNodesByPackage(ctx CommandContext, packageID int64, userID int64) error { | |||
| nodeIDs, err := ctx.Cmdline.Svc.PackageSvc().GetStorageNodesByPackage(userID, packageID) | |||
| fmt.Printf("nodeIDs: %v\n", nodeIDs) | |||
| if err != nil { | |||
| return fmt.Errorf("get storage nodes by packageID %d failed, err: %w", packageID, err) | |||
| } | |||
| return nil | |||
| } | |||
| func init() { | |||
| commands.MustAdd(PackageListBucketPackages, "pkg", "ls") | |||
| @@ -287,4 +306,8 @@ func init() { | |||
| commands.MustAdd(PackageUpdateRepPackage, "pkg", "update", "ec") | |||
| commands.MustAdd(PackageDeletePackage, "pkg", "delete") | |||
| commands.MustAdd(PackageGetCacheNodesByPackage, "pkg", "cache", "nodes") | |||
| commands.MustAdd(PackageGetStorageNodesByPackage, "pkg", "storage", "nodes") | |||
| } | |||
| @@ -171,3 +171,66 @@ func (s *PackageService) Delete(ctx *gin.Context) { | |||
| ctx.JSON(http.StatusOK, OK(nil)) | |||
| } | |||
| type PackageGetCacheNodeIDs struct { | |||
| UserID *int64 `json:"userID" binding:"required"` | |||
| PackageID *int64 `json:"packageID" binding:"required"` | |||
| } | |||
| type GetCacheNodesByPackageResp struct { | |||
| NodeIDs []int64 `json:"nodeIDs"` | |||
| RedunancyType string `json:"redunancyType,string"` | |||
| } | |||
| func (s *PackageService) GetCacheNodeIDs(ctx *gin.Context) { | |||
| log := logger.WithField("HTTP", "Package.GetCacheNodeIDs") | |||
| var req PackageGetCacheNodeIDs | |||
| if err := ctx.ShouldBindJSON(&req); err != nil { | |||
| log.Warnf("binding body: %s", err.Error()) | |||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | |||
| return | |||
| } | |||
| nodeIDs, redunancyType, err := s.svc.PackageSvc().GetCacheNodesByPackage(*req.UserID, *req.PackageID) | |||
| if err != nil { | |||
| log.Warnf("get cache nodes by packageID failed: %s", err.Error()) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get cache nodes by packageID failed")) | |||
| return | |||
| } | |||
| ctx.JSON(http.StatusOK, OK(GetCacheNodesByPackageResp{ | |||
| NodeIDs: nodeIDs, | |||
| RedunancyType: redunancyType, | |||
| })) | |||
| } | |||
| type PackageGetStorageNodeIDs struct { | |||
| UserID *int64 `json:"userID" binding:"required"` | |||
| PackageID *int64 `json:"packageID" binding:"required"` | |||
| } | |||
| type GetStorageNodesByPackageResp struct { | |||
| NodeIDs []int64 `json:"nodeIDs"` | |||
| } | |||
| func (s *PackageService) GetStorageNodeIDs(ctx *gin.Context) { | |||
| log := logger.WithField("HTTP", "Package.GetStorageNodeIDs") | |||
| var req PackageGetStorageNodeIDs | |||
| if err := ctx.ShouldBindJSON(&req); err != nil { | |||
| log.Warnf("binding body: %s", err.Error()) | |||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | |||
| return | |||
| } | |||
| nodeIDs, err := s.svc.PackageSvc().GetStorageNodesByPackage(*req.UserID, *req.PackageID) | |||
| if err != nil { | |||
| log.Warnf("get storage nodes by packageID failed: %s", err.Error()) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get storage nodes by packageID failed")) | |||
| return | |||
| } | |||
| ctx.JSON(http.StatusOK, OK(GetStorageNodesByPackageResp{ | |||
| NodeIDs: nodeIDs, | |||
| })) | |||
| } | |||
| @@ -42,6 +42,8 @@ func (s *Server) initRouters() { | |||
| s.engine.POST("/package/upload", s.PackageSvc().Upload) | |||
| s.engine.POST("/package/delete", s.PackageSvc().Delete) | |||
| s.engine.GET("/package/getCacheNodeIDs", s.PackageSvc().GetCacheNodeIDs) | |||
| s.engine.GET("/package/getStorageNodeIDs", s.PackageSvc().GetStorageNodeIDs) | |||
| s.engine.POST("/storage/loadPackage", s.StorageSvc().LoadPackage) | |||
| s.engine.POST("/storage/createPackage", s.StorageSvc().CreatePackage) | |||
| @@ -218,3 +218,31 @@ func (svc *PackageService) DeletePackage(userID int64, packageID int64) error { | |||
| return nil | |||
| } | |||
| func (svc *PackageService) GetCacheNodesByPackage(userID int64, packageID int64) ([]int64, string, error) { | |||
| coorCli, err := globals.CoordinatorMQPool.Acquire() | |||
| if err != nil { | |||
| return nil, "", fmt.Errorf("new coordinator client: %w", err) | |||
| } | |||
| defer coorCli.Close() | |||
| resp, err := coorCli.GetCacheNodesByPackage(coormq.NewGetCacheNodesByPackage(userID, packageID)) | |||
| if err != nil { | |||
| return nil, "", fmt.Errorf("get node by package: %w", err) | |||
| } | |||
| return resp.NodeIDs, resp.RedundancyType, nil | |||
| } | |||
| func (svc *PackageService) GetStorageNodesByPackage(userID int64, packageID int64) ([]int64, error) { | |||
| coorCli, err := globals.CoordinatorMQPool.Acquire() | |||
| if err != nil { | |||
| return nil, fmt.Errorf("new coordinator client: %w", err) | |||
| } | |||
| defer coorCli.Close() | |||
| resp, err := coorCli.GetStorageNodesByPackage(coormq.NewGetStorageNodesByPackage(userID, packageID)) | |||
| if err != nil { | |||
| return nil, fmt.Errorf("get node by package: %w", err) | |||
| } | |||
| return resp.NodeIDs, nil | |||
| } | |||