Browse Source

移动缓存的函数返回移动后的FileHash

gitlink
Sydonian 2 years ago
parent
commit
97bd7717b6
6 changed files with 31 additions and 15 deletions
  1. +5
    -3
      agent/internal/services/mq/cache.go
  2. +4
    -0
      agent/internal/task/cache_move_package.go
  3. +1
    -1
      client/internal/cmdline/cache.go
  4. +8
    -2
      client/internal/http/cacah.go
  5. +7
    -6
      client/internal/services/cacah.go
  6. +6
    -3
      common/pkgs/mq/agent/cache.go

+ 5
- 3
agent/internal/services/mq/cache.go View File

@@ -127,6 +127,8 @@ func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtm
return nil, mq.Failed(errorcode.TaskNotFound, "task not found")
}

mvPkgTask := tsk.Body().(*mytask.CacheMovePackage)

if msg.WaitTimeoutMs == 0 {
tsk.Wait()

@@ -135,7 +137,7 @@ func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtm
errMsg = tsk.Error().Error()
}

return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg))
return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg, mvPkgTask.ResultCacheInfos))

} else {
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) {
@@ -145,9 +147,9 @@ func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtm
errMsg = tsk.Error().Error()
}

return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg))
return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg, nil))
}

return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, ""))
return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, "", mvPkgTask.ResultCacheInfos))
}
}

+ 4
- 0
agent/internal/task/cache_move_package.go View File

@@ -4,6 +4,7 @@ import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/globals"
@@ -15,6 +16,8 @@ import (
type CacheMovePackage struct {
userID int64
packageID int64

ResultCacheInfos []models.ObjectCacheInfo
}

func NewCacheMovePackage(userID int64, packageID int64) *CacheMovePackage {
@@ -92,6 +95,7 @@ func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.PoolClient,
}

fileHashes = append(fileHashes, rep.FileHash)
t.ResultCacheInfos = append(t.ResultCacheInfos, models.NewObjectCacheInfo(rep.Object.ObjectID, rep.FileHash))
}

_, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(pkg.PackageID, *globals.Local.NodeID, fileHashes))


+ 1
- 1
client/internal/cmdline/cache.go View File

@@ -12,7 +12,7 @@ func CacheMovePackage(ctx CommandContext, packageID int64, nodeID int64) error {
}

for {
complete, err := ctx.Cmdline.Svc.CacheSvc().WaitCacheMovePackage(nodeID, taskID, time.Second*10)
complete, _, err := ctx.Cmdline.Svc.CacheSvc().WaitCacheMovePackage(nodeID, taskID, time.Second*10)
if complete {
if err != nil {
return fmt.Errorf("moving complete with: %w", err)


+ 8
- 2
client/internal/http/cacah.go View File

@@ -6,6 +6,7 @@ import (

"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
)

@@ -24,6 +25,9 @@ type CacheMovePackageReq struct {
PackageID *int64 `json:"packageID" binding:"required"`
NodeID *int64 `json:"nodeID" binding:"required"`
}
type CacheMovePackageResp struct {
CacheInfos []models.ObjectCacheInfo `json:"cacheInfos"`
}

func (s *CacheService) MovePackage(ctx *gin.Context) {
log := logger.WithField("HTTP", "Cache.LoadPackage")
@@ -43,7 +47,7 @@ func (s *CacheService) MovePackage(ctx *gin.Context) {
}

for {
complete, err := s.svc.CacheSvc().WaitCacheMovePackage(*req.NodeID, taskID, time.Second*10)
complete, cacheInfos, err := s.svc.CacheSvc().WaitCacheMovePackage(*req.NodeID, taskID, time.Second*10)
if complete {
if err != nil {
log.Warnf("moving complete with: %s", err.Error())
@@ -51,7 +55,9 @@ func (s *CacheService) MovePackage(ctx *gin.Context) {
return
}

ctx.JSON(http.StatusOK, OK(nil))
ctx.JSON(http.StatusOK, OK(CacheMovePackageResp{
CacheInfos: cacheInfos,
}))
return
}



+ 7
- 6
client/internal/services/cacah.go View File

@@ -4,6 +4,7 @@ import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/storage/common/globals"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
)
@@ -31,25 +32,25 @@ func (svc *CacheService) StartCacheMovePackage(userID int64, packageID int64, no
return startResp.TaskID, nil
}

func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, error) {
func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, []models.ObjectCacheInfo, error) {
agentCli, err := globals.AgentMQPool.Acquire(nodeID)
if err != nil {
return true, fmt.Errorf("new agent client: %w", err)
return true, nil, fmt.Errorf("new agent client: %w", err)
}
defer agentCli.Close()

waitResp, err := agentCli.WaitCacheMovePackage(agtmq.NewWaitCacheMovePackage(taskID, waitTimeout.Milliseconds()))
if err != nil {
return true, fmt.Errorf("wait cache move package: %w", err)
return true, nil, fmt.Errorf("wait cache move package: %w", err)
}

if !waitResp.IsComplete {
return false, nil
return false, nil, nil
}

if waitResp.Error != "" {
return true, fmt.Errorf("%s", waitResp.Error)
return true, nil, fmt.Errorf("%s", waitResp.Error)
}

return true, nil
return true, waitResp.CacheInfos, nil
}

+ 6
- 3
common/pkgs/mq/agent/cache.go View File

@@ -1,6 +1,7 @@
package agent

import (
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)
@@ -93,8 +94,9 @@ type WaitCacheMovePackage struct {
}
type WaitCacheMovePackageResp struct {
mq.MessageBodyBase
IsComplete bool `json:"isComplete"`
Error string `json:"error"`
IsComplete bool `json:"isComplete"`
Error string `json:"error"`
CacheInfos []models.ObjectCacheInfo `json:"cacheInfos"`
}

func NewWaitCacheMovePackage(taskID string, waitTimeoutMs int64) *WaitCacheMovePackage {
@@ -103,10 +105,11 @@ func NewWaitCacheMovePackage(taskID string, waitTimeoutMs int64) *WaitCacheMoveP
WaitTimeoutMs: waitTimeoutMs,
}
}
func NewWaitCacheMovePackageResp(isComplete bool, err string) *WaitCacheMovePackageResp {
func NewWaitCacheMovePackageResp(isComplete bool, err string, cacheInfos []models.ObjectCacheInfo) *WaitCacheMovePackageResp {
return &WaitCacheMovePackageResp{
IsComplete: isComplete,
Error: err,
CacheInfos: cacheInfos,
}
}
func (client *Client) WaitCacheMovePackage(msg *WaitCacheMovePackage, opts ...mq.RequestOption) (*WaitCacheMovePackageResp, error) {


Loading…
Cancel
Save