diff --git a/agent/internal/cmd/serve.go b/agent/internal/cmd/serve.go index 118fdb3..b184c2a 100644 --- a/agent/internal/cmd/serve.go +++ b/agent/internal/cmd/serve.go @@ -21,6 +21,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/uploader" "google.golang.org/grpc" @@ -122,8 +123,11 @@ func serve(configPath string) { // 初始化下载器 dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgMgr) + // 初始化上传器 + uploader := uploader.NewUploader(distlock, &conCol, stgMgr) + // 初始化任务管理器 - taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat, stgMgr) + taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat, stgMgr, uploader) // 启动命令服务器 // TODO 需要设计AgentID持久化机制 diff --git a/agent/internal/task/create_package.go b/agent/internal/task/create_package.go index 4d455e1..9050a5d 100644 --- a/agent/internal/task/create_package.go +++ b/agent/internal/task/create_package.go @@ -4,11 +4,11 @@ import ( "fmt" "time" + "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/task" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -17,7 +17,7 @@ import ( // 包含包的ID和上传的对象列表 type CreatePackageResult struct { PackageID cdssdk.PackageID - Objects []cmd.ObjectUploadResult + Objects []cdssdk.Object } // CreatePackage 定义创建包的任务结构 @@ -84,11 +84,48 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c return } - uploadRet, err := cmd.NewUploadObjects(t.userID, createResp.Package.PackageID, t.objIter, t.stgAffinity).Execute(&cmd.UploadObjectsContext{ - Distlock: ctx.distlock, - Connectivity: ctx.connectivity, - StgMgr: ctx.stgMgr, - }) + up, err := ctx.uploader.BeginUpdate(t.userID, createResp.Package.PackageID, t.stgAffinity) + if err != nil { + err = fmt.Errorf("begin update: %w", err) + log.Error(err.Error()) + // 完成任务并设置移除延迟 + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + defer up.Abort() + + for { + obj, err := t.objIter.MoveNext() + if err == iterator.ErrNoMoreItem { + break + } + if err != nil { + log.Error(err.Error()) + // 完成任务并设置移除延迟 + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + + // 上传对象 + err = up.Upload(obj.Path, obj.Size, obj.File) + if err != nil { + err = fmt.Errorf("uploading object: %w", err) + log.Error(err.Error()) + // 完成任务并设置移除延迟 + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + + } + + // 结束上传 + uploadRet, err := up.Commit() if err != nil { err = fmt.Errorf("uploading objects: %w", err) log.Error(err.Error()) @@ -100,7 +137,7 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c } t.Result.PackageID = createResp.Package.PackageID - t.Result.Objects = uploadRet.Objects + t.Result.Objects = lo.Values(uploadRet.Objects) // 完成任务并设置移除延迟 complete(nil, CompleteOption{ diff --git a/agent/internal/task/task.go b/agent/internal/task/task.go index 43e980b..f653e67 100644 --- a/agent/internal/task/task.go +++ b/agent/internal/task/task.go @@ -7,6 +7,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/uploader" ) // TaskContext 定义了任务执行的上下文环境,包含分布式锁服务、IO开关和网络连接状态收集器 @@ -16,6 +17,7 @@ type TaskContext struct { downloader *downloader.Downloader accessStat *accessstat.AccessStat stgMgr *mgr.Manager + uploader *uploader.Uploader } // CompleteFn 类型定义了任务完成时需要执行的函数,用于设置任务的执行结果 @@ -33,12 +35,13 @@ type Task = task.Task[TaskContext] // CompleteOption 类型定义了任务完成时的选项,可用于定制化任务完成的处理方式 type CompleteOption = task.CompleteOption -func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgMgr *mgr.Manager) Manager { +func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgMgr *mgr.Manager, uploader *uploader.Uploader) Manager { return task.NewManager(TaskContext{ distlock: distlock, connectivity: connectivity, downloader: downloader, accessStat: accessStat, stgMgr: stgMgr, + uploader: uploader, }) } diff --git a/client/internal/cmdline/newloadp.go b/client/internal/cmdline/newloadp.go new file mode 100644 index 0000000..08c48d3 --- /dev/null +++ b/client/internal/cmdline/newloadp.go @@ -0,0 +1,100 @@ +package cmdline + +import ( + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/jedib0t/go-pretty/v6/table" + "github.com/spf13/cobra" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) + +func init() { + cmd := &cobra.Command{ + Use: "newloadp localPath bucketID packageName storageID...", + Short: "Create a new package then upload an load files to it at the same time", + Args: cobra.MinimumNArgs(4), + Run: func(cmd *cobra.Command, args []string) { + cmdCtx := GetCmdCtx(cmd) + localPath := args[0] + + bktID, err := strconv.ParseInt(args[1], 10, 64) + if err != nil { + fmt.Println(err) + return + } + + packageName := args[2] + storageIDs := make([]cdssdk.StorageID, 0) + for _, sID := range args[3:] { + sID, err := strconv.ParseInt(sID, 10, 64) + if err != nil { + fmt.Println(err) + return + } + storageIDs = append(storageIDs, cdssdk.StorageID(sID)) + } + + newloadp(cmdCtx, localPath, cdssdk.BucketID(bktID), packageName, storageIDs) + }, + } + + rootCmd.AddCommand(cmd) +} + +func newloadp(cmdCtx *CommandContext, path string, bucketID cdssdk.BucketID, packageName string, storageIDs []cdssdk.StorageID) { + userID := cdssdk.UserID(1) + + up, err := cmdCtx.Cmdline.Svc.Uploader.BeginCreateLoad(userID, bucketID, packageName, storageIDs) + if err != nil { + fmt.Println(err) + return + } + defer up.Abort() + + var fileCount int + var totalSize int64 + err = filepath.WalkDir(path, func(fname string, fi os.DirEntry, err error) error { + if err != nil { + return nil + } + + if fi.IsDir() { + return nil + } + + fileCount++ + + info, err := fi.Info() + if err != nil { + return err + } + totalSize += info.Size() + + file, err := os.Open(fname) + if err != nil { + return err + } + defer file.Close() + + return up.Upload(fname, info.Size(), file) + }) + if err != nil { + fmt.Println(err.Error()) + return + } + + ret, err := up.Commit() + if err != nil { + fmt.Printf("committing package: %v\n", err) + return + } + + wr := table.NewWriter() + wr.AppendHeader(table.Row{"ID", "Name", "FileCount", "TotalSize", "LoadedDirs"}) + wr.AppendRow(table.Row{ret.Package.PackageID, ret.Package.Name, fileCount, totalSize, strings.Join(ret.LoadedDirs, "\n")}) + fmt.Println(wr.Render()) +} diff --git a/client/internal/cmdline/object.go b/client/internal/cmdline/object.go index 7d472e2..f0744ab 100644 --- a/client/internal/cmdline/object.go +++ b/client/internal/cmdline/object.go @@ -7,7 +7,6 @@ import ( "time" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) // 必须添加的命令函数,用于处理对象上传。 @@ -28,56 +27,48 @@ var _ = MustAddCmd(func(ctx CommandContext, packageID cdssdk.PackageID, rootPath // 模拟或获取用户ID。 userID := cdssdk.UserID(1) - // 遍历根目录下所有文件,收集待上传的文件路径。 - var uploadFilePathes []string - err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { - if err != nil { - return nil - } - - // 仅添加非目录文件路径。 - if !fi.IsDir() { - uploadFilePathes = append(uploadFilePathes, fname) - } - - return nil - }) - if err != nil { - // 目录遍历失败处理。 - return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) - } - // 根据节点亲和性列表设置首选上传节点。 var storageAff cdssdk.StorageID if len(storageAffinity) > 0 { storageAff = storageAffinity[0] } - // 创建上传对象迭代器。 - objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) - // 开始上传任务。 - taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUploading(userID, packageID, objIter, storageAff) + up, err := ctx.Cmdline.Svc.Uploader.BeginUpdate(userID, packageID, storageAff) if err != nil { - // 上传任务启动失败处理。 - return fmt.Errorf("update objects to package %d failed, err: %w", packageID, err) + return fmt.Errorf("begin updating package: %w", err) } + defer up.Abort() - // 循环等待上传任务完成。 - for { - // 每5秒检查一次上传状态。 - complete, _, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploading(taskID, time.Second*5) - if complete { - // 上传完成,检查是否有错误。 - if err != nil { - return fmt.Errorf("uploading objects: %w", err) - } + err = filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { + if err != nil { + return nil + } + if fi.IsDir() { return nil } - // 等待过程中发生错误处理。 + info, err := fi.Info() + if err != nil { + return err + } + file, err := os.Open(fname) if err != nil { - return fmt.Errorf("wait updating: %w", err) + return err } + defer file.Close() + + return up.Upload(fname, info.Size(), file) + }) + if err != nil { + return err } + + _, err = up.Commit() + if err != nil { + return fmt.Errorf("commit updating package: %w", err) + } + + return nil + }, "obj", "upload") diff --git a/client/internal/cmdline/put.go b/client/internal/cmdline/put.go index c7fdec3..bac7a36 100644 --- a/client/internal/cmdline/put.go +++ b/client/internal/cmdline/put.go @@ -12,7 +12,6 @@ import ( "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) func init() { @@ -64,56 +63,56 @@ func init() { return } } + var storageAff cdssdk.StorageID + if stgID != 0 { + storageAff = cdssdk.StorageID(stgID) + } + + up, err := cmdCtx.Cmdline.Svc.Uploader.BeginUpdate(userID, pkg.PackageID, storageAff) + if err != nil { + fmt.Printf("begin updating package: %v\n", err) + return + } + defer up.Abort() var fileCount int var totalSize int64 - var uploadFilePathes []string err = filepath.WalkDir(local, func(fname string, fi os.DirEntry, err error) error { if err != nil { return nil } - if !fi.IsDir() { - uploadFilePathes = append(uploadFilePathes, fname) - fileCount++ + if fi.IsDir() { + return nil + } - info, err := fi.Info() - if err == nil { - totalSize += info.Size() - } + fileCount++ + + info, err := fi.Info() + if err != nil { + return err + } + totalSize += info.Size() + + file, err := os.Open(fname) + if err != nil { + return err } + defer file.Close() - return nil + return up.Upload(fname, info.Size(), file) }) if err != nil { - fmt.Printf("walking directory: %v\n", err) + fmt.Println(err.Error()) return } - var storageAff cdssdk.StorageID - if stgID != 0 { - storageAff = cdssdk.StorageID(stgID) - } - - objIter := iterator.NewUploadingObjectIterator(local, uploadFilePathes) - taskID, err := cmdCtx.Cmdline.Svc.ObjectSvc().StartUploading(userID, pkg.PackageID, objIter, storageAff) + _, err = up.Commit() if err != nil { - fmt.Printf("start uploading objects: %v\n", err) + fmt.Printf("committing package: %v\n", err) return } - for { - complete, _, err := cmdCtx.Cmdline.Svc.ObjectSvc().WaitUploading(taskID, time.Second*5) - if err != nil { - fmt.Printf("uploading objects: %v\n", err) - return - } - - if complete { - break - } - } - fmt.Printf("Put %v files (%v) to %s in %v.\n", fileCount, bytesize.ByteSize(totalSize), remote, time.Since(startTime)) }, } diff --git a/client/internal/http/object.go b/client/internal/http/object.go index 70b02f9..bded3c6 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -1,16 +1,17 @@ package http import ( + "fmt" "io" "mime/multipart" "net/http" "net/url" "path" - "time" "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" "gitlink.org.cn/cloudream/storage/client/internal/config" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" @@ -41,50 +42,52 @@ func (s *ObjectService) Upload(ctx *gin.Context) { return } - var err error - - objIter := mapMultiPartFileToUploadingObject(req.Files) - - taskID, err := s.svc.ObjectSvc().StartUploading(req.Info.UserID, req.Info.PackageID, objIter, req.Info.StorageAffinity) - + up, err := s.svc.Uploader.BeginUpdate(req.Info.UserID, req.Info.PackageID, req.Info.Affinity) if err != nil { - log.Warnf("start uploading object task: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed")) + log.Warnf("begin update: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("begin update: %v", err))) return } + defer up.Abort() + + var pathes []string + for _, file := range req.Files { + f, err := file.Open() + if err != nil { + log.Warnf("open file: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("open file %v: %v", file.Filename, err))) + return + } - for { - complete, objs, err := s.svc.ObjectSvc().WaitUploading(taskID, time.Second*5) - if complete { - if err != nil { - log.Warnf("uploading object: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading object failed")) - return - } - - uploadeds := make([]cdsapi.UploadedObject, len(objs.Objects)) - for i, obj := range objs.Objects { - err := "" - if obj.Error != nil { - err = obj.Error.Error() - } - o := obj.Object - uploadeds[i] = cdsapi.UploadedObject{ - Object: &o, - Error: err, - } - } - - ctx.JSON(http.StatusOK, OK(cdsapi.ObjectUploadResp{Uploadeds: uploadeds})) + path, err := url.PathUnescape(file.Filename) + if err != nil { + log.Warnf("unescape filename: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("unescape filename %v: %v", file.Filename, err))) return } + err = up.Upload(path, file.Size, f) if err != nil { - log.Warnf("waiting task: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed")) + log.Warnf("uploading file: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("uploading file %v: %v", file.Filename, err))) return } + pathes = append(pathes, path) } + + ret, err := up.Commit() + if err != nil { + log.Warnf("commit update: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("commit update: %v", err))) + return + } + + uploadeds := make([]cdssdk.Object, len(pathes)) + for i := range pathes { + uploadeds[i] = ret.Objects[pathes[i]] + } + + ctx.JSON(http.StatusOK, OK(cdsapi.ObjectUploadResp{Uploadeds: uploadeds})) } func (s *ObjectService) Download(ctx *gin.Context) { diff --git a/client/internal/http/package.go b/client/internal/http/package.go index 89d1de6..d4b1c80 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -1,17 +1,16 @@ package http import ( + "fmt" "mime/multipart" "net/http" "net/url" "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" - - stgiter "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) // PackageService 包服务,负责处理包相关的HTTP请求。 @@ -88,6 +87,69 @@ func (s *PackageService) Create(ctx *gin.Context) { })) } +type PackageCreateLoad struct { + Info cdsapi.PackageCreateLoad `form:"info" binding:"required"` + Files []*multipart.FileHeader `form:"files"` +} + +func (s *PackageService) CreateLoad(ctx *gin.Context) { + log := logger.WithField("HTTP", "Package.CreateLoad") + + var req PackageCreateLoad + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + up, err := s.svc.Uploader.BeginCreateLoad(req.Info.UserID, req.Info.BucketID, req.Info.Name, req.Info.LoadTo) + if err != nil { + log.Warnf("begin package create load: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("begin package create load: %v", err))) + return + } + defer up.Abort() + + var pathes []string + for _, file := range req.Files { + f, err := file.Open() + if err != nil { + log.Warnf("open file: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("open file %v: %v", file.Filename, err))) + return + } + + path, err := url.PathUnescape(file.Filename) + if err != nil { + log.Warnf("unescape filename: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("unescape filename %v: %v", file.Filename, err))) + return + } + + err = up.Upload(path, file.Size, f) + if err != nil { + log.Warnf("uploading file: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("uploading file %v: %v", file.Filename, err))) + return + } + pathes = append(pathes, path) + } + + ret, err := up.Commit() + if err != nil { + log.Warnf("commit create load: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("commit create load: %v", err))) + return + } + + objs := make([]cdssdk.Object, len(pathes)) + for i := range pathes { + objs[i] = ret.Objects[pathes[i]] + } + + ctx.JSON(http.StatusOK, OK(cdsapi.PackageCreateLoadResp{Package: ret.Package, Objects: objs, LoadedDirs: ret.LoadedDirs})) + +} func (s *PackageService) Delete(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.Delete") @@ -173,27 +235,3 @@ func (s *PackageService) GetLoadedStorages(ctx *gin.Context) { StorageIDs: stgIDs, })) } - -// mapMultiPartFileToUploadingObject 将multipart文件转换为上传对象的迭代器。 -func mapMultiPartFileToUploadingObject(files []*multipart.FileHeader) stgiter.UploadingObjectIterator { - return iterator.Map[*multipart.FileHeader]( - iterator.Array(files...), - func(file *multipart.FileHeader) (*stgiter.IterUploadingObject, error) { - stream, err := file.Open() - if err != nil { - return nil, err - } - - fileName, err := url.PathUnescape(file.Filename) - if err != nil { - return nil, err - } - - return &stgiter.IterUploadingObject{ - Path: fileName, - Size: file.Size, - File: stream, - }, nil - }, - ) -} diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 6773dc4..4dc05ec 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -2,16 +2,13 @@ package services import ( "fmt" - "time" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" - mytask "gitlink.org.cn/cloudream/storage/client/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -25,30 +22,6 @@ func (svc *Service) ObjectSvc() *ObjectService { return &ObjectService{Service: svc} } -// StartUploading 开始上传对象。 -// userID: 用户ID。 -// packageID: 套件ID。 -// objIter: 正在上传的对象迭代器。 -// storageAffinity: 节点亲和性,指定对象上传的首选节点。 -// 返回值: 任务ID和错误信息。 -func (svc *ObjectService) StartUploading(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator, storageAffinity cdssdk.StorageID) (string, error) { - tsk := svc.TaskMgr.StartNew(mytask.NewUploadObjects(userID, packageID, objIter, storageAffinity)) - return tsk.ID(), nil -} - -// WaitUploading 等待上传任务完成。 -// taskID: 任务ID。 -// waitTimeout: 等待超时时间。 -// 返回值: 任务是否完成、上传结果和错误信息。 -func (svc *ObjectService) WaitUploading(taskID string, waitTimeout time.Duration) (bool, *mytask.UploadObjectsResult, error) { - tsk := svc.TaskMgr.FindByID(taskID) - if tsk.WaitTimeout(waitTimeout) { - updatePkgTask := tsk.Body().(*mytask.UploadObjects) - return true, updatePkgTask.Result, tsk.Error() - } - return false, nil, nil -} - func (svc *ObjectService) UpdateInfo(userID cdssdk.UserID, updatings []cdsapi.UpdatingObject) ([]cdssdk.ObjectID, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { diff --git a/client/internal/services/service.go b/client/internal/services/service.go index d369dc7..476e126 100644 --- a/client/internal/services/service.go +++ b/client/internal/services/service.go @@ -7,6 +7,7 @@ import ( "gitlink.org.cn/cloudream/storage/client/internal/task" "gitlink.org.cn/cloudream/storage/common/pkgs/accessstat" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" + "gitlink.org.cn/cloudream/storage/common/pkgs/uploader" ) // Service 结构体封装了分布锁服务和任务管理服务。 @@ -15,13 +16,15 @@ type Service struct { TaskMgr *task.Manager Downloader *downloader.Downloader AccessStat *accessstat.AccessStat + Uploader *uploader.Uploader } -func NewService(distlock *distlock.Service, taskMgr *task.Manager, downloader *downloader.Downloader, accStat *accessstat.AccessStat) (*Service, error) { +func NewService(distlock *distlock.Service, taskMgr *task.Manager, downloader *downloader.Downloader, accStat *accessstat.AccessStat, uploder *uploader.Uploader) (*Service, error) { return &Service{ DistLock: distlock, TaskMgr: taskMgr, Downloader: downloader, AccessStat: accStat, + Uploader: uploder, }, nil } diff --git a/client/internal/task/upload_objects.go b/client/internal/task/upload_objects.go deleted file mode 100644 index aca89fd..0000000 --- a/client/internal/task/upload_objects.go +++ /dev/null @@ -1,52 +0,0 @@ -// package task 定义了与任务处理相关的结构体和函数。 -package task - -import ( - "time" - - "gitlink.org.cn/cloudream/common/pkgs/task" // 引入task包,提供任务处理的通用功能。 - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" // 引入cdssdk包,提供云存储相关的SDK接口。 - "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" // 引入cmd包,提供命令执行相关的功能。 - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" // 引入iterator包,提供迭代器相关的功能。 -) - -// UploadObjectsResult 定义了上传对象结果的类型,继承自cmd包的UploadObjectsResult类型。 -type UploadObjectsResult = cmd.UploadObjectsResult - -// UploadObjects 定义了上传对象的任务结构体,包含上传命令和执行结果。 -type UploadObjects struct { - cmd cmd.UploadObjects // cmd字段定义了上传对象的具体操作。 - - Result *UploadObjectsResult // Result字段存储上传对象操作的结果。 -} - -// NewUploadObjects 创建并返回一个新的UploadObjects实例。 -// userID: 用户ID,标识发起上传请求的用户。 -// packageID: 包ID,标识被上传的对象所属的包。 -// objectIter: 上传对象迭代器,用于遍历和上传多个对象。 -// storageAffinity: 节点亲和性,指定上传任务首选的执行节点。 -// 返回值为初始化后的UploadObjects指针。 -func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, storageAffinity cdssdk.StorageID) *UploadObjects { - return &UploadObjects{ - cmd: *cmd.NewUploadObjects(userID, packageID, objectIter, storageAffinity), - } -} - -// Execute 执行上传对象的任务。 -// task: 任务实例,包含任务的上下文信息。 -// ctx: 任务执行的上下文,包括分布式锁和网络连接性等信息。 -// complete: 任务完成时的回调函数。 -// 该函数负责调用上传命令的Execute方法,处理上传结果,并通过回调函数报告任务完成情况。 -func (t *UploadObjects) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - ret, err := t.cmd.Execute(&cmd.UploadObjectsContext{ - Distlock: ctx.distlock, // 使用任务上下文中的分布式锁。 - Connectivity: ctx.connectivity, // 使用任务上下文中的网络连接性信息。 - StgMgr: ctx.stgMgr, - }) - - t.Result = ret // 存储上传结果。 - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, // 设置任务完成后的清理延迟为1分钟。 - }) -} diff --git a/client/main.go b/client/main.go index e8dd6d0..96cec4e 100644 --- a/client/main.go +++ b/client/main.go @@ -20,6 +20,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/uploader" ) func main() { @@ -39,6 +40,7 @@ func main() { stgglb.InitMQPool(&config.Cfg().RabbitMQ) stgglb.InitAgentRPCPool(&config.Cfg().AgentGRPC) + // 连接性信息收集 var conCol connectivity.Collector if config.Cfg().Local.HubID != nil { //如果client与某个hub处于同一台机器,则使用这个hub的连通性信息 @@ -73,6 +75,7 @@ func main() { conCol.CollectInPlace() } + // 分布式锁 distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) if err != nil { logger.Warnf("new distlock service failed, err: %s", err.Error()) @@ -80,19 +83,26 @@ func main() { } go serveDistLock(distlockSvc) + // 访问统计 acStat := accessstat.NewAccessStat(accessstat.Config{ // TODO 考虑放到配置里 ReportInterval: time.Second * 10, }) go serveAccessStat(acStat) + // 存储管理器 stgMgr := mgr.NewManager() + // 任务管理器 taskMgr := task.NewManager(distlockSvc, &conCol, stgMgr) + // 下载器 dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgMgr) - svc, err := services.NewService(distlockSvc, &taskMgr, &dlder, acStat) + // 上传器 + uploader := uploader.NewUploader(distlockSvc, &conCol, stgMgr) + + svc, err := services.NewService(distlockSvc, &taskMgr, &dlder, acStat, uploader) if err != nil { logger.Warnf("new services failed, err: %s", err.Error()) os.Exit(1) diff --git a/common/pkgs/cmd/cmd.go b/common/pkgs/cmd/cmd.go deleted file mode 100644 index 0149075..0000000 --- a/common/pkgs/cmd/cmd.go +++ /dev/null @@ -1,3 +0,0 @@ -package cmd - -// 这个包主要存放一些公共的业务逻辑代码 diff --git a/common/pkgs/cmd/upload_objects.go b/common/pkgs/cmd/upload_objects.go deleted file mode 100644 index 58695f4..0000000 --- a/common/pkgs/cmd/upload_objects.go +++ /dev/null @@ -1,240 +0,0 @@ -package cmd - -import ( - "context" - "fmt" - "io" - "math" - "math/rand" - "time" - - "github.com/samber/lo" - - "gitlink.org.cn/cloudream/common/pkgs/distlock" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/utils/sort2" - - stgglb "gitlink.org.cn/cloudream/storage/common/globals" - stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" - "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" - coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr" -) - -type UploadObjects struct { - userID cdssdk.UserID - packageID cdssdk.PackageID - objectIter iterator.UploadingObjectIterator - stgAffinity cdssdk.StorageID -} - -type UploadObjectsResult struct { - Objects []ObjectUploadResult -} - -type ObjectUploadResult struct { - Info *iterator.IterUploadingObject - Error error - Object cdssdk.Object -} - -type UploadStorageInfo struct { - Storage stgmod.StorageDetail - Delay time.Duration - IsSameLocation bool -} - -type UploadObjectsContext struct { - Distlock *distlock.Service - Connectivity *connectivity.Collector - StgMgr *mgr.Manager -} - -func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator, stgAffinity cdssdk.StorageID) *UploadObjects { - return &UploadObjects{ - userID: userID, - packageID: packageID, - objectIter: objIter, - stgAffinity: stgAffinity, - } -} - -func (t *UploadObjects) Execute(ctx *UploadObjectsContext) (*UploadObjectsResult, error) { - defer t.objectIter.Close() - - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) - } - - getUserStgsResp, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(t.userID)) - if err != nil { - return nil, fmt.Errorf("getting user storages: %w", err) - } - - cons := ctx.Connectivity.GetAll() - var userStgs []UploadStorageInfo - for _, stg := range getUserStgsResp.Storages { - if stg.MasterHub == nil { - continue - } - - delay := time.Duration(math.MaxInt64) - - con, ok := cons[stg.MasterHub.HubID] - if ok && con.Delay != nil { - delay = *con.Delay - } - - userStgs = append(userStgs, UploadStorageInfo{ - Storage: stg, - Delay: delay, - IsSameLocation: stg.MasterHub.LocationID == stgglb.Local.LocationID, - }) - } - - if len(userStgs) == 0 { - return nil, fmt.Errorf("user no available storages") - } - - // 给上传节点的IPFS加锁 - lockBlder := reqbuilder.NewBuilder() - for _, us := range userStgs { - lockBlder.Shard().Buzy(us.Storage.Storage.StorageID) - } - // TODO 考虑加Object的Create锁 - // 防止上传的副本被清除 - ipfsMutex, err := lockBlder.MutexLock(ctx.Distlock) - if err != nil { - return nil, fmt.Errorf("acquire locks failed, err: %w", err) - } - defer ipfsMutex.Unlock() - - rets, err := uploadAndUpdatePackage(ctx, t.packageID, t.objectIter, userStgs, t.stgAffinity) - if err != nil { - return nil, err - } - - return &UploadObjectsResult{ - Objects: rets, - }, nil -} - -// chooseUploadStorage 选择一个上传文件的节点 -// 1. 选择设置了亲和性的节点 -// 2. 从与当前客户端相同地域的节点中随机选一个 -// 3. 没有的话从所有节点选择延迟最低的节点 -func chooseUploadStorage(storages []UploadStorageInfo, stgAffinity cdssdk.StorageID) UploadStorageInfo { - if stgAffinity > 0 { - aff, ok := lo.Find(storages, func(storage UploadStorageInfo) bool { return storage.Storage.Storage.StorageID == stgAffinity }) - if ok { - return aff - } - } - - sameLocationStorages := lo.Filter(storages, func(e UploadStorageInfo, i int) bool { return e.IsSameLocation }) - if len(sameLocationStorages) > 0 { - return sameLocationStorages[rand.Intn(len(sameLocationStorages))] - } - - // 选择延迟最低的节点 - storages = sort2.Sort(storages, func(e1, e2 UploadStorageInfo) int { return sort2.Cmp(e1.Delay, e2.Delay) }) - - return storages[0] -} - -func uploadAndUpdatePackage(ctx *UploadObjectsContext, packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, userStorages []UploadStorageInfo, stgAffinity cdssdk.StorageID) ([]ObjectUploadResult, error) { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - // 为所有文件选择相同的上传节点 - uploadStorage := chooseUploadStorage(userStorages, stgAffinity) - - var uploadRets []ObjectUploadResult - //上传文件夹 - var adds []coormq.AddObjectEntry - for { - objInfo, err := objectIter.MoveNext() - if err == iterator.ErrNoMoreItem { - break - } - if err != nil { - return nil, fmt.Errorf("reading object: %w", err) - } - err = func() error { - defer objInfo.File.Close() - - uploadTime := time.Now() - fileHash, err := uploadFile(ctx, objInfo.File, uploadStorage) - if err != nil { - return fmt.Errorf("uploading file: %w", err) - } - - uploadRets = append(uploadRets, ObjectUploadResult{ - Info: objInfo, - Error: err, - }) - - adds = append(adds, coormq.NewAddObjectEntry(objInfo.Path, objInfo.Size, fileHash, uploadTime, uploadStorage.Storage.Storage.StorageID)) - return nil - }() - if err != nil { - return nil, err - } - } - - updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(packageID, adds, nil)) - if err != nil { - return nil, fmt.Errorf("updating package: %w", err) - } - - updatedObjs := make(map[string]*cdssdk.Object) - for _, obj := range updateResp.Added { - o := obj - updatedObjs[obj.Path] = &o - } - - for i := range uploadRets { - obj := updatedObjs[uploadRets[i].Info.Path] - if obj == nil { - uploadRets[i].Error = fmt.Errorf("object %s not found in package", uploadRets[i].Info.Path) - continue - } - uploadRets[i].Object = *obj - } - - return uploadRets, nil -} - -func uploadFile(ctx *UploadObjectsContext, file io.Reader, uploadStg UploadStorageInfo) (cdssdk.FileHash, error) { - ft := ioswitch2.NewFromTo() - fromExec, hd := ioswitch2.NewFromDriver(-1) - ft.AddFrom(fromExec).AddTo(ioswitch2.NewToShardStore(*uploadStg.Storage.MasterHub, uploadStg.Storage.Storage, -1, "fileHash")) - - parser := parser.NewParser(cdssdk.DefaultECRedundancy) - plans := exec.NewPlanBuilder() - err := parser.Parse(ft, plans) - if err != nil { - return "", fmt.Errorf("parsing plan: %w", err) - } - - exeCtx := exec.NewExecContext() - exec.SetValueByType(exeCtx, ctx.StgMgr) - exec := plans.Execute(exeCtx) - exec.BeginWrite(io.NopCloser(file), hd) - ret, err := exec.Wait(context.TODO()) - if err != nil { - return "", err - } - - return ret["fileHash"].(*ops2.FileHashValue).Hash, nil -} diff --git a/common/pkgs/downloader/iterator.go b/common/pkgs/downloader/iterator.go index cbe814a..c4f59e3 100644 --- a/common/pkgs/downloader/iterator.go +++ b/common/pkgs/downloader/iterator.go @@ -399,9 +399,8 @@ func (iter *DownloadObjectIterator) downloadFromStorage(stg *stgmod.StorageDetai ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, *stg.MasterHub, stg.Storage, -1)).AddTo(toExec) strHandle = handle - parser := parser.NewParser(cdssdk.DefaultECRedundancy) plans := exec.NewPlanBuilder() - if err := parser.Parse(ft, plans); err != nil { + if err := parser.Parse(ft, plans, cdssdk.DefaultECRedundancy); err != nil { return nil, fmt.Errorf("parsing plan: %w", err) } diff --git a/common/pkgs/downloader/strip_iterator.go b/common/pkgs/downloader/strip_iterator.go index 23f4960..da06dcc 100644 --- a/common/pkgs/downloader/strip_iterator.go +++ b/common/pkgs/downloader/strip_iterator.go @@ -209,9 +209,8 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) { }) ft.AddTo(toExec) - parser := parser.NewParser(*s.red) plans := exec.NewPlanBuilder() - err := parser.Parse(ft, plans) + err := parser.Parse(ft, plans, *s.red) if err != nil { return 0, err } diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index b05edab..0d312fb 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -15,16 +15,6 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) -type DefaultParser struct { - EC cdssdk.ECRedundancy -} - -func NewParser(ec cdssdk.ECRedundancy) *DefaultParser { - return &DefaultParser{ - EC: ec, - } -} - type IndexedStream struct { Stream *dag.Var DataIndex int @@ -38,22 +28,24 @@ type ParseContext struct { ToNodes map[ioswitch2.To]ops2.ToNode IndexedStreams []IndexedStream StreamRange exec.Range + EC cdssdk.ECRedundancy } -func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error { +func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder, ec cdssdk.ECRedundancy) error { ctx := ParseContext{ Ft: ft, DAG: ops2.NewGraphNodeBuilder(), ToNodes: make(map[ioswitch2.To]ops2.ToNode), + EC: ec, } // 分成两个阶段: // 1. 基于From和To生成更多指令,初步匹配to的需求 // 计算一下打开流的范围 - p.calcStreamRange(&ctx) + calcStreamRange(&ctx) - err := p.extend(&ctx) + err := extend(&ctx) if err != nil { return err } @@ -64,16 +56,16 @@ func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) erro // 从目前实现上来说不会死循环 for { opted := false - if p.removeUnusedJoin(&ctx) { + if removeUnusedJoin(&ctx) { opted = true } - if p.removeUnusedMultiplyOutput(&ctx) { + if removeUnusedMultiplyOutput(&ctx) { opted = true } - if p.removeUnusedSplit(&ctx) { + if removeUnusedSplit(&ctx) { opted = true } - if p.omitSplitJoin(&ctx) { + if omitSplitJoin(&ctx) { opted = true } @@ -83,18 +75,18 @@ func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) erro } // 确定指令执行位置的过程,也需要反复进行,直到没有变化为止。 - for p.pin(&ctx) { + for pin(&ctx) { } // 下面这些只需要执行一次,但需要按顺序 - p.dropUnused(&ctx) - p.storeIPFSWriteResult(&ctx) - p.generateClone(&ctx) - p.generateRange(&ctx) + dropUnused(&ctx) + storeIPFSWriteResult(&ctx) + generateClone(&ctx) + generateRange(&ctx) return plan.Generate(ctx.DAG.Graph, blder) } -func (p *DefaultParser) findOutputStream(ctx *ParseContext, streamIndex int) *dag.Var { +func findOutputStream(ctx *ParseContext, streamIndex int) *dag.Var { var ret *dag.Var for _, s := range ctx.IndexedStreams { if s.DataIndex == streamIndex { @@ -106,8 +98,8 @@ func (p *DefaultParser) findOutputStream(ctx *ParseContext, streamIndex int) *da } // 计算输入流的打开范围。会把流的范围按条带大小取整 -func (p *DefaultParser) calcStreamRange(ctx *ParseContext) { - stripSize := int64(p.EC.ChunkSize * p.EC.K) +func calcStreamRange(ctx *ParseContext) { + stripSize := int64(ctx.EC.ChunkSize * ctx.EC.K) rng := exec.Range{ Offset: math.MaxInt64, @@ -126,10 +118,10 @@ func (p *DefaultParser) calcStreamRange(ctx *ParseContext) { } else { toRng := to.GetRange() - blkStartIndex := math2.FloorDiv(toRng.Offset, int64(p.EC.ChunkSize)) + blkStartIndex := math2.FloorDiv(toRng.Offset, int64(ctx.EC.ChunkSize)) rng.ExtendStart(blkStartIndex * stripSize) if toRng.Length != nil { - blkEndIndex := math2.CeilDiv(toRng.Offset+*toRng.Length, int64(p.EC.ChunkSize)) + blkEndIndex := math2.CeilDiv(toRng.Offset+*toRng.Length, int64(ctx.EC.ChunkSize)) rng.ExtendEnd(blkEndIndex * stripSize) } else { rng.Length = nil @@ -140,9 +132,9 @@ func (p *DefaultParser) calcStreamRange(ctx *ParseContext) { ctx.StreamRange = rng } -func (p *DefaultParser) extend(ctx *ParseContext) error { +func extend(ctx *ParseContext) error { for _, fr := range ctx.Ft.Froms { - frNode, err := p.buildFromNode(ctx, fr) + frNode, err := buildFromNode(ctx, fr) if err != nil { return err } @@ -154,9 +146,9 @@ func (p *DefaultParser) extend(ctx *ParseContext) error { // 对于完整文件的From,生成Split指令 if fr.GetDataIndex() == -1 { - splitNode := ctx.DAG.NewChunkedSplit(p.EC.ChunkSize) - splitNode.Split(frNode.Output().Var, p.EC.K) - for i := 0; i < p.EC.K; i++ { + splitNode := ctx.DAG.NewChunkedSplit(ctx.EC.ChunkSize) + splitNode.Split(frNode.Output().Var, ctx.EC.K) + for i := 0; i < ctx.EC.K; i++ { ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ Stream: splitNode.SubStream(i), DataIndex: i, @@ -170,29 +162,29 @@ func (p *DefaultParser) extend(ctx *ParseContext) error { for _, s := range ctx.IndexedStreams { if s.DataIndex >= 0 && ecInputStrs[s.DataIndex] == nil { ecInputStrs[s.DataIndex] = s.Stream - if len(ecInputStrs) == p.EC.K { + if len(ecInputStrs) == ctx.EC.K { break } } } - if len(ecInputStrs) == p.EC.K { - mulNode := ctx.DAG.NewECMultiply(p.EC) + if len(ecInputStrs) == ctx.EC.K { + mulNode := ctx.DAG.NewECMultiply(ctx.EC) for i, s := range ecInputStrs { mulNode.AddInput(s, i) } - for i := 0; i < p.EC.N; i++ { + for i := 0; i < ctx.EC.N; i++ { ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ Stream: mulNode.NewOutput(i), DataIndex: i, }) } - joinNode := ctx.DAG.NewChunkedJoin(p.EC.ChunkSize) - for i := 0; i < p.EC.K; i++ { + joinNode := ctx.DAG.NewChunkedJoin(ctx.EC.ChunkSize) + for i := 0; i < ctx.EC.K; i++ { // 不可能找不到流 - joinNode.AddInput(p.findOutputStream(ctx, i)) + joinNode.AddInput(findOutputStream(ctx, i)) } ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ Stream: joinNode.Joined(), @@ -202,13 +194,13 @@ func (p *DefaultParser) extend(ctx *ParseContext) error { // 为每一个To找到一个输入流 for _, to := range ctx.Ft.Toes { - toNode, err := p.buildToNode(ctx, to) + toNode, err := buildToNode(ctx, to) if err != nil { return err } ctx.ToNodes[to] = toNode - str := p.findOutputStream(ctx, to.GetDataIndex()) + str := findOutputStream(ctx, to.GetDataIndex()) if str == nil { return fmt.Errorf("no output stream found for data index %d", to.GetDataIndex()) } @@ -219,17 +211,17 @@ func (p *DefaultParser) extend(ctx *ParseContext) error { return nil } -func (p *DefaultParser) buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2.FromNode, error) { +func buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2.FromNode, error) { var repRange exec.Range var blkRange exec.Range repRange.Offset = ctx.StreamRange.Offset - blkRange.Offset = ctx.StreamRange.Offset / int64(p.EC.ChunkSize*p.EC.K) * int64(p.EC.ChunkSize) + blkRange.Offset = ctx.StreamRange.Offset / int64(ctx.EC.ChunkSize*ctx.EC.K) * int64(ctx.EC.ChunkSize) if ctx.StreamRange.Length != nil { repRngLen := *ctx.StreamRange.Length repRange.Length = &repRngLen - blkRngLen := *ctx.StreamRange.Length / int64(p.EC.ChunkSize*p.EC.K) * int64(p.EC.ChunkSize) + blkRngLen := *ctx.StreamRange.Length / int64(ctx.EC.ChunkSize*ctx.EC.K) * int64(ctx.EC.ChunkSize) blkRange.Length = &blkRngLen } @@ -278,12 +270,12 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2 } } -func (p *DefaultParser) buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToNode, error) { +func buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToNode, error) { switch t := t.(type) { case *ioswitch2.ToShardStore: n := ctx.DAG.NewShardWrite(t.Storage.StorageID, t.FileHashStoreKey) - if err := p.setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil { + if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil { return nil, err } @@ -301,7 +293,7 @@ func (p *DefaultParser) buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToN case *ioswitch2.LoadToShared: n := ctx.DAG.NewSharedLoad(t.Storage.StorageID, t.UserID, t.PackageID, t.Path) - if err := p.setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil { + if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil { return nil, err } @@ -314,7 +306,7 @@ func (p *DefaultParser) buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToN } } -func (p *DefaultParser) setEnvByAddress(n dag.Node, hub cdssdk.Hub, addr cdssdk.HubAddressInfo) error { +func setEnvByAddress(n dag.Node, hub cdssdk.Hub, addr cdssdk.HubAddressInfo) error { switch addr := addr.(type) { case *cdssdk.HttpAddressInfo: n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: hub}) @@ -330,7 +322,7 @@ func (p *DefaultParser) setEnvByAddress(n dag.Node, hub cdssdk.Hub, addr cdssdk. } // 删除输出流未被使用的Join指令 -func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool { +func removeUnusedJoin(ctx *ParseContext) bool { changed := false dag.WalkOnlyType[*ops2.ChunkedJoinNode](ctx.DAG.Graph, func(node *ops2.ChunkedJoinNode) bool { @@ -347,7 +339,7 @@ func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool { } // 减少未使用的Multiply指令的输出流。如果减少到0,则删除该指令 -func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool { +func removeUnusedMultiplyOutput(ctx *ParseContext) bool { changed := false dag.WalkOnlyType[*ops2.ECMultiplyNode](ctx.DAG.Graph, func(node *ops2.ECMultiplyNode) bool { outArr := node.OutputStreams().RawArray() @@ -376,7 +368,7 @@ func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool { } // 删除未使用的Split指令 -func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool { +func removeUnusedSplit(ctx *ParseContext) bool { changed := false dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(typ *ops2.ChunkedSplitNode) bool { // Split出来的每一个流都没有被使用,才能删除这个指令 @@ -396,7 +388,7 @@ func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool { } // 如果Split的结果被完全用于Join,则省略Split和Join指令 -func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool { +func omitSplitJoin(ctx *ParseContext) bool { changed := false dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(splitNode *ops2.ChunkedSplitNode) bool { @@ -452,7 +444,7 @@ func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool { // 通过流的输入输出位置来确定指令的执行位置。 // To系列的指令都会有固定的执行位置,这些位置会随着pin操作逐步扩散到整个DAG, // 所以理论上不会出现有指令的位置始终无法确定的情况。 -func (p *DefaultParser) pin(ctx *ParseContext) bool { +func pin(ctx *ParseContext) bool { changed := false ctx.DAG.Walk(func(node dag.Node) bool { if node.Env().Pinned { @@ -513,7 +505,7 @@ func (p *DefaultParser) pin(ctx *ParseContext) bool { } // 对于所有未使用的流,增加Drop指令 -func (p *DefaultParser) dropUnused(ctx *ParseContext) { +func dropUnused(ctx *ParseContext) { ctx.DAG.Walk(func(node dag.Node) bool { for _, out := range node.OutputStreams().RawArray() { if out.To().Len() == 0 { @@ -527,7 +519,7 @@ func (p *DefaultParser) dropUnused(ctx *ParseContext) { } // 为IPFS写入指令存储结果 -func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) { +func storeIPFSWriteResult(ctx *ParseContext) { dag.WalkOnlyType[*ops2.ShardWriteNode](ctx.DAG.Graph, func(n *ops2.ShardWriteNode) bool { if n.FileHashStoreKey == "" { return true @@ -542,7 +534,7 @@ func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) { } // 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回 -func (p *DefaultParser) generateRange(ctx *ParseContext) { +func generateRange(ctx *ParseContext) { for i := 0; i < len(ctx.Ft.Toes); i++ { to := ctx.Ft.Toes[i] toNode := ctx.ToNodes[to] @@ -562,10 +554,10 @@ func (p *DefaultParser) generateRange(ctx *ParseContext) { toNode.SetInput(rnged) } else { - stripSize := int64(p.EC.ChunkSize * p.EC.K) + stripSize := int64(ctx.EC.ChunkSize * ctx.EC.K) blkStartIdx := ctx.StreamRange.Offset / stripSize - blkStart := blkStartIdx * int64(p.EC.ChunkSize) + blkStart := blkStartIdx * int64(ctx.EC.ChunkSize) n := ctx.DAG.NewRange() toInput := toNode.Input() @@ -581,7 +573,7 @@ func (p *DefaultParser) generateRange(ctx *ParseContext) { } // 生成Clone指令 -func (p *DefaultParser) generateClone(ctx *ParseContext) { +func generateClone(ctx *ParseContext) { ctx.DAG.Walk(func(node dag.Node) bool { for _, out := range node.OutputStreams().RawArray() { if out.To().Len() <= 1 { diff --git a/common/pkgs/uploader/create_load.go b/common/pkgs/uploader/create_load.go new file mode 100644 index 0000000..5e50cf2 --- /dev/null +++ b/common/pkgs/uploader/create_load.go @@ -0,0 +1,137 @@ +package uploader + +import ( + "context" + "fmt" + "io" + "sync" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils" +) + +type CreateLoadUploader struct { + pkg cdssdk.Package + userID cdssdk.UserID + targetStgs []stgmod.StorageDetail + uploader *Uploader + distlock *distlock.Mutex + successes []coormq.AddObjectEntry + lock sync.Mutex + commited bool +} + +type CreateLoadResult struct { + Package cdssdk.Package + Objects map[string]cdssdk.Object + LoadedDirs []string +} + +func (u *CreateLoadUploader) Upload(path string, size int64, stream io.Reader) error { + uploadTime := time.Now() + + ft := ioswitch2.NewFromTo() + fromExec, hd := ioswitch2.NewFromDriver(-1) + ft.AddFrom(fromExec) + for _, stg := range u.targetStgs { + ft.AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg.Storage, -1, "fileHash")) + ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg.Storage, u.userID, u.pkg.PackageID, path)) + } + + plans := exec.NewPlanBuilder() + err := parser.Parse(ft, plans, cdssdk.DefaultECRedundancy) + if err != nil { + return fmt.Errorf("parsing plan: %w", err) + } + + exeCtx := exec.NewExecContext() + exec.SetValueByType(exeCtx, u.uploader.stgMgr) + exec := plans.Execute(exeCtx) + exec.BeginWrite(io.NopCloser(stream), hd) + ret, err := exec.Wait(context.TODO()) + if err != nil { + return fmt.Errorf("executing plan: %w", err) + } + + u.lock.Lock() + defer u.lock.Unlock() + + // 记录上传结果 + fileHash := ret["fileHash"].(*ops2.FileHashValue).Hash + for _, stg := range u.targetStgs { + u.successes = append(u.successes, coormq.AddObjectEntry{ + Path: path, + Size: size, + FileHash: fileHash, + UploadTime: uploadTime, + StorageID: stg.Storage.StorageID, + }) + } + return nil +} + +func (u *CreateLoadUploader) Commit() (CreateLoadResult, error) { + u.lock.Lock() + defer u.lock.Unlock() + + if u.commited { + return CreateLoadResult{}, fmt.Errorf("package already commited") + } + u.commited = true + + defer u.distlock.Unlock() + + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return CreateLoadResult{}, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(u.pkg.PackageID, u.successes, nil)) + if err != nil { + return CreateLoadResult{}, fmt.Errorf("updating package: %w", err) + } + + ret := CreateLoadResult{ + Objects: make(map[string]cdssdk.Object), + } + + for _, entry := range updateResp.Added { + ret.Objects[entry.Path] = entry + } + + for _, stg := range u.targetStgs { + _, err := coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(u.userID, stg.Storage.StorageID, u.pkg.PackageID, nil)) + if err != nil { + return CreateLoadResult{}, fmt.Errorf("notifying storage package loaded: %w", err) + } + + // TODO 考虑让SharedStore来生成Load目录路径 + ret.LoadedDirs = append(ret.LoadedDirs, utils.MakeLoadedPackagePath(u.userID, u.pkg.PackageID)) + } + + return ret, nil +} + +func (u *CreateLoadUploader) Abort() { + u.lock.Lock() + defer u.lock.Unlock() + + if u.commited { + return + } + u.commited = true + + u.distlock.Unlock() + + // TODO 可以考虑删除PackageID +} diff --git a/common/pkgs/uploader/update.go b/common/pkgs/uploader/update.go new file mode 100644 index 0000000..aed711b --- /dev/null +++ b/common/pkgs/uploader/update.go @@ -0,0 +1,121 @@ +package uploader + +import ( + "context" + "fmt" + "io" + "sync" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" +) + +type UpdateUploader struct { + uploader *Uploader + pkgID cdssdk.PackageID + targetStg stgmod.StorageDetail + distMutex *distlock.Mutex + successes []coormq.AddObjectEntry + lock sync.Mutex + commited bool +} + +type UploadStorageInfo struct { + Storage stgmod.StorageDetail + Delay time.Duration + IsSameLocation bool +} + +type UpdateResult struct { + // 上传成功的文件列表,Key为Path + Objects map[string]cdssdk.Object +} + +func (w *UpdateUploader) Upload(path string, size int64, stream io.Reader) error { + uploadTime := time.Now() + + ft := ioswitch2.NewFromTo() + fromExec, hd := ioswitch2.NewFromDriver(-1) + ft.AddFrom(fromExec).AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg.Storage, -1, "fileHash")) + + plans := exec.NewPlanBuilder() + err := parser.Parse(ft, plans, cdssdk.DefaultECRedundancy) + if err != nil { + return fmt.Errorf("parsing plan: %w", err) + } + + exeCtx := exec.NewExecContext() + exec.SetValueByType(exeCtx, w.uploader.stgMgr) + exec := plans.Execute(exeCtx) + exec.BeginWrite(io.NopCloser(stream), hd) + ret, err := exec.Wait(context.TODO()) + if err != nil { + return fmt.Errorf("executing plan: %w", err) + } + + w.lock.Lock() + defer w.lock.Unlock() + + // 记录上传结果 + w.successes = append(w.successes, coormq.AddObjectEntry{ + Path: path, + Size: size, + FileHash: ret["fileHash"].(*ops2.FileHashValue).Hash, + UploadTime: uploadTime, + StorageID: w.targetStg.Storage.StorageID, + }) + return nil +} + +func (w *UpdateUploader) Commit() (UpdateResult, error) { + w.lock.Lock() + defer w.lock.Unlock() + + if w.commited { + return UpdateResult{}, fmt.Errorf("package already commited") + } + w.commited = true + + defer w.distMutex.Unlock() + + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return UpdateResult{}, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(w.pkgID, w.successes, nil)) + if err != nil { + return UpdateResult{}, fmt.Errorf("updating package: %w", err) + } + + ret := UpdateResult{ + Objects: make(map[string]cdssdk.Object), + } + + for _, entry := range updateResp.Added { + ret.Objects[entry.Path] = entry + } + + return ret, nil +} + +func (w *UpdateUploader) Abort() { + w.lock.Lock() + defer w.lock.Unlock() + + if w.commited { + return + } + + w.commited = true + w.distMutex.Unlock() +} diff --git a/common/pkgs/uploader/uploader.go b/common/pkgs/uploader/uploader.go new file mode 100644 index 0000000..1c5d6b9 --- /dev/null +++ b/common/pkgs/uploader/uploader.go @@ -0,0 +1,156 @@ +package uploader + +import ( + "fmt" + "math" + "math/rand" + "time" + + "github.com/samber/lo" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/sort2" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" + "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr" +) + +type Uploader struct { + distlock *distlock.Service + connectivity *connectivity.Collector + stgMgr *mgr.Manager +} + +func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgMgr *mgr.Manager) *Uploader { + return &Uploader{ + distlock: distlock, + connectivity: connectivity, + stgMgr: stgMgr, + } +} + +func (u *Uploader) BeginUpdate(userID cdssdk.UserID, pkgID cdssdk.PackageID, affinity cdssdk.StorageID) (*UpdateUploader, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getUserStgsResp, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(userID)) + if err != nil { + return nil, fmt.Errorf("getting user storages: %w", err) + } + + cons := u.connectivity.GetAll() + var userStgs []UploadStorageInfo + for _, stg := range getUserStgsResp.Storages { + if stg.MasterHub == nil { + continue + } + + delay := time.Duration(math.MaxInt64) + + con, ok := cons[stg.MasterHub.HubID] + if ok && con.Delay != nil { + delay = *con.Delay + } + + userStgs = append(userStgs, UploadStorageInfo{ + Storage: stg, + Delay: delay, + IsSameLocation: stg.MasterHub.LocationID == stgglb.Local.LocationID, + }) + } + + if len(userStgs) == 0 { + return nil, fmt.Errorf("user no available storages") + } + + target := u.chooseUploadStorage(userStgs, affinity) + + // 给上传节点的IPFS加锁 + // TODO 考虑加Object的Create锁 + // 防止上传的副本被清除 + distMutex, err := reqbuilder.NewBuilder().Shard().Buzy(target.Storage.Storage.StorageID).MutexLock(u.distlock) + if err != nil { + return nil, fmt.Errorf("acquire distlock: %w", err) + } + + return &UpdateUploader{ + uploader: u, + pkgID: pkgID, + targetStg: target.Storage, + distMutex: distMutex, + }, nil +} + +// chooseUploadStorage 选择一个上传文件的节点 +// 1. 选择设置了亲和性的节点 +// 2. 从与当前客户端相同地域的节点中随机选一个 +// 3. 没有的话从所有节点选择延迟最低的节点 +func (w *Uploader) chooseUploadStorage(storages []UploadStorageInfo, stgAffinity cdssdk.StorageID) UploadStorageInfo { + if stgAffinity > 0 { + aff, ok := lo.Find(storages, func(storage UploadStorageInfo) bool { return storage.Storage.Storage.StorageID == stgAffinity }) + if ok { + return aff + } + } + + sameLocationStorages := lo.Filter(storages, func(e UploadStorageInfo, i int) bool { return e.IsSameLocation }) + if len(sameLocationStorages) > 0 { + return sameLocationStorages[rand.Intn(len(sameLocationStorages))] + } + + // 选择延迟最低的节点 + storages = sort2.Sort(storages, func(e1, e2 UploadStorageInfo) int { return sort2.Cmp(e1.Delay, e2.Delay) }) + + return storages[0] +} + +func (u *Uploader) BeginCreateLoad(userID cdssdk.UserID, bktID cdssdk.BucketID, pkgName string, loadTo []cdssdk.StorageID) (*CreateLoadUploader, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getStgs, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails(loadTo)) + if err != nil { + return nil, fmt.Errorf("getting storages: %w", err) + } + + targetStgs := make([]stgmod.StorageDetail, len(loadTo)) + for i, stg := range getStgs.Storages { + if stg == nil { + return nil, fmt.Errorf("storage %v not found", loadTo[i]) + } + targetStgs[i] = *stg + } + + createPkg, err := coorCli.CreatePackage(coormq.NewCreatePackage(userID, bktID, pkgName)) + if err != nil { + return nil, fmt.Errorf("create package: %w", err) + } + + reqBld := reqbuilder.NewBuilder() + for _, stg := range targetStgs { + reqBld.Shard().Buzy(stg.Storage.StorageID) + reqBld.Storage().Buzy(stg.Storage.StorageID) + reqBld.Metadata().StoragePackage().CreateOne(userID, stg.Storage.StorageID, createPkg.Package.PackageID) + } + lock, err := reqBld.MutexLock(u.distlock) + if err != nil { + return nil, fmt.Errorf("acquire distlock: %w", err) + } + + return &CreateLoadUploader{ + pkg: createPkg.Package, + userID: userID, + targetStgs: targetStgs, + uploader: u, + distlock: lock, + }, nil +} diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index f4616a0..337cf9c 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -453,8 +453,7 @@ func (t *CheckPackageRedundancy) noneToRep(ctx ExecuteContext, obj stgmod.Object } plans := exec.NewPlanBuilder() - parser := parser.NewParser(cdssdk.DefaultECRedundancy) - err = parser.Parse(ft, plans) + err = parser.Parse(ft, plans, cdssdk.DefaultECRedundancy) if err != nil { return nil, fmt.Errorf("parsing plan: %w", err) } @@ -511,9 +510,8 @@ func (t *CheckPackageRedundancy) noneToEC(ctx ExecuteContext, obj stgmod.ObjectD for i := 0; i < red.N; i++ { ft.AddTo(ioswitch2.NewToShardStore(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage.Storage, i, fmt.Sprintf("%d", i))) } - parser := parser.NewParser(*red) plans := exec.NewPlanBuilder() - err = parser.Parse(ft, plans) + err = parser.Parse(ft, plans, *red) if err != nil { return nil, fmt.Errorf("parsing plan: %w", err) } @@ -631,8 +629,7 @@ func (t *CheckPackageRedundancy) repToRep(ctx ExecuteContext, obj stgmod.ObjectD } plans := exec.NewPlanBuilder() - parser := parser.NewParser(cdssdk.DefaultECRedundancy) - err = parser.Parse(ft, plans) + err = parser.Parse(ft, plans, cdssdk.DefaultECRedundancy) if err != nil { return nil, fmt.Errorf("parsing plan: %w", err) } @@ -694,7 +691,6 @@ func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDe uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID }) // 每个被选节点都在自己节点上重建原始数据 - parser := parser.NewParser(*srcRed) planBlder := exec.NewPlanBuilder() for i := range uploadStgs { ft := ioswitch2.NewFromTo() @@ -709,7 +705,7 @@ func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDe Length: &len, })) - err := parser.Parse(ft, planBlder) + err := parser.Parse(ft, planBlder, *srcRed) if err != nil { return nil, fmt.Errorf("parsing plan: %w", err) } @@ -765,7 +761,6 @@ func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDet } // 目前EC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块 - parser := parser.NewParser(*srcRed) planBlder := exec.NewPlanBuilder() var newBlocks []stgmod.ObjectBlock @@ -799,7 +794,7 @@ func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDet // 输出只需要自己要保存的那一块 ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage.Storage, i, fmt.Sprintf("%d", i))) - err := parser.Parse(ft, planBlder) + err := parser.Parse(ft, planBlder, *srcRed) if err != nil { return nil, fmt.Errorf("parsing plan: %w", err) } diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index 41859ec..91ed3b0 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -745,8 +745,7 @@ func (t *CleanPinned) makePlansForRepObject(allStgInfos map[cdssdk.StorageID]*st toStg := allStgInfos[solu.blockList[i].StorageID] ft.AddTo(ioswitch2.NewToShardStore(*toStg.MasterHub, toStg.Storage, -1, fmt.Sprintf("%d.0", obj.Object.ObjectID))) - parser := parser.NewParser(cdssdk.DefaultECRedundancy) - err := parser.Parse(ft, planBld) + err := parser.Parse(ft, planBld, cdssdk.DefaultECRedundancy) if err != nil { // TODO 错误处理 continue @@ -796,7 +795,6 @@ func (t *CleanPinned) makePlansForECObject(allStgInfos map[cdssdk.StorageID]*stg } ecRed := obj.Object.Redundancy.(*cdssdk.ECRedundancy) - parser := parser.NewParser(*ecRed) for id, idxs := range reconstrct { ft := ioswitch2.NewFromTo() @@ -806,7 +804,7 @@ func (t *CleanPinned) makePlansForECObject(allStgInfos map[cdssdk.StorageID]*stg ft.AddTo(ioswitch2.NewToShardStore(*allStgInfos[id].MasterHub, allStgInfos[id].Storage, i, fmt.Sprintf("%d.%d", obj.Object.ObjectID, i))) } - err := parser.Parse(ft, planBld) + err := parser.Parse(ft, planBld, *ecRed) if err != nil { // TODO 错误处理 continue