| @@ -1,60 +0,0 @@ | |||||
| package cmdline | |||||
| /* | |||||
| import ( | |||||
| "fmt" | |||||
| "time" | |||||
| "github.com/jedib0t/go-pretty/v6/table" | |||||
| cdssdk "gitlink.org.cn/cloudream/jcs-pub/client/types" | |||||
| ) | |||||
| func BucketListUserBuckets(ctx CommandContext) error { | |||||
| buckets, err := ctx.Cmdline.Svc.BucketSvc().GetUserBuckets() | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| fmt.Printf("Find %d buckets for user %d:\n", len(buckets)) | |||||
| tb := table.NewWriter() | |||||
| tb.AppendHeader(table.Row{"ID", "Name"}) | |||||
| for _, bucket := range buckets { | |||||
| tb.AppendRow(table.Row{bucket.BucketID, bucket.Name}) | |||||
| } | |||||
| fmt.Print(tb.Render()) | |||||
| return nil | |||||
| } | |||||
| func BucketCreateBucket(ctx CommandContext, bucketName string) error { | |||||
| bucketID, err := ctx.Cmdline.Svc.BucketSvc().CreateBucket(bucketName, time.Now()) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| fmt.Printf("Create bucket %v success, id: %v", bucketName, bucketID) | |||||
| return nil | |||||
| } | |||||
| func BucketDeleteBucket(ctx CommandContext, bucketID cdssdk.BucketID) error { | |||||
| err := ctx.Cmdline.Svc.BucketSvc().DeleteBucket(bucketID) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| fmt.Printf("Delete bucket %d success ", bucketID) | |||||
| return nil | |||||
| } | |||||
| func init() { | |||||
| commands.MustAdd(BucketListUserBuckets, "bucket", "ls") | |||||
| commands.MustAdd(BucketCreateBucket, "bucket", "new") | |||||
| commands.MustAdd(BucketDeleteBucket, "bucket", "delete") | |||||
| } | |||||
| */ | |||||
| @@ -1,47 +0,0 @@ | |||||
| package cmdline | |||||
| /* | |||||
| import ( | |||||
| "fmt" | |||||
| "time" | |||||
| cdssdk "gitlink.org.cn/cloudream/jcs-pub/client/types" | |||||
| ) | |||||
| func CacheMovePackage(ctx CommandContext, packageID cdssdk.PackageID, stgID cdssdk.StorageID) error { | |||||
| startTime := time.Now() | |||||
| defer func() { | |||||
| fmt.Printf("%v\n", time.Since(startTime).Seconds()) | |||||
| }() | |||||
| hubID, taskID, err := ctx.Cmdline.Svc.CacheSvc().StartCacheMovePackage(1, packageID, stgID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("start cache moving package: %w", err) | |||||
| } | |||||
| for { | |||||
| complete, err := ctx.Cmdline.Svc.CacheSvc().WaitCacheMovePackage(hubID, taskID, time.Second*10) | |||||
| if complete { | |||||
| if err != nil { | |||||
| return fmt.Errorf("moving complete with: %w", err) | |||||
| } | |||||
| return nil | |||||
| } | |||||
| if err != nil { | |||||
| return fmt.Errorf("wait moving: %w", err) | |||||
| } | |||||
| } | |||||
| } | |||||
| func CacheRemovePackage(ctx CommandContext, packageID cdssdk.PackageID, stgID cdssdk.StorageID) error { | |||||
| return ctx.Cmdline.Svc.CacheSvc().CacheRemovePackage(packageID, stgID) | |||||
| } | |||||
| func init() { | |||||
| commands.Add(CacheMovePackage, "cache", "move") | |||||
| commands.Add(CacheRemovePackage, "cache", "remove") | |||||
| } | |||||
| */ | |||||
| @@ -1,88 +0,0 @@ | |||||
| package cmdline | |||||
| /* | |||||
| import ( | |||||
| "fmt" | |||||
| "strings" | |||||
| "github.com/samber/lo" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/distlock" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/lockprovider" | |||||
| ) | |||||
| // DistLockLock 尝试获取分布式锁。 | |||||
| // ctx: 命令上下文,包含执行命令所需的服务和配置。 | |||||
| // lockData: 锁数据数组,每个元素包含锁的路径、名称和目标。 | |||||
| // 返回值: 获取锁失败时返回错误。 | |||||
| func DistLockLock(ctx CommandContext, lockData []string) error { | |||||
| req := distlock.LockRequest{} | |||||
| // 解析锁数据,填充请求结构体。 | |||||
| for _, lock := range lockData { | |||||
| l, err := parseOneLock(lock) | |||||
| if err != nil { | |||||
| return fmt.Errorf("parse lock data %s failed, err: %w", lock, err) | |||||
| } | |||||
| req.Locks = append(req.Locks, l) | |||||
| } | |||||
| // 请求分布式锁。 | |||||
| reqID, err := ctx.Cmdline.Svc.DistLock.Acquire(req) | |||||
| if err != nil { | |||||
| return fmt.Errorf("acquire locks failed, err: %w", err) | |||||
| } | |||||
| fmt.Printf("%s\n", reqID) | |||||
| return nil | |||||
| } | |||||
| // parseOneLock 解析单个锁数据。 | |||||
| // lockData: 待解析的锁数据,格式为"路径/名称@目标字符串"。 | |||||
| // 返回值: 解析得到的锁对象和可能的错误。 | |||||
| func parseOneLock(lockData string) (distlock.Lock, error) { | |||||
| var lock distlock.Lock | |||||
| // 解析锁的路径、名称和目标。 | |||||
| fullPathAndTarget := strings.Split(lockData, "@") | |||||
| if len(fullPathAndTarget) != 2 { | |||||
| return lock, fmt.Errorf("lock data must contains lock path, name and target") | |||||
| } | |||||
| pathAndName := strings.Split(fullPathAndTarget[0], "/") | |||||
| if len(pathAndName) < 2 { | |||||
| return lock, fmt.Errorf("lock data must contains lock path, name and target") | |||||
| } | |||||
| lock.Path = pathAndName[0 : len(pathAndName)-1] | |||||
| lock.Name = pathAndName[len(pathAndName)-1] | |||||
| // 解析目标字符串。 | |||||
| target := lockprovider.NewStringLockTarget() | |||||
| comps := strings.Split(fullPathAndTarget[1], "/") | |||||
| for _, comp := range comps { | |||||
| target.Add(lo.Map(strings.Split(comp, "."), func(str string, index int) any { return str })...) | |||||
| } | |||||
| lock.Target = *target | |||||
| return lock, nil | |||||
| } | |||||
| // DistLockUnlock 释放分布式锁。 | |||||
| // ctx: 命令上下文。 | |||||
| // reqID: 请求ID,对应获取锁时返回的ID。 | |||||
| // 返回值: 释放锁失败时返回错误。 | |||||
| func DistLockUnlock(ctx CommandContext, reqID string) error { | |||||
| ctx.Cmdline.Svc.DistLock.Release(reqID) | |||||
| return nil | |||||
| } | |||||
| // 初始化命令行工具,注册分布式锁相关命令。 | |||||
| func init() { | |||||
| commands.MustAdd(DistLockLock, "distlock", "lock") | |||||
| commands.MustAdd(DistLockUnlock, "distlock", "unlock") | |||||
| } | |||||
| */ | |||||
| @@ -1,69 +0,0 @@ | |||||
| package cmdline | |||||
| /* | |||||
| import ( | |||||
| "fmt" | |||||
| "os" | |||||
| "path/filepath" | |||||
| "time" | |||||
| cdssdk "gitlink.org.cn/cloudream/jcs-pub/client/types" | |||||
| ) | |||||
| // 必须添加的命令函数,用于处理对象上传。 | |||||
| // | |||||
| // ctx: 命令上下文,提供必要的服务和环境配置。 | |||||
| // packageID: 上传套餐的唯一标识。 | |||||
| // rootPath: 本地文件系统中待上传文件的根目录。 | |||||
| // storageAffinity: 偏好的节点ID列表,上传任务可能会分配到这些节点上。 | |||||
| // 返回值: 执行过程中遇到的任何错误。 | |||||
| var _ = MustAddCmd(func(ctx CommandContext, packageID cdssdk.PackageID, rootPath string, spaceAffinity []cdssdk.UserSpaceID) error { | |||||
| // 记录函数开始时间,用于计算执行时间。 | |||||
| startTime := time.Now() | |||||
| defer func() { | |||||
| // 打印函数执行时间。 | |||||
| fmt.Printf("%v\n", time.Since(startTime).Seconds()) | |||||
| }() | |||||
| // 根据节点亲和性列表设置首选上传节点。 | |||||
| var storageAff cdssdk.UserSpaceID | |||||
| if len(spaceAffinity) > 0 { | |||||
| storageAff = spaceAffinity[0] | |||||
| } | |||||
| up, err := ctx.Cmdline.Svc.Uploader.BeginUpdate(packageID, storageAff, nil, nil) | |||||
| if err != nil { | |||||
| return fmt.Errorf("begin updating package: %w", err) | |||||
| } | |||||
| defer up.Abort() | |||||
| err = filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { | |||||
| if err != nil { | |||||
| return nil | |||||
| } | |||||
| if fi.IsDir() { | |||||
| return nil | |||||
| } | |||||
| file, err := os.Open(fname) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| defer file.Close() | |||||
| return up.Upload(fname, file) | |||||
| }) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| _, err = up.Commit() | |||||
| if err != nil { | |||||
| return fmt.Errorf("commit updating package: %w", err) | |||||
| } | |||||
| return nil | |||||
| }, "obj", "upload") | |||||
| */ | |||||
| @@ -1,190 +0,0 @@ | |||||
| package cmdline | |||||
| /* | |||||
| import ( | |||||
| "fmt" | |||||
| "io" | |||||
| "os" | |||||
| "path/filepath" | |||||
| "time" | |||||
| "github.com/jedib0t/go-pretty/v6/table" | |||||
| cdssdk "gitlink.org.cn/cloudream/jcs-pub/client/types" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/iterator" | |||||
| ) | |||||
| // PackageListBucketPackages 列出指定存储桶中的所有包裹。 | |||||
| // | |||||
| // 参数: | |||||
| // | |||||
| // ctx - 命令上下文。 | |||||
| // bucketID - 存储桶ID。 | |||||
| // | |||||
| // 返回值: | |||||
| // | |||||
| // error - 操作过程中发生的任何错误。 | |||||
| func PackageListBucketPackages(ctx CommandContext, bucketID cdssdk.BucketID) error { | |||||
| packages, err := ctx.Cmdline.Svc.BucketSvc().GetBucketPackages(bucketID) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| fmt.Printf("Find %d packages in bucket %d for user %d:\n", len(packages), bucketID) | |||||
| tb := table.NewWriter() | |||||
| tb.AppendHeader(table.Row{"ID", "Name", "BucketID"}) | |||||
| for _, obj := range packages { | |||||
| tb.AppendRow(table.Row{obj.PackageID, obj.Name, obj.BucketID}) | |||||
| } | |||||
| fmt.Println(tb.Render()) | |||||
| return nil | |||||
| } | |||||
| // PackageDownloadPackage 下载指定包裹的所有文件到本地目录。 | |||||
| // | |||||
| // 参数: | |||||
| // | |||||
| // ctx - 命令上下文。 | |||||
| // packageID - 包裹ID。 | |||||
| // outputDir - 输出目录路径。 | |||||
| // | |||||
| // 返回值: | |||||
| // | |||||
| // error - 操作过程中发生的任何错误。 | |||||
| func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outputDir string) error { | |||||
| startTime := time.Now() | |||||
| defer func() { | |||||
| fmt.Printf("%v\n", time.Since(startTime).Seconds()) | |||||
| }() | |||||
| err := os.MkdirAll(outputDir, os.ModePerm) | |||||
| if err != nil { | |||||
| return fmt.Errorf("create output directory %s failed, err: %w", outputDir, err) | |||||
| } | |||||
| // 初始化文件下载迭代器 | |||||
| objIter, err := ctx.Cmdline.Svc.PackageSvc().DownloadPackage(packageID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("download object failed, err: %w", err) | |||||
| } | |||||
| defer objIter.Close() | |||||
| madeDirs := make(map[string]bool) | |||||
| for { | |||||
| objInfo, err := objIter.MoveNext() | |||||
| if err == iterator.ErrNoMoreItem { | |||||
| break | |||||
| } | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| err = func() error { | |||||
| defer objInfo.File.Close() | |||||
| fullPath := filepath.Join(outputDir, objInfo.Object.Path) | |||||
| dirPath := filepath.Dir(fullPath) | |||||
| if !madeDirs[dirPath] { | |||||
| if err := os.MkdirAll(dirPath, 0755); err != nil { | |||||
| return fmt.Errorf("creating object dir: %w", err) | |||||
| } | |||||
| madeDirs[dirPath] = true | |||||
| } | |||||
| outputFile, err := os.Create(fullPath) | |||||
| if err != nil { | |||||
| return fmt.Errorf("creating object file: %w", err) | |||||
| } | |||||
| defer outputFile.Close() | |||||
| _, err = io.Copy(outputFile, objInfo.File) | |||||
| if err != nil { | |||||
| return fmt.Errorf("copy object data to local file failed, err: %w", err) | |||||
| } | |||||
| return nil | |||||
| }() | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| } | |||||
| return nil | |||||
| } | |||||
| // PackageCreatePackage 在指定存储桶中创建新包裹。 | |||||
| // | |||||
| // 参数: | |||||
| // | |||||
| // ctx - 命令上下文。 | |||||
| // bucketID - 存储桶ID。 | |||||
| // name - 包裹名称。 | |||||
| // | |||||
| // 返回值: | |||||
| // | |||||
| // error - 操作过程中发生的任何错误。 | |||||
| func PackageCreatePackage(ctx CommandContext, bucketID cdssdk.BucketID, name string) error { | |||||
| pkgID, err := ctx.Cmdline.Svc.PackageSvc().Create(bucketID, name) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| fmt.Printf("%v\n", pkgID) | |||||
| return nil | |||||
| } | |||||
| // PackageDeletePackage 删除指定的包裹。 | |||||
| // | |||||
| // 参数: | |||||
| // | |||||
| // ctx - 命令上下文。 | |||||
| // packageID - 包裹ID。 | |||||
| // | |||||
| // 返回值: | |||||
| // | |||||
| // error - 操作过程中发生的任何错误。 | |||||
| func PackageDeletePackage(ctx CommandContext, packageID cdssdk.PackageID) error { | |||||
| err := ctx.Cmdline.Svc.PackageSvc().DeletePackage(packageID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("delete package %d failed, err: %w", packageID, err) | |||||
| } | |||||
| return nil | |||||
| } | |||||
| // PackageGetCachedStorages 获取指定包裹的缓存节点信息。 | |||||
| // | |||||
| // 参数: | |||||
| // | |||||
| // ctx - 命令上下文。 | |||||
| // packageID - 包裹ID。 | |||||
| // | |||||
| // 返回值: | |||||
| // | |||||
| // error - 操作过程中发生的任何错误。 | |||||
| func PackageGetCachedStorages(ctx CommandContext, packageID cdssdk.PackageID) error { | |||||
| resp, err := ctx.Cmdline.Svc.PackageSvc().GetCachedStorages(packageID) | |||||
| fmt.Printf("resp: %v\n", resp) | |||||
| if err != nil { | |||||
| return fmt.Errorf("get package %d cached storages failed, err: %w", packageID, err) | |||||
| } | |||||
| return nil | |||||
| } | |||||
| // 初始化命令行工具的包相关命令。 | |||||
| func init() { | |||||
| commands.MustAdd(PackageListBucketPackages, "pkg", "ls") | |||||
| commands.MustAdd(PackageDownloadPackage, "pkg", "get") | |||||
| commands.MustAdd(PackageCreatePackage, "pkg", "new") | |||||
| commands.MustAdd(PackageDeletePackage, "pkg", "delete") | |||||
| // 查询package缓存到哪些节点 | |||||
| commands.MustAdd(PackageGetCachedStorages, "pkg", "cached") | |||||
| } | |||||
| */ | |||||
| @@ -16,7 +16,6 @@ import ( | |||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/http" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/http" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/repl" | |||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/services" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/services" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/spacesyncer" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/spacesyncer" | ||||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/ticktock" | "gitlink.org.cn/cloudream/jcs-pub/client/internal/ticktock" | ||||
| @@ -179,8 +178,8 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { | |||||
| defer spaceSync.Stop() | defer spaceSync.Stop() | ||||
| // 交互式命令行 | // 交互式命令行 | ||||
| rep := repl.New(db, tktk) | |||||
| replCh := rep.Start() | |||||
| // rep := repl.New(db, tktk) | |||||
| // replCh := rep.Start() | |||||
| // 挂载 | // 挂载 | ||||
| mntCfg := config.Cfg().Mount | mntCfg := config.Cfg().Mount | ||||
| @@ -224,7 +223,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { | |||||
| conColEvt := conColChan.Receive() | conColEvt := conColChan.Receive() | ||||
| acStatEvt := acStatChan.Receive() | acStatEvt := acStatChan.Receive() | ||||
| spaceSyncEvt := spaceSyncChan.Receive() | spaceSyncEvt := spaceSyncChan.Receive() | ||||
| replEvt := replCh.Receive() | |||||
| // replEvt := replCh.Receive() | |||||
| httpEvt := httpChan.Receive() | httpEvt := httpChan.Receive() | ||||
| mntEvt := mntChan.Receive() | mntEvt := mntChan.Receive() | ||||
| @@ -309,18 +308,18 @@ loop: | |||||
| } | } | ||||
| spaceSyncEvt = spaceSyncChan.Receive() | spaceSyncEvt = spaceSyncChan.Receive() | ||||
| case e := <-replEvt.Chan(): | |||||
| if e.Err != nil { | |||||
| logger.Errorf("receive repl event: %v", err) | |||||
| break loop | |||||
| } | |||||
| // case e := <-replEvt.Chan(): | |||||
| // if e.Err != nil { | |||||
| // logger.Errorf("receive repl event: %v", err) | |||||
| // break loop | |||||
| // } | |||||
| switch e.Value.(type) { | |||||
| case repl.ExitEvent: | |||||
| logger.Info("exit by repl") | |||||
| break loop | |||||
| } | |||||
| replEvt = replCh.Receive() | |||||
| // switch e.Value.(type) { | |||||
| // case repl.ExitEvent: | |||||
| // logger.Info("exit by repl") | |||||
| // break loop | |||||
| // } | |||||
| // replEvt = replCh.Receive() | |||||
| case e := <-httpEvt.Chan(): | case e := <-httpEvt.Chan(): | ||||
| if e.Err != nil { | if e.Err != nil { | ||||
| @@ -349,4 +348,7 @@ loop: | |||||
| mntEvt = mntChan.Receive() | mntEvt = mntChan.Receive() | ||||
| } | } | ||||
| } | } | ||||
| // TODO 优雅退出 | |||||
| os.Exit(0) | |||||
| } | } | ||||