Browse Source

Merge pull request '统一单文件和多文件逻辑;优化程序结构' (#2) from feature_gxh into master

gitlink
baohan 2 years ago
parent
commit
26b1f6dc12
23 changed files with 905 additions and 2102 deletions
  1. +0
    -320
      internal/cmdline/object.go
  2. +281
    -0
      internal/cmdline/package.go
  3. +2
    -2
      internal/cmdline/scanner.go
  4. +14
    -39
      internal/cmdline/storage.go
  5. +2
    -2
      internal/config/config.go
  6. +0
    -200
      internal/http/object.go
  7. +223
    -0
      internal/http/package.go
  8. +4
    -4
      internal/http/server.go
  9. +11
    -11
      internal/http/storage.go
  10. +9
    -8
      internal/services/bucket.go
  11. +0
    -457
      internal/services/client_command_ec.go
  12. +0
    -322
      internal/services/object.go
  13. +221
    -0
      internal/services/package.go
  14. +5
    -5
      internal/services/service.go
  15. +20
    -34
      internal/services/storage.go
  16. +0
    -83
      internal/task/move_dir_to_storage.go
  17. +0
    -108
      internal/task/move_object_to_storage.go
  18. +101
    -0
      internal/task/storage_move_package.go
  19. +8
    -8
      internal/task/task.go
  20. +0
    -147
      internal/task/update_rep_object.go
  21. +0
    -54
      internal/task/update_rep_object_test.go
  22. +0
    -294
      internal/task/upload_rep_objects.go
  23. +4
    -4
      main.go

+ 0
- 320
internal/cmdline/object.go View File

@@ -1,320 +0,0 @@
package cmdline

import (
"fmt"
"io"
"os"
"path/filepath"
"time"

"github.com/jedib0t/go-pretty/v6/table"
"gitlink.org.cn/cloudream/storage-client/internal/task"
)

func ObjectListBucketObjects(ctx CommandContext, bucketID int64) error {
userID := int64(0)

objects, err := ctx.Cmdline.Svc.BucketSvc().GetBucketObjects(userID, bucketID)
if err != nil {
return err
}

fmt.Printf("Find %d objects in bucket %d for user %d:\n", len(objects), bucketID, userID)

tb := table.NewWriter()
tb.AppendHeader(table.Row{"ID", "Name", "Size", "BucketID", "State", "Redundancy"})

for _, obj := range objects {
tb.AppendRow(table.Row{obj.ObjectID, obj.Name, obj.FileSize, obj.BucketID, obj.State, obj.Redundancy})
}

fmt.Print(tb.Render())
return nil
}

func ObjectDownloadObject(ctx CommandContext, localFilePath string, objectID int64) error {
// 创建本地文件
outputFileDir := filepath.Dir(localFilePath)

err := os.MkdirAll(outputFileDir, os.ModePerm)
if err != nil {
return fmt.Errorf("create output file directory %s failed, err: %w", outputFileDir, err)
}

outputFile, err := os.Create(localFilePath)
if err != nil {
return fmt.Errorf("create output file %s failed, err: %w", localFilePath, err)
}
defer outputFile.Close()

// 下载文件
reader, err := ctx.Cmdline.Svc.ObjectSvc().DownloadObject(0, objectID)
if err != nil {
return fmt.Errorf("download object failed, err: %w", err)
}
defer reader.Close()

_, err = io.Copy(outputFile, reader)
if err != nil {
return fmt.Errorf("copy object data to local file failed, err: %w", err)
}

return nil
}

func ObjectDownloadObjectDir(ctx CommandContext, outputBaseDir string, dirName string) error {
// 创建本地文件夹
err := os.MkdirAll(outputBaseDir, os.ModePerm)
if err != nil {
return fmt.Errorf("create output base directory %s failed, err: %w", outputBaseDir, err)
}

// 下载文件夹
resObjs, err := ctx.Cmdline.Svc.ObjectSvc().DownloadObjectDir(0, dirName)
if err != nil {
return fmt.Errorf("download folder failed, err: %w", err)
}

// 遍历 关闭文件流
defer func() {
for _, resObj := range resObjs {
resObj.Reader.Close()
}
}()

for i := 0; i < len(resObjs); i++ {
if resObjs[i].Error != nil {
fmt.Printf("download file %s failed, err: %s", resObjs[i].ObjectName, err.Error())
continue
}
outputFilePath := filepath.Join(outputBaseDir, resObjs[i].ObjectName)
outputFileDir := filepath.Dir(outputFilePath)
err = os.MkdirAll(outputFileDir, os.ModePerm)
if err != nil {
fmt.Printf("create output file directory %s failed, err: %s", outputFileDir, err.Error())
continue
}

outputFile, err := os.Create(outputFilePath)
if err != nil {
fmt.Printf("create output file %s failed, err: %s", outputFilePath, err.Error())
continue
}
defer outputFile.Close()

_, err = io.Copy(outputFile, resObjs[i].Reader)
if err != nil {
// TODO 写入到文件失败,是否要考虑删除这个不完整的文件?
fmt.Printf("copy object data to local file failed, err: %s", err.Error())
continue
}
}
return nil
}

func ObjectUploadRepObject(ctx CommandContext, localFilePath string, bucketID int64, objectName string, repCount int) error {
file, err := os.Open(localFilePath)
if err != nil {
return fmt.Errorf("open file %s failed, err: %w", localFilePath, err)
}
defer file.Close()

fileInfo, err := file.Stat()
if err != nil {
return fmt.Errorf("get file %s state failed, err: %w", localFilePath, err)
}
fileSize := fileInfo.Size()

uploadObject := task.UploadObject{
ObjectName: objectName,
File: file,
FileSize: fileSize,
}
uploadObjects := []task.UploadObject{uploadObject}

taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUploadingRepObjects(0, bucketID, uploadObjects, repCount)
if err != nil {
return fmt.Errorf("upload file data failed, err: %w", err)
}

for {
complete, uploadObjectResult, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploadingRepObjects(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("uploading rep object: %w", err)
}

uploadRet := uploadObjectResult.Results[0]
if uploadRet.Error != nil {
return uploadRet.Error
}

fmt.Print(uploadRet.FileHash)
return nil
}

if err != nil {
return fmt.Errorf("wait uploading: %w", err)
}
}
}

func ObjectUploadEcObject(ctx CommandContext, localFilePath string, bucketID int64, objectName string, ecName string) error {
// TODO 参考rep的,改成异步流程
file, err := os.Open(localFilePath)
if err != nil {
return fmt.Errorf("open file %s failed, err: %w", localFilePath, err)
}

fileInfo, err := file.Stat()
if err != nil {
return fmt.Errorf("get file %s state failed, err: %w", localFilePath, err)
}
fileSize := fileInfo.Size()

err = ctx.Cmdline.Svc.ObjectSvc().UploadEcObject(0, bucketID, objectName, file, fileSize, ecName)
if err != nil {
return fmt.Errorf("upload file data failed, err: %w", err)
}

return nil
}

func ObjectUploadRepObjectDir(ctx CommandContext, localDirPath string, bucketID int64, repCount int) error {
var uploadFiles []task.UploadObject
var uploadFile task.UploadObject
err := filepath.Walk(localDirPath, func(fname string, fi os.FileInfo, err error) error {
if !fi.IsDir() {
file, err := os.Open(fname)
if err != nil {
return fmt.Errorf("open file %s failed, err: %w", fname, err)
}
uploadFile = task.UploadObject{
ObjectName: filepath.ToSlash(fname),
File: file,
FileSize: fi.Size(),
}
uploadFiles = append(uploadFiles, uploadFile)
}
return nil
})
if err != nil {
return fmt.Errorf("open directory %s failed, err: %w", localDirPath, err)
}

// 遍历 关闭文件流
defer func() {
for _, uploadFile := range uploadFiles {
uploadFile.File.Close()
}
}()

taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUploadingRepObjects(0, bucketID, uploadFiles, repCount)

if err != nil {
return fmt.Errorf("upload file data failed, err: %w", err)
}

for {
complete, uploadObjectResult, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploadingRepObjects(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("uploading rep object: %w", err)
}

tb := table.NewWriter()
if uploadObjectResult.IsUploading {

tb.AppendHeader(table.Row{"ObjectName", "ObjectID", "FileHash"})
for i := 0; i < len(uploadObjectResult.Objects); i++ {
tb.AppendRow(table.Row{
uploadObjectResult.Objects[i].ObjectName,
uploadObjectResult.Results[i].ObjectID,
uploadObjectResult.Results[i].FileHash,
})
}
fmt.Print(tb.Render())

} else {
fmt.Println("The folder upload failed. Some files do not meet the upload requirements.")

tb.AppendHeader(table.Row{"ObjectName", "Error"})
for i := 0; i < len(uploadObjectResult.Objects); i++ {
if uploadObjectResult.Results[i].Error != nil {
tb.AppendRow(table.Row{uploadObjectResult.Objects[i].ObjectName, uploadObjectResult.Results[i].Error})
}
}
fmt.Print(tb.Render())
}
return nil
}

if err != nil {
return fmt.Errorf("wait uploading: %w", err)
}
}
}

func ObjectUpdateRepObject(ctx CommandContext, objectID int64, filePath string) error {
userID := int64(0)

file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("open file %s failed, err: %w", filePath, err)
}
defer file.Close()

fileInfo, err := file.Stat()
if err != nil {
return fmt.Errorf("get file %s state failed, err: %w", filePath, err)
}
fileSize := fileInfo.Size()

taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUpdatingRepObject(userID, objectID, file, fileSize)
if err != nil {
return fmt.Errorf("update object %d failed, err: %w", objectID, err)
}

for {
complete, err := ctx.Cmdline.Svc.ObjectSvc().WaitUpdatingRepObject(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("updating rep object: %w", err)
}

return nil
}

if err != nil {
return fmt.Errorf("wait updating: %w", err)
}
}
}

func ObjectDeleteObject(ctx CommandContext, objectID int64) error {
userID := int64(0)
err := ctx.Cmdline.Svc.ObjectSvc().DeleteObject(userID, objectID)
if err != nil {
return fmt.Errorf("delete object %d failed, err: %w", objectID, err)
}
return nil
}

func init() {
commands.MustAdd(ObjectListBucketObjects, "object", "ls")

commands.MustAdd(ObjectUploadEcObject, "object", "new", "ec")

commands.MustAdd(ObjectUploadRepObject, "object", "new", "rep")

commands.MustAdd(ObjectUploadRepObjectDir, "object", "new", "dir")

commands.MustAdd(ObjectDownloadObject, "object", "get")

commands.MustAdd(ObjectDownloadObjectDir, "object", "get", "dir")

commands.MustAdd(ObjectUpdateRepObject, "object", "update", "rep")

commands.MustAdd(ObjectDeleteObject, "object", "delete")

}

+ 281
- 0
internal/cmdline/package.go View File

@@ -0,0 +1,281 @@
package cmdline

import (
"fmt"
"io"
"os"
"path/filepath"
"time"

"github.com/jedib0t/go-pretty/v6/table"
"gitlink.org.cn/cloudream/common/models"
)

func PackageListBucketPackages(ctx CommandContext, bucketID int64) error {
userID := int64(0)

packages, err := ctx.Cmdline.Svc.BucketSvc().GetBucketPackages(userID, bucketID)
if err != nil {
return err
}

fmt.Printf("Find %d packages in bucket %d for user %d:\n", len(packages), bucketID, userID)

tb := table.NewWriter()
tb.AppendHeader(table.Row{"ID", "Name", "BucketID", "State", "Redundancy"})

for _, obj := range packages {
tb.AppendRow(table.Row{obj.PackageID, obj.Name, obj.BucketID, obj.State, obj.Redundancy})
}

fmt.Print(tb.Render())
return nil
}

func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int64) error {
err := os.MkdirAll(outputDir, os.ModePerm)
if err != nil {
return fmt.Errorf("create output directory %s failed, err: %w", outputDir, err)
}

// 下载文件
objIter, err := ctx.Cmdline.Svc.PackageSvc().DownloadPackage(0, packageID)
if err != nil {
return fmt.Errorf("download object failed, err: %w", err)
}
defer objIter.Close()

for {
objInfo, ok := objIter.MoveNext()
if !ok {
break
}

if objInfo.Error != nil {
return objInfo.Error
}
defer objInfo.File.Close()

outputFile, err := os.Create(filepath.Join(outputDir, objInfo.Object.Path))
if err != nil {
return fmt.Errorf("creating object file: %w", err)
}
defer outputFile.Close()

_, err = io.Copy(outputFile, objInfo.File)
if err != nil {
return fmt.Errorf("copy object data to local file failed, err: %w", err)
}
}

return nil
}

