| @@ -0,0 +1,66 @@ | |||||
| package http | |||||
| import ( | |||||
| "net/http" | |||||
| "time" | |||||
| "github.com/gin-gonic/gin" | |||||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| ) | |||||
| type CacheService struct { | |||||
| *Server | |||||
| } | |||||
| func (s *Server) CacheSvc() *CacheService { | |||||
| return &CacheService{ | |||||
| Server: s, | |||||
| } | |||||
| } | |||||
| type CacheMovePackageReq struct { | |||||
| UserID *cdssdk.UserID `json:"userID" binding:"required"` | |||||
| PackageID *cdssdk.PackageID `json:"packageID" binding:"required"` | |||||
| NodeID *cdssdk.NodeID `json:"nodeID" binding:"required"` | |||||
| } | |||||
| type CacheMovePackageResp = cdssdk.CacheMovePackageResp | |||||
| func (s *CacheService) MovePackage(ctx *gin.Context) { | |||||
| log := logger.WithField("HTTP", "Cache.LoadPackage") | |||||
| var req CacheMovePackageReq | |||||
| if err := ctx.ShouldBindJSON(&req); err != nil { | |||||
| log.Warnf("binding body: %s", err.Error()) | |||||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | |||||
| return | |||||
| } | |||||
| taskID, err := s.svc.CacheSvc().StartCacheMovePackage(*req.UserID, *req.PackageID, *req.NodeID) | |||||
| if err != nil { | |||||
| log.Warnf("start cache move package: %s", err.Error()) | |||||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "cache move package failed")) | |||||
| return | |||||
| } | |||||
| for { | |||||
| complete, err := s.svc.CacheSvc().WaitCacheMovePackage(*req.NodeID, taskID, time.Second*10) | |||||
| if complete { | |||||
| if err != nil { | |||||
| log.Warnf("moving complete with: %s", err.Error()) | |||||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "cache move package failed")) | |||||
| return | |||||
| } | |||||
| ctx.JSON(http.StatusOK, OK(CacheMovePackageResp{})) | |||||
| return | |||||
| } | |||||
| if err != nil { | |||||
| log.Warnf("wait moving: %s", err.Error()) | |||||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "cache move package failed")) | |||||
| return | |||||
| } | |||||
| } | |||||
| } | |||||
| @@ -0,0 +1,45 @@ | |||||
| package http | |||||
| import ( | |||||
| "net/http" | |||||
| "github.com/gin-gonic/gin" | |||||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| ) | |||||
| type NodeService struct { | |||||
| *Server | |||||
| } | |||||
| func (s *Server) NodeSvc() *NodeService { | |||||
| return &NodeService{ | |||||
| Server: s, | |||||
| } | |||||
| } | |||||
| type GetNodesReq struct { | |||||
| NodeIDs *[]cdssdk.NodeID `form:"nodeIDs" binding:"required"` | |||||
| } | |||||
| type GetNodesResp = cdssdk.NodeGetNodesResp | |||||
| func (s *ObjectService) GetNodes(ctx *gin.Context) { | |||||
| log := logger.WithField("HTTP", "Node.GetNodes") | |||||
| var req GetNodesReq | |||||
| if err := ctx.ShouldBindQuery(&req); err != nil { | |||||
| log.Warnf("binding body: %s", err.Error()) | |||||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | |||||
| return | |||||
| } | |||||
| nodes, err := s.svc.NodeSvc().GetNodes(*req.NodeIDs) | |||||
| if err != nil { | |||||
| log.Warnf("getting nodes: %s", err.Error()) | |||||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed")) | |||||
| return | |||||
| } | |||||
| ctx.JSON(http.StatusOK, OK(GetNodesResp{Nodes: nodes})) | |||||
| } | |||||
| @@ -0,0 +1,57 @@ | |||||
| package services | |||||
| import ( | |||||
| "fmt" | |||||
| "time" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| stgglb "gitlink.org.cn/cloudream/storage/common/globals" | |||||
| agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" | |||||
| ) | |||||
| type CacheService struct { | |||||
| *Service | |||||
| } | |||||
| func (svc *Service) CacheSvc() *CacheService { | |||||
| return &CacheService{Service: svc} | |||||
| } | |||||
| func (svc *CacheService) StartCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) (string, error) { | |||||
| agentCli, err := stgglb.AgentMQPool.Acquire(nodeID) | |||||
| if err != nil { | |||||
| return "", fmt.Errorf("new agent client: %w", err) | |||||
| } | |||||
| defer stgglb.AgentMQPool.Release(agentCli) | |||||
| startResp, err := agentCli.StartCacheMovePackage(agtmq.NewStartCacheMovePackage(userID, packageID)) | |||||
| if err != nil { | |||||
| return "", fmt.Errorf("start cache move package: %w", err) | |||||
| } | |||||
| return startResp.TaskID, nil | |||||
| } | |||||
| func (svc *CacheService) WaitCacheMovePackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, error) { | |||||
| agentCli, err := stgglb.AgentMQPool.Acquire(nodeID) | |||||
| if err != nil { | |||||
| return true, fmt.Errorf("new agent client: %w", err) | |||||
| } | |||||
| defer stgglb.AgentMQPool.Release(agentCli) | |||||
| waitResp, err := agentCli.WaitCacheMovePackage(agtmq.NewWaitCacheMovePackage(taskID, waitTimeout.Milliseconds())) | |||||
| if err != nil { | |||||
| return true, fmt.Errorf("wait cache move package: %w", err) | |||||
| } | |||||
| if !waitResp.IsComplete { | |||||
| return false, nil | |||||
| } | |||||
| if waitResp.Error != "" { | |||||
| return true, fmt.Errorf("%s", waitResp.Error) | |||||
| } | |||||
| return true, nil | |||||
| } | |||||
| @@ -0,0 +1,32 @@ | |||||
| package services | |||||
| import ( | |||||
| "fmt" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| stgglb "gitlink.org.cn/cloudream/storage/common/globals" | |||||
| coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | |||||
| ) | |||||
| type NodeService struct { | |||||
| *Service | |||||
| } | |||||
| func (svc *Service) NodeSvc() *NodeService { | |||||
| return &NodeService{Service: svc} | |||||
| } | |||||
| func (svc *NodeService) GetNodes(nodeIDs []cdssdk.NodeID) ([]cdssdk.Node, error) { | |||||
| coorCli, err := stgglb.CoordinatorMQPool.Acquire() | |||||
| if err != nil { | |||||
| return nil, fmt.Errorf("new coordinator client: %w", err) | |||||
| } | |||||
| defer stgglb.CoordinatorMQPool.Release(coorCli) | |||||
| getResp, err := coorCli.GetNodes(coormq.NewGetNodes(nodeIDs)) | |||||
| if err != nil { | |||||
| return nil, fmt.Errorf("requsting to coodinator: %w", err) | |||||
| } | |||||
| return getResp.Nodes, nil | |||||
| } | |||||