Browse Source

Merge pull request 'Load之后返回文件路径;调整Load操作的产生的目录的结构' (#8) from feature_gxh into master

gitlink
baohan 2 years ago
parent
commit
aff4d55a88
24 changed files with 347 additions and 97 deletions
  1. +0
    -1
      agent/internal/config/config.go
  2. +13
    -12
      agent/internal/services/mq/storage.go
  3. +1
    -1
      agent/internal/task/cache_move_package.go
  4. +0
    -27
      agent/internal/task/download_package.go
  5. +29
    -0
      agent/internal/task/storage_load_package.go
  6. +2
    -1
      client/internal/cmdline/storage.go
  7. +27
    -0
      client/internal/http/cacah.go
  8. +29
    -0
      client/internal/http/package.go
  9. +2
    -0
      client/internal/http/server.go
  10. +45
    -2
      client/internal/http/storage.go
  11. +16
    -0
      client/internal/services/cacah.go
  12. +23
    -1
      client/internal/services/object.go
  13. +15
    -0
      client/internal/services/package.go
  14. +20
    -3
      client/internal/services/storage.go
  15. +4
    -0
      client/internal/task/storage_load_package.go
  16. +0
    -1
      common/assets/confs/agent.config.json
  17. +4
    -13
      common/pkgs/db/model/model.go
  18. +22
    -0
      common/pkgs/db/object_rep.go
  19. +3
    -1
      common/pkgs/mq/agent/storage.go
  20. +31
    -0
      common/pkgs/mq/coordinator/cache.go
  21. +31
    -0
      common/pkgs/mq/coordinator/object.go
  22. +0
    -30
      common/pkgs/mq/coordinator/package.go
  23. +5
    -4
      common/utils/utils.go
  24. +25
    -0
      coordinator/internal/services/object.go

+ 0
- 1
agent/internal/config/config.go View File

@@ -15,7 +15,6 @@ type Config struct {
Local stgmodels.LocalMachineInfo `json:"local"` Local stgmodels.LocalMachineInfo `json:"local"`
GRPC *grpc.Config `json:"grpc"` GRPC *grpc.Config `json:"grpc"`
ECPacketSize int64 `json:"ecPacketSize"` ECPacketSize int64 `json:"ecPacketSize"`
StorageBaseDir string `json:"storageBaseDir"`
TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒 TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒
Logger log.Config `json:"logger"` Logger log.Config `json:"logger"`
RabbitMQ stgmq.Config `json:"rabbitMQ"` RabbitMQ stgmq.Config `json:"rabbitMQ"`


+ 13
- 12
agent/internal/services/mq/storage.go View File

@@ -10,7 +10,6 @@ import (
"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage/agent/internal/config"
mytask "gitlink.org.cn/cloudream/storage/agent/internal/task" mytask "gitlink.org.cn/cloudream/storage/agent/internal/task"
"gitlink.org.cn/cloudream/storage/common/consts" "gitlink.org.cn/cloudream/storage/common/consts"
stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgglb "gitlink.org.cn/cloudream/storage/common/globals"
@@ -37,7 +36,7 @@ func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage)
return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed")
} }


outputDirPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, utils.MakeStorageLoadPackageDirName(msg.PackageID, msg.UserID))
outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Directory, msg.UserID, msg.PackageID)
if err = os.MkdirAll(outputDirPath, 0755); err != nil { if err = os.MkdirAll(outputDirPath, 0755); err != nil {
logger.WithField("StorageID", msg.StorageID). logger.WithField("StorageID", msg.StorageID).
Warnf("creating output directory: %s", err.Error()) Warnf("creating output directory: %s", err.Error())
@@ -45,7 +44,7 @@ func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage)
return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed") return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed")
} }


tsk := svc.taskManager.StartNew(mytask.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath))
tsk := svc.taskManager.StartNew(mytask.NewStorageLoadPackage(msg.UserID, msg.PackageID, outputDirPath))
return mq.ReplyOK(agtmq.NewStartStorageLoadPackageResp(tsk.ID())) return mq.ReplyOK(agtmq.NewStartStorageLoadPackageResp(tsk.ID()))
} }


