Browse Source

增加命令方便之后测试

gitlink
Sydonian 1 year ago
parent
commit
44a7be19b6
16 changed files with 567 additions and 50 deletions
  1. +2
    -2
      agent/internal/mq/storage.go
  2. +3
    -3
      agent/internal/task/storage_load_package.go
  3. +19
    -0
      client/internal/cmdline/commandline.go
  4. +129
    -0
      client/internal/cmdline/getp.go
  5. +89
    -0
      client/internal/cmdline/load.go
  6. +74
    -0
      client/internal/cmdline/lsp.go
  7. +124
    -0
      client/internal/cmdline/put.go
  8. +1
    -1
      client/internal/http/storage.go
  9. +2
    -1
      client/internal/services/package.go
  10. +36
    -21
      client/internal/services/storage.go
  11. +10
    -0
      common/pkgs/db/storage.go
  12. +41
    -18
      common/pkgs/mq/coordinator/storage.go
  13. +4
    -0
      coordinator/internal/mq/package.go
  14. +17
    -2
      coordinator/internal/mq/storage.go
  15. +4
    -0
      go.mod
  16. +12
    -2
      go.sum

+ 2
- 2
agent/internal/mq/storage.go View File

@@ -180,7 +180,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID))
getStgResp, err := coorCli.GetStorage(coormq.ReqGetStorage(msg.UserID, msg.StorageID))
if err != nil {
logger.WithField("StorageID", msg.StorageID).
Warnf("getting storage info: %s", err.Error())
@@ -188,7 +188,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka
return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed")
}

fullPath := filepath.Clean(filepath.Join(getStgResp.Directory, msg.Path))
fullPath := filepath.Clean(filepath.Join(getStgResp.Storage.Directory, msg.Path))

var uploadFilePathes []string
err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error {


+ 3
- 3
agent/internal/task/storage_load_package.go View File

@@ -62,12 +62,12 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e
}
defer stgglb.IPFSPool.Release(ipfsCli)

getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID))
getStgResp, err := coorCli.GetStorage(coormq.ReqGetStorage(t.userID, t.storageID))
if err != nil {
return fmt.Errorf("request to coordinator: %w", err)
}

outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Directory, t.userID, t.packageID)
outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Storage.Directory, t.userID, t.packageID)
if err = os.MkdirAll(outputDirPath, 0755); err != nil {
return fmt.Errorf("creating output directory: %w", err)
}
@@ -84,7 +84,7 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e
// 保护在storage目录中下载的文件
Storage().Buzy(t.storageID).
// 保护下载文件时同时保存到IPFS的文件
IPFS().Buzy(getStgResp.NodeID).
IPFS().Buzy(getStgResp.Storage.NodeID).
MutexLock(ctx.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)


+ 19
- 0
client/internal/cmdline/commandline.go View File

@@ -1,9 +1,11 @@
package cmdline

import (
"context"
"fmt"
"os"

"github.com/spf13/cobra"
"gitlink.org.cn/cloudream/common/pkgs/cmdtrie"
"gitlink.org.cn/cloudream/storage/client/internal/services"
)
@@ -12,8 +14,11 @@ type CommandContext struct {
Cmdline *Commandline
}

// TODO 逐步使用cobra代替cmdtrie
var commands cmdtrie.CommandTrie[CommandContext, error] = cmdtrie.NewCommandTrie[CommandContext, error]()

var rootCmd = cobra.Command{}

type Commandline struct {
Svc *services.Service
}
@@ -30,6 +35,16 @@ func (c *Commandline) DispatchCommand(allArgs []string) {
}
cmdErr, err := commands.Execute(cmdCtx, allArgs, cmdtrie.ExecuteOption{ReplaceEmptyArrayWithNil: true})
if err != nil {
if err == cmdtrie.ErrCommandNotFound {
ctx := context.WithValue(context.Background(), "cmdCtx", &cmdCtx)
err = rootCmd.ExecuteContext(ctx)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
return
}

fmt.Printf("execute command failed, err: %s", err.Error())
os.Exit(1)
}
@@ -43,3 +58,7 @@ func MustAddCmd(fn any, prefixWords ...string) any {
commands.MustAdd(fn, prefixWords...)
return nil
}

