diff --git a/agent/internal/services/mq/storage.go b/agent/internal/services/mq/storage.go index c7399d6..0704613 100644 --- a/agent/internal/services/mq/storage.go +++ b/agent/internal/services/mq/storage.go @@ -45,7 +45,7 @@ func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed") } - tsk := svc.taskManager.StartNew(mytask.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath)) + tsk := svc.taskManager.StartNew(mytask.NewStorageLoadPackage(msg.UserID, msg.PackageID, outputDirPath)) return mq.ReplyOK(agtmq.NewStartStorageLoadPackageResp(tsk.ID())) } @@ -65,7 +65,9 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (* errMsg = tsk.Error().Error() } - return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg)) + loadTsk := tsk.Body().(*mytask.StorageLoadPackage) + + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath)) } else { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { @@ -75,10 +77,12 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (* errMsg = tsk.Error().Error() } - return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg)) + loadTsk := tsk.Body().(*mytask.StorageLoadPackage) + + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath)) } - return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "")) + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "", "")) } } diff --git a/agent/internal/task/download_package.go b/agent/internal/task/download_package.go deleted file mode 100644 index 9e9c275..0000000 --- a/agent/internal/task/download_package.go +++ /dev/null @@ -1,27 +0,0 @@ -package task - -import ( - "time" - - "gitlink.org.cn/cloudream/common/pkgs/task" - "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" -) - -type DownloadPackage struct { - cmd *cmd.DownloadPackage -} - -func NewDownloadPackage(userID int64, packageID int64, outputPath string) *DownloadPackage { - return &DownloadPackage{ - cmd: cmd.NewDownloadPackage(userID, packageID, outputPath), - } -} -func (t *DownloadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - err := t.cmd.Execute(&cmd.DownloadPackageContext{ - Distlock: ctx.distlock, - }) - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} diff --git a/agent/internal/task/storage_load_package.go b/agent/internal/task/storage_load_package.go new file mode 100644 index 0000000..ef7da0f --- /dev/null +++ b/agent/internal/task/storage_load_package.go @@ -0,0 +1,29 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/common/pkgs/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" +) + +type StorageLoadPackage struct { + cmd *cmd.DownloadPackage + FullPath string +} + +func NewStorageLoadPackage(userID int64, packageID int64, outputPath string) *StorageLoadPackage { + return &StorageLoadPackage{ + cmd: cmd.NewDownloadPackage(userID, packageID, outputPath), + FullPath: outputPath, + } +} +func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { + err := t.cmd.Execute(&cmd.DownloadPackageContext{ + Distlock: ctx.distlock, + }) + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/client/internal/cmdline/storage.go b/client/internal/cmdline/storage.go index f727068..22059b9 100644 --- a/client/internal/cmdline/storage.go +++ b/client/internal/cmdline/storage.go @@ -14,12 +14,13 @@ func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) er } for { - complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10) + complete, fullPath, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10) if complete { if err != nil { return fmt.Errorf("moving complete with: %w", err) } + fmt.Printf("Load To: %s\n", fullPath) return nil } diff --git a/client/internal/http/storage.go b/client/internal/http/storage.go index d16ff68..f707387 100644 --- a/client/internal/http/storage.go +++ b/client/internal/http/storage.go @@ -26,6 +26,10 @@ type StorageLoadPackageReq struct { StorageID *int64 `json:"storageID" binding:"required"` } +type StorageLoadPackageResp struct { + stgsdk.StorageLoadPackageResp +} + func (s *StorageService) LoadPackage(ctx *gin.Context) { log := logger.WithField("HTTP", "Storage.LoadPackage") @@ -44,7 +48,7 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) { } for { - complete, err := s.svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10) + complete, fullPath, err := s.svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10) if complete { if err != nil { log.Warnf("loading complete with: %s", err.Error()) @@ -52,7 +56,11 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(nil)) + ctx.JSON(http.StatusOK, OK(StorageLoadPackageResp{ + StorageLoadPackageResp: stgsdk.StorageLoadPackageResp{ + FullPath: fullPath, + }, + })) return } @@ -118,3 +126,38 @@ func (s *StorageService) CreatePackage(ctx *gin.Context) { } } } + +type StorageGetInfoReq struct { + UserID *int64 `json:"userID" binding:"required"` + StorageID *int64 `json:"storageID" binding:"required"` +} + +type StorageGetInfoResp struct { + stgsdk.StorageGetInfoResp +} + +func (s *StorageService) GetInfo(ctx *gin.Context) { + log := logger.WithField("HTTP", "Storage.GetInfo") + + var req StorageGetInfoReq + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + info, err := s.svc.StorageSvc().GetInfo(*req.UserID, *req.StorageID) + if err != nil { + log.Warnf("getting info: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get storage inf failed")) + return + } + + ctx.JSON(http.StatusOK, OK(StorageGetInfoResp{ + StorageGetInfoResp: stgsdk.StorageGetInfoResp{ + Name: info.Name, + NodeID: info.NodeID, + Directory: info.Directory, + }, + })) +} diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index d5514a8..89a075f 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -8,6 +8,7 @@ import ( "gitlink.org.cn/cloudream/storage/client/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -25,12 +26,13 @@ func (svc *StorageService) StartStorageLoadPackage(userID int64, packageID int64 return tsk.ID(), nil } -func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout time.Duration) (bool, error) { +func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout time.Duration) (bool, string, error) { tsk := svc.TaskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { - return true, tsk.Error() + loadTsk := tsk.Body().(*task.StorageLoadPackage) + return true, loadTsk.ResultFullPath, tsk.Error() } - return false, nil + return false, "", nil } func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error { @@ -89,3 +91,18 @@ func (svc *StorageService) WaitStorageCreatePackage(nodeID int64, taskID string, return true, waitResp.PackageID, nil } + +func (svc *StorageService) GetInfo(userID int64, storageID int64) (*model.Storage, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID)) + if err != nil { + return nil, fmt.Errorf("request to coordinator: %w", err) + } + + return &getResp.Storage, nil +} diff --git a/client/internal/task/storage_load_package.go b/client/internal/task/storage_load_package.go index 4c42b15..42e9a77 100644 --- a/client/internal/task/storage_load_package.go +++ b/client/internal/task/storage_load_package.go @@ -11,10 +11,13 @@ import ( coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) +// TODO 可以考虑不用Task来实现这些逻辑 type StorageLoadPackage struct { userID int64 packageID int64 storageID int64 + + ResultFullPath string } func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *StorageLoadPackage { @@ -97,6 +100,7 @@ func (t *StorageLoadPackage) do(ctx TaskContext) error { return fmt.Errorf("agent loading package: %s", waitResp.Error) } + t.ResultFullPath = waitResp.FullPath break } } diff --git a/common/pkgs/mq/agent/storage.go b/common/pkgs/mq/agent/storage.go index d5f692b..a7187d2 100644 --- a/common/pkgs/mq/agent/storage.go +++ b/common/pkgs/mq/agent/storage.go @@ -61,6 +61,7 @@ type WaitStorageLoadPackageResp struct { mq.MessageBodyBase IsComplete bool `json:"isComplete"` Error string `json:"error"` + FullPath string `json:"fullPath"` } func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) *WaitStorageLoadPackage { @@ -69,10 +70,11 @@ func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) *WaitStorageL WaitTimeoutMs: waitTimeoutMs, } } -func NewWaitStorageLoadPackageResp(isComplete bool, err string) *WaitStorageLoadPackageResp { +func NewWaitStorageLoadPackageResp(isComplete bool, err string, fullPath string) *WaitStorageLoadPackageResp { return &WaitStorageLoadPackageResp{ IsComplete: isComplete, Error: err, + FullPath: fullPath, } } func (client *Client) WaitStorageLoadPackage(msg *WaitStorageLoadPackage, opts ...mq.RequestOption) (*WaitStorageLoadPackageResp, error) {