func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64, name string, repCount int) error {
var uploadFilePathes []string
err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error {
if err != nil {
return nil
}

if !fi.IsDir() {
uploadFilePathes = append(uploadFilePathes, fname)
}

return nil
})
if err != nil {
return fmt.Errorf("open directory %s failed, err: %w", rootPath, err)
}

objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, models.NewRepRedundancyInfo(repCount))

if err != nil {
return fmt.Errorf("upload file data failed, err: %w", err)
}

for {
complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingRepPackage(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("uploading rep object: %w", err)
}

tb := table.NewWriter()

tb.AppendHeader(table.Row{"Path", "ObjectID", "FileHash"})
for i := 0; i < len(uploadObjectResult.ObjectResults); i++ {
tb.AppendRow(table.Row{
uploadObjectResult.ObjectResults[i].Info.Path,
uploadObjectResult.ObjectResults[i].ObjectID,
uploadObjectResult.ObjectResults[i].FileHash,
})
}
fmt.Print(tb.Render())
return nil
}

if err != nil {
return fmt.Errorf("wait uploading: %w", err)
}
}
}

func PackageUpdateRepPackage(ctx CommandContext, packageID int64, rootPath string) error {
//userID := int64(0)

var uploadFilePathes []string
err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error {
if err != nil {
return nil
}

if !fi.IsDir() {
uploadFilePathes = append(uploadFilePathes, fname)
}

return nil
})
if err != nil {
return fmt.Errorf("open directory %s failed, err: %w", rootPath, err)
}

objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingRepPackage(0, packageID, objIter)
if err != nil {
return fmt.Errorf("update object %d failed, err: %w", packageID, err)
}

for {
complete, _, err := ctx.Cmdline.Svc.PackageSvc().WaitUpdatingRepPackage(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("updating rep object: %w", err)
}

return nil
}

if err != nil {
return fmt.Errorf("wait updating: %w", err)
}
}
}

func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string) error {
var uploadFilePathes []string
err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error {
if err != nil {
return nil
}

if !fi.IsDir() {
uploadFilePathes = append(uploadFilePathes, fname)
}

return nil
})
if err != nil {
return fmt.Errorf("open directory %s failed, err: %w", rootPath, err)
}

objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, models.NewECRedundancyInfo(ecName))

if err != nil {
return fmt.Errorf("upload file data failed, err: %w", err)
}

for {
complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingRepPackage(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("uploading ec package: %w", err)
}

tb := table.NewWriter()

tb.AppendHeader(table.Row{"Path", "ObjectID", "FileHash"})
for i := 0; i < len(uploadObjectResult.ObjectResults); i++ {
tb.AppendRow(table.Row{
uploadObjectResult.ObjectResults[i].Info.Path,
uploadObjectResult.ObjectResults[i].ObjectID,
uploadObjectResult.ObjectResults[i].FileHash,
})
}
fmt.Print(tb.Render())
return nil
}

if err != nil {
return fmt.Errorf("wait uploading: %w", err)
}
}
}

func PackageUpdateECPackage(ctx CommandContext, packageID int64, rootPath string) error {
//userID := int64(0)

var uploadFilePathes []string
err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error {
if err != nil {
return nil
}

if !fi.IsDir() {
uploadFilePathes = append(uploadFilePathes, fname)
}

return nil
})
if err != nil {
return fmt.Errorf("open directory %s failed, err: %w", rootPath, err)
}

objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingECPackage(0, packageID, objIter)
if err != nil {
return fmt.Errorf("update package %d failed, err: %w", packageID, err)
}

for {
complete, _, err := ctx.Cmdline.Svc.PackageSvc().WaitUpdatingECPackage(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("updating ec package: %w", err)
}

return nil
}

if err != nil {
return fmt.Errorf("wait updating: %w", err)
}
}
}

func PackageDeletePackage(ctx CommandContext, packageID int64) error {
userID := int64(0)
err := ctx.Cmdline.Svc.PackageSvc().DeletePackage(userID, packageID)
if err != nil {
return fmt.Errorf("delete package %d failed, err: %w", packageID, err)
}
return nil
}

func init() {
commands.MustAdd(PackageListBucketPackages, "pkg", "ls")

commands.MustAdd(PackageDownloadPackage, "pkg", "get")

commands.MustAdd(PackageUploadRepPackage, "pkg", "new", "rep")

commands.MustAdd(PackageUpdateRepPackage, "pkg", "update", "rep")

commands.MustAdd(PackageUploadRepPackage, "pkg", "new", "ec")

commands.MustAdd(PackageUpdateRepPackage, "pkg", "update", "ec")

commands.MustAdd(PackageDeletePackage, "pkg", "delete")
}

+ 2
- 2
internal/cmdline/scanner.go View File

@@ -5,7 +5,7 @@ import (


"gitlink.org.cn/cloudream/common/pkgs/cmdtrie" "gitlink.org.cn/cloudream/common/pkgs/cmdtrie"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect" myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/scanner/event"
scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event"
) )


var parseScannerEventCmdTrie cmdtrie.StaticCommandTrie[any] = cmdtrie.NewStaticCommandTrie[any]() var parseScannerEventCmdTrie cmdtrie.StaticCommandTrie[any] = cmdtrie.NewStaticCommandTrie[any]()
@@ -33,7 +33,7 @@ func init() {


parseScannerEventCmdTrie.MustAdd(scevt.NewCheckCache, myreflect.TypeNameOf[scevt.CheckCache]()) parseScannerEventCmdTrie.MustAdd(scevt.NewCheckCache, myreflect.TypeNameOf[scevt.CheckCache]())


parseScannerEventCmdTrie.MustAdd(scevt.NewCheckObject, myreflect.TypeNameOf[scevt.CheckObject]())
parseScannerEventCmdTrie.MustAdd(scevt.NewCheckPackage, myreflect.TypeNameOf[scevt.CheckPackage]())


parseScannerEventCmdTrie.MustAdd(scevt.NewCheckRepCount, myreflect.TypeNameOf[scevt.CheckRepCount]()) parseScannerEventCmdTrie.MustAdd(scevt.NewCheckRepCount, myreflect.TypeNameOf[scevt.CheckRepCount]())




+ 14
- 39
internal/cmdline/storage.go View File

@@ -3,16 +3,18 @@ package cmdline
import ( import (
"fmt" "fmt"
"time" "time"

"gitlink.org.cn/cloudream/common/models"
) )


func StorageMoveObject(ctx CommandContext, objectID int64, storageID int64) error {
taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageMoveObject(0, objectID, storageID)
func StorageMovePackage(ctx CommandContext, packageID int64, storageID int64) error {
taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageMovePackage(0, packageID, storageID)
if err != nil { if err != nil {
return fmt.Errorf("start moving object to storage: %w", err)
return fmt.Errorf("start moving package to storage: %w", err)
} }


for { for {
complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageMoveObject(taskID, time.Second*10)
complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageMovePackage(taskID, time.Second*10)
if complete { if complete {
if err != nil { if err != nil {
return fmt.Errorf("moving complete with: %w", err) return fmt.Errorf("moving complete with: %w", err)
@@ -27,46 +29,21 @@ func StorageMoveObject(ctx CommandContext, objectID int64, storageID int64) erro
} }
} }


func StorageMoveDir(ctx CommandContext, dirName string, storageID int64) error {
taskID, err := ctx.Cmdline.Svc.StorageSvc().StartMovingDir(0, dirName, storageID)
func StorageCreateRepPackage(ctx CommandContext, bucketID int64, name string, storageID int64, path string, repCount int) error {
nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(0, bucketID, name, storageID, path,
models.NewTypedRepRedundancyInfo(repCount))
if err != nil { if err != nil {
return fmt.Errorf("start moving object to storage: %w", err)
return fmt.Errorf("start storage uploading rep package: %w", err)
} }


for { for {
complete, results, err := ctx.Cmdline.Svc.StorageSvc().WaitMovingDir(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("moving complete with: %w", err)
}
// 返回各object的move结果
for _, result := range results {
if result.Error != nil {
fmt.Printf("moving %s to storage failed: %s\n", result.ObjectName, result.Error)
}
}
return nil
}
if err != nil {
return fmt.Errorf("wait moving: %w", err)
}
}
}

func StorageUploadRepObject(ctx CommandContext, storageID int64, filePath string, bucketID int64, objectName string, repCount int) error {
nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageUploadRepObject(0, storageID, filePath, bucketID, objectName, repCount)
if err != nil {
return fmt.Errorf("start storage uploading rep object: %w", err)
}

for {
complete, objectID, fileHash, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageUploadRepObject(nodeID, taskID, time.Second*10)
complete, packageID, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageCreatePackage(nodeID, taskID, time.Second*10)
if complete { if complete {
if err != nil { if err != nil {
return fmt.Errorf("uploading complete with: %w", err) return fmt.Errorf("uploading complete with: %w", err)
} }


fmt.Printf("%d\n%s\n", objectID, fileHash)
fmt.Printf("%d\n", packageID)
return nil return nil
} }


@@ -77,9 +54,7 @@ func StorageUploadRepObject(ctx CommandContext, storageID int64, filePath string
} }


func init() { func init() {
commands.MustAdd(StorageMoveObject, "storage", "move", "obj")

commands.MustAdd(StorageMoveDir, "storage", "move", "dir")
commands.MustAdd(StorageMovePackage, "storage", "move", "pkg")


commands.MustAdd(StorageUploadRepObject, "storage", "upload", "rep")
commands.MustAdd(StorageCreateRepPackage, "storage", "upload", "rep")
} }

+ 2
- 2
internal/config/config.go View File

@@ -5,7 +5,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/config" "gitlink.org.cn/cloudream/common/utils/config"
"gitlink.org.cn/cloudream/common/utils/ipfs" "gitlink.org.cn/cloudream/common/utils/ipfs"
racfg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/config"
stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
) )


type Config struct { type Config struct {
@@ -15,7 +15,7 @@ type Config struct {
LocalIP string `json:"localIP"` LocalIP string `json:"localIP"`
ExternalIP string `json:"externalIP"` ExternalIP string `json:"externalIP"`
Logger logger.Config `json:"logger"` Logger logger.Config `json:"logger"`
RabbitMQ racfg.Config `json:"rabbitMQ"`
RabbitMQ stgmq.Config `json:"rabbitMQ"`
IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon
DistLock distlock.Config `json:"distlock"` DistLock distlock.Config `json:"distlock"`
} }


+ 0
- 200
internal/http/object.go View File

@@ -1,200 +0,0 @@
package http

import (
"io"
"mime/multipart"
"net/http"
"time"

"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/storage-client/internal/task"
)

type ObjectService struct {
*Server
}

func (s *Server) ObjectSvc() *ObjectService {
return &ObjectService{
Server: s,
}
}

type ObjectDownloadReq struct {
UserID *int64 `form:"userID" binding:"required"`
ObjectID *int64 `form:"objectID" binding:"required"`
}

func (s *ObjectService) Download(ctx *gin.Context) {
log := logger.WithField("HTTP", "Object.Download")

var req ObjectDownloadReq
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

file, err := s.svc.ObjectSvc().DownloadObject(*req.UserID, *req.ObjectID)
if err != nil {
log.Warnf("downloading object: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download object failed"))
return
}

ctx.Writer.WriteHeader(http.StatusOK)
// TODO 需要设置FileName
ctx.Header("Content-Disposition", "attachment; filename=filename")
ctx.Header("Content-Type", "application/octet-stream")

buf := make([]byte, 4096)
ctx.Stream(func(w io.Writer) bool {
rd, err := file.Read(buf)
if err == io.EOF {
return false
}

if err != nil {
log.Warnf("reading file data: %s", err.Error())
return false
}

err = myio.WriteAll(w, buf[:rd])
if err != nil {
log.Warnf("writing data to response: %s", err.Error())
return false
}

return true
})
}

type ObjectUploadReq struct {
Info ObjectUploadInfo `form:"info" binding:"required"`
File *multipart.FileHeader `form:"file"`
}

type ObjectUploadInfo struct {
UserID *int64 `json:"userID" binding:"required"`
BucketID *int64 `json:"bucketID" binding:"required"`
FileSize *int64 `json:"fileSize" binding:"required"`
ObjectName string `json:"objectName" binding:"required"`
Redundancy struct {
Type string `json:"type" binding:"required"`
Config any `json:"config" binding:"required"`
} `json:"redundancy" binding:"required"`
}

type ObjectUploadResp struct {
ObjectID int64 `json:"objectID,string"`
}

func (s *ObjectService) Upload(ctx *gin.Context) {
log := logger.WithField("HTTP", "Object.Upload")

var req ObjectUploadReq
if err := ctx.ShouldBind(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

switch req.Info.Redundancy.Type {
case models.RedundancyRep:
s.uploadRep(ctx, &req)
return
case models.RedundancyEC:

}

ctx.JSON(http.StatusForbidden, Failed(errorcode.OperationFailed, "not supported yet"))
}

func (s *ObjectService) uploadRep(ctx *gin.Context, req *ObjectUploadReq) {
log := logger.WithField("HTTP", "Object.Upload")

var repInfo models.RepRedundancyInfo
if err := serder.AnyToAny(req.Info.Redundancy.Config, &repInfo); err != nil {
log.Warnf("parsing rep redundancy config: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config"))
return
}

file, err := req.File.Open()
if err != nil {
log.Warnf("opening file: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "open file failed"))
return
}

taskID, err := s.svc.ObjectSvc().StartUploadingRepObjects(*req.Info.UserID, *req.Info.BucketID, []task.UploadObject{{
ObjectName: req.Info.ObjectName,
File: file,
FileSize: *req.Info.FileSize,
}}, repInfo.RepCount)

if err != nil {
log.Warnf("start uploading rep object task: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed"))
return
}

for {
complete, uploadObjectResult, err := s.svc.ObjectSvc().WaitUploadingRepObjects(taskID, time.Second*5)
if complete {
if err != nil {
log.Warnf("uploading rep object: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading rep object failed"))
return
}

uploadRet := uploadObjectResult.Results[0]
if uploadRet.Error != nil {
log.Warnf("uploading rep object: %s", uploadRet.Error)
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, uploadRet.Error.Error()))
return
}

ctx.JSON(http.StatusOK, OK(ObjectUploadResp{
ObjectID: uploadRet.ObjectID,
}))
return
}

if err != nil {
log.Warnf("waiting task: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed"))
return
}
}
}

type ObjectDeleteReq struct {
UserID *int64 `json:"userID" binding:"required"`
ObjectID *int64 `json:"objectID" binding:"required"`
}

func (s *ObjectService) Delete(ctx *gin.Context) {
log := logger.WithField("HTTP", "Object.Delete")

var req ObjectDeleteReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

err := s.svc.ObjectSvc().DeleteObject(*req.UserID, *req.ObjectID)
if err != nil {
log.Warnf("deleting object: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete object failed"))
return
}

ctx.JSON(http.StatusOK, OK(nil))
}

