| @@ -9,6 +9,7 @@ import ( | |||||
| "github.com/jedib0t/go-pretty/v6/table" | "github.com/jedib0t/go-pretty/v6/table" | ||||
| "gitlink.org.cn/cloudream/common/models" | "gitlink.org.cn/cloudream/common/models" | ||||
| "gitlink.org.cn/cloudream/storage-client/internal/config" | |||||
| "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" | "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" | ||||
| ) | ) | ||||
| @@ -190,7 +191,7 @@ func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, | |||||
| } | } | ||||
| objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) | objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) | ||||
| taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, models.NewECRedundancyInfo(ecName)) | |||||
| taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, models.NewECRedundancyInfo(ecName, config.Cfg().ECPacketSize)) | |||||
| if err != nil { | if err != nil { | ||||
| return fmt.Errorf("upload file data failed, err: %w", err) | return fmt.Errorf("upload file data failed, err: %w", err) | ||||
| @@ -9,7 +9,6 @@ import ( | |||||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | "gitlink.org.cn/cloudream/common/consts/errorcode" | ||||
| "gitlink.org.cn/cloudream/common/models" | "gitlink.org.cn/cloudream/common/models" | ||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | "gitlink.org.cn/cloudream/common/pkgs/logger" | ||||
| "gitlink.org.cn/cloudream/common/utils/serder" | |||||
| "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" | "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" | ||||
| ) | ) | ||||
| @@ -49,11 +48,12 @@ func (s *PackageService) Upload(ctx *gin.Context) { | |||||
| return | return | ||||
| } | } | ||||
| switch req.Info.Redundancy.Type { | |||||
| case models.RedundancyRep: | |||||
| if req.Info.Redundancy.IsRepInfo() { | |||||
| s.uploadRep(ctx, &req) | s.uploadRep(ctx, &req) | ||||
| return | return | ||||
| case models.RedundancyEC: | |||||
| } | |||||
| if req.Info.Redundancy.IsECInfo() { | |||||
| s.uploadEC(ctx, &req) | s.uploadEC(ctx, &req) | ||||
| return | return | ||||
| } | } | ||||
| @@ -64,8 +64,9 @@ func (s *PackageService) Upload(ctx *gin.Context) { | |||||
| func (s *PackageService) uploadRep(ctx *gin.Context, req *PackageUploadReq) { | func (s *PackageService) uploadRep(ctx *gin.Context, req *PackageUploadReq) { | ||||
| log := logger.WithField("HTTP", "Package.Upload") | log := logger.WithField("HTTP", "Package.Upload") | ||||
| var err error | |||||
| var repInfo models.RepRedundancyInfo | var repInfo models.RepRedundancyInfo | ||||
| if err := serder.AnyToAny(req.Info.Redundancy.Info, &repInfo); err != nil { | |||||
| if repInfo, err = req.Info.Redundancy.ToRepInfo(); err != nil { | |||||
| log.Warnf("parsing rep redundancy config: %s", err.Error()) | log.Warnf("parsing rep redundancy config: %s", err.Error()) | ||||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config")) | ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config")) | ||||
| return | return | ||||
| @@ -107,8 +108,9 @@ func (s *PackageService) uploadRep(ctx *gin.Context, req *PackageUploadReq) { | |||||
| func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) { | func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) { | ||||
| log := logger.WithField("HTTP", "Package.Upload") | log := logger.WithField("HTTP", "Package.Upload") | ||||
| var err error | |||||
| var ecInfo models.ECRedundancyInfo | var ecInfo models.ECRedundancyInfo | ||||
| if err := serder.AnyToAny(req.Info.Redundancy.Info, &ecInfo); err != nil { | |||||
| if ecInfo, err = req.Info.Redundancy.ToECInfo(); err != nil { | |||||
| log.Warnf("parsing ec redundancy config: %s", err.Error()) | log.Warnf("parsing ec redundancy config: %s", err.Error()) | ||||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config")) | ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config")) | ||||
| return | return | ||||
| @@ -5,8 +5,6 @@ import ( | |||||
| "time" | "time" | ||||
| "gitlink.org.cn/cloudream/common/models" | "gitlink.org.cn/cloudream/common/models" | ||||
| "gitlink.org.cn/cloudream/common/utils/serder" | |||||
| "gitlink.org.cn/cloudream/storage-client/internal/config" | |||||
| mytask "gitlink.org.cn/cloudream/storage-client/internal/task" | mytask "gitlink.org.cn/cloudream/storage-client/internal/task" | ||||
| "gitlink.org.cn/cloudream/storage-common/globals" | "gitlink.org.cn/cloudream/storage-common/globals" | ||||
| agtcmd "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" | agtcmd "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" | ||||
| @@ -61,7 +59,7 @@ func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (itera | |||||
| return nil, fmt.Errorf("getting package objects: %w", err) | return nil, fmt.Errorf("getting package objects: %w", err) | ||||
| } | } | ||||
| if getPkgResp.Redundancy.Type == models.RedundancyRep { | |||||
| if getPkgResp.Redundancy.IsRepInfo() { | |||||
| iter, err := svc.downloadRepPackage(packageID, getObjsResp.Objects, coorCli) | iter, err := svc.downloadRepPackage(packageID, getObjsResp.Objects, coorCli) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -108,21 +106,18 @@ func (svc *PackageService) downloadECPackage(pkg model.Package, objects []model. | |||||
| return nil, fmt.Errorf("getting package object ec data: %w", err) | return nil, fmt.Errorf("getting package object ec data: %w", err) | ||||
| } | } | ||||
| var ecRed models.ECRedundancyInfo | |||||
| if err := serder.AnyToAny(pkg.Redundancy.Info, &ecRed); err != nil { | |||||
| var ecInfo models.ECRedundancyInfo | |||||
| if ecInfo, err = pkg.Redundancy.ToECInfo(); err != nil { | |||||
| return nil, fmt.Errorf("get ec redundancy info: %w", err) | return nil, fmt.Errorf("get ec redundancy info: %w", err) | ||||
| } | } | ||||
| getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecRed.ECName)) | |||||
| getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecInfo.ECName)) | |||||
| if err != nil { | if err != nil { | ||||
| return nil, fmt.Errorf("getting ec: %w", err) | return nil, fmt.Errorf("getting ec: %w", err) | ||||
| } | } | ||||
| iter := iterator.NewECObjectIterator(objects, getObjECDataResp.Data, getECResp.Config, &iterator.ECDownloadContext{ | |||||
| DownloadContext: &iterator.DownloadContext{ | |||||
| Distlock: svc.DistLock, | |||||
| }, | |||||
| ECPacketSize: config.Cfg().ECPacketSize, | |||||
| iter := iterator.NewECObjectIterator(objects, getObjECDataResp.Data, ecInfo, getECResp.Config, &iterator.DownloadContext{ | |||||
| Distlock: svc.DistLock, | |||||
| }) | }) | ||||
| return iter, nil | return iter, nil | ||||
| @@ -4,7 +4,6 @@ import ( | |||||
| "time" | "time" | ||||
| "gitlink.org.cn/cloudream/common/models" | "gitlink.org.cn/cloudream/common/models" | ||||
| "gitlink.org.cn/cloudream/storage-client/internal/config" | |||||
| "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" | "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" | ||||
| "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" | "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" | ||||
| ) | ) | ||||
| @@ -24,11 +23,8 @@ func NewCreateECPackage(userID int64, bucketID int64, name string, objIter itera | |||||
| } | } | ||||
| func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) { | func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) { | ||||
| ret, err := t.cmd.Execute(&cmd.UpdateECPackageContext{ | |||||
| UpdatePackageContext: &cmd.UpdatePackageContext{ | |||||
| Distlock: ctx.distlock, | |||||
| }, | |||||
| ECPacketSize: config.Cfg().ECPacketSize, | |||||
| ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ | |||||
| Distlock: ctx.distlock, | |||||
| }) | }) | ||||
| t.Result = ret | t.Result = ret | ||||
| @@ -3,7 +3,6 @@ package task | |||||
| import ( | import ( | ||||
| "time" | "time" | ||||
| "gitlink.org.cn/cloudream/storage-client/internal/config" | |||||
| "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" | "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" | ||||
| "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" | "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" | ||||
| ) | ) | ||||
| @@ -23,11 +22,8 @@ func NewUpdateECPackage(userID int64, packageID int64, objectIter iterator.Uploa | |||||
| } | } | ||||
| func (t *UpdateECPackage) Execute(ctx TaskContext, complete CompleteFn) { | func (t *UpdateECPackage) Execute(ctx TaskContext, complete CompleteFn) { | ||||
| ret, err := t.cmd.Execute(&cmd.UpdateECPackageContext{ | |||||
| UpdatePackageContext: &cmd.UpdatePackageContext{ | |||||
| Distlock: ctx.distlock, | |||||
| }, | |||||
| ECPacketSize: config.Cfg().ECPacketSize, | |||||
| ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ | |||||
| Distlock: ctx.distlock, | |||||
| }) | }) | ||||
| t.Result = ret | t.Result = ret | ||||