Browse Source

Load之后返回文件路径

gitlink
Sydonian 2 years ago
parent
commit
f19f274a7d
8 changed files with 111 additions and 38 deletions
  1. +8
    -4
      agent/internal/services/mq/storage.go
  2. +0
    -27
      agent/internal/task/download_package.go
  3. +29
    -0
      agent/internal/task/storage_load_package.go
  4. +2
    -1
      client/internal/cmdline/storage.go
  5. +45
    -2
      client/internal/http/storage.go
  6. +20
    -3
      client/internal/services/storage.go
  7. +4
    -0
      client/internal/task/storage_load_package.go
  8. +3
    -1
      common/pkgs/mq/agent/storage.go

+ 8
- 4
agent/internal/services/mq/storage.go View File

@@ -45,7 +45,7 @@ func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage)
return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed")
}

tsk := svc.taskManager.StartNew(mytask.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath))
tsk := svc.taskManager.StartNew(mytask.NewStorageLoadPackage(msg.UserID, msg.PackageID, outputDirPath))
return mq.ReplyOK(agtmq.NewStartStorageLoadPackageResp(tsk.ID()))
}

@@ -65,7 +65,9 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (*
errMsg = tsk.Error().Error()
}

return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg))
loadTsk := tsk.Body().(*mytask.StorageLoadPackage)

return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath))

} else {
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) {
@@ -75,10 +77,12 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (*
errMsg = tsk.Error().Error()
}

return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg))
loadTsk := tsk.Body().(*mytask.StorageLoadPackage)

return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath))
}

return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, ""))
return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "", ""))
}
}



+ 0
- 27
agent/internal/task/download_package.go View File

@@ -1,27 +0,0 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
)

type DownloadPackage struct {
cmd *cmd.DownloadPackage
}

func NewDownloadPackage(userID int64, packageID int64, outputPath string) *DownloadPackage {
return &DownloadPackage{
cmd: cmd.NewDownloadPackage(userID, packageID, outputPath),
}
}
func (t *DownloadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
err := t.cmd.Execute(&cmd.DownloadPackageContext{
Distlock: ctx.distlock,
})

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 29
- 0
agent/internal/task/storage_load_package.go View File

@@ -0,0 +1,29 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
)

type StorageLoadPackage struct {
cmd *cmd.DownloadPackage
FullPath string
}

func NewStorageLoadPackage(userID int64, packageID int64, outputPath string) *StorageLoadPackage {
return &StorageLoadPackage{
cmd: cmd.NewDownloadPackage(userID, packageID, outputPath),
FullPath: outputPath,
}
}
func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
err := t.cmd.Execute(&cmd.DownloadPackageContext{
Distlock: ctx.distlock,
})

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 2
- 1
client/internal/cmdline/storage.go View File

@@ -14,12 +14,13 @@ func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) er
}

for {
complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10)
complete, fullPath, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10)
if complete {
if err != nil {
return fmt.Errorf("moving complete with: %w", err)
}

fmt.Printf("Load To: %s\n", fullPath)
return nil
}



+ 45
- 2
client/internal/http/storage.go View File

@@ -26,6 +26,10 @@ type StorageLoadPackageReq struct {
StorageID *int64 `json:"storageID" binding:"required"`
}

type StorageLoadPackageResp struct {
stgsdk.StorageLoadPackageResp
}

func (s *StorageService) LoadPackage(ctx *gin.Context) {
log := logger.WithField("HTTP", "Storage.LoadPackage")

@@ -44,7 +48,7 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) {
}

for {
complete, err := s.svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10)
complete, fullPath, err := s.svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10)
if complete {
if err != nil {
log.Warnf("loading complete with: %s", err.Error())
@@ -52,7 +56,11 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) {
return
}

ctx.JSON(http.StatusOK, OK(nil))
ctx.JSON(http.StatusOK, OK(StorageLoadPackageResp{
StorageLoadPackageResp: stgsdk.StorageLoadPackageResp{
FullPath: fullPath,
},
}))
return
}

@@ -118,3 +126,38 @@ func (s *StorageService) CreatePackage(ctx *gin.Context) {
}
}
}

type StorageGetInfoReq struct {
UserID *int64 `json:"userID" binding:"required"`
StorageID *int64 `json:"storageID" binding:"required"`
}

type StorageGetInfoResp struct {
stgsdk.StorageGetInfoResp
}

func (s *StorageService) GetInfo(ctx *gin.Context) {
log := logger.WithField("HTTP", "Storage.GetInfo")

var req StorageGetInfoReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

info, err := s.svc.StorageSvc().GetInfo(*req.UserID, *req.StorageID)
if err != nil {
log.Warnf("getting info: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get storage inf failed"))
return
}

ctx.JSON(http.StatusOK, OK(StorageGetInfoResp{
StorageGetInfoResp: stgsdk.StorageGetInfoResp{
Name: info.Name,
NodeID: info.NodeID,
Directory: info.Directory,
},
}))
}

+ 20
- 3
client/internal/services/storage.go View File

@@ -8,6 +8,7 @@ import (

"gitlink.org.cn/cloudream/storage/client/internal/task"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)
@@ -25,12 +26,13 @@ func (svc *StorageService) StartStorageLoadPackage(userID int64, packageID int64
return tsk.ID(), nil
}

func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout time.Duration) (bool, error) {
func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout time.Duration) (bool, string, error) {
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
return true, tsk.Error()
loadTsk := tsk.Body().(*task.StorageLoadPackage)
return true, loadTsk.ResultFullPath, tsk.Error()
}
return false, nil
return false, "", nil
}

func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error {
@@ -89,3 +91,18 @@ func (svc *StorageService) WaitStorageCreatePackage(nodeID int64, taskID string,

return true, waitResp.PackageID, nil
}

func (svc *StorageService) GetInfo(userID int64, storageID int64) (*model.Storage, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID))
if err != nil {
return nil, fmt.Errorf("request to coordinator: %w", err)
}

return &getResp.Storage, nil
}

+ 4
- 0
client/internal/task/storage_load_package.go View File

@@ -11,10 +11,13 @@ import (
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

// TODO 可以考虑不用Task来实现这些逻辑
type StorageLoadPackage struct {
userID int64
packageID int64
storageID int64

ResultFullPath string
}

func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *StorageLoadPackage {
@@ -97,6 +100,7 @@ func (t *StorageLoadPackage) do(ctx TaskContext) error {
return fmt.Errorf("agent loading package: %s", waitResp.Error)
}

t.ResultFullPath = waitResp.FullPath
break
}
}


+ 3
- 1
common/pkgs/mq/agent/storage.go View File

@@ -61,6 +61,7 @@ type WaitStorageLoadPackageResp struct {
mq.MessageBodyBase
IsComplete bool `json:"isComplete"`
Error string `json:"error"`
FullPath string `json:"fullPath"`
}

func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) *WaitStorageLoadPackage {
@@ -69,10 +70,11 @@ func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) *WaitStorageL
WaitTimeoutMs: waitTimeoutMs,
}
}
func NewWaitStorageLoadPackageResp(isComplete bool, err string) *WaitStorageLoadPackageResp {
func NewWaitStorageLoadPackageResp(isComplete bool, err string, fullPath string) *WaitStorageLoadPackageResp {
return &WaitStorageLoadPackageResp{
IsComplete: isComplete,
Error: err,
FullPath: fullPath,
}
}
func (client *Client) WaitStorageLoadPackage(msg *WaitStorageLoadPackage, opts ...mq.RequestOption) (*WaitStorageLoadPackageResp, error) {


Loading…
Cancel
Save