@@ -65,7 +64,9 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (*
errMsg = tsk.Error().Error() errMsg = tsk.Error().Error()
} }


return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg))
loadTsk := tsk.Body().(*mytask.StorageLoadPackage)

return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath))


} else { } else {
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) {
@@ -75,17 +76,17 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (*
errMsg = tsk.Error().Error() errMsg = tsk.Error().Error()
} }


return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg))
loadTsk := tsk.Body().(*mytask.StorageLoadPackage)

return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath))
} }


return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, ""))
return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "", ""))
} }
} }


func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckResp, *mq.CodeMessage) { func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckResp, *mq.CodeMessage) {
dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory)

infos, err := os.ReadDir(dirFullPath)
infos, err := os.ReadDir(msg.Directory)
if err != nil { if err != nil {
logger.Warnf("list storage directory failed, err: %s", err.Error()) logger.Warnf("list storage directory failed, err: %s", err.Error())
return mq.ReplyOK(agtmq.NewStorageCheckResp( return mq.ReplyOK(agtmq.NewStorageCheckResp(
@@ -111,7 +112,7 @@ func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, dirInfos []fs


var entries []agtmq.StorageCheckRespEntry var entries []agtmq.StorageCheckRespEntry
for _, obj := range msg.Packages { for _, obj := range msg.Packages {
dirName := utils.MakeStorageLoadPackageDirName(obj.PackageID, obj.UserID)
dirName := utils.MakeStorageLoadPackagePath(msg.Directory, obj.UserID, obj.PackageID)
_, ok := infosMap[dirName] _, ok := infosMap[dirName]


if ok { if ok {
@@ -139,7 +140,7 @@ func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, dirInfos []fs.


var entries []agtmq.StorageCheckRespEntry var entries []agtmq.StorageCheckRespEntry
for _, obj := range msg.Packages { for _, obj := range msg.Packages {
dirName := utils.MakeStorageLoadPackageDirName(obj.PackageID, obj.UserID)
dirName := utils.MakeStorageLoadPackagePath(msg.Directory, obj.UserID, obj.PackageID)
_, ok := infosMap[dirName] _, ok := infosMap[dirName]


if ok { if ok {
@@ -173,7 +174,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka
return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed")
} }


fullPath := filepath.Clean(filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, msg.Path))
fullPath := filepath.Clean(filepath.Join(getStgResp.Directory, msg.Path))


var uploadFilePathes []string var uploadFilePathes []string
err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error { err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error {


+ 1
- 1
agent/internal/task/cache_move_package.go View File

@@ -96,7 +96,7 @@ func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.Client, pkg
} }


fileHashes = append(fileHashes, rep.FileHash) fileHashes = append(fileHashes, rep.FileHash)
t.ResultCacheInfos = append(t.ResultCacheInfos, stgsdk.NewObjectCacheInfo(rep.Object.ObjectID, rep.FileHash))
t.ResultCacheInfos = append(t.ResultCacheInfos, stgsdk.NewObjectCacheInfo(rep.Object, rep.FileHash))
} }


_, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(pkg.PackageID, *stgglb.Local.NodeID, fileHashes)) _, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(pkg.PackageID, *stgglb.Local.NodeID, fileHashes))


+ 0
- 27
agent/internal/task/download_package.go View File

@@ -1,27 +0,0 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
)

type DownloadPackage struct {
cmd *cmd.DownloadPackage
}

func NewDownloadPackage(userID int64, packageID int64, outputPath string) *DownloadPackage {
return &DownloadPackage{
cmd: cmd.NewDownloadPackage(userID, packageID, outputPath),
}
}
func (t *DownloadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
err := t.cmd.Execute(&cmd.DownloadPackageContext{
Distlock: ctx.distlock,
})

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 29
- 0
agent/internal/task/storage_load_package.go View File

@@ -0,0 +1,29 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
)

type StorageLoadPackage struct {
cmd *cmd.DownloadPackage
FullPath string
}

func NewStorageLoadPackage(userID int64, packageID int64, outputPath string) *StorageLoadPackage {
return &StorageLoadPackage{
cmd: cmd.NewDownloadPackage(userID, packageID, outputPath),
FullPath: outputPath,
}
}
func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
err := t.cmd.Execute(&cmd.DownloadPackageContext{
Distlock: ctx.distlock,
})

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 2
- 1
client/internal/cmdline/storage.go View File

@@ -14,12 +14,13 @@ func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) er
} }


for { for {
complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10)
complete, fullPath, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10)
if complete { if complete {
if err != nil { if err != nil {
return fmt.Errorf("moving complete with: %w", err) return fmt.Errorf("moving complete with: %w", err)
} }


fmt.Printf("Load To: %s\n", fullPath)
return nil return nil
} }




+ 27
- 0
client/internal/http/cacah.go View File

@@ -68,3 +68,30 @@ func (s *CacheService) MovePackage(ctx *gin.Context) {
} }
} }
} }

