| @@ -1,26 +1,39 @@ | |||||
| package task | package task | ||||
| import ( | import ( | ||||
| "fmt" | |||||
| "time" | "time" | ||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | "gitlink.org.cn/cloudream/common/pkgs/logger" | ||||
| "gitlink.org.cn/cloudream/common/pkgs/task" | "gitlink.org.cn/cloudream/common/pkgs/task" | ||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | ||||
| stgglb "gitlink.org.cn/cloudream/storage/common/globals" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" | "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" | ||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" | "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" | ||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | |||||
| ) | ) | ||||
| type CreatePackageResult = cmd.CreatePackageResult | |||||
| type CreatePackageResult struct { | |||||
| PackageID cdssdk.PackageID | |||||
| Objects []cmd.ObjectUploadResult | |||||
| } | |||||
| type CreatePackage struct { | type CreatePackage struct { | ||||
| cmd cmd.CreatePackage | |||||
| Result *CreatePackageResult | |||||
| userID cdssdk.UserID | |||||
| bucketID cdssdk.BucketID | |||||
| name string | |||||
| objIter iterator.UploadingObjectIterator | |||||
| nodeAffinity *cdssdk.NodeID | |||||
| Result *CreatePackageResult | |||||
| } | } | ||||
| func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreatePackage { | func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreatePackage { | ||||
| return &CreatePackage{ | return &CreatePackage{ | ||||
| cmd: *cmd.NewCreatePackage(userID, bucketID, name, objIter, nodeAffinity), | |||||
| userID: userID, | |||||
| bucketID: bucketID, | |||||
| name: name, | |||||
| objIter: objIter, | |||||
| nodeAffinity: nodeAffinity, | |||||
| } | } | ||||
| } | } | ||||
| @@ -29,12 +42,43 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c | |||||
| log.Debugf("begin") | log.Debugf("begin") | ||||
| defer log.Debugf("end") | defer log.Debugf("end") | ||||
| ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ | |||||
| coorCli, err := stgglb.CoordinatorMQPool.Acquire() | |||||
| if err != nil { | |||||
| err = fmt.Errorf("new coordinator client: %w", err) | |||||
| log.Warn(err.Error()) | |||||
| complete(err, CompleteOption{ | |||||
| RemovingDelay: time.Minute, | |||||
| }) | |||||
| return | |||||
| } | |||||
| defer stgglb.CoordinatorMQPool.Release(coorCli) | |||||
| createResp, err := coorCli.CreatePackage(coordinator.NewCreatePackage(t.userID, t.bucketID, t.name)) | |||||
| if err != nil { | |||||
| err = fmt.Errorf("creating package: %w", err) | |||||
| log.Error(err.Error()) | |||||
| complete(err, CompleteOption{ | |||||
| RemovingDelay: time.Minute, | |||||
| }) | |||||
| return | |||||
| } | |||||
| uploadRet, err := cmd.NewUploadObjects(t.userID, createResp.PackageID, t.objIter, t.nodeAffinity).Execute(&cmd.UploadObjectsContext{ | |||||
| Distlock: ctx.distlock, | Distlock: ctx.distlock, | ||||
| }) | }) | ||||
| t.Result = ret | |||||
| if err != nil { | |||||
| err = fmt.Errorf("uploading objects: %w", err) | |||||
| log.Error(err.Error()) | |||||
| complete(err, CompleteOption{ | |||||
| RemovingDelay: time.Minute, | |||||
| }) | |||||
| return | |||||
| } | |||||
| t.Result.PackageID = createResp.PackageID | |||||
| t.Result.Objects = uploadRet.Objects | |||||
| complete(err, CompleteOption{ | |||||
| complete(nil, CompleteOption{ | |||||
| RemovingDelay: time.Minute, | RemovingDelay: time.Minute, | ||||
| }) | }) | ||||
| } | } | ||||
| @@ -38,3 +38,8 @@ func (c *Commandline) DispatchCommand(allArgs []string) { | |||||
| os.Exit(1) | os.Exit(1) | ||||
| } | } | ||||
| } | } | ||||
| func MustAddCmd(fn any, prefixWords ...string) any { | |||||
| commands.MustAdd(fn, prefixWords...) | |||||
| return nil | |||||
| } | |||||
| @@ -0,0 +1,58 @@ | |||||
| package cmdline | |||||
| import ( | |||||
| "fmt" | |||||
| "os" | |||||
| "path/filepath" | |||||
| "time" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" | |||||
| ) | |||||
| var _ = MustAddCmd(func(ctx CommandContext, packageID cdssdk.PackageID, rootPath string, nodeAffinity []cdssdk.NodeID) error { | |||||
| userID := cdssdk.UserID(1) | |||||
| var uploadFilePathes []string | |||||
| err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { | |||||
| if err != nil { | |||||
| return nil | |||||
| } | |||||
| if !fi.IsDir() { | |||||
| uploadFilePathes = append(uploadFilePathes, fname) | |||||
| } | |||||
| return nil | |||||
| }) | |||||
| if err != nil { | |||||
| return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) | |||||
| } | |||||
| var nodeAff *cdssdk.NodeID | |||||
| if len(nodeAffinity) > 0 { | |||||
| n := cdssdk.NodeID(nodeAffinity[0]) | |||||
| nodeAff = &n | |||||
| } | |||||
| objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) | |||||
| taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUploading(userID, packageID, objIter, nodeAff) | |||||
| if err != nil { | |||||
| return fmt.Errorf("update objects to package %d failed, err: %w", packageID, err) | |||||
| } | |||||
| for { | |||||
| complete, _, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploading(taskID, time.Second*5) | |||||
| if complete { | |||||
| if err != nil { | |||||
| return fmt.Errorf("uploading objects: %w", err) | |||||
| } | |||||
| return nil | |||||
| } | |||||
| if err != nil { | |||||
| return fmt.Errorf("wait updating: %w", err) | |||||
| } | |||||
| } | |||||
| }, "obj", "upload") | |||||
| @@ -5,7 +5,6 @@ import ( | |||||
| "io" | "io" | ||||
| "os" | "os" | ||||
| "path/filepath" | "path/filepath" | ||||
| "time" | |||||
| "github.com/jedib0t/go-pretty/v6/table" | "github.com/jedib0t/go-pretty/v6/table" | ||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | ||||
| @@ -13,7 +12,7 @@ import ( | |||||
| ) | ) | ||||
| func PackageListBucketPackages(ctx CommandContext, bucketID cdssdk.BucketID) error { | func PackageListBucketPackages(ctx CommandContext, bucketID cdssdk.BucketID) error { | ||||
| userID := cdssdk.UserID(0) | |||||
| userID := cdssdk.UserID(1) | |||||
| packages, err := ctx.Cmdline.Svc.BucketSvc().GetBucketPackages(userID, bucketID) | packages, err := ctx.Cmdline.Svc.BucketSvc().GetBucketPackages(userID, bucketID) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -34,13 +33,15 @@ func PackageListBucketPackages(ctx CommandContext, bucketID cdssdk.BucketID) err | |||||
| } | } | ||||
| func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outputDir string) error { | func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outputDir string) error { | ||||
| userID := cdssdk.UserID(1) | |||||
| err := os.MkdirAll(outputDir, os.ModePerm) | err := os.MkdirAll(outputDir, os.ModePerm) | ||||
| if err != nil { | if err != nil { | ||||
| return fmt.Errorf("create output directory %s failed, err: %w", outputDir, err) | return fmt.Errorf("create output directory %s failed, err: %w", outputDir, err) | ||||
| } | } | ||||
| // 下载文件 | // 下载文件 | ||||
| objIter, err := ctx.Cmdline.Svc.PackageSvc().DownloadPackage(0, packageID) | |||||
| objIter, err := ctx.Cmdline.Svc.PackageSvc().DownloadPackage(userID, packageID) | |||||
| if err != nil { | if err != nil { | ||||
| return fmt.Errorf("download object failed, err: %w", err) | return fmt.Errorf("download object failed, err: %w", err) | ||||
| } | } | ||||
| @@ -91,108 +92,20 @@ func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outp | |||||
| return nil | return nil | ||||
| } | } | ||||
| func PackageCreatePackage(ctx CommandContext, name string, rootPath string, bucketID cdssdk.BucketID, nodeAffinity []cdssdk.NodeID) error { | |||||
| rootPath = filepath.Clean(rootPath) | |||||
| var uploadFilePathes []string | |||||
| err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { | |||||
| if err != nil { | |||||
| return nil | |||||
| } | |||||
| if !fi.IsDir() { | |||||
| uploadFilePathes = append(uploadFilePathes, fname) | |||||
| } | |||||
| return nil | |||||
| }) | |||||
| if err != nil { | |||||
| return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) | |||||
| } | |||||
| var nodeAff *cdssdk.NodeID | |||||
| if len(nodeAffinity) > 0 { | |||||
| n := cdssdk.NodeID(nodeAffinity[0]) | |||||
| nodeAff = &n | |||||
| } | |||||
| objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) | |||||
| taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingPackage(0, bucketID, name, objIter, nodeAff) | |||||
| func PackageCreatePackage(ctx CommandContext, bucketID cdssdk.BucketID, name string) error { | |||||
| userID := cdssdk.UserID(1) | |||||
| pkgID, err := ctx.Cmdline.Svc.PackageSvc().Create(userID, bucketID, name) | |||||
| if err != nil { | if err != nil { | ||||
| return fmt.Errorf("upload file data failed, err: %w", err) | |||||
| } | |||||
| for { | |||||
| complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingPackage(taskID, time.Second*5) | |||||
| if complete { | |||||
| if err != nil { | |||||
| return fmt.Errorf("uploading package: %w", err) | |||||
| } | |||||
| tb := table.NewWriter() | |||||
| tb.AppendHeader(table.Row{"Path", "ObjectID"}) | |||||
| for i := 0; i < len(uploadObjectResult.ObjectResults); i++ { | |||||
| tb.AppendRow(table.Row{ | |||||
| uploadObjectResult.ObjectResults[i].Info.Path, | |||||
| uploadObjectResult.ObjectResults[i].ObjectID, | |||||
| }) | |||||
| } | |||||
| fmt.Print(tb.Render()) | |||||
| fmt.Printf("\n%v", uploadObjectResult.PackageID) | |||||
| return nil | |||||
| } | |||||
| if err != nil { | |||||
| return fmt.Errorf("wait uploading: %w", err) | |||||
| } | |||||
| } | |||||
| } | |||||
| func PackageUpdatePackage(ctx CommandContext, packageID cdssdk.PackageID, rootPath string) error { | |||||
| //userID := int64(0) | |||||
| var uploadFilePathes []string | |||||
| err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { | |||||
| if err != nil { | |||||
| return nil | |||||
| } | |||||
| if !fi.IsDir() { | |||||
| uploadFilePathes = append(uploadFilePathes, fname) | |||||
| } | |||||
| return nil | |||||
| }) | |||||
| if err != nil { | |||||
| return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) | |||||
| } | |||||
| objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) | |||||
| taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingPackage(0, packageID, objIter) | |||||
| if err != nil { | |||||
| return fmt.Errorf("update package %d failed, err: %w", packageID, err) | |||||
| return err | |||||
| } | } | ||||
| for { | |||||
| complete, _, err := ctx.Cmdline.Svc.PackageSvc().WaitUpdatingPackage(taskID, time.Second*5) | |||||
| if complete { | |||||
| if err != nil { | |||||
| return fmt.Errorf("updating package: %w", err) | |||||
| } | |||||
| return nil | |||||
| } | |||||
| if err != nil { | |||||
| return fmt.Errorf("wait updating: %w", err) | |||||
| } | |||||
| } | |||||
| fmt.Printf("%v\n", pkgID) | |||||
| return nil | |||||
| } | } | ||||
| func PackageDeletePackage(ctx CommandContext, packageID cdssdk.PackageID) error { | func PackageDeletePackage(ctx CommandContext, packageID cdssdk.PackageID) error { | ||||
| userID := cdssdk.UserID(0) | |||||
| userID := cdssdk.UserID(1) | |||||
| err := ctx.Cmdline.Svc.PackageSvc().DeletePackage(userID, packageID) | err := ctx.Cmdline.Svc.PackageSvc().DeletePackage(userID, packageID) | ||||
| if err != nil { | if err != nil { | ||||
| return fmt.Errorf("delete package %d failed, err: %w", packageID, err) | return fmt.Errorf("delete package %d failed, err: %w", packageID, err) | ||||
| @@ -201,7 +114,7 @@ func PackageDeletePackage(ctx CommandContext, packageID cdssdk.PackageID) error | |||||
| } | } | ||||
| func PackageGetCachedNodes(ctx CommandContext, packageID cdssdk.PackageID) error { | func PackageGetCachedNodes(ctx CommandContext, packageID cdssdk.PackageID) error { | ||||
| userID := cdssdk.UserID(0) | |||||
| userID := cdssdk.UserID(1) | |||||
| resp, err := ctx.Cmdline.Svc.PackageSvc().GetCachedNodes(userID, packageID) | resp, err := ctx.Cmdline.Svc.PackageSvc().GetCachedNodes(userID, packageID) | ||||
| fmt.Printf("resp: %v\n", resp) | fmt.Printf("resp: %v\n", resp) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -211,7 +124,7 @@ func PackageGetCachedNodes(ctx CommandContext, packageID cdssdk.PackageID) error | |||||
| } | } | ||||
| func PackageGetLoadedNodes(ctx CommandContext, packageID cdssdk.PackageID) error { | func PackageGetLoadedNodes(ctx CommandContext, packageID cdssdk.PackageID) error { | ||||
| userID := cdssdk.UserID(0) | |||||
| userID := cdssdk.UserID(1) | |||||
| nodeIDs, err := ctx.Cmdline.Svc.PackageSvc().GetLoadedNodes(userID, packageID) | nodeIDs, err := ctx.Cmdline.Svc.PackageSvc().GetLoadedNodes(userID, packageID) | ||||
| fmt.Printf("nodeIDs: %v\n", nodeIDs) | fmt.Printf("nodeIDs: %v\n", nodeIDs) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -227,8 +140,6 @@ func init() { | |||||
| commands.MustAdd(PackageCreatePackage, "pkg", "new") | commands.MustAdd(PackageCreatePackage, "pkg", "new") | ||||
| commands.MustAdd(PackageUpdatePackage, "pkg", "update") | |||||
| commands.MustAdd(PackageDeletePackage, "pkg", "delete") | commands.MustAdd(PackageDeletePackage, "pkg", "delete") | ||||
| commands.MustAdd(PackageGetCachedNodes, "pkg", "cached") | commands.MustAdd(PackageGetCachedNodes, "pkg", "cached") | ||||
| @@ -0,0 +1,61 @@ | |||||
| package http | |||||
| import ( | |||||
| "net/http" | |||||
| "github.com/gin-gonic/gin" | |||||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| ) | |||||
| type BucketService struct { | |||||
| *Server | |||||
| } | |||||
| func (s *Server) Bucket() *BucketService { | |||||
| return &BucketService{ | |||||
| Server: s, | |||||
| } | |||||
| } | |||||
| func (s *BucketService) Create(ctx *gin.Context) { | |||||
| log := logger.WithField("HTTP", "Bucket.Create") | |||||
| var req cdssdk.BucketCreateReq | |||||
| if err := ctx.ShouldBindJSON(&req); err != nil { | |||||
| log.Warnf("binding body: %s", err.Error()) | |||||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | |||||
| return | |||||
| } | |||||
| bucketID, err := s.svc.BucketSvc().CreateBucket(req.UserID, req.BucketName) | |||||
| if err != nil { | |||||
| log.Warnf("creating bucket: %s", err.Error()) | |||||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create bucket failed")) | |||||
| return | |||||
| } | |||||
| ctx.JSON(http.StatusOK, OK(cdssdk.BucketCreateResp{ | |||||
| BucketID: bucketID, | |||||
| })) | |||||
| } | |||||
| func (s *BucketService) Delete(ctx *gin.Context) { | |||||
| log := logger.WithField("HTTP", "Bucket.Delete") | |||||
| var req cdssdk.BucketDeleteReq | |||||
| if err := ctx.ShouldBindJSON(&req); err != nil { | |||||
| log.Warnf("binding body: %s", err.Error()) | |||||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | |||||
| return | |||||
| } | |||||
| if err := s.svc.BucketSvc().DeleteBucket(req.UserID, req.BucketID); err != nil { | |||||
| log.Warnf("deleting bucket: %s", err.Error()) | |||||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete bucket failed")) | |||||
| return | |||||
| } | |||||
| ctx.JSON(http.StatusOK, OK(nil)) | |||||
| } | |||||
| @@ -14,7 +14,7 @@ type CacheService struct { | |||||
| *Server | *Server | ||||
| } | } | ||||
| func (s *Server) CacheSvc() *CacheService { | |||||
| func (s *Server) Cache() *CacheService { | |||||
| return &CacheService{ | return &CacheService{ | ||||
| Server: s, | Server: s, | ||||
| } | } | ||||
| @@ -2,7 +2,9 @@ package http | |||||
| import ( | import ( | ||||
| "io" | "io" | ||||
| "mime/multipart" | |||||
| "net/http" | "net/http" | ||||
| "time" | |||||
| "github.com/gin-gonic/gin" | "github.com/gin-gonic/gin" | ||||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | "gitlink.org.cn/cloudream/common/consts/errorcode" | ||||
| @@ -15,12 +17,60 @@ type ObjectService struct { | |||||
| *Server | *Server | ||||
| } | } | ||||
| func (s *Server) ObjectSvc() *ObjectService { | |||||
| func (s *Server) Object() *ObjectService { | |||||
| return &ObjectService{ | return &ObjectService{ | ||||
| Server: s, | Server: s, | ||||
| } | } | ||||
| } | } | ||||
| type ObjectUploadReq struct { | |||||
| Info cdssdk.ObjectUploadInfo `form:"info" binding:"required"` | |||||
| Files []*multipart.FileHeader `form:"files"` | |||||
| } | |||||
| func (s *ObjectService) Upload(ctx *gin.Context) { | |||||
| log := logger.WithField("HTTP", "Object.Upload") | |||||
| var req ObjectUploadReq | |||||
| if err := ctx.ShouldBind(&req); err != nil { | |||||
| log.Warnf("binding body: %s", err.Error()) | |||||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | |||||
| return | |||||
| } | |||||
| var err error | |||||
| objIter := mapMultiPartFileToUploadingObject(req.Files) | |||||
| taskID, err := s.svc.ObjectSvc().StartUploading(req.Info.UserID, req.Info.PackageID, objIter, req.Info.NodeAffinity) | |||||
| if err != nil { | |||||
| log.Warnf("start uploading object task: %s", err.Error()) | |||||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed")) | |||||
| return | |||||
| } | |||||
| for { | |||||
| complete, _, err := s.svc.ObjectSvc().WaitUploading(taskID, time.Second*5) | |||||
| if complete { | |||||
| if err != nil { | |||||
| log.Warnf("uploading object: %s", err.Error()) | |||||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading object failed")) | |||||
| return | |||||
| } | |||||
| ctx.JSON(http.StatusOK, OK(nil)) | |||||
| return | |||||
| } | |||||
| if err != nil { | |||||
| log.Warnf("waiting task: %s", err.Error()) | |||||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed")) | |||||
| return | |||||
| } | |||||
| } | |||||
| } | |||||
| type ObjectDownloadReq struct { | type ObjectDownloadReq struct { | ||||
| UserID *cdssdk.UserID `form:"userID" binding:"required"` | UserID *cdssdk.UserID `form:"userID" binding:"required"` | ||||
| ObjectID *cdssdk.ObjectID `form:"objectID" binding:"required"` | ObjectID *cdssdk.ObjectID `form:"objectID" binding:"required"` | ||||
| @@ -3,7 +3,6 @@ package http | |||||
| import ( | import ( | ||||
| "mime/multipart" | "mime/multipart" | ||||
| "net/http" | "net/http" | ||||
| "time" | |||||
| "github.com/gin-gonic/gin" | "github.com/gin-gonic/gin" | ||||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | "gitlink.org.cn/cloudream/common/consts/errorcode" | ||||
| @@ -19,7 +18,7 @@ type PackageService struct { | |||||
| *Server | *Server | ||||
| } | } | ||||
| func (s *Server) PackageSvc() *PackageService { | |||||
| func (s *Server) Package() *PackageService { | |||||
| return &PackageService{ | return &PackageService{ | ||||
| Server: s, | Server: s, | ||||
| } | } | ||||
| @@ -53,71 +52,25 @@ func (s *PackageService) Get(ctx *gin.Context) { | |||||
| ctx.JSON(http.StatusOK, OK(PackageGetResp{Package: *pkg})) | ctx.JSON(http.StatusOK, OK(PackageGetResp{Package: *pkg})) | ||||
| } | } | ||||
| type PackageUploadReq struct { | |||||
| Info PackageUploadInfo `form:"info" binding:"required"` | |||||
| Files []*multipart.FileHeader `form:"files"` | |||||
| } | |||||
| type PackageUploadInfo struct { | |||||
| UserID *cdssdk.UserID `json:"userID" binding:"required"` | |||||
| BucketID *cdssdk.BucketID `json:"bucketID" binding:"required"` | |||||
| Name string `json:"name" binding:"required"` | |||||
| NodeAffinity *cdssdk.NodeID `json:"nodeAffinity"` | |||||
| } | |||||
| type PackageUploadResp struct { | |||||
| PackageID cdssdk.PackageID `json:"packageID,string"` | |||||
| } | |||||
| func (s *PackageService) Upload(ctx *gin.Context) { | |||||
| log := logger.WithField("HTTP", "Package.Upload") | |||||
| var req PackageUploadReq | |||||
| if err := ctx.ShouldBind(&req); err != nil { | |||||
| func (s *PackageService) Create(ctx *gin.Context) { | |||||
| log := logger.WithField("HTTP", "Package.Create") | |||||
| var req cdssdk.PackageCreateReq | |||||
| if err := ctx.ShouldBindJSON(&req); err != nil { | |||||
| log.Warnf("binding body: %s", err.Error()) | log.Warnf("binding body: %s", err.Error()) | ||||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | ||||
| return | return | ||||
| } | } | ||||
| s.uploadEC(ctx, &req) | |||||
| } | |||||
| func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) { | |||||
| log := logger.WithField("HTTP", "Package.Upload") | |||||
| var err error | |||||
| objIter := mapMultiPartFileToUploadingObject(req.Files) | |||||
| taskID, err := s.svc.PackageSvc().StartCreatingPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, req.Info.NodeAffinity) | |||||
| pkgID, err := s.svc.PackageSvc().Create(req.UserID, req.BucketID, req.Name) | |||||
| if err != nil { | if err != nil { | ||||
| log.Warnf("start uploading ec package task: %s", err.Error()) | |||||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed")) | |||||
| log.Warnf("creating package: %s", err.Error()) | |||||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create package failed")) | |||||
| return | return | ||||
| } | } | ||||
| for { | |||||
| complete, createResult, err := s.svc.PackageSvc().WaitCreatingPackage(taskID, time.Second*5) | |||||
| if complete { | |||||
| if err != nil { | |||||
| log.Warnf("uploading ec package: %s", err.Error()) | |||||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading ec package failed")) | |||||
| return | |||||
| } | |||||
| ctx.JSON(http.StatusOK, OK(PackageUploadResp{ | |||||
| PackageID: createResult.PackageID, | |||||
| })) | |||||
| return | |||||
| } | |||||
| if err != nil { | |||||
| log.Warnf("waiting task: %s", err.Error()) | |||||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed")) | |||||
| return | |||||
| } | |||||
| } | |||||
| ctx.JSON(http.StatusOK, OK(cdssdk.PackageCreateResp{ | |||||
| PackageID: pkgID, | |||||
| })) | |||||
| } | } | ||||
| type PackageDeleteReq struct { | type PackageDeleteReq struct { | ||||
| @@ -39,18 +39,22 @@ func (s *Server) Serve() error { | |||||
| } | } | ||||
| func (s *Server) initRouters() { | func (s *Server) initRouters() { | ||||
| s.engine.GET("/object/download", s.ObjectSvc().Download) | |||||
| s.engine.GET(cdssdk.ObjectGetPackageObjectsPath, s.ObjectSvc().GetPackageObjects) | |||||
| s.engine.GET(cdssdk.ObjectDownloadPath, s.Object().Download) | |||||
| s.engine.POST(cdssdk.ObjectUploadPath, s.Object().Upload) | |||||
| s.engine.GET(cdssdk.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects) | |||||
| s.engine.GET("/package/get", s.PackageSvc().Get) | |||||
| s.engine.POST("/package/upload", s.PackageSvc().Upload) | |||||
| s.engine.POST("/package/delete", s.PackageSvc().Delete) | |||||
| s.engine.GET("/package/getCachedNodes", s.PackageSvc().GetCachedNodes) | |||||
| s.engine.GET("/package/getLoadedNodes", s.PackageSvc().GetLoadedNodes) | |||||
| s.engine.GET(cdssdk.PackageGetPath, s.Package().Get) | |||||
| s.engine.POST(cdssdk.PackageCreatePath, s.Package().Create) | |||||
| s.engine.POST("/package/delete", s.Package().Delete) | |||||
| s.engine.GET("/package/getCachedNodes", s.Package().GetCachedNodes) | |||||
| s.engine.GET("/package/getLoadedNodes", s.Package().GetLoadedNodes) | |||||
| s.engine.POST("/storage/loadPackage", s.StorageSvc().LoadPackage) | |||||
| s.engine.POST("/storage/createPackage", s.StorageSvc().CreatePackage) | |||||
| s.engine.GET("/storage/getInfo", s.StorageSvc().GetInfo) | |||||
| s.engine.POST("/storage/loadPackage", s.Storage().LoadPackage) | |||||
| s.engine.POST("/storage/createPackage", s.Storage().CreatePackage) | |||||
| s.engine.GET("/storage/getInfo", s.Storage().GetInfo) | |||||
| s.engine.POST(cdssdk.CacheMovePackagePath, s.CacheSvc().MovePackage) | |||||
| s.engine.POST(cdssdk.CacheMovePackagePath, s.Cache().MovePackage) | |||||
| s.engine.POST(cdssdk.BucketCreatePath, s.Bucket().Create) | |||||
| s.engine.POST(cdssdk.BucketDeletePath, s.Bucket().Delete) | |||||
| } | } | ||||
| @@ -14,7 +14,7 @@ type StorageService struct { | |||||
| *Server | *Server | ||||
| } | } | ||||
| func (s *Server) StorageSvc() *StorageService { | |||||
| func (s *Server) Storage() *StorageService { | |||||
| return &StorageService{ | return &StorageService{ | ||||
| Server: s, | Server: s, | ||||
| } | } | ||||
| @@ -3,10 +3,13 @@ package services | |||||
| import ( | import ( | ||||
| "fmt" | "fmt" | ||||
| "io" | "io" | ||||
| "time" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | ||||
| mytask "gitlink.org.cn/cloudream/storage/client/internal/task" | |||||
| stgglb "gitlink.org.cn/cloudream/storage/common/globals" | stgglb "gitlink.org.cn/cloudream/storage/common/globals" | ||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" | "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" | ||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" | |||||
| coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | ||||
| ) | ) | ||||
| @@ -18,6 +21,20 @@ func (svc *Service) ObjectSvc() *ObjectService { | |||||
| return &ObjectService{Service: svc} | return &ObjectService{Service: svc} | ||||
| } | } | ||||
| func (svc *ObjectService) StartUploading(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) (string, error) { | |||||
| tsk := svc.TaskMgr.StartNew(mytask.NewUploadObjects(userID, packageID, objIter, nodeAffinity)) | |||||
| return tsk.ID(), nil | |||||
| } | |||||
| func (svc *ObjectService) WaitUploading(taskID string, waitTimeout time.Duration) (bool, *mytask.UploadObjectsResult, error) { | |||||
| tsk := svc.TaskMgr.FindByID(taskID) | |||||
| if tsk.WaitTimeout(waitTimeout) { | |||||
| updatePkgTask := tsk.Body().(*mytask.UploadObjects) | |||||
| return true, updatePkgTask.Result, tsk.Error() | |||||
| } | |||||
| return false, nil, nil | |||||
| } | |||||
| func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (io.ReadCloser, error) { | func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (io.ReadCloser, error) { | ||||
| panic("not implement yet!") | panic("not implement yet!") | ||||
| } | } | ||||
| @@ -2,13 +2,10 @@ package services | |||||
| import ( | import ( | ||||
| "fmt" | "fmt" | ||||
| "time" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | ||||
| mytask "gitlink.org.cn/cloudream/storage/client/internal/task" | |||||
| stgglb "gitlink.org.cn/cloudream/storage/common/globals" | stgglb "gitlink.org.cn/cloudream/storage/common/globals" | ||||
| agtcmd "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" | "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" | ||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" | "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" | ||||
| coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | ||||
| @@ -37,6 +34,21 @@ func (svc *PackageService) Get(userID cdssdk.UserID, packageID cdssdk.PackageID) | |||||
| return &getResp.Package, nil | return &getResp.Package, nil | ||||
| } | } | ||||
| func (svc *PackageService) Create(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) { | |||||
| coorCli, err := stgglb.CoordinatorMQPool.Acquire() | |||||
| if err != nil { | |||||
| return 0, fmt.Errorf("new coordinator client: %w", err) | |||||
| } | |||||
| defer stgglb.CoordinatorMQPool.Release(coorCli) | |||||
| resp, err := coorCli.CreatePackage(coormq.NewCreatePackage(userID, bucketID, name)) | |||||
| if err != nil { | |||||
| return 0, fmt.Errorf("creating package: %w", err) | |||||
| } | |||||
| return resp.PackageID, nil | |||||
| } | |||||
| func (svc *PackageService) DownloadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID) (iterator.DownloadingObjectIterator, error) { | func (svc *PackageService) DownloadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID) (iterator.DownloadingObjectIterator, error) { | ||||
| coorCli, err := stgglb.CoordinatorMQPool.Acquire() | coorCli, err := stgglb.CoordinatorMQPool.Acquire() | ||||
| if err != nil { | if err != nil { | ||||
| @@ -56,34 +68,6 @@ func (svc *PackageService) DownloadPackage(userID cdssdk.UserID, packageID cdssd | |||||
| return iter, nil | return iter, nil | ||||
| } | } | ||||
| func (svc *PackageService) StartCreatingPackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) (string, error) { | |||||
| tsk := svc.TaskMgr.StartNew(mytask.NewCreatePackage(userID, bucketID, name, objIter, nodeAffinity)) | |||||
| return tsk.ID(), nil | |||||
| } | |||||
| func (svc *PackageService) WaitCreatingPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreatePackageResult, error) { | |||||
| tsk := svc.TaskMgr.FindByID(taskID) | |||||
| if tsk.WaitTimeout(waitTimeout) { | |||||
| cteatePkgTask := tsk.Body().(*mytask.CreatePackage) | |||||
| return true, cteatePkgTask.Result, tsk.Error() | |||||
| } | |||||
| return false, nil, nil | |||||
| } | |||||
| func (svc *PackageService) StartUpdatingPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator) (string, error) { | |||||
| tsk := svc.TaskMgr.StartNew(mytask.NewUpdatePackage(userID, packageID, objIter)) | |||||
| return tsk.ID(), nil | |||||
| } | |||||
| func (svc *PackageService) WaitUpdatingPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdatePackageResult, error) { | |||||
| tsk := svc.TaskMgr.FindByID(taskID) | |||||
| if tsk.WaitTimeout(waitTimeout) { | |||||
| updatePkgTask := tsk.Body().(*mytask.UpdatePackage) | |||||
| return true, updatePkgTask.Result, tsk.Error() | |||||
| } | |||||
| return false, nil, nil | |||||
| } | |||||
| func (svc *PackageService) DeletePackage(userID cdssdk.UserID, packageID cdssdk.PackageID) error { | func (svc *PackageService) DeletePackage(userID cdssdk.UserID, packageID cdssdk.PackageID) error { | ||||
| coorCli, err := stgglb.CoordinatorMQPool.Acquire() | coorCli, err := stgglb.CoordinatorMQPool.Acquire() | ||||
| if err != nil { | if err != nil { | ||||
| @@ -1,35 +0,0 @@ | |||||
| package task | |||||
| import ( | |||||
| "time" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/task" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" | |||||
| ) | |||||
| type CreatePackageResult = cmd.CreatePackageResult | |||||
| type CreatePackage struct { | |||||
| cmd cmd.CreatePackage | |||||
| Result *CreatePackageResult | |||||
| } | |||||
| func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreatePackage { | |||||
| return &CreatePackage{ | |||||
| cmd: *cmd.NewCreatePackage(userID, bucketID, name, objIter, nodeAffinity), | |||||
| } | |||||
| } | |||||
| func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { | |||||
| ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ | |||||
| Distlock: ctx.distlock, | |||||
| }) | |||||
| t.Result = ret | |||||
| complete(err, CompleteOption{ | |||||
| RemovingDelay: time.Minute, | |||||
| }) | |||||
| } | |||||
| @@ -1,36 +0,0 @@ | |||||
| package task | |||||
| import ( | |||||
| "time" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/task" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" | |||||
| ) | |||||
| type UpdatePackageResult = cmd.UpdatePackageResult | |||||
| type UpdatePackage struct { | |||||
| cmd cmd.UpdatePackage | |||||
| Result *UpdatePackageResult | |||||
| } | |||||
| func NewUpdatePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator) *UpdatePackage { | |||||
| return &UpdatePackage{ | |||||
| cmd: *cmd.NewUpdatePackage(userID, packageID, objectIter), | |||||
| } | |||||
| } | |||||
| func (t *UpdatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { | |||||
| ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ | |||||
| Distlock: ctx.distlock, | |||||
| }) | |||||
| t.Result = ret | |||||
| complete(err, CompleteOption{ | |||||
| RemovingDelay: time.Minute, | |||||
| }) | |||||
| } | |||||
| @@ -0,0 +1,36 @@ | |||||
| package task | |||||
| import ( | |||||
| "time" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/task" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" | |||||
| ) | |||||
| type UploadObjectsResult = cmd.UploadObjectsResult | |||||
| type UploadObjects struct { | |||||
| cmd cmd.UploadObjects | |||||
| Result *UploadObjectsResult | |||||
| } | |||||
| func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *UploadObjects { | |||||
| return &UploadObjects{ | |||||
| cmd: *cmd.NewUploadObjects(userID, packageID, objectIter, nodeAffinity), | |||||
| } | |||||
| } | |||||
| func (t *UploadObjects) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { | |||||
| ret, err := t.cmd.Execute(&cmd.UploadObjectsContext{ | |||||
| Distlock: ctx.distlock, | |||||
| }) | |||||
| t.Result = ret | |||||
| complete(err, CompleteOption{ | |||||
| RemovingDelay: time.Minute, | |||||
| }) | |||||
| } | |||||
| @@ -122,6 +122,8 @@ create table Object ( | |||||
| Size bigint not null comment '对象大小(Byte)', | Size bigint not null comment '对象大小(Byte)', | ||||
| FileHash varchar(100) not null comment '完整对象的FileHash', | FileHash varchar(100) not null comment '完整对象的FileHash', | ||||
| Redundancy JSON not null comment '冗余策略', | Redundancy JSON not null comment '冗余策略', | ||||
| CreateTime timestamp not null comment '创建时间', | |||||
| UpdateTime timestamp not null comment '更新时间', | |||||
| UNIQUE KEY PackagePath (PackageID, Path) | UNIQUE KEY PackagePath (PackageID, Path) | ||||
| ) comment = '对象表'; | ) comment = '对象表'; | ||||
| @@ -1,88 +0,0 @@ | |||||
| package cmd | |||||
| import ( | |||||
| "fmt" | |||||
| "github.com/samber/lo" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| stgglb "gitlink.org.cn/cloudream/storage/common/globals" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" | |||||
| coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | |||||
| ) | |||||
| type UpdatePackage struct { | |||||
| userID cdssdk.UserID | |||||
| packageID cdssdk.PackageID | |||||
| objectIter iterator.UploadingObjectIterator | |||||
| } | |||||
| type UpdatePackageResult struct { | |||||
| ObjectResults []ObjectUploadResult | |||||
| } | |||||
| type UpdateNodeInfo struct { | |||||
| UploadNodeInfo | |||||
| HasOldObject bool | |||||
| } | |||||
| func NewUpdatePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator) *UpdatePackage { | |||||
| return &UpdatePackage{ | |||||
| userID: userID, | |||||
| packageID: packageID, | |||||
| objectIter: objIter, | |||||
| } | |||||
| } | |||||
| func (t *UpdatePackage) Execute(ctx *UpdatePackageContext) (*UpdatePackageResult, error) { | |||||
| defer t.objectIter.Close() | |||||
| coorCli, err := stgglb.CoordinatorMQPool.Acquire() | |||||
| if err != nil { | |||||
| return nil, fmt.Errorf("new coordinator client: %w", err) | |||||
| } | |||||
| getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID)) | |||||
| if err != nil { | |||||
| return nil, fmt.Errorf("getting user nodes: %w", err) | |||||
| } | |||||
| userNodes := lo.Map(getUserNodesResp.Nodes, func(node cdssdk.Node, index int) UploadNodeInfo { | |||||
| return UploadNodeInfo{ | |||||
| Node: node, | |||||
| IsSameLocation: node.LocationID == stgglb.Local.LocationID, | |||||
| } | |||||
| }) | |||||
| // 给上传节点的IPFS加锁 | |||||
| ipfsReqBlder := reqbuilder.NewBuilder() | |||||
| // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 | |||||
| if stgglb.Local.NodeID != nil { | |||||
| ipfsReqBlder.IPFS().Buzy(*stgglb.Local.NodeID) | |||||
| } | |||||
| for _, node := range userNodes { | |||||
| if stgglb.Local.NodeID != nil && node.Node.NodeID == *stgglb.Local.NodeID { | |||||
| continue | |||||
| } | |||||
| ipfsReqBlder.IPFS().Buzy(node.Node.NodeID) | |||||
| } | |||||
| // TODO 加Object的Create锁,最好一次性能加多个 | |||||
| // 防止上传的副本被清除 | |||||
| ipfsMutex, err := ipfsReqBlder.MutexLock(ctx.Distlock) | |||||
| if err != nil { | |||||
| return nil, fmt.Errorf("acquire locks failed, err: %w", err) | |||||
| } | |||||
| defer ipfsMutex.Unlock() | |||||
| rets, err := uploadAndUpdatePackage(t.packageID, t.objectIter, userNodes, nil) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| return &UpdatePackageResult{ | |||||
| ObjectResults: rets, | |||||
| }, nil | |||||
| } | |||||
| @@ -4,6 +4,7 @@ import ( | |||||
| "fmt" | "fmt" | ||||
| "io" | "io" | ||||
| "math/rand" | "math/rand" | ||||
| "time" | |||||
| "github.com/samber/lo" | "github.com/samber/lo" | ||||
| @@ -18,17 +19,15 @@ import ( | |||||
| coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | ||||
| ) | ) | ||||
| type CreatePackage struct { | |||||
| type UploadObjects struct { | |||||
| userID cdssdk.UserID | userID cdssdk.UserID | ||||
| bucketID cdssdk.BucketID | |||||
| name string | |||||
| packageID cdssdk.PackageID | |||||
| objectIter iterator.UploadingObjectIterator | objectIter iterator.UploadingObjectIterator | ||||
| nodeAffinity *cdssdk.NodeID | nodeAffinity *cdssdk.NodeID | ||||
| } | } | ||||
| type CreatePackageResult struct { | |||||
| PackageID cdssdk.PackageID | |||||
| ObjectResults []ObjectUploadResult | |||||
| type UploadObjectsResult struct { | |||||
| Objects []ObjectUploadResult | |||||
| } | } | ||||
| type ObjectUploadResult struct { | type ObjectUploadResult struct { | ||||
| @@ -43,21 +42,20 @@ type UploadNodeInfo struct { | |||||
| IsSameLocation bool | IsSameLocation bool | ||||
| } | } | ||||
| type UpdatePackageContext struct { | |||||
| type UploadObjectsContext struct { | |||||
| Distlock *distlock.Service | Distlock *distlock.Service | ||||
| } | } | ||||
| func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreatePackage { | |||||
| return &CreatePackage{ | |||||
| func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *UploadObjects { | |||||
| return &UploadObjects{ | |||||
| userID: userID, | userID: userID, | ||||
| bucketID: bucketID, | |||||
| name: name, | |||||
| packageID: packageID, | |||||
| objectIter: objIter, | objectIter: objIter, | ||||
| nodeAffinity: nodeAffinity, | nodeAffinity: nodeAffinity, | ||||
| } | } | ||||
| } | } | ||||
| func (t *CreatePackage) Execute(ctx *UpdatePackageContext) (*CreatePackageResult, error) { | |||||
| func (t *UploadObjects) Execute(ctx *UploadObjectsContext) (*UploadObjectsResult, error) { | |||||
| defer t.objectIter.Close() | defer t.objectIter.Close() | ||||
| coorCli, err := stgglb.CoordinatorMQPool.Acquire() | coorCli, err := stgglb.CoordinatorMQPool.Acquire() | ||||
| @@ -65,11 +63,6 @@ func (t *CreatePackage) Execute(ctx *UpdatePackageContext) (*CreatePackageResult | |||||
| return nil, fmt.Errorf("new coordinator client: %w", err) | return nil, fmt.Errorf("new coordinator client: %w", err) | ||||
| } | } | ||||
| createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name)) | |||||
| if err != nil { | |||||
| return nil, fmt.Errorf("creating package: %w", err) | |||||
| } | |||||
| getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID)) | getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID)) | ||||
| if err != nil { | if err != nil { | ||||
| return nil, fmt.Errorf("getting user nodes: %w", err) | return nil, fmt.Errorf("getting user nodes: %w", err) | ||||
| @@ -103,14 +96,13 @@ func (t *CreatePackage) Execute(ctx *UpdatePackageContext) (*CreatePackageResult | |||||
| } | } | ||||
| defer ipfsMutex.Unlock() | defer ipfsMutex.Unlock() | ||||
| rets, err := uploadAndUpdatePackage(createPkgResp.PackageID, t.objectIter, userNodes, t.nodeAffinity) | |||||
| rets, err := uploadAndUpdatePackage(t.packageID, t.objectIter, userNodes, t.nodeAffinity) | |||||
| if err != nil { | if err != nil { | ||||
| return nil, err | return nil, err | ||||
| } | } | ||||
| return &CreatePackageResult{ | |||||
| PackageID: createPkgResp.PackageID, | |||||
| ObjectResults: rets, | |||||
| return &UploadObjectsResult{ | |||||
| Objects: rets, | |||||
| }, nil | }, nil | ||||
| } | } | ||||
| @@ -158,6 +150,7 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo | |||||
| err = func() error { | err = func() error { | ||||
| defer objInfo.File.Close() | defer objInfo.File.Close() | ||||
| uploadTime := time.Now() | |||||
| fileHash, err := uploadFile(objInfo.File, uploadNode) | fileHash, err := uploadFile(objInfo.File, uploadNode) | ||||
| if err != nil { | if err != nil { | ||||
| return fmt.Errorf("uploading file: %w", err) | return fmt.Errorf("uploading file: %w", err) | ||||
| @@ -168,7 +161,7 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo | |||||
| Error: err, | Error: err, | ||||
| }) | }) | ||||
| adds = append(adds, coormq.NewAddObjectEntry(objInfo.Path, objInfo.Size, fileHash, uploadNode.Node.NodeID)) | |||||
| adds = append(adds, coormq.NewAddObjectEntry(objInfo.Path, objInfo.Size, fileHash, uploadTime, uploadNode.Node.NodeID)) | |||||
| return nil | return nil | ||||
| }() | }() | ||||
| if err != nil { | if err != nil { | ||||
| @@ -46,11 +46,7 @@ type UserStorage struct { | |||||
| StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"` | StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"` | ||||
| } | } | ||||
| type Bucket struct { | |||||
| BucketID cdssdk.BucketID `db:"BucketID" json:"bucketID"` | |||||
| Name string `db:"Name" json:"name"` | |||||
| CreatorID cdssdk.UserID `db:"CreatorID" json:"creatorID"` | |||||
| } | |||||
| type Bucket = cdssdk.Bucket | |||||
| type Package = cdssdk.Package | type Package = cdssdk.Package | ||||
| @@ -43,10 +43,10 @@ func (db *ObjectDB) BatchGetPackageObjectIDs(ctx SQLContext, pkgID cdssdk.Packag | |||||
| return objIDs, nil | return objIDs, nil | ||||
| } | } | ||||
| func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, fileHash string, redundancy cdssdk.Redundancy) (int64, error) { | |||||
| sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy) values(?,?,?,?,?)" | |||||
| func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) { | |||||
| sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy, CreateTime, UpdateTime) values(?,?,?,?,?,?,?)" | |||||
| ret, err := ctx.Exec(sql, packageID, path, size, redundancy) | |||||
| ret, err := ctx.Exec(sql, obj.PackageID, obj.Path, obj.Size, obj.FileHash, obj.Redundancy, obj.UpdateTime, obj.UpdateTime) | |||||
| if err != nil { | if err != nil { | ||||
| return 0, fmt.Errorf("insert object failed, err: %w", err) | return 0, fmt.Errorf("insert object failed, err: %w", err) | ||||
| } | } | ||||
| @@ -56,64 +56,18 @@ func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path stri | |||||
| return 0, fmt.Errorf("get id of inserted object failed, err: %w", err) | return 0, fmt.Errorf("get id of inserted object failed, err: %w", err) | ||||
| } | } | ||||
| return objectID, nil | |||||
| return cdssdk.ObjectID(objectID), nil | |||||
| } | } | ||||
| // 创建或者更新记录,返回值true代表是创建,false代表是更新 | |||||
| func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, fileHash string) (cdssdk.ObjectID, bool, error) { | |||||
| // 首次上传Object时,默认不启用冗余,即使是在更新一个已有的Object也是如此 | |||||
| defRed := cdssdk.NewNoneRedundancy() | |||||
| sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy) values(?,?,?,?,?) on duplicate key update Size = ?, FileHash = ?, Redundancy = ?" | |||||
| ret, err := ctx.Exec(sql, packageID, path, size, fileHash, defRed, size, fileHash, defRed) | |||||
| if err != nil { | |||||
| return 0, false, fmt.Errorf("insert object failed, err: %w", err) | |||||
| } | |||||
| affs, err := ret.RowsAffected() | |||||
| if err != nil { | |||||
| return 0, false, fmt.Errorf("getting affected rows: %w", err) | |||||
| } | |||||
| // 影响行数为1时是插入,为2时是更新 | |||||
| if affs == 1 { | |||||
| objectID, err := ret.LastInsertId() | |||||
| if err != nil { | |||||
| return 0, false, fmt.Errorf("get id of inserted object failed, err: %w", err) | |||||
| } | |||||
| return cdssdk.ObjectID(objectID), true, nil | |||||
| } | |||||
| var objID cdssdk.ObjectID | |||||
| if err = sqlx.Get(ctx, &objID, "select ObjectID from Object where PackageID = ? and Path = ?", packageID, path); err != nil { | |||||
| return 0, false, fmt.Errorf("getting object id: %w", err) | |||||
| } | |||||
| return objID, false, nil | |||||
| } | |||||
| // 批量创建或者更新记录 | |||||
| // 可以用于批量创建或者更新记录 | |||||
| // 用于创建时,需要额外检查PackageID+Path的唯一性 | |||||
| // 用于更新时,需要额外检查现存的PackageID+Path对应的ObjectID是否与待更新的ObjectID相同。不会更新CreateTime。 | |||||
| func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error { | func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error { | ||||
| sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy)" + | |||||
| " values(:PackageID,:Path,:Size,:FileHash,:Redundancy)" + | |||||
| " on duplicate key update Size = values(Size), FileHash = values(FileHash), Redundancy = values(Redundancy)" | |||||
| return BatchNamedExec(ctx, sql, 5, objs, nil) | |||||
| } | |||||
| func (*ObjectDB) UpdateFileInfo(ctx SQLContext, objectID cdssdk.ObjectID, fileSize int64) (bool, error) { | |||||
| ret, err := ctx.Exec("update Object set FileSize = ? where ObjectID = ?", fileSize, objectID) | |||||
| if err != nil { | |||||
| return false, err | |||||
| } | |||||
| cnt, err := ret.RowsAffected() | |||||
| if err != nil { | |||||
| return false, fmt.Errorf("get affected rows failed, err: %w", err) | |||||
| } | |||||
| sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy, CreateTime ,UpdateTime)" + | |||||
| " values(:PackageID,:Path,:Size,:FileHash,:Redundancy, :CreateTime, :UpdateTime) as new" + | |||||
| " on duplicate key update Size = new.Size, FileHash = new.FileHash, Redundancy = new.Redundancy, UpdateTime = new.UpdateTime" | |||||
| return cnt > 0, nil | |||||
| return BatchNamedExec(ctx, sql, 7, objs, nil) | |||||
| } | } | ||||
| func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Object, error) { | func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Object, error) { | ||||
| @@ -183,6 +137,8 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] | |||||
| Size: add.Size, | Size: add.Size, | ||||
| FileHash: add.FileHash, | FileHash: add.FileHash, | ||||
| Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式 | Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式 | ||||
| CreateTime: add.UploadTime, | |||||
| UpdateTime: add.UploadTime, | |||||
| }) | }) | ||||
| } | } | ||||
| @@ -256,9 +212,9 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb | |||||
| // 目前只能使用这种方式来同时更新大量数据 | // 目前只能使用这种方式来同时更新大量数据 | ||||
| err := BatchNamedExec(ctx, | err := BatchNamedExec(ctx, | ||||
| "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy)"+ | |||||
| " values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy) as new"+ | |||||
| " on duplicate key update Redundancy=new.Redundancy", 6, dummyObjs, nil) | |||||
| "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy, UpdateTime)"+ | |||||
| " values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy, :UpdateTime) as new"+ | |||||
| " on duplicate key update Redundancy=new.Redundancy", 7, dummyObjs, nil) | |||||
| if err != nil { | if err != nil { | ||||
| return fmt.Errorf("batch update object redundancy: %w", err) | return fmt.Errorf("batch update object redundancy: %w", err) | ||||
| } | } | ||||
| @@ -1,6 +1,8 @@ | |||||
| package coordinator | package coordinator | ||||
| import ( | import ( | ||||
| "time" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | "gitlink.org.cn/cloudream/common/pkgs/mq" | ||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | ||||
| @@ -92,10 +94,11 @@ type UpdatePackageResp struct { | |||||
| mq.MessageBodyBase | mq.MessageBodyBase | ||||
| } | } | ||||
| type AddObjectEntry struct { | type AddObjectEntry struct { | ||||
| Path string `json:"path"` | |||||
| Size int64 `json:"size,string"` | |||||
| FileHash string `json:"fileHash"` | |||||
| NodeID cdssdk.NodeID `json:"nodeID"` | |||||
| Path string `json:"path"` | |||||
| Size int64 `json:"size,string"` | |||||
| FileHash string `json:"fileHash"` | |||||
| UploadTime time.Time `json:"uploadTime"` // 开始上传文件的时间 | |||||
| NodeID cdssdk.NodeID `json:"nodeID"` | |||||
| } | } | ||||
| func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry, deletes []cdssdk.ObjectID) *UpdatePackage { | func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry, deletes []cdssdk.ObjectID) *UpdatePackage { | ||||
| @@ -108,12 +111,13 @@ func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry, deletes | |||||
| func NewUpdatePackageResp() *UpdatePackageResp { | func NewUpdatePackageResp() *UpdatePackageResp { | ||||
| return &UpdatePackageResp{} | return &UpdatePackageResp{} | ||||
| } | } | ||||
| func NewAddObjectEntry(path string, size int64, fileHash string, nodeIDs cdssdk.NodeID) AddObjectEntry { | |||||
| func NewAddObjectEntry(path string, size int64, fileHash string, uploadTime time.Time, nodeID cdssdk.NodeID) AddObjectEntry { | |||||
| return AddObjectEntry{ | return AddObjectEntry{ | ||||
| Path: path, | |||||
| Size: size, | |||||
| FileHash: fileHash, | |||||
| NodeID: nodeIDs, | |||||
| Path: path, | |||||
| Size: size, | |||||
| FileHash: fileHash, | |||||
| UploadTime: uploadTime, | |||||
| NodeID: nodeID, | |||||
| } | } | ||||
| } | } | ||||
| func (client *Client) UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, error) { | func (client *Client) UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, error) { | ||||
| @@ -74,7 +74,7 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { | |||||
| } | } | ||||
| // TODO UserID | // TODO UserID | ||||
| getNodes, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(0)) | |||||
| getNodes, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(1)) | |||||
| if err != nil { | if err != nil { | ||||
| log.Warnf("getting all nodes: %s", err.Error()) | log.Warnf("getting all nodes: %s", err.Error()) | ||||
| return | return | ||||