+ 223
- 0
internal/http/package.go View File

@@ -0,0 +1,223 @@
package http

import (
"io"
"mime/multipart"
"net/http"
"time"

"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

type PackageService struct {
*Server
}

func (s *Server) PackageSvc() *PackageService {
return &PackageService{
Server: s,
}
}

type PackageDownloadReq struct {
UserID *int64 `form:"userID" binding:"required"`
PackageID *int64 `form:"packageID" binding:"required"`
}

func (s *PackageService) Download(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.Download")

var req PackageDownloadReq
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

file, err := s.svc.PackageSvc().DownloadPackage(*req.UserID, *req.PackageID)
if err != nil {
log.Warnf("downloading package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download package failed"))
return
}

ctx.Writer.WriteHeader(http.StatusOK)
// TODO 需要设置FileName
ctx.Header("Content-Disposition", "attachment; filename=filename")
ctx.Header("Content-Type", "application/octet-stream")

buf := make([]byte, 4096)
ctx.Stream(func(w io.Writer) bool {
rd, err := file.Read(buf)
if err == io.EOF {
return false
}

if err != nil {
log.Warnf("reading file data: %s", err.Error())
return false
}

err = myio.WriteAll(w, buf[:rd])
if err != nil {
log.Warnf("writing data to response: %s", err.Error())
return false
}

return true
})
}

type PackageUploadReq struct {
Info PackageUploadInfo `form:"info" binding:"required"`
Files []*multipart.FileHeader `form:"files"`
}

type PackageUploadInfo struct {
UserID *int64 `json:"userID" binding:"required"`
BucketID *int64 `json:"bucketID" binding:"required"`
Name string `json:"name" binding:"required"`
Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"`
}

type PackageUploadResp struct {
PackageID int64 `json:"packageID,string"`
}

func (s *PackageService) Upload(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.Upload")

var req PackageUploadReq
if err := ctx.ShouldBind(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

switch req.Info.Redundancy.Type {
case models.RedundancyRep:
s.uploadRep(ctx, &req)
return
case models.RedundancyEC:
s.uploadEC(ctx, &req)
return
}

ctx.JSON(http.StatusForbidden, Failed(errorcode.OperationFailed, "not supported yet"))
}

func (s *PackageService) uploadRep(ctx *gin.Context, req *PackageUploadReq) {
log := logger.WithField("HTTP", "Package.Upload")

var repInfo models.RepRedundancyInfo
if err := serder.AnyToAny(req.Info.Redundancy.Info, &repInfo); err != nil {
log.Warnf("parsing rep redundancy config: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config"))
return
}

objIter := iterator.NewHTTPObjectIterator(req.Files)

taskID, err := s.svc.PackageSvc().StartCreatingRepPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, repInfo)

if err != nil {
log.Warnf("start uploading rep package task: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed"))
return
}

for {
complete, createResult, err := s.svc.PackageSvc().WaitCreatingRepPackage(taskID, time.Second*5)
if complete {
if err != nil {
log.Warnf("uploading rep package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading rep package failed"))
return
}

ctx.JSON(http.StatusOK, OK(PackageUploadResp{
PackageID: createResult.PackageID,
}))
return
}

if err != nil {
log.Warnf("waiting task: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed"))
return
}
}
}

func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) {
log := logger.WithField("HTTP", "Package.Upload")

var ecInfo models.ECRedundancyInfo
if err := serder.AnyToAny(req.Info.Redundancy.Info, &ecInfo); err != nil {
log.Warnf("parsing ec redundancy config: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config"))
return
}

objIter := iterator.NewHTTPObjectIterator(req.Files)

taskID, err := s.svc.PackageSvc().StartCreatingECPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, ecInfo)

if err != nil {
log.Warnf("start uploading ec package task: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed"))
return
}

for {
complete, createResult, err := s.svc.PackageSvc().WaitCreatingECPackage(taskID, time.Second*5)
if complete {
if err != nil {
log.Warnf("uploading ec package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading ec package failed"))
return
}

ctx.JSON(http.StatusOK, OK(PackageUploadResp{
PackageID: createResult.PackageID,
}))
return
}

if err != nil {
log.Warnf("waiting task: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed"))
return
}
}
}

type PackageDeleteReq struct {
UserID *int64 `json:"userID" binding:"required"`
PackageID *int64 `json:"packageID" binding:"required"`
}

func (s *PackageService) Delete(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.Delete")

var req PackageDeleteReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

err := s.svc.PackageSvc().DeletePackage(*req.UserID, *req.PackageID)
if err != nil {
log.Warnf("deleting package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete package failed"))
return
}

ctx.JSON(http.StatusOK, OK(nil))
}

+ 4
- 4
internal/http/server.go View File

@@ -38,9 +38,9 @@ func (s *Server) Serve() error {
} }


func (s *Server) initRouters() { func (s *Server) initRouters() {
s.engine.GET("/object/download", s.ObjectSvc().Download)
s.engine.POST("/object/upload", s.ObjectSvc().Upload)
s.engine.POST("/object/delete", s.ObjectSvc().Delete)
s.engine.GET("/package/download", s.PackageSvc().Download)
s.engine.POST("/package/upload", s.PackageSvc().Upload)
s.engine.POST("/package/delete", s.PackageSvc().Delete)


s.engine.POST("/storage/moveObject", s.StorageSvc().MoveObject)
s.engine.POST("/storage/movePackage", s.StorageSvc().MovePackage)
} }

+ 11
- 11
internal/http/storage.go View File

@@ -19,35 +19,35 @@ func (s *Server) StorageSvc() *StorageService {
} }
} }


type StorageMoveObjectReq struct {
type StorageMovePackageReq struct {
UserID *int64 `json:"userID" binding:"required"` UserID *int64 `json:"userID" binding:"required"`
ObjectID *int64 `json:"objectID" binding:"required"`
PackageID *int64 `json:"packageID" binding:"required"`
StorageID *int64 `json:"storageID" binding:"required"` StorageID *int64 `json:"storageID" binding:"required"`
} }


func (s *StorageService) MoveObject(ctx *gin.Context) {
log := logger.WithField("HTTP", "Storage.MoveObject")
func (s *StorageService) MovePackage(ctx *gin.Context) {
log := logger.WithField("HTTP", "Storage.MovePackage")


var req StorageMoveObjectReq
var req StorageMovePackageReq
if err := ctx.ShouldBindJSON(&req); err != nil { if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error()) log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return return
} }


taskID, err := s.svc.StorageSvc().StartStorageMoveObject(*req.UserID, *req.ObjectID, *req.StorageID)
taskID, err := s.svc.StorageSvc().StartStorageMovePackage(*req.UserID, *req.PackageID, *req.StorageID)
if err != nil { if err != nil {
log.Warnf("start storage move object: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move object failed"))
log.Warnf("start storage move package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move package failed"))
return return
} }


for { for {
complete, err := s.svc.StorageSvc().WaitStorageMoveObject(taskID, time.Second*10)
complete, err := s.svc.StorageSvc().WaitStorageMovePackage(taskID, time.Second*10)
if complete { if complete {
if err != nil { if err != nil {
log.Warnf("moving complete with: %s", err.Error()) log.Warnf("moving complete with: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move object failed"))
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move package failed"))
return return
} }


@@ -57,7 +57,7 @@ func (s *StorageService) MoveObject(ctx *gin.Context) {


if err != nil { if err != nil {
log.Warnf("wait moving: %s", err.Error()) log.Warnf("wait moving: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move object failed"))
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move package failed"))
return return
} }
} }


+ 9
- 8
internal/services/bucket.go View File

@@ -5,7 +5,7 @@ import (


"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
) )


type BucketService struct { type BucketService struct {
@@ -22,7 +22,7 @@ func (svc *BucketService) GetBucket(userID int64, bucketID int64) (model.Bucket,
} }


func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) { func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) {
resp, err := svc.coordinator.GetUserBuckets(coormsg.NewGetUserBuckets(userID))
resp, err := svc.coordinator.GetUserBuckets(coormq.NewGetUserBuckets(userID))
if err != nil { if err != nil {
return nil, fmt.Errorf("get user buckets failed, err: %w", err) return nil, fmt.Errorf("get user buckets failed, err: %w", err)
} }
@@ -30,13 +30,13 @@ func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) {
return resp.Buckets, nil return resp.Buckets, nil
} }


func (svc *BucketService) GetBucketObjects(userID int64, bucketID int64) ([]model.Object, error) {
resp, err := svc.coordinator.GetBucketObjects(coormsg.NewGetBucketObjects(userID, bucketID))
func (svc *BucketService) GetBucketPackages(userID int64, bucketID int64) ([]model.Package, error) {
resp, err := svc.coordinator.GetBucketPackages(coormq.NewGetBucketPackages(userID, bucketID))
if err != nil { if err != nil {
return nil, fmt.Errorf("get bucket objects failed, err: %w", err)
return nil, fmt.Errorf("get bucket packages failed, err: %w", err)
} }


return resp.Objects, nil
return resp.Packages, nil
} }


func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, error) { func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, error) {
@@ -53,7 +53,7 @@ func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64,
} }
defer mutex.Unlock() defer mutex.Unlock()


resp, err := svc.coordinator.CreateBucket(coormsg.NewCreateBucket(userID, bucketName))
resp, err := svc.coordinator.CreateBucket(coormq.NewCreateBucket(userID, bucketName))
if err != nil { if err != nil {
return 0, fmt.Errorf("creating bucket: %w", err) return 0, fmt.Errorf("creating bucket: %w", err)
} }
@@ -68,6 +68,7 @@ func (svc *BucketService) DeleteBucket(userID int64, bucketID int64) error {
Metadata(). Metadata().
UserBucket().WriteAny(). UserBucket().WriteAny().
Bucket().WriteOne(bucketID). Bucket().WriteOne(bucketID).
// TODO2
Object().WriteAny(). Object().WriteAny().
ObjectRep().WriteAny(). ObjectRep().WriteAny().
ObjectBlock().WriteAny(). ObjectBlock().WriteAny().
@@ -78,7 +79,7 @@ func (svc *BucketService) DeleteBucket(userID int64, bucketID int64) error {
} }
defer mutex.Unlock() defer mutex.Unlock()