type CacheGetPackageObjectCacheInfosReq struct {
UserID *int64 `form:"userID" binding:"required"`
PackageID *int64 `form:"packageID" binding:"required"`
}

type CacheGetPackageObjectCacheInfosResp = stgsdk.CacheGetPackageObjectCacheInfosResp

func (s *CacheService) GetPackageObjectCacheInfos(ctx *gin.Context) {
log := logger.WithField("HTTP", "Cache.GetPackageObjectCacheInfos")

var req CacheGetPackageObjectCacheInfosReq
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

infos, err := s.svc.CacheSvc().GetPackageObjectCacheInfos(*req.UserID, *req.PackageID)
if err != nil {
log.Warnf("getting package object cache infos: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package object cache infos failed"))
return
}

ctx.JSON(http.StatusOK, OK(CacheGetPackageObjectCacheInfosResp{Infos: infos}))
}

+ 29
- 0
client/internal/http/package.go View File

@@ -11,6 +11,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"


"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
stgiter "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" stgiter "gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
) )


@@ -24,6 +25,34 @@ func (s *Server) PackageSvc() *PackageService {
} }
} }


type PackageGetReq struct {
UserID *int64 `form:"userID" binding:"required"`
PackageID *int64 `form:"packageID" binding:"required"`
}
type PackageGetResp struct {
model.Package
}

func (s *PackageService) Get(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.Get")

var req PackageGetReq
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

pkg, err := s.svc.PackageSvc().Get(*req.UserID, *req.PackageID)
if err != nil {
log.Warnf("getting package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package failed"))
return
}

ctx.JSON(http.StatusOK, OK(PackageGetResp{Package: *pkg}))
}

