From 44a7be19b629c2b970407b482bce5c274b57d773 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Sat, 11 May 2024 10:48:22 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=91=BD=E4=BB=A4=E6=96=B9?= =?UTF-8?q?=E4=BE=BF=E4=B9=8B=E5=90=8E=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/mq/storage.go | 4 +- agent/internal/task/storage_load_package.go | 6 +- client/internal/cmdline/commandline.go | 19 +++ client/internal/cmdline/getp.go | 129 ++++++++++++++++++++ client/internal/cmdline/load.go | 89 ++++++++++++++ client/internal/cmdline/lsp.go | 74 +++++++++++ client/internal/cmdline/put.go | 124 +++++++++++++++++++ client/internal/http/storage.go | 2 +- client/internal/services/package.go | 3 +- client/internal/services/storage.go | 57 +++++---- common/pkgs/db/storage.go | 10 ++ common/pkgs/mq/coordinator/storage.go | 59 ++++++--- coordinator/internal/mq/package.go | 4 + coordinator/internal/mq/storage.go | 19 ++- go.mod | 4 + go.sum | 14 ++- 16 files changed, 567 insertions(+), 50 deletions(-) create mode 100644 client/internal/cmdline/getp.go create mode 100644 client/internal/cmdline/load.go create mode 100644 client/internal/cmdline/lsp.go create mode 100644 client/internal/cmdline/put.go diff --git a/agent/internal/mq/storage.go b/agent/internal/mq/storage.go index bdc006e..37780e2 100644 --- a/agent/internal/mq/storage.go +++ b/agent/internal/mq/storage.go @@ -180,7 +180,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka } defer stgglb.CoordinatorMQPool.Release(coorCli) - getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) + getStgResp, err := coorCli.GetStorage(coormq.ReqGetStorage(msg.UserID, msg.StorageID)) if err != nil { logger.WithField("StorageID", msg.StorageID). Warnf("getting storage info: %s", err.Error()) @@ -188,7 +188,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") } - fullPath := filepath.Clean(filepath.Join(getStgResp.Directory, msg.Path)) + fullPath := filepath.Clean(filepath.Join(getStgResp.Storage.Directory, msg.Path)) var uploadFilePathes []string err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error { diff --git a/agent/internal/task/storage_load_package.go b/agent/internal/task/storage_load_package.go index fd12241..93c8374 100644 --- a/agent/internal/task/storage_load_package.go +++ b/agent/internal/task/storage_load_package.go @@ -62,12 +62,12 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e } defer stgglb.IPFSPool.Release(ipfsCli) - getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID)) + getStgResp, err := coorCli.GetStorage(coormq.ReqGetStorage(t.userID, t.storageID)) if err != nil { return fmt.Errorf("request to coordinator: %w", err) } - outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Directory, t.userID, t.packageID) + outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Storage.Directory, t.userID, t.packageID) if err = os.MkdirAll(outputDirPath, 0755); err != nil { return fmt.Errorf("creating output directory: %w", err) } @@ -84,7 +84,7 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e // 保护在storage目录中下载的文件 Storage().Buzy(t.storageID). // 保护下载文件时同时保存到IPFS的文件 - IPFS().Buzy(getStgResp.NodeID). + IPFS().Buzy(getStgResp.Storage.NodeID). MutexLock(ctx.distlock) if err != nil { return fmt.Errorf("acquire locks failed, err: %w", err) diff --git a/client/internal/cmdline/commandline.go b/client/internal/cmdline/commandline.go index 2b71d93..eea72cd 100644 --- a/client/internal/cmdline/commandline.go +++ b/client/internal/cmdline/commandline.go @@ -1,9 +1,11 @@ package cmdline import ( + "context" "fmt" "os" + "github.com/spf13/cobra" "gitlink.org.cn/cloudream/common/pkgs/cmdtrie" "gitlink.org.cn/cloudream/storage/client/internal/services" ) @@ -12,8 +14,11 @@ type CommandContext struct { Cmdline *Commandline } +// TODO 逐步使用cobra代替cmdtrie var commands cmdtrie.CommandTrie[CommandContext, error] = cmdtrie.NewCommandTrie[CommandContext, error]() +var rootCmd = cobra.Command{} + type Commandline struct { Svc *services.Service } @@ -30,6 +35,16 @@ func (c *Commandline) DispatchCommand(allArgs []string) { } cmdErr, err := commands.Execute(cmdCtx, allArgs, cmdtrie.ExecuteOption{ReplaceEmptyArrayWithNil: true}) if err != nil { + if err == cmdtrie.ErrCommandNotFound { + ctx := context.WithValue(context.Background(), "cmdCtx", &cmdCtx) + err = rootCmd.ExecuteContext(ctx) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + return + } + fmt.Printf("execute command failed, err: %s", err.Error()) os.Exit(1) } @@ -43,3 +58,7 @@ func MustAddCmd(fn any, prefixWords ...string) any { commands.MustAdd(fn, prefixWords...) return nil } + +func GetCmdCtx(cmd *cobra.Command) *CommandContext { + return cmd.Context().Value("cmdCtx").(*CommandContext) +} diff --git a/client/internal/cmdline/getp.go b/client/internal/cmdline/getp.go new file mode 100644 index 0000000..d98f26d --- /dev/null +++ b/client/internal/cmdline/getp.go @@ -0,0 +1,129 @@ +package cmdline + +import ( + "fmt" + "io" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/inhies/go-bytesize" + "github.com/spf13/cobra" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" +) + +func init() { + var usePkgID bool + cmd := &cobra.Command{ + Use: "getp", + Short: "Download whole package by package id or path", + Args: cobra.ExactArgs(2), + Run: func(cmd *cobra.Command, args []string) { + cmdCtx := GetCmdCtx(cmd) + + if usePkgID { + id, err := strconv.ParseInt(args[0], 10, 64) + if err != nil { + fmt.Printf("Invalid package id: %s\n", args[0]) + return + } + + getpByID(cmdCtx, cdssdk.PackageID(id), args[1]) + } else { + getpByPath(cmdCtx, args[0], args[1]) + } + }, + } + cmd.Flags().BoolVarP(&usePkgID, "id", "i", false, "Download with package id instead of path") + + rootCmd.AddCommand(cmd) +} + +func getpByPath(cmdCtx *CommandContext, path string, output string) { + userID := cdssdk.UserID(1) + + comps := strings.Split(strings.Trim(path, cdssdk.ObjectPathSeparator), cdssdk.ObjectPathSeparator) + if len(comps) != 2 { + fmt.Printf("Package path must be in format of /") + return + } + + pkg, err := cmdCtx.Cmdline.Svc.PackageSvc().GetByName(userID, comps[0], comps[1]) + if err != nil { + fmt.Println(err) + return + } + + getpByID(cmdCtx, pkg.PackageID, output) +} + +func getpByID(cmdCtx *CommandContext, id cdssdk.PackageID, output string) { + userID := cdssdk.UserID(1) + startTime := time.Now() + + objIter, err := cmdCtx.Cmdline.Svc.PackageSvc().DownloadPackage(userID, id) + if err != nil { + fmt.Println(err) + return + } + + err = os.MkdirAll(output, os.ModePerm) + if err != nil { + fmt.Printf("Create output directory %s failed, err: %v", output, err) + return + } + defer objIter.Close() + + madeDirs := make(map[string]bool) + fileCount := 0 + totalSize := int64(0) + + for { + objInfo, err := objIter.MoveNext() + if err == iterator.ErrNoMoreItem { + break + } + if err != nil { + fmt.Println(err) + return + } + + err = func() error { + defer objInfo.File.Close() + fileCount++ + totalSize += objInfo.Object.Size + + fullPath := filepath.Join(output, objInfo.Object.Path) + + dirPath := filepath.Dir(fullPath) + if !madeDirs[dirPath] { + if err := os.MkdirAll(dirPath, 0755); err != nil { + return fmt.Errorf("creating object dir: %w", err) + } + madeDirs[dirPath] = true + } + + outputFile, err := os.Create(fullPath) + if err != nil { + return fmt.Errorf("creating object file: %w", err) + } + defer outputFile.Close() + + _, err = io.Copy(outputFile, objInfo.File) + if err != nil { + return fmt.Errorf("copy object data to local file failed, err: %w", err) + } + + return nil + }() + if err != nil { + fmt.Println(err) + return + } + } + + fmt.Printf("Get %v files (%v) to %s in %v.\n", fileCount, bytesize.ByteSize(totalSize), output, time.Since(startTime)) +} diff --git a/client/internal/cmdline/load.go b/client/internal/cmdline/load.go new file mode 100644 index 0000000..c4821b6 --- /dev/null +++ b/client/internal/cmdline/load.go @@ -0,0 +1,89 @@ +package cmdline + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/spf13/cobra" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) + +func init() { + var useID bool + cmd := cobra.Command{ + Use: "load", + Short: "Load data from CDS to a storage service", + Args: cobra.ExactArgs(2), + Run: func(cmd *cobra.Command, args []string) { + cmdCtx := GetCmdCtx(cmd) + + if useID { + pkgID, err := strconv.ParseInt(args[0], 10, 64) + if err != nil { + fmt.Printf("Invalid package ID: %s\n", args[0]) + } + + stgID, err := strconv.ParseInt(args[1], 10, 64) + if err != nil { + fmt.Printf("Invalid storage ID: %s\n", args[1]) + } + + loadByID(cmdCtx, cdssdk.PackageID(pkgID), cdssdk.StorageID(stgID)) + } else { + loadByPath(cmdCtx, args[0], args[1]) + } + }, + } + cmd.Flags().BoolVarP(&useID, "id", "i", false, "Use ID for both package and storage service instead of their name or path") + rootCmd.AddCommand(&cmd) +} + +func loadByPath(cmdCtx *CommandContext, pkgPath string, stgName string) { + userID := cdssdk.UserID(1) + + comps := strings.Split(strings.Trim(pkgPath, cdssdk.ObjectPathSeparator), cdssdk.ObjectPathSeparator) + if len(comps) != 2 { + fmt.Printf("Package path must be in format of /") + return + } + + pkg, err := cmdCtx.Cmdline.Svc.PackageSvc().GetByName(userID, comps[0], comps[1]) + if err != nil { + fmt.Println(err) + return + } + + stg, err := cmdCtx.Cmdline.Svc.StorageSvc().GetByName(userID, stgName) + if err != nil { + fmt.Println(err) + return + } + + loadByID(cmdCtx, pkg.PackageID, stg.StorageID) +} + +func loadByID(cmdCtx *CommandContext, pkgID cdssdk.PackageID, stgID cdssdk.StorageID) { + userID := cdssdk.UserID(1) + startTime := time.Now() + + nodeID, taskID, err := cmdCtx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(userID, pkgID, stgID) + if err != nil { + fmt.Println(err) + return + } + + for { + complete, fullPath, err := cmdCtx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(nodeID, taskID, time.Second*10) + if err != nil { + fmt.Println(err) + return + } + + if complete { + fmt.Printf("Package loaded to: %s in %v\n", fullPath, time.Since(startTime)) + break + } + } +} diff --git a/client/internal/cmdline/lsp.go b/client/internal/cmdline/lsp.go new file mode 100644 index 0000000..9bcc3a3 --- /dev/null +++ b/client/internal/cmdline/lsp.go @@ -0,0 +1,74 @@ +package cmdline + +import ( + "fmt" + "strconv" + "strings" + + "github.com/jedib0t/go-pretty/v6/table" + "github.com/spf13/cobra" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) + +func init() { + var usePkgID *bool + cmd := &cobra.Command{ + Use: "lsp", + Short: "List package information", + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + cmdCtx := GetCmdCtx(cmd) + + if usePkgID != nil && *usePkgID { + id, err := strconv.ParseInt(args[0], 10, 64) + if err != nil { + fmt.Printf("Invalid package id: %s\n", args[0]) + return + } + + lspOneByID(cmdCtx, cdssdk.PackageID(id)) + } else { + lspByPath(cmdCtx, args[0]) + } + }, + } + usePkgID = cmd.Flags().BoolP("id", "i", false, "List with package id instead of path") + + rootCmd.AddCommand(cmd) +} + +func lspByPath(cmdCtx *CommandContext, path string) { + userID := cdssdk.UserID(1) + + comps := strings.Split(strings.Trim(path, cdssdk.ObjectPathSeparator), cdssdk.ObjectPathSeparator) + if len(comps) != 2 { + fmt.Printf("Package path must be in format of /") + return + } + + pkg, err := cmdCtx.Cmdline.Svc.PackageSvc().GetByName(userID, comps[0], comps[1]) + if err != nil { + fmt.Println(err) + return + } + + wr := table.NewWriter() + wr.AppendHeader(table.Row{"ID", "Name", "State"}) + wr.AppendRow(table.Row{pkg.PackageID, pkg.Name, pkg.State}) + fmt.Println(wr.Render()) +} + +func lspOneByID(cmdCtx *CommandContext, id cdssdk.PackageID) { + userID := cdssdk.UserID(1) + + pkg, err := cmdCtx.Cmdline.Svc.PackageSvc().Get(userID, id) + if err != nil { + fmt.Println(err) + return + } + + wr := table.NewWriter() + wr.AppendHeader(table.Row{"ID", "Name", "State"}) + wr.AppendRow(table.Row{pkg.PackageID, pkg.Name, pkg.State}) + fmt.Println(wr.Render()) +} diff --git a/client/internal/cmdline/put.go b/client/internal/cmdline/put.go new file mode 100644 index 0000000..873d002 --- /dev/null +++ b/client/internal/cmdline/put.go @@ -0,0 +1,124 @@ +package cmdline + +import ( + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/inhies/go-bytesize" + "github.com/spf13/cobra" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/mq" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" +) + +func init() { + var nodeID int64 + cmd := &cobra.Command{ + Use: "put", + Short: "Upload files to CDS", + Args: func(cmd *cobra.Command, args []string) error { + if err := cobra.ExactArgs(2)(cmd, args); err != nil { + return err + } + + remote := args[1] + comps := strings.Split(strings.Trim(remote, cdssdk.ObjectPathSeparator), cdssdk.ObjectPathSeparator) + if len(comps) != 2 { + return fmt.Errorf("invalid remote path: %s, which must be in format of /", remote) + } + + return nil + }, + Run: func(cmd *cobra.Command, args []string) { + userID := cdssdk.UserID(1) + cmdCtx := GetCmdCtx(cmd) + + local := args[0] + remote := args[1] + comps := strings.Split(strings.Trim(remote, cdssdk.ObjectPathSeparator), cdssdk.ObjectPathSeparator) + + startTime := time.Now() + + bkt, err := cmdCtx.Cmdline.Svc.BucketSvc().GetBucketByName(userID, comps[0]) + if err != nil { + fmt.Printf("getting bucket: %v\n", err) + return + } + + pkg, err := cmdCtx.Cmdline.Svc.PackageSvc().GetByName(userID, comps[0], comps[1]) + if err != nil { + if codeMsg, ok := err.(*mq.CodeMessageError); ok && codeMsg.Code == errorcode.DataNotFound { + pkg2, err := cmdCtx.Cmdline.Svc.PackageSvc().Create(userID, bkt.BucketID, comps[1]) + if err != nil { + fmt.Printf("creating package: %v\n", err) + return + } + pkg = &pkg2 + + } else { + fmt.Printf("getting package: %v\n", err) + return + } + } + + var fileCount int + var totalSize int64 + var uploadFilePathes []string + err = filepath.WalkDir(local, func(fname string, fi os.DirEntry, err error) error { + if err != nil { + return nil + } + + if !fi.IsDir() { + uploadFilePathes = append(uploadFilePathes, fname) + fileCount++ + + info, err := fi.Info() + if err == nil { + totalSize += info.Size() + } + } + + return nil + }) + if err != nil { + fmt.Printf("walking directory: %v\n", err) + return + } + + var nodeAff *cdssdk.NodeID + if nodeID != 0 { + id := cdssdk.NodeID(nodeID) + nodeAff = &id + } + + objIter := iterator.NewUploadingObjectIterator(local, uploadFilePathes) + taskID, err := cmdCtx.Cmdline.Svc.ObjectSvc().StartUploading(userID, pkg.PackageID, objIter, nodeAff) + if err != nil { + fmt.Printf("start uploading objects: %v\n", err) + return + } + + for { + complete, _, err := cmdCtx.Cmdline.Svc.ObjectSvc().WaitUploading(taskID, time.Second*5) + if err != nil { + fmt.Printf("uploading objects: %v\n", err) + return + } + + if complete { + break + } + } + + fmt.Printf("Put %v files (%v) to %s in %v.\n", fileCount, bytesize.ByteSize(totalSize), remote, time.Since(startTime)) + }, + } + cmd.Flags().Int64VarP(&nodeID, "node", "n", 0, "node affinity") + + rootCmd.AddCommand(cmd) +} diff --git a/client/internal/http/storage.go b/client/internal/http/storage.go index 9b5dd06..c098914 100644 --- a/client/internal/http/storage.go +++ b/client/internal/http/storage.go @@ -111,7 +111,7 @@ func (s *StorageService) GetInfo(ctx *gin.Context) { return } - info, err := s.svc.StorageSvc().GetInfo(req.UserID, req.StorageID) + info, err := s.svc.StorageSvc().Get(req.UserID, req.StorageID) if err != nil { log.Warnf("getting info: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get storage inf failed")) diff --git a/client/internal/services/package.go b/client/internal/services/package.go index 3472130..c76f776 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -42,7 +42,8 @@ func (svc *PackageService) GetByName(userID cdssdk.UserID, bucketName string, pa getResp, err := coorCli.GetPackageByName(coormq.ReqGetPackageByName(userID, bucketName, packageName)) if err != nil { - return nil, fmt.Errorf("requsting to coodinator: %w", err) + // TODO 要附加日志信息,但不能直接%w,因为外部需要判断错误吗 + return nil, err } return &getResp.Package, nil diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index d5671a4..448c9a6 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -20,6 +20,36 @@ func (svc *Service) StorageSvc() *StorageService { return &StorageService{Service: svc} } +func (svc *StorageService) Get(userID cdssdk.UserID, storageID cdssdk.StorageID) (*model.Storage, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getResp, err := coorCli.GetStorage(coormq.ReqGetStorage(userID, storageID)) + if err != nil { + return nil, fmt.Errorf("request to coordinator: %w", err) + } + + return &getResp.Storage, nil +} + +func (svc *StorageService) GetByName(userID cdssdk.UserID, name string) (*model.Storage, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getResp, err := coorCli.GetStorageByName(coormq.ReqGetStorageByName(userID, name)) + if err != nil { + return nil, fmt.Errorf("request to coordinator: %w", err) + } + + return &getResp.Storage, nil +} + func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) (cdssdk.NodeID, string, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -27,12 +57,12 @@ func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, package } defer stgglb.CoordinatorMQPool.Release(coorCli) - stgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID)) + stgResp, err := coorCli.GetStorage(coormq.ReqGetStorage(userID, storageID)) if err != nil { return 0, "", fmt.Errorf("getting storage info: %w", err) } - agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.NodeID) + agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.Storage.NodeID) if err != nil { return 0, "", fmt.Errorf("new agent client: %w", err) } @@ -43,7 +73,7 @@ func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, package return 0, "", fmt.Errorf("start storage load package: %w", err) } - return stgResp.NodeID, startResp.TaskID, nil + return stgResp.Storage.NodeID, startResp.TaskID, nil } func (svc *StorageService) WaitStorageLoadPackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, string, error) { @@ -84,12 +114,12 @@ func (svc *StorageService) StartStorageCreatePackage(userID cdssdk.UserID, bucke } defer stgglb.CoordinatorMQPool.Release(coorCli) - stgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID)) + stgResp, err := coorCli.GetStorage(coormq.ReqGetStorage(userID, storageID)) if err != nil { return 0, "", fmt.Errorf("getting storage info: %w", err) } - agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.NodeID) + agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.Storage.NodeID) if err != nil { return 0, "", fmt.Errorf("new agent client: %w", err) } @@ -100,7 +130,7 @@ func (svc *StorageService) StartStorageCreatePackage(userID cdssdk.UserID, bucke return 0, "", fmt.Errorf("start storage upload package: %w", err) } - return stgResp.NodeID, startResp.TaskID, nil + return stgResp.Storage.NodeID, startResp.TaskID, nil } func (svc *StorageService) WaitStorageCreatePackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, cdssdk.PackageID, error) { @@ -127,18 +157,3 @@ func (svc *StorageService) WaitStorageCreatePackage(nodeID cdssdk.NodeID, taskID return true, waitResp.PackageID, nil } - -func (svc *StorageService) GetInfo(userID cdssdk.UserID, storageID cdssdk.StorageID) (*model.Storage, error) { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - getResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID)) - if err != nil { - return nil, fmt.Errorf("request to coordinator: %w", err) - } - - return &getResp.Storage, nil -} diff --git a/common/pkgs/db/storage.go b/common/pkgs/db/storage.go index 93582d0..bcf4f60 100644 --- a/common/pkgs/db/storage.go +++ b/common/pkgs/db/storage.go @@ -59,6 +59,16 @@ func (db *StorageDB) GetUserStorage(ctx SQLContext, userID cdssdk.UserID, storag return stg, err } +func (db *StorageDB) GetUserStorageByName(ctx SQLContext, userID cdssdk.UserID, name string) (model.Storage, error) { + var stg model.Storage + err := sqlx.Get(ctx, &stg, + "select Storage.* from UserStorage, Storage where UserID = ? and UserStorage.StorageID = Storage.StorageID and Storage.Name = ?", + userID, + name) + + return stg, err +} + func (db *StorageDB) ChangeState(ctx SQLContext, storageID cdssdk.StorageID, state string) error { _, err := ctx.Exec("update Storage set State = ? where StorageID = ?", state, storageID) return err diff --git a/common/pkgs/mq/coordinator/storage.go b/common/pkgs/mq/coordinator/storage.go index f12ce94..a35a953 100644 --- a/common/pkgs/mq/coordinator/storage.go +++ b/common/pkgs/mq/coordinator/storage.go @@ -10,7 +10,9 @@ import ( ) type StorageService interface { - GetStorageInfo(msg *GetStorageInfo) (*GetStorageInfoResp, *mq.CodeMessage) + GetStorage(msg *GetStorage) (*GetStorageResp, *mq.CodeMessage) + + GetStorageByName(msg *GetStorageByName) (*GetStorageByNameResp, *mq.CodeMessage) StoragePackageLoaded(msg *StoragePackageLoaded) (*StoragePackageLoadedResp, *mq.CodeMessage) @@ -18,37 +20,58 @@ type StorageService interface { } // 获取Storage信息 -var _ = Register(Service.GetStorageInfo) +var _ = Register(Service.GetStorage) -type GetStorageInfo struct { +type GetStorage struct { mq.MessageBodyBase UserID cdssdk.UserID `json:"userID"` StorageID cdssdk.StorageID `json:"storageID"` } -type GetStorageInfoResp struct { +type GetStorageResp struct { mq.MessageBodyBase - model.Storage + Storage model.Storage `json:"storage"` } -func NewGetStorageInfo(userID cdssdk.UserID, storageID cdssdk.StorageID) *GetStorageInfo { - return &GetStorageInfo{ +func ReqGetStorage(userID cdssdk.UserID, storageID cdssdk.StorageID) *GetStorage { + return &GetStorage{ UserID: userID, StorageID: storageID, } } -func NewGetStorageInfoResp(storageID cdssdk.StorageID, name string, nodeID cdssdk.NodeID, dir string, state string) *GetStorageInfoResp { - return &GetStorageInfoResp{ - Storage: model.Storage{ - StorageID: storageID, - Name: name, - NodeID: nodeID, - Directory: dir, - State: state, - }, +func RespGetStorage(stg model.Storage) *GetStorageResp { + return &GetStorageResp{ + Storage: stg, + } +} +func (client *Client) GetStorage(msg *GetStorage) (*GetStorageResp, error) { + return mq.Request(Service.GetStorage, client.rabbitCli, msg) +} + +var _ = Register(Service.GetStorageByName) + +type GetStorageByName struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` + Name string `json:"name"` +} +type GetStorageByNameResp struct { + mq.MessageBodyBase + Storage model.Storage `json:"storage"` +} + +func ReqGetStorageByName(userID cdssdk.UserID, name string) *GetStorageByName { + return &GetStorageByName{ + UserID: userID, + Name: name, + } +} +func RespGetStorageByNameResp(storage model.Storage) *GetStorageByNameResp { + return &GetStorageByNameResp{ + Storage: storage, } } -func (client *Client) GetStorageInfo(msg *GetStorageInfo) (*GetStorageInfoResp, error) { - return mq.Request(Service.GetStorageInfo, client.rabbitCli, msg) +func (client *Client) GetStorageByName(msg *GetStorageByName) (*GetStorageByNameResp, error) { + return mq.Request(Service.GetStorageByName, client.rabbitCli, msg) } // 提交调度记录 diff --git a/coordinator/internal/mq/package.go b/coordinator/internal/mq/package.go index 73acd1c..b83c9d1 100644 --- a/coordinator/internal/mq/package.go +++ b/coordinator/internal/mq/package.go @@ -34,6 +34,10 @@ func (svc *Service) GetPackageByName(msg *coormq.GetPackageByName) (*coormq.GetP WithField("PackageName", msg.PackageName). Warnf("get package by name: %s", err.Error()) + if err == sql.ErrNoRows { + return nil, mq.Failed(errorcode.DataNotFound, "package not found") + } + return nil, mq.Failed(errorcode.OperationFailed, "get package by name failed") } diff --git a/coordinator/internal/mq/storage.go b/coordinator/internal/mq/storage.go index 6060bd0..3556b3f 100644 --- a/coordinator/internal/mq/storage.go +++ b/coordinator/internal/mq/storage.go @@ -13,14 +13,29 @@ import ( coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) -func (svc *Service) GetStorageInfo(msg *coormq.GetStorageInfo) (*coormq.GetStorageInfoResp, *mq.CodeMessage) { +func (svc *Service) GetStorage(msg *coormq.GetStorage) (*coormq.GetStorageResp, *mq.CodeMessage) { stg, err := svc.db.Storage().GetUserStorage(svc.db.SQLCtx(), msg.UserID, msg.StorageID) if err != nil { logger.Warnf("getting user storage: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed") } - return mq.ReplyOK(coormq.NewGetStorageInfoResp(stg.StorageID, stg.Name, stg.NodeID, stg.Directory, stg.State)) + return mq.ReplyOK(coormq.RespGetStorage(stg)) +} + +func (svc *Service) GetStorageByName(msg *coormq.GetStorageByName) (*coormq.GetStorageByNameResp, *mq.CodeMessage) { + stg, err := svc.db.Storage().GetUserStorageByName(svc.db.SQLCtx(), msg.UserID, msg.Name) + if err != nil { + logger.Warnf("getting user storage by name: %s", err.Error()) + + if err == sql.ErrNoRows { + return nil, mq.Failed(errorcode.DataNotFound, "storage not found") + } + + return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed") + } + + return mq.ReplyOK(coormq.RespGetStorageByNameResp(stg)) } func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coormq.StoragePackageLoadedResp, *mq.CodeMessage) { diff --git a/go.mod b/go.mod index b0f5ebb..0ae4648 100644 --- a/go.mod +++ b/go.mod @@ -9,12 +9,14 @@ require ( github.com/go-sql-driver/mysql v1.7.1 github.com/google/uuid v1.3.1 github.com/hashicorp/golang-lru/v2 v2.0.5 + github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf github.com/jedib0t/go-pretty/v6 v6.4.7 github.com/jmoiron/sqlx v1.3.5 github.com/klauspost/reedsolomon v1.11.8 github.com/magefile/mage v1.15.0 github.com/samber/lo v1.38.1 github.com/smartystreets/goconvey v1.8.1 + github.com/spf13/cobra v1.8.0 gitlink.org.cn/cloudream/common v0.0.0 google.golang.org/grpc v1.57.0 google.golang.org/protobuf v1.31.0 @@ -43,6 +45,7 @@ require ( github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/imdario/mergo v0.3.15 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/boxo v0.12.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect github.com/ipfs/go-ipfs-api v0.7.0 // indirect @@ -74,6 +77,7 @@ require ( github.com/sirupsen/logrus v1.9.2 // indirect github.com/smarty/assertions v1.15.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect github.com/streadway/amqp v1.1.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.11 // indirect diff --git a/go.sum b/go.sum index 5bf6160..d13a0e8 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,7 @@ github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmf github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 h1:HVTnpeuvF6Owjd5mniCL8DEXo7uYXdQEmOP4FJbV5tg= github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3/go.mod h1:p1d6YEZWvFzEh4KLyvBcVSnrfNDDvK2zfK/4x2v/4pE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -51,8 +52,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -64,6 +65,10 @@ github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvH github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM= github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf h1:FtEj8sfIcaaBfAKrE1Cwb61YDtYq9JxChK1c7AKce7s= +github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf/go.mod h1:yrqSXGoD/4EKfF26AOGzscPOgTTJcyAwM2rpixWT+t4= github.com/ipfs/boxo v0.12.0 h1:AXHg/1ONZdRQHQLgG5JHsSC3XoE4DjCAMgK+asZvUcQ= github.com/ipfs/boxo v0.12.0/go.mod h1:xAnfiU6PtxWCnRqu7dcXQ10bB5/kvI1kXRotuGqGBhg= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= @@ -141,6 +146,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y= @@ -151,6 +157,10 @@ github.com/smartystreets/goconvey v1.8.1 h1:qGjIddxOk4grTu9JPOU31tVfq3cNdBlNa5sS github.com/smartystreets/goconvey v1.8.1/go.mod h1:+/u4qLyY6x1jReYOp7GOM2FSt8aP9CzCZL03bI28W60= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= +github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM= github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=