_, err = svc.coordinator.DeleteBucket(coormsg.NewDeleteBucket(userID, bucketID))
_, err = svc.coordinator.DeleteBucket(coormq.NewDeleteBucket(userID, bucketID))
if err != nil { if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err) return fmt.Errorf("request to coordinator failed, err: %w", err)
} }


+ 0
- 457
internal/services/client_command_ec.go View File

@@ -1,457 +0,0 @@
package services

// TODO 将这里的逻辑拆分到services中实现

import (
"bytes"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
"sync"
"time"

"gitlink.org.cn/cloudream/storage-client/internal/config"
"gitlink.org.cn/cloudream/storage-common/pkgs/ec"
"gitlink.org.cn/cloudream/storage-common/utils"

//"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
agtcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/agent"
ramsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message"
agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
agentcaller "gitlink.org.cn/cloudream/storage-common/pkgs/proto"
mygrpc "gitlink.org.cn/cloudream/storage-common/utils/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

func (svc *ObjectService) UploadEcObject(userID int64, bucketID int64, objectName string, file io.ReadCloser, fileSize int64, ecName string) error {
// TODO 需要加锁

/*reqBlder := reqbuilder.NewBuilder()
for _, uploadObject := range t.Objects {
reqBlder.Metadata().
// 用于防止创建了多个同名对象
Object().CreateOne(t.bucketID, uploadObject.ObjectName)
}*/
/*
mutex, err := reqBlder.
Metadata().
// 用于判断用户是否有桶的权限
UserBucket().ReadOne(userID, bucketID).
// 用于查询可用的上传节点
Node().ReadAny().
// 用于设置Rep配置
ObjectRep().CreateAny().
// 用于创建Cache记录
Cache().CreateAny().
MutexLock(ctx.DistLock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()
*/
//发送写请求,请求Coor分配写入节点Ip
ecWriteResp, err := svc.coordinator.PreUploadEcObject(coormsg.NewPreUploadEcObject(bucketID, objectName, fileSize, ecName, userID, config.Cfg().ExternalIP))
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}

if len(ecWriteResp.Nodes) == 0 {
return fmt.Errorf("no node to upload file")
}
//生成纠删码的写入节点序列
nodes := make([]ramsg.RespNode, ecWriteResp.Ec.EcN)
numNodes := len(ecWriteResp.Nodes)
startWriteNodeID := rand.Intn(numNodes)
for i := 0; i < ecWriteResp.Ec.EcN; i++ {
nodes[i] = ecWriteResp.Nodes[(startWriteNodeID+i)%numNodes]
}
hashs, err := svc.ecWrite(file, fileSize, ecWriteResp.Ec.EcK, ecWriteResp.Ec.EcN, nodes)
if err != nil {
return fmt.Errorf("EcWrite failed, err: %w", err)
}
nodeIDs := make([]int64, len(nodes))
for i := 0; i < len(nodes); i++ {
nodeIDs[i] = nodes[i].ID
}
//第二轮通讯:插入元数据hashs
dirName := utils.GetDirectoryName(objectName)
_, err = svc.coordinator.CreateEcObject(coormsg.NewCreateEcObject(bucketID, objectName, fileSize, userID, nodeIDs, hashs, ecName, dirName))
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
return nil
}

func (svc *ObjectService) ecWrite(file io.ReadCloser, fileSize int64, ecK int, ecN int, nodes []ramsg.RespNode) ([]string, error) {

// TODO 需要参考RepWrite函数的代码逻辑,做好错误处理
//获取文件大小

var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN
//计算每个块的packet数
numPacket := (fileSize + int64(ecK)*config.Cfg().ECPacketSize - 1) / (int64(ecK) * config.Cfg().ECPacketSize)
//fmt.Println(numPacket)
//创建channel
loadBufs := make([]chan []byte, ecN)
encodeBufs := make([]chan []byte, ecN)
for i := 0; i < ecN; i++ {
loadBufs[i] = make(chan []byte)
}
for i := 0; i < ecN; i++ {
encodeBufs[i] = make(chan []byte)
}
hashs := make([]string, ecN)
//正式开始写入
go load(file, loadBufs[:ecN], ecK, numPacket*int64(ecK)) //从本地文件系统加载数据
go encode(loadBufs[:ecN], encodeBufs[:ecN], ecK, coefs, numPacket)

var wg sync.WaitGroup
wg.Add(ecN)
/*mutex, err := reqbuilder.NewBuilder().
// 防止上传的副本被清除
IPFS().CreateAnyRep(node.ID).
MutexLock(svc.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()
*/
for i := 0; i < ecN; i++ {
go svc.send(nodes[i], encodeBufs[i], numPacket, &wg, hashs, i)
}
wg.Wait()

return hashs, nil

}

func (svc *ObjectService) downloadEcObject(fileSize int64, ecK int, ecN int, blockIDs []int, nodeIDs []int64, nodeIPs []string, hashs []string) (io.ReadCloser, error) {
// TODO zkx 先试用同步方式实现逻辑,做好错误处理。同时也方便下面直接使用uploadToNode和uploadToLocalIPFS来优化代码结构
//wg := sync.WaitGroup{}
numPacket := (fileSize + int64(ecK)*config.Cfg().ECPacketSize - 1) / (int64(ecK) * config.Cfg().ECPacketSize)
getBufs := make([]chan []byte, ecN)
decodeBufs := make([]chan []byte, ecK)
for i := 0; i < ecN; i++ {
getBufs[i] = make(chan []byte)
}
for i := 0; i < ecK; i++ {
decodeBufs[i] = make(chan []byte)
}
for i := 0; i < len(blockIDs); i++ {
go svc.get(hashs[i], nodeIPs[i], getBufs[blockIDs[i]], numPacket)
}
print(numPacket)
go decode(getBufs[:], decodeBufs[:], blockIDs, ecK, numPacket)
r, w := io.Pipe()
//persist函数,将解码得到的文件写入pipe
go func() {
for i := 0; int64(i) < numPacket; i++ {
for j := 0; j < len(decodeBufs); j++ {
tmp := <-decodeBufs[j]
_, err := w.Write(tmp)
if err != nil {
fmt.Errorf("persist file falied, err:%w", err)
}
}
}
w.Close()
}()
return r, nil
}

func (svc *ObjectService) get(blockHash string, nodeIP string, getBuf chan []byte, numPacket int64) error {
downloadFromAgent := false
//使用本地IPFS获取
if svc.ipfs != nil {
log.Infof("try to use local IPFS to download file")
//获取IPFS的reader
reader, err := svc.downloadFromLocalIPFS(blockHash)
if err != nil {
downloadFromAgent = true
fmt.Errorf("read ipfs block failed, err: %w", err)
}
defer reader.Close()
for i := 0; int64(i) < numPacket; i++ {
buf := make([]byte, config.Cfg().ECPacketSize)
_, err := io.ReadFull(reader, buf)
if err != nil {
downloadFromAgent = true
fmt.Errorf("read file falied, err:%w", err)
}
getBuf <- buf
}
if downloadFromAgent == false {
close(getBuf)
return nil
}
} else {
downloadFromAgent = true
}
//从agent获取
if downloadFromAgent == true {
/*// 二次获取锁
mutex, err := reqbuilder.NewBuilder().
// 用于从IPFS下载文件
IPFS().ReadOneRep(nodeID, fileHash).
MutexLock(svc.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()
*/
// 连接grpc
grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}
// 下载文件
client := agentcaller.NewFileTransportClient(conn)
reader, err := mygrpc.GetFileAsStream(client, blockHash)
if err != nil {
conn.Close()
return fmt.Errorf("request to get file failed, err: %w", err)
}
for i := 0; int64(i) < numPacket; i++ {
buf := make([]byte, config.Cfg().ECPacketSize)
_, _ = reader.Read(buf)
fmt.Println(buf)
fmt.Println(numPacket, "\n")
getBuf <- buf
}
close(getBuf)
reader.Close()
return nil
}
return nil

}

func load(file io.ReadCloser, loadBufs []chan []byte, ecK int, totalNumPacket int64) error {

for i := 0; int64(i) < totalNumPacket; i++ {

buf := make([]byte, config.Cfg().ECPacketSize)
idx := i % ecK
_, err := file.Read(buf)
if err != nil {
return fmt.Errorf("read file falied, err:%w", err)
}
loadBufs[idx] <- buf

if idx == ecK-1 {
for j := ecK; j < len(loadBufs); j++ {
zeroPkt := make([]byte, config.Cfg().ECPacketSize)
loadBufs[j] <- zeroPkt
}
}
if err != nil && err != io.EOF {
return fmt.Errorf("load file to buf failed, err:%w", err)
}
}
for i := 0; i < len(loadBufs); i++ {

close(loadBufs[i])
}
file.Close()
return nil
}

func encode(inBufs []chan []byte, outBufs []chan []byte, ecK int, coefs [][]int64, numPacket int64) {
var tmpIn [][]byte
tmpIn = make([][]byte, len(outBufs))
enc := ec.NewRsEnc(ecK, len(outBufs))
for i := 0; int64(i) < numPacket; i++ {
for j := 0; j < len(outBufs); j++ {
tmpIn[j] = <-inBufs[j]
}
enc.Encode(tmpIn)
for j := 0; j < len(outBufs); j++ {
outBufs[j] <- tmpIn[j]
}
}
for i := 0; i < len(outBufs); i++ {
close(outBufs[i])
}
}

func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, numPacket int64) {
fmt.Println("decode ")
var tmpIn [][]byte
var zeroPkt []byte
tmpIn = make([][]byte, len(inBufs))
hasBlock := map[int]bool{}
for j := 0; j < len(blockSeq); j++ {
hasBlock[blockSeq[j]] = true
}
needRepair := false //检测是否传入了所有数据块
for j := 0; j < len(outBufs); j++ {
if blockSeq[j] != j {
needRepair = true
}
}
enc := ec.NewRsEnc(ecK, len(inBufs))
for i := 0; int64(i) < numPacket; i++ {
print("!!!!!")
for j := 0; j < len(inBufs); j++ {
if hasBlock[j] {
tmpIn[j] = <-inBufs[j]
} else {
tmpIn[j] = zeroPkt
}
}
if needRepair {
err := enc.Repair(tmpIn)
if err != nil {
fmt.Fprintf(os.Stderr, "Decode Repair Error: %s", err.Error())
}
}
for j := 0; j < len(outBufs); j++ {
outBufs[j] <- tmpIn[j]
}
}
for i := 0; i < len(outBufs); i++ {
close(outBufs[i])
}
}