type PackageUploadReq struct { type PackageUploadReq struct {
Info PackageUploadInfo `form:"info" binding:"required"` Info PackageUploadInfo `form:"info" binding:"required"`
Files []*multipart.FileHeader `form:"files"` Files []*multipart.FileHeader `form:"files"`


+ 2
- 0
client/internal/http/server.go View File

@@ -40,6 +40,7 @@ func (s *Server) Serve() error {
func (s *Server) initRouters() { func (s *Server) initRouters() {
s.engine.GET("/object/download", s.ObjectSvc().Download) s.engine.GET("/object/download", s.ObjectSvc().Download)


s.engine.GET("/package/get", s.PackageSvc().Get)
s.engine.POST("/package/upload", s.PackageSvc().Upload) s.engine.POST("/package/upload", s.PackageSvc().Upload)
s.engine.POST("/package/delete", s.PackageSvc().Delete) s.engine.POST("/package/delete", s.PackageSvc().Delete)
s.engine.GET("/package/getCachedNodes", s.PackageSvc().GetCachedNodes) s.engine.GET("/package/getCachedNodes", s.PackageSvc().GetCachedNodes)
@@ -49,4 +50,5 @@ func (s *Server) initRouters() {
s.engine.POST("/storage/createPackage", s.StorageSvc().CreatePackage) s.engine.POST("/storage/createPackage", s.StorageSvc().CreatePackage)


s.engine.POST("/cache/movePackage", s.CacheSvc().MovePackage) s.engine.POST("/cache/movePackage", s.CacheSvc().MovePackage)
s.engine.GET("/cache/getPackageObjectCacheInfos", s.CacheSvc().GetPackageObjectCacheInfos)
} }

+ 45
- 2
client/internal/http/storage.go View File

@@ -26,6 +26,10 @@ type StorageLoadPackageReq struct {
StorageID *int64 `json:"storageID" binding:"required"` StorageID *int64 `json:"storageID" binding:"required"`
} }


type StorageLoadPackageResp struct {
stgsdk.StorageLoadPackageResp
}

func (s *StorageService) LoadPackage(ctx *gin.Context) { func (s *StorageService) LoadPackage(ctx *gin.Context) {
log := logger.WithField("HTTP", "Storage.LoadPackage") log := logger.WithField("HTTP", "Storage.LoadPackage")


@@ -44,7 +48,7 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) {
} }


for { for {
complete, err := s.svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10)
complete, fullPath, err := s.svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10)
if complete { if complete {
if err != nil { if err != nil {
log.Warnf("loading complete with: %s", err.Error()) log.Warnf("loading complete with: %s", err.Error())
@@ -52,7 +56,11 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) {
return return
} }


ctx.JSON(http.StatusOK, OK(nil))
ctx.JSON(http.StatusOK, OK(StorageLoadPackageResp{
StorageLoadPackageResp: stgsdk.StorageLoadPackageResp{
FullPath: fullPath,
},
}))
return return
} }


@@ -118,3 +126,38 @@ func (s *StorageService) CreatePackage(ctx *gin.Context) {
} }
} }
} }

type StorageGetInfoReq struct {
UserID *int64 `json:"userID" binding:"required"`
StorageID *int64 `json:"storageID" binding:"required"`
}

type StorageGetInfoResp struct {
stgsdk.StorageGetInfoResp
}

func (s *StorageService) GetInfo(ctx *gin.Context) {
log := logger.WithField("HTTP", "Storage.GetInfo")

var req StorageGetInfoReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

info, err := s.svc.StorageSvc().GetInfo(*req.UserID, *req.StorageID)
if err != nil {
log.Warnf("getting info: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get storage inf failed"))
return
}

ctx.JSON(http.StatusOK, OK(StorageGetInfoResp{
StorageGetInfoResp: stgsdk.StorageGetInfoResp{
Name: info.Name,
NodeID: info.NodeID,
Directory: info.Directory,
},
}))
}

+ 16
- 0
client/internal/services/cacah.go View File

@@ -8,6 +8,7 @@ import (


stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgglb "gitlink.org.cn/cloudream/storage/common/globals"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
) )


type CacheService struct { type CacheService struct {
@@ -55,3 +56,18 @@ func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitT


return true, waitResp.CacheInfos, nil return true, waitResp.CacheInfos, nil
} }

func (svc *CacheService) GetPackageObjectCacheInfos(userID int64, packageID int64) ([]stgsdk.ObjectCacheInfo, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetPackageObjectCacheInfos(coormq.NewGetPackageObjectCacheInfos(userID, packageID))
if err != nil {
return nil, fmt.Errorf("requesting to coodinator: %w", err)
}

return getResp.Infos, nil
}

+ 23
- 1
client/internal/services/object.go View File

@@ -1,6 +1,13 @@
package services package services


import "io"
import (
"fmt"
"io"

stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)


