From 1d88199eaeef7eee86d1c15812288b982d4a8a9b Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 8 Apr 2025 16:15:53 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=BC=96=E8=AF=91=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/accessstat/access_stat.go | 2 +- client/internal/cmdline/bucket.go | 2 + client/internal/cmdline/commandline.go | 54 +---- client/internal/cmdline/distlock.go | 2 + client/internal/cmdline/getp.go | 6 +- client/internal/cmdline/load.go | 16 +- client/internal/cmdline/lsp.go | 2 + client/internal/cmdline/mount.go | 36 ++-- client/internal/cmdline/newloadp.go | 2 + client/internal/cmdline/object.go | 2 + client/internal/cmdline/package.go | 2 + client/internal/cmdline/put.go | 2 + client/internal/cmdline/serve.go | 120 ++++++----- client/internal/cmdline/storage.go | 38 ---- client/internal/config/config.go | 4 +- client/internal/db/db.go | 10 +- client/internal/downloader/downloader.go | 4 +- client/internal/downloader/iterator.go | 8 +- .../internal/downloader/strategy/selector.go | 16 +- client/internal/http/object.go | 8 +- client/internal/http/package.go | 32 +-- client/internal/http/presigned.go | 36 ++-- client/internal/http/storage.go | 16 +- client/internal/metacache/host.go | 13 +- client/internal/metacache/storagemeta.go | 73 ++++--- client/internal/services/object.go | 17 +- client/internal/services/package.go | 6 +- client/internal/services/service.go | 8 +- client/internal/services/storage.go | 51 +++-- client/main.go | 2 +- common/globals/globals.go | 1 + common/models/datamap/datamap.go | 121 ++++++----- common/pkgs/ec/block.go | 190 ------------------ common/pkgs/mq/coordinator/storage.go | 28 +-- coordinator/internal/mq/storage.go | 57 +++++- coordinator/types/storage.go | 5 + 36 files changed, 430 insertions(+), 562 deletions(-) delete mode 100644 client/internal/cmdline/storage.go delete mode 100644 common/pkgs/ec/block.go diff --git a/client/internal/accessstat/access_stat.go b/client/internal/accessstat/access_stat.go index 7cc416f..1ac14fc 100644 --- a/client/internal/accessstat/access_stat.go +++ b/client/internal/accessstat/access_stat.go @@ -55,7 +55,7 @@ func (p *AccessStat) Start() *sync2.UnboundChannel[AccessStatEvent] { continue } - err := db.DoTx11(p.db, p.db.Package().BatchAddPackageAccessStat, st) + err := db.DoTx10(p.db, p.db.Package().BatchAddPackageAccessStat, st) if err != nil { logger.Errorf("add all package access stat counter: %v", err) diff --git a/client/internal/cmdline/bucket.go b/client/internal/cmdline/bucket.go index 5c7b919..015a0fb 100644 --- a/client/internal/cmdline/bucket.go +++ b/client/internal/cmdline/bucket.go @@ -1,5 +1,6 @@ package cmdline +/* import ( "fmt" "time" @@ -56,3 +57,4 @@ func init() { commands.MustAdd(BucketDeleteBucket, "bucket", "delete") } +*/ diff --git a/client/internal/cmdline/commandline.go b/client/internal/cmdline/commandline.go index 9b23c1f..be4bb06 100644 --- a/client/internal/cmdline/commandline.go +++ b/client/internal/cmdline/commandline.go @@ -1,64 +1,20 @@ package cmdline import ( - "context" - "fmt" - "os" - "github.com/spf13/cobra" - "gitlink.org.cn/cloudream/common/pkgs/cmdtrie" "gitlink.org.cn/cloudream/storage2/client/internal/services" ) type CommandContext struct { - Cmdline *Commandline + svc *services.Service } -// TODO 逐步使用cobra代替cmdtrie -var commands cmdtrie.CommandTrie[CommandContext, error] = cmdtrie.NewCommandTrie[CommandContext, error]() - var RootCmd = cobra.Command{} -type Commandline struct { - Svc *services.Service -} - -func NewCommandline(svc *services.Service) (*Commandline, error) { - return &Commandline{ - Svc: svc, - }, nil -} - -func (c *Commandline) DispatchCommand(allArgs []string) { - cmdCtx := CommandContext{ - Cmdline: c, - } - cmdErr, err := commands.Execute(cmdCtx, allArgs, cmdtrie.ExecuteOption{ReplaceEmptyArrayWithNil: true}) - if err != nil { - if err == cmdtrie.ErrCommandNotFound { - ctx := context.WithValue(context.Background(), "cmdCtx", &cmdCtx) - err = RootCmd.ExecuteContext(ctx) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - return - } - - fmt.Printf("execute command failed, err: %s", err.Error()) - os.Exit(1) - } - if cmdErr != nil { - fmt.Printf("execute command failed, err: %s", cmdErr.Error()) - os.Exit(1) - } -} - -func MustAddCmd(fn any, prefixWords ...string) any { - commands.MustAdd(fn, prefixWords...) - return nil -} - func GetCmdCtx(cmd *cobra.Command) *CommandContext { return cmd.Context().Value("cmdCtx").(*CommandContext) } + +func RootExecute() { + RootCmd.Execute() +} diff --git a/client/internal/cmdline/distlock.go b/client/internal/cmdline/distlock.go index 74d3f1c..8655418 100644 --- a/client/internal/cmdline/distlock.go +++ b/client/internal/cmdline/distlock.go @@ -1,5 +1,6 @@ package cmdline +/* import ( "fmt" "strings" @@ -84,3 +85,4 @@ func init() { commands.MustAdd(DistLockUnlock, "distlock", "unlock") } +*/ diff --git a/client/internal/cmdline/getp.go b/client/internal/cmdline/getp.go index a611282..e80fcd1 100644 --- a/client/internal/cmdline/getp.go +++ b/client/internal/cmdline/getp.go @@ -1,5 +1,6 @@ package cmdline +/* import ( "fmt" "io" @@ -115,8 +116,8 @@ func getpByID(cmdCtx *CommandContext, id cdssdk.PackageID, output string) { return fmt.Errorf("copy object data to local file failed, err: %w", err) } - if config.Cfg().StorageID > 0 { - cmdCtx.Cmdline.Svc.AccessStat.AddAccessCounter(objInfo.Object.ObjectID, id, config.Cfg().StorageID, 1) + if config.Cfg().UserSpaceID > 0 { + cmdCtx.Cmdline.Svc.AccessStat.AddAccessCounter(objInfo.Object.ObjectID, id, config.Cfg().UserSpaceID, 1) } return nil }() @@ -128,3 +129,4 @@ func getpByID(cmdCtx *CommandContext, id cdssdk.PackageID, output string) { fmt.Printf("Get %v files (%v) to %s in %v.\n", fileCount, bytesize.ByteSize(totalSize), output, time.Since(startTime)) } +*/ diff --git a/client/internal/cmdline/load.go b/client/internal/cmdline/load.go index 4d30178..d234f9a 100644 --- a/client/internal/cmdline/load.go +++ b/client/internal/cmdline/load.go @@ -1,5 +1,6 @@ package cmdline +/* import ( "fmt" "strconv" @@ -7,7 +8,7 @@ import ( "time" "github.com/spf13/cobra" - cdssdk "gitlink.org.cn/cloudream/storage2/client/types" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" ) func init() { @@ -25,12 +26,12 @@ func init() { fmt.Printf("Invalid package ID: %s\n", args[0]) } - stgID, err := strconv.ParseInt(args[1], 10, 64) + userSpaceID, err := strconv.ParseInt(args[1], 10, 64) if err != nil { - fmt.Printf("Invalid storage ID: %s\n", args[1]) + fmt.Printf("Invalid user space ID: %s\n", args[1]) } - loadByID(cmdCtx, cdssdk.PackageID(pkgID), cdssdk.StorageID(stgID), args[2]) + loadByID(cmdCtx, clitypes.PackageID(pkgID), clitypes.UserSpaceID(userSpaceID), args[2]) } else { loadByPath(cmdCtx, args[0], args[1], args[2]) } @@ -41,7 +42,7 @@ func init() { } func loadByPath(cmdCtx *CommandContext, pkgPath string, stgName string, rootPath string) { - comps := strings.Split(strings.Trim(pkgPath, cdssdk.ObjectPathSeparator), cdssdk.ObjectPathSeparator) + comps := strings.Split(strings.Trim(pkgPath, clitypes.ObjectPathSeparator), clitypes.ObjectPathSeparator) if len(comps) != 2 { fmt.Printf("Package path must be in format of /") return @@ -53,7 +54,7 @@ func loadByPath(cmdCtx *CommandContext, pkgPath string, stgName string, rootPath return } - stg, err := cmdCtx.Cmdline.Svc.StorageSvc().GetByName(stgName) + // stg, err := cmdCtx.Cmdline.Svc.StorageSvc().GetByName(stgName) if err != nil { fmt.Println(err) return @@ -62,7 +63,7 @@ func loadByPath(cmdCtx *CommandContext, pkgPath string, stgName string, rootPath loadByID(cmdCtx, pkg.PackageID, stg.StorageID, rootPath) } -func loadByID(cmdCtx *CommandContext, pkgID cdssdk.PackageID, stgID cdssdk.StorageID, rootPath string) { +func loadByID(cmdCtx *CommandContext, pkgID clitypes.PackageID, stgID clitypes.StorageID, rootPath string) { startTime := time.Now() err := cmdCtx.Cmdline.Svc.StorageSvc().LoadPackage(pkgID, stgID, rootPath) @@ -73,3 +74,4 @@ func loadByID(cmdCtx *CommandContext, pkgID cdssdk.PackageID, stgID cdssdk.Stora fmt.Printf("Package loaded to: %v:%v in %v\n", stgID, rootPath, time.Since(startTime)) } +*/ diff --git a/client/internal/cmdline/lsp.go b/client/internal/cmdline/lsp.go index cca0bb6..72698cf 100644 --- a/client/internal/cmdline/lsp.go +++ b/client/internal/cmdline/lsp.go @@ -1,5 +1,6 @@ package cmdline +/* import ( "fmt" "strconv" @@ -68,3 +69,4 @@ func lspOneByID(cmdCtx *CommandContext, id cdssdk.PackageID) { wr.AppendRow(table.Row{pkg.PackageID, pkg.Name}) fmt.Println(wr.Render()) } +*/ diff --git a/client/internal/cmdline/mount.go b/client/internal/cmdline/mount.go index 61c92b6..166a7aa 100644 --- a/client/internal/cmdline/mount.go +++ b/client/internal/cmdline/mount.go @@ -8,18 +8,18 @@ import ( "github.com/spf13/cobra" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage2/client/internal/config" + db2 "gitlink.org.cn/cloudream/storage2/client/internal/db" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy" + "gitlink.org.cn/cloudream/storage2/client/internal/metacache" "gitlink.org.cn/cloudream/storage2/client/internal/mount" mntcfg "gitlink.org.cn/cloudream/storage2/client/internal/mount/config" + "gitlink.org.cn/cloudream/storage2/client/internal/uploader" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" - "gitlink.org.cn/cloudream/storage2/common/pkgs/metacache" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" - "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" + agtpool "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" ) func init() { @@ -50,21 +50,26 @@ func mountCmd(mountPoint string, configPath string) { os.Exit(1) } - stgglb.InitLocal(&config.Cfg().Local) + stgglb.InitLocal(config.Cfg().Local) stgglb.InitMQPool(config.Cfg().RabbitMQ) stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) - stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID) - stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID) + // stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID) + // stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID) // 初始化存储服务管理器 - stgAgts := agtpool.NewPool() + stgPool := agtpool.NewPool() + + db, err := db2.NewDB(&config.Cfg().DB) + if err != nil { + logger.Fatalf("new db failed, err: %s", err.Error()) + } // 启动网络连通性检测,并就地检测一次 conCol := connectivity.NewCollector(&config.Cfg().Connectivity, nil) // conCol.CollectInPlace() // 初始化元数据缓存服务 - metacacheHost := metacache.NewHost() + metacacheHost := metacache.NewHost(db) go metacacheHost.Serve() stgMeta := metacacheHost.AddStorageMeta() hubMeta := metacacheHost.AddHubMeta() @@ -82,15 +87,10 @@ func mountCmd(mountPoint string, configPath string) { strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) // 初始化下载器 - dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgAgts, strgSel) + dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgPool, strgSel, db) // 上传器 - uploader := uploader.NewUploader(distlockSvc, &conCol, stgAgts, stgMeta) - - db, err := db2.NewDB(&config.Cfg().DB) - if err != nil { - logger.Fatalf("new db2 failed, err: %s", err.Error()) - } + uploader := uploader.NewUploader(distlockSvc, &conCol, stgPool, stgMeta, db) mnt := mount.NewMount(&mntcfg.Config{ CacheDir: "./cache", diff --git a/client/internal/cmdline/newloadp.go b/client/internal/cmdline/newloadp.go index d9b4f46..8d5decd 100644 --- a/client/internal/cmdline/newloadp.go +++ b/client/internal/cmdline/newloadp.go @@ -1,5 +1,6 @@ package cmdline +/* import ( "fmt" "os" @@ -104,3 +105,4 @@ func newloadp(cmdCtx *CommandContext, path string, bucketID cdssdk.BucketID, pac wr.AppendRow(table.Row{ret.Package.PackageID, ret.Package.Name, fileCount, totalSize}) fmt.Println(wr.Render()) } +*/ diff --git a/client/internal/cmdline/object.go b/client/internal/cmdline/object.go index 57c0a89..ea9b7c3 100644 --- a/client/internal/cmdline/object.go +++ b/client/internal/cmdline/object.go @@ -1,5 +1,6 @@ package cmdline +/* import ( "fmt" "os" @@ -65,3 +66,4 @@ var _ = MustAddCmd(func(ctx CommandContext, packageID cdssdk.PackageID, rootPath return nil }, "obj", "upload") +*/ diff --git a/client/internal/cmdline/package.go b/client/internal/cmdline/package.go index 7722ffd..5bb127f 100644 --- a/client/internal/cmdline/package.go +++ b/client/internal/cmdline/package.go @@ -1,5 +1,6 @@ package cmdline +/* import ( "fmt" "io" @@ -186,3 +187,4 @@ func init() { // 查询package缓存到哪些节点 commands.MustAdd(PackageGetCachedStorages, "pkg", "cached") } +*/ diff --git a/client/internal/cmdline/put.go b/client/internal/cmdline/put.go index 01cd35a..e4c5275 100644 --- a/client/internal/cmdline/put.go +++ b/client/internal/cmdline/put.go @@ -1,5 +1,6 @@ package cmdline +/* import ( "fmt" "os" @@ -119,3 +120,4 @@ func init() { RootCmd.AddCommand(cmd) } +*/ diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index cd492c3..21d5072 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -1,27 +1,28 @@ package cmdline import ( + "context" "fmt" "os" "time" "github.com/spf13/cobra" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage2/client/internal/accessstat" "gitlink.org.cn/cloudream/storage2/client/internal/config" + "gitlink.org.cn/cloudream/storage2/client/internal/db" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy" "gitlink.org.cn/cloudream/storage2/client/internal/http" + "gitlink.org.cn/cloudream/storage2/client/internal/metacache" "gitlink.org.cn/cloudream/storage2/client/internal/services" - "gitlink.org.cn/cloudream/storage2/client/internal/task" - cdssdk "gitlink.org.cn/cloudream/storage2/client/types" + "gitlink.org.cn/cloudream/storage2/client/internal/uploader" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - "gitlink.org.cn/cloudream/storage2/common/pkgs/accessstat" + "gitlink.org.cn/cloudream/storage2/common/models/datamap" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" - "gitlink.org.cn/cloudream/storage2/common/pkgs/metacache" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" - "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" + agtpool "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" + "gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent" ) // 初始化函数,将ServeHTTP命令注册到命令列表中。 @@ -52,46 +53,32 @@ func serveHTTP(configPath string, listenAddr string) { os.Exit(1) } - stgglb.InitLocal(&config.Cfg().Local) + stgglb.InitLocal(config.Cfg().Local) stgglb.InitMQPool(config.Cfg().RabbitMQ) stgglb.InitAgentRPCPool(&config.Cfg().AgentGRPC) - // 连接性信息收集 - var conCol connectivity.Collector - if config.Cfg().Local.HubID != nil { - //如果client与某个hub处于同一台机器,则使用这个hub的连通性信息 - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - logger.Warnf("acquire coordinator mq failed, err: %s", err.Error()) - os.Exit(1) - } - getCons, err := coorCli.GetHubConnectivities(coormq.ReqGetHubConnectivities([]cdssdk.HubID{*config.Cfg().Local.HubID})) - if err != nil { - logger.Warnf("get hub connectivities failed, err: %s", err.Error()) - os.Exit(1) - } - consMap := make(map[cdssdk.HubID]connectivity.Connectivity) - for _, con := range getCons.Connectivities { - var delay *time.Duration - if con.Latency != nil { - d := time.Duration(*con.Latency * float32(time.Millisecond)) - delay = &d - } - consMap[con.FromHubID] = connectivity.Connectivity{ - ToHubID: con.ToHubID, - Latency: delay, - } - } - conCol = connectivity.NewCollectorWithInitData(&config.Cfg().Connectivity, nil, consMap) - logger.Info("use local hub connectivities") + // 数据库 + db, err := db.NewDB(&config.Cfg().DB) + if err != nil { + logger.Fatalf("new db failed, err: %s", err.Error()) + } - } else { - // 否则需要就地收集连通性信息 - conCol = connectivity.NewCollector(&config.Cfg().Connectivity, nil) - conCol.CollectInPlace() + // 初始化系统事件发布器 + evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &datamap.SourceClient{ + UserID: config.Cfg().Local.UserID, + }) + if err != nil { + logger.Errorf("new sysevent publisher: %v", err) + os.Exit(1) } + go servePublisher(evtPub) - metaCacheHost := metacache.NewHost() + // 连接性信息收集 + conCol := connectivity.NewCollector(&config.Cfg().Connectivity, nil) + conCol.CollectInPlace() + + // 元数据缓存 + metaCacheHost := metacache.NewHost(db) go metaCacheHost.Serve() stgMeta := metaCacheHost.AddStorageMeta() hubMeta := metaCacheHost.AddHubMeta() @@ -109,24 +96,22 @@ func serveHTTP(configPath string, listenAddr string) { acStat := accessstat.NewAccessStat(accessstat.Config{ // TODO 考虑放到配置里 ReportInterval: time.Second * 10, - }) + }, db) go serveAccessStat(acStat) // 存储管理器 stgAgts := agtpool.NewPool() - // 任务管理器 - taskMgr := task.NewManager(distlockSvc, &conCol, stgAgts) - + // 下载策略 strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) // 下载器 - dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgAgts, strgSel) + dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgAgts, strgSel, db) // 上传器 - uploader := uploader.NewUploader(distlockSvc, &conCol, stgAgts, stgMeta) + uploader := uploader.NewUploader(distlockSvc, &conCol, stgAgts, stgMeta, db) - svc, err := services.NewService(distlockSvc, &taskMgr, &dlder, acStat, uploader, strgSel, stgMeta) + svc, err := services.NewService(distlockSvc, &dlder, acStat, uploader, strgSel, stgMeta, db, evtPub) if err != nil { logger.Warnf("new services failed, err: %s", err.Error()) os.Exit(1) @@ -196,3 +181,38 @@ loop: // TODO 仅简单结束了程序 os.Exit(1) } + +func servePublisher(evtPub *sysevent.Publisher) { + logger.Info("start serving sysevent publisher") + + ch := evtPub.Start() + +loop: + for { + val, err := ch.Receive().Wait(context.Background()) + if err != nil { + logger.Errorf("sysevent publisher stopped with error: %s", err.Error()) + break + } + + switch val := val.(type) { + case sysevent.PublishError: + logger.Errorf("publishing event: %v", val) + + case sysevent.PublisherExited: + if val.Err != nil { + logger.Errorf("publisher exited with error: %v", val.Err) + } else { + logger.Info("publisher exited") + } + break loop + + case sysevent.OtherError: + logger.Errorf("sysevent: %v", val) + } + } + logger.Info("sysevent publisher stopped") + + // TODO 仅简单结束了程序 + os.Exit(1) +} diff --git a/client/internal/cmdline/storage.go b/client/internal/cmdline/storage.go deleted file mode 100644 index 43899d7..0000000 --- a/client/internal/cmdline/storage.go +++ /dev/null @@ -1,38 +0,0 @@ -package cmdline - -import ( - "fmt" - "time" - - cdssdk "gitlink.org.cn/cloudream/storage2/client/types" -) - -// UserSpaceCreatePackage 创建一个新的包并上传到指定的存储系统。 -// ctx: 命令上下文,提供必要的服务和环境配置。 -// bucketID: 存储桶的唯一标识,包将被上传到这个存储桶中。 -// name: 新包的名称。 -// storageID: 目标存储系统的唯一标识。 -// path: 包在存储系统中的路径。 -// 返回值: 执行过程中遇到的任何错误。 -func UserSpaceCreatePackage(ctx CommandContext, bucketID cdssdk.BucketID, name string, spaceID cdssdk.UserSpaceID, path string) error { - startTime := time.Now() - defer func() { - // 打印函数执行时间 - fmt.Printf("%v\n", time.Since(startTime).Seconds()) - }() - - // 开始创建并上传包到存储系统 - pkg, err := ctx.Cmdline.Svc.StorageSvc().StorageCreatePackage(bucketID, name, spaceID, path, 0) - if err != nil { - return fmt.Errorf("start storage uploading package: %w", err) - } - - fmt.Printf("%d\n", pkg.PackageID) - return nil -} - -// 初始化函数,注册加载包和创建包的命令到命令行解析器。 -func init() { - // 注册创建包命令 - commands.MustAdd(UserSpaceCreatePackage, "stg", "pkg", "new") -} diff --git a/client/internal/config/config.go b/client/internal/config/config.go index 36f4ed3..cf54835 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -8,10 +8,10 @@ import ( "gitlink.org.cn/cloudream/storage2/client/internal/db" "gitlink.org.cn/cloudream/storage2/client/internal/downloader" "gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" - cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type Config struct { @@ -23,7 +23,7 @@ type Config struct { Connectivity connectivity.Config `json:"connectivity"` Downloader downloader.Config `json:"downloader"` DownloadStrategy strategy.Config `json:"downloadStrategy"` - StorageID cortypes.StorageID `json:"storageID"` // TODO 进行访问量统计时,当前客户端所属的存储ID。临时解决方案。 + UserSpaceID clitypes.UserSpaceID `json:"userSpaceID"` // TODO 进行访问量统计时,当前客户端所属的存储ID。临时解决方案。 AuthAccessKey string `json:"authAccessKey"` // TODO 临时办法 AuthSecretKey string `json:"authSecretKey"` MaxHTTPBodySize int64 `json:"maxHttpBodySize"` diff --git a/client/internal/db/db.go b/client/internal/db/db.go index 588f2d3..7db69a0 100644 --- a/client/internal/db/db.go +++ b/client/internal/db/db.go @@ -28,13 +28,13 @@ func (db *DB) DoTx(do func(tx SQLContext) error) error { }) } -func DoTx11[T any](db *DB, do func(tx SQLContext, t T) error, t T) error { +func DoTx10[T any](db *DB, do func(tx SQLContext, t T) error, t T) error { return db.db.Transaction(func(tx *gorm.DB) error { return do(SQLContext{tx}, t) }) } -func DoTx02[R any](db *DB, do func(tx SQLContext) (R, error)) (R, error) { +func DoTx01[R any](db *DB, do func(tx SQLContext) (R, error)) (R, error) { var ret R err := db.db.Transaction(func(tx *gorm.DB) error { var err error @@ -44,7 +44,7 @@ func DoTx02[R any](db *DB, do func(tx SQLContext) (R, error)) (R, error) { return ret, err } -func DoTx12[T any, R any](db *DB, do func(tx SQLContext, t T) (R, error), t T) (R, error) { +func DoTx11[T any, R any](db *DB, do func(tx SQLContext, t T) (R, error), t T) (R, error) { var ret R err := db.db.Transaction(func(tx *gorm.DB) error { var err error @@ -54,7 +54,7 @@ func DoTx12[T any, R any](db *DB, do func(tx SQLContext, t T) (R, error), t T) ( return ret, err } -func DoTx22[T1 any, T2 any, R any](db *DB, do func(tx SQLContext, t1 T1, t2 T2) (R, error), t1 T1, t2 T2) (R, error) { +func DoTx21[T1 any, T2 any, R any](db *DB, do func(tx SQLContext, t1 T1, t2 T2) (R, error), t1 T1, t2 T2) (R, error) { var ret R err := db.db.Transaction(func(tx *gorm.DB) error { var err error @@ -64,7 +64,7 @@ func DoTx22[T1 any, T2 any, R any](db *DB, do func(tx SQLContext, t1 T1, t2 T2) return ret, err } -func DoTx32[T1 any, T2 any, T3 any, R any](db *DB, do func(tx SQLContext, t1 T1, t2 T2, t3 T3) (R, error), t1 T1, t2 T2, t3 T3) (R, error) { +func DoTx31[T1 any, T2 any, T3 any, R any](db *DB, do func(tx SQLContext, t1 T1, t2 T2, t3 T3) (R, error), t1 T1, t2 T2, t3 T3) (R, error) { var ret R err := db.db.Transaction(func(tx *gorm.DB) error { var err error diff --git a/client/internal/downloader/downloader.go b/client/internal/downloader/downloader.go index 19755e7..d08bac8 100644 --- a/client/internal/downloader/downloader.go +++ b/client/internal/downloader/downloader.go @@ -71,7 +71,7 @@ func (d *Downloader) DownloadObjects(reqs []DownloadReqeust) DownloadIterator { return iterator.Empty[*Downloading]() } - objDetails, err := db.DoTx12(d.db, d.db.Object().BatchGetDetails, objIDs) + objDetails, err := db.DoTx11(d.db, d.db.Object().BatchGetDetails, objIDs) if err != nil { return iterator.FuseError[*Downloading](fmt.Errorf("request to db: %w", err)) } @@ -108,7 +108,7 @@ func (d *Downloader) DownloadObjectByDetail(detail types.ObjectDetail, off int64 } func (d *Downloader) DownloadPackage(pkgID types.PackageID) DownloadIterator { - details, err := db.DoTx12(d.db, d.db.Object().GetPackageObjectDetails, pkgID) + details, err := db.DoTx11(d.db, d.db.Object().GetPackageObjectDetails, pkgID) if err != nil { return iterator.FuseError[*Downloading](fmt.Errorf("get package object details: %w", err)) } diff --git a/client/internal/downloader/iterator.go b/client/internal/downloader/iterator.go index 5c428c5..bdfa1ba 100644 --- a/client/internal/downloader/iterator.go +++ b/client/internal/downloader/iterator.go @@ -109,7 +109,7 @@ func (i *DownloadObjectIterator) Close() { } func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strategy.DirectStrategy) (io.ReadCloser, error) { - logger.Debugf("downloading object %v from storage %v", req.Raw.ObjectID, strg.Space.Storage.String()) + logger.Debugf("downloading object %v from storage %v", req.Raw.ObjectID, strg.UserSpace.Storage.String()) var strHandle *exec.DriverReadStream ft := ioswitch2.NewFromTo() @@ -123,7 +123,7 @@ func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strat toExec.Range.Length = &len } - ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, *strg.Space.MasterHub, strg.Space, ioswitch2.RawStream())).AddTo(toExec) + ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, *strg.UserSpace.MasterHub, strg.UserSpace, ioswitch2.RawStream())).AddTo(toExec) strHandle = handle plans := exec.NewPlanBuilder() @@ -146,7 +146,7 @@ func (i *DownloadObjectIterator) downloadECReconstruct(req downloadReqeust2, str logStrs = append(logStrs, ", ") } - logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Index, strg.Spaces[i].Storage.String())) + logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Index, strg.UserSpaces[i].Storage.String())) } logger.Debug(logStrs...) @@ -154,7 +154,7 @@ func (i *DownloadObjectIterator) downloadECReconstruct(req downloadReqeust2, str for i, b := range strg.Blocks { downloadBlks[i] = downloadBlock{ Block: b, - Space: strg.Spaces[i], + Space: strg.UserSpaces[i], } } diff --git a/client/internal/downloader/strategy/selector.go b/client/internal/downloader/strategy/selector.go index 0968f0b..655960f 100644 --- a/client/internal/downloader/strategy/selector.go +++ b/client/internal/downloader/strategy/selector.go @@ -29,8 +29,8 @@ type Strategy interface { // 直接下载完整对象 type DirectStrategy struct { - Detail types.ObjectDetail - Space types.UserSpaceDetail + Detail types.ObjectDetail + UserSpace types.UserSpaceDetail } func (s *DirectStrategy) GetDetail() types.ObjectDetail { @@ -42,7 +42,7 @@ type ECReconstructStrategy struct { Detail types.ObjectDetail Redundancy types.ECRedundancy Blocks []types.ObjectBlock - Spaces []types.UserSpaceDetail + UserSpaces []types.UserSpaceDetail } func (s *ECReconstructStrategy) GetDetail() types.ObjectDetail { @@ -135,8 +135,8 @@ func (s *Selector) selectForNoneOrRep(req request2) (Strategy, error) { } return &DirectStrategy{ - Detail: req.Detail, - Space: sortedStgs[0].Space, + Detail: req.Detail, + UserSpace: sortedStgs[0].Space, }, nil } @@ -161,7 +161,7 @@ func (s *Selector) selectForEC(req request2, red types.ECRedundancy) (Strategy, Detail: req.Detail, Redundancy: red, Blocks: bs, - Spaces: ss, + UserSpaces: ss, }, nil } @@ -171,8 +171,8 @@ func (s *Selector) selectForEC(req request2, red types.ECRedundancy) (Strategy, } return &DirectStrategy{ - Detail: req.Detail, - Space: stg, + Detail: req.Detail, + UserSpace: stg, }, nil } diff --git a/client/internal/http/object.go b/client/internal/http/object.go index 40c7fa4..0938bc7 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -176,8 +176,8 @@ func (s *ObjectService) Download(ctx *gin.Context) { } // TODO 当client不在某个代理节点上时如何处理? - if config.Cfg().StorageID > 0 { - s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, config.Cfg().StorageID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1)) + if config.Cfg().UserSpaceID > 0 { + s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, config.Cfg().UserSpaceID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1)) } } @@ -233,8 +233,8 @@ func (s *ObjectService) DownloadByPath(ctx *gin.Context) { log.Warnf("copying file: %s", err.Error()) } - if config.Cfg().StorageID > 0 { - s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, config.Cfg().StorageID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1)) + if config.Cfg().UserSpaceID > 0 { + s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, config.Cfg().UserSpaceID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1)) } } diff --git a/client/internal/http/package.go b/client/internal/http/package.go index 08a3d72..034d18d 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -10,8 +10,8 @@ import ( "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdsapi "gitlink.org.cn/cloudream/storage2/client/sdk/api" - cdssdk "gitlink.org.cn/cloudream/storage2/client/types" + cliapi "gitlink.org.cn/cloudream/storage2/client/sdk/api" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" ) // PackageService 包服务,负责处理包相关的HTTP请求。 @@ -29,7 +29,7 @@ func (s *Server) Package() *PackageService { func (s *PackageService) Get(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.Get") - var req cdsapi.PackageGetReq + var req cliapi.PackageGetReq if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -43,13 +43,13 @@ func (s *PackageService) Get(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.PackageGetResp{Package: *pkg})) + ctx.JSON(http.StatusOK, OK(cliapi.PackageGetResp{Package: pkg})) } func (s *PackageService) GetByFullName(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.GetByFullName") - var req cdsapi.PackageGetByFullName + var req cliapi.PackageGetByFullName if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -63,13 +63,13 @@ func (s *PackageService) GetByFullName(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.PackageGetByFullNameResp{Package: *pkg})) + ctx.JSON(http.StatusOK, OK(cliapi.PackageGetByFullNameResp{Package: pkg})) } // Create 处理创建新包的HTTP请求。 func (s *PackageService) Create(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.Create") - var req cdsapi.PackageCreate + var req cliapi.PackageCreate if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -83,13 +83,13 @@ func (s *PackageService) Create(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.PackageCreateResp{ + ctx.JSON(http.StatusOK, OK(cliapi.PackageCreateResp{ Package: pkg, })) } type PackageCreateLoad struct { - Info cdsapi.PackageCreateLoadInfo `form:"info" binding:"required"` + Info cliapi.PackageCreateLoadInfo `form:"info" binding:"required"` Files []*multipart.FileHeader `form:"files"` } @@ -150,18 +150,18 @@ func (s *PackageService) CreateLoad(ctx *gin.Context) { return } - objs := make([]cdssdk.Object, len(pathes)) + objs := make([]clitypes.Object, len(pathes)) for i := range pathes { objs[i] = ret.Objects[pathes[i]] } - ctx.JSON(http.StatusOK, OK(cdsapi.PackageCreateLoadResp{Package: ret.Package, Objects: objs})) + ctx.JSON(http.StatusOK, OK(cliapi.PackageCreateLoadResp{Package: ret.Package, Objects: objs})) } func (s *PackageService) Delete(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.Delete") - var req cdsapi.PackageDelete + var req cliapi.PackageDelete if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -181,7 +181,7 @@ func (s *PackageService) Delete(ctx *gin.Context) { func (s *PackageService) Clone(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.Clone") - var req cdsapi.PackageClone + var req cliapi.PackageClone if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -195,7 +195,7 @@ func (s *PackageService) Clone(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.PackageCloneResp{ + ctx.JSON(http.StatusOK, OK(cliapi.PackageCloneResp{ Package: pkg, })) } @@ -203,7 +203,7 @@ func (s *PackageService) Clone(ctx *gin.Context) { func (s *PackageService) ListBucketPackages(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.ListBucketPackages") - var req cdsapi.PackageListBucketPackages + var req cliapi.PackageListBucketPackages if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -217,7 +217,7 @@ func (s *PackageService) ListBucketPackages(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.PackageListBucketPackagesResp{ + ctx.JSON(http.StatusOK, OK(cliapi.PackageListBucketPackagesResp{ Packages: pkgs, })) } diff --git a/client/internal/http/presigned.go b/client/internal/http/presigned.go index 5999c64..30726bb 100644 --- a/client/internal/http/presigned.go +++ b/client/internal/http/presigned.go @@ -13,8 +13,8 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/storage2/client/internal/config" - cdsapi "gitlink.org.cn/cloudream/storage2/client/sdk/api" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader" + cliapi "gitlink.org.cn/cloudream/storage2/client/sdk/api" ) type PresignedService struct { @@ -30,7 +30,7 @@ func (s *Server) Presigned() *PresignedService { func (s *PresignedService) ObjectListByPath(ctx *gin.Context) { log := logger.WithField("HTTP", "Presigned.ObjectListByPath") - var req cdsapi.PresignedObjectListByPath + var req cliapi.PresignedObjectListByPath if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -50,14 +50,14 @@ func (s *PresignedService) ObjectListByPath(ctx *gin.Context) { func (s *PresignedService) ObjectDownloadByPath(ctx *gin.Context) { log := logger.WithField("HTTP", "Presigned.ObjectDownloadByPath") - var req cdsapi.PresignedObjectDownloadByPath + var req cliapi.PresignedObjectDownloadByPath if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - resp, err := s.svc.ObjectSvc().GetByPath(cdsapi.ObjectListByPath{ + resp, err := s.svc.ObjectSvc().GetByPath(cliapi.ObjectListByPath{ PackageID: req.PackageID, Path: req.Path, }) if err != nil { @@ -98,15 +98,15 @@ func (s *PresignedService) ObjectDownloadByPath(ctx *gin.Context) { log.Warnf("copying file: %s", err.Error()) } - if config.Cfg().StorageID > 0 { - s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, config.Cfg().StorageID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1)) + if config.Cfg().UserSpaceID > 0 { + s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, config.Cfg().UserSpaceID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1)) } } func (s *PresignedService) ObjectDownload(ctx *gin.Context) { log := logger.WithField("HTTP", "Presigned.ObjectDownloadByPath") - var req cdsapi.PresignedObjectDownload + var req cliapi.PresignedObjectDownload if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -140,15 +140,15 @@ func (s *PresignedService) ObjectDownload(ctx *gin.Context) { log.Warnf("copying file: %s", err.Error()) } - if config.Cfg().StorageID > 0 { - s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, config.Cfg().StorageID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1)) + if config.Cfg().UserSpaceID > 0 { + s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, config.Cfg().UserSpaceID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1)) } } func (s *PresignedService) ObjectUpload(ctx *gin.Context) { log := logger.WithField("HTTP", "Presigned.ObjectUpload") - var req cdsapi.PresignedObjectUpload + var req cliapi.PresignedObjectUpload if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -179,13 +179,13 @@ func (s *PresignedService) ObjectUpload(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.PresignedObjectUploadResp{Object: ret.Objects[path]})) + ctx.JSON(http.StatusOK, OK(cliapi.PresignedObjectUploadResp{Object: ret.Objects[path]})) } func (s *PresignedService) ObjectNewMultipartUpload(ctx *gin.Context) { log := logger.WithField("HTTP", "Presigned.ObjectNewMultipartUpload") - var req cdsapi.PresignedObjectNewMultipartUpload + var req cliapi.PresignedObjectNewMultipartUpload if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -199,13 +199,13 @@ func (s *PresignedService) ObjectNewMultipartUpload(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.PresignedObjectUploadResp{Object: obj})) + ctx.JSON(http.StatusOK, OK(cliapi.PresignedObjectUploadResp{Object: obj})) } func (s *PresignedService) ObjectUploadPart(ctx *gin.Context) { log := logger.WithField("HTTP", "Presigned.ObjectUploadPart") - var req cdsapi.PresignedObjectUploadPart + var req cliapi.PresignedObjectUploadPart if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -219,13 +219,13 @@ func (s *PresignedService) ObjectUploadPart(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.ObjectUploadPartResp{})) + ctx.JSON(http.StatusOK, OK(cliapi.ObjectUploadPartResp{})) } func (s *PresignedService) ObjectCompleteMultipartUpload(ctx *gin.Context) { log := logger.WithField("HTTP", "Presigned.ObjectCompleteMultipartUpload") - var req cdsapi.PresignedObjectCompleteMultipartUpload + var req cliapi.PresignedObjectCompleteMultipartUpload if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -239,5 +239,5 @@ func (s *PresignedService) ObjectCompleteMultipartUpload(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.ObjectCompleteMultipartUploadResp{Object: obj})) + ctx.JSON(http.StatusOK, OK(cliapi.ObjectCompleteMultipartUploadResp{Object: obj})) } diff --git a/client/internal/http/storage.go b/client/internal/http/storage.go index 7db3a96..a620598 100644 --- a/client/internal/http/storage.go +++ b/client/internal/http/storage.go @@ -7,7 +7,7 @@ import ( "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdsapi "gitlink.org.cn/cloudream/storage2/client/sdk/api" + cliapi "gitlink.org.cn/cloudream/storage2/client/sdk/api" ) type UserSpaceService struct { @@ -23,7 +23,7 @@ func (s *Server) UserSpace() *UserSpaceService { func (s *UserSpaceService) LoadPackage(ctx *gin.Context) { log := logger.WithField("HTTP", "UserSpace.LoadPackage") - var req cdsapi.UserSpaceLoadPackageReq + var req cliapi.UserSpaceLoadPackageReq if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -37,13 +37,13 @@ func (s *UserSpaceService) LoadPackage(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.UserSpaceLoadPackageResp{})) + ctx.JSON(http.StatusOK, OK(cliapi.UserSpaceLoadPackageResp{})) } func (s *UserSpaceService) CreatePackage(ctx *gin.Context) { log := logger.WithField("HTTP", "UserSpace.CreatePackage") - var req cdsapi.UserSpaceCreatePackageReq + var req cliapi.UserSpaceCreatePackageReq if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -58,7 +58,7 @@ func (s *UserSpaceService) CreatePackage(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.UserSpaceCreatePackageResp{ + ctx.JSON(http.StatusOK, OK(cliapi.UserSpaceCreatePackageResp{ Package: pkg, })) } @@ -66,7 +66,7 @@ func (s *UserSpaceService) CreatePackage(ctx *gin.Context) { func (s *UserSpaceService) Get(ctx *gin.Context) { log := logger.WithField("HTTP", "UserSpace.Get") - var req cdsapi.UserSpaceGet + var req cliapi.UserSpaceGet if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -80,7 +80,7 @@ func (s *UserSpaceService) Get(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.UserSpaceGetResp{ - UserSpace: *info, + ctx.JSON(http.StatusOK, OK(cliapi.UserSpaceGetResp{ + UserSpace: info, })) } diff --git a/client/internal/metacache/host.go b/client/internal/metacache/host.go index 542a81a..17def7d 100644 --- a/client/internal/metacache/host.go +++ b/client/internal/metacache/host.go @@ -1,17 +1,24 @@ package metacache -import "time" +import ( + "time" + + "gitlink.org.cn/cloudream/storage2/client/internal/db" +) type MetaCache interface { ClearOutdated() } type MetaCacheHost struct { + db *db.DB caches []MetaCache } -func NewHost() *MetaCacheHost { - return &MetaCacheHost{} +func NewHost(db *db.DB) *MetaCacheHost { + return &MetaCacheHost{ + db: db, + } } func (m *MetaCacheHost) Serve() { diff --git a/client/internal/metacache/storagemeta.go b/client/internal/metacache/storagemeta.go index 6dd684c..754d5c7 100644 --- a/client/internal/metacache/storagemeta.go +++ b/client/internal/metacache/storagemeta.go @@ -3,11 +3,17 @@ package metacache import ( "time" + "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage2/client/types" + stgglb "gitlink.org.cn/cloudream/storage2/common/globals" + "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) func (m *MetaCacheHost) AddStorageMeta() *UserSpaceMeta { - meta := &UserSpaceMeta{} + meta := &UserSpaceMeta{ + host: m, + } meta.cache = NewSimpleMetaCache(SimpleMetaCacheConfig[types.UserSpaceID, types.UserSpaceDetail]{ Getter: meta.load, Expire: time.Minute * 5, @@ -18,6 +24,7 @@ func (m *MetaCacheHost) AddStorageMeta() *UserSpaceMeta { } type UserSpaceMeta struct { + host *MetaCacheHost cache *SimpleMetaCache[types.UserSpaceID, types.UserSpaceDetail] } @@ -45,29 +52,45 @@ func (s *UserSpaceMeta) ClearOutdated() { } func (s *UserSpaceMeta) load(keys []types.UserSpaceID) ([]types.UserSpaceDetail, []bool) { - // vs := make([]stgmod.StorageDetail, len(keys)) - // oks := make([]bool, len(keys)) - - // coorCli, err := stgglb.CoordinatorMQPool.Acquire() - // if err != nil { - // logger.Warnf("new coordinator client: %v", err) - // return vs, oks - // } - // defer stgglb.CoordinatorMQPool.Release(coorCli) - - // get, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails(keys)) - // if err != nil { - // logger.Warnf("get storage details: %v", err) - // return vs, oks - // } - - // for i := range keys { - // if get.Storages[i] != nil { - // vs[i] = *get.Storages[i] - // oks[i] = true - // } - // } - - // return vs, oks + vs := make([]types.UserSpaceDetail, len(keys)) + oks := make([]bool, len(keys)) + + spaces, err := s.host.db.UserSpace().BatchGetByID(s.host.db.DefCtx(), keys) + if err != nil { + logger.Warnf("batch get user space by id: %v", err) + return vs, oks + } + + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + logger.Warnf("new coordinator client: %v", err) + return vs, oks + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + stgIDs := make([]cortypes.StorageID, len(spaces)) + for i := range spaces { + stgIDs[i] = spaces[i].StorageID + } + + getStgs, err := coorCli.GetStorageDetails(coordinator.ReqGetStorageDetails(stgIDs)) + if err != nil { + logger.Warnf("get storage details: %v", err) + return vs, oks + } + + for i := range spaces { + if getStgs.Storage[i] != nil { + vs[i] = types.UserSpaceDetail{ + UserID: stgglb.Local.UserID, + UserSpace: spaces[i], + Storage: getStgs.Storage[i].Storage, + MasterHub: getStgs.Storage[i].MasterHub, + } + + oks[i] = true + } + } + return vs, oks } diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 229ef93..c4f2c73 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -245,7 +245,7 @@ func (svc *ObjectService) Move(movings []api.MovingObject) ([]types.ObjectID, er } for _, e := range evt { - svc.evtPub.Publish(e) + svc.EvtPub.Publish(e) } return sucs, nil @@ -409,7 +409,7 @@ func (svc *ObjectService) Delete(objectIDs []types.ObjectID) error { } for _, objID := range sucs { - svc.evtPub.Publish(&datamap.BodyObjectDeleted{ + svc.EvtPub.Publish(&datamap.BodyObjectDeleted{ ObjectID: objID, }) } @@ -542,9 +542,9 @@ func (svc *ObjectService) Clone(clonings []api.CloningObject) ([]*types.Object, oldBlks := avaiDetailsMap[cloning.Cloning.ObjectID].Blocks for _, blk := range oldBlks { evtBlks = append(evtBlks, datamap.BlockDistributionObjectInfo{ - BlockType: blkType, - Index: blk.Index, - StorageID: blk.StorageID, + BlockType: blkType, + Index: blk.Index, + UserSpaceID: blk.UserSpaceID, }) } @@ -562,7 +562,7 @@ func (svc *ObjectService) Clone(clonings []api.CloningObject) ([]*types.Object, } for _, e := range evt { - svc.evtPub.Publish(e) + svc.EvtPub.Publish(e) } return ret, nil @@ -685,7 +685,7 @@ func (svc *ObjectService) CompleteMultipartUpload(objectID types.ObjectID, index return types.Object{}, fmt.Errorf("no block indexes specified") } - objDe, err := db.DoTx12(svc.DB, svc.DB.Object().GetDetail, objectID) + objDe, err := db.DoTx11(svc.DB, svc.DB.Object().GetDetail, objectID) if err != nil { return types.Object{}, err } @@ -738,7 +738,8 @@ func (svc *ObjectService) CompleteMultipartUpload(objectID types.ObjectID, index } shardInfo := ret["shard"].(*ops2.ShardInfoValue) - err = svc.DB.Object().BatchUpdateRedundancy([]db.UpdatingObjectRedundancy{ + + err = db.DoTx10(svc.DB, svc.DB.Object().BatchUpdateRedundancy, []db.UpdatingObjectRedundancy{ { ObjectID: objectID, FileHash: shardInfo.Hash, diff --git a/client/internal/services/package.go b/client/internal/services/package.go index 10b1c0c..8e3deba 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -39,7 +39,7 @@ func (svc *PackageService) Create(bucketID types.BucketID, name string) (types.P return types.Package{}, err } - svc.evtPub.Publish(&datamap.BodyNewPackage{ + svc.EvtPub.Publish(&datamap.BodyNewPackage{ Info: pkg, }) @@ -58,7 +58,7 @@ func (svc *PackageService) DeletePackage(packageID types.PackageID) error { return err } - svc.evtPub.Publish(&datamap.BodyPackageDeleted{ + svc.EvtPub.Publish(&datamap.BodyPackageDeleted{ PackageID: packageID, }) @@ -124,7 +124,7 @@ func (svc *PackageService) Clone(packageID types.PackageID, bucketID types.Bucke return types.Package{}, err } - svc.evtPub.Publish(&datamap.BodyPackageCloned{ + svc.EvtPub.Publish(&datamap.BodyPackageCloned{ SourcePackageID: packageID, NewPackage: pkg, SourceObjectIDs: oldObjIDs, diff --git a/client/internal/services/service.go b/client/internal/services/service.go index e398e00..519948a 100644 --- a/client/internal/services/service.go +++ b/client/internal/services/service.go @@ -7,7 +7,6 @@ import ( "gitlink.org.cn/cloudream/storage2/client/internal/downloader" "gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy" "gitlink.org.cn/cloudream/storage2/client/internal/metacache" - "gitlink.org.cn/cloudream/storage2/client/internal/task" "gitlink.org.cn/cloudream/storage2/client/internal/uploader" "gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent" ) @@ -15,19 +14,17 @@ import ( // Service 结构体封装了分布锁服务和任务管理服务。 type Service struct { DistLock *distlock.Service - TaskMgr *task.Manager Downloader *downloader.Downloader AccessStat *accessstat.AccessStat Uploader *uploader.Uploader StrategySelector *strategy.Selector UserSpaceMeta *metacache.UserSpaceMeta DB *db.DB - evtPub *sysevent.Publisher + EvtPub *sysevent.Publisher } func NewService( distlock *distlock.Service, - taskMgr *task.Manager, downloader *downloader.Downloader, accStat *accessstat.AccessStat, uploder *uploader.Uploader, @@ -38,13 +35,12 @@ func NewService( ) (*Service, error) { return &Service{ DistLock: distlock, - TaskMgr: taskMgr, Downloader: downloader, AccessStat: accStat, Uploader: uploder, StrategySelector: strategySelector, UserSpaceMeta: userSpaceMeta, DB: db, - evtPub: evtPub, + EvtPub: evtPub, }, nil } diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index cf0517e..3297bbf 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -8,14 +8,12 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" cdssdk "gitlink.org.cn/cloudream/storage2/client/types" + "gitlink.org.cn/cloudream/storage2/client/internal/db" "gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy" "gitlink.org.cn/cloudream/storage2/client/types" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser" - agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/agent" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory" ) type UserSpaceService struct { @@ -49,14 +47,14 @@ func (svc *UserSpaceService) LoadPackage(packageID cdssdk.PackageID, userspaceID return fmt.Errorf("userspace %v has no master hub", userspaceID) } - details, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(packageID)) + details, err := db.DoTx11(svc.DB, svc.DB.Object().GetPackageObjectDetails, packageID) if err != nil { return err } var pinned []cdssdk.ObjectID plans := exec.NewPlanBuilder() - for _, obj := range details.Objects { + for _, obj := range details { strg, err := svc.StrategySelector.Select(strategy.Request{ Detail: obj, DestHub: destStg.MasterHub.HubID, @@ -81,7 +79,7 @@ func (svc *UserSpaceService) LoadPackage(packageID cdssdk.PackageID, userspaceID ft.AddTo(ioswitch2.NewLoadToPublic(*destStg.MasterHub, *destStg, path.Join(rootPath, obj.Object.Path))) // 顺便保存到同存储服务的分片存储中 - if factory.GetBuilder(*destStg).ShardStoreDesc().Enabled() { + if destStg.UserSpace.ShardStore != nil { ft.AddTo(ioswitch2.NewToShardStore(*destStg.MasterHub, *destStg, ioswitch2.RawStream(), "")) pinned = append(pinned, obj.Object.ObjectID) } @@ -105,7 +103,7 @@ func (svc *UserSpaceService) LoadPackage(packageID cdssdk.PackageID, userspaceID // defer mutex.Unlock() // 记录访问统计 - for _, obj := range details.Objects { + for _, obj := range details { svc.AccessStat.AddAccessCounter(obj.Object.ObjectID, packageID, userspaceID, 1) } @@ -115,8 +113,6 @@ func (svc *UserSpaceService) LoadPackage(packageID cdssdk.PackageID, userspaceID return err } - // 失败也没关系 - coorCli.UserSpacePackageLoaded(coormq.ReqUserSpacePackageLoaded(userID, userspaceID, packageID, rootPath, pinned)) return nil } @@ -133,25 +129,28 @@ func (svc *UserSpaceService) UserSpaceCreatePackage(bucketID cdssdk.BucketID, na // return cdssdk.Package{}, fmt.Errorf("getting userspace info: %w", err) // } - spaceDetail := svc.UserSpaceMeta.Get(userspaceID) - if spaceDetail == nil { - return cdssdk.Package{}, fmt.Errorf("userspace not found: %d", userspaceID) - } + // spaceDetail := svc.UserSpaceMeta.Get(userspaceID) + // if spaceDetail == nil { + // return cdssdk.Package{}, fmt.Errorf("userspace not found: %d", userspaceID) + // } - if spaceDetail.UserSpace.ShardStore == nil { - return cdssdk.Package{}, fmt.Errorf("shard userspace is not enabled") - } + // if spaceDetail.UserSpace.ShardStore == nil { + // return cdssdk.Package{}, fmt.Errorf("shard userspace is not enabled") + // } - agentCli, err := stgglb.AgentMQPool.Acquire(spaceDetail.MasterHub.HubID) - if err != nil { - return cdssdk.Package{}, fmt.Errorf("new agent client: %w", err) - } - defer stgglb.AgentMQPool.Release(agentCli) + // agentCli, err := stgglb.AgentMQPool.Acquire(spaceDetail.MasterHub.HubID) + // if err != nil { + // return cdssdk.Package{}, fmt.Errorf("new agent client: %w", err) + // } + // defer stgglb.AgentMQPool.Release(agentCli) - createResp, err := agentCli.UserSpaceCreatePackage(agtmq.ReqUserSpaceCreatePackage(bucketID, name, userspaceID, path, userspaceAffinity)) - if err != nil { - return cdssdk.Package{}, err - } + // createResp, err := agentCli.UserSpaceCreatePackage(agtmq.ReqUserSpaceCreatePackage(bucketID, name, userspaceID, path, userspaceAffinity)) + // if err != nil { + // return cdssdk.Package{}, err + // } + + // return createResp.Package, nil - return createResp.Package, nil + // TODO 待实现 + return cdssdk.Package{}, fmt.Errorf("not implemented") } diff --git a/client/main.go b/client/main.go index 0873c71..77a5ba7 100644 --- a/client/main.go +++ b/client/main.go @@ -7,5 +7,5 @@ import ( ) func main() { - cmdline.RootCmd.Execute() + cmdline.RootExecute() } diff --git a/common/globals/globals.go b/common/globals/globals.go index 3398b69..6671fde 100644 --- a/common/globals/globals.go +++ b/common/globals/globals.go @@ -5,6 +5,7 @@ import ( ) type LocalMachineInfo struct { + UserID types.UserID `json:"userID"` ExternalIP string `json:"externalIP"` LocalIP string `json:"localIP"` LocationID types.LocationID `json:"locationID"` diff --git a/common/models/datamap/datamap.go b/common/models/datamap/datamap.go index 0d5732e..1713ed8 100644 --- a/common/models/datamap/datamap.go +++ b/common/models/datamap/datamap.go @@ -29,6 +29,7 @@ type SysEventSource interface { var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[SysEventSource]( (*SourceCoordinator)(nil), (*SourceHub)(nil), + (*SourceClient)(nil), )), "type") type SourceCoordinator struct { @@ -67,23 +68,41 @@ func (s *SourceHub) String() string { return fmt.Sprintf("Hub(%d, %s)", s.HubID, s.HubName) } +type SourceClient struct { + serder.Metadata `union:"Client"` + Type string `json:"type"` + UserID cortypes.UserID `json:"userID"` +} + +func (s *SourceClient) GetSourceType() string { + return "Client" +} + +func (s *SourceClient) OnUnionSerializing() { + s.Type = s.GetSourceType() +} + +func (s *SourceClient) String() string { + return fmt.Sprintf("Client(%d)", s.UserID) +} + // 事件体 type SysEventBody interface { GetBodyType() string } var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[SysEventBody]( - (*BodyNewHub)(nil), - (*BodyHubUpdated)(nil), - (*BodyHubDeleted)(nil), + // (*BodyNewHub)(nil), + // (*BodyHubUpdated)(nil), + // (*BodyHubDeleted)(nil), - (*BodyNewStorage)(nil), - (*BodyStorageUpdated)(nil), - (*BodyStorageDeleted)(nil), + (*BodyNewUserSpace)(nil), + (*BodyUserSpaceUpdated)(nil), + (*BodyUserSpaceDeleted)(nil), - (*BodyStorageStats)(nil), - (*BodyHubTransferStats)(nil), - (*BodyHubStorageTransferStats)(nil), + // (*BodyStorageStats)(nil), + // (*BodyHubTransferStats)(nil), + // (*BodyHubStorageTransferStats)(nil), (*BodyBlockTransfer)(nil), (*BodyBlockDistribution)(nil), @@ -99,6 +118,7 @@ var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[SysEven (*BodyBucketDeleted)(nil), )), "type") +/* // 新增Hub的事件 type BodyNewHub struct { serder.Metadata `union:"NewHub"` @@ -143,52 +163,54 @@ func (b *BodyHubDeleted) GetBodyType() string { func (b *BodyHubDeleted) OnUnionSerializing() { b.Type = b.GetBodyType() } +*/ // 新增Storage的事件 -type BodyNewStorage struct { - serder.Metadata `union:"NewStorage"` - Info clitypes.Storage `json:"info"` - Type string `json:"type"` +type BodyNewUserSpace struct { + serder.Metadata `union:"NewUserSpace"` + Info clitypes.UserSpace `json:"info"` + Type string `json:"type"` } -func (b *BodyNewStorage) GetBodyType() string { - return "NewStorage" +func (b *BodyNewUserSpace) GetBodyType() string { + return "NewUserSpace" } -func (b *BodyNewStorage) OnUnionSerializing() { +func (b *BodyNewUserSpace) OnUnionSerializing() { b.Type = b.GetBodyType() } // Storage信息更新的事件 -type BodyStorageUpdated struct { - serder.Metadata `union:"StorageUpdated"` - Type string `json:"type"` - Info clitypes.Storage `json:"info"` +type BodyUserSpaceUpdated struct { + serder.Metadata `union:"UserSpaceUpdated"` + Type string `json:"type"` + Info clitypes.UserSpace `json:"info"` } -func (b *BodyStorageUpdated) GetBodyType() string { - return "StorageUpdated" +func (b *BodyUserSpaceUpdated) GetBodyType() string { + return "UserSpaceUpdated" } -func (b *BodyStorageUpdated) OnUnionSerializing() { +func (b *BodyUserSpaceUpdated) OnUnionSerializing() { b.Type = b.GetBodyType() } // Storage删除的事件 -type BodyStorageDeleted struct { - serder.Metadata `union:"StorageDeleted"` - Type string `json:"type"` - StorageID clitypes.StorageID `json:"storageID"` +type BodyUserSpaceDeleted struct { + serder.Metadata `union:"UserSpaceDeleted"` + Type string `json:"type"` + UserSpaceID clitypes.UserSpaceID `json:"userSpaceID"` } -func (b *BodyStorageDeleted) GetBodyType() string { - return "StorageDeleted" +func (b *BodyUserSpaceDeleted) GetBodyType() string { + return "UserSpaceDeleted" } -func (b *BodyStorageDeleted) OnUnionSerializing() { +func (b *BodyUserSpaceDeleted) OnUnionSerializing() { b.Type = b.GetBodyType() } +/* // Storage统计信息的事件 type BodyStorageStats struct { serder.Metadata `union:"StorageStats"` @@ -252,6 +274,7 @@ func (b *BodyHubStorageTransferStats) GetBodyType() string { func (b *BodyHubStorageTransferStats) OnUnionSerializing() { b.Type = b.GetBodyType() } +*/ // 块传输的事件 type BodyBlockTransfer struct { @@ -288,24 +311,24 @@ const ( ) type Block struct { - BlockType string `json:"blockType"` - Index int `json:"index"` - StorageID clitypes.StorageID `json:"storageID"` + BlockType string `json:"blockType"` + Index int `json:"index"` + UserSpaceID clitypes.UserSpaceID `json:"userSpaceID"` } type DataTransfer struct { - SourceStorageID clitypes.StorageID `json:"sourceStorageID"` - TargetStorageID clitypes.StorageID `json:"targetStorageID"` - TransferBytes int64 `json:"transferBytes"` + SourceUserSpaceID clitypes.UserSpaceID `json:"sourceUserSpaceID"` + TargetUserSpaceID clitypes.UserSpaceID `json:"targetUserSpaceID"` + TransferBytes int64 `json:"transferBytes"` } type BlockChangeClone struct { - serder.Metadata `union:"Clone"` - Type string `json:"type"` - BlockType string `json:"blockType"` - Index int `json:"index"` - SourceStorageID clitypes.StorageID `json:"sourceStorageID"` - TargetStorageID clitypes.StorageID `json:"targetStorageID"` - TransferBytes int64 `json:"transferBytes"` + serder.Metadata `union:"Clone"` + Type string `json:"type"` + BlockType string `json:"blockType"` + Index int `json:"index"` + SourceUserSpaceID clitypes.UserSpaceID `json:"sourceUserSpaceID"` + TargetUserSpaceID clitypes.UserSpaceID `json:"targetUserSpaceID"` + TransferBytes int64 `json:"transferBytes"` } func (b *BlockChangeClone) GetBlockChangeType() string { @@ -318,9 +341,9 @@ func (b *BlockChangeClone) OnUnionSerializing() { type BlockChangeDeleted struct { serder.Metadata `union:"Deleted"` - Type string `json:"type"` - Index int `json:"index"` - StorageID clitypes.StorageID `json:"storageID"` + Type string `json:"type"` + Index int `json:"index"` + UserSpaceID clitypes.UserSpaceID `json:"userSpaceID"` } func (b *BlockChangeDeleted) GetBlockChangeType() string { @@ -372,9 +395,9 @@ func (b *BodyBlockDistribution) OnUnionSerializing() { } type BlockDistributionObjectInfo struct { - BlockType string `json:"type"` - Index int `json:"index"` - StorageID clitypes.StorageID `json:"storageID"` + BlockType string `json:"type"` + Index int `json:"index"` + UserSpaceID clitypes.UserSpaceID `json:"userSpaceID"` } // 新增或者重新上传Object的事件 diff --git a/common/pkgs/ec/block.go b/common/pkgs/ec/block.go deleted file mode 100644 index 1259376..0000000 --- a/common/pkgs/ec/block.go +++ /dev/null @@ -1,190 +0,0 @@ -package ec - -// import ( -// "errors" -// "io" -// "io/ioutil" - -// "gitlink.org.cn/cloudream/common/pkgs/ipfs" -// "gitlink.org.cn/cloudream/common/pkgs/logger" -// stgglb "gitlink.org.cn/cloudream/storage2/common/globals" -// ) - -// type BlockReader struct { -// ipfsCli *ipfs.PoolClient -// /*将文件分块相关的属性*/ -// //fileHash -// fileHash string -// //fileSize -// fileSize int64 -// //ecK将文件的分块数 -// ecK int -// //chunkSize -// chunkSize int64 - -// /*可选项*/ -// //fastRead,true的时候直接通过hash读block -// jumpReadOpt bool -// } - -// func NewBlockReader() (*BlockReader, error) { -// ipfsClient, err := stgglb.IPFSPool.Acquire() -// if err != nil { -// return nil, err -// } -// //default:fast模式,通过hash直接获取 -// return &BlockReader{ipfsCli: ipfsClient, chunkSize: 256 * 1024, jumpReadOpt: false}, nil -// } - -// func (r *BlockReader) Close() { -// r.ipfsCli.Close() -// } - -// func (r *BlockReader) SetJumpRead(fileHash string, fileSize int64, ecK int) { -// r.fileHash = fileHash -// r.fileSize = fileSize -// r.ecK = ecK -// r.jumpReadOpt = true -// } - -// func (r *BlockReader) SetchunkSize(size int64) { -// r.chunkSize = size -// } - -// func (r *BlockReader) FetchBLock(blockHash string) (io.ReadCloser, error) { -// return r.ipfsCli.OpenRead(blockHash) -// } - -// func (r *BlockReader) FetchBLocks(blockHashs []string) ([]io.ReadCloser, error) { -// readers := make([]io.ReadCloser, len(blockHashs)) -// for i, hash := range blockHashs { -// var err error -// readers[i], err = r.ipfsCli.OpenRead(hash) -// if err != nil { -// return nil, err -// } -// } -// return readers, nil -// } - -// func (r *BlockReader) JumpFetchBlock(innerID int) (io.ReadCloser, error) { -// if !r.jumpReadOpt { -// return nil, nil -// } -// pipeReader, pipeWriter := io.Pipe() -// go func() { -// for i := int64(r.chunkSize * int64(innerID)); i < r.fileSize; i += int64(r.ecK) * r.chunkSize { -// reader, err := r.ipfsCli.OpenRead(r.fileHash, ipfs.ReadOption{Offset: i, Length: r.chunkSize}) -// if err != nil { -// pipeWriter.CloseWithError(err) -// return -// } -// data, err := ioutil.ReadAll(reader) -// if err != nil { -// pipeWriter.CloseWithError(err) -// return -// } -// reader.Close() -// _, err = pipeWriter.Write(data) -// if err != nil { -// pipeWriter.CloseWithError(err) -// return -// } -// } -// //如果文件大小不是分块的整数倍,可能需要补0 -// if r.fileSize%(r.chunkSize*int64(r.ecK)) != 0 { -// //pktNum_1:chunkNum-1 -// pktNum_1 := r.fileSize / (r.chunkSize * int64(r.ecK)) -// offset := (r.fileSize - int64(pktNum_1)*int64(r.ecK)*r.chunkSize) -// count0 := int64(innerID)*int64(r.ecK)*r.chunkSize - offset -// if count0 > 0 { -// add0 := make([]byte, count0) -// pipeWriter.Write(add0) -// } -// } -// pipeWriter.Close() -// }() -// return pipeReader, nil -// } - -// // FetchBlock1这个函数废弃了 -// func (r *BlockReader) FetchBlock1(input interface{}, errMsg chan error) (io.ReadCloser, error) { -// /*两种模式下传入第一个参数,但是input的类型不同: -// jumpReadOpt-》true:传入blcokHash, string型,通过哈希直接读 -// jumpReadOpt->false: 传入innerID,int型,选择需要获取的数据块的id -// */ -// var innerID int -// var blockHash string -// switch input.(type) { -// case int: -// // 执行针对整数的逻辑分支 -// if r.jumpReadOpt { -// return nil, errors.New("conflict, wrong input type and jumpReadOpt:true") -// } else { -// innerID = input.(int) -// } -// case string: -// if !r.jumpReadOpt { -// return nil, errors.New("conflict, wrong input type and jumpReadOpt:false") -// } else { -// blockHash = input.(string) -// } -// default: -// return nil, errors.New("wrong input type") -// } -// //开始执行 -// if r.jumpReadOpt { //快速读 -// ipfsCli, err := stgglb.IPFSPool.Acquire() -// if err != nil { -// logger.Warnf("new ipfs client: %s", err.Error()) -// return nil, err -// } -// defer ipfsCli.Close() -// return ipfsCli.OpenRead(blockHash) -// } else { //跳跃读 -// ipfsCli, err := stgglb.IPFSPool.Acquire() -// if err != nil { -// logger.Warnf("new ipfs client: %s", err.Error()) -// return nil, err -// } -// defer ipfsCli.Close() -// pipeReader, pipeWriter := io.Pipe() -// go func() { -// for i := int64(r.chunkSize * int64(innerID)); i < r.fileSize; i += int64(r.ecK) * r.chunkSize { -// reader, err := ipfsCli.OpenRead(r.fileHash, ipfs.ReadOption{i, r.chunkSize}) -// if err != nil { -// pipeWriter.Close() -// errMsg <- err -// return -// } -// data, err := ioutil.ReadAll(reader) -// if err != nil { -// pipeWriter.Close() -// errMsg <- err -// return -// } -// reader.Close() -// _, err = pipeWriter.Write(data) -// if err != nil { -// pipeWriter.Close() -// errMsg <- err -// return -// } -// } -// //如果文件大小不是分块的整数倍,可能需要补0 -// if r.fileSize%(r.chunkSize*int64(r.ecK)) != 0 { -// //pktNum_1:chunkNum-1 -// pktNum_1 := r.fileSize / (r.chunkSize * int64(r.ecK)) -// offset := (r.fileSize - int64(pktNum_1)*int64(r.ecK)*r.chunkSize) -// count0 := int64(innerID)*int64(r.ecK)*r.chunkSize - offset -// if count0 > 0 { -// add0 := make([]byte, count0) -// pipeWriter.Write(add0) -// } -// } -// pipeWriter.Close() -// errMsg <- nil -// }() -// return pipeReader, nil -// } -// } diff --git a/common/pkgs/mq/coordinator/storage.go b/common/pkgs/mq/coordinator/storage.go index 05f3a68..07c0bc4 100644 --- a/common/pkgs/mq/coordinator/storage.go +++ b/common/pkgs/mq/coordinator/storage.go @@ -6,31 +6,31 @@ import ( ) type StorageService interface { - GetStorage(msg *GetStorage) (*GetStorageResp, *mq.CodeMessage) + GetStorageDetails(msg *GetStorageDetails) (*GetStorageDetailsResp, *mq.CodeMessage) } // 获取Storage信息 -var _ = Register(Service.GetStorage) +var _ = Register(Service.GetStorageDetails) -type GetStorage struct { +type GetStorageDetails struct { mq.MessageBodyBase - StorageID cortypes.StorageID `json:"storageID"` + StorageIDs []cortypes.StorageID `json:"storageIDs"` } -type GetStorageResp struct { +type GetStorageDetailsResp struct { mq.MessageBodyBase - Storage cortypes.Storage `json:"storage"` + Storage []*cortypes.StorageDetail `json:"storages"` } -func ReqGetStorage(storageID cortypes.StorageID) *GetStorage { - return &GetStorage{ - StorageID: storageID, +func ReqGetStorageDetails(storageIDs []cortypes.StorageID) *GetStorageDetails { + return &GetStorageDetails{ + StorageIDs: storageIDs, } } -func RespGetStorage(stg cortypes.Storage) *GetStorageResp { - return &GetStorageResp{ - Storage: stg, +func RespGetStorageDetails(stgs []*cortypes.StorageDetail) *GetStorageDetailsResp { + return &GetStorageDetailsResp{ + Storage: stgs, } } -func (client *Client) GetStorage(msg *GetStorage) (*GetStorageResp, error) { - return mq.Request(Service.GetStorage, client.rabbitCli, msg) +func (client *Client) GetStorageDetails(msg *GetStorageDetails) (*GetStorageDetailsResp, error) { + return mq.Request(Service.GetStorageDetails, client.rabbitCli, msg) } diff --git a/coordinator/internal/mq/storage.go b/coordinator/internal/mq/storage.go index df38e6c..308b7b7 100644 --- a/coordinator/internal/mq/storage.go +++ b/coordinator/internal/mq/storage.go @@ -1,19 +1,66 @@ package mq import ( + "fmt" + "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" + "gitlink.org.cn/cloudream/storage2/coordinator/internal/db" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) -func (svc *Service) GetStorage(msg *coormq.GetStorage) (*coormq.GetStorageResp, *mq.CodeMessage) { - stg, err := svc.db.Storage().GetByID(svc.db.DefCtx(), msg.StorageID) +func (svc *Service) GetStorageDetails(msg *coormq.GetStorageDetails) (*coormq.GetStorageDetailsResp, *mq.CodeMessage) { + d := svc.db + stgs, err := db.DoTx02(d, func(tx db.SQLContext) ([]*cortypes.StorageDetail, error) { + stgs, err := d.Storage().BatchGetByID(tx, msg.StorageIDs) + if err != nil { + return nil, fmt.Errorf("getting storages: %v", err) + } + stgMap := make(map[cortypes.StorageID]*cortypes.Storage) + for _, stg := range stgs { + s := stg + stgMap[stg.StorageID] = &s + } + + hubIDs := make([]cortypes.HubID, 0, len(stgs)) + for _, stg := range stgs { + if stg.MasterHub != 0 { + hubIDs = append(hubIDs, stg.MasterHub) + } + } + + hubs, err := d.Hub().BatchGetByID(tx, hubIDs) + if err != nil { + return nil, fmt.Errorf("getting hubs: %v", err) + } + + hubMap := make(map[cortypes.HubID]*cortypes.Hub) + for _, hub := range hubs { + h := hub + hubMap[hub.HubID] = &h + } + + details := make([]*cortypes.StorageDetail, len(msg.StorageIDs)) + for i, stgID := range msg.StorageIDs { + stg := stgMap[stgID] + if stg == nil { + continue + } + details[i] = &cortypes.StorageDetail{ + Storage: *stg, + MasterHub: hubMap[stg.MasterHub], + } + } + + return details, nil + }) if err != nil { - logger.Warnf("getting user storage: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed") + logger.Warnf("getting storage details: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting storage details: %v", err)) } - return mq.ReplyOK(coormq.RespGetStorage(stg)) + return mq.ReplyOK(coormq.RespGetStorageDetails(stgs)) } diff --git a/coordinator/types/storage.go b/coordinator/types/storage.go index a87f3ba..848b4c3 100644 --- a/coordinator/types/storage.go +++ b/coordinator/types/storage.go @@ -26,6 +26,11 @@ func (s *Storage) String() string { return fmt.Sprintf("%v(%v)", s.Name, s.StorageID) } +type StorageDetail struct { + Storage Storage + MasterHub *Hub +} + // 存储服务地址 type StorageType interface { GetStorageType() string