func (svc *ObjectService) send(node ramsg.RespNode, inBuf chan []byte, numPacket int64, wg *sync.WaitGroup, hashs []string, idx int) error {
// TODO zkx 先直接复制client\internal\task\upload_rep_objects.go中的uploadToNode和uploadToLocalIPFS来替代这部分逻辑
// 方便之后异步化处理
// uploadToAgent的逻辑反了,而且中间步骤失败,就必须打印日志后停止后续操作

uploadToAgent := true
if svc.ipfs != nil { //使用IPFS传输
//创建IPFS文件
log.Infof("try to use local IPFS to upload block")
writer, err := svc.ipfs.CreateFile()
if err != nil {
uploadToAgent = false
fmt.Errorf("create IPFS file failed, err: %w", err)
}
//逐packet写进ipfs
for i := 0; int64(i) < numPacket; i++ {
buf := <-inBuf
reader := bytes.NewReader(buf)
_, err = io.Copy(writer, reader)
if err != nil {
uploadToAgent = false
fmt.Errorf("copying block data to IPFS file failed, err: %w", err)
}
}
//finish, 获取哈希
fileHash, err := writer.Finish()
if err != nil {
log.Warnf("upload block to local IPFS failed, so try to upload by agent, err: %s", err.Error())
uploadToAgent = false
fmt.Errorf("finish writing blcok to IPFS failed, err: %w", err)
}
hashs[idx] = fileHash
if err != nil {
}
nodeID := node.ID
// 然后让最近节点pin本地上传的文件
agentClient, err := agtcli.NewClient(nodeID, &config.Cfg().RabbitMQ)
if err != nil {
uploadToAgent = false
fmt.Errorf("create agent client to %d failed, err: %w", nodeID, err)
}
defer agentClient.Close()

pinObjResp, err := agentClient.StartPinningObject(agtmsg.NewStartPinningObject(fileHash))
if err != nil {
uploadToAgent = false
fmt.Errorf("start pinning object: %w", err)
}
for {
waitResp, err := agentClient.WaitPinningObject(agtmsg.NewWaitPinningObject(pinObjResp.TaskID, int64(time.Second)*5))
if err != nil {
uploadToAgent = false
fmt.Errorf("waitting pinning object: %w", err)
}
if waitResp.IsComplete {
if waitResp.Error != "" {
uploadToAgent = false
fmt.Errorf("agent pinning object: %s", waitResp.Error)
}
break
}
}
if uploadToAgent == false {
return nil
}
}
//////////////////////////////通过Agent上传
if uploadToAgent == true {
// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := node.ExternalIP
if node.IsSameLocation {
nodeIP = node.LocalIP

log.Infof("client and node %d are at the same location, use local ip\n", node.ID)
}

grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
grpcCon, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}
defer grpcCon.Close()

client := agentcaller.NewFileTransportClient(grpcCon)
upload, err := mygrpc.SendFileAsStream(client)
if err != nil {
return fmt.Errorf("request to send file failed, err: %w", err)
}
// 发送文件数据
for i := 0; int64(i) < numPacket; i++ {
buf := <-inBuf
reader := bytes.NewReader(buf)
_, err = io.Copy(upload, reader)
if err != nil {
// 发生错误则关闭连接
upload.Abort(io.ErrClosedPipe)
return fmt.Errorf("copy block date to upload stream failed, err: %w", err)
}
}
// 发送EOF消息,并获得FileHash
fileHash, err := upload.Finish()
if err != nil {
upload.Abort(io.ErrClosedPipe)
return fmt.Errorf("send EOF failed, err: %w", err)
}
hashs[idx] = fileHash
wg.Done()
}
return nil
}

func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) {
fDir, err := os.Executable()
if err != nil {
panic(err)
}
fURL := filepath.Join(filepath.Dir(fDir), "assets")
_, err = os.Stat(fURL)
if os.IsNotExist(err) {
os.MkdirAll(fURL, os.ModePerm)
}
file, err := os.Create(filepath.Join(fURL, localFilePath))
if err != nil {
return
}
for i := 0; int64(i) < numPacket; i++ {
for j := 0; j < len(inBuf); j++ {
tmp := <-inBuf[j]
fmt.Println(tmp)
file.Write(tmp)
}
}
file.Close()
wg.Done()
}

+ 0
- 322
internal/services/object.go View File

@@ -1,322 +0,0 @@
package services

import (
"fmt"
"io"
"math/rand"
"time"

"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/storage-client/internal/config"
"gitlink.org.cn/cloudream/storage-client/internal/task"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
ramsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
agentcaller "gitlink.org.cn/cloudream/storage-common/pkgs/proto"
mygrpc "gitlink.org.cn/cloudream/storage-common/utils/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

lo "github.com/samber/lo"
)

type ObjectService struct {
*Service
}

type ResultDownloadObject struct {
ObjectName string
Reader io.ReadCloser
Error error
}

func (svc *Service) ObjectSvc() *ObjectService {
return &ObjectService{Service: svc}
}

func (svc *ObjectService) GetObject(userID int64, objectID int64) (model.Object, error) {
// TODO
panic("not implement yet")
}

func (svc *ObjectService) DownloadObjectDir(userID int64, dirName string) ([]ResultDownloadObject, error) {

mutex, err := reqbuilder.NewBuilder().
// 用于判断用户是否有对象权限
Metadata().UserBucket().ReadAny().
// 用于查询可用的下载节点
Node().ReadAny().
// 用于读取文件信息
Object().ReadAny().
// 用于查询Rep配置
ObjectRep().ReadAny().
// 用于查询Block配置
ObjectBlock().ReadAny().
// 用于查询包含了副本的节点
Cache().ReadAny().
MutexLock(svc.distlock)
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}

// TODO 解锁时机需要优化,在所有文件都写入到本地后再解锁
// 当前是所有文件流全部打开,处理完最后全部关闭,可以考虑加一个迭代器,将文件流逐个打开关闭
defer mutex.Unlock()

//根据dirName查询相关的所有文件
objsResp, err := svc.coordinator.GetObjectsByDirName(coormsg.NewGetObjectsByDirName(userID, dirName))
if err != nil {
return nil, fmt.Errorf("get objectID by dirName failed: %w", err)
}
if len(objsResp.Objects) == 0 {
return nil, fmt.Errorf("dirName %v is not exist", dirName)
}

resultDownloadObjects := []ResultDownloadObject{}
for i := 0; i < len(objsResp.Objects); i++ {
reader, err := svc.downloadSingleObject(objsResp.Objects[i].ObjectID, userID)
resultDownloadObjects = append(resultDownloadObjects, ResultDownloadObject{
ObjectName: objsResp.Objects[i].Name,
Reader: reader,
Error: err,
})
}
return resultDownloadObjects, nil
}

func (svc *ObjectService) DownloadObject(userID int64, objectID int64) (io.ReadCloser, error) {
// TODO zkx 需要梳理EC锁涉及的锁,补充下面漏掉的部分
mutex, err := reqbuilder.NewBuilder().
// 用于判断用户是否有对象权限
Metadata().UserBucket().ReadAny().
// 用于查询可用的下载节点
Node().ReadAny().
// 用于读取文件信息
Object().ReadOne(objectID).
// 用于查询Rep配置
ObjectRep().ReadOne(objectID).
// 用于查询Block配置
ObjectBlock().ReadAny().
// 用于查询包含了副本的节点
Cache().ReadAny().
MutexLock(svc.distlock)
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}

reader, err := svc.downloadSingleObject(objectID, userID)
if err != nil {
return reader, err
}

// TODO 需要返回Object信息
return myio.AfterReadClosed(reader, func(closer io.ReadCloser) {
// TODO 可以考虑在打开了读取流之后就解锁,而不是要等外部读取完毕
mutex.Unlock()
}), nil
}

func (svc *ObjectService) downloadSingleObject(objectID int64, userID int64) (io.ReadCloser, error) {
preDownloadResp, err := svc.coordinator.PreDownloadObject(coormsg.NewPreDownloadObject(objectID, userID, config.Cfg().ExternalIP))
if err != nil {
return nil, fmt.Errorf("pre download object: %w", err)
}

switch redundancy := preDownloadResp.Redundancy.(type) {
case ramsg.RespRepRedundancyData:
if len(redundancy.Nodes) == 0 {
return nil, fmt.Errorf("no node has this file")
}

// 选择下载节点
entry := svc.chooseDownloadNode(redundancy.Nodes)

// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := entry.ExternalIP
if entry.IsSameLocation {
nodeIP = entry.LocalIP

log.Infof("client and node %d are at the same location, use local ip\n", entry.ID)
}

reader, err := svc.downloadRepObject(entry.ID, nodeIP, redundancy.FileHash)
if err != nil {
return nil, fmt.Errorf("rep read failed, err: %w", err)
}
return reader, nil

case ramsg.RespEcRedundancyData:
// TODO EC部分的代码要考虑重构
// ecRead(readResp.FileSize, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, *readResp.ECName)
blocks := redundancy.Blocks
ec := redundancy.Ec
ecK := ec.EcK
ecN := ec.EcN
//采取直接读,优先选内网节点
hashs := make([]string, ecK)
nds := make([]ramsg.RespNode, ecK)
for i := 0; i < ecK; i++ {
hashs[i] = blocks[i].FileHash
nds[i] = svc.chooseDownloadNode(redundancy.Nodes[i])
}
//nodeIDs, nodeIPs直接按照第1~ecK个排列
nodeIDs := make([]int64, ecK)
nodeIPs := make([]string, ecK)
for i := 0; i < ecK; i++ {
nodeIDs[i] = nds[i].ID
nodeIPs[i] = nds[i].ExternalIP
if nds[i].IsSameLocation {
nodeIPs[i] = nds[i].LocalIP
log.Infof("client and node %d are at the same location, use local ip\n", nds[i].ID)
}
}

fileSize := preDownloadResp.FileSize
blockIDs := make([]int, ecK)
for i := 0; i < ecK; i++ {
blockIDs[i] = i
}
reader, err := svc.downloadEcObject(fileSize, ecK, ecN, blockIDs, nodeIDs, nodeIPs, hashs)
if err != nil {
return nil, fmt.Errorf("ec read failed, err: %w", err)
}
return reader, nil
}
return nil, fmt.Errorf("unsupported redundancy type: %s", preDownloadResp.Redundancy)
}

// chooseDownloadNode 选择一个下载节点
// 1. 从与当前客户端相同地域的节点中随机选一个
// 2. 没有用的话从所有节点中随机选一个
func (svc *ObjectService) chooseDownloadNode(entries []ramsg.RespNode) ramsg.RespNode {
sameLocationEntries := lo.Filter(entries, func(e ramsg.RespNode, i int) bool { return e.IsSameLocation })
if len(sameLocationEntries) > 0 {
return sameLocationEntries[rand.Intn(len(sameLocationEntries))]
}

return entries[rand.Intn(len(entries))]
}

func (svc *ObjectService) downloadRepObject(nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) {
if svc.ipfs != nil {
log.Infof("try to use local IPFS to download file")

reader, err := svc.downloadFromLocalIPFS(fileHash)
if err == nil {
return reader, nil
}

log.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error())
}

return svc.downloadFromNode(nodeID, nodeIP, fileHash)
}

func (svc *ObjectService) downloadFromNode(nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) {
// 二次获取锁
mutex, err := reqbuilder.NewBuilder().
// 用于从IPFS下载文件
IPFS().ReadOneRep(nodeID, fileHash).
MutexLock(svc.distlock)
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}

// 连接grpc
grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}

// 下载文件
client := agentcaller.NewFileTransportClient(conn)
reader, err := mygrpc.GetFileAsStream(client, fileHash)
if err != nil {
conn.Close()
return nil, fmt.Errorf("request to get file failed, err: %w", err)
}

reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
conn.Close()
mutex.Unlock()
})
return reader, nil
}

func (svc *ObjectService) downloadFromLocalIPFS(fileHash string) (io.ReadCloser, error) {
reader, err := svc.ipfs.OpenRead(fileHash)
if err != nil {
return nil, fmt.Errorf("read ipfs file failed, err: %w", err)
}

return reader, nil
}

func (svc *ObjectService) StartUploadingRepObjects(userID int64, bucketID int64, uploadObjects []task.UploadObject, repCount int) (string, error) {
tsk := svc.taskMgr.StartNew(task.NewUploadRepObjects(userID, bucketID, uploadObjects, repCount))
return tsk.ID(), nil
}

func (svc *ObjectService) WaitUploadingRepObjects(taskID string, waitTimeout time.Duration) (bool, task.UploadRepObjectsResult, error) {
tsk := svc.taskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
uploadTask := tsk.Body().(*task.UploadRepObjects)
uploadObjectResult := task.UploadRepObjectsResult{
Objects: uploadTask.Objects,
Results: uploadTask.Results,
IsUploading: uploadTask.IsUploading,
}

return true, uploadObjectResult, tsk.Error()
}
return false, task.UploadRepObjectsResult{}, nil
}

func (svc *ObjectService) UploadECObject(userID int64, file io.ReadCloser, fileSize int64, ecName string) error {
// TODO
panic("not implement yet")
}

func (svc *ObjectService) StartUpdatingRepObject(userID int64, objectID int64, file io.ReadCloser, fileSize int64) (string, error) {
tsk := svc.taskMgr.StartNew(task.NewUpdateRepObject(userID, objectID, file, fileSize))
return tsk.ID(), nil
}

func (svc *ObjectService) WaitUpdatingRepObject(taskID string, waitTimeout time.Duration) (bool, error) {
tsk := svc.taskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
return true, tsk.Error()
}

return false, nil
}