type ObjectService struct { type ObjectService struct {
*Service *Service
@@ -13,3 +20,18 @@ func (svc *Service) ObjectSvc() *ObjectService {
func (svc *ObjectService) Download(userID int64, objectID int64) (io.ReadCloser, error) { func (svc *ObjectService) Download(userID int64, objectID int64) (io.ReadCloser, error) {
panic("not implement yet!") panic("not implement yet!")
} }

func (svc *ObjectService) GetPackageObjects(userID int64, packageID int64) ([]model.Object, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(userID, packageID))
if err != nil {
return nil, fmt.Errorf("requsting to coodinator: %w", err)
}

return getResp.Objects, nil
}

+ 15
- 0
client/internal/services/package.go View File

@@ -23,6 +23,21 @@ func (svc *Service) PackageSvc() *PackageService {
return &PackageService{Service: svc} return &PackageService{Service: svc}
} }


func (svc *PackageService) Get(userID int64, packageID int64) (*model.Package, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetPackage(coormq.NewGetPackage(userID, packageID))
if err != nil {
return nil, fmt.Errorf("requsting to coodinator: %w", err)
}

return &getResp.Package, nil
}

func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (iterator.DownloadingObjectIterator, error) { func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (iterator.DownloadingObjectIterator, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire() coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil { if err != nil {


+ 20
- 3
client/internal/services/storage.go View File

@@ -8,6 +8,7 @@ import (


"gitlink.org.cn/cloudream/storage/client/internal/task" "gitlink.org.cn/cloudream/storage/client/internal/task"
stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
) )
@@ -25,12 +26,13 @@ func (svc *StorageService) StartStorageLoadPackage(userID int64, packageID int64
return tsk.ID(), nil return tsk.ID(), nil
} }


func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout time.Duration) (bool, error) {
func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout time.Duration) (bool, string, error) {
tsk := svc.TaskMgr.FindByID(taskID) tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) { if tsk.WaitTimeout(waitTimeout) {
return true, tsk.Error()
loadTsk := tsk.Body().(*task.StorageLoadPackage)
return true, loadTsk.ResultFullPath, tsk.Error()
} }
return false, nil
return false, "", nil
} }


func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error { func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error {
@@ -89,3 +91,18 @@ func (svc *StorageService) WaitStorageCreatePackage(nodeID int64, taskID string,


return true, waitResp.PackageID, nil return true, waitResp.PackageID, nil
} }

func (svc *StorageService) GetInfo(userID int64, storageID int64) (*model.Storage, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID))
if err != nil {
return nil, fmt.Errorf("request to coordinator: %w", err)
}

return &getResp.Storage, nil
}

+ 4
- 0
client/internal/task/storage_load_package.go View File

@@ -11,10 +11,13 @@ import (
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
) )


// TODO 可以考虑不用Task来实现这些逻辑
type StorageLoadPackage struct { type StorageLoadPackage struct {
userID int64 userID int64
packageID int64 packageID int64
storageID int64 storageID int64

ResultFullPath string
} }


func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *StorageLoadPackage { func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *StorageLoadPackage {
@@ -97,6 +100,7 @@ func (t *StorageLoadPackage) do(ctx TaskContext) error {
return fmt.Errorf("agent loading package: %s", waitResp.Error) return fmt.Errorf("agent loading package: %s", waitResp.Error)
} }


t.ResultFullPath = waitResp.FullPath
break break
} }
} }


+ 0
- 1
common/assets/confs/agent.config.json View File

