diff --git a/internal/cmdline/object.go b/internal/cmdline/object.go deleted file mode 100644 index 96f4f5c..0000000 --- a/internal/cmdline/object.go +++ /dev/null @@ -1,320 +0,0 @@ -package cmdline - -import ( - "fmt" - "io" - "os" - "path/filepath" - "time" - - "github.com/jedib0t/go-pretty/v6/table" - "gitlink.org.cn/cloudream/storage-client/internal/task" -) - -func ObjectListBucketObjects(ctx CommandContext, bucketID int64) error { - userID := int64(0) - - objects, err := ctx.Cmdline.Svc.BucketSvc().GetBucketObjects(userID, bucketID) - if err != nil { - return err - } - - fmt.Printf("Find %d objects in bucket %d for user %d:\n", len(objects), bucketID, userID) - - tb := table.NewWriter() - tb.AppendHeader(table.Row{"ID", "Name", "Size", "BucketID", "State", "Redundancy"}) - - for _, obj := range objects { - tb.AppendRow(table.Row{obj.ObjectID, obj.Name, obj.FileSize, obj.BucketID, obj.State, obj.Redundancy}) - } - - fmt.Print(tb.Render()) - return nil -} - -func ObjectDownloadObject(ctx CommandContext, localFilePath string, objectID int64) error { - // 创建本地文件 - outputFileDir := filepath.Dir(localFilePath) - - err := os.MkdirAll(outputFileDir, os.ModePerm) - if err != nil { - return fmt.Errorf("create output file directory %s failed, err: %w", outputFileDir, err) - } - - outputFile, err := os.Create(localFilePath) - if err != nil { - return fmt.Errorf("create output file %s failed, err: %w", localFilePath, err) - } - defer outputFile.Close() - - // 下载文件 - reader, err := ctx.Cmdline.Svc.ObjectSvc().DownloadObject(0, objectID) - if err != nil { - return fmt.Errorf("download object failed, err: %w", err) - } - defer reader.Close() - - _, err = io.Copy(outputFile, reader) - if err != nil { - return fmt.Errorf("copy object data to local file failed, err: %w", err) - } - - return nil -} - -func ObjectDownloadObjectDir(ctx CommandContext, outputBaseDir string, dirName string) error { - // 创建本地文件夹 - err := os.MkdirAll(outputBaseDir, os.ModePerm) - if err != nil { - return fmt.Errorf("create output base directory %s failed, err: %w", outputBaseDir, err) - } - - // 下载文件夹 - resObjs, err := ctx.Cmdline.Svc.ObjectSvc().DownloadObjectDir(0, dirName) - if err != nil { - return fmt.Errorf("download folder failed, err: %w", err) - } - - // 遍历 关闭文件流 - defer func() { - for _, resObj := range resObjs { - resObj.Reader.Close() - } - }() - - for i := 0; i < len(resObjs); i++ { - if resObjs[i].Error != nil { - fmt.Printf("download file %s failed, err: %s", resObjs[i].ObjectName, err.Error()) - continue - } - outputFilePath := filepath.Join(outputBaseDir, resObjs[i].ObjectName) - outputFileDir := filepath.Dir(outputFilePath) - err = os.MkdirAll(outputFileDir, os.ModePerm) - if err != nil { - fmt.Printf("create output file directory %s failed, err: %s", outputFileDir, err.Error()) - continue - } - - outputFile, err := os.Create(outputFilePath) - if err != nil { - fmt.Printf("create output file %s failed, err: %s", outputFilePath, err.Error()) - continue - } - defer outputFile.Close() - - _, err = io.Copy(outputFile, resObjs[i].Reader) - if err != nil { - // TODO 写入到文件失败,是否要考虑删除这个不完整的文件? - fmt.Printf("copy object data to local file failed, err: %s", err.Error()) - continue - } - } - return nil -} - -func ObjectUploadRepObject(ctx CommandContext, localFilePath string, bucketID int64, objectName string, repCount int) error { - file, err := os.Open(localFilePath) - if err != nil { - return fmt.Errorf("open file %s failed, err: %w", localFilePath, err) - } - defer file.Close() - - fileInfo, err := file.Stat() - if err != nil { - return fmt.Errorf("get file %s state failed, err: %w", localFilePath, err) - } - fileSize := fileInfo.Size() - - uploadObject := task.UploadObject{ - ObjectName: objectName, - File: file, - FileSize: fileSize, - } - uploadObjects := []task.UploadObject{uploadObject} - - taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUploadingRepObjects(0, bucketID, uploadObjects, repCount) - if err != nil { - return fmt.Errorf("upload file data failed, err: %w", err) - } - - for { - complete, uploadObjectResult, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploadingRepObjects(taskID, time.Second*5) - if complete { - if err != nil { - return fmt.Errorf("uploading rep object: %w", err) - } - - uploadRet := uploadObjectResult.Results[0] - if uploadRet.Error != nil { - return uploadRet.Error - } - - fmt.Print(uploadRet.FileHash) - return nil - } - - if err != nil { - return fmt.Errorf("wait uploading: %w", err) - } - } -} - -func ObjectUploadEcObject(ctx CommandContext, localFilePath string, bucketID int64, objectName string, ecName string) error { - // TODO 参考rep的,改成异步流程 - file, err := os.Open(localFilePath) - if err != nil { - return fmt.Errorf("open file %s failed, err: %w", localFilePath, err) - } - - fileInfo, err := file.Stat() - if err != nil { - return fmt.Errorf("get file %s state failed, err: %w", localFilePath, err) - } - fileSize := fileInfo.Size() - - err = ctx.Cmdline.Svc.ObjectSvc().UploadEcObject(0, bucketID, objectName, file, fileSize, ecName) - if err != nil { - return fmt.Errorf("upload file data failed, err: %w", err) - } - - return nil -} - -func ObjectUploadRepObjectDir(ctx CommandContext, localDirPath string, bucketID int64, repCount int) error { - var uploadFiles []task.UploadObject - var uploadFile task.UploadObject - err := filepath.Walk(localDirPath, func(fname string, fi os.FileInfo, err error) error { - if !fi.IsDir() { - file, err := os.Open(fname) - if err != nil { - return fmt.Errorf("open file %s failed, err: %w", fname, err) - } - uploadFile = task.UploadObject{ - ObjectName: filepath.ToSlash(fname), - File: file, - FileSize: fi.Size(), - } - uploadFiles = append(uploadFiles, uploadFile) - } - return nil - }) - if err != nil { - return fmt.Errorf("open directory %s failed, err: %w", localDirPath, err) - } - - // 遍历 关闭文件流 - defer func() { - for _, uploadFile := range uploadFiles { - uploadFile.File.Close() - } - }() - - taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUploadingRepObjects(0, bucketID, uploadFiles, repCount) - - if err != nil { - return fmt.Errorf("upload file data failed, err: %w", err) - } - - for { - complete, uploadObjectResult, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploadingRepObjects(taskID, time.Second*5) - if complete { - if err != nil { - return fmt.Errorf("uploading rep object: %w", err) - } - - tb := table.NewWriter() - if uploadObjectResult.IsUploading { - - tb.AppendHeader(table.Row{"ObjectName", "ObjectID", "FileHash"}) - for i := 0; i < len(uploadObjectResult.Objects); i++ { - tb.AppendRow(table.Row{ - uploadObjectResult.Objects[i].ObjectName, - uploadObjectResult.Results[i].ObjectID, - uploadObjectResult.Results[i].FileHash, - }) - } - fmt.Print(tb.Render()) - - } else { - fmt.Println("The folder upload failed. Some files do not meet the upload requirements.") - - tb.AppendHeader(table.Row{"ObjectName", "Error"}) - for i := 0; i < len(uploadObjectResult.Objects); i++ { - if uploadObjectResult.Results[i].Error != nil { - tb.AppendRow(table.Row{uploadObjectResult.Objects[i].ObjectName, uploadObjectResult.Results[i].Error}) - } - } - fmt.Print(tb.Render()) - } - return nil - } - - if err != nil { - return fmt.Errorf("wait uploading: %w", err) - } - } -} - -func ObjectUpdateRepObject(ctx CommandContext, objectID int64, filePath string) error { - userID := int64(0) - - file, err := os.Open(filePath) - if err != nil { - return fmt.Errorf("open file %s failed, err: %w", filePath, err) - } - defer file.Close() - - fileInfo, err := file.Stat() - if err != nil { - return fmt.Errorf("get file %s state failed, err: %w", filePath, err) - } - fileSize := fileInfo.Size() - - taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUpdatingRepObject(userID, objectID, file, fileSize) - if err != nil { - return fmt.Errorf("update object %d failed, err: %w", objectID, err) - } - - for { - complete, err := ctx.Cmdline.Svc.ObjectSvc().WaitUpdatingRepObject(taskID, time.Second*5) - if complete { - if err != nil { - return fmt.Errorf("updating rep object: %w", err) - } - - return nil - } - - if err != nil { - return fmt.Errorf("wait updating: %w", err) - } - } -} - -func ObjectDeleteObject(ctx CommandContext, objectID int64) error { - userID := int64(0) - err := ctx.Cmdline.Svc.ObjectSvc().DeleteObject(userID, objectID) - if err != nil { - return fmt.Errorf("delete object %d failed, err: %w", objectID, err) - } - return nil -} - -func init() { - commands.MustAdd(ObjectListBucketObjects, "object", "ls") - - commands.MustAdd(ObjectUploadEcObject, "object", "new", "ec") - - commands.MustAdd(ObjectUploadRepObject, "object", "new", "rep") - - commands.MustAdd(ObjectUploadRepObjectDir, "object", "new", "dir") - - commands.MustAdd(ObjectDownloadObject, "object", "get") - - commands.MustAdd(ObjectDownloadObjectDir, "object", "get", "dir") - - commands.MustAdd(ObjectUpdateRepObject, "object", "update", "rep") - - commands.MustAdd(ObjectDeleteObject, "object", "delete") - -} diff --git a/internal/cmdline/package.go b/internal/cmdline/package.go new file mode 100644 index 0000000..c169a97 --- /dev/null +++ b/internal/cmdline/package.go @@ -0,0 +1,281 @@ +package cmdline + +import ( + "fmt" + "io" + "os" + "path/filepath" + "time" + + "github.com/jedib0t/go-pretty/v6/table" + "gitlink.org.cn/cloudream/common/models" +) + +func PackageListBucketPackages(ctx CommandContext, bucketID int64) error { + userID := int64(0) + + packages, err := ctx.Cmdline.Svc.BucketSvc().GetBucketPackages(userID, bucketID) + if err != nil { + return err + } + + fmt.Printf("Find %d packages in bucket %d for user %d:\n", len(packages), bucketID, userID) + + tb := table.NewWriter() + tb.AppendHeader(table.Row{"ID", "Name", "BucketID", "State", "Redundancy"}) + + for _, obj := range packages { + tb.AppendRow(table.Row{obj.PackageID, obj.Name, obj.BucketID, obj.State, obj.Redundancy}) + } + + fmt.Print(tb.Render()) + return nil +} + +func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int64) error { + err := os.MkdirAll(outputDir, os.ModePerm) + if err != nil { + return fmt.Errorf("create output directory %s failed, err: %w", outputDir, err) + } + + // 下载文件 + objIter, err := ctx.Cmdline.Svc.PackageSvc().DownloadPackage(0, packageID) + if err != nil { + return fmt.Errorf("download object failed, err: %w", err) + } + defer objIter.Close() + + for { + objInfo, ok := objIter.MoveNext() + if !ok { + break + } + + if objInfo.Error != nil { + return objInfo.Error + } + defer objInfo.File.Close() + + outputFile, err := os.Create(filepath.Join(outputDir, objInfo.Object.Path)) + if err != nil { + return fmt.Errorf("creating object file: %w", err) + } + defer outputFile.Close() + + _, err = io.Copy(outputFile, objInfo.File) + if err != nil { + return fmt.Errorf("copy object data to local file failed, err: %w", err) + } + } + + return nil +} + +func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64, name string, repCount int) error { + var uploadFilePathes []string + err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { + if err != nil { + return nil + } + + if !fi.IsDir() { + uploadFilePathes = append(uploadFilePathes, fname) + } + + return nil + }) + if err != nil { + return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) + } + + objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes) + taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, models.NewRepRedundancyInfo(repCount)) + + if err != nil { + return fmt.Errorf("upload file data failed, err: %w", err) + } + + for { + complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingRepPackage(taskID, time.Second*5) + if complete { + if err != nil { + return fmt.Errorf("uploading rep object: %w", err) + } + + tb := table.NewWriter() + + tb.AppendHeader(table.Row{"Path", "ObjectID", "FileHash"}) + for i := 0; i < len(uploadObjectResult.ObjectResults); i++ { + tb.AppendRow(table.Row{ + uploadObjectResult.ObjectResults[i].Info.Path, + uploadObjectResult.ObjectResults[i].ObjectID, + uploadObjectResult.ObjectResults[i].FileHash, + }) + } + fmt.Print(tb.Render()) + return nil + } + + if err != nil { + return fmt.Errorf("wait uploading: %w", err) + } + } +} + +func PackageUpdateRepPackage(ctx CommandContext, packageID int64, rootPath string) error { + //userID := int64(0) + + var uploadFilePathes []string + err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { + if err != nil { + return nil + } + + if !fi.IsDir() { + uploadFilePathes = append(uploadFilePathes, fname) + } + + return nil + }) + if err != nil { + return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) + } + + objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes) + taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingRepPackage(0, packageID, objIter) + if err != nil { + return fmt.Errorf("update object %d failed, err: %w", packageID, err) + } + + for { + complete, _, err := ctx.Cmdline.Svc.PackageSvc().WaitUpdatingRepPackage(taskID, time.Second*5) + if complete { + if err != nil { + return fmt.Errorf("updating rep object: %w", err) + } + + return nil + } + + if err != nil { + return fmt.Errorf("wait updating: %w", err) + } + } +} + +func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string) error { + var uploadFilePathes []string + err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { + if err != nil { + return nil + } + + if !fi.IsDir() { + uploadFilePathes = append(uploadFilePathes, fname) + } + + return nil + }) + if err != nil { + return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) + } + + objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes) + taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, models.NewECRedundancyInfo(ecName)) + + if err != nil { + return fmt.Errorf("upload file data failed, err: %w", err) + } + + for { + complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingRepPackage(taskID, time.Second*5) + if complete { + if err != nil { + return fmt.Errorf("uploading ec package: %w", err) + } + + tb := table.NewWriter() + + tb.AppendHeader(table.Row{"Path", "ObjectID", "FileHash"}) + for i := 0; i < len(uploadObjectResult.ObjectResults); i++ { + tb.AppendRow(table.Row{ + uploadObjectResult.ObjectResults[i].Info.Path, + uploadObjectResult.ObjectResults[i].ObjectID, + uploadObjectResult.ObjectResults[i].FileHash, + }) + } + fmt.Print(tb.Render()) + return nil + } + + if err != nil { + return fmt.Errorf("wait uploading: %w", err) + } + } +} + +func PackageUpdateECPackage(ctx CommandContext, packageID int64, rootPath string) error { + //userID := int64(0) + + var uploadFilePathes []string + err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { + if err != nil { + return nil + } + + if !fi.IsDir() { + uploadFilePathes = append(uploadFilePathes, fname) + } + + return nil + }) + if err != nil { + return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) + } + + objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes) + taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingECPackage(0, packageID, objIter) + if err != nil { + return fmt.Errorf("update package %d failed, err: %w", packageID, err) + } + + for { + complete, _, err := ctx.Cmdline.Svc.PackageSvc().WaitUpdatingECPackage(taskID, time.Second*5) + if complete { + if err != nil { + return fmt.Errorf("updating ec package: %w", err) + } + + return nil + } + + if err != nil { + return fmt.Errorf("wait updating: %w", err) + } + } +} + +func PackageDeletePackage(ctx CommandContext, packageID int64) error { + userID := int64(0) + err := ctx.Cmdline.Svc.PackageSvc().DeletePackage(userID, packageID) + if err != nil { + return fmt.Errorf("delete package %d failed, err: %w", packageID, err) + } + return nil +} + +func init() { + commands.MustAdd(PackageListBucketPackages, "pkg", "ls") + + commands.MustAdd(PackageDownloadPackage, "pkg", "get") + + commands.MustAdd(PackageUploadRepPackage, "pkg", "new", "rep") + + commands.MustAdd(PackageUpdateRepPackage, "pkg", "update", "rep") + + commands.MustAdd(PackageUploadRepPackage, "pkg", "new", "ec") + + commands.MustAdd(PackageUpdateRepPackage, "pkg", "update", "ec") + + commands.MustAdd(PackageDeletePackage, "pkg", "delete") +} diff --git a/internal/cmdline/scanner.go b/internal/cmdline/scanner.go index 83adaaa..2014f7b 100644 --- a/internal/cmdline/scanner.go +++ b/internal/cmdline/scanner.go @@ -5,7 +5,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/cmdtrie" myreflect "gitlink.org.cn/cloudream/common/utils/reflect" - scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/scanner/event" + scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event" ) var parseScannerEventCmdTrie cmdtrie.StaticCommandTrie[any] = cmdtrie.NewStaticCommandTrie[any]() @@ -33,7 +33,7 @@ func init() { parseScannerEventCmdTrie.MustAdd(scevt.NewCheckCache, myreflect.TypeNameOf[scevt.CheckCache]()) - parseScannerEventCmdTrie.MustAdd(scevt.NewCheckObject, myreflect.TypeNameOf[scevt.CheckObject]()) + parseScannerEventCmdTrie.MustAdd(scevt.NewCheckPackage, myreflect.TypeNameOf[scevt.CheckPackage]()) parseScannerEventCmdTrie.MustAdd(scevt.NewCheckRepCount, myreflect.TypeNameOf[scevt.CheckRepCount]()) diff --git a/internal/cmdline/storage.go b/internal/cmdline/storage.go index e1a15ed..266543c 100644 --- a/internal/cmdline/storage.go +++ b/internal/cmdline/storage.go @@ -3,16 +3,18 @@ package cmdline import ( "fmt" "time" + + "gitlink.org.cn/cloudream/common/models" ) -func StorageMoveObject(ctx CommandContext, objectID int64, storageID int64) error { - taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageMoveObject(0, objectID, storageID) +func StorageMovePackage(ctx CommandContext, packageID int64, storageID int64) error { + taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageMovePackage(0, packageID, storageID) if err != nil { - return fmt.Errorf("start moving object to storage: %w", err) + return fmt.Errorf("start moving package to storage: %w", err) } for { - complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageMoveObject(taskID, time.Second*10) + complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageMovePackage(taskID, time.Second*10) if complete { if err != nil { return fmt.Errorf("moving complete with: %w", err) @@ -27,46 +29,21 @@ func StorageMoveObject(ctx CommandContext, objectID int64, storageID int64) erro } } -func StorageMoveDir(ctx CommandContext, dirName string, storageID int64) error { - taskID, err := ctx.Cmdline.Svc.StorageSvc().StartMovingDir(0, dirName, storageID) +func StorageCreateRepPackage(ctx CommandContext, bucketID int64, name string, storageID int64, path string, repCount int) error { + nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(0, bucketID, name, storageID, path, + models.NewTypedRepRedundancyInfo(repCount)) if err != nil { - return fmt.Errorf("start moving object to storage: %w", err) + return fmt.Errorf("start storage uploading rep package: %w", err) } for { - complete, results, err := ctx.Cmdline.Svc.StorageSvc().WaitMovingDir(taskID, time.Second*5) - if complete { - if err != nil { - return fmt.Errorf("moving complete with: %w", err) - } - // 返回各object的move结果 - for _, result := range results { - if result.Error != nil { - fmt.Printf("moving %s to storage failed: %s\n", result.ObjectName, result.Error) - } - } - return nil - } - if err != nil { - return fmt.Errorf("wait moving: %w", err) - } - } -} - -func StorageUploadRepObject(ctx CommandContext, storageID int64, filePath string, bucketID int64, objectName string, repCount int) error { - nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageUploadRepObject(0, storageID, filePath, bucketID, objectName, repCount) - if err != nil { - return fmt.Errorf("start storage uploading rep object: %w", err) - } - - for { - complete, objectID, fileHash, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageUploadRepObject(nodeID, taskID, time.Second*10) + complete, packageID, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageCreatePackage(nodeID, taskID, time.Second*10) if complete { if err != nil { return fmt.Errorf("uploading complete with: %w", err) } - fmt.Printf("%d\n%s\n", objectID, fileHash) + fmt.Printf("%d\n", packageID) return nil } @@ -77,9 +54,7 @@ func StorageUploadRepObject(ctx CommandContext, storageID int64, filePath string } func init() { - commands.MustAdd(StorageMoveObject, "storage", "move", "obj") - - commands.MustAdd(StorageMoveDir, "storage", "move", "dir") + commands.MustAdd(StorageMovePackage, "storage", "move", "pkg") - commands.MustAdd(StorageUploadRepObject, "storage", "upload", "rep") + commands.MustAdd(StorageCreateRepPackage, "storage", "upload", "rep") } diff --git a/internal/config/config.go b/internal/config/config.go index 8cee74e..c65dad9 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -5,7 +5,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/config" "gitlink.org.cn/cloudream/common/utils/ipfs" - racfg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/config" + stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" ) type Config struct { @@ -15,7 +15,7 @@ type Config struct { LocalIP string `json:"localIP"` ExternalIP string `json:"externalIP"` Logger logger.Config `json:"logger"` - RabbitMQ racfg.Config `json:"rabbitMQ"` + RabbitMQ stgmq.Config `json:"rabbitMQ"` IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon DistLock distlock.Config `json:"distlock"` } diff --git a/internal/http/object.go b/internal/http/object.go deleted file mode 100644 index 4843d98..0000000 --- a/internal/http/object.go +++ /dev/null @@ -1,200 +0,0 @@ -package http - -import ( - "io" - "mime/multipart" - "net/http" - "time" - - "github.com/gin-gonic/gin" - "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/models" - "gitlink.org.cn/cloudream/common/pkgs/logger" - myio "gitlink.org.cn/cloudream/common/utils/io" - "gitlink.org.cn/cloudream/common/utils/serder" - "gitlink.org.cn/cloudream/storage-client/internal/task" -) - -type ObjectService struct { - *Server -} - -func (s *Server) ObjectSvc() *ObjectService { - return &ObjectService{ - Server: s, - } -} - -type ObjectDownloadReq struct { - UserID *int64 `form:"userID" binding:"required"` - ObjectID *int64 `form:"objectID" binding:"required"` -} - -func (s *ObjectService) Download(ctx *gin.Context) { - log := logger.WithField("HTTP", "Object.Download") - - var req ObjectDownloadReq - if err := ctx.ShouldBindQuery(&req); err != nil { - log.Warnf("binding body: %s", err.Error()) - ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) - return - } - - file, err := s.svc.ObjectSvc().DownloadObject(*req.UserID, *req.ObjectID) - if err != nil { - log.Warnf("downloading object: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download object failed")) - return - } - - ctx.Writer.WriteHeader(http.StatusOK) - // TODO 需要设置FileName - ctx.Header("Content-Disposition", "attachment; filename=filename") - ctx.Header("Content-Type", "application/octet-stream") - - buf := make([]byte, 4096) - ctx.Stream(func(w io.Writer) bool { - rd, err := file.Read(buf) - if err == io.EOF { - return false - } - - if err != nil { - log.Warnf("reading file data: %s", err.Error()) - return false - } - - err = myio.WriteAll(w, buf[:rd]) - if err != nil { - log.Warnf("writing data to response: %s", err.Error()) - return false - } - - return true - }) -} - -type ObjectUploadReq struct { - Info ObjectUploadInfo `form:"info" binding:"required"` - File *multipart.FileHeader `form:"file"` -} - -type ObjectUploadInfo struct { - UserID *int64 `json:"userID" binding:"required"` - BucketID *int64 `json:"bucketID" binding:"required"` - FileSize *int64 `json:"fileSize" binding:"required"` - ObjectName string `json:"objectName" binding:"required"` - Redundancy struct { - Type string `json:"type" binding:"required"` - Config any `json:"config" binding:"required"` - } `json:"redundancy" binding:"required"` -} - -type ObjectUploadResp struct { - ObjectID int64 `json:"objectID,string"` -} - -func (s *ObjectService) Upload(ctx *gin.Context) { - log := logger.WithField("HTTP", "Object.Upload") - - var req ObjectUploadReq - if err := ctx.ShouldBind(&req); err != nil { - log.Warnf("binding body: %s", err.Error()) - ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) - return - } - - switch req.Info.Redundancy.Type { - case models.RedundancyRep: - s.uploadRep(ctx, &req) - return - case models.RedundancyEC: - - } - - ctx.JSON(http.StatusForbidden, Failed(errorcode.OperationFailed, "not supported yet")) -} - -func (s *ObjectService) uploadRep(ctx *gin.Context, req *ObjectUploadReq) { - log := logger.WithField("HTTP", "Object.Upload") - - var repInfo models.RepRedundancyInfo - if err := serder.AnyToAny(req.Info.Redundancy.Config, &repInfo); err != nil { - log.Warnf("parsing rep redundancy config: %s", err.Error()) - ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config")) - return - } - - file, err := req.File.Open() - if err != nil { - log.Warnf("opening file: %s", err.Error()) - ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "open file failed")) - return - } - - taskID, err := s.svc.ObjectSvc().StartUploadingRepObjects(*req.Info.UserID, *req.Info.BucketID, []task.UploadObject{{ - ObjectName: req.Info.ObjectName, - File: file, - FileSize: *req.Info.FileSize, - }}, repInfo.RepCount) - - if err != nil { - log.Warnf("start uploading rep object task: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed")) - return - } - - for { - complete, uploadObjectResult, err := s.svc.ObjectSvc().WaitUploadingRepObjects(taskID, time.Second*5) - if complete { - if err != nil { - log.Warnf("uploading rep object: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading rep object failed")) - return - } - - uploadRet := uploadObjectResult.Results[0] - if uploadRet.Error != nil { - log.Warnf("uploading rep object: %s", uploadRet.Error) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, uploadRet.Error.Error())) - return - } - - ctx.JSON(http.StatusOK, OK(ObjectUploadResp{ - ObjectID: uploadRet.ObjectID, - })) - return - } - - if err != nil { - log.Warnf("waiting task: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed")) - return - } - } -} - -type ObjectDeleteReq struct { - UserID *int64 `json:"userID" binding:"required"` - ObjectID *int64 `json:"objectID" binding:"required"` -} - -func (s *ObjectService) Delete(ctx *gin.Context) { - log := logger.WithField("HTTP", "Object.Delete") - - var req ObjectDeleteReq - if err := ctx.ShouldBindJSON(&req); err != nil { - log.Warnf("binding body: %s", err.Error()) - ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) - return - } - - err := s.svc.ObjectSvc().DeleteObject(*req.UserID, *req.ObjectID) - if err != nil { - log.Warnf("deleting object: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete object failed")) - return - } - - ctx.JSON(http.StatusOK, OK(nil)) -} diff --git a/internal/http/package.go b/internal/http/package.go new file mode 100644 index 0000000..d2890bc --- /dev/null +++ b/internal/http/package.go @@ -0,0 +1,223 @@ +package http + +import ( + "io" + "mime/multipart" + "net/http" + "time" + + "github.com/gin-gonic/gin" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/serder" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" +) + +type PackageService struct { + *Server +} + +func (s *Server) PackageSvc() *PackageService { + return &PackageService{ + Server: s, + } +} + +type PackageDownloadReq struct { + UserID *int64 `form:"userID" binding:"required"` + PackageID *int64 `form:"packageID" binding:"required"` +} + +func (s *PackageService) Download(ctx *gin.Context) { + log := logger.WithField("HTTP", "Package.Download") + + var req PackageDownloadReq + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + file, err := s.svc.PackageSvc().DownloadPackage(*req.UserID, *req.PackageID) + if err != nil { + log.Warnf("downloading package: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download package failed")) + return + } + + ctx.Writer.WriteHeader(http.StatusOK) + // TODO 需要设置FileName + ctx.Header("Content-Disposition", "attachment; filename=filename") + ctx.Header("Content-Type", "application/octet-stream") + + buf := make([]byte, 4096) + ctx.Stream(func(w io.Writer) bool { + rd, err := file.Read(buf) + if err == io.EOF { + return false + } + + if err != nil { + log.Warnf("reading file data: %s", err.Error()) + return false + } + + err = myio.WriteAll(w, buf[:rd]) + if err != nil { + log.Warnf("writing data to response: %s", err.Error()) + return false + } + + return true + }) +} + +type PackageUploadReq struct { + Info PackageUploadInfo `form:"info" binding:"required"` + Files []*multipart.FileHeader `form:"files"` +} + +type PackageUploadInfo struct { + UserID *int64 `json:"userID" binding:"required"` + BucketID *int64 `json:"bucketID" binding:"required"` + Name string `json:"name" binding:"required"` + Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"` +} + +type PackageUploadResp struct { + PackageID int64 `json:"packageID,string"` +} + +func (s *PackageService) Upload(ctx *gin.Context) { + log := logger.WithField("HTTP", "Package.Upload") + + var req PackageUploadReq + if err := ctx.ShouldBind(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + switch req.Info.Redundancy.Type { + case models.RedundancyRep: + s.uploadRep(ctx, &req) + return + case models.RedundancyEC: + s.uploadEC(ctx, &req) + return + } + + ctx.JSON(http.StatusForbidden, Failed(errorcode.OperationFailed, "not supported yet")) +} + +func (s *PackageService) uploadRep(ctx *gin.Context, req *PackageUploadReq) { + log := logger.WithField("HTTP", "Package.Upload") + + var repInfo models.RepRedundancyInfo + if err := serder.AnyToAny(req.Info.Redundancy.Info, &repInfo); err != nil { + log.Warnf("parsing rep redundancy config: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config")) + return + } + + objIter := iterator.NewHTTPObjectIterator(req.Files) + + taskID, err := s.svc.PackageSvc().StartCreatingRepPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, repInfo) + + if err != nil { + log.Warnf("start uploading rep package task: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed")) + return + } + + for { + complete, createResult, err := s.svc.PackageSvc().WaitCreatingRepPackage(taskID, time.Second*5) + if complete { + if err != nil { + log.Warnf("uploading rep package: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading rep package failed")) + return + } + + ctx.JSON(http.StatusOK, OK(PackageUploadResp{ + PackageID: createResult.PackageID, + })) + return + } + + if err != nil { + log.Warnf("waiting task: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed")) + return + } + } +} + +func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) { + log := logger.WithField("HTTP", "Package.Upload") + + var ecInfo models.ECRedundancyInfo + if err := serder.AnyToAny(req.Info.Redundancy.Info, &ecInfo); err != nil { + log.Warnf("parsing ec redundancy config: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config")) + return + } + + objIter := iterator.NewHTTPObjectIterator(req.Files) + + taskID, err := s.svc.PackageSvc().StartCreatingECPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, ecInfo) + + if err != nil { + log.Warnf("start uploading ec package task: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed")) + return + } + + for { + complete, createResult, err := s.svc.PackageSvc().WaitCreatingECPackage(taskID, time.Second*5) + if complete { + if err != nil { + log.Warnf("uploading ec package: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading ec package failed")) + return + } + + ctx.JSON(http.StatusOK, OK(PackageUploadResp{ + PackageID: createResult.PackageID, + })) + return + } + + if err != nil { + log.Warnf("waiting task: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed")) + return + } + } +} + +type PackageDeleteReq struct { + UserID *int64 `json:"userID" binding:"required"` + PackageID *int64 `json:"packageID" binding:"required"` +} + +func (s *PackageService) Delete(ctx *gin.Context) { + log := logger.WithField("HTTP", "Package.Delete") + + var req PackageDeleteReq + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + err := s.svc.PackageSvc().DeletePackage(*req.UserID, *req.PackageID) + if err != nil { + log.Warnf("deleting package: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete package failed")) + return + } + + ctx.JSON(http.StatusOK, OK(nil)) +} diff --git a/internal/http/server.go b/internal/http/server.go index fc343d3..72738e0 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -38,9 +38,9 @@ func (s *Server) Serve() error { } func (s *Server) initRouters() { - s.engine.GET("/object/download", s.ObjectSvc().Download) - s.engine.POST("/object/upload", s.ObjectSvc().Upload) - s.engine.POST("/object/delete", s.ObjectSvc().Delete) + s.engine.GET("/package/download", s.PackageSvc().Download) + s.engine.POST("/package/upload", s.PackageSvc().Upload) + s.engine.POST("/package/delete", s.PackageSvc().Delete) - s.engine.POST("/storage/moveObject", s.StorageSvc().MoveObject) + s.engine.POST("/storage/movePackage", s.StorageSvc().MovePackage) } diff --git a/internal/http/storage.go b/internal/http/storage.go index ec65a03..d64065a 100644 --- a/internal/http/storage.go +++ b/internal/http/storage.go @@ -19,35 +19,35 @@ func (s *Server) StorageSvc() *StorageService { } } -type StorageMoveObjectReq struct { +type StorageMovePackageReq struct { UserID *int64 `json:"userID" binding:"required"` - ObjectID *int64 `json:"objectID" binding:"required"` + PackageID *int64 `json:"packageID" binding:"required"` StorageID *int64 `json:"storageID" binding:"required"` } -func (s *StorageService) MoveObject(ctx *gin.Context) { - log := logger.WithField("HTTP", "Storage.MoveObject") +func (s *StorageService) MovePackage(ctx *gin.Context) { + log := logger.WithField("HTTP", "Storage.MovePackage") - var req StorageMoveObjectReq + var req StorageMovePackageReq if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - taskID, err := s.svc.StorageSvc().StartStorageMoveObject(*req.UserID, *req.ObjectID, *req.StorageID) + taskID, err := s.svc.StorageSvc().StartStorageMovePackage(*req.UserID, *req.PackageID, *req.StorageID) if err != nil { - log.Warnf("start storage move object: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move object failed")) + log.Warnf("start storage move package: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move package failed")) return } for { - complete, err := s.svc.StorageSvc().WaitStorageMoveObject(taskID, time.Second*10) + complete, err := s.svc.StorageSvc().WaitStorageMovePackage(taskID, time.Second*10) if complete { if err != nil { log.Warnf("moving complete with: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move object failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move package failed")) return } @@ -57,7 +57,7 @@ func (s *StorageService) MoveObject(ctx *gin.Context) { if err != nil { log.Warnf("wait moving: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move object failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move package failed")) return } } diff --git a/internal/services/bucket.go b/internal/services/bucket.go index b8fee25..5b843ab 100644 --- a/internal/services/bucket.go +++ b/internal/services/bucket.go @@ -5,7 +5,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) type BucketService struct { @@ -22,7 +22,7 @@ func (svc *BucketService) GetBucket(userID int64, bucketID int64) (model.Bucket, } func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) { - resp, err := svc.coordinator.GetUserBuckets(coormsg.NewGetUserBuckets(userID)) + resp, err := svc.coordinator.GetUserBuckets(coormq.NewGetUserBuckets(userID)) if err != nil { return nil, fmt.Errorf("get user buckets failed, err: %w", err) } @@ -30,13 +30,13 @@ func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) { return resp.Buckets, nil } -func (svc *BucketService) GetBucketObjects(userID int64, bucketID int64) ([]model.Object, error) { - resp, err := svc.coordinator.GetBucketObjects(coormsg.NewGetBucketObjects(userID, bucketID)) +func (svc *BucketService) GetBucketPackages(userID int64, bucketID int64) ([]model.Package, error) { + resp, err := svc.coordinator.GetBucketPackages(coormq.NewGetBucketPackages(userID, bucketID)) if err != nil { - return nil, fmt.Errorf("get bucket objects failed, err: %w", err) + return nil, fmt.Errorf("get bucket packages failed, err: %w", err) } - return resp.Objects, nil + return resp.Packages, nil } func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, error) { @@ -53,7 +53,7 @@ func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, } defer mutex.Unlock() - resp, err := svc.coordinator.CreateBucket(coormsg.NewCreateBucket(userID, bucketName)) + resp, err := svc.coordinator.CreateBucket(coormq.NewCreateBucket(userID, bucketName)) if err != nil { return 0, fmt.Errorf("creating bucket: %w", err) } @@ -68,6 +68,7 @@ func (svc *BucketService) DeleteBucket(userID int64, bucketID int64) error { Metadata(). UserBucket().WriteAny(). Bucket().WriteOne(bucketID). + // TODO2 Object().WriteAny(). ObjectRep().WriteAny(). ObjectBlock().WriteAny(). @@ -78,7 +79,7 @@ func (svc *BucketService) DeleteBucket(userID int64, bucketID int64) error { } defer mutex.Unlock() - _, err = svc.coordinator.DeleteBucket(coormsg.NewDeleteBucket(userID, bucketID)) + _, err = svc.coordinator.DeleteBucket(coormq.NewDeleteBucket(userID, bucketID)) if err != nil { return fmt.Errorf("request to coordinator failed, err: %w", err) } diff --git a/internal/services/client_command_ec.go b/internal/services/client_command_ec.go deleted file mode 100644 index c5cc997..0000000 --- a/internal/services/client_command_ec.go +++ /dev/null @@ -1,457 +0,0 @@ -package services - -// TODO 将这里的逻辑拆分到services中实现 - -import ( - "bytes" - "fmt" - "io" - "math/rand" - "os" - "path/filepath" - "sync" - "time" - - "gitlink.org.cn/cloudream/storage-client/internal/config" - "gitlink.org.cn/cloudream/storage-common/pkgs/ec" - "gitlink.org.cn/cloudream/storage-common/utils" - - //"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" - log "gitlink.org.cn/cloudream/common/pkgs/logger" - agtcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/agent" - ramsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message" - agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent" - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" - agentcaller "gitlink.org.cn/cloudream/storage-common/pkgs/proto" - mygrpc "gitlink.org.cn/cloudream/storage-common/utils/grpc" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -func (svc *ObjectService) UploadEcObject(userID int64, bucketID int64, objectName string, file io.ReadCloser, fileSize int64, ecName string) error { - // TODO 需要加锁 - - /*reqBlder := reqbuilder.NewBuilder() - for _, uploadObject := range t.Objects { - reqBlder.Metadata(). - // 用于防止创建了多个同名对象 - Object().CreateOne(t.bucketID, uploadObject.ObjectName) - }*/ - /* - mutex, err := reqBlder. - Metadata(). - // 用于判断用户是否有桶的权限 - UserBucket().ReadOne(userID, bucketID). - // 用于查询可用的上传节点 - Node().ReadAny(). - // 用于设置Rep配置 - ObjectRep().CreateAny(). - // 用于创建Cache记录 - Cache().CreateAny(). - MutexLock(ctx.DistLock) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - */ - //发送写请求,请求Coor分配写入节点Ip - ecWriteResp, err := svc.coordinator.PreUploadEcObject(coormsg.NewPreUploadEcObject(bucketID, objectName, fileSize, ecName, userID, config.Cfg().ExternalIP)) - if err != nil { - return fmt.Errorf("request to coordinator failed, err: %w", err) - } - - if len(ecWriteResp.Nodes) == 0 { - return fmt.Errorf("no node to upload file") - } - //生成纠删码的写入节点序列 - nodes := make([]ramsg.RespNode, ecWriteResp.Ec.EcN) - numNodes := len(ecWriteResp.Nodes) - startWriteNodeID := rand.Intn(numNodes) - for i := 0; i < ecWriteResp.Ec.EcN; i++ { - nodes[i] = ecWriteResp.Nodes[(startWriteNodeID+i)%numNodes] - } - hashs, err := svc.ecWrite(file, fileSize, ecWriteResp.Ec.EcK, ecWriteResp.Ec.EcN, nodes) - if err != nil { - return fmt.Errorf("EcWrite failed, err: %w", err) - } - nodeIDs := make([]int64, len(nodes)) - for i := 0; i < len(nodes); i++ { - nodeIDs[i] = nodes[i].ID - } - //第二轮通讯:插入元数据hashs - dirName := utils.GetDirectoryName(objectName) - _, err = svc.coordinator.CreateEcObject(coormsg.NewCreateEcObject(bucketID, objectName, fileSize, userID, nodeIDs, hashs, ecName, dirName)) - if err != nil { - return fmt.Errorf("request to coordinator failed, err: %w", err) - } - return nil -} - -func (svc *ObjectService) ecWrite(file io.ReadCloser, fileSize int64, ecK int, ecN int, nodes []ramsg.RespNode) ([]string, error) { - - // TODO 需要参考RepWrite函数的代码逻辑,做好错误处理 - //获取文件大小 - - var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN - //计算每个块的packet数 - numPacket := (fileSize + int64(ecK)*config.Cfg().ECPacketSize - 1) / (int64(ecK) * config.Cfg().ECPacketSize) - //fmt.Println(numPacket) - //创建channel - loadBufs := make([]chan []byte, ecN) - encodeBufs := make([]chan []byte, ecN) - for i := 0; i < ecN; i++ { - loadBufs[i] = make(chan []byte) - } - for i := 0; i < ecN; i++ { - encodeBufs[i] = make(chan []byte) - } - hashs := make([]string, ecN) - //正式开始写入 - go load(file, loadBufs[:ecN], ecK, numPacket*int64(ecK)) //从本地文件系统加载数据 - go encode(loadBufs[:ecN], encodeBufs[:ecN], ecK, coefs, numPacket) - - var wg sync.WaitGroup - wg.Add(ecN) - /*mutex, err := reqbuilder.NewBuilder(). - // 防止上传的副本被清除 - IPFS().CreateAnyRep(node.ID). - MutexLock(svc.distlock) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - */ - for i := 0; i < ecN; i++ { - go svc.send(nodes[i], encodeBufs[i], numPacket, &wg, hashs, i) - } - wg.Wait() - - return hashs, nil - -} - -func (svc *ObjectService) downloadEcObject(fileSize int64, ecK int, ecN int, blockIDs []int, nodeIDs []int64, nodeIPs []string, hashs []string) (io.ReadCloser, error) { - // TODO zkx 先试用同步方式实现逻辑,做好错误处理。同时也方便下面直接使用uploadToNode和uploadToLocalIPFS来优化代码结构 - //wg := sync.WaitGroup{} - numPacket := (fileSize + int64(ecK)*config.Cfg().ECPacketSize - 1) / (int64(ecK) * config.Cfg().ECPacketSize) - getBufs := make([]chan []byte, ecN) - decodeBufs := make([]chan []byte, ecK) - for i := 0; i < ecN; i++ { - getBufs[i] = make(chan []byte) - } - for i := 0; i < ecK; i++ { - decodeBufs[i] = make(chan []byte) - } - for i := 0; i < len(blockIDs); i++ { - go svc.get(hashs[i], nodeIPs[i], getBufs[blockIDs[i]], numPacket) - } - print(numPacket) - go decode(getBufs[:], decodeBufs[:], blockIDs, ecK, numPacket) - r, w := io.Pipe() - //persist函数,将解码得到的文件写入pipe - go func() { - for i := 0; int64(i) < numPacket; i++ { - for j := 0; j < len(decodeBufs); j++ { - tmp := <-decodeBufs[j] - _, err := w.Write(tmp) - if err != nil { - fmt.Errorf("persist file falied, err:%w", err) - } - } - } - w.Close() - }() - return r, nil -} - -func (svc *ObjectService) get(blockHash string, nodeIP string, getBuf chan []byte, numPacket int64) error { - downloadFromAgent := false - //使用本地IPFS获取 - if svc.ipfs != nil { - log.Infof("try to use local IPFS to download file") - //获取IPFS的reader - reader, err := svc.downloadFromLocalIPFS(blockHash) - if err != nil { - downloadFromAgent = true - fmt.Errorf("read ipfs block failed, err: %w", err) - } - defer reader.Close() - for i := 0; int64(i) < numPacket; i++ { - buf := make([]byte, config.Cfg().ECPacketSize) - _, err := io.ReadFull(reader, buf) - if err != nil { - downloadFromAgent = true - fmt.Errorf("read file falied, err:%w", err) - } - getBuf <- buf - } - if downloadFromAgent == false { - close(getBuf) - return nil - } - } else { - downloadFromAgent = true - } - //从agent获取 - if downloadFromAgent == true { - /*// 二次获取锁 - mutex, err := reqbuilder.NewBuilder(). - // 用于从IPFS下载文件 - IPFS().ReadOneRep(nodeID, fileHash). - MutexLock(svc.distlock) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - */ - // 连接grpc - grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort) - conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) - } - // 下载文件 - client := agentcaller.NewFileTransportClient(conn) - reader, err := mygrpc.GetFileAsStream(client, blockHash) - if err != nil { - conn.Close() - return fmt.Errorf("request to get file failed, err: %w", err) - } - for i := 0; int64(i) < numPacket; i++ { - buf := make([]byte, config.Cfg().ECPacketSize) - _, _ = reader.Read(buf) - fmt.Println(buf) - fmt.Println(numPacket, "\n") - getBuf <- buf - } - close(getBuf) - reader.Close() - return nil - } - return nil - -} - -func load(file io.ReadCloser, loadBufs []chan []byte, ecK int, totalNumPacket int64) error { - - for i := 0; int64(i) < totalNumPacket; i++ { - - buf := make([]byte, config.Cfg().ECPacketSize) - idx := i % ecK - _, err := file.Read(buf) - if err != nil { - return fmt.Errorf("read file falied, err:%w", err) - } - loadBufs[idx] <- buf - - if idx == ecK-1 { - for j := ecK; j < len(loadBufs); j++ { - zeroPkt := make([]byte, config.Cfg().ECPacketSize) - loadBufs[j] <- zeroPkt - } - } - if err != nil && err != io.EOF { - return fmt.Errorf("load file to buf failed, err:%w", err) - } - } - for i := 0; i < len(loadBufs); i++ { - - close(loadBufs[i]) - } - file.Close() - return nil -} - -func encode(inBufs []chan []byte, outBufs []chan []byte, ecK int, coefs [][]int64, numPacket int64) { - var tmpIn [][]byte - tmpIn = make([][]byte, len(outBufs)) - enc := ec.NewRsEnc(ecK, len(outBufs)) - for i := 0; int64(i) < numPacket; i++ { - for j := 0; j < len(outBufs); j++ { - tmpIn[j] = <-inBufs[j] - } - enc.Encode(tmpIn) - for j := 0; j < len(outBufs); j++ { - outBufs[j] <- tmpIn[j] - } - } - for i := 0; i < len(outBufs); i++ { - close(outBufs[i]) - } -} - -func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, numPacket int64) { - fmt.Println("decode ") - var tmpIn [][]byte - var zeroPkt []byte - tmpIn = make([][]byte, len(inBufs)) - hasBlock := map[int]bool{} - for j := 0; j < len(blockSeq); j++ { - hasBlock[blockSeq[j]] = true - } - needRepair := false //检测是否传入了所有数据块 - for j := 0; j < len(outBufs); j++ { - if blockSeq[j] != j { - needRepair = true - } - } - enc := ec.NewRsEnc(ecK, len(inBufs)) - for i := 0; int64(i) < numPacket; i++ { - print("!!!!!") - for j := 0; j < len(inBufs); j++ { - if hasBlock[j] { - tmpIn[j] = <-inBufs[j] - } else { - tmpIn[j] = zeroPkt - } - } - if needRepair { - err := enc.Repair(tmpIn) - if err != nil { - fmt.Fprintf(os.Stderr, "Decode Repair Error: %s", err.Error()) - } - } - for j := 0; j < len(outBufs); j++ { - outBufs[j] <- tmpIn[j] - } - } - for i := 0; i < len(outBufs); i++ { - close(outBufs[i]) - } -} - -func (svc *ObjectService) send(node ramsg.RespNode, inBuf chan []byte, numPacket int64, wg *sync.WaitGroup, hashs []string, idx int) error { - // TODO zkx 先直接复制client\internal\task\upload_rep_objects.go中的uploadToNode和uploadToLocalIPFS来替代这部分逻辑 - // 方便之后异步化处理 - // uploadToAgent的逻辑反了,而且中间步骤失败,就必须打印日志后停止后续操作 - - uploadToAgent := true - if svc.ipfs != nil { //使用IPFS传输 - //创建IPFS文件 - log.Infof("try to use local IPFS to upload block") - writer, err := svc.ipfs.CreateFile() - if err != nil { - uploadToAgent = false - fmt.Errorf("create IPFS file failed, err: %w", err) - } - //逐packet写进ipfs - for i := 0; int64(i) < numPacket; i++ { - buf := <-inBuf - reader := bytes.NewReader(buf) - _, err = io.Copy(writer, reader) - if err != nil { - uploadToAgent = false - fmt.Errorf("copying block data to IPFS file failed, err: %w", err) - } - } - //finish, 获取哈希 - fileHash, err := writer.Finish() - if err != nil { - log.Warnf("upload block to local IPFS failed, so try to upload by agent, err: %s", err.Error()) - uploadToAgent = false - fmt.Errorf("finish writing blcok to IPFS failed, err: %w", err) - } - hashs[idx] = fileHash - if err != nil { - } - nodeID := node.ID - // 然后让最近节点pin本地上传的文件 - agentClient, err := agtcli.NewClient(nodeID, &config.Cfg().RabbitMQ) - if err != nil { - uploadToAgent = false - fmt.Errorf("create agent client to %d failed, err: %w", nodeID, err) - } - defer agentClient.Close() - - pinObjResp, err := agentClient.StartPinningObject(agtmsg.NewStartPinningObject(fileHash)) - if err != nil { - uploadToAgent = false - fmt.Errorf("start pinning object: %w", err) - } - for { - waitResp, err := agentClient.WaitPinningObject(agtmsg.NewWaitPinningObject(pinObjResp.TaskID, int64(time.Second)*5)) - if err != nil { - uploadToAgent = false - fmt.Errorf("waitting pinning object: %w", err) - } - if waitResp.IsComplete { - if waitResp.Error != "" { - uploadToAgent = false - fmt.Errorf("agent pinning object: %s", waitResp.Error) - } - break - } - } - if uploadToAgent == false { - return nil - } - } - //////////////////////////////通过Agent上传 - if uploadToAgent == true { - // 如果客户端与节点在同一个地域,则使用内网地址连接节点 - nodeIP := node.ExternalIP - if node.IsSameLocation { - nodeIP = node.LocalIP - - log.Infof("client and node %d are at the same location, use local ip\n", node.ID) - } - - grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort) - grpcCon, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) - } - defer grpcCon.Close() - - client := agentcaller.NewFileTransportClient(grpcCon) - upload, err := mygrpc.SendFileAsStream(client) - if err != nil { - return fmt.Errorf("request to send file failed, err: %w", err) - } - // 发送文件数据 - for i := 0; int64(i) < numPacket; i++ { - buf := <-inBuf - reader := bytes.NewReader(buf) - _, err = io.Copy(upload, reader) - if err != nil { - // 发生错误则关闭连接 - upload.Abort(io.ErrClosedPipe) - return fmt.Errorf("copy block date to upload stream failed, err: %w", err) - } - } - // 发送EOF消息,并获得FileHash - fileHash, err := upload.Finish() - if err != nil { - upload.Abort(io.ErrClosedPipe) - return fmt.Errorf("send EOF failed, err: %w", err) - } - hashs[idx] = fileHash - wg.Done() - } - return nil -} - -func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) { - fDir, err := os.Executable() - if err != nil { - panic(err) - } - fURL := filepath.Join(filepath.Dir(fDir), "assets") - _, err = os.Stat(fURL) - if os.IsNotExist(err) { - os.MkdirAll(fURL, os.ModePerm) - } - file, err := os.Create(filepath.Join(fURL, localFilePath)) - if err != nil { - return - } - for i := 0; int64(i) < numPacket; i++ { - for j := 0; j < len(inBuf); j++ { - tmp := <-inBuf[j] - fmt.Println(tmp) - file.Write(tmp) - } - } - file.Close() - wg.Done() -} diff --git a/internal/services/object.go b/internal/services/object.go deleted file mode 100644 index e8c6060..0000000 --- a/internal/services/object.go +++ /dev/null @@ -1,322 +0,0 @@ -package services - -import ( - "fmt" - "io" - "math/rand" - "time" - - "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" - log "gitlink.org.cn/cloudream/common/pkgs/logger" - myio "gitlink.org.cn/cloudream/common/utils/io" - "gitlink.org.cn/cloudream/storage-client/internal/config" - "gitlink.org.cn/cloudream/storage-client/internal/task" - "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" - ramsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message" - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" - agentcaller "gitlink.org.cn/cloudream/storage-common/pkgs/proto" - mygrpc "gitlink.org.cn/cloudream/storage-common/utils/grpc" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - - lo "github.com/samber/lo" -) - -type ObjectService struct { - *Service -} - -type ResultDownloadObject struct { - ObjectName string - Reader io.ReadCloser - Error error -} - -func (svc *Service) ObjectSvc() *ObjectService { - return &ObjectService{Service: svc} -} - -func (svc *ObjectService) GetObject(userID int64, objectID int64) (model.Object, error) { - // TODO - panic("not implement yet") -} - -func (svc *ObjectService) DownloadObjectDir(userID int64, dirName string) ([]ResultDownloadObject, error) { - - mutex, err := reqbuilder.NewBuilder(). - // 用于判断用户是否有对象权限 - Metadata().UserBucket().ReadAny(). - // 用于查询可用的下载节点 - Node().ReadAny(). - // 用于读取文件信息 - Object().ReadAny(). - // 用于查询Rep配置 - ObjectRep().ReadAny(). - // 用于查询Block配置 - ObjectBlock().ReadAny(). - // 用于查询包含了副本的节点 - Cache().ReadAny(). - MutexLock(svc.distlock) - if err != nil { - return nil, fmt.Errorf("acquire locks failed, err: %w", err) - } - - // TODO 解锁时机需要优化,在所有文件都写入到本地后再解锁 - // 当前是所有文件流全部打开,处理完最后全部关闭,可以考虑加一个迭代器,将文件流逐个打开关闭 - defer mutex.Unlock() - - //根据dirName查询相关的所有文件 - objsResp, err := svc.coordinator.GetObjectsByDirName(coormsg.NewGetObjectsByDirName(userID, dirName)) - if err != nil { - return nil, fmt.Errorf("get objectID by dirName failed: %w", err) - } - if len(objsResp.Objects) == 0 { - return nil, fmt.Errorf("dirName %v is not exist", dirName) - } - - resultDownloadObjects := []ResultDownloadObject{} - for i := 0; i < len(objsResp.Objects); i++ { - reader, err := svc.downloadSingleObject(objsResp.Objects[i].ObjectID, userID) - resultDownloadObjects = append(resultDownloadObjects, ResultDownloadObject{ - ObjectName: objsResp.Objects[i].Name, - Reader: reader, - Error: err, - }) - } - return resultDownloadObjects, nil -} - -func (svc *ObjectService) DownloadObject(userID int64, objectID int64) (io.ReadCloser, error) { - // TODO zkx 需要梳理EC锁涉及的锁,补充下面漏掉的部分 - mutex, err := reqbuilder.NewBuilder(). - // 用于判断用户是否有对象权限 - Metadata().UserBucket().ReadAny(). - // 用于查询可用的下载节点 - Node().ReadAny(). - // 用于读取文件信息 - Object().ReadOne(objectID). - // 用于查询Rep配置 - ObjectRep().ReadOne(objectID). - // 用于查询Block配置 - ObjectBlock().ReadAny(). - // 用于查询包含了副本的节点 - Cache().ReadAny(). - MutexLock(svc.distlock) - if err != nil { - return nil, fmt.Errorf("acquire locks failed, err: %w", err) - } - - reader, err := svc.downloadSingleObject(objectID, userID) - if err != nil { - return reader, err - } - - // TODO 需要返回Object信息 - return myio.AfterReadClosed(reader, func(closer io.ReadCloser) { - // TODO 可以考虑在打开了读取流之后就解锁,而不是要等外部读取完毕 - mutex.Unlock() - }), nil -} - -func (svc *ObjectService) downloadSingleObject(objectID int64, userID int64) (io.ReadCloser, error) { - preDownloadResp, err := svc.coordinator.PreDownloadObject(coormsg.NewPreDownloadObject(objectID, userID, config.Cfg().ExternalIP)) - if err != nil { - return nil, fmt.Errorf("pre download object: %w", err) - } - - switch redundancy := preDownloadResp.Redundancy.(type) { - case ramsg.RespRepRedundancyData: - if len(redundancy.Nodes) == 0 { - return nil, fmt.Errorf("no node has this file") - } - - // 选择下载节点 - entry := svc.chooseDownloadNode(redundancy.Nodes) - - // 如果客户端与节点在同一个地域,则使用内网地址连接节点 - nodeIP := entry.ExternalIP - if entry.IsSameLocation { - nodeIP = entry.LocalIP - - log.Infof("client and node %d are at the same location, use local ip\n", entry.ID) - } - - reader, err := svc.downloadRepObject(entry.ID, nodeIP, redundancy.FileHash) - if err != nil { - return nil, fmt.Errorf("rep read failed, err: %w", err) - } - return reader, nil - - case ramsg.RespEcRedundancyData: - // TODO EC部分的代码要考虑重构 - // ecRead(readResp.FileSize, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, *readResp.ECName) - blocks := redundancy.Blocks - ec := redundancy.Ec - ecK := ec.EcK - ecN := ec.EcN - //采取直接读,优先选内网节点 - hashs := make([]string, ecK) - nds := make([]ramsg.RespNode, ecK) - for i := 0; i < ecK; i++ { - hashs[i] = blocks[i].FileHash - nds[i] = svc.chooseDownloadNode(redundancy.Nodes[i]) - } - //nodeIDs, nodeIPs直接按照第1~ecK个排列 - nodeIDs := make([]int64, ecK) - nodeIPs := make([]string, ecK) - for i := 0; i < ecK; i++ { - nodeIDs[i] = nds[i].ID - nodeIPs[i] = nds[i].ExternalIP - if nds[i].IsSameLocation { - nodeIPs[i] = nds[i].LocalIP - log.Infof("client and node %d are at the same location, use local ip\n", nds[i].ID) - } - } - - fileSize := preDownloadResp.FileSize - blockIDs := make([]int, ecK) - for i := 0; i < ecK; i++ { - blockIDs[i] = i - } - reader, err := svc.downloadEcObject(fileSize, ecK, ecN, blockIDs, nodeIDs, nodeIPs, hashs) - if err != nil { - return nil, fmt.Errorf("ec read failed, err: %w", err) - } - return reader, nil - } - return nil, fmt.Errorf("unsupported redundancy type: %s", preDownloadResp.Redundancy) -} - -// chooseDownloadNode 选择一个下载节点 -// 1. 从与当前客户端相同地域的节点中随机选一个 -// 2. 没有用的话从所有节点中随机选一个 -func (svc *ObjectService) chooseDownloadNode(entries []ramsg.RespNode) ramsg.RespNode { - sameLocationEntries := lo.Filter(entries, func(e ramsg.RespNode, i int) bool { return e.IsSameLocation }) - if len(sameLocationEntries) > 0 { - return sameLocationEntries[rand.Intn(len(sameLocationEntries))] - } - - return entries[rand.Intn(len(entries))] -} - -func (svc *ObjectService) downloadRepObject(nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) { - if svc.ipfs != nil { - log.Infof("try to use local IPFS to download file") - - reader, err := svc.downloadFromLocalIPFS(fileHash) - if err == nil { - return reader, nil - } - - log.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error()) - } - - return svc.downloadFromNode(nodeID, nodeIP, fileHash) -} - -func (svc *ObjectService) downloadFromNode(nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) { - // 二次获取锁 - mutex, err := reqbuilder.NewBuilder(). - // 用于从IPFS下载文件 - IPFS().ReadOneRep(nodeID, fileHash). - MutexLock(svc.distlock) - if err != nil { - return nil, fmt.Errorf("acquire locks failed, err: %w", err) - } - - // 连接grpc - grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort) - conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return nil, fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) - } - - // 下载文件 - client := agentcaller.NewFileTransportClient(conn) - reader, err := mygrpc.GetFileAsStream(client, fileHash) - if err != nil { - conn.Close() - return nil, fmt.Errorf("request to get file failed, err: %w", err) - } - - reader = myio.AfterReadClosed(reader, func(io.ReadCloser) { - conn.Close() - mutex.Unlock() - }) - return reader, nil -} - -func (svc *ObjectService) downloadFromLocalIPFS(fileHash string) (io.ReadCloser, error) { - reader, err := svc.ipfs.OpenRead(fileHash) - if err != nil { - return nil, fmt.Errorf("read ipfs file failed, err: %w", err) - } - - return reader, nil -} - -func (svc *ObjectService) StartUploadingRepObjects(userID int64, bucketID int64, uploadObjects []task.UploadObject, repCount int) (string, error) { - tsk := svc.taskMgr.StartNew(task.NewUploadRepObjects(userID, bucketID, uploadObjects, repCount)) - return tsk.ID(), nil -} - -func (svc *ObjectService) WaitUploadingRepObjects(taskID string, waitTimeout time.Duration) (bool, task.UploadRepObjectsResult, error) { - tsk := svc.taskMgr.FindByID(taskID) - if tsk.WaitTimeout(waitTimeout) { - uploadTask := tsk.Body().(*task.UploadRepObjects) - uploadObjectResult := task.UploadRepObjectsResult{ - Objects: uploadTask.Objects, - Results: uploadTask.Results, - IsUploading: uploadTask.IsUploading, - } - - return true, uploadObjectResult, tsk.Error() - } - return false, task.UploadRepObjectsResult{}, nil -} - -func (svc *ObjectService) UploadECObject(userID int64, file io.ReadCloser, fileSize int64, ecName string) error { - // TODO - panic("not implement yet") -} - -func (svc *ObjectService) StartUpdatingRepObject(userID int64, objectID int64, file io.ReadCloser, fileSize int64) (string, error) { - tsk := svc.taskMgr.StartNew(task.NewUpdateRepObject(userID, objectID, file, fileSize)) - return tsk.ID(), nil -} - -func (svc *ObjectService) WaitUpdatingRepObject(taskID string, waitTimeout time.Duration) (bool, error) { - tsk := svc.taskMgr.FindByID(taskID) - if tsk.WaitTimeout(waitTimeout) { - return true, tsk.Error() - } - - return false, nil -} - -func (svc *ObjectService) DeleteObject(userID int64, objectID int64) error { - mutex, err := reqbuilder.NewBuilder(). - Metadata(). - // 用于判断用户是否有对象的权限 - UserBucket().ReadAny(). - // 用于读取、修改对象信息 - Object().WriteOne(objectID). - // 用于删除Rep配置 - ObjectRep().WriteOne(objectID). - // 用于删除Block配置 - ObjectBlock().WriteAny(). - // 用于修改Move此Object的记录的状态 - StorageObject().WriteAny(). - MutexLock(svc.distlock) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - - _, err = svc.coordinator.DeleteObject(coormsg.NewDeleteObject(userID, objectID)) - if err != nil { - return fmt.Errorf("deleting object: %w", err) - } - - return nil -} diff --git a/internal/services/package.go b/internal/services/package.go new file mode 100644 index 0000000..c7ef8e6 --- /dev/null +++ b/internal/services/package.go @@ -0,0 +1,221 @@ +package services + +import ( + "fmt" + "time" + + "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/common/utils/serder" + "gitlink.org.cn/cloudream/storage-client/internal/config" + mytask "gitlink.org.cn/cloudream/storage-client/internal/task" + agtcmd "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" +) + +type PackageService struct { + *Service +} + +func (svc *Service) PackageSvc() *PackageService { + return &PackageService{Service: svc} +} + +func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (iterator.DownloadingObjectIterator, error) { + /* + TODO2 + // TODO zkx 需要梳理EC锁涉及的锁,补充下面漏掉的部分 + mutex, err := reqbuilder.NewBuilder(). + // 用于判断用户是否有对象权限 + Metadata().UserBucket().ReadAny(). + // 用于查询可用的下载节点 + Node().ReadAny(). + // 用于读取文件信息 + Object().ReadOne(objectID). + // 用于查询Rep配置 + ObjectRep().ReadOne(objectID). + // 用于查询Block配置 + ObjectBlock().ReadAny(). + // 用于查询包含了副本的节点 + Cache().ReadAny(). + MutexLock(svc.distlock) + if err != nil { + return nil, fmt.Errorf("acquire locks failed, err: %w", err) + } + */ + getPkgResp, err := svc.coordinator.GetPackage(coormq.NewGetPackage(userID, packageID)) + if err != nil { + return nil, fmt.Errorf("getting package: %w", err) + } + + getObjsResp, err := svc.coordinator.GetPackageObjects(coormq.NewGetPackageObjects(userID, packageID)) + if err != nil { + return nil, fmt.Errorf("getting package objects: %w", err) + } + + if getPkgResp.Redundancy.Type == models.RedundancyRep { + getObjRepDataResp, err := svc.coordinator.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(packageID)) + if err != nil { + return nil, fmt.Errorf("getting package object rep data: %w", err) + } + + iter := iterator.NewRepObjectIterator(getObjsResp.Objects, getObjRepDataResp.Data, svc.coordinator, svc.distlock, iterator.DownloadConfig{ + LocalIPFS: svc.ipfs, + ExternalIP: config.Cfg().ExternalIP, + GRPCPort: config.Cfg().GRPCPort, + MQ: &config.Cfg().RabbitMQ, + }) + + return iter, nil + } + + getObjECDataResp, err := svc.coordinator.GetPackageObjectECData(coormq.NewGetPackageObjectECData(packageID)) + if err != nil { + return nil, fmt.Errorf("getting package object ec data: %w", err) + } + + var ecRed models.ECRedundancyInfo + if err := serder.AnyToAny(getPkgResp.Package.Redundancy.Info, &ecRed); err != nil { + return nil, fmt.Errorf("get ec redundancy info: %w", err) + } + + getECResp, err := svc.coordinator.GetECConfig(coormq.NewGetECConfig(ecRed.ECName)) + if err != nil { + return nil, fmt.Errorf("getting ec: %w", err) + } + + iter := iterator.NewECObjectIterator(getObjsResp.Objects, getObjECDataResp.Data, svc.coordinator, svc.distlock, getECResp.Config, config.Cfg().ECPacketSize, iterator.DownloadConfig{ + LocalIPFS: svc.ipfs, + ExternalIP: config.Cfg().ExternalIP, + GRPCPort: config.Cfg().GRPCPort, + MQ: &config.Cfg().RabbitMQ, + }) + + return iter, nil +} + +func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo models.RepRedundancyInfo) (string, error) { + tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext]( + agtcmd.NewCreateRepPackage( + userID, bucketID, name, objIter, + repInfo, + agtcmd.UploadConfig{ + LocalIPFS: svc.ipfs, + LocalNodeID: nil, + ExternalIP: config.Cfg().ExternalIP, + GRPCPort: config.Cfg().GRPCPort, + MQ: &config.Cfg().RabbitMQ, + }))) + return tsk.ID(), nil +} + +func (svc *PackageService) WaitCreatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateRepPackageResult, error) { + tsk := svc.taskMgr.FindByID(taskID) + if tsk.WaitTimeout(waitTimeout) { + cteatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.CreateRepPackage) + return true, &cteatePkgTask.Result, tsk.Error() + } + return false, nil, nil +} + +func (svc *PackageService) StartUpdatingRepPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) (string, error) { + tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext]( + agtcmd.NewUpdateRepPackage( + userID, packageID, objIter, + agtcmd.UploadConfig{ + LocalIPFS: svc.ipfs, + LocalNodeID: nil, + ExternalIP: config.Cfg().ExternalIP, + GRPCPort: config.Cfg().GRPCPort, + MQ: &config.Cfg().RabbitMQ, + }))) + return tsk.ID(), nil +} + +func (svc *PackageService) WaitUpdatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdateRepPackageResult, error) { + tsk := svc.taskMgr.FindByID(taskID) + if tsk.WaitTimeout(waitTimeout) { + updatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.UpdateRepPackage) + return true, &updatePkgTask.Result, tsk.Error() + } + return false, nil, nil +} + +func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo models.ECRedundancyInfo) (string, error) { + tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext]( + agtcmd.NewCreateECPackage( + userID, bucketID, name, objIter, + ecInfo, + config.Cfg().ECPacketSize, + agtcmd.UploadConfig{ + LocalIPFS: svc.ipfs, + LocalNodeID: nil, + ExternalIP: config.Cfg().ExternalIP, + GRPCPort: config.Cfg().GRPCPort, + MQ: &config.Cfg().RabbitMQ, + }))) + return tsk.ID(), nil +} + +func (svc *PackageService) WaitCreatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateRepPackageResult, error) { + tsk := svc.taskMgr.FindByID(taskID) + if tsk.WaitTimeout(waitTimeout) { + cteatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.CreateRepPackage) + return true, &cteatePkgTask.Result, tsk.Error() + } + return false, nil, nil +} + +func (svc *PackageService) StartUpdatingECPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) (string, error) { + tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext]( + agtcmd.NewUpdateECPackage( + userID, packageID, objIter, + config.Cfg().ECPacketSize, + agtcmd.UploadConfig{ + LocalIPFS: svc.ipfs, + LocalNodeID: nil, + ExternalIP: config.Cfg().ExternalIP, + GRPCPort: config.Cfg().GRPCPort, + MQ: &config.Cfg().RabbitMQ, + }))) + return tsk.ID(), nil +} + +func (svc *PackageService) WaitUpdatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdateECPackageResult, error) { + tsk := svc.taskMgr.FindByID(taskID) + if tsk.WaitTimeout(waitTimeout) { + updatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.UpdateECPackage) + return true, &updatePkgTask.Result, tsk.Error() + } + return false, nil, nil +} + +func (svc *PackageService) DeletePackage(userID int64, packageID int64) error { + /* + // TODO2 + mutex, err := reqbuilder.NewBuilder(). + Metadata(). + // 用于判断用户是否有对象的权限 + UserBucket().ReadAny(). + // 用于读取、修改对象信息 + Object().WriteOne(objectID). + // 用于删除Rep配置 + ObjectRep().WriteOne(objectID). + // 用于删除Block配置 + ObjectBlock().WriteAny(). + // 用于修改Move此Object的记录的状态 + StorageObject().WriteAny(). + MutexLock(svc.distlock) + if err != nil { + return fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() + */ + + _, err := svc.coordinator.DeletePackage(coormq.NewDeletePackage(userID, packageID)) + if err != nil { + return fmt.Errorf("deleting package: %w", err) + } + + return nil +} diff --git a/internal/services/service.go b/internal/services/service.go index d3a2a3a..638fd3f 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -4,19 +4,19 @@ import ( distlock "gitlink.org.cn/cloudream/common/pkgs/distlock/service" "gitlink.org.cn/cloudream/common/utils/ipfs" "gitlink.org.cn/cloudream/storage-client/internal/task" - racli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/coordinator" - sccli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/scanner" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" + scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner" ) type Service struct { - coordinator *racli.Client + coordinator *coormq.Client ipfs *ipfs.IPFS - scanner *sccli.Client + scanner *scmq.Client distlock *distlock.Service taskMgr *task.Manager } -func NewService(coorClient *racli.Client, ipfsClient *ipfs.IPFS, scanner *sccli.Client, distlock *distlock.Service, taskMgr *task.Manager) (*Service, error) { +func NewService(coorClient *coormq.Client, ipfsClient *ipfs.IPFS, scanner *scmq.Client, distlock *distlock.Service, taskMgr *task.Manager) (*Service, error) { return &Service{ coordinator: coorClient, ipfs: ipfsClient, diff --git a/internal/services/storage.go b/internal/services/storage.go index 62c6df6..34a439e 100644 --- a/internal/services/storage.go +++ b/internal/services/storage.go @@ -4,11 +4,11 @@ import ( "fmt" "time" + "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/storage-client/internal/config" "gitlink.org.cn/cloudream/storage-client/internal/task" - agtcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/agent" - agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent" - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" + agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) type StorageService struct { @@ -19,12 +19,12 @@ func (svc *Service) StorageSvc() *StorageService { return &StorageService{Service: svc} } -func (svc *StorageService) StartStorageMoveObject(userID int64, objectID int64, storageID int64) (string, error) { - tsk := svc.taskMgr.StartNew(task.NewMoveObjectToStorage(userID, objectID, storageID)) +func (svc *StorageService) StartStorageMovePackage(userID int64, packageID int64, storageID int64) (string, error) { + tsk := svc.taskMgr.StartNew(task.NewStorageMovePackage(userID, packageID, storageID)) return tsk.ID(), nil } -func (svc *StorageService) WaitStorageMoveObject(taskID string, waitTimeout time.Duration) (bool, error) { +func (svc *StorageService) WaitStorageMovePackage(taskID string, waitTimeout time.Duration) (bool, error) { tsk := svc.taskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { return true, tsk.Error() @@ -33,67 +33,53 @@ func (svc *StorageService) WaitStorageMoveObject(taskID string, waitTimeout time } -func (svc *StorageService) StartMovingDir(userID int64, dirName string, storageID int64) (string, error) { - tsk := svc.taskMgr.StartNew(task.NewMoveDirToStorage(userID, dirName, storageID)) - return tsk.ID(), nil -} - -func (svc *StorageService) WaitMovingDir(taskID string, waitTimeout time.Duration) (bool, []task.ResultObjectToStorage, error) { - tsk := svc.taskMgr.FindByID(taskID) - if tsk.WaitTimeout(waitTimeout) { - return true, tsk.Body().(*task.MoveDirToStorage).ResultObjectToStorages, tsk.Error() - } - - return false, nil, nil -} - -func (svc *StorageService) DeleteStorageObject(userID int64, objectID int64, storageID int64) error { +func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error { // TODO panic("not implement yet") } // 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID -func (svc *StorageService) StartStorageUploadRepObject(userID int64, storageID int64, filePath string, bucketID int64, objectName string, repCount int) (int64, string, error) { - stgResp, err := svc.coordinator.GetStorageInfo(coormsg.NewGetStorageInfo(userID, storageID)) +func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo) (int64, string, error) { + stgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID)) if err != nil { return 0, "", fmt.Errorf("getting storage info: %w", err) } - agentCli, err := agtcli.NewClient(stgResp.NodeID, &config.Cfg().RabbitMQ) + agentCli, err := agtmq.NewClient(stgResp.NodeID, &config.Cfg().RabbitMQ) if err != nil { return 0, "", fmt.Errorf("new agent client: %w", err) } defer agentCli.Close() - startResp, err := agentCli.StartStorageUploadRepObject(agtmsg.NewStartStorageUploadRepObject(userID, filePath, bucketID, objectName, repCount, stgResp.Directory)) + startResp, err := agentCli.StartStorageCreatePackage(agtmq.NewStartStorageCreatePackage(userID, bucketID, name, storageID, path, redundancy)) if err != nil { - return 0, "", fmt.Errorf("start storage upload rep object: %w", err) + return 0, "", fmt.Errorf("start storage upload package: %w", err) } return stgResp.NodeID, startResp.TaskID, nil } -func (svc *StorageService) WaitStorageUploadRepObject(nodeID int64, taskID string, waitTimeout time.Duration) (bool, int64, string, error) { - agentCli, err := agtcli.NewClient(nodeID, &config.Cfg().RabbitMQ) +func (svc *StorageService) WaitStorageCreatePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, int64, error) { + agentCli, err := agtmq.NewClient(nodeID, &config.Cfg().RabbitMQ) if err != nil { // TODO 失败是否要当做任务已经结束? - return true, 0, "", fmt.Errorf("new agent client: %w", err) + return true, 0, fmt.Errorf("new agent client: %w", err) } defer agentCli.Close() - waitResp, err := agentCli.WaitStorageUploadRepObject(agtmsg.NewWaitStorageUploadRepObject(taskID, waitTimeout.Milliseconds())) + waitResp, err := agentCli.WaitStorageCreatePackage(agtmq.NewWaitStorageCreatePackage(taskID, waitTimeout.Milliseconds())) if err != nil { // TODO 请求失败是否要当做任务已经结束? - return true, 0, "", fmt.Errorf("wait storage upload rep object: %w", err) + return true, 0, fmt.Errorf("wait storage upload package: %w", err) } if !waitResp.IsComplete { - return false, 0, "", nil + return false, 0, nil } if waitResp.Error != "" { - return true, 0, "", fmt.Errorf("%s", waitResp.Error) + return true, 0, fmt.Errorf("%s", waitResp.Error) } - return true, waitResp.ObjectID, waitResp.FileHash, nil + return true, waitResp.PackageID, nil } diff --git a/internal/task/move_dir_to_storage.go b/internal/task/move_dir_to_storage.go deleted file mode 100644 index fdd9b60..0000000 --- a/internal/task/move_dir_to_storage.go +++ /dev/null @@ -1,83 +0,0 @@ -package task - -import ( - "fmt" - "time" - - "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" -) - -type MoveDirToStorage struct { - userID int64 - dirName string - storageID int64 - ResultObjectToStorages []ResultObjectToStorage -} - -type ResultObjectToStorage struct { - ObjectName string - Error error -} - -func NewMoveDirToStorage(userID int64, dirName string, storageID int64) *MoveDirToStorage { - return &MoveDirToStorage{ - userID: userID, - dirName: dirName, - storageID: storageID, - } -} - -func (t *MoveDirToStorage) Execute(ctx TaskContext, complete CompleteFn) { - err := t.do(ctx) - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} - -func (t *MoveDirToStorage) do(ctx TaskContext) error { - //根据dirName查询相关的所有文件 - objsResp, err := ctx.Coordinator.GetObjectsByDirName(coormsg.NewGetObjectsByDirName(t.userID, t.dirName)) - if err != nil { - return fmt.Errorf("get objectID by dirName failed: %w", err) - } - if len(objsResp.Objects) == 0 { - return fmt.Errorf("dirName %v is not exist", t.dirName) - } - - reqBlder := reqbuilder.NewBuilder() - for _, object := range objsResp.Objects { - reqBlder.Metadata(). - // 用于判断用户是否有Storage权限 - UserStorage().ReadOne(object.ObjectID, t.storageID). - // 用于读取对象信息 - Object().ReadOne(object.ObjectID). - // 用于查询Rep配置 - ObjectRep().ReadOne(object.ObjectID). - // 用于创建Move记录 - StorageObject().CreateOne(t.storageID, t.userID, object.ObjectID). - // 用于创建对象文件 - Storage().CreateOneObject(t.storageID, t.userID, object.ObjectID) - } - mutex, err := reqBlder. - Metadata(). - // 用于判断用户是否有对象权限 - UserBucket().ReadAny(). - // 用于查询Block配置 - ObjectBlock().ReadAny(). - MutexLock(ctx.DistLock) - - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - - for i := 0; i < len(objsResp.Objects); i++ { - err := moveSingleObjectToStorage(ctx, t.userID, objsResp.Objects[i].ObjectID, t.storageID) - t.ResultObjectToStorages = append(t.ResultObjectToStorages, ResultObjectToStorage{ - ObjectName: objsResp.Objects[i].Name, - Error: err, - }) - } - return nil -} diff --git a/internal/task/move_object_to_storage.go b/internal/task/move_object_to_storage.go deleted file mode 100644 index f44bf80..0000000 --- a/internal/task/move_object_to_storage.go +++ /dev/null @@ -1,108 +0,0 @@ -package task - -import ( - "fmt" - "time" - - "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" - "gitlink.org.cn/cloudream/storage-client/internal/config" - agtcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/agent" - agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent" - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" -) - -type MoveObjectToStorage struct { - userID int64 - objectID int64 - storageID int64 -} - -func NewMoveObjectToStorage(userID int64, objectID int64, storageID int64) *MoveObjectToStorage { - return &MoveObjectToStorage{ - userID: userID, - objectID: objectID, - storageID: storageID, - } -} - -func (t *MoveObjectToStorage) Execute(ctx TaskContext, complete CompleteFn) { - err := t.do(ctx) - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} - -func (t *MoveObjectToStorage) do(ctx TaskContext) error { - mutex, err := reqbuilder.NewBuilder(). - Metadata(). - // 用于判断用户是否有Storage权限 - UserStorage().ReadOne(t.objectID, t.storageID). - // 用于判断用户是否有对象权限 - UserBucket().ReadAny(). - // 用于读取对象信息 - Object().ReadOne(t.objectID). - // 用于查询Rep配置 - ObjectRep().ReadOne(t.objectID). - // 用于查询Block配置 - ObjectBlock().ReadAny(). - // 用于创建Move记录 - StorageObject().CreateOne(t.storageID, t.userID, t.objectID). - Storage(). - // 用于创建对象文件 - CreateOneObject(t.storageID, t.userID, t.objectID). - MutexLock(ctx.DistLock) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - err = moveSingleObjectToStorage(ctx, t.userID, t.objectID, t.storageID) - return err -} - -func moveSingleObjectToStorage(ctx TaskContext, userID int64, objectID int64, storageID int64) error { - // 先向协调端请求文件相关的元数据 - preMoveResp, err := ctx.Coordinator.PreMoveObjectToStorage(coormsg.NewPreMoveObjectToStorage(objectID, storageID, userID)) - if err != nil { - return fmt.Errorf("pre move object to storage: %w", err) - } - - // 然后向代理端发送移动文件的请求 - agentClient, err := agtcli.NewClient(preMoveResp.NodeID, &config.Cfg().RabbitMQ) - if err != nil { - return fmt.Errorf("create agent client to %d failed, err: %w", preMoveResp.NodeID, err) - } - defer agentClient.Close() - - agentMoveResp, err := agentClient.StartStorageMoveObject( - agtmsg.NewStartStorageMoveObject(preMoveResp.Directory, - objectID, - preMoveResp.Object.Name, - userID, - preMoveResp.Object.FileSize, - preMoveResp.Redundancy, - )) - if err != nil { - return fmt.Errorf("start moving object to storage: %w", err) - } - - for { - waitResp, err := agentClient.WaitStorageMoveObject(agtmsg.NewWaitStorageMoveObject(agentMoveResp.TaskID, int64(time.Second)*5)) - if err != nil { - return fmt.Errorf("wait moving object: %w", err) - } - - if waitResp.IsComplete { - if waitResp.Error != "" { - return fmt.Errorf("agent moving object: %s", waitResp.Error) - } - - break - } - } - - _, err = ctx.Coordinator.MoveObjectToStorage(coormsg.NewMoveObjectToStorage(objectID, storageID, userID)) - if err != nil { - return fmt.Errorf("moving object to storage: %w", err) - } - return nil -} diff --git a/internal/task/storage_move_package.go b/internal/task/storage_move_package.go new file mode 100644 index 0000000..9f34b34 --- /dev/null +++ b/internal/task/storage_move_package.go @@ -0,0 +1,101 @@ +package task + +import ( + "fmt" + "time" + + "gitlink.org.cn/cloudream/storage-client/internal/config" + agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" +) + +type StorageMovePackage struct { + userID int64 + packageID int64 + storageID int64 +} + +func NewStorageMovePackage(userID int64, packageID int64, storageID int64) *StorageMovePackage { + return &StorageMovePackage{ + userID: userID, + packageID: packageID, + storageID: storageID, + } +} + +func (t *StorageMovePackage) Execute(ctx TaskContext, complete CompleteFn) { + err := t.do(ctx) + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} + +func (t *StorageMovePackage) do(ctx TaskContext) error { + /* + TODO2 + mutex, err := reqbuilder.NewBuilder(). + Metadata(). + // 用于判断用户是否有Storage权限 + UserStorage().ReadOne(t.packageID, t.storageID). + // 用于判断用户是否有对象权限 + UserBucket().ReadAny(). + // 用于读取对象信息 + Object().ReadOne(t.packageID). + // 用于查询Rep配置 + ObjectRep().ReadOne(t.packageID). + // 用于查询Block配置 + ObjectBlock().ReadAny(). + // 用于创建Move记录 + StorageObject().CreateOne(t.storageID, t.userID, t.packageID). + Storage(). + // 用于创建对象文件 + CreateOneObject(t.storageID, t.userID, t.packageID). + MutexLock(ctx.DistLock) + if err != nil { + return fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() + */ + getStgResp, err := ctx.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID)) + if err != nil { + return fmt.Errorf("getting storage info: %w", err) + } + + // 然后向代理端发送移动文件的请求 + agentClient, err := agtmq.NewClient(getStgResp.NodeID, &config.Cfg().RabbitMQ) + if err != nil { + return fmt.Errorf("create agent client to %d failed, err: %w", getStgResp.NodeID, err) + } + defer agentClient.Close() + + agentMoveResp, err := agentClient.StartStorageMovePackage( + agtmq.NewStartStorageMovePackage( + t.userID, + t.packageID, + t.storageID, + )) + if err != nil { + return fmt.Errorf("start moving package to storage: %w", err) + } + + for { + waitResp, err := agentClient.WaitStorageMovePackage(agtmq.NewWaitStorageMovePackage(agentMoveResp.TaskID, int64(time.Second)*5)) + if err != nil { + return fmt.Errorf("wait moving package: %w", err) + } + + if waitResp.IsComplete { + if waitResp.Error != "" { + return fmt.Errorf("agent moving package: %s", waitResp.Error) + } + + break + } + } + + _, err = ctx.Coordinator().PackageMovedToStorage(coormq.NewPackageMovedToStorage(t.userID, t.packageID, t.storageID)) + if err != nil { + return fmt.Errorf("moving package to storage: %w", err) + } + return nil +} diff --git a/internal/task/task.go b/internal/task/task.go index fcbcfcb..69466e3 100644 --- a/internal/task/task.go +++ b/internal/task/task.go @@ -4,13 +4,13 @@ import ( distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/common/utils/ipfs" - coorcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/coordinator" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) type TaskContext struct { - IPFS *ipfs.IPFS - DistLock *distsvc.Service - Coordinator *coorcli.Client + ipfs *ipfs.IPFS + distLock *distsvc.Service + coordinator *coormq.Client } // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, @@ -25,10 +25,10 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(ipfs *ipfs.IPFS, distlock *distsvc.Service, coorCli *coorcli.Client) Manager { +func NewManager(ipfs *ipfs.IPFS, distlock *distsvc.Service, coorCli *coormq.Client) Manager { return task.NewManager(TaskContext{ - IPFS: ipfs, - DistLock: distlock, - Coordinator: coorCli, + ipfs: ipfs, + distLock: distlock, + coordinator: coorCli, }) } diff --git a/internal/task/update_rep_object.go b/internal/task/update_rep_object.go deleted file mode 100644 index b4a6fe5..0000000 --- a/internal/task/update_rep_object.go +++ /dev/null @@ -1,147 +0,0 @@ -package task - -import ( - "fmt" - "io" - "time" - - "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" - "gitlink.org.cn/cloudream/common/pkgs/logger" - mysort "gitlink.org.cn/cloudream/common/utils/sort" - "gitlink.org.cn/cloudream/storage-client/internal/config" - - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" -) - -type UpdateRepObject struct { - userID int64 - objectID int64 - file io.ReadCloser - fileSize int64 -} - -func NewUpdateRepObject(userID int64, objectID int64, file io.ReadCloser, fileSize int64) *UpdateRepObject { - return &UpdateRepObject{ - userID: userID, - objectID: objectID, - file: file, - fileSize: fileSize, - } -} - -func (t *UpdateRepObject) Execute(ctx TaskContext, complete CompleteFn) { - err := t.do(ctx) - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} - -func (t *UpdateRepObject) do(ctx TaskContext) error { - mutex, err := reqbuilder.NewBuilder(). - Metadata(). - // 用于判断用户是否有对象的权限 - UserBucket().ReadAny(). - // 用于读取、修改对象信息 - Object().WriteOne(t.objectID). - // 用于更新Rep配置 - ObjectRep().WriteOne(t.objectID). - // 用于查询可用的上传节点 - Node().ReadAny(). - // 用于创建Cache记录 - Cache().CreateAny(). - // 用于修改Move此Object的记录的状态 - StorageObject().WriteAny(). - MutexLock(ctx.DistLock) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - - preResp, err := ctx.Coordinator.PreUpdateRepObject(coormsg.NewPreUpdateRepObject( - t.objectID, - t.fileSize, - t.userID, - config.Cfg().ExternalIP, - )) - if err != nil { - return fmt.Errorf("pre update rep object: %w", err) - } - - if len(preResp.Nodes) == 0 { - return fmt.Errorf("no node to upload file") - } - - // 上传文件的方式优先级: - // 1. 本地IPFS - // 2. 包含了旧文件,且与客户端在同地域的节点 - // 3. 不在同地域,但包含了旧文件的节点 - // 4. 同地域节点 - - uploadNode := t.chooseUpdateRepObjectNode(preResp.Nodes) - - var fileHash string - uploadedNodeIDs := []int64{} - willUploadToNode := true - // 本地有IPFS,则直接从本地IPFS上传 - if ctx.IPFS != nil { - logger.Infof("try to use local IPFS to upload file") - - fileHash, err = uploadToLocalIPFS(ctx.IPFS, t.file, uploadNode.ID) - if err != nil { - logger.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.ID, err.Error()) - } else { - willUploadToNode = false - } - } - - // 否则发送到agent上传 - if willUploadToNode { - // 如果客户端与节点在同一个地域,则使用内网地址连接节点 - nodeIP := uploadNode.ExternalIP - if uploadNode.IsSameLocation { - nodeIP = uploadNode.LocalIP - - logger.Infof("client and node %d are at the same location, use local ip\n", uploadNode.ID) - } - - mutex, err := reqbuilder.NewBuilder(). - IPFS(). - // 防止上传的副本被清除 - CreateAnyRep(uploadNode.ID). - MutexLock(ctx.DistLock) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - - fileHash, err = uploadToNode(t.file, nodeIP) - if err != nil { - return fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err) - } - uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID) - } - - // 更新Object - _, err = ctx.Coordinator.UpdateRepObject(coormsg.NewUpdateRepObject(t.objectID, fileHash, t.fileSize, uploadedNodeIDs, t.userID)) - if err != nil { - return fmt.Errorf("updating rep object: %w", err) - } - - return nil -} - -// chooseUploadNode 选择一个上传文件的节点 -// 1. 从与当前客户端相同地域的节点中随机选一个 -// 2. 没有用的话从所有节点中随机选一个 -func (t *UpdateRepObject) chooseUpdateRepObjectNode(nodes []coormsg.PreUpdateRepObjectRespNode) coormsg.PreUpdateRepObjectRespNode { - mysort.Sort(nodes, func(left, right coormsg.PreUpdateRepObjectRespNode) int { - v := -mysort.CmpBool(left.HasOldObject, right.HasOldObject) - if v != 0 { - return v - } - - return -mysort.CmpBool(left.IsSameLocation, right.IsSameLocation) - }) - - return nodes[0] -} diff --git a/internal/task/update_rep_object_test.go b/internal/task/update_rep_object_test.go deleted file mode 100644 index a675869..0000000 --- a/internal/task/update_rep_object_test.go +++ /dev/null @@ -1,54 +0,0 @@ -package task - -import ( - "testing" - - . "github.com/smartystreets/goconvey/convey" - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" -) - -func Test_chooseUpdateRepObjectNode(t *testing.T) { - testcases := []struct { - title string - nodes []coormsg.PreUpdateRepObjectRespNode - wantNodeID int - }{ - { - title: "选择同地域,包含旧数据的节点", - nodes: []coormsg.PreUpdateRepObjectRespNode{ - coormsg.NewPreUpdateRepObjectRespNode(0, "", "", true, false), - coormsg.NewPreUpdateRepObjectRespNode(1, "", "", false, false), - coormsg.NewPreUpdateRepObjectRespNode(2, "", "", false, true), - coormsg.NewPreUpdateRepObjectRespNode(3, "", "", true, true), - }, - wantNodeID: 3, - }, - - { - title: "选择包含旧数据的节点", - nodes: []coormsg.PreUpdateRepObjectRespNode{ - coormsg.NewPreUpdateRepObjectRespNode(0, "", "", true, false), - coormsg.NewPreUpdateRepObjectRespNode(1, "", "", false, false), - coormsg.NewPreUpdateRepObjectRespNode(2, "", "", false, true), - }, - wantNodeID: 2, - }, - - { - title: "选择包含同地域的节点", - nodes: []coormsg.PreUpdateRepObjectRespNode{ - coormsg.NewPreUpdateRepObjectRespNode(0, "", "", true, false), - coormsg.NewPreUpdateRepObjectRespNode(1, "", "", false, false), - }, - wantNodeID: 0, - }, - } - - var tsk UpdateRepObject - for _, test := range testcases { - Convey(test.title, t, func() { - chooseNode := tsk.chooseUpdateRepObjectNode(test.nodes) - So(chooseNode.ID, ShouldEqual, test.wantNodeID) - }) - } -} diff --git a/internal/task/upload_rep_objects.go b/internal/task/upload_rep_objects.go deleted file mode 100644 index 3d6d7bd..0000000 --- a/internal/task/upload_rep_objects.go +++ /dev/null @@ -1,294 +0,0 @@ -package task - -import ( - "fmt" - "io" - "math/rand" - "time" - - "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/utils/ipfs" - "gitlink.org.cn/cloudream/storage-client/internal/config" - "gitlink.org.cn/cloudream/storage-common/utils" - mygrpc "gitlink.org.cn/cloudream/storage-common/utils/grpc" - - agtcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/agent" - ramsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message" - agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent" - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" - agentcaller "gitlink.org.cn/cloudream/storage-common/pkgs/proto" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -// UploadObjects和UploadRepResults为一一对应关系 -type UploadRepObjects struct { - userID int64 - bucketID int64 - repCount int - Objects []UploadObject - Results []UploadSingleRepObjectResult - IsUploading bool -} - -type UploadRepObjectsResult struct { - Objects []UploadObject - Results []UploadSingleRepObjectResult - IsUploading bool -} - -type UploadObject struct { - ObjectName string - File io.ReadCloser - FileSize int64 -} - -type UploadSingleRepObjectResult struct { - Error error - FileHash string - ObjectID int64 -} - -func NewUploadRepObjects(userID int64, bucketID int64, uploadObjects []UploadObject, repCount int) *UploadRepObjects { - return &UploadRepObjects{ - userID: userID, - bucketID: bucketID, - Objects: uploadObjects, - repCount: repCount, - } -} - -func (t *UploadRepObjects) Execute(ctx TaskContext, complete CompleteFn) { - err := t.do(ctx) - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} - -func (t *UploadRepObjects) do(ctx TaskContext) error { - - reqBlder := reqbuilder.NewBuilder() - for _, uploadObject := range t.Objects { - reqBlder.Metadata(). - // 用于防止创建了多个同名对象 - Object().CreateOne(t.bucketID, uploadObject.ObjectName) - } - mutex, err := reqBlder. - Metadata(). - // 用于判断用户是否有桶的权限 - UserBucket().ReadOne(t.userID, t.bucketID). - // 用于查询可用的上传节点 - Node().ReadAny(). - // 用于设置Rep配置 - ObjectRep().CreateAny(). - // 用于创建Cache记录 - Cache().CreateAny(). - MutexLock(ctx.DistLock) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - - var repWriteResps []*coormsg.PreUploadResp - - //判断是否所有文件都符合上传条件 - hasFailure := true - for i := 0; i < len(t.Objects); i++ { - repWriteResp, err := t.preUploadSingleObject(ctx, t.Objects[i]) - if err != nil { - hasFailure = false - t.Results = append(t.Results, - UploadSingleRepObjectResult{ - Error: err, - FileHash: "", - ObjectID: 0, - }) - continue - } - t.Results = append(t.Results, UploadSingleRepObjectResult{}) - repWriteResps = append(repWriteResps, repWriteResp) - } - - // 不满足上传条件,返回各文件检查结果 - if !hasFailure { - return nil - } - - //上传文件夹 - t.IsUploading = true - for i := 0; i < len(repWriteResps); i++ { - objectID, fileHash, err := t.uploadSingleObject(ctx, t.Objects[i], repWriteResps[i]) - // 记录文件上传结果 - t.Results[i] = UploadSingleRepObjectResult{ - Error: err, - FileHash: fileHash, - ObjectID: objectID, - } - } - return nil -} - -// 检查单个文件是否能够上传 -func (t *UploadRepObjects) preUploadSingleObject(ctx TaskContext, uploadObject UploadObject) (*coormsg.PreUploadResp, error) { - //发送写请求,请求Coor分配写入节点Ip - repWriteResp, err := ctx.Coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.userID, config.Cfg().ExternalIP)) - if err != nil { - return nil, fmt.Errorf("pre upload rep object: %w", err) - } - if len(repWriteResp.Nodes) == 0 { - return nil, fmt.Errorf("no node to upload file") - } - return repWriteResp, nil -} - -// 上传文件 -func (t *UploadRepObjects) uploadSingleObject(ctx TaskContext, uploadObject UploadObject, preResp *coormsg.PreUploadResp) (int64, string, error) { - uploadNode := t.chooseUploadNode(preResp.Nodes) - - var fileHash string - uploadedNodeIDs := []int64{} - willUploadToNode := true - // 本地有IPFS,则直接从本地IPFS上传 - if ctx.IPFS != nil { - logger.Infof("try to use local IPFS to upload file") - - var err error - fileHash, err = uploadToLocalIPFS(ctx.IPFS, uploadObject.File, uploadNode.ID) - if err != nil { - logger.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.ID, err.Error()) - } else { - willUploadToNode = false - } - } - - // 否则发送到agent上传 - if willUploadToNode { - // 如果客户端与节点在同一个地域,则使用内网地址连接节点 - nodeIP := uploadNode.ExternalIP - if uploadNode.IsSameLocation { - nodeIP = uploadNode.LocalIP - - logger.Infof("client and node %d are at the same location, use local ip\n", uploadNode.ID) - } - - mutex, err := reqbuilder.NewBuilder(). - // 防止上传的副本被清除 - IPFS().CreateAnyRep(uploadNode.ID). - MutexLock(ctx.DistLock) - if err != nil { - return 0, "", fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - - fileHash, err = uploadToNode(uploadObject.File, nodeIP) - if err != nil { - return 0, "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err) - } - uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID) - } - - dirName := utils.GetDirectoryName(uploadObject.ObjectName) - - // 记录写入的文件的Hash - createResp, err := ctx.Coordinator.CreateRepObject(coormsg.NewCreateRepObject(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.repCount, t.userID, uploadedNodeIDs, fileHash, dirName)) - if err != nil { - return 0, "", fmt.Errorf("creating rep object: %w", err) - } - - return createResp.ObjectID, fileHash, nil -} - -// chooseUploadNode 选择一个上传文件的节点 -// 1. 从与当前客户端相同地域的节点中随机选一个 -// 2. 没有用的话从所有节点中随机选一个 -func (t *UploadRepObjects) chooseUploadNode(nodes []ramsg.RespNode) ramsg.RespNode { - sameLocationNodes := lo.Filter(nodes, func(e ramsg.RespNode, i int) bool { return e.IsSameLocation }) - if len(sameLocationNodes) > 0 { - return sameLocationNodes[rand.Intn(len(sameLocationNodes))] - } - - return nodes[rand.Intn(len(nodes))] -} - -func uploadToNode(file io.ReadCloser, nodeIP string) (string, error) { - // 建立grpc连接,发送请求 - grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort) - grpcCon, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return "", fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) - } - defer grpcCon.Close() - - client := agentcaller.NewFileTransportClient(grpcCon) - upload, err := mygrpc.SendFileAsStream(client) - if err != nil { - return "", fmt.Errorf("request to send file failed, err: %w", err) - } - - // 发送文件数据 - _, err = io.Copy(upload, file) - if err != nil { - // 发生错误则关闭连接 - upload.Abort(io.ErrClosedPipe) - return "", fmt.Errorf("copy file date to upload stream failed, err: %w", err) - } - - // 发送EOF消息,并获得FileHash - fileHash, err := upload.Finish() - if err != nil { - upload.Abort(io.ErrClosedPipe) - return "", fmt.Errorf("send EOF failed, err: %w", err) - } - - return fileHash, nil -} - -func uploadToLocalIPFS(ipfs *ipfs.IPFS, file io.ReadCloser, nodeID int64) (string, error) { - // 从本地IPFS上传文件 - writer, err := ipfs.CreateFile() - if err != nil { - return "", fmt.Errorf("create IPFS file failed, err: %w", err) - } - - _, err = io.Copy(writer, file) - if err != nil { - return "", fmt.Errorf("copy file data to IPFS failed, err: %w", err) - } - - fileHash, err := writer.Finish() - if err != nil { - return "", fmt.Errorf("finish writing IPFS failed, err: %w", err) - } - - // 然后让最近节点pin本地上传的文件 - agentClient, err := agtcli.NewClient(nodeID, &config.Cfg().RabbitMQ) - if err != nil { - return "", fmt.Errorf("create agent client to %d failed, err: %w", nodeID, err) - } - defer agentClient.Close() - - pinObjResp, err := agentClient.StartPinningObject(agtmsg.NewStartPinningObject(fileHash)) - if err != nil { - return "", fmt.Errorf("start pinning object: %w", err) - } - - for { - waitResp, err := agentClient.WaitPinningObject(agtmsg.NewWaitPinningObject(pinObjResp.TaskID, int64(time.Second)*5)) - if err != nil { - return "", fmt.Errorf("waitting pinning object: %w", err) - } - - if waitResp.IsComplete { - if waitResp.Error != "" { - return "", fmt.Errorf("agent pinning object: %s", waitResp.Error) - } - - break - } - } - - return fileHash, nil -} diff --git a/main.go b/main.go index 0a5e663..35fa37f 100644 --- a/main.go +++ b/main.go @@ -13,8 +13,8 @@ import ( "gitlink.org.cn/cloudream/storage-client/internal/config" "gitlink.org.cn/cloudream/storage-client/internal/services" "gitlink.org.cn/cloudream/storage-client/internal/task" - coorcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/coordinator" - sccli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/scanner" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" + scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner" ) func main() { @@ -30,13 +30,13 @@ func main() { os.Exit(1) } - coorClient, err := coorcli.NewClient(&config.Cfg().RabbitMQ) + coorClient, err := coormq.NewClient(&config.Cfg().RabbitMQ) if err != nil { log.Warnf("new coordinator client failed, err: %s", err.Error()) os.Exit(1) } - scanner, err := sccli.NewClient(&config.Cfg().RabbitMQ) + scanner, err := scmq.NewClient(&config.Cfg().RabbitMQ) if err != nil { log.Warnf("new scanner client failed, err: %s", err.Error()) os.Exit(1)