func (svc *ObjectService) DeleteObject(userID int64, objectID int64) error {
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有对象的权限
UserBucket().ReadAny().
// 用于读取、修改对象信息
Object().WriteOne(objectID).
// 用于删除Rep配置
ObjectRep().WriteOne(objectID).
// 用于删除Block配置
ObjectBlock().WriteAny().
// 用于修改Move此Object的记录的状态
StorageObject().WriteAny().
MutexLock(svc.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

_, err = svc.coordinator.DeleteObject(coormsg.NewDeleteObject(userID, objectID))
if err != nil {
return fmt.Errorf("deleting object: %w", err)
}

return nil
}

+ 221
- 0
internal/services/package.go View File

@@ -0,0 +1,221 @@
package services

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/storage-client/internal/config"
mytask "gitlink.org.cn/cloudream/storage-client/internal/task"
agtcmd "gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

type PackageService struct {
*Service
}

func (svc *Service) PackageSvc() *PackageService {
return &PackageService{Service: svc}
}

func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (iterator.DownloadingObjectIterator, error) {
/*
TODO2
// TODO zkx 需要梳理EC锁涉及的锁,补充下面漏掉的部分
mutex, err := reqbuilder.NewBuilder().
// 用于判断用户是否有对象权限
Metadata().UserBucket().ReadAny().
// 用于查询可用的下载节点
Node().ReadAny().
// 用于读取文件信息
Object().ReadOne(objectID).
// 用于查询Rep配置
ObjectRep().ReadOne(objectID).
// 用于查询Block配置
ObjectBlock().ReadAny().
// 用于查询包含了副本的节点
Cache().ReadAny().
MutexLock(svc.distlock)
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}
*/
getPkgResp, err := svc.coordinator.GetPackage(coormq.NewGetPackage(userID, packageID))
if err != nil {
return nil, fmt.Errorf("getting package: %w", err)
}

getObjsResp, err := svc.coordinator.GetPackageObjects(coormq.NewGetPackageObjects(userID, packageID))
if err != nil {
return nil, fmt.Errorf("getting package objects: %w", err)
}

if getPkgResp.Redundancy.Type == models.RedundancyRep {
getObjRepDataResp, err := svc.coordinator.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(packageID))
if err != nil {
return nil, fmt.Errorf("getting package object rep data: %w", err)
}

iter := iterator.NewRepObjectIterator(getObjsResp.Objects, getObjRepDataResp.Data, svc.coordinator, svc.distlock, iterator.DownloadConfig{
LocalIPFS: svc.ipfs,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})

return iter, nil
}

getObjECDataResp, err := svc.coordinator.GetPackageObjectECData(coormq.NewGetPackageObjectECData(packageID))
if err != nil {
return nil, fmt.Errorf("getting package object ec data: %w", err)
}

var ecRed models.ECRedundancyInfo
if err := serder.AnyToAny(getPkgResp.Package.Redundancy.Info, &ecRed); err != nil {
return nil, fmt.Errorf("get ec redundancy info: %w", err)
}

getECResp, err := svc.coordinator.GetECConfig(coormq.NewGetECConfig(ecRed.ECName))
if err != nil {
return nil, fmt.Errorf("getting ec: %w", err)
}

iter := iterator.NewECObjectIterator(getObjsResp.Objects, getObjECDataResp.Data, svc.coordinator, svc.distlock, getECResp.Config, config.Cfg().ECPacketSize, iterator.DownloadConfig{
LocalIPFS: svc.ipfs,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})

return iter, nil
}

func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo models.RepRedundancyInfo) (string, error) {
tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext](
agtcmd.NewCreateRepPackage(
userID, bucketID, name, objIter,
repInfo,
agtcmd.UploadConfig{
LocalIPFS: svc.ipfs,
LocalNodeID: nil,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})))
return tsk.ID(), nil
}

func (svc *PackageService) WaitCreatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateRepPackageResult, error) {
tsk := svc.taskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
cteatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.CreateRepPackage)
return true, &cteatePkgTask.Result, tsk.Error()
}
return false, nil, nil
}

func (svc *PackageService) StartUpdatingRepPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) (string, error) {
tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext](
agtcmd.NewUpdateRepPackage(
userID, packageID, objIter,
agtcmd.UploadConfig{
LocalIPFS: svc.ipfs,
LocalNodeID: nil,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})))
return tsk.ID(), nil
}

func (svc *PackageService) WaitUpdatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdateRepPackageResult, error) {
tsk := svc.taskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
updatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.UpdateRepPackage)
return true, &updatePkgTask.Result, tsk.Error()
}
return false, nil, nil
}

func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo models.ECRedundancyInfo) (string, error) {
tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext](
agtcmd.NewCreateECPackage(
userID, bucketID, name, objIter,
ecInfo,
config.Cfg().ECPacketSize,
agtcmd.UploadConfig{
LocalIPFS: svc.ipfs,
LocalNodeID: nil,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})))
return tsk.ID(), nil
}

func (svc *PackageService) WaitCreatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateRepPackageResult, error) {
tsk := svc.taskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
cteatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.CreateRepPackage)
return true, &cteatePkgTask.Result, tsk.Error()
}
return false, nil, nil
}

func (svc *PackageService) StartUpdatingECPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) (string, error) {
tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext](
agtcmd.NewUpdateECPackage(
userID, packageID, objIter,
config.Cfg().ECPacketSize,
agtcmd.UploadConfig{
LocalIPFS: svc.ipfs,
LocalNodeID: nil,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})))
return tsk.ID(), nil
}

func (svc *PackageService) WaitUpdatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdateECPackageResult, error) {
tsk := svc.taskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
updatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.UpdateECPackage)
return true, &updatePkgTask.Result, tsk.Error()
}
return false, nil, nil
}

func (svc *PackageService) DeletePackage(userID int64, packageID int64) error {
/*
// TODO2
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有对象的权限
UserBucket().ReadAny().
// 用于读取、修改对象信息
Object().WriteOne(objectID).
// 用于删除Rep配置
ObjectRep().WriteOne(objectID).
// 用于删除Block配置
ObjectBlock().WriteAny().
// 用于修改Move此Object的记录的状态
StorageObject().WriteAny().
MutexLock(svc.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()
*/

_, err := svc.coordinator.DeletePackage(coormq.NewDeletePackage(userID, packageID))
if err != nil {
return fmt.Errorf("deleting package: %w", err)
}

return nil
}

+ 5
- 5
internal/services/service.go View File

@@ -4,19 +4,19 @@ import (
distlock "gitlink.org.cn/cloudream/common/pkgs/distlock/service" distlock "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
"gitlink.org.cn/cloudream/common/utils/ipfs" "gitlink.org.cn/cloudream/common/utils/ipfs"
"gitlink.org.cn/cloudream/storage-client/internal/task" "gitlink.org.cn/cloudream/storage-client/internal/task"
racli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/coordinator"
sccli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/scanner"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner"
) )


type Service struct { type Service struct {
coordinator *racli.Client
coordinator *coormq.Client
ipfs *ipfs.IPFS ipfs *ipfs.IPFS
scanner *sccli.Client
scanner *scmq.Client
distlock *distlock.Service distlock *distlock.Service
taskMgr *task.Manager taskMgr *task.Manager
} }


func NewService(coorClient *racli.Client, ipfsClient *ipfs.IPFS, scanner *sccli.Client, distlock *distlock.Service, taskMgr *task.Manager) (*Service, error) {
func NewService(coorClient *coormq.Client, ipfsClient *ipfs.IPFS, scanner *scmq.Client, distlock *distlock.Service, taskMgr *task.Manager) (*Service, error) {
return &Service{ return &Service{
coordinator: coorClient, coordinator: coorClient,
ipfs: ipfsClient, ipfs: ipfsClient,


+ 20
- 34
internal/services/storage.go View File

@@ -4,11 +4,11 @@ import (
"fmt" "fmt"
"time" "time"


"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/storage-client/internal/config" "gitlink.org.cn/cloudream/storage-client/internal/config"
"gitlink.org.cn/cloudream/storage-client/internal/task" "gitlink.org.cn/cloudream/storage-client/internal/task"
agtcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/agent"
agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
) )


type StorageService struct { type StorageService struct {
@@ -19,12 +19,12 @@ func (svc *Service) StorageSvc() *StorageService {
return &StorageService{Service: svc} return &StorageService{Service: svc}
} }


func (svc *StorageService) StartStorageMoveObject(userID int64, objectID int64, storageID int64) (string, error) {
tsk := svc.taskMgr.StartNew(task.NewMoveObjectToStorage(userID, objectID, storageID))
func (svc *StorageService) StartStorageMovePackage(userID int64, packageID int64, storageID int64) (string, error) {
tsk := svc.taskMgr.StartNew(task.NewStorageMovePackage(userID, packageID, storageID))
return tsk.ID(), nil return tsk.ID(), nil
} }


func (svc *StorageService) WaitStorageMoveObject(taskID string, waitTimeout time.Duration) (bool, error) {
func (svc *StorageService) WaitStorageMovePackage(taskID string, waitTimeout time.Duration) (bool, error) {
tsk := svc.taskMgr.FindByID(taskID) tsk := svc.taskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) { if tsk.WaitTimeout(waitTimeout) {
return true, tsk.Error() return true, tsk.Error()
@@ -33,67 +33,53 @@ func (svc *StorageService) WaitStorageMoveObject(taskID string, waitTimeout time


} }


func (svc *StorageService) StartMovingDir(userID int64, dirName string, storageID int64) (string, error) {
tsk := svc.taskMgr.StartNew(task.NewMoveDirToStorage(userID, dirName, storageID))
return tsk.ID(), nil
}

func (svc *StorageService) WaitMovingDir(taskID string, waitTimeout time.Duration) (bool, []task.ResultObjectToStorage, error) {
tsk := svc.taskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
return true, tsk.Body().(*task.MoveDirToStorage).ResultObjectToStorages, tsk.Error()
}

return false, nil, nil
}

func (svc *StorageService) DeleteStorageObject(userID int64, objectID int64, storageID int64) error {
func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error {
// TODO // TODO
panic("not implement yet") panic("not implement yet")
} }


// 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID // 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID
func (svc *StorageService) StartStorageUploadRepObject(userID int64, storageID int64, filePath string, bucketID int64, objectName string, repCount int) (int64, string, error) {
stgResp, err := svc.coordinator.GetStorageInfo(coormsg.NewGetStorageInfo(userID, storageID))
func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo) (int64, string, error) {
stgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID))
if err != nil { if err != nil {
return 0, "", fmt.Errorf("getting storage info: %w", err) return 0, "", fmt.Errorf("getting storage info: %w", err)
} }


agentCli, err := agtcli.NewClient(stgResp.NodeID, &config.Cfg().RabbitMQ)
agentCli, err := agtmq.NewClient(stgResp.NodeID, &config.Cfg().RabbitMQ)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("new agent client: %w", err) return 0, "", fmt.Errorf("new agent client: %w", err)
} }
defer agentCli.Close() defer agentCli.Close()


startResp, err := agentCli.StartStorageUploadRepObject(agtmsg.NewStartStorageUploadRepObject(userID, filePath, bucketID, objectName, repCount, stgResp.Directory))
startResp, err := agentCli.StartStorageCreatePackage(agtmq.NewStartStorageCreatePackage(userID, bucketID, name, storageID, path, redundancy))
if err != nil { if err != nil {
return 0, "", fmt.Errorf("start storage upload rep object: %w", err)
return 0, "", fmt.Errorf("start storage upload package: %w", err)
} }


return stgResp.NodeID, startResp.TaskID, nil return stgResp.NodeID, startResp.TaskID, nil
} }


func (svc *StorageService) WaitStorageUploadRepObject(nodeID int64, taskID string, waitTimeout time.Duration) (bool, int64, string, error) {
agentCli, err := agtcli.NewClient(nodeID, &config.Cfg().RabbitMQ)
func (svc *StorageService) WaitStorageCreatePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, int64, error) {
agentCli, err := agtmq.NewClient(nodeID, &config.Cfg().RabbitMQ)
if err != nil { if err != nil {
// TODO 失败是否要当做任务已经结束? // TODO 失败是否要当做任务已经结束?
return true, 0, "", fmt.Errorf("new agent client: %w", err)
return true, 0, fmt.Errorf("new agent client: %w", err)
} }
defer agentCli.Close() defer agentCli.Close()


waitResp, err := agentCli.WaitStorageUploadRepObject(agtmsg.NewWaitStorageUploadRepObject(taskID, waitTimeout.Milliseconds()))
waitResp, err := agentCli.WaitStorageCreatePackage(agtmq.NewWaitStorageCreatePackage(taskID, waitTimeout.Milliseconds()))
if err != nil { if err != nil {
// TODO 请求失败是否要当做任务已经结束? // TODO 请求失败是否要当做任务已经结束?
return true, 0, "", fmt.Errorf("wait storage upload rep object: %w", err)
return true, 0, fmt.Errorf("wait storage upload package: %w", err)
} }


if !waitResp.IsComplete { if !waitResp.IsComplete {
return false, 0, "", nil
return false, 0, nil
} }


if waitResp.Error != "" { if waitResp.Error != "" {
return true, 0, "", fmt.Errorf("%s", waitResp.Error)
return true, 0, fmt.Errorf("%s", waitResp.Error)
} }


return true, waitResp.ObjectID, waitResp.FileHash, nil
return true, waitResp.PackageID, nil
} }