@@ -10,7 +10,6 @@
"port": 5010 "port": 5010
}, },
"ecPacketSize": 10, "ecPacketSize": 10,
"storageBaseDir": ".",
"tempFileLifetime": 3600, "tempFileLifetime": 3600,
"logger": { "logger": {
"output": "file", "output": "file",


+ 4
- 13
common/pkgs/db/model/model.go View File

@@ -6,6 +6,8 @@ import (
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
) )


// TODO 可以考虑逐步迁移到stgsdk中。迁移思路:数据对象应该包含的字段都迁移到stgsdk中,内部使用的一些特殊字段则留在这里

type Node struct { type Node struct {
NodeID int64 `db:"NodeID" json:"nodeID"` NodeID int64 `db:"NodeID" json:"nodeID"`
Name string `db:"Name" json:"name"` Name string `db:"Name" json:"name"`
@@ -56,20 +58,9 @@ type Bucket struct {
CreatorID int64 `db:"CreatorID" json:"creatorID"` CreatorID int64 `db:"CreatorID" json:"creatorID"`
} }


type Package struct {
PackageID int64 `db:"PackageID" json:"packageID"`
Name string `db:"Name" json:"name"`
BucketID int64 `db:"BucketID" json:"bucketID"`
State string `db:"State" json:"state"`
Redundancy stgsdk.TypedRedundancyInfo `db:"Redundancy" json:"redundancy"`
}
type Package = stgsdk.Package


type Object struct {
ObjectID int64 `db:"ObjectID" json:"objectID"`
PackageID int64 `db:"PackageID" json:"packageID"`
Path string `db:"Path" json:"path"`
Size int64 `db:"Size" json:"size,string"`
}
type Object = stgsdk.Object


type ObjectRep struct { type ObjectRep struct {
ObjectID int64 `db:"ObjectID" json:"objectID"` ObjectID int64 `db:"ObjectID" json:"objectID"`


+ 22
- 0
common/pkgs/db/object_rep.go View File

@@ -7,6 +7,7 @@ import (
"strings" "strings"


"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/consts" "gitlink.org.cn/cloudream/storage/common/consts"
stgmod "gitlink.org.cn/cloudream/storage/common/models" stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
@@ -116,6 +117,27 @@ func (db *ObjectRepDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) (
return rets, nil return rets, nil
} }


func (db *ObjectRepDB) GetPackageObjectCacheInfos(ctx SQLContext, packageID int64) ([]stgsdk.ObjectCacheInfo, error) {
var tmpRet []struct {
stgsdk.Object
FileHash string `db:"FileHash"`
}

err := sqlx.Select(ctx, &tmpRet, "select Object.*, ObjectRep.FileHash from Object"+
" left join ObjectRep on Object.ObjectID = ObjectRep.ObjectID"+
" where Object.PackageID = ? order by Object.ObjectID asc", packageID)
if err != nil {
return nil, err
}

ret := make([]stgsdk.ObjectCacheInfo, len(tmpRet))
for i, r := range tmpRet {
ret[i] = stgsdk.NewObjectCacheInfo(r.Object, r.FileHash)
}

return ret, nil
}

// 按逗号切割字符串,并将每一个部分解析为一个int64的ID。 // 按逗号切割字符串,并将每一个部分解析为一个int64的ID。
// 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式 // 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式
func splitIDStringUnsafe(idStr string) []int64 { func splitIDStringUnsafe(idStr string) []int64 {


+ 3
- 1
common/pkgs/mq/agent/storage.go View File

@@ -61,6 +61,7 @@ type WaitStorageLoadPackageResp struct {
mq.MessageBodyBase mq.MessageBodyBase
IsComplete bool `json:"isComplete"` IsComplete bool `json:"isComplete"`
Error string `json:"error"` Error string `json:"error"`
FullPath string `json:"fullPath"`
} }


func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) *WaitStorageLoadPackage { func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) *WaitStorageLoadPackage {
@@ -69,10 +70,11 @@ func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) *WaitStorageL
WaitTimeoutMs: waitTimeoutMs, WaitTimeoutMs: waitTimeoutMs,
} }
} }
func NewWaitStorageLoadPackageResp(isComplete bool, err string) *WaitStorageLoadPackageResp {
func NewWaitStorageLoadPackageResp(isComplete bool, err string, fullPath string) *WaitStorageLoadPackageResp {
return &WaitStorageLoadPackageResp{ return &WaitStorageLoadPackageResp{
IsComplete: isComplete, IsComplete: isComplete,
Error: err, Error: err,
FullPath: fullPath,
} }
} }
func (client *Client) WaitStorageLoadPackage(msg *WaitStorageLoadPackage, opts ...mq.RequestOption) (*WaitStorageLoadPackageResp, error) { func (client *Client) WaitStorageLoadPackage(msg *WaitStorageLoadPackage, opts ...mq.RequestOption) (*WaitStorageLoadPackageResp, error) {


+ 31
- 0
common/pkgs/mq/coordinator/cache.go View File

@@ -2,10 +2,13 @@ package coordinator


import ( import (
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
) )


type CacheService interface { type CacheService interface {
CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, *mq.CodeMessage) CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, *mq.CodeMessage)

GetPackageObjectCacheInfos(msg *GetPackageObjectCacheInfos) (*GetPackageObjectCacheInfosResp, *mq.CodeMessage)
} }


