From eb50316b87c9928930ec4fcedd020a46b2d79f31 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 18 Mar 2024 10:11:24 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=9F=E6=9C=AC=E7=9A=84=E6=96=B0=E5=BB=BAPa?= =?UTF-8?q?ckage=E6=8E=A5=E5=8F=A3=E6=8B=86=E6=88=90=E6=96=B0=E5=BB=BA+?= =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E4=B8=A4=E4=B8=AA=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/task/create_package.go | 60 +++++++-- client/internal/cmdline/commandline.go | 5 + client/internal/cmdline/object.go | 58 +++++++++ client/internal/cmdline/package.go | 115 ++---------------- client/internal/http/bucket.go | 61 ++++++++++ client/internal/http/cache.go | 2 +- client/internal/http/object.go | 52 +++++++- client/internal/http/package.go | 69 ++--------- client/internal/http/server.go | 26 ++-- client/internal/http/storage.go | 2 +- client/internal/services/object.go | 17 +++ client/internal/services/package.go | 46 +++---- client/internal/task/create_package.go | 35 ------ client/internal/task/update_package.go | 36 ------ client/internal/task/upload_objects.go | 36 ++++++ common/assets/scripts/create_database.sql | 2 + common/pkgs/cmd/update_package.go | 88 -------------- .../{create_package.go => upload_objects.go} | 37 +++--- common/pkgs/db/model/model.go | 6 +- common/pkgs/db/object.go | 76 +++--------- common/pkgs/mq/coordinator/package.go | 22 ++-- .../event/check_package_redundancy.go | 2 +- 22 files changed, 384 insertions(+), 469 deletions(-) create mode 100644 client/internal/cmdline/object.go create mode 100644 client/internal/http/bucket.go delete mode 100644 client/internal/task/create_package.go delete mode 100644 client/internal/task/update_package.go create mode 100644 client/internal/task/upload_objects.go delete mode 100644 common/pkgs/cmd/update_package.go rename common/pkgs/cmd/{create_package.go => upload_objects.go} (87%) diff --git a/agent/internal/task/create_package.go b/agent/internal/task/create_package.go index b70fae8..275d522 100644 --- a/agent/internal/task/create_package.go +++ b/agent/internal/task/create_package.go @@ -1,26 +1,39 @@ package task import ( + "fmt" "time" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/task" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" + "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) -type CreatePackageResult = cmd.CreatePackageResult +type CreatePackageResult struct { + PackageID cdssdk.PackageID + Objects []cmd.ObjectUploadResult +} type CreatePackage struct { - cmd cmd.CreatePackage - - Result *CreatePackageResult + userID cdssdk.UserID + bucketID cdssdk.BucketID + name string + objIter iterator.UploadingObjectIterator + nodeAffinity *cdssdk.NodeID + Result *CreatePackageResult } func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreatePackage { return &CreatePackage{ - cmd: *cmd.NewCreatePackage(userID, bucketID, name, objIter, nodeAffinity), + userID: userID, + bucketID: bucketID, + name: name, + objIter: objIter, + nodeAffinity: nodeAffinity, } } @@ -29,12 +42,43 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c log.Debugf("begin") defer log.Debugf("end") - ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + err = fmt.Errorf("new coordinator client: %w", err) + log.Warn(err.Error()) + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + createResp, err := coorCli.CreatePackage(coordinator.NewCreatePackage(t.userID, t.bucketID, t.name)) + if err != nil { + err = fmt.Errorf("creating package: %w", err) + log.Error(err.Error()) + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + + uploadRet, err := cmd.NewUploadObjects(t.userID, createResp.PackageID, t.objIter, t.nodeAffinity).Execute(&cmd.UploadObjectsContext{ Distlock: ctx.distlock, }) - t.Result = ret + if err != nil { + err = fmt.Errorf("uploading objects: %w", err) + log.Error(err.Error()) + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + + t.Result.PackageID = createResp.PackageID + t.Result.Objects = uploadRet.Objects - complete(err, CompleteOption{ + complete(nil, CompleteOption{ RemovingDelay: time.Minute, }) } diff --git a/client/internal/cmdline/commandline.go b/client/internal/cmdline/commandline.go index 102154f..2b71d93 100644 --- a/client/internal/cmdline/commandline.go +++ b/client/internal/cmdline/commandline.go @@ -38,3 +38,8 @@ func (c *Commandline) DispatchCommand(allArgs []string) { os.Exit(1) } } + +func MustAddCmd(fn any, prefixWords ...string) any { + commands.MustAdd(fn, prefixWords...) + return nil +} diff --git a/client/internal/cmdline/object.go b/client/internal/cmdline/object.go new file mode 100644 index 0000000..fad17cf --- /dev/null +++ b/client/internal/cmdline/object.go @@ -0,0 +1,58 @@ +package cmdline + +import ( + "fmt" + "os" + "path/filepath" + "time" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" +) + +var _ = MustAddCmd(func(ctx CommandContext, packageID cdssdk.PackageID, rootPath string, nodeAffinity []cdssdk.NodeID) error { + userID := cdssdk.UserID(1) + + var uploadFilePathes []string + err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { + if err != nil { + return nil + } + + if !fi.IsDir() { + uploadFilePathes = append(uploadFilePathes, fname) + } + + return nil + }) + if err != nil { + return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) + } + + var nodeAff *cdssdk.NodeID + if len(nodeAffinity) > 0 { + n := cdssdk.NodeID(nodeAffinity[0]) + nodeAff = &n + } + + objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) + taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUploading(userID, packageID, objIter, nodeAff) + if err != nil { + return fmt.Errorf("update objects to package %d failed, err: %w", packageID, err) + } + + for { + complete, _, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploading(taskID, time.Second*5) + if complete { + if err != nil { + return fmt.Errorf("uploading objects: %w", err) + } + + return nil + } + + if err != nil { + return fmt.Errorf("wait updating: %w", err) + } + } +}, "obj", "upload") diff --git a/client/internal/cmdline/package.go b/client/internal/cmdline/package.go index a4a7dfd..a15bdbe 100644 --- a/client/internal/cmdline/package.go +++ b/client/internal/cmdline/package.go @@ -5,7 +5,6 @@ import ( "io" "os" "path/filepath" - "time" "github.com/jedib0t/go-pretty/v6/table" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -13,7 +12,7 @@ import ( ) func PackageListBucketPackages(ctx CommandContext, bucketID cdssdk.BucketID) error { - userID := cdssdk.UserID(0) + userID := cdssdk.UserID(1) packages, err := ctx.Cmdline.Svc.BucketSvc().GetBucketPackages(userID, bucketID) if err != nil { @@ -34,13 +33,15 @@ func PackageListBucketPackages(ctx CommandContext, bucketID cdssdk.BucketID) err } func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outputDir string) error { + userID := cdssdk.UserID(1) + err := os.MkdirAll(outputDir, os.ModePerm) if err != nil { return fmt.Errorf("create output directory %s failed, err: %w", outputDir, err) } // 下载文件 - objIter, err := ctx.Cmdline.Svc.PackageSvc().DownloadPackage(0, packageID) + objIter, err := ctx.Cmdline.Svc.PackageSvc().DownloadPackage(userID, packageID) if err != nil { return fmt.Errorf("download object failed, err: %w", err) } @@ -91,108 +92,20 @@ func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outp return nil } -func PackageCreatePackage(ctx CommandContext, name string, rootPath string, bucketID cdssdk.BucketID, nodeAffinity []cdssdk.NodeID) error { - rootPath = filepath.Clean(rootPath) - - var uploadFilePathes []string - err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { - if err != nil { - return nil - } - - if !fi.IsDir() { - uploadFilePathes = append(uploadFilePathes, fname) - } - - return nil - }) - if err != nil { - return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) - } - - var nodeAff *cdssdk.NodeID - if len(nodeAffinity) > 0 { - n := cdssdk.NodeID(nodeAffinity[0]) - nodeAff = &n - } - - objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) - taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingPackage(0, bucketID, name, objIter, nodeAff) +func PackageCreatePackage(ctx CommandContext, bucketID cdssdk.BucketID, name string) error { + userID := cdssdk.UserID(1) + pkgID, err := ctx.Cmdline.Svc.PackageSvc().Create(userID, bucketID, name) if err != nil { - return fmt.Errorf("upload file data failed, err: %w", err) - } - - for { - complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingPackage(taskID, time.Second*5) - if complete { - if err != nil { - return fmt.Errorf("uploading package: %w", err) - } - - tb := table.NewWriter() - - tb.AppendHeader(table.Row{"Path", "ObjectID"}) - for i := 0; i < len(uploadObjectResult.ObjectResults); i++ { - tb.AppendRow(table.Row{ - uploadObjectResult.ObjectResults[i].Info.Path, - uploadObjectResult.ObjectResults[i].ObjectID, - }) - } - fmt.Print(tb.Render()) - fmt.Printf("\n%v", uploadObjectResult.PackageID) - return nil - } - - if err != nil { - return fmt.Errorf("wait uploading: %w", err) - } - } -} - -func PackageUpdatePackage(ctx CommandContext, packageID cdssdk.PackageID, rootPath string) error { - //userID := int64(0) - - var uploadFilePathes []string - err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { - if err != nil { - return nil - } - - if !fi.IsDir() { - uploadFilePathes = append(uploadFilePathes, fname) - } - - return nil - }) - if err != nil { - return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) - } - - objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) - taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingPackage(0, packageID, objIter) - if err != nil { - return fmt.Errorf("update package %d failed, err: %w", packageID, err) + return err } - for { - complete, _, err := ctx.Cmdline.Svc.PackageSvc().WaitUpdatingPackage(taskID, time.Second*5) - if complete { - if err != nil { - return fmt.Errorf("updating package: %w", err) - } - - return nil - } - - if err != nil { - return fmt.Errorf("wait updating: %w", err) - } - } + fmt.Printf("%v\n", pkgID) + return nil } func PackageDeletePackage(ctx CommandContext, packageID cdssdk.PackageID) error { - userID := cdssdk.UserID(0) + userID := cdssdk.UserID(1) err := ctx.Cmdline.Svc.PackageSvc().DeletePackage(userID, packageID) if err != nil { return fmt.Errorf("delete package %d failed, err: %w", packageID, err) @@ -201,7 +114,7 @@ func PackageDeletePackage(ctx CommandContext, packageID cdssdk.PackageID) error } func PackageGetCachedNodes(ctx CommandContext, packageID cdssdk.PackageID) error { - userID := cdssdk.UserID(0) + userID := cdssdk.UserID(1) resp, err := ctx.Cmdline.Svc.PackageSvc().GetCachedNodes(userID, packageID) fmt.Printf("resp: %v\n", resp) if err != nil { @@ -211,7 +124,7 @@ func PackageGetCachedNodes(ctx CommandContext, packageID cdssdk.PackageID) error } func PackageGetLoadedNodes(ctx CommandContext, packageID cdssdk.PackageID) error { - userID := cdssdk.UserID(0) + userID := cdssdk.UserID(1) nodeIDs, err := ctx.Cmdline.Svc.PackageSvc().GetLoadedNodes(userID, packageID) fmt.Printf("nodeIDs: %v\n", nodeIDs) if err != nil { @@ -227,8 +140,6 @@ func init() { commands.MustAdd(PackageCreatePackage, "pkg", "new") - commands.MustAdd(PackageUpdatePackage, "pkg", "update") - commands.MustAdd(PackageDeletePackage, "pkg", "delete") commands.MustAdd(PackageGetCachedNodes, "pkg", "cached") diff --git a/client/internal/http/bucket.go b/client/internal/http/bucket.go new file mode 100644 index 0000000..7e749b0 --- /dev/null +++ b/client/internal/http/bucket.go @@ -0,0 +1,61 @@ +package http + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) + +type BucketService struct { + *Server +} + +func (s *Server) Bucket() *BucketService { + return &BucketService{ + Server: s, + } +} + +func (s *BucketService) Create(ctx *gin.Context) { + log := logger.WithField("HTTP", "Bucket.Create") + + var req cdssdk.BucketCreateReq + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + bucketID, err := s.svc.BucketSvc().CreateBucket(req.UserID, req.BucketName) + if err != nil { + log.Warnf("creating bucket: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create bucket failed")) + return + } + + ctx.JSON(http.StatusOK, OK(cdssdk.BucketCreateResp{ + BucketID: bucketID, + })) +} + +func (s *BucketService) Delete(ctx *gin.Context) { + log := logger.WithField("HTTP", "Bucket.Delete") + + var req cdssdk.BucketDeleteReq + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + if err := s.svc.BucketSvc().DeleteBucket(req.UserID, req.BucketID); err != nil { + log.Warnf("deleting bucket: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete bucket failed")) + return + } + + ctx.JSON(http.StatusOK, OK(nil)) +} diff --git a/client/internal/http/cache.go b/client/internal/http/cache.go index e1f1210..ab86fda 100644 --- a/client/internal/http/cache.go +++ b/client/internal/http/cache.go @@ -14,7 +14,7 @@ type CacheService struct { *Server } -func (s *Server) CacheSvc() *CacheService { +func (s *Server) Cache() *CacheService { return &CacheService{ Server: s, } diff --git a/client/internal/http/object.go b/client/internal/http/object.go index 9b330ef..d7dba18 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -2,7 +2,9 @@ package http import ( "io" + "mime/multipart" "net/http" + "time" "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" @@ -15,12 +17,60 @@ type ObjectService struct { *Server } -func (s *Server) ObjectSvc() *ObjectService { +func (s *Server) Object() *ObjectService { return &ObjectService{ Server: s, } } +type ObjectUploadReq struct { + Info cdssdk.ObjectUploadInfo `form:"info" binding:"required"` + Files []*multipart.FileHeader `form:"files"` +} + +func (s *ObjectService) Upload(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.Upload") + + var req ObjectUploadReq + if err := ctx.ShouldBind(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + var err error + + objIter := mapMultiPartFileToUploadingObject(req.Files) + + taskID, err := s.svc.ObjectSvc().StartUploading(req.Info.UserID, req.Info.PackageID, objIter, req.Info.NodeAffinity) + + if err != nil { + log.Warnf("start uploading object task: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed")) + return + } + + for { + complete, _, err := s.svc.ObjectSvc().WaitUploading(taskID, time.Second*5) + if complete { + if err != nil { + log.Warnf("uploading object: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading object failed")) + return + } + + ctx.JSON(http.StatusOK, OK(nil)) + return + } + + if err != nil { + log.Warnf("waiting task: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed")) + return + } + } +} + type ObjectDownloadReq struct { UserID *cdssdk.UserID `form:"userID" binding:"required"` ObjectID *cdssdk.ObjectID `form:"objectID" binding:"required"` diff --git a/client/internal/http/package.go b/client/internal/http/package.go index c1c454e..cdaf3e0 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -3,7 +3,6 @@ package http import ( "mime/multipart" "net/http" - "time" "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" @@ -19,7 +18,7 @@ type PackageService struct { *Server } -func (s *Server) PackageSvc() *PackageService { +func (s *Server) Package() *PackageService { return &PackageService{ Server: s, } @@ -53,71 +52,25 @@ func (s *PackageService) Get(ctx *gin.Context) { ctx.JSON(http.StatusOK, OK(PackageGetResp{Package: *pkg})) } -type PackageUploadReq struct { - Info PackageUploadInfo `form:"info" binding:"required"` - Files []*multipart.FileHeader `form:"files"` -} - -type PackageUploadInfo struct { - UserID *cdssdk.UserID `json:"userID" binding:"required"` - BucketID *cdssdk.BucketID `json:"bucketID" binding:"required"` - Name string `json:"name" binding:"required"` - NodeAffinity *cdssdk.NodeID `json:"nodeAffinity"` -} - -type PackageUploadResp struct { - PackageID cdssdk.PackageID `json:"packageID,string"` -} - -func (s *PackageService) Upload(ctx *gin.Context) { - log := logger.WithField("HTTP", "Package.Upload") - - var req PackageUploadReq - if err := ctx.ShouldBind(&req); err != nil { +func (s *PackageService) Create(ctx *gin.Context) { + log := logger.WithField("HTTP", "Package.Create") + var req cdssdk.PackageCreateReq + if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - s.uploadEC(ctx, &req) -} - -func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) { - log := logger.WithField("HTTP", "Package.Upload") - - var err error - - objIter := mapMultiPartFileToUploadingObject(req.Files) - - taskID, err := s.svc.PackageSvc().StartCreatingPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, req.Info.NodeAffinity) - + pkgID, err := s.svc.PackageSvc().Create(req.UserID, req.BucketID, req.Name) if err != nil { - log.Warnf("start uploading ec package task: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed")) + log.Warnf("creating package: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create package failed")) return } - for { - complete, createResult, err := s.svc.PackageSvc().WaitCreatingPackage(taskID, time.Second*5) - if complete { - if err != nil { - log.Warnf("uploading ec package: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading ec package failed")) - return - } - - ctx.JSON(http.StatusOK, OK(PackageUploadResp{ - PackageID: createResult.PackageID, - })) - return - } - - if err != nil { - log.Warnf("waiting task: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed")) - return - } - } + ctx.JSON(http.StatusOK, OK(cdssdk.PackageCreateResp{ + PackageID: pkgID, + })) } type PackageDeleteReq struct { diff --git a/client/internal/http/server.go b/client/internal/http/server.go index ee1954f..462f76a 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -39,18 +39,22 @@ func (s *Server) Serve() error { } func (s *Server) initRouters() { - s.engine.GET("/object/download", s.ObjectSvc().Download) - s.engine.GET(cdssdk.ObjectGetPackageObjectsPath, s.ObjectSvc().GetPackageObjects) + s.engine.GET(cdssdk.ObjectDownloadPath, s.Object().Download) + s.engine.POST(cdssdk.ObjectUploadPath, s.Object().Upload) + s.engine.GET(cdssdk.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects) - s.engine.GET("/package/get", s.PackageSvc().Get) - s.engine.POST("/package/upload", s.PackageSvc().Upload) - s.engine.POST("/package/delete", s.PackageSvc().Delete) - s.engine.GET("/package/getCachedNodes", s.PackageSvc().GetCachedNodes) - s.engine.GET("/package/getLoadedNodes", s.PackageSvc().GetLoadedNodes) + s.engine.GET(cdssdk.PackageGetPath, s.Package().Get) + s.engine.POST(cdssdk.PackageCreatePath, s.Package().Create) + s.engine.POST("/package/delete", s.Package().Delete) + s.engine.GET("/package/getCachedNodes", s.Package().GetCachedNodes) + s.engine.GET("/package/getLoadedNodes", s.Package().GetLoadedNodes) - s.engine.POST("/storage/loadPackage", s.StorageSvc().LoadPackage) - s.engine.POST("/storage/createPackage", s.StorageSvc().CreatePackage) - s.engine.GET("/storage/getInfo", s.StorageSvc().GetInfo) + s.engine.POST("/storage/loadPackage", s.Storage().LoadPackage) + s.engine.POST("/storage/createPackage", s.Storage().CreatePackage) + s.engine.GET("/storage/getInfo", s.Storage().GetInfo) - s.engine.POST(cdssdk.CacheMovePackagePath, s.CacheSvc().MovePackage) + s.engine.POST(cdssdk.CacheMovePackagePath, s.Cache().MovePackage) + + s.engine.POST(cdssdk.BucketCreatePath, s.Bucket().Create) + s.engine.POST(cdssdk.BucketDeletePath, s.Bucket().Delete) } diff --git a/client/internal/http/storage.go b/client/internal/http/storage.go index 1b30f1c..3ada149 100644 --- a/client/internal/http/storage.go +++ b/client/internal/http/storage.go @@ -14,7 +14,7 @@ type StorageService struct { *Server } -func (s *Server) StorageSvc() *StorageService { +func (s *Server) Storage() *StorageService { return &StorageService{ Server: s, } diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 2b988cf..0dc7374 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -3,10 +3,13 @@ package services import ( "fmt" "io" + "time" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + mytask "gitlink.org.cn/cloudream/storage/client/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -18,6 +21,20 @@ func (svc *Service) ObjectSvc() *ObjectService { return &ObjectService{Service: svc} } +func (svc *ObjectService) StartUploading(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) (string, error) { + tsk := svc.TaskMgr.StartNew(mytask.NewUploadObjects(userID, packageID, objIter, nodeAffinity)) + return tsk.ID(), nil +} + +func (svc *ObjectService) WaitUploading(taskID string, waitTimeout time.Duration) (bool, *mytask.UploadObjectsResult, error) { + tsk := svc.TaskMgr.FindByID(taskID) + if tsk.WaitTimeout(waitTimeout) { + updatePkgTask := tsk.Body().(*mytask.UploadObjects) + return true, updatePkgTask.Result, tsk.Error() + } + return false, nil, nil +} + func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (io.ReadCloser, error) { panic("not implement yet!") } diff --git a/client/internal/services/package.go b/client/internal/services/package.go index da3d420..9667356 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -2,13 +2,10 @@ package services import ( "fmt" - "time" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - mytask "gitlink.org.cn/cloudream/storage/client/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - agtcmd "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -37,6 +34,21 @@ func (svc *PackageService) Get(userID cdssdk.UserID, packageID cdssdk.PackageID) return &getResp.Package, nil } +func (svc *PackageService) Create(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return 0, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + resp, err := coorCli.CreatePackage(coormq.NewCreatePackage(userID, bucketID, name)) + if err != nil { + return 0, fmt.Errorf("creating package: %w", err) + } + + return resp.PackageID, nil +} + func (svc *PackageService) DownloadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID) (iterator.DownloadingObjectIterator, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -56,34 +68,6 @@ func (svc *PackageService) DownloadPackage(userID cdssdk.UserID, packageID cdssd return iter, nil } -func (svc *PackageService) StartCreatingPackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) (string, error) { - tsk := svc.TaskMgr.StartNew(mytask.NewCreatePackage(userID, bucketID, name, objIter, nodeAffinity)) - return tsk.ID(), nil -} - -func (svc *PackageService) WaitCreatingPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreatePackageResult, error) { - tsk := svc.TaskMgr.FindByID(taskID) - if tsk.WaitTimeout(waitTimeout) { - cteatePkgTask := tsk.Body().(*mytask.CreatePackage) - return true, cteatePkgTask.Result, tsk.Error() - } - return false, nil, nil -} - -func (svc *PackageService) StartUpdatingPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator) (string, error) { - tsk := svc.TaskMgr.StartNew(mytask.NewUpdatePackage(userID, packageID, objIter)) - return tsk.ID(), nil -} - -func (svc *PackageService) WaitUpdatingPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdatePackageResult, error) { - tsk := svc.TaskMgr.FindByID(taskID) - if tsk.WaitTimeout(waitTimeout) { - updatePkgTask := tsk.Body().(*mytask.UpdatePackage) - return true, updatePkgTask.Result, tsk.Error() - } - return false, nil, nil -} - func (svc *PackageService) DeletePackage(userID cdssdk.UserID, packageID cdssdk.PackageID) error { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { diff --git a/client/internal/task/create_package.go b/client/internal/task/create_package.go deleted file mode 100644 index 1f826b9..0000000 --- a/client/internal/task/create_package.go +++ /dev/null @@ -1,35 +0,0 @@ -package task - -import ( - "time" - - "gitlink.org.cn/cloudream/common/pkgs/task" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" -) - -type CreatePackageResult = cmd.CreatePackageResult - -type CreatePackage struct { - cmd cmd.CreatePackage - - Result *CreatePackageResult -} - -func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreatePackage { - return &CreatePackage{ - cmd: *cmd.NewCreatePackage(userID, bucketID, name, objIter, nodeAffinity), - } -} - -func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ - Distlock: ctx.distlock, - }) - t.Result = ret - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} diff --git a/client/internal/task/update_package.go b/client/internal/task/update_package.go deleted file mode 100644 index 518cc98..0000000 --- a/client/internal/task/update_package.go +++ /dev/null @@ -1,36 +0,0 @@ -package task - -import ( - "time" - - "gitlink.org.cn/cloudream/common/pkgs/task" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" -) - -type UpdatePackageResult = cmd.UpdatePackageResult - -type UpdatePackage struct { - cmd cmd.UpdatePackage - - Result *UpdatePackageResult -} - -func NewUpdatePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator) *UpdatePackage { - return &UpdatePackage{ - cmd: *cmd.NewUpdatePackage(userID, packageID, objectIter), - } -} - -func (t *UpdatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ - Distlock: ctx.distlock, - }) - - t.Result = ret - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} diff --git a/client/internal/task/upload_objects.go b/client/internal/task/upload_objects.go new file mode 100644 index 0000000..b138add --- /dev/null +++ b/client/internal/task/upload_objects.go @@ -0,0 +1,36 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/common/pkgs/task" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" +) + +type UploadObjectsResult = cmd.UploadObjectsResult + +type UploadObjects struct { + cmd cmd.UploadObjects + + Result *UploadObjectsResult +} + +func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *UploadObjects { + return &UploadObjects{ + cmd: *cmd.NewUploadObjects(userID, packageID, objectIter, nodeAffinity), + } +} + +func (t *UploadObjects) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { + ret, err := t.cmd.Execute(&cmd.UploadObjectsContext{ + Distlock: ctx.distlock, + }) + + t.Result = ret + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/common/assets/scripts/create_database.sql b/common/assets/scripts/create_database.sql index 3652c7a..415b0bf 100644 --- a/common/assets/scripts/create_database.sql +++ b/common/assets/scripts/create_database.sql @@ -122,6 +122,8 @@ create table Object ( Size bigint not null comment '对象大小(Byte)', FileHash varchar(100) not null comment '完整对象的FileHash', Redundancy JSON not null comment '冗余策略', + CreateTime timestamp not null comment '创建时间', + UpdateTime timestamp not null comment '更新时间', UNIQUE KEY PackagePath (PackageID, Path) ) comment = '对象表'; diff --git a/common/pkgs/cmd/update_package.go b/common/pkgs/cmd/update_package.go deleted file mode 100644 index 015644f..0000000 --- a/common/pkgs/cmd/update_package.go +++ /dev/null @@ -1,88 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/samber/lo" - - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - - stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" - coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" -) - -type UpdatePackage struct { - userID cdssdk.UserID - packageID cdssdk.PackageID - objectIter iterator.UploadingObjectIterator -} - -type UpdatePackageResult struct { - ObjectResults []ObjectUploadResult -} - -type UpdateNodeInfo struct { - UploadNodeInfo - HasOldObject bool -} - -func NewUpdatePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator) *UpdatePackage { - return &UpdatePackage{ - userID: userID, - packageID: packageID, - objectIter: objIter, - } -} - -func (t *UpdatePackage) Execute(ctx *UpdatePackageContext) (*UpdatePackageResult, error) { - defer t.objectIter.Close() - - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) - } - - getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID)) - if err != nil { - return nil, fmt.Errorf("getting user nodes: %w", err) - } - - userNodes := lo.Map(getUserNodesResp.Nodes, func(node cdssdk.Node, index int) UploadNodeInfo { - return UploadNodeInfo{ - Node: node, - IsSameLocation: node.LocationID == stgglb.Local.LocationID, - } - }) - - // 给上传节点的IPFS加锁 - ipfsReqBlder := reqbuilder.NewBuilder() - // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 - if stgglb.Local.NodeID != nil { - ipfsReqBlder.IPFS().Buzy(*stgglb.Local.NodeID) - } - for _, node := range userNodes { - if stgglb.Local.NodeID != nil && node.Node.NodeID == *stgglb.Local.NodeID { - continue - } - - ipfsReqBlder.IPFS().Buzy(node.Node.NodeID) - } - // TODO 加Object的Create锁,最好一次性能加多个 - // 防止上传的副本被清除 - ipfsMutex, err := ipfsReqBlder.MutexLock(ctx.Distlock) - if err != nil { - return nil, fmt.Errorf("acquire locks failed, err: %w", err) - } - defer ipfsMutex.Unlock() - - rets, err := uploadAndUpdatePackage(t.packageID, t.objectIter, userNodes, nil) - if err != nil { - return nil, err - } - - return &UpdatePackageResult{ - ObjectResults: rets, - }, nil -} diff --git a/common/pkgs/cmd/create_package.go b/common/pkgs/cmd/upload_objects.go similarity index 87% rename from common/pkgs/cmd/create_package.go rename to common/pkgs/cmd/upload_objects.go index f250dc5..b4234f3 100644 --- a/common/pkgs/cmd/create_package.go +++ b/common/pkgs/cmd/upload_objects.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "math/rand" + "time" "github.com/samber/lo" @@ -18,17 +19,15 @@ import ( coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) -type CreatePackage struct { +type UploadObjects struct { userID cdssdk.UserID - bucketID cdssdk.BucketID - name string + packageID cdssdk.PackageID objectIter iterator.UploadingObjectIterator nodeAffinity *cdssdk.NodeID } -type CreatePackageResult struct { - PackageID cdssdk.PackageID - ObjectResults []ObjectUploadResult +type UploadObjectsResult struct { + Objects []ObjectUploadResult } type ObjectUploadResult struct { @@ -43,21 +42,20 @@ type UploadNodeInfo struct { IsSameLocation bool } -type UpdatePackageContext struct { +type UploadObjectsContext struct { Distlock *distlock.Service } -func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreatePackage { - return &CreatePackage{ +func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *UploadObjects { + return &UploadObjects{ userID: userID, - bucketID: bucketID, - name: name, + packageID: packageID, objectIter: objIter, nodeAffinity: nodeAffinity, } } -func (t *CreatePackage) Execute(ctx *UpdatePackageContext) (*CreatePackageResult, error) { +func (t *UploadObjects) Execute(ctx *UploadObjectsContext) (*UploadObjectsResult, error) { defer t.objectIter.Close() coorCli, err := stgglb.CoordinatorMQPool.Acquire() @@ -65,11 +63,6 @@ func (t *CreatePackage) Execute(ctx *UpdatePackageContext) (*CreatePackageResult return nil, fmt.Errorf("new coordinator client: %w", err) } - createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name)) - if err != nil { - return nil, fmt.Errorf("creating package: %w", err) - } - getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID)) if err != nil { return nil, fmt.Errorf("getting user nodes: %w", err) @@ -103,14 +96,13 @@ func (t *CreatePackage) Execute(ctx *UpdatePackageContext) (*CreatePackageResult } defer ipfsMutex.Unlock() - rets, err := uploadAndUpdatePackage(createPkgResp.PackageID, t.objectIter, userNodes, t.nodeAffinity) + rets, err := uploadAndUpdatePackage(t.packageID, t.objectIter, userNodes, t.nodeAffinity) if err != nil { return nil, err } - return &CreatePackageResult{ - PackageID: createPkgResp.PackageID, - ObjectResults: rets, + return &UploadObjectsResult{ + Objects: rets, }, nil } @@ -158,6 +150,7 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo err = func() error { defer objInfo.File.Close() + uploadTime := time.Now() fileHash, err := uploadFile(objInfo.File, uploadNode) if err != nil { return fmt.Errorf("uploading file: %w", err) @@ -168,7 +161,7 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo Error: err, }) - adds = append(adds, coormq.NewAddObjectEntry(objInfo.Path, objInfo.Size, fileHash, uploadNode.Node.NodeID)) + adds = append(adds, coormq.NewAddObjectEntry(objInfo.Path, objInfo.Size, fileHash, uploadTime, uploadNode.Node.NodeID)) return nil }() if err != nil { diff --git a/common/pkgs/db/model/model.go b/common/pkgs/db/model/model.go index 531d0e0..7b5ee66 100644 --- a/common/pkgs/db/model/model.go +++ b/common/pkgs/db/model/model.go @@ -46,11 +46,7 @@ type UserStorage struct { StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"` } -type Bucket struct { - BucketID cdssdk.BucketID `db:"BucketID" json:"bucketID"` - Name string `db:"Name" json:"name"` - CreatorID cdssdk.UserID `db:"CreatorID" json:"creatorID"` -} +type Bucket = cdssdk.Bucket type Package = cdssdk.Package diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index 24e3941..20c1afb 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -43,10 +43,10 @@ func (db *ObjectDB) BatchGetPackageObjectIDs(ctx SQLContext, pkgID cdssdk.Packag return objIDs, nil } -func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, fileHash string, redundancy cdssdk.Redundancy) (int64, error) { - sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy) values(?,?,?,?,?)" +func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) { + sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy, CreateTime, UpdateTime) values(?,?,?,?,?,?,?)" - ret, err := ctx.Exec(sql, packageID, path, size, redundancy) + ret, err := ctx.Exec(sql, obj.PackageID, obj.Path, obj.Size, obj.FileHash, obj.Redundancy, obj.UpdateTime, obj.UpdateTime) if err != nil { return 0, fmt.Errorf("insert object failed, err: %w", err) } @@ -56,64 +56,18 @@ func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path stri return 0, fmt.Errorf("get id of inserted object failed, err: %w", err) } - return objectID, nil + return cdssdk.ObjectID(objectID), nil } -// 创建或者更新记录,返回值true代表是创建,false代表是更新 -func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, fileHash string) (cdssdk.ObjectID, bool, error) { - // 首次上传Object时,默认不启用冗余,即使是在更新一个已有的Object也是如此 - defRed := cdssdk.NewNoneRedundancy() - - sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy) values(?,?,?,?,?) on duplicate key update Size = ?, FileHash = ?, Redundancy = ?" - - ret, err := ctx.Exec(sql, packageID, path, size, fileHash, defRed, size, fileHash, defRed) - if err != nil { - return 0, false, fmt.Errorf("insert object failed, err: %w", err) - } - - affs, err := ret.RowsAffected() - if err != nil { - return 0, false, fmt.Errorf("getting affected rows: %w", err) - } - - // 影响行数为1时是插入,为2时是更新 - if affs == 1 { - objectID, err := ret.LastInsertId() - if err != nil { - return 0, false, fmt.Errorf("get id of inserted object failed, err: %w", err) - } - return cdssdk.ObjectID(objectID), true, nil - } - - var objID cdssdk.ObjectID - if err = sqlx.Get(ctx, &objID, "select ObjectID from Object where PackageID = ? and Path = ?", packageID, path); err != nil { - return 0, false, fmt.Errorf("getting object id: %w", err) - } - - return objID, false, nil -} - -// 批量创建或者更新记录 +// 可以用于批量创建或者更新记录 +// 用于创建时,需要额外检查PackageID+Path的唯一性 +// 用于更新时,需要额外检查现存的PackageID+Path对应的ObjectID是否与待更新的ObjectID相同。不会更新CreateTime。 func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error { - sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy)" + - " values(:PackageID,:Path,:Size,:FileHash,:Redundancy)" + - " on duplicate key update Size = values(Size), FileHash = values(FileHash), Redundancy = values(Redundancy)" - - return BatchNamedExec(ctx, sql, 5, objs, nil) -} - -func (*ObjectDB) UpdateFileInfo(ctx SQLContext, objectID cdssdk.ObjectID, fileSize int64) (bool, error) { - ret, err := ctx.Exec("update Object set FileSize = ? where ObjectID = ?", fileSize, objectID) - if err != nil { - return false, err - } - - cnt, err := ret.RowsAffected() - if err != nil { - return false, fmt.Errorf("get affected rows failed, err: %w", err) - } + sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy, CreateTime ,UpdateTime)" + + " values(:PackageID,:Path,:Size,:FileHash,:Redundancy, :CreateTime, :UpdateTime) as new" + + " on duplicate key update Size = new.Size, FileHash = new.FileHash, Redundancy = new.Redundancy, UpdateTime = new.UpdateTime" - return cnt > 0, nil + return BatchNamedExec(ctx, sql, 7, objs, nil) } func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Object, error) { @@ -183,6 +137,8 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] Size: add.Size, FileHash: add.FileHash, Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式 + CreateTime: add.UploadTime, + UpdateTime: add.UploadTime, }) } @@ -256,9 +212,9 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb // 目前只能使用这种方式来同时更新大量数据 err := BatchNamedExec(ctx, - "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy)"+ - " values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy) as new"+ - " on duplicate key update Redundancy=new.Redundancy", 6, dummyObjs, nil) + "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy, UpdateTime)"+ + " values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy, :UpdateTime) as new"+ + " on duplicate key update Redundancy=new.Redundancy", 7, dummyObjs, nil) if err != nil { return fmt.Errorf("batch update object redundancy: %w", err) } diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index e831069..3d01e5d 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -1,6 +1,8 @@ package coordinator import ( + "time" + "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -92,10 +94,11 @@ type UpdatePackageResp struct { mq.MessageBodyBase } type AddObjectEntry struct { - Path string `json:"path"` - Size int64 `json:"size,string"` - FileHash string `json:"fileHash"` - NodeID cdssdk.NodeID `json:"nodeID"` + Path string `json:"path"` + Size int64 `json:"size,string"` + FileHash string `json:"fileHash"` + UploadTime time.Time `json:"uploadTime"` // 开始上传文件的时间 + NodeID cdssdk.NodeID `json:"nodeID"` } func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry, deletes []cdssdk.ObjectID) *UpdatePackage { @@ -108,12 +111,13 @@ func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry, deletes func NewUpdatePackageResp() *UpdatePackageResp { return &UpdatePackageResp{} } -func NewAddObjectEntry(path string, size int64, fileHash string, nodeIDs cdssdk.NodeID) AddObjectEntry { +func NewAddObjectEntry(path string, size int64, fileHash string, uploadTime time.Time, nodeID cdssdk.NodeID) AddObjectEntry { return AddObjectEntry{ - Path: path, - Size: size, - FileHash: fileHash, - NodeID: nodeIDs, + Path: path, + Size: size, + FileHash: fileHash, + UploadTime: uploadTime, + NodeID: nodeID, } } func (client *Client) UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, error) { diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index b1cc02c..9d82041 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -74,7 +74,7 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { } // TODO UserID - getNodes, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(0)) + getNodes, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(1)) if err != nil { log.Warnf("getting all nodes: %s", err.Error()) return