+ 0
- 83
internal/task/move_dir_to_storage.go View File

@@ -1,83 +0,0 @@
package task

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
)

type MoveDirToStorage struct {
userID int64
dirName string
storageID int64
ResultObjectToStorages []ResultObjectToStorage
}

type ResultObjectToStorage struct {
ObjectName string
Error error
}

func NewMoveDirToStorage(userID int64, dirName string, storageID int64) *MoveDirToStorage {
return &MoveDirToStorage{
userID: userID,
dirName: dirName,
storageID: storageID,
}
}

func (t *MoveDirToStorage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *MoveDirToStorage) do(ctx TaskContext) error {
//根据dirName查询相关的所有文件
objsResp, err := ctx.Coordinator.GetObjectsByDirName(coormsg.NewGetObjectsByDirName(t.userID, t.dirName))
if err != nil {
return fmt.Errorf("get objectID by dirName failed: %w", err)
}
if len(objsResp.Objects) == 0 {
return fmt.Errorf("dirName %v is not exist", t.dirName)
}

reqBlder := reqbuilder.NewBuilder()
for _, object := range objsResp.Objects {
reqBlder.Metadata().
// 用于判断用户是否有Storage权限
UserStorage().ReadOne(object.ObjectID, t.storageID).
// 用于读取对象信息
Object().ReadOne(object.ObjectID).
// 用于查询Rep配置
ObjectRep().ReadOne(object.ObjectID).
// 用于创建Move记录
StorageObject().CreateOne(t.storageID, t.userID, object.ObjectID).
// 用于创建对象文件
Storage().CreateOneObject(t.storageID, t.userID, object.ObjectID)
}
mutex, err := reqBlder.
Metadata().
// 用于判断用户是否有对象权限
UserBucket().ReadAny().
// 用于查询Block配置
ObjectBlock().ReadAny().
MutexLock(ctx.DistLock)

if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

for i := 0; i < len(objsResp.Objects); i++ {
err := moveSingleObjectToStorage(ctx, t.userID, objsResp.Objects[i].ObjectID, t.storageID)
t.ResultObjectToStorages = append(t.ResultObjectToStorages, ResultObjectToStorage{
ObjectName: objsResp.Objects[i].Name,
Error: err,
})
}
return nil
}

+ 0
- 108
internal/task/move_object_to_storage.go View File

@@ -1,108 +0,0 @@
package task

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage-client/internal/config"
agtcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/agent"
agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
)

type MoveObjectToStorage struct {
userID int64
objectID int64
storageID int64
}

func NewMoveObjectToStorage(userID int64, objectID int64, storageID int64) *MoveObjectToStorage {
return &MoveObjectToStorage{
userID: userID,
objectID: objectID,
storageID: storageID,
}
}

func (t *MoveObjectToStorage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *MoveObjectToStorage) do(ctx TaskContext) error {
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有Storage权限
UserStorage().ReadOne(t.objectID, t.storageID).
// 用于判断用户是否有对象权限
UserBucket().ReadAny().
// 用于读取对象信息
Object().ReadOne(t.objectID).
// 用于查询Rep配置
ObjectRep().ReadOne(t.objectID).
// 用于查询Block配置
ObjectBlock().ReadAny().
// 用于创建Move记录
StorageObject().CreateOne(t.storageID, t.userID, t.objectID).
Storage().
// 用于创建对象文件
CreateOneObject(t.storageID, t.userID, t.objectID).
MutexLock(ctx.DistLock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()
err = moveSingleObjectToStorage(ctx, t.userID, t.objectID, t.storageID)
return err
}

func moveSingleObjectToStorage(ctx TaskContext, userID int64, objectID int64, storageID int64) error {
// 先向协调端请求文件相关的元数据
preMoveResp, err := ctx.Coordinator.PreMoveObjectToStorage(coormsg.NewPreMoveObjectToStorage(objectID, storageID, userID))
if err != nil {
return fmt.Errorf("pre move object to storage: %w", err)
}

// 然后向代理端发送移动文件的请求
agentClient, err := agtcli.NewClient(preMoveResp.NodeID, &config.Cfg().RabbitMQ)
if err != nil {
return fmt.Errorf("create agent client to %d failed, err: %w", preMoveResp.NodeID, err)
}
defer agentClient.Close()

agentMoveResp, err := agentClient.StartStorageMoveObject(
agtmsg.NewStartStorageMoveObject(preMoveResp.Directory,
objectID,
preMoveResp.Object.Name,
userID,
preMoveResp.Object.FileSize,
preMoveResp.Redundancy,
))
if err != nil {
return fmt.Errorf("start moving object to storage: %w", err)
}

for {
waitResp, err := agentClient.WaitStorageMoveObject(agtmsg.NewWaitStorageMoveObject(agentMoveResp.TaskID, int64(time.Second)*5))
if err != nil {
return fmt.Errorf("wait moving object: %w", err)
}

if waitResp.IsComplete {
if waitResp.Error != "" {
return fmt.Errorf("agent moving object: %s", waitResp.Error)
}

break
}
}

_, err = ctx.Coordinator.MoveObjectToStorage(coormsg.NewMoveObjectToStorage(objectID, storageID, userID))
if err != nil {
return fmt.Errorf("moving object to storage: %w", err)
}
return nil
}

+ 101
- 0
internal/task/storage_move_package.go View File

@@ -0,0 +1,101 @@
package task

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/storage-client/internal/config"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

type StorageMovePackage struct {
userID int64
packageID int64
storageID int64
}

func NewStorageMovePackage(userID int64, packageID int64, storageID int64) *StorageMovePackage {
return &StorageMovePackage{
userID: userID,
packageID: packageID,
storageID: storageID,
}
}

func (t *StorageMovePackage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *StorageMovePackage) do(ctx TaskContext) error {
/*
TODO2
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有Storage权限
UserStorage().ReadOne(t.packageID, t.storageID).
// 用于判断用户是否有对象权限
UserBucket().ReadAny().
// 用于读取对象信息
Object().ReadOne(t.packageID).
// 用于查询Rep配置
ObjectRep().ReadOne(t.packageID).
// 用于查询Block配置
ObjectBlock().ReadAny().
// 用于创建Move记录
StorageObject().CreateOne(t.storageID, t.userID, t.packageID).
Storage().
// 用于创建对象文件
CreateOneObject(t.storageID, t.userID, t.packageID).
MutexLock(ctx.DistLock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()
*/
getStgResp, err := ctx.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID))
if err != nil {
return fmt.Errorf("getting storage info: %w", err)
}

// 然后向代理端发送移动文件的请求
agentClient, err := agtmq.NewClient(getStgResp.NodeID, &config.Cfg().RabbitMQ)
if err != nil {
return fmt.Errorf("create agent client to %d failed, err: %w", getStgResp.NodeID, err)
}
defer agentClient.Close()

agentMoveResp, err := agentClient.StartStorageMovePackage(
agtmq.NewStartStorageMovePackage(
t.userID,
t.packageID,
t.storageID,
))
if err != nil {
return fmt.Errorf("start moving package to storage: %w", err)
}

for {
waitResp, err := agentClient.WaitStorageMovePackage(agtmq.NewWaitStorageMovePackage(agentMoveResp.TaskID, int64(time.Second)*5))
if err != nil {
return fmt.Errorf("wait moving package: %w", err)
}

if waitResp.IsComplete {
if waitResp.Error != "" {
return fmt.Errorf("agent moving package: %s", waitResp.Error)
}

break
}
}

_, err = ctx.Coordinator().PackageMovedToStorage(coormq.NewPackageMovedToStorage(t.userID, t.packageID, t.storageID))
if err != nil {
return fmt.Errorf("moving package to storage: %w", err)
}
return nil
}

+ 8
- 8
internal/task/task.go View File

@@ -4,13 +4,13 @@ import (
distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
"gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/common/utils/ipfs" "gitlink.org.cn/cloudream/common/utils/ipfs"
coorcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/coordinator"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
) )


type TaskContext struct { type TaskContext struct {
IPFS *ipfs.IPFS
DistLock *distsvc.Service
Coordinator *coorcli.Client
ipfs *ipfs.IPFS
distLock *distsvc.Service
coordinator *coormq.Client
} }


// 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用,
@@ -25,10 +25,10 @@ type Task = task.Task[TaskContext]


type CompleteOption = task.CompleteOption type CompleteOption = task.CompleteOption


func NewManager(ipfs *ipfs.IPFS, distlock *distsvc.Service, coorCli *coorcli.Client) Manager {
func NewManager(ipfs *ipfs.IPFS, distlock *distsvc.Service, coorCli *coormq.Client) Manager {
return task.NewManager(TaskContext{ return task.NewManager(TaskContext{
IPFS: ipfs,
DistLock: distlock,
Coordinator: coorCli,
ipfs: ipfs,
distLock: distlock,
coordinator: coorCli,
}) })
} }

+ 0
- 147
internal/task/update_rep_object.go View File

@@ -1,147 +0,0 @@
package task

import (
"fmt"
"io"
"time"

"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/common/pkgs/logger"
mysort "gitlink.org.cn/cloudream/common/utils/sort"
"gitlink.org.cn/cloudream/storage-client/internal/config"

coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
)

type UpdateRepObject struct {
userID int64
objectID int64
file io.ReadCloser
fileSize int64
}

func NewUpdateRepObject(userID int64, objectID int64, file io.ReadCloser, fileSize int64) *UpdateRepObject {
return &UpdateRepObject{
userID: userID,
objectID: objectID,
file: file,
fileSize: fileSize,
}
}

func (t *UpdateRepObject) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *UpdateRepObject) do(ctx TaskContext) error {
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有对象的权限
UserBucket().ReadAny().
// 用于读取、修改对象信息
Object().WriteOne(t.objectID).
// 用于更新Rep配置
ObjectRep().WriteOne(t.objectID).
// 用于查询可用的上传节点
Node().ReadAny().
// 用于创建Cache记录
Cache().CreateAny().
// 用于修改Move此Object的记录的状态
StorageObject().WriteAny().
MutexLock(ctx.DistLock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

preResp, err := ctx.Coordinator.PreUpdateRepObject(coormsg.NewPreUpdateRepObject(
t.objectID,
t.fileSize,
t.userID,
config.Cfg().ExternalIP,
))
if err != nil {
return fmt.Errorf("pre update rep object: %w", err)
}

if len(preResp.Nodes) == 0 {
return fmt.Errorf("no node to upload file")
}

// 上传文件的方式优先级:
// 1. 本地IPFS
// 2. 包含了旧文件,且与客户端在同地域的节点
// 3. 不在同地域,但包含了旧文件的节点
// 4. 同地域节点

uploadNode := t.chooseUpdateRepObjectNode(preResp.Nodes)

var fileHash string
uploadedNodeIDs := []int64{}
willUploadToNode := true
// 本地有IPFS,则直接从本地IPFS上传
if ctx.IPFS != nil {
logger.Infof("try to use local IPFS to upload file")

fileHash, err = uploadToLocalIPFS(ctx.IPFS, t.file, uploadNode.ID)
if err != nil {
logger.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.ID, err.Error())
} else {
willUploadToNode = false
}
}

// 否则发送到agent上传
if willUploadToNode {
// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := uploadNode.ExternalIP
if uploadNode.IsSameLocation {
nodeIP = uploadNode.LocalIP

logger.Infof("client and node %d are at the same location, use local ip\n", uploadNode.ID)
}

mutex, err := reqbuilder.NewBuilder().
IPFS().
// 防止上传的副本被清除
CreateAnyRep(uploadNode.ID).
MutexLock(ctx.DistLock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

fileHash, err = uploadToNode(t.file, nodeIP)
if err != nil {
return fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err)
}
uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID)
}