// Package的Object移动到了节点的Cache中 // Package的Object移动到了节点的Cache中
@@ -34,3 +37,31 @@ func NewCachePackageMovedResp() *CachePackageMovedResp {
func (client *Client) CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, error) { func (client *Client) CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, error) {
return mq.Request(Service.CachePackageMoved, client.rabbitCli, msg) return mq.Request(Service.CachePackageMoved, client.rabbitCli, msg)
} }

// 获取Package中所有Object的FileHash
var _ = Register(Service.GetPackageObjectCacheInfos)

type GetPackageObjectCacheInfos struct {
mq.MessageBodyBase
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
type GetPackageObjectCacheInfosResp struct {
mq.MessageBodyBase
Infos []stgsdk.ObjectCacheInfo
}

func NewGetPackageObjectCacheInfos(userID int64, packageID int64) *GetPackageObjectCacheInfos {
return &GetPackageObjectCacheInfos{
UserID: userID,
PackageID: packageID,
}
}
func NewGetPackageObjectCacheInfosResp(infos []stgsdk.ObjectCacheInfo) *GetPackageObjectCacheInfosResp {
return &GetPackageObjectCacheInfosResp{
Infos: infos,
}
}
func (client *Client) GetPackageObjectCacheInfos(msg *GetPackageObjectCacheInfos) (*GetPackageObjectCacheInfosResp, error) {
return mq.Request(Service.GetPackageObjectCacheInfos, client.rabbitCli, msg)
}

+ 31
- 0
common/pkgs/mq/coordinator/object.go View File

@@ -4,14 +4,45 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"


stgmod "gitlink.org.cn/cloudream/storage/common/models" stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
) )


type ObjectService interface { type ObjectService interface {
GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage)

GetPackageObjectRepData(msg *GetPackageObjectRepData) (*GetPackageObjectRepDataResp, *mq.CodeMessage) GetPackageObjectRepData(msg *GetPackageObjectRepData) (*GetPackageObjectRepDataResp, *mq.CodeMessage)


GetPackageObjectECData(msg *GetPackageObjectECData) (*GetPackageObjectECDataResp, *mq.CodeMessage) GetPackageObjectECData(msg *GetPackageObjectECData) (*GetPackageObjectECDataResp, *mq.CodeMessage)
} }


// 查询Package中的所有Object,返回的Objects会按照ObjectID升序
var _ = Register(Service.GetPackageObjects)

type GetPackageObjects struct {
mq.MessageBodyBase
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
type GetPackageObjectsResp struct {
mq.MessageBodyBase
Objects []model.Object `json:"objects"`
}

func NewGetPackageObjects(userID int64, packageID int64) *GetPackageObjects {
return &GetPackageObjects{
UserID: userID,
PackageID: packageID,
}
}
func NewGetPackageObjectsResp(objects []model.Object) *GetPackageObjectsResp {
return &GetPackageObjectsResp{
Objects: objects,
}
}
func (client *Client) GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, error) {
return mq.Request(Service.GetPackageObjects, client.rabbitCli, msg)
}

// 获取指定Object的Rep数据,返回的Objects会按照ObjectID升序 // 获取指定Object的Rep数据,返回的Objects会按照ObjectID升序
var _ = Register(Service.GetPackageObjectRepData) var _ = Register(Service.GetPackageObjectRepData)




+ 0
- 30
common/pkgs/mq/coordinator/package.go View File

