diff --git a/client/internal/http/cache.go b/client/internal/http/cache.go new file mode 100644 index 0000000..e1f1210 --- /dev/null +++ b/client/internal/http/cache.go @@ -0,0 +1,66 @@ +package http + +import ( + "net/http" + "time" + + "github.com/gin-gonic/gin" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) + +type CacheService struct { + *Server +} + +func (s *Server) CacheSvc() *CacheService { + return &CacheService{ + Server: s, + } +} + +type CacheMovePackageReq struct { + UserID *cdssdk.UserID `json:"userID" binding:"required"` + PackageID *cdssdk.PackageID `json:"packageID" binding:"required"` + NodeID *cdssdk.NodeID `json:"nodeID" binding:"required"` +} +type CacheMovePackageResp = cdssdk.CacheMovePackageResp + +func (s *CacheService) MovePackage(ctx *gin.Context) { + log := logger.WithField("HTTP", "Cache.LoadPackage") + + var req CacheMovePackageReq + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + taskID, err := s.svc.CacheSvc().StartCacheMovePackage(*req.UserID, *req.PackageID, *req.NodeID) + if err != nil { + log.Warnf("start cache move package: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "cache move package failed")) + return + } + + for { + complete, err := s.svc.CacheSvc().WaitCacheMovePackage(*req.NodeID, taskID, time.Second*10) + if complete { + if err != nil { + log.Warnf("moving complete with: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "cache move package failed")) + return + } + + ctx.JSON(http.StatusOK, OK(CacheMovePackageResp{})) + return + } + + if err != nil { + log.Warnf("wait moving: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "cache move package failed")) + return + } + } +} diff --git a/client/internal/http/node.go b/client/internal/http/node.go new file mode 100644 index 0000000..fa6d729 --- /dev/null +++ b/client/internal/http/node.go @@ -0,0 +1,45 @@ +package http + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) + +type NodeService struct { + *Server +} + +func (s *Server) NodeSvc() *NodeService { + return &NodeService{ + Server: s, + } +} + +type GetNodesReq struct { + NodeIDs *[]cdssdk.NodeID `form:"nodeIDs" binding:"required"` +} +type GetNodesResp = cdssdk.NodeGetNodesResp + +func (s *ObjectService) GetNodes(ctx *gin.Context) { + log := logger.WithField("HTTP", "Node.GetNodes") + + var req GetNodesReq + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + nodes, err := s.svc.NodeSvc().GetNodes(*req.NodeIDs) + if err != nil { + log.Warnf("getting nodes: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed")) + return + } + + ctx.JSON(http.StatusOK, OK(GetNodesResp{Nodes: nodes})) +} diff --git a/client/internal/services/cache.go b/client/internal/services/cache.go new file mode 100644 index 0000000..ac37be9 --- /dev/null +++ b/client/internal/services/cache.go @@ -0,0 +1,57 @@ +package services + +import ( + "fmt" + "time" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" +) + +type CacheService struct { + *Service +} + +func (svc *Service) CacheSvc() *CacheService { + return &CacheService{Service: svc} +} + +func (svc *CacheService) StartCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) (string, error) { + agentCli, err := stgglb.AgentMQPool.Acquire(nodeID) + if err != nil { + return "", fmt.Errorf("new agent client: %w", err) + } + defer stgglb.AgentMQPool.Release(agentCli) + + startResp, err := agentCli.StartCacheMovePackage(agtmq.NewStartCacheMovePackage(userID, packageID)) + if err != nil { + return "", fmt.Errorf("start cache move package: %w", err) + } + + return startResp.TaskID, nil +} + +func (svc *CacheService) WaitCacheMovePackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, error) { + agentCli, err := stgglb.AgentMQPool.Acquire(nodeID) + if err != nil { + return true, fmt.Errorf("new agent client: %w", err) + } + defer stgglb.AgentMQPool.Release(agentCli) + + waitResp, err := agentCli.WaitCacheMovePackage(agtmq.NewWaitCacheMovePackage(taskID, waitTimeout.Milliseconds())) + if err != nil { + return true, fmt.Errorf("wait cache move package: %w", err) + } + + if !waitResp.IsComplete { + return false, nil + } + + if waitResp.Error != "" { + return true, fmt.Errorf("%s", waitResp.Error) + } + + return true, nil +} diff --git a/client/internal/services/node.go b/client/internal/services/node.go new file mode 100644 index 0000000..a4c0c7f --- /dev/null +++ b/client/internal/services/node.go @@ -0,0 +1,32 @@ +package services + +import ( + "fmt" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" +) + +type NodeService struct { + *Service +} + +func (svc *Service) NodeSvc() *NodeService { + return &NodeService{Service: svc} +} + +func (svc *NodeService) GetNodes(nodeIDs []cdssdk.NodeID) ([]cdssdk.Node, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getResp, err := coorCli.GetNodes(coormq.NewGetNodes(nodeIDs)) + if err != nil { + return nil, fmt.Errorf("requsting to coodinator: %w", err) + } + + return getResp.Nodes, nil +}