// 更新Object
_, err = ctx.Coordinator.UpdateRepObject(coormsg.NewUpdateRepObject(t.objectID, fileHash, t.fileSize, uploadedNodeIDs, t.userID))
if err != nil {
return fmt.Errorf("updating rep object: %w", err)
}

return nil
}

// chooseUploadNode 选择一个上传文件的节点
// 1. 从与当前客户端相同地域的节点中随机选一个
// 2. 没有用的话从所有节点中随机选一个
func (t *UpdateRepObject) chooseUpdateRepObjectNode(nodes []coormsg.PreUpdateRepObjectRespNode) coormsg.PreUpdateRepObjectRespNode {
mysort.Sort(nodes, func(left, right coormsg.PreUpdateRepObjectRespNode) int {
v := -mysort.CmpBool(left.HasOldObject, right.HasOldObject)
if v != 0 {
return v
}

return -mysort.CmpBool(left.IsSameLocation, right.IsSameLocation)
})

return nodes[0]
}

+ 0
- 54
internal/task/update_rep_object_test.go View File

@@ -1,54 +0,0 @@
package task

import (
"testing"

. "github.com/smartystreets/goconvey/convey"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
)

func Test_chooseUpdateRepObjectNode(t *testing.T) {
testcases := []struct {
title string
nodes []coormsg.PreUpdateRepObjectRespNode
wantNodeID int
}{
{
title: "选择同地域,包含旧数据的节点",
nodes: []coormsg.PreUpdateRepObjectRespNode{
coormsg.NewPreUpdateRepObjectRespNode(0, "", "", true, false),
coormsg.NewPreUpdateRepObjectRespNode(1, "", "", false, false),
coormsg.NewPreUpdateRepObjectRespNode(2, "", "", false, true),
coormsg.NewPreUpdateRepObjectRespNode(3, "", "", true, true),
},
wantNodeID: 3,
},

{
title: "选择包含旧数据的节点",
nodes: []coormsg.PreUpdateRepObjectRespNode{
coormsg.NewPreUpdateRepObjectRespNode(0, "", "", true, false),
coormsg.NewPreUpdateRepObjectRespNode(1, "", "", false, false),
coormsg.NewPreUpdateRepObjectRespNode(2, "", "", false, true),
},
wantNodeID: 2,
},

{
title: "选择包含同地域的节点",
nodes: []coormsg.PreUpdateRepObjectRespNode{
coormsg.NewPreUpdateRepObjectRespNode(0, "", "", true, false),
coormsg.NewPreUpdateRepObjectRespNode(1, "", "", false, false),
},
wantNodeID: 0,
},
}

var tsk UpdateRepObject
for _, test := range testcases {
Convey(test.title, t, func() {
chooseNode := tsk.chooseUpdateRepObjectNode(test.nodes)
So(chooseNode.ID, ShouldEqual, test.wantNodeID)
})
}
}

+ 0
- 294
internal/task/upload_rep_objects.go View File

@@ -1,294 +0,0 @@
package task

import (
"fmt"
"io"
"math/rand"
"time"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/ipfs"
"gitlink.org.cn/cloudream/storage-client/internal/config"
"gitlink.org.cn/cloudream/storage-common/utils"
mygrpc "gitlink.org.cn/cloudream/storage-common/utils/grpc"

agtcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/agent"
ramsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message"
agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
agentcaller "gitlink.org.cn/cloudream/storage-common/pkgs/proto"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// UploadObjects和UploadRepResults为一一对应关系
type UploadRepObjects struct {
userID int64
bucketID int64
repCount int
Objects []UploadObject
Results []UploadSingleRepObjectResult
IsUploading bool
}

type UploadRepObjectsResult struct {
Objects []UploadObject
Results []UploadSingleRepObjectResult
IsUploading bool
}

type UploadObject struct {
ObjectName string
File io.ReadCloser
FileSize int64
}

type UploadSingleRepObjectResult struct {
Error error
FileHash string
ObjectID int64
}

func NewUploadRepObjects(userID int64, bucketID int64, uploadObjects []UploadObject, repCount int) *UploadRepObjects {
return &UploadRepObjects{
userID: userID,
bucketID: bucketID,
Objects: uploadObjects,
repCount: repCount,
}
}

func (t *UploadRepObjects) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *UploadRepObjects) do(ctx TaskContext) error {

reqBlder := reqbuilder.NewBuilder()
for _, uploadObject := range t.Objects {
reqBlder.Metadata().
// 用于防止创建了多个同名对象
Object().CreateOne(t.bucketID, uploadObject.ObjectName)
}
mutex, err := reqBlder.
Metadata().
// 用于判断用户是否有桶的权限
UserBucket().ReadOne(t.userID, t.bucketID).
// 用于查询可用的上传节点
Node().ReadAny().
// 用于设置Rep配置
ObjectRep().CreateAny().
// 用于创建Cache记录
Cache().CreateAny().
MutexLock(ctx.DistLock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

var repWriteResps []*coormsg.PreUploadResp

//判断是否所有文件都符合上传条件
hasFailure := true
for i := 0; i < len(t.Objects); i++ {
repWriteResp, err := t.preUploadSingleObject(ctx, t.Objects[i])
if err != nil {
hasFailure = false
t.Results = append(t.Results,
UploadSingleRepObjectResult{
Error: err,
FileHash: "",
ObjectID: 0,
})
continue
}
t.Results = append(t.Results, UploadSingleRepObjectResult{})
repWriteResps = append(repWriteResps, repWriteResp)
}

// 不满足上传条件,返回各文件检查结果
if !hasFailure {
return nil
}

//上传文件夹
t.IsUploading = true
for i := 0; i < len(repWriteResps); i++ {
objectID, fileHash, err := t.uploadSingleObject(ctx, t.Objects[i], repWriteResps[i])
// 记录文件上传结果
t.Results[i] = UploadSingleRepObjectResult{
Error: err,
FileHash: fileHash,
ObjectID: objectID,
}
}
return nil
}

// 检查单个文件是否能够上传
func (t *UploadRepObjects) preUploadSingleObject(ctx TaskContext, uploadObject UploadObject) (*coormsg.PreUploadResp, error) {
//发送写请求,请求Coor分配写入节点Ip
repWriteResp, err := ctx.Coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.userID, config.Cfg().ExternalIP))
if err != nil {
return nil, fmt.Errorf("pre upload rep object: %w", err)
}
if len(repWriteResp.Nodes) == 0 {
return nil, fmt.Errorf("no node to upload file")
}
return repWriteResp, nil
}

// 上传文件
func (t *UploadRepObjects) uploadSingleObject(ctx TaskContext, uploadObject UploadObject, preResp *coormsg.PreUploadResp) (int64, string, error) {
uploadNode := t.chooseUploadNode(preResp.Nodes)

var fileHash string
uploadedNodeIDs := []int64{}
willUploadToNode := true
// 本地有IPFS,则直接从本地IPFS上传
if ctx.IPFS != nil {
logger.Infof("try to use local IPFS to upload file")

var err error
fileHash, err = uploadToLocalIPFS(ctx.IPFS, uploadObject.File, uploadNode.ID)
if err != nil {
logger.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.ID, err.Error())
} else {
willUploadToNode = false
}
}

// 否则发送到agent上传
if willUploadToNode {
// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := uploadNode.ExternalIP
if uploadNode.IsSameLocation {
nodeIP = uploadNode.LocalIP

logger.Infof("client and node %d are at the same location, use local ip\n", uploadNode.ID)
}

mutex, err := reqbuilder.NewBuilder().
// 防止上传的副本被清除
IPFS().CreateAnyRep(uploadNode.ID).
MutexLock(ctx.DistLock)
if err != nil {
return 0, "", fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

fileHash, err = uploadToNode(uploadObject.File, nodeIP)
if err != nil {
return 0, "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err)
}
uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID)
}

dirName := utils.GetDirectoryName(uploadObject.ObjectName)

// 记录写入的文件的Hash
createResp, err := ctx.Coordinator.CreateRepObject(coormsg.NewCreateRepObject(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.repCount, t.userID, uploadedNodeIDs, fileHash, dirName))
if err != nil {
return 0, "", fmt.Errorf("creating rep object: %w", err)
}

return createResp.ObjectID, fileHash, nil
}

// chooseUploadNode 选择一个上传文件的节点
// 1. 从与当前客户端相同地域的节点中随机选一个
// 2. 没有用的话从所有节点中随机选一个
func (t *UploadRepObjects) chooseUploadNode(nodes []ramsg.RespNode) ramsg.RespNode {
sameLocationNodes := lo.Filter(nodes, func(e ramsg.RespNode, i int) bool { return e.IsSameLocation })
if len(sameLocationNodes) > 0 {
return sameLocationNodes[rand.Intn(len(sameLocationNodes))]
}

return nodes[rand.Intn(len(nodes))]
}

func uploadToNode(file io.ReadCloser, nodeIP string) (string, error) {
// 建立grpc连接,发送请求
grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
grpcCon, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return "", fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}
defer grpcCon.Close()

client := agentcaller.NewFileTransportClient(grpcCon)
upload, err := mygrpc.SendFileAsStream(client)
if err != nil {
return "", fmt.Errorf("request to send file failed, err: %w", err)
}

// 发送文件数据
_, err = io.Copy(upload, file)
if err != nil {
// 发生错误则关闭连接
upload.Abort(io.ErrClosedPipe)
return "", fmt.Errorf("copy file date to upload stream failed, err: %w", err)
}

// 发送EOF消息,并获得FileHash
fileHash, err := upload.Finish()
if err != nil {
upload.Abort(io.ErrClosedPipe)
return "", fmt.Errorf("send EOF failed, err: %w", err)
}

return fileHash, nil
}

func uploadToLocalIPFS(ipfs *ipfs.IPFS, file io.ReadCloser, nodeID int64) (string, error) {
// 从本地IPFS上传文件
writer, err := ipfs.CreateFile()
if err != nil {
return "", fmt.Errorf("create IPFS file failed, err: %w", err)
}

_, err = io.Copy(writer, file)
if err != nil {
return "", fmt.Errorf("copy file data to IPFS failed, err: %w", err)
}

fileHash, err := writer.Finish()
if err != nil {
return "", fmt.Errorf("finish writing IPFS failed, err: %w", err)
}

// 然后让最近节点pin本地上传的文件
agentClient, err := agtcli.NewClient(nodeID, &config.Cfg().RabbitMQ)
if err != nil {
return "", fmt.Errorf("create agent client to %d failed, err: %w", nodeID, err)
}
defer agentClient.Close()

pinObjResp, err := agentClient.StartPinningObject(agtmsg.NewStartPinningObject(fileHash))
if err != nil {
return "", fmt.Errorf("start pinning object: %w", err)
}

for {
waitResp, err := agentClient.WaitPinningObject(agtmsg.NewWaitPinningObject(pinObjResp.TaskID, int64(time.Second)*5))
if err != nil {
return "", fmt.Errorf("waitting pinning object: %w", err)
}

if waitResp.IsComplete {
if waitResp.Error != "" {
return "", fmt.Errorf("agent pinning object: %s", waitResp.Error)
}

break
}
}

return fileHash, nil
}

+ 4
- 4
main.go View File

@@ -13,8 +13,8 @@ import (
"gitlink.org.cn/cloudream/storage-client/internal/config" "gitlink.org.cn/cloudream/storage-client/internal/config"
"gitlink.org.cn/cloudream/storage-client/internal/services" "gitlink.org.cn/cloudream/storage-client/internal/services"
"gitlink.org.cn/cloudream/storage-client/internal/task" "gitlink.org.cn/cloudream/storage-client/internal/task"
coorcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/coordinator"
sccli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/scanner"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner"
) )


func main() { func main() {
@@ -30,13 +30,13 @@ func main() {
os.Exit(1) os.Exit(1)
} }


coorClient, err := coorcli.NewClient(&config.Cfg().RabbitMQ)
coorClient, err := coormq.NewClient(&config.Cfg().RabbitMQ)
if err != nil { if err != nil {
log.Warnf("new coordinator client failed, err: %s", err.Error()) log.Warnf("new coordinator client failed, err: %s", err.Error())
os.Exit(1) os.Exit(1)
} }


scanner, err := sccli.NewClient(&config.Cfg().RabbitMQ)
scanner, err := scmq.NewClient(&config.Cfg().RabbitMQ)
if err != nil { if err != nil {
log.Warnf("new scanner client failed, err: %s", err.Error()) log.Warnf("new scanner client failed, err: %s", err.Error())
os.Exit(1) os.Exit(1)


Loading…
Cancel
Save