@@ -10,8 +10,6 @@ import (
type PackageService interface { type PackageService interface {
GetPackage(msg *GetPackage) (*GetPackageResp, *mq.CodeMessage) GetPackage(msg *GetPackage) (*GetPackageResp, *mq.CodeMessage)


GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage)

CreatePackage(msg *CreatePackage) (*CreatePackageResp, *mq.CodeMessage) CreatePackage(msg *CreatePackage) (*CreatePackageResp, *mq.CodeMessage)


UpdateRepPackage(msg *UpdateRepPackage) (*UpdateRepPackageResp, *mq.CodeMessage) UpdateRepPackage(msg *UpdateRepPackage) (*UpdateRepPackageResp, *mq.CodeMessage)
@@ -53,34 +51,6 @@ func (client *Client) GetPackage(msg *GetPackage) (*GetPackageResp, error) {
return mq.Request(Service.GetPackage, client.rabbitCli, msg) return mq.Request(Service.GetPackage, client.rabbitCli, msg)
} }


// 查询Package中的所有Object,返回的Objects会按照ObjectID升序
var _ = Register(Service.GetPackageObjects)

type GetPackageObjects struct {
mq.MessageBodyBase
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
type GetPackageObjectsResp struct {
mq.MessageBodyBase
Objects []model.Object `json:"objects"`
}

func NewGetPackageObjects(userID int64, packageID int64) *GetPackageObjects {
return &GetPackageObjects{
UserID: userID,
PackageID: packageID,
}
}
func NewGetPackageObjectsResp(objects []model.Object) *GetPackageObjectsResp {
return &GetPackageObjectsResp{
Objects: objects,
}
}
func (client *Client) GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, error) {
return mq.Request(Service.GetPackageObjects, client.rabbitCli, msg)
}

// 创建一个Package // 创建一个Package
var _ = Register(Service.CreatePackage) var _ = Register(Service.CreatePackage)




+ 5
- 4
common/utils/utils.go View File

@@ -1,10 +1,11 @@
package utils package utils


import ( import (
"fmt"
"path/filepath"
"strconv"
) )


// MakeStorageLoadPackageDirName Load操作时,写入的文件夹的名称
func MakeStorageLoadPackageDirName(packageID int64, userID int64) string {
return fmt.Sprintf("%d-%d", packageID, userID)
// MakeStorageLoadPackagePath Load操作时,写入的文件夹的名称
func MakeStorageLoadPackagePath(stgDir string, userID int64, packageID int64) string {
return filepath.Join(stgDir, strconv.FormatInt(userID, 10), "packages", strconv.FormatInt(packageID, 10))
} }

+ 25
- 0
coordinator/internal/services/object.go View File

@@ -7,6 +7,31 @@ import (
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
) )


func (svc *Service) GetPackageObjectCacheInfos(msg *coormq.GetPackageObjectCacheInfos) (*coormq.GetPackageObjectCacheInfosResp, *mq.CodeMessage) {
pkg, err := svc.db.Package().GetUserPackage(svc.db.SQLCtx(), msg.UserID, msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("getting package: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get package failed")
}

if pkg.Redundancy.IsRepInfo() {
infos, err := svc.db.ObjectRep().GetPackageObjectCacheInfos(svc.db.SQLCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("getting rep package object cache infos: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get rep package object cache infos failed")
}

return mq.ReplyOK(coormq.NewGetPackageObjectCacheInfosResp(infos))
}
// TODO EC

return nil, mq.Failed(errorcode.OperationFailed, "not implement yet")
}

func (svc *Service) GetPackageObjectRepData(msg *coormq.GetPackageObjectRepData) (*coormq.GetPackageObjectRepDataResp, *mq.CodeMessage) { func (svc *Service) GetPackageObjectRepData(msg *coormq.GetPackageObjectRepData) (*coormq.GetPackageObjectRepDataResp, *mq.CodeMessage) {
data, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) data, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID)
if err != nil { if err != nil {


Loading…
Cancel
Save