func GetCmdCtx(cmd *cobra.Command) *CommandContext {
return cmd.Context().Value("cmdCtx").(*CommandContext)
}

+ 129
- 0
client/internal/cmdline/getp.go View File

@@ -0,0 +1,129 @@
package cmdline

import (
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/inhies/go-bytesize"
"github.com/spf13/cobra"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
)

func init() {
var usePkgID bool
cmd := &cobra.Command{
Use: "getp",
Short: "Download whole package by package id or path",
Args: cobra.ExactArgs(2),
Run: func(cmd *cobra.Command, args []string) {
cmdCtx := GetCmdCtx(cmd)

if usePkgID {
id, err := strconv.ParseInt(args[0], 10, 64)
if err != nil {
fmt.Printf("Invalid package id: %s\n", args[0])
return
}

getpByID(cmdCtx, cdssdk.PackageID(id), args[1])
} else {
getpByPath(cmdCtx, args[0], args[1])
}
},
}
cmd.Flags().BoolVarP(&usePkgID, "id", "i", false, "Download with package id instead of path")

rootCmd.AddCommand(cmd)
}

func getpByPath(cmdCtx *CommandContext, path string, output string) {
userID := cdssdk.UserID(1)

comps := strings.Split(strings.Trim(path, cdssdk.ObjectPathSeparator), cdssdk.ObjectPathSeparator)
if len(comps) != 2 {
fmt.Printf("Package path must be in format of <bucket>/<package>")
return
}

pkg, err := cmdCtx.Cmdline.Svc.PackageSvc().GetByName(userID, comps[0], comps[1])
if err != nil {
fmt.Println(err)
return
}

getpByID(cmdCtx, pkg.PackageID, output)
}

func getpByID(cmdCtx *CommandContext, id cdssdk.PackageID, output string) {
userID := cdssdk.UserID(1)
startTime := time.Now()

objIter, err := cmdCtx.Cmdline.Svc.PackageSvc().DownloadPackage(userID, id)
if err != nil {
fmt.Println(err)
return
}

err = os.MkdirAll(output, os.ModePerm)
if err != nil {
fmt.Printf("Create output directory %s failed, err: %v", output, err)
return
}
defer objIter.Close()

madeDirs := make(map[string]bool)
fileCount := 0
totalSize := int64(0)

for {
objInfo, err := objIter.MoveNext()
if err == iterator.ErrNoMoreItem {
break
}
if err != nil {
fmt.Println(err)
return
}

err = func() error {
defer objInfo.File.Close()
fileCount++
totalSize += objInfo.Object.Size

fullPath := filepath.Join(output, objInfo.Object.Path)

dirPath := filepath.Dir(fullPath)
if !madeDirs[dirPath] {
if err := os.MkdirAll(dirPath, 0755); err != nil {
return fmt.Errorf("creating object dir: %w", err)
}
madeDirs[dirPath] = true
}

outputFile, err := os.Create(fullPath)
if err != nil {
return fmt.Errorf("creating object file: %w", err)
}
defer outputFile.Close()

_, err = io.Copy(outputFile, objInfo.File)
if err != nil {
return fmt.Errorf("copy object data to local file failed, err: %w", err)
}

return nil
}()
if err != nil {
fmt.Println(err)
return
}
}

fmt.Printf("Get %v files (%v) to %s in %v.\n", fileCount, bytesize.ByteSize(totalSize), output, time.Since(startTime))
}

+ 89
- 0
client/internal/cmdline/load.go View File

@@ -0,0 +1,89 @@
package cmdline

import (
"fmt"
"strconv"
"strings"
"time"

"github.com/spf13/cobra"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

func init() {
var useID bool
cmd := cobra.Command{
Use: "load",
Short: "Load data from CDS to a storage service",
Args: cobra.ExactArgs(2),
Run: func(cmd *cobra.Command, args []string) {
cmdCtx := GetCmdCtx(cmd)

if useID {
pkgID, err := strconv.ParseInt(args[0], 10, 64)
if err != nil {
fmt.Printf("Invalid package ID: %s\n", args[0])
}

stgID, err := strconv.ParseInt(args[1], 10, 64)
if err != nil {
fmt.Printf("Invalid storage ID: %s\n", args[1])
}

loadByID(cmdCtx, cdssdk.PackageID(pkgID), cdssdk.StorageID(stgID))
} else {
loadByPath(cmdCtx, args[0], args[1])
}
},
}
cmd.Flags().BoolVarP(&useID, "id", "i", false, "Use ID for both package and storage service instead of their name or path")
rootCmd.AddCommand(&cmd)
}

func loadByPath(cmdCtx *CommandContext, pkgPath string, stgName string) {
userID := cdssdk.UserID(1)

comps := strings.Split(strings.Trim(pkgPath, cdssdk.ObjectPathSeparator), cdssdk.ObjectPathSeparator)
if len(comps) != 2 {
fmt.Printf("Package path must be in format of <bucket>/<package>")
return
}

pkg, err := cmdCtx.Cmdline.Svc.PackageSvc().GetByName(userID, comps[0], comps[1])
if err != nil {
fmt.Println(err)
return
}

stg, err := cmdCtx.Cmdline.Svc.StorageSvc().GetByName(userID, stgName)
if err != nil {
fmt.Println(err)
return
}

loadByID(cmdCtx, pkg.PackageID, stg.StorageID)
}

func loadByID(cmdCtx *CommandContext, pkgID cdssdk.PackageID, stgID cdssdk.StorageID) {
userID := cdssdk.UserID(1)
startTime := time.Now()

nodeID, taskID, err := cmdCtx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(userID, pkgID, stgID)
if err != nil {
fmt.Println(err)
return
}

for {
complete, fullPath, err := cmdCtx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(nodeID, taskID, time.Second*10)
if err != nil {
fmt.Println(err)
return
}

if complete {
fmt.Printf("Package loaded to: %s in %v\n", fullPath, time.Since(startTime))
break
}
}
}

+ 74
- 0
client/internal/cmdline/lsp.go View File

@@ -0,0 +1,74 @@
package cmdline

import (
"fmt"
"strconv"
"strings"

"github.com/jedib0t/go-pretty/v6/table"
"github.com/spf13/cobra"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

func init() {
var usePkgID *bool
cmd := &cobra.Command{
Use: "lsp",
Short: "List package information",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
cmdCtx := GetCmdCtx(cmd)

if usePkgID != nil && *usePkgID {
id, err := strconv.ParseInt(args[0], 10, 64)
if err != nil {
fmt.Printf("Invalid package id: %s\n", args[0])
return
}

lspOneByID(cmdCtx, cdssdk.PackageID(id))
} else {
lspByPath(cmdCtx, args[0])
}
},
}
usePkgID = cmd.Flags().BoolP("id", "i", false, "List with package id instead of path")

rootCmd.AddCommand(cmd)
}

func lspByPath(cmdCtx *CommandContext, path string) {
userID := cdssdk.UserID(1)

comps := strings.Split(strings.Trim(path, cdssdk.ObjectPathSeparator), cdssdk.ObjectPathSeparator)
if len(comps) != 2 {
fmt.Printf("Package path must be in format of <bucket>/<package>")
return
}

pkg, err := cmdCtx.Cmdline.Svc.PackageSvc().GetByName(userID, comps[0], comps[1])
if err != nil {
fmt.Println(err)
return
}

wr := table.NewWriter()
wr.AppendHeader(table.Row{"ID", "Name", "State"})
wr.AppendRow(table.Row{pkg.PackageID, pkg.Name, pkg.State})
fmt.Println(wr.Render())
}

func lspOneByID(cmdCtx *CommandContext, id cdssdk.PackageID) {
userID := cdssdk.UserID(1)

pkg, err := cmdCtx.Cmdline.Svc.PackageSvc().Get(userID, id)
if err != nil {
fmt.Println(err)
return
}

wr := table.NewWriter()
wr.AppendHeader(table.Row{"ID", "Name", "State"})
wr.AppendRow(table.Row{pkg.PackageID, pkg.Name, pkg.State})
fmt.Println(wr.Render())
}

+ 124
- 0
client/internal/cmdline/put.go View File

@@ -0,0 +1,124 @@
package cmdline

import (
"fmt"
"os"
"path/filepath"
"strings"
"time"

"github.com/inhies/go-bytesize"
"github.com/spf13/cobra"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
)

func init() {
var nodeID int64
cmd := &cobra.Command{
Use: "put",
Short: "Upload files to CDS",
Args: func(cmd *cobra.Command, args []string) error {
if err := cobra.ExactArgs(2)(cmd, args); err != nil {
return err
}

remote := args[1]
comps := strings.Split(strings.Trim(remote, cdssdk.ObjectPathSeparator), cdssdk.ObjectPathSeparator)
if len(comps) != 2 {
return fmt.Errorf("invalid remote path: %s, which must be in format of <bucket>/<package>", remote)
}

return nil
},
Run: func(cmd *cobra.Command, args []string) {
userID := cdssdk.UserID(1)
cmdCtx := GetCmdCtx(cmd)

local := args[0]
remote := args[1]
comps := strings.Split(strings.Trim(remote, cdssdk.ObjectPathSeparator), cdssdk.ObjectPathSeparator)

startTime := time.Now()

bkt, err := cmdCtx.Cmdline.Svc.BucketSvc().GetBucketByName(userID, comps[0])
if err != nil {
fmt.Printf("getting bucket: %v\n", err)
return
}

pkg, err := cmdCtx.Cmdline.Svc.PackageSvc().GetByName(userID, comps[0], comps[1])
if err != nil {
if codeMsg, ok := err.(*mq.CodeMessageError); ok && codeMsg.Code == errorcode.DataNotFound {
pkg2, err := cmdCtx.Cmdline.Svc.PackageSvc().Create(userID, bkt.BucketID, comps[1])
if err != nil {
fmt.Printf("creating package: %v\n", err)
return
}
pkg = &pkg2

} else {
fmt.Printf("getting package: %v\n", err)
return
}
}

var fileCount int
var totalSize int64
var uploadFilePathes []string
err = filepath.WalkDir(local, func(fname string, fi os.DirEntry, err error) error {
if err != nil {
return nil
}

if !fi.IsDir() {
uploadFilePathes = append(uploadFilePathes, fname)
fileCount++

info, err := fi.Info()
if err == nil {
totalSize += info.Size()
}
}

return nil
})
if err != nil {
fmt.Printf("walking directory: %v\n", err)
return
}

var nodeAff *cdssdk.NodeID
if nodeID != 0 {
id := cdssdk.NodeID(nodeID)
nodeAff = &id
}

objIter := iterator.NewUploadingObjectIterator(local, uploadFilePathes)
taskID, err := cmdCtx.Cmdline.Svc.ObjectSvc().StartUploading(userID, pkg.PackageID, objIter, nodeAff)
if err != nil {
fmt.Printf("start uploading objects: %v\n", err)
return
}

for {
complete, _, err := cmdCtx.Cmdline.Svc.ObjectSvc().WaitUploading(taskID, time.Second*5)
if err != nil {
fmt.Printf("uploading objects: %v\n", err)
return
}

if complete {
break
}
}

fmt.Printf("Put %v files (%v) to %s in %v.\n", fileCount, bytesize.ByteSize(totalSize), remote, time.Since(startTime))
},
}
cmd.Flags().Int64VarP(&nodeID, "node", "n", 0, "node affinity")

rootCmd.AddCommand(cmd)
}

+ 1
- 1
client/internal/http/storage.go View File

@@ -111,7 +111,7 @@ func (s *StorageService) GetInfo(ctx *gin.Context) {
return
}

info, err := s.svc.StorageSvc().GetInfo(req.UserID, req.StorageID)
info, err := s.svc.StorageSvc().Get(req.UserID, req.StorageID)
if err != nil {
log.Warnf("getting info: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get storage inf failed"))


+ 2
- 1
client/internal/services/package.go View File

@@ -42,7 +42,8 @@ func (svc *PackageService) GetByName(userID cdssdk.UserID, bucketName string, pa

getResp, err := coorCli.GetPackageByName(coormq.ReqGetPackageByName(userID, bucketName, packageName))
if err != nil {
return nil, fmt.Errorf("requsting to coodinator: %w", err)
// TODO 要附加日志信息,但不能直接%w,因为外部需要判断错误吗
return nil, err
}

return &getResp.Package, nil


+ 36
- 21
client/internal/services/storage.go View File

@@ -20,6 +20,36 @@ func (svc *Service) StorageSvc() *StorageService {
return &StorageService{Service: svc}
}

func (svc *StorageService) Get(userID cdssdk.UserID, storageID cdssdk.StorageID) (*model.Storage, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetStorage(coormq.ReqGetStorage(userID, storageID))
if err != nil {
return nil, fmt.Errorf("request to coordinator: %w", err)
}

return &getResp.Storage, nil
}

func (svc *StorageService) GetByName(userID cdssdk.UserID, name string) (*model.Storage, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetStorageByName(coormq.ReqGetStorageByName(userID, name))
if err != nil {
return nil, fmt.Errorf("request to coordinator: %w", err)
}

return &getResp.Storage, nil
}

func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) (cdssdk.NodeID, string, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
@@ -27,12 +57,12 @@ func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, package
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

stgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID))
stgResp, err := coorCli.GetStorage(coormq.ReqGetStorage(userID, storageID))
if err != nil {
return 0, "", fmt.Errorf("getting storage info: %w", err)
}

agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.NodeID)
agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.Storage.NodeID)
if err != nil {
return 0, "", fmt.Errorf("new agent client: %w", err)
}
@@ -43,7 +73,7 @@ func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, package
return 0, "", fmt.Errorf("start storage load package: %w", err)
}

return stgResp.NodeID, startResp.TaskID, nil
return stgResp.Storage.NodeID, startResp.TaskID, nil
}

func (svc *StorageService) WaitStorageLoadPackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, string, error) {
@@ -84,12 +114,12 @@ func (svc *StorageService) StartStorageCreatePackage(userID cdssdk.UserID, bucke
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

stgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID))
stgResp, err := coorCli.GetStorage(coormq.ReqGetStorage(userID, storageID))
if err != nil {
return 0, "", fmt.Errorf("getting storage info: %w", err)
}

agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.NodeID)
agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.Storage.NodeID)
if err != nil {
return 0, "", fmt.Errorf("new agent client: %w", err)
}
@@ -100,7 +130,7 @@ func (svc *StorageService) StartStorageCreatePackage(userID cdssdk.UserID, bucke
return 0, "", fmt.Errorf("start storage upload package: %w", err)
}

return stgResp.NodeID, startResp.TaskID, nil
return stgResp.Storage.NodeID, startResp.TaskID, nil
}

func (svc *StorageService) WaitStorageCreatePackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, cdssdk.PackageID, error) {
@@ -127,18 +157,3 @@ func (svc *StorageService) WaitStorageCreatePackage(nodeID cdssdk.NodeID, taskID

return true, waitResp.PackageID, nil
}

func (svc *StorageService) GetInfo(userID cdssdk.UserID, storageID cdssdk.StorageID) (*model.Storage, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID))
if err != nil {
return nil, fmt.Errorf("request to coordinator: %w", err)
}

return &getResp.Storage, nil
}

+ 10
- 0
common/pkgs/db/storage.go View File

@@ -59,6 +59,16 @@ func (db *StorageDB) GetUserStorage(ctx SQLContext, userID cdssdk.UserID, storag
return stg, err
}

func (db *StorageDB) GetUserStorageByName(ctx SQLContext, userID cdssdk.UserID, name string) (model.Storage, error) {
var stg model.Storage
err := sqlx.Get(ctx, &stg,
"select Storage.* from UserStorage, Storage where UserID = ? and UserStorage.StorageID = Storage.StorageID and Storage.Name = ?",
userID,
name)

return stg, err
}

func (db *StorageDB) ChangeState(ctx SQLContext, storageID cdssdk.StorageID, state string) error {
_, err := ctx.Exec("update Storage set State = ? where StorageID = ?", state, storageID)
return err


+ 41
- 18
common/pkgs/mq/coordinator/storage.go View File

@@ -10,7 +10,9 @@ import (
)

type StorageService interface {
GetStorageInfo(msg *GetStorageInfo) (*GetStorageInfoResp, *mq.CodeMessage)
GetStorage(msg *GetStorage) (*GetStorageResp, *mq.CodeMessage)

GetStorageByName(msg *GetStorageByName) (*GetStorageByNameResp, *mq.CodeMessage)

StoragePackageLoaded(msg *StoragePackageLoaded) (*StoragePackageLoadedResp, *mq.CodeMessage)

@@ -18,37 +20,58 @@ type StorageService interface {
}

// 获取Storage信息
var _ = Register(Service.GetStorageInfo)
var _ = Register(Service.GetStorage)

type GetStorageInfo struct {
type GetStorage struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
StorageID cdssdk.StorageID `json:"storageID"`
}
type GetStorageInfoResp struct {
type GetStorageResp struct {
mq.MessageBodyBase
model.Storage
Storage model.Storage `json:"storage"`
}

func NewGetStorageInfo(userID cdssdk.UserID, storageID cdssdk.StorageID) *GetStorageInfo {
return &GetStorageInfo{
func ReqGetStorage(userID cdssdk.UserID, storageID cdssdk.StorageID) *GetStorage {
return &GetStorage{
UserID: userID,
StorageID: storageID,
}
}
func NewGetStorageInfoResp(storageID cdssdk.StorageID, name string, nodeID cdssdk.NodeID, dir string, state string) *GetStorageInfoResp {
return &GetStorageInfoResp{
Storage: model.Storage{
StorageID: storageID,
Name: name,
NodeID: nodeID,
Directory: dir,
State: state,
},
func RespGetStorage(stg model.Storage) *GetStorageResp {
return &GetStorageResp{
Storage: stg,
}
}
func (client *Client) GetStorage(msg *GetStorage) (*GetStorageResp, error) {
return mq.Request(Service.GetStorage, client.rabbitCli, msg)
}

var _ = Register(Service.GetStorageByName)

type GetStorageByName struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
Name string `json:"name"`
}
type GetStorageByNameResp struct {
mq.MessageBodyBase
Storage model.Storage `json:"storage"`
}

func ReqGetStorageByName(userID cdssdk.UserID, name string) *GetStorageByName {
return &GetStorageByName{
UserID: userID,
Name: name,
}
}
func RespGetStorageByNameResp(storage model.Storage) *GetStorageByNameResp {
return &GetStorageByNameResp{
Storage: storage,
}
}
func (client *Client) GetStorageInfo(msg *GetStorageInfo) (*GetStorageInfoResp, error) {
return mq.Request(Service.GetStorageInfo, client.rabbitCli, msg)
func (client *Client) GetStorageByName(msg *GetStorageByName) (*GetStorageByNameResp, error) {
return mq.Request(Service.GetStorageByName, client.rabbitCli, msg)
}

// 提交调度记录


+ 4
- 0
coordinator/internal/mq/package.go View File

@@ -34,6 +34,10 @@ func (svc *Service) GetPackageByName(msg *coormq.GetPackageByName) (*coormq.GetP
WithField("PackageName", msg.PackageName).
Warnf("get package by name: %s", err.Error())

if err == sql.ErrNoRows {
return nil, mq.Failed(errorcode.DataNotFound, "package not found")
}

return nil, mq.Failed(errorcode.OperationFailed, "get package by name failed")
}



+ 17
- 2
coordinator/internal/mq/storage.go View File

@@ -13,14 +13,29 @@ import (
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

func (svc *Service) GetStorageInfo(msg *coormq.GetStorageInfo) (*coormq.GetStorageInfoResp, *mq.CodeMessage) {
func (svc *Service) GetStorage(msg *coormq.GetStorage) (*coormq.GetStorageResp, *mq.CodeMessage) {
stg, err := svc.db.Storage().GetUserStorage(svc.db.SQLCtx(), msg.UserID, msg.StorageID)
if err != nil {
logger.Warnf("getting user storage: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed")
}

return mq.ReplyOK(coormq.NewGetStorageInfoResp(stg.StorageID, stg.Name, stg.NodeID, stg.Directory, stg.State))
return mq.ReplyOK(coormq.RespGetStorage(stg))
}

func (svc *Service) GetStorageByName(msg *coormq.GetStorageByName) (*coormq.GetStorageByNameResp, *mq.CodeMessage) {
stg, err := svc.db.Storage().GetUserStorageByName(svc.db.SQLCtx(), msg.UserID, msg.Name)
if err != nil {
logger.Warnf("getting user storage by name: %s", err.Error())

if err == sql.ErrNoRows {
return nil, mq.Failed(errorcode.DataNotFound, "storage not found")
}

return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed")
}

return mq.ReplyOK(coormq.RespGetStorageByNameResp(stg))
}

func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coormq.StoragePackageLoadedResp, *mq.CodeMessage) {


+ 4
- 0
go.mod View File

@@ -9,12 +9,14 @@ require (
github.com/go-sql-driver/mysql v1.7.1
github.com/google/uuid v1.3.1
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf
github.com/jedib0t/go-pretty/v6 v6.4.7
github.com/jmoiron/sqlx v1.3.5
github.com/klauspost/reedsolomon v1.11.8
github.com/magefile/mage v1.15.0
github.com/samber/lo v1.38.1
github.com/smartystreets/goconvey v1.8.1
github.com/spf13/cobra v1.8.0
gitlink.org.cn/cloudream/common v0.0.0
google.golang.org/grpc v1.57.0
google.golang.org/protobuf v1.31.0
@@ -43,6 +45,7 @@ require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/imdario/mergo v0.3.15 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/ipfs/boxo v0.12.0 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/ipfs/go-ipfs-api v0.7.0 // indirect
@@ -74,6 +77,7 @@ require (
github.com/sirupsen/logrus v1.9.2 // indirect
github.com/smarty/assertions v1.15.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/streadway/amqp v1.1.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect


+ 12
- 2
go.sum View File

@@ -15,6 +15,7 @@ github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmf
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 h1:HVTnpeuvF6Owjd5mniCL8DEXo7uYXdQEmOP4FJbV5tg=
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3/go.mod h1:p1d6YEZWvFzEh4KLyvBcVSnrfNDDvK2zfK/4x2v/4pE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -51,8 +52,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g=
github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
@@ -64,6 +65,10 @@ github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvH
github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM=
github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf h1:FtEj8sfIcaaBfAKrE1Cwb61YDtYq9JxChK1c7AKce7s=
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf/go.mod h1:yrqSXGoD/4EKfF26AOGzscPOgTTJcyAwM2rpixWT+t4=
github.com/ipfs/boxo v0.12.0 h1:AXHg/1ONZdRQHQLgG5JHsSC3XoE4DjCAMgK+asZvUcQ=
github.com/ipfs/boxo v0.12.0/go.mod h1:xAnfiU6PtxWCnRqu7dcXQ10bB5/kvI1kXRotuGqGBhg=
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
@@ -141,6 +146,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y=
@@ -151,6 +157,10 @@ github.com/smartystreets/goconvey v1.8.1 h1:qGjIddxOk4grTu9JPOU31tVfq3cNdBlNa5sS
github.com/smartystreets/goconvey v1.8.1/go.mod h1:+/u4qLyY6x1jReYOp7GOM2FSt8aP9CzCZL03bI28W60=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM=
github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=


Loading…
Cancel
Save