Browse Source

修改上传下载Package的逻辑

gitlink
Sydonian 1 year ago
parent
commit
b82e2e016f
33 changed files with 788 additions and 1642 deletions
  1. +3
    -31
      agent/internal/services/mq/storage.go
  2. +9
    -9
      agent/internal/task/create_ec_package.go
  3. +5
    -4
      client/internal/cmdline/bucket.go
  4. +3
    -1
      client/internal/cmdline/cache.go
  5. +23
    -125
      client/internal/cmdline/package.go
  6. +6
    -7
      client/internal/cmdline/storage.go
  7. +2
    -2
      client/internal/http/package.go
  8. +18
    -105
      client/internal/services/package.go
  9. +8
    -8
      client/internal/task/create_ec_package.go
  10. +8
    -8
      client/internal/task/update_ec_package.go
  11. +20
    -31
      common/assets/scripts/create_database.sql
  12. +0
    -11
      common/consts/consts.go
  13. +23
    -15
      common/models/models.go
  14. +244
    -62
      common/pkgs/cmd/create_ec_package.go
  15. +0
    -304
      common/pkgs/cmd/create_rep_package.go
  16. +9
    -75
      common/pkgs/cmd/download_package.go
  17. +18
    -33
      common/pkgs/cmd/update_ec_package.go
  18. +0
    -128
      common/pkgs/cmd/update_rep_package.go
  19. +0
    -30
      common/pkgs/db/ec.go
  20. +51
    -52
      common/pkgs/db/model/model.go
  21. +32
    -170
      common/pkgs/db/object.go
  22. +24
    -17
      common/pkgs/db/object_block.go
  23. +4
    -5
      common/pkgs/db/package.go
  24. +6
    -7
      common/pkgs/db/storage_package.go
  25. +34
    -0
      common/pkgs/db/storage_package_log.go
  26. +216
    -74
      common/pkgs/iterator/ec_object_iterator.go
  27. +0
    -212
      common/pkgs/iterator/rep_object_iterator.go
  28. +0
    -64
      common/pkgs/mq/coordinator/common.go
  29. +13
    -13
      common/pkgs/mq/coordinator/object.go
  30. +5
    -5
      common/pkgs/mq/coordinator/package.go
  31. +0
    -2
      common/pkgs/mq/coordinator/server.go
  32. +4
    -2
      common/utils/utils.go
  33. +0
    -30
      coordinator/internal/services/conmmon.go

+ 3
- 31
agent/internal/services/mq/storage.go View File

@@ -195,27 +195,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka
}

objIter := iterator.NewUploadingObjectIterator(fullPath, uploadFilePathes)

if msg.Redundancy.IsRepInfo() {
repInfo, err := msg.Redundancy.ToRepInfo()
if err != nil {
logger.Warnf("getting rep redundancy info: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get rep redundancy info failed")
}

tsk := svc.taskManager.StartNew(mytask.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo, msg.NodeAffinity))
return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID()))
}

ecInfo, err := msg.Redundancy.ToECInfo()
if err != nil {
logger.Warnf("getting ec redundancy info: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get ec redundancy info failed")
}

tsk := svc.taskManager.StartNew(mytask.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo, msg.NodeAffinity))
tsk := svc.taskManager.StartNew(mytask.NewCreatePackage(msg.UserID, msg.BucketID, msg.Name, objIter, msg.NodeAffinity))
return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID()))
}

@@ -235,14 +215,6 @@ func (svc *Service) WaitStorageCreatePackage(msg *agtmq.WaitStorageCreatePackage
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, tsk.Error().Error(), 0))
}

// TODO 避免判断类型
if repTask, ok := tsk.Body().(*mytask.CreateRepPackage); ok {
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", repTask.Result.PackageID))
}

if ecTask, ok := tsk.Body().(*mytask.CreateECPackage); ok {
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", ecTask.Result.PackageID))
}

return nil, mq.Failed(errorcode.TaskNotFound, "task not found")
taskBody := tsk.Body().(*mytask.CreatePackage)
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", taskBody.Result.PackageID))
}

+ 9
- 9
agent/internal/task/create_ec_package.go View File

@@ -10,22 +10,22 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
)

type CreateECPackageResult = cmd.CreateECPackageResult
type CreatePackageResult = cmd.CreatePackageResult

type CreateECPackage struct {
cmd cmd.CreateECPackage
type CreatePackage struct {
cmd cmd.CreatePackage

Result *CreateECPackageResult
Result *CreatePackageResult
}

func NewCreateECPackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreateECPackage {
return &CreateECPackage{
cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, nodeAffinity),
func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreatePackage {
return &CreatePackage{
cmd: *cmd.NewCreatePackage(userID, bucketID, name, objIter, nodeAffinity),
}
}

func (t *CreateECPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
log := logger.WithType[CreateECPackage]("Task")
func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
log := logger.WithType[CreatePackage]("Task")
log.Debugf("begin")
defer log.Debugf("end")



+ 5
- 4
client/internal/cmdline/bucket.go View File

@@ -4,10 +4,11 @@ import (
"fmt"

"github.com/jedib0t/go-pretty/v6/table"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

func BucketListUserBuckets(ctx CommandContext) error {
userID := int64(0)
userID := cdssdk.UserID(0)

buckets, err := ctx.Cmdline.Svc.BucketSvc().GetUserBuckets(userID)
if err != nil {
@@ -28,7 +29,7 @@ func BucketListUserBuckets(ctx CommandContext) error {
}

func BucketCreateBucket(ctx CommandContext, bucketName string) error {
userID := int64(0)
userID := cdssdk.UserID(0)

bucketID, err := ctx.Cmdline.Svc.BucketSvc().CreateBucket(userID, bucketName)
if err != nil {
@@ -39,8 +40,8 @@ func BucketCreateBucket(ctx CommandContext, bucketName string) error {
return nil
}

func BucketDeleteBucket(ctx CommandContext, bucketID int64) error {
userID := int64(0)
func BucketDeleteBucket(ctx CommandContext, bucketID cdssdk.BucketID) error {
userID := cdssdk.UserID(0)

err := ctx.Cmdline.Svc.BucketSvc().DeleteBucket(userID, bucketID)
if err != nil {


+ 3
- 1
client/internal/cmdline/cache.go View File

@@ -3,9 +3,11 @@ package cmdline
import (
"fmt"
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

func CacheMovePackage(ctx CommandContext, packageID int64, nodeID int64) error {
func CacheMovePackage(ctx CommandContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error {
taskID, err := ctx.Cmdline.Svc.CacheSvc().StartCacheMovePackage(0, packageID, nodeID)
if err != nil {
return fmt.Errorf("start cache moving package: %w", err)


+ 23
- 125
client/internal/cmdline/package.go View File

@@ -12,8 +12,8 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
)

func PackageListBucketPackages(ctx CommandContext, bucketID int64) error {
userID := int64(0)
func PackageListBucketPackages(ctx CommandContext, bucketID cdssdk.BucketID) error {
userID := cdssdk.UserID(0)

packages, err := ctx.Cmdline.Svc.BucketSvc().GetBucketPackages(userID, bucketID)
if err != nil {
@@ -23,17 +23,17 @@ func PackageListBucketPackages(ctx CommandContext, bucketID int64) error {
fmt.Printf("Find %d packages in bucket %d for user %d:\n", len(packages), bucketID, userID)

tb := table.NewWriter()
tb.AppendHeader(table.Row{"ID", "Name", "BucketID", "State", "Redundancy"})
tb.AppendHeader(table.Row{"ID", "Name", "BucketID", "State"})

for _, obj := range packages {
tb.AppendRow(table.Row{obj.PackageID, obj.Name, obj.BucketID, obj.State, obj.Redundancy})
tb.AppendRow(table.Row{obj.PackageID, obj.Name, obj.BucketID, obj.State})
}

fmt.Print(tb.Render())
fmt.Println(tb.Render())
return nil
}

func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int64) error {
func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID cdssdk.PackageID) error {
err := os.MkdirAll(outputDir, os.ModePerm)
if err != nil {
return fmt.Errorf("create output directory %s failed, err: %w", outputDir, err)
@@ -86,7 +86,7 @@ func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int6
return nil
}

func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64, name string, repCount int, nodeAffinity []int64) error {
func PackageCreatePackage(ctx CommandContext, rootPath string, bucketID cdssdk.BucketID, name string, nodeAffinity []cdssdk.NodeID) error {
rootPath = filepath.Clean(rootPath)

var uploadFilePathes []string
@@ -105,122 +105,24 @@ func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64
return fmt.Errorf("open directory %s failed, err: %w", rootPath, err)
}

var nodeAff *int64
var nodeAff *cdssdk.NodeID
if len(nodeAffinity) > 0 {
nodeAff = &nodeAffinity[0]
n := cdssdk.NodeID(nodeAffinity[0])
nodeAff = &n
}

objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, cdssdk.NewRepRedundancyInfo(repCount), nodeAff)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingPackage(0, bucketID, name, objIter, nodeAff)

if err != nil {
return fmt.Errorf("upload file data failed, err: %w", err)
}

for {
complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingRepPackage(taskID, time.Second*5)
complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingPackage(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("uploading rep object: %w", err)
}

tb := table.NewWriter()

tb.AppendHeader(table.Row{"Path", "ObjectID", "FileHash"})
for i := 0; i < len(uploadObjectResult.ObjectResults); i++ {
tb.AppendRow(table.Row{
uploadObjectResult.ObjectResults[i].Info.Path,
uploadObjectResult.ObjectResults[i].ObjectID,
uploadObjectResult.ObjectResults[i].FileHash,
})
}
fmt.Print(tb.Render())
return nil
}

if err != nil {
return fmt.Errorf("wait uploading: %w", err)
}
}
}

func PackageUpdateRepPackage(ctx CommandContext, packageID int64, rootPath string) error {
//userID := int64(0)

var uploadFilePathes []string
err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error {
if err != nil {
return nil
}

if !fi.IsDir() {
uploadFilePathes = append(uploadFilePathes, fname)
}

return nil
})
if err != nil {
return fmt.Errorf("open directory %s failed, err: %w", rootPath, err)
}

objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingRepPackage(0, packageID, objIter)
if err != nil {
return fmt.Errorf("update object %d failed, err: %w", packageID, err)
}

for {
complete, _, err := ctx.Cmdline.Svc.PackageSvc().WaitUpdatingRepPackage(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("updating rep object: %w", err)
}

return nil
}

if err != nil {
return fmt.Errorf("wait updating: %w", err)
}
}
}

func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string, chunkSize int, nodeAffinity []int64) error {
rootPath = filepath.Clean(rootPath)

var uploadFilePathes []string
err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error {
if err != nil {
return nil
}

if !fi.IsDir() {
uploadFilePathes = append(uploadFilePathes, fname)
}

return nil
})
if err != nil {
return fmt.Errorf("open directory %s failed, err: %w", rootPath, err)
}

var nodeAff *int64
if len(nodeAffinity) > 0 {
nodeAff = &nodeAffinity[0]
}

objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, cdssdk.NewECRedundancyInfo(ecName, chunkSize), nodeAff)

if err != nil {
return fmt.Errorf("upload file data failed, err: %w", err)
}

for {
complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingECPackage(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("uploading ec package: %w", err)
return fmt.Errorf("uploading package: %w", err)
}

tb := table.NewWriter()
@@ -242,7 +144,7 @@ func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64,
}
}

func PackageUpdateECPackage(ctx CommandContext, packageID int64, rootPath string) error {
func PackageUpdatePackage(ctx CommandContext, packageID cdssdk.PackageID, rootPath string) error {
//userID := int64(0)

var uploadFilePathes []string
@@ -262,16 +164,16 @@ func PackageUpdateECPackage(ctx CommandContext, packageID int64, rootPath string
}

objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingECPackage(0, packageID, objIter)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingPackage(0, packageID, objIter)
if err != nil {
return fmt.Errorf("update package %d failed, err: %w", packageID, err)
}

for {
complete, _, err := ctx.Cmdline.Svc.PackageSvc().WaitUpdatingECPackage(taskID, time.Second*5)
complete, _, err := ctx.Cmdline.Svc.PackageSvc().WaitUpdatingPackage(taskID, time.Second*5)
if complete {
if err != nil {
return fmt.Errorf("updating ec package: %w", err)
return fmt.Errorf("updating package: %w", err)
}

return nil
@@ -283,8 +185,8 @@ func PackageUpdateECPackage(ctx CommandContext, packageID int64, rootPath string
}
}

func PackageDeletePackage(ctx CommandContext, packageID int64) error {
userID := int64(0)
func PackageDeletePackage(ctx CommandContext, packageID cdssdk.PackageID) error {
userID := cdssdk.UserID(0)
err := ctx.Cmdline.Svc.PackageSvc().DeletePackage(userID, packageID)
if err != nil {
return fmt.Errorf("delete package %d failed, err: %w", packageID, err)
@@ -292,7 +194,7 @@ func PackageDeletePackage(ctx CommandContext, packageID int64) error {
return nil
}

func PackageGetCachedNodes(ctx CommandContext, packageID int64, userID int64) error {
func PackageGetCachedNodes(ctx CommandContext, packageID cdssdk.PackageID, userID cdssdk.UserID) error {
resp, err := ctx.Cmdline.Svc.PackageSvc().GetCachedNodes(userID, packageID)
fmt.Printf("resp: %v\n", resp)
if err != nil {
@@ -301,7 +203,7 @@ func PackageGetCachedNodes(ctx CommandContext, packageID int64, userID int64) er
return nil
}

func PackageGetLoadedNodes(ctx CommandContext, packageID int64, userID int64) error {
func PackageGetLoadedNodes(ctx CommandContext, packageID cdssdk.PackageID, userID cdssdk.UserID) error {
nodeIDs, err := ctx.Cmdline.Svc.PackageSvc().GetLoadedNodes(userID, packageID)
fmt.Printf("nodeIDs: %v\n", nodeIDs)
if err != nil {
@@ -315,13 +217,9 @@ func init() {

commands.MustAdd(PackageDownloadPackage, "pkg", "get")

commands.MustAdd(PackageUploadRepPackage, "pkg", "new", "rep")

commands.MustAdd(PackageUpdateRepPackage, "pkg", "update", "rep")

commands.MustAdd(PackageUploadECPackage, "pkg", "new", "ec")
commands.MustAdd(PackageCreatePackage, "pkg", "new")

commands.MustAdd(PackageUpdateECPackage, "pkg", "update", "ec")
commands.MustAdd(PackageUpdatePackage, "pkg", "update")

commands.MustAdd(PackageDeletePackage, "pkg", "delete")



+ 6
- 7
client/internal/cmdline/storage.go View File

@@ -7,7 +7,7 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) error {
func StorageLoadPackage(ctx CommandContext, packageID cdssdk.PackageID, storageID cdssdk.StorageID) error {
taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(0, packageID, storageID)
if err != nil {
return fmt.Errorf("start loading package to storage: %w", err)
@@ -30,11 +30,10 @@ func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) er
}
}

func StorageCreateRepPackage(ctx CommandContext, bucketID int64, name string, storageID int64, path string, repCount int) error {
nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(0, bucketID, name, storageID, path,
cdssdk.NewTypedRepRedundancyInfo(repCount), nil)
func StorageCreatePackage(ctx CommandContext, bucketID cdssdk.BucketID, name string, storageID cdssdk.StorageID, path string) error {
nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(0, bucketID, name, storageID, path, nil)
if err != nil {
return fmt.Errorf("start storage uploading rep package: %w", err)
return fmt.Errorf("start storage uploading package: %w", err)
}

for {
@@ -55,7 +54,7 @@ func StorageCreateRepPackage(ctx CommandContext, bucketID int64, name string, st
}

func init() {
commands.MustAdd(StorageLoadPackage, "stg", "load", "pkg")
commands.MustAdd(StorageLoadPackage, "stg", "pkg", "load")

commands.MustAdd(StorageCreateRepPackage, "stg", "upload", "rep")
commands.MustAdd(StorageCreatePackage, "stg", "pkg", "new")
}

+ 2
- 2
client/internal/http/package.go View File

@@ -89,7 +89,7 @@ func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) {

objIter := mapMultiPartFileToUploadingObject(req.Files)

taskID, err := s.svc.PackageSvc().StartCreatingECPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, req.Info.NodeAffinity)
taskID, err := s.svc.PackageSvc().StartCreatingPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, req.Info.NodeAffinity)

if err != nil {
log.Warnf("start uploading ec package task: %s", err.Error())
@@ -98,7 +98,7 @@ func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) {
}

for {
complete, createResult, err := s.svc.PackageSvc().WaitCreatingECPackage(taskID, time.Second*5)
complete, createResult, err := s.svc.PackageSvc().WaitCreatingPackage(taskID, time.Second*5)
if complete {
if err != nil {
log.Warnf("uploading ec package: %s", err.Error())


+ 18
- 105
client/internal/services/package.go View File

@@ -23,7 +23,7 @@ func (svc *Service) PackageSvc() *PackageService {
return &PackageService{Service: svc}
}

func (svc *PackageService) Get(userID int64, packageID int64) (*model.Package, error) {
func (svc *PackageService) Get(userID cdssdk.UserID, packageID cdssdk.PackageID) (*model.Package, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -38,7 +38,7 @@ func (svc *PackageService) Get(userID int64, packageID int64) (*model.Package, e
return &getResp.Package, nil
}

func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (iterator.DownloadingObjectIterator, error) {
func (svc *PackageService) DownloadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID) (iterator.DownloadingObjectIterator, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -65,137 +65,50 @@ func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (itera
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}

getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(userID, packageID))
getObjsResp, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(packageID))
if err != nil {
return nil, fmt.Errorf("getting package: %w", err)
return nil, fmt.Errorf("getting package object details: %w", err)
}

getObjsResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(userID, packageID))
if err != nil {
return nil, fmt.Errorf("getting package objects: %w", err)
}

if getPkgResp.Redundancy.IsRepInfo() {
iter, err := svc.downloadRepPackage(packageID, getObjsResp.Objects, coorCli)

if err != nil {
mutex.Unlock()
return nil, err
}

iter.OnClosing = func() {
mutex.Unlock()
}

return iter, nil
} else {
iter, err := svc.downloadECPackage(getPkgResp.Package, getObjsResp.Objects, coorCli)

if err != nil {
mutex.Unlock()
return nil, err
}

iter.OnClosing = func() {
mutex.Unlock()
}

return iter, nil
}
}

func (svc *PackageService) downloadRepPackage(packageID int64, objects []model.Object, coorCli *coormq.Client) (*iterator.RepObjectIterator, error) {
getObjRepDataResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(packageID))
if err != nil {
return nil, fmt.Errorf("getting package object rep data: %w", err)
}

iter := iterator.NewRepObjectIterator(objects, getObjRepDataResp.Data, &iterator.DownloadContext{
iter := iterator.NewObjectIterator(getObjsResp.Objects, &iterator.DownloadContext{
Distlock: svc.DistLock,
})

return iter, nil
}
func (svc *PackageService) downloadECPackage(pkg model.Package, objects []model.Object, coorCli *coormq.Client) (*iterator.ECObjectIterator, error) {
getObjECDataResp, err := coorCli.GetPackageObjectECData(coormq.NewGetPackageObjectECData(pkg.PackageID))
if err != nil {
return nil, fmt.Errorf("getting package object ec data: %w", err)
}

var ecInfo cdssdk.ECRedundancyInfo
if ecInfo, err = pkg.Redundancy.ToECInfo(); err != nil {
return nil, fmt.Errorf("get ec redundancy info: %w", err)
}

getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecInfo.ECName))
if err != nil {
return nil, fmt.Errorf("getting ec: %w", err)
iter.OnClosing = func() {
mutex.Unlock()
}

iter := iterator.NewECObjectIterator(objects, getObjECDataResp.Data, ecInfo, getECResp.Config, &iterator.DownloadContext{
Distlock: svc.DistLock,
})

return iter, nil
}

func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo cdssdk.RepRedundancyInfo, nodeAffinity *int64) (string, error) {
tsk := svc.TaskMgr.StartNew(mytask.NewCreateRepPackage(userID, bucketID, name, objIter, repInfo, nodeAffinity))
return tsk.ID(), nil
}

func (svc *PackageService) WaitCreatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *mytask.CreateRepPackageResult, error) {
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
cteatePkgTask := tsk.Body().(*mytask.CreateRepPackage)
return true, cteatePkgTask.Result, tsk.Error()
}
return false, nil, nil
}

func (svc *PackageService) StartUpdatingRepPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) (string, error) {
tsk := svc.TaskMgr.StartNew(mytask.NewUpdateRepPackage(userID, packageID, objIter))
return tsk.ID(), nil
}

func (svc *PackageService) WaitUpdatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdateRepPackageResult, error) {
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
updatePkgTask := tsk.Body().(*mytask.UpdateRepPackage)
return true, updatePkgTask.Result, tsk.Error()
}
return false, nil, nil
}

func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo cdssdk.ECRedundancyInfo, nodeAffinity *int64) (string, error) {
tsk := svc.TaskMgr.StartNew(mytask.NewCreateECPackage(userID, bucketID, name, objIter, ecInfo, nodeAffinity))
func (svc *PackageService) StartCreatingPackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) (string, error) {
tsk := svc.TaskMgr.StartNew(mytask.NewCreatePackage(userID, bucketID, name, objIter, nodeAffinity))
return tsk.ID(), nil
}

func (svc *PackageService) WaitCreatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateECPackageResult, error) {
func (svc *PackageService) WaitCreatingPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreatePackageResult, error) {
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
cteatePkgTask := tsk.Body().(*mytask.CreateECPackage)
cteatePkgTask := tsk.Body().(*mytask.CreatePackage)
return true, cteatePkgTask.Result, tsk.Error()
}
return false, nil, nil
}

func (svc *PackageService) StartUpdatingECPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) (string, error) {
tsk := svc.TaskMgr.StartNew(mytask.NewUpdateECPackage(userID, packageID, objIter))
func (svc *PackageService) StartUpdatingPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator) (string, error) {
tsk := svc.TaskMgr.StartNew(mytask.NewUpdatePackage(userID, packageID, objIter))
return tsk.ID(), nil
}

func (svc *PackageService) WaitUpdatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdateECPackageResult, error) {
func (svc *PackageService) WaitUpdatingPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdatePackageResult, error) {
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
updatePkgTask := tsk.Body().(*mytask.UpdateECPackage)
updatePkgTask := tsk.Body().(*mytask.UpdatePackage)
return true, updatePkgTask.Result, tsk.Error()
}
return false, nil, nil
}

func (svc *PackageService) DeletePackage(userID int64, packageID int64) error {
func (svc *PackageService) DeletePackage(userID cdssdk.UserID, packageID cdssdk.PackageID) error {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
@@ -230,7 +143,7 @@ func (svc *PackageService) DeletePackage(userID int64, packageID int64) error {
return nil
}

func (svc *PackageService) GetCachedNodes(userID int64, packageID int64) (cdssdk.PackageCachingInfo, error) {
func (svc *PackageService) GetCachedNodes(userID cdssdk.UserID, packageID cdssdk.PackageID) (cdssdk.PackageCachingInfo, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return cdssdk.PackageCachingInfo{}, fmt.Errorf("new coordinator client: %w", err)
@@ -250,7 +163,7 @@ func (svc *PackageService) GetCachedNodes(userID int64, packageID int64) (cdssdk
return tmp, nil
}

func (svc *PackageService) GetLoadedNodes(userID int64, packageID int64) ([]int64, error) {
func (svc *PackageService) GetLoadedNodes(userID cdssdk.UserID, packageID cdssdk.PackageID) ([]cdssdk.NodeID, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)


+ 8
- 8
client/internal/task/create_ec_package.go View File

@@ -9,21 +9,21 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
)

type CreateECPackageResult = cmd.CreateECPackageResult
type CreatePackageResult = cmd.CreatePackageResult

type CreateECPackage struct {
cmd cmd.CreateECPackage
type CreatePackage struct {
cmd cmd.CreatePackage

Result *CreateECPackageResult
Result *CreatePackageResult
}

func NewCreateECPackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreateECPackage {
return &CreateECPackage{
cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, nodeAffinity),
func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreatePackage {
return &CreatePackage{
cmd: *cmd.NewCreatePackage(userID, bucketID, name, objIter, nodeAffinity),
}
}

func (t *CreateECPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{
Distlock: ctx.distlock,
})


+ 8
- 8
client/internal/task/update_ec_package.go View File

@@ -9,21 +9,21 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
)

type UpdateECPackageResult = cmd.UpdateECPackageResult
type UpdatePackageResult = cmd.UpdatePackageResult

type UpdateECPackage struct {
cmd cmd.UpdateECPackage
type UpdatePackage struct {
cmd cmd.UpdatePackage

Result *UpdateECPackageResult
Result *UpdatePackageResult
}

func NewUpdateECPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator) *UpdateECPackage {
return &UpdateECPackage{
cmd: *cmd.NewUpdateECPackage(userID, packageID, objectIter),
func NewUpdatePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator) *UpdatePackage {
return &UpdatePackage{
cmd: *cmd.NewUpdatePackage(userID, packageID, objectIter),
}
}

func (t *UpdateECPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
func (t *UpdatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{
Distlock: ctx.distlock,
})


+ 20
- 31
common/assets/scripts/create_database.sql View File

@@ -37,13 +37,15 @@ values
5010,
1,
"alive"
) create table Storage (
StorageID int not null auto_increment primary key comment '存储服务ID',
Name varchar(100) not null comment '存储服务名称',
NodeID int not null comment '存储服务所在节点的ID',
Directory varchar(4096) not null comment '存储服务所在节点的目录',
State varchar(100) comment '状态'
) comment = "存储服务表";
);

create table Storage (
StorageID int not null auto_increment primary key comment '存储服务ID',
Name varchar(100) not null comment '存储服务名称',
NodeID int not null comment '存储服务所在节点的ID',
Directory varchar(4096) not null comment '存储服务所在节点的目录',
State varchar(100) comment '状态'
) comment = "存储服务表";

insert into
Storage (StorageID, Name, NodeID, Directory, State)
@@ -122,16 +124,12 @@ create table Object (
UNIQUE KEY PackagePath (PackageID, Path)
) comment = '对象表';

create table ObjectRep (
ObjectID int not null primary key comment '对象ID',
FileHash varchar(100) not null comment '副本哈希值'
) comment = '对象副本表';

create table ObjectBlock (
ObjectID int not null comment '对象ID',
`Index` int not null comment '编码块在条带内的排序',
NodeID int not null comment '此编码块应该存在的节点',
FileHash varchar(100) not null comment '编码块哈希值',
primary key(ObjectID, `Index`)
primary key(ObjectID, `Index`, NodeID)
) comment = '对象编码块表';

create table Cache (
@@ -151,6 +149,14 @@ create table StoragePackage (
primary key(PackageID, StorageID, UserID)
);

create table StoragePackageLog (
PackageID int not null comment '包ID',
StorageID int not null comment '存储服务ID',
UserID int not null comment '调度了此文件的用户ID',
CreateTime timestamp not null comment '加载Package完成的时间',
primary key(PackageID, StorageID, UserID)
);

create table Location (
LocationID int not null auto_increment primary key comment 'ID',
Name varchar(128) not null comment '名称'
@@ -159,21 +165,4 @@ create table Location (
insert into
Location (LocationID, Name)
values
(1, "Local");

create table Ec (
EcID int not null primary key comment '纠删码ID',
Name varchar(128) not null comment '纠删码名称',
EcK int not null comment 'ecK',
EcN int not null comment 'ecN'
) comment = '纠删码表';

insert into
Ec (EcID, Name, EcK, EcN)
values
(1, "rs_9_6", 6, 9);

insert into
Ec (EcID, Name, EcK, EcN)
values
(2, "rs_5_3", 3, 5);
(1, "Local");

+ 0
- 11
common/consts/consts.go View File

@@ -10,17 +10,6 @@ const (
NodeStateUnavailable = "Unavailable"
)

const (
PackageStateNormal = "Normal"
PackageStateDeleted = "Deleted"
)

const (
StoragePackageStateNormal = "Normal"
StoragePackageStateDeleted = "Deleted"
StoragePackageStateOutdated = "Outdated"
)

const (
CacheStatePinned = "Pinned"
CacheStateTemp = "Temp"


+ 23
- 15
common/models/models.go View File

@@ -2,40 +2,48 @@ package stgmod

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

/// TODO 将分散在各处的公共结构体定义集中到这里来
type ObjectBlock struct {
ObjectID cdssdk.ObjectID `db:"ObjectID" json:"objectID"`
Index int `db:"Index" json:"index"`
NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"` // 这个块应该在哪个节点上
FileHash string `db:"FileHash" json:"fileHash"`
}

type ObjectBlockData struct {
type ObjectBlockDetail struct {
ObjectID cdssdk.ObjectID `json:"objectID"`
Index int `json:"index"`
FileHash string `json:"fileHash"`
NodeID cdssdk.NodeID `json:"nodeID"`
CachedNodeIDs []cdssdk.NodeID `json:"nodeIDs"`
NodeIDs []cdssdk.NodeID `json:"nodeID"` // 这个块应该在哪些节点上
CachedNodeIDs []cdssdk.NodeID `json:"cachedNodeIDs"` // 哪些节点实际缓存了这个块
}

func NewObjectBlockData(index int, fileHash string, nodeID cdssdk.NodeID, cachedNodeIDs []cdssdk.NodeID) ObjectBlockData {
return ObjectBlockData{
func NewObjectBlockDetail(objID cdssdk.ObjectID, index int, fileHash string, nodeIDs []cdssdk.NodeID, cachedNodeIDs []cdssdk.NodeID) ObjectBlockDetail {
return ObjectBlockDetail{
ObjectID: objID,
Index: index,
FileHash: fileHash,
NodeIDs: nodeIDs,
CachedNodeIDs: cachedNodeIDs,
}
}

type ObjectECData struct {
Object model.Object `json:"object"`
Blocks []ObjectBlockData `json:"blocks"`
type ObjectDetail struct {
Object cdssdk.Object `json:"object"`
Blocks []ObjectBlockDetail `json:"blocks"`
}

func NewObjectECData(object model.Object, blocks []ObjectBlockData) ObjectECData {
return ObjectECData{
func NewObjectDetail(object cdssdk.Object, blocks []ObjectBlockDetail) ObjectDetail {
return ObjectDetail{
Object: object,
Blocks: blocks,
}
}

type LocalMachineInfo struct {
NodeID *cdssdk.NodeID `json:"nodeID"`
ExternalIP string `json:"externalIP"`
LocalIP string `json:"localIP"`
NodeID *cdssdk.NodeID `json:"nodeID"`
ExternalIP string `json:"externalIP"`
LocalIP string `json:"localIP"`
LocationID cdssdk.LocationID `json:"locationID"`
}

+ 244
- 62
common/pkgs/cmd/create_ec_package.go View File

@@ -1,56 +1,70 @@
package cmd

import (
"errors"
"fmt"
"io"
"math/rand"
"sync"
"time"

"github.com/samber/lo"

"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

myio "gitlink.org.cn/cloudream/common/utils/io"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

type CreateECPackage struct {
userID int64
bucketID int64
type CreatePackage struct {
userID cdssdk.UserID
bucketID cdssdk.BucketID
name string
objectIter iterator.UploadingObjectIterator
redundancy cdssdk.ECRedundancyInfo
nodeAffinity *int64
nodeAffinity *cdssdk.NodeID
}

type CreateECPackageResult struct {
PackageID int64
ObjectResults []ECObjectUploadResult
type CreatePackageResult struct {
PackageID cdssdk.PackageID
ObjectResults []ObjectUploadResult
}

type ECObjectUploadResult struct {
Info *iterator.IterUploadingObject
Error error
ObjectID int64
type ObjectUploadResult struct {
Info *iterator.IterUploadingObject
Error error
// TODO 这个字段没有被赋值
ObjectID cdssdk.ObjectID
}

func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy cdssdk.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage {
return &CreateECPackage{
type UploadNodeInfo struct {
Node model.Node
IsSameLocation bool
}

type UpdatePackageContext struct {
Distlock *distlock.Service
}

func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreatePackage {
return &CreatePackage{
userID: userID,
bucketID: bucketID,
name: name,
objectIter: objIter,
redundancy: redundancy,
nodeAffinity: nodeAffinity,
}
}

func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageResult, error) {
func (t *CreatePackage) Execute(ctx *UpdatePackageContext) (*CreatePackageResult, error) {
defer t.objectIter.Close()

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
@@ -78,8 +92,7 @@ func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageRe
}
defer mutex.Unlock()

createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name,
cdssdk.NewTypedRedundancyInfo(t.redundancy)))
createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name))
if err != nil {
return nil, fmt.Errorf("creating package: %w", err)
}
@@ -89,30 +102,20 @@ func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageRe
return nil, fmt.Errorf("getting user nodes: %w", err)
}

findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP))
if err != nil {
return nil, fmt.Errorf("finding client location: %w", err)
}

uploadNodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo {
userNodes := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo {
return UploadNodeInfo{
Node: node,
IsSameLocation: node.LocationID == findCliLocResp.Location.LocationID,
IsSameLocation: node.LocationID == stgglb.Local.LocationID,
}
})

getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(t.redundancy.ECName))
if err != nil {
return nil, fmt.Errorf("getting ec: %w", err)
}

// 给上传节点的IPFS加锁
ipfsReqBlder := reqbuilder.NewBuilder()
// 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁
if stgglb.Local.NodeID != nil {
ipfsReqBlder.IPFS().CreateAnyRep(*stgglb.Local.NodeID)
}
for _, node := range uploadNodeInfos {
for _, node := range userNodes {
if stgglb.Local.NodeID != nil && node.Node.NodeID == *stgglb.Local.NodeID {
continue
}
@@ -126,27 +129,66 @@ func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageRe
}
defer ipfsMutex.Unlock()

// TODO 需要支持设置节点亲和性
rets, err := uploadAndUpdateECPackage(createPkgResp.PackageID, t.objectIter, uploadNodeInfos, t.redundancy, getECResp.Config)
rets, err := uploadAndUpdatePackage(createPkgResp.PackageID, t.objectIter, userNodes, t.nodeAffinity)
if err != nil {
return nil, err
}

return &CreateECPackageResult{
return &CreatePackageResult{
PackageID: createPkgResp.PackageID,
ObjectResults: rets,
}, nil
}

func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObjectIterator, uploadNodes []UploadNodeInfo, ecInfo cdssdk.ECRedundancyInfo, ec model.Ec) ([]ECObjectUploadResult, error) {
// chooseUploadNode 选择一个上传文件的节点
// 1. 选择设置了亲和性的节点
// 2. 从与当前客户端相同地域的节点中随机选一个
// 3. 没有用的话从所有节点中随机选一个
func chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) UploadNodeInfo {
if nodeAffinity != nil {
aff, ok := lo.Find(nodes, func(node UploadNodeInfo) bool { return node.Node.NodeID == *nodeAffinity })
if ok {
return aff
}
}

sameLocationNodes := lo.Filter(nodes, func(e UploadNodeInfo, i int) bool { return e.IsSameLocation })
if len(sameLocationNodes) > 0 {
return sameLocationNodes[rand.Intn(len(sameLocationNodes))]
}

return nodes[rand.Intn(len(nodes))]
}

func shuffleNodes(uploadNodes []UploadNodeInfo, extendTo int) []UploadNodeInfo {
for i := len(uploadNodes); i < extendTo; i++ {
uploadNodes = append(uploadNodes, uploadNodes[rand.Intn(len(uploadNodes))])
}

// 随机排列上传节点
rand.Shuffle(len(uploadNodes), func(i, j int) {
uploadNodes[i], uploadNodes[j] = uploadNodes[j], uploadNodes[i]
})

return uploadNodes
}

func chooseRedundancy(obj *iterator.IterUploadingObject, userNodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) (cdssdk.Redundancy, []UploadNodeInfo, error) {
// TODO 更好的算法
// uploadNodes = shuffleNodes(uploadNodes, ecRed.N)
uploadNode := chooseUploadNode(userNodes, nodeAffinity)
return cdssdk.NewRepRedundancy(), []UploadNodeInfo{uploadNode}, nil
}

func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, userNodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) ([]ObjectUploadResult, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}

var uploadRets []ECObjectUploadResult
var uploadRets []ObjectUploadResult
//上传文件夹
var adds []coormq.AddECObjectInfo
var adds []coormq.AddObjectInfo
for {
objInfo, err := objectIter.MoveNext()
if err == iterator.ErrNoMoreItem {
@@ -158,8 +200,20 @@ func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObje
err = func() error {
defer objInfo.File.Close()

fileHashes, uploadedNodeIDs, err := uploadECObject(objInfo, uploadNodes, ecInfo, ec)
uploadRets = append(uploadRets, ECObjectUploadResult{
red, uploadNodes, err := chooseRedundancy(objInfo, userNodes, nodeAffinity)
if err != nil {
return fmt.Errorf("choosing redundancy: %w", err)
}

var addInfo *coormq.AddObjectInfo
switch r := red.(type) {
case *cdssdk.RepRedundancy:
addInfo, err = uploadRepObject(objInfo, uploadNodes, r)
case *cdssdk.ECRedundancy:
addInfo, err = uploadECObject(objInfo, uploadNodes, r)
}

uploadRets = append(uploadRets, ObjectUploadResult{
Info: objInfo,
Error: err,
})
@@ -167,7 +221,7 @@ func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObje
return fmt.Errorf("uploading object: %w", err)
}

adds = append(adds, coormq.NewAddECObjectInfo(objInfo.Path, objInfo.Size, fileHashes, uploadedNodeIDs))
adds = append(adds, *addInfo)
return nil
}()
if err != nil {
@@ -175,7 +229,7 @@ func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObje
}
}

_, err = coorCli.UpdateECPackage(coormq.NewUpdateECPackage(packageID, adds, nil))
_, err = coorCli.UpdateECPackage(coormq.NewUpdatePackage(packageID, adds, nil))
if err != nil {
return nil, fmt.Errorf("updating package: %w", err)
}
@@ -183,16 +237,55 @@ func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObje
return uploadRets, nil
}

// 上传文件
func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ecInfo cdssdk.ECRedundancyInfo, ecMod model.Ec) ([]string, []int64, error) {
uploadNodes = shuffleNodes(uploadNodes, ecMod.EcN)
func uploadRepObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, repRed *cdssdk.RepRedundancy) (*coormq.AddObjectInfo, error) {
clonedStrs := myio.Clone(obj.File, len(uploadNodes))

fileHashes := make([]string, len(uploadNodes))
anyErrs := make([]error, len(uploadNodes))
wg := sync.WaitGroup{}
for i := range uploadNodes {
idx := i
wg.Add(1)

go func() {
defer wg.Done()
fileHashes[idx], anyErrs[idx] = uploadFile(clonedStrs[idx], uploadNodes[idx])
}()
}

wg.Wait()

var uploadedNodeIDs []cdssdk.NodeID
var fileHash string
var errs []error
for i, e := range anyErrs {
if e != nil {
errs[i] = e
continue
}

uploadedNodeIDs = append(uploadedNodeIDs, uploadNodes[i].Node.NodeID)
fileHash = fileHashes[i]
}

if len(uploadedNodeIDs) == 0 {
return nil, fmt.Errorf("uploading file: %w", errors.Join(errs...))
}

info := coormq.NewAddObjectInfo(obj.Path, obj.Size, repRed,
[]stgmod.ObjectBlockDetail{
stgmod.NewObjectBlockDetail(0, 0, fileHash, uploadedNodeIDs, uploadedNodeIDs),
})
return &info, nil
}

rs, err := ec.NewRs(ecMod.EcK, ecMod.EcN, ecInfo.ChunkSize)
func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ecRed *cdssdk.ECRedundancy) (*coormq.AddObjectInfo, error) {
rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize)
if err != nil {
return nil, nil, err
return nil, err
}

outputs := myio.ChunkedSplit(obj.File, ecInfo.ChunkSize, ecMod.EcK, myio.ChunkedSplitOption{
outputs := myio.ChunkedSplit(obj.File, ecRed.ChunkSize, ecRed.K, myio.ChunkedSplitOption{
PaddingZeros: true,
})
var readers []io.Reader
@@ -209,17 +302,18 @@ func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeI

wg := sync.WaitGroup{}

nodeIDs := make([]int64, ecMod.EcN)
fileHashes := make([]string, ecMod.EcN)
anyErrs := make([]error, ecMod.EcN)
blocks := make([]stgmod.ObjectBlockDetail, ecRed.N)
anyErrs := make([]error, ecRed.N)

for i := range encStrs {
idx := i
wg.Add(1)
nodeIDs[idx] = uploadNodes[idx].Node.NodeID
blocks[idx].Index = idx
blocks[idx].NodeIDs = []cdssdk.NodeID{uploadNodes[idx].Node.NodeID}
blocks[idx].CachedNodeIDs = []cdssdk.NodeID{uploadNodes[idx].Node.NodeID}
go func() {
defer wg.Done()
fileHashes[idx], anyErrs[idx] = uploadFile(encStrs[idx], uploadNodes[idx])
blocks[idx].FileHash, anyErrs[idx] = uploadFile(encStrs[idx], uploadNodes[idx])
}()
}

@@ -227,22 +321,110 @@ func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeI

for i, e := range anyErrs {
if e != nil {
return nil, nil, fmt.Errorf("uploading file to node %d: %w", uploadNodes[i].Node.NodeID, e)
return nil, fmt.Errorf("uploading file to node %d: %w", uploadNodes[i].Node.NodeID, e)
}
}

return fileHashes, nodeIDs, nil
info := coormq.NewAddObjectInfo(obj.Path, obj.Size, ecRed, blocks)
return &info, nil
}

func shuffleNodes(uploadNodes []UploadNodeInfo, extendTo int) []UploadNodeInfo {
for i := len(uploadNodes); i < extendTo; i++ {
uploadNodes = append(uploadNodes, uploadNodes[rand.Intn(len(uploadNodes))])
func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) {
// 本地有IPFS,则直接从本地IPFS上传
if stgglb.IPFSPool != nil {
logger.Infof("try to use local IPFS to upload file")

// 只有本地IPFS不是存储系统中的一个节点,才需要Pin文件
fileHash, err := uploadToLocalIPFS(file, uploadNode.Node.NodeID, stgglb.Local.NodeID == nil)
if err == nil {
return fileHash, nil

} else {
logger.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.Node.NodeID, err.Error())
}
}

// 随机排列上传节点
rand.Shuffle(len(uploadNodes), func(i, j int) {
uploadNodes[i], uploadNodes[j] = uploadNodes[j], uploadNodes[i]
})
// 否则发送到agent上传
// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := uploadNode.Node.ExternalIP
grpcPort := uploadNode.Node.ExternalGRPCPort
if uploadNode.IsSameLocation {
nodeIP = uploadNode.Node.LocalIP
grpcPort = uploadNode.Node.LocalGRPCPort

return uploadNodes
logger.Infof("client and node %d are at the same location, use local ip", uploadNode.Node.NodeID)
}

fileHash, err := uploadToNode(file, nodeIP, grpcPort)
if err != nil {
return "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err)
}

return fileHash, nil
}

func uploadToNode(file io.Reader, nodeIP string, grpcPort int) (string, error) {
rpcCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort)
if err != nil {
return "", fmt.Errorf("new agent rpc client: %w", err)
}
defer rpcCli.Close()

return rpcCli.SendIPFSFile(file)
}

func uploadToLocalIPFS(file io.Reader, nodeID cdssdk.NodeID, shouldPin bool) (string, error) {
ipfsCli, err := stgglb.IPFSPool.Acquire()
if err != nil {
return "", fmt.Errorf("new ipfs client: %w", err)
}
defer ipfsCli.Close()

// 从本地IPFS上传文件
fileHash, err := ipfsCli.CreateFile(file)
if err != nil {
return "", fmt.Errorf("creating ipfs file: %w", err)
}

if !shouldPin {
return fileHash, nil
}

err = pinIPFSFile(nodeID, fileHash)
if err != nil {
return "", err
}

return fileHash, nil
}

func pinIPFSFile(nodeID cdssdk.NodeID, fileHash string) error {
agtCli, err := stgglb.AgentMQPool.Acquire(nodeID)
if err != nil {
return fmt.Errorf("new agent client: %w", err)
}
defer stgglb.AgentMQPool.Release(agtCli)

// 然后让最近节点pin本地上传的文件
pinObjResp, err := agtCli.StartPinningObject(agtmq.NewStartPinningObject(fileHash))
if err != nil {
return fmt.Errorf("start pinning object: %w", err)
}

for {
waitResp, err := agtCli.WaitPinningObject(agtmq.NewWaitPinningObject(pinObjResp.TaskID, int64(time.Second)*5))
if err != nil {
return fmt.Errorf("waitting pinning object: %w", err)
}

if waitResp.IsComplete {
if waitResp.Error != "" {
return fmt.Errorf("agent pinning object: %s", waitResp.Error)
}

break
}
}

return nil
}

+ 0
- 304
common/pkgs/cmd/create_rep_package.go View File

@@ -1,304 +0,0 @@
package cmd

import (
"fmt"
"io"
"math/rand"
"time"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

type UploadNodeInfo struct {
Node model.Node
IsSameLocation bool
}

type CreateRepPackage struct {
userID int64
bucketID int64
name string
objectIter iterator.UploadingObjectIterator
redundancy cdssdk.RepRedundancyInfo
nodeAffinity *int64
}

type UpdatePackageContext struct {
Distlock *distlock.Service
}

type CreateRepPackageResult struct {
PackageID int64
ObjectResults []RepObjectUploadResult
}

type RepObjectUploadResult struct {
Info *iterator.IterUploadingObject
Error error
FileHash string
ObjectID int64
}

func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy cdssdk.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage {
return &CreateRepPackage{
userID: userID,
bucketID: bucketID,
name: name,
objectIter: objIter,
redundancy: redundancy,
nodeAffinity: nodeAffinity,
}
}

func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackageResult, error) {
defer t.objectIter.Close()

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}

reqBlder := reqbuilder.NewBuilder()
// 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁
if stgglb.Local.NodeID != nil {
reqBlder.IPFS().CreateAnyRep(*stgglb.Local.NodeID)
}
mutex, err := reqBlder.
Metadata().
// 用于判断用户是否有桶的权限
UserBucket().ReadOne(t.userID, t.bucketID).
// 用于查询可用的上传节点
Node().ReadAny().
// 用于创建包信息
Package().CreateOne(t.bucketID, t.name).
// 用于创建包中的文件的信息
Object().CreateAny().
// 用于设置EC配置
ObjectBlock().CreateAny().
// 用于创建Cache记录
Cache().CreateAny().
MutexLock(ctx.Distlock)
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name,
cdssdk.NewTypedRedundancyInfo(t.redundancy)))
if err != nil {
return nil, fmt.Errorf("creating package: %w", err)
}

getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID))
if err != nil {
return nil, fmt.Errorf("getting user nodes: %w", err)
}

findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP))
if err != nil {
return nil, fmt.Errorf("finding client location: %w", err)
}

nodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo {
return UploadNodeInfo{
Node: node,
IsSameLocation: node.LocationID == findCliLocResp.Location.LocationID,
}
})
uploadNode := t.chooseUploadNode(nodeInfos, t.nodeAffinity)

// 防止上传的副本被清除
ipfsMutex, err := reqbuilder.NewBuilder().
IPFS().CreateAnyRep(uploadNode.Node.NodeID).
MutexLock(ctx.Distlock)
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}
defer ipfsMutex.Unlock()

rets, err := uploadAndUpdateRepPackage(createPkgResp.PackageID, t.objectIter, uploadNode)
if err != nil {
return nil, err
}

return &CreateRepPackageResult{
PackageID: createPkgResp.PackageID,
ObjectResults: rets,
}, nil
}

func uploadAndUpdateRepPackage(packageID int64, objectIter iterator.UploadingObjectIterator, uploadNode UploadNodeInfo) ([]RepObjectUploadResult, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}

var uploadRets []RepObjectUploadResult
var adds []coormq.AddRepObjectInfo
for {
objInfo, err := objectIter.MoveNext()
if err == iterator.ErrNoMoreItem {
break
}
if err != nil {
return nil, fmt.Errorf("reading object: %w", err)
}

err = func() error {
defer objInfo.File.Close()
fileHash, err := uploadFile(objInfo.File, uploadNode)
uploadRets = append(uploadRets, RepObjectUploadResult{
Info: objInfo,
Error: err,
FileHash: fileHash,
})
if err != nil {
return fmt.Errorf("uploading object: %w", err)
}

adds = append(adds, coormq.NewAddRepObjectInfo(objInfo.Path, objInfo.Size, fileHash, []int64{uploadNode.Node.NodeID}))
return nil
}()
if err != nil {
return nil, err
}
}

_, err = coorCli.UpdateRepPackage(coormq.NewUpdateRepPackage(packageID, adds, nil))
if err != nil {
return nil, fmt.Errorf("updating package: %w", err)
}

return uploadRets, nil
}

// 上传文件
func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) {
// 本地有IPFS,则直接从本地IPFS上传
if stgglb.IPFSPool != nil {
logger.Infof("try to use local IPFS to upload file")

// 只有本地IPFS不是存储系统中的一个节点,才需要Pin文件
fileHash, err := uploadToLocalIPFS(file, uploadNode.Node.NodeID, stgglb.Local.NodeID == nil)
if err == nil {
return fileHash, nil

} else {
logger.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.Node.NodeID, err.Error())
}
}

// 否则发送到agent上传
// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := uploadNode.Node.ExternalIP
grpcPort := uploadNode.Node.ExternalGRPCPort
if uploadNode.IsSameLocation {
nodeIP = uploadNode.Node.LocalIP
grpcPort = uploadNode.Node.LocalGRPCPort

logger.Infof("client and node %d are at the same location, use local ip", uploadNode.Node.NodeID)
}

fileHash, err := uploadToNode(file, nodeIP, grpcPort)
if err != nil {
return "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err)
}

return fileHash, nil
}

// chooseUploadNode 选择一个上传文件的节点
// 1. 选择设置了亲和性的节点
// 2. 从与当前客户端相同地域的节点中随机选一个
// 3. 没有用的话从所有节点中随机选一个
func (t *CreateRepPackage) chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity *int64) UploadNodeInfo {
if nodeAffinity != nil {
aff, ok := lo.Find(nodes, func(node UploadNodeInfo) bool { return node.Node.NodeID == *nodeAffinity })
if ok {
return aff
}
}

sameLocationNodes := lo.Filter(nodes, func(e UploadNodeInfo, i int) bool { return e.IsSameLocation })
if len(sameLocationNodes) > 0 {
return sameLocationNodes[rand.Intn(len(sameLocationNodes))]
}

return nodes[rand.Intn(len(nodes))]
}

func uploadToNode(file io.Reader, nodeIP string, grpcPort int) (string, error) {
rpcCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort)
if err != nil {
return "", fmt.Errorf("new agent rpc client: %w", err)
}
defer rpcCli.Close()

return rpcCli.SendIPFSFile(file)
}

func uploadToLocalIPFS(file io.Reader, nodeID int64, shouldPin bool) (string, error) {
ipfsCli, err := stgglb.IPFSPool.Acquire()
if err != nil {
return "", fmt.Errorf("new ipfs client: %w", err)
}
defer ipfsCli.Close()

// 从本地IPFS上传文件
fileHash, err := ipfsCli.CreateFile(file)
if err != nil {
return "", fmt.Errorf("creating ipfs file: %w", err)
}

if !shouldPin {
return fileHash, nil
}

err = pinIPFSFile(nodeID, fileHash)
if err != nil {
return "", err
}

return fileHash, nil
}

func pinIPFSFile(nodeID int64, fileHash string) error {
agtCli, err := stgglb.AgentMQPool.Acquire(nodeID)
if err != nil {
return fmt.Errorf("new agent client: %w", err)
}
defer stgglb.AgentMQPool.Release(agtCli)

// 然后让最近节点pin本地上传的文件
pinObjResp, err := agtCli.StartPinningObject(agtmq.NewStartPinningObject(fileHash))
if err != nil {
return fmt.Errorf("start pinning object: %w", err)
}

for {
waitResp, err := agtCli.WaitPinningObject(agtmq.NewWaitPinningObject(pinObjResp.TaskID, int64(time.Second)*5))
if err != nil {
return fmt.Errorf("waitting pinning object: %w", err)
}

if waitResp.IsComplete {
if waitResp.Error != "" {
return fmt.Errorf("agent pinning object: %s", waitResp.Error)
}

break
}
}

return nil
}

+ 9
- 75
common/pkgs/cmd/download_package.go View File

@@ -10,14 +10,13 @@ import (

"gitlink.org.cn/cloudream/common/pkgs/distlock"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

type DownloadPackage struct {
userID int64
packageID int64
userID cdssdk.UserID
packageID cdssdk.PackageID
outputPath string
}

@@ -25,7 +24,7 @@ type DownloadPackageContext struct {
Distlock *distlock.Service
}

func NewDownloadPackage(userID int64, packageID int64, outputPath string) *DownloadPackage {
func NewDownloadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, outputPath string) *DownloadPackage {
return &DownloadPackage{
userID: userID,
packageID: packageID,
@@ -40,85 +39,20 @@ func (t *DownloadPackage) Execute(ctx *DownloadPackageContext) error {
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID))
getObjectDetails, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.packageID))
if err != nil {

return fmt.Errorf("getting package: %w", err)
}

var objIter iterator.DownloadingObjectIterator
if getPkgResp.Redundancy.IsRepInfo() {
objIter, err = t.downloadRep(ctx)
} else {
objIter, err = t.downloadEC(ctx, getPkgResp.Package)
return fmt.Errorf("getting package object details: %w", err)
}
if err != nil {
return err
}
defer objIter.Close()

return t.writeObject(objIter)
}

func (t *DownloadPackage) downloadRep(ctx *DownloadPackageContext) (iterator.DownloadingObjectIterator, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getObjsResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(t.userID, t.packageID))
if err != nil {
return nil, fmt.Errorf("getting package objects: %w", err)
}

getObjRepDataResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(t.packageID))
if err != nil {
return nil, fmt.Errorf("getting package object rep data: %w", err)
}

iter := iterator.NewRepObjectIterator(getObjsResp.Objects, getObjRepDataResp.Data, &iterator.DownloadContext{
Distlock: ctx.Distlock,
})

return iter, nil
}

func (t *DownloadPackage) downloadEC(ctx *DownloadPackageContext, pkg model.Package) (iterator.DownloadingObjectIterator, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getObjsResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(t.userID, t.packageID))
if err != nil {
return nil, fmt.Errorf("getting package objects: %w", err)
}

getObjECDataResp, err := coorCli.GetPackageObjectECData(coormq.NewGetPackageObjectECData(t.packageID))
if err != nil {
return nil, fmt.Errorf("getting package object ec data: %w", err)
}

var ecInfo cdssdk.ECRedundancyInfo
if ecInfo, err = pkg.Redundancy.ToECInfo(); err != nil {
return nil, fmt.Errorf("get ec redundancy info: %w", err)
}

getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecInfo.ECName))
if err != nil {
return nil, fmt.Errorf("getting ec: %w", err)
}

iter := iterator.NewECObjectIterator(getObjsResp.Objects, getObjECDataResp.Data, ecInfo, getECResp.Config, &iterator.DownloadContext{
objIter := iterator.NewObjectIterator(getObjectDetails.Objects, &iterator.DownloadContext{
Distlock: ctx.Distlock,
})
defer objIter.Close()

return iter, nil
return t.writeObjects(objIter)
}

func (t *DownloadPackage) writeObject(objIter iterator.DownloadingObjectIterator) error {
func (t *DownloadPackage) writeObjects(objIter iterator.DownloadingObjectIterator) error {
for {
objInfo, err := objIter.MoveNext()
if err == iterator.ErrNoMoreItem {


+ 18
- 33
common/pkgs/cmd/update_ec_package.go View File

@@ -14,25 +14,30 @@ import (
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

type UpdateECPackage struct {
userID int64
packageID int64
type UpdatePackage struct {
userID cdssdk.UserID
packageID cdssdk.PackageID
objectIter iterator.UploadingObjectIterator
}

type UpdateECPackageResult struct {
ObjectResults []ECObjectUploadResult
type UpdatePackageResult struct {
ObjectResults []ObjectUploadResult
}

func NewUpdateECPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) *UpdateECPackage {
return &UpdateECPackage{
type UpdateNodeInfo struct {
UploadNodeInfo
HasOldObject bool
}

func NewUpdatePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator) *UpdatePackage {
return &UpdatePackage{
userID: userID,
packageID: packageID,
objectIter: objIter,
}
}

func (t *UpdateECPackage) Execute(ctx *UpdatePackageContext) (*UpdateECPackageResult, error) {
func (t *UpdatePackage) Execute(ctx *UpdatePackageContext) (*UpdatePackageResult, error) {
defer t.objectIter.Close()

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
@@ -58,45 +63,25 @@ func (t *UpdateECPackage) Execute(ctx *UpdatePackageContext) (*UpdateECPackageRe
}
defer mutex.Unlock()

getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID))
if err != nil {
return nil, fmt.Errorf("getting package: %w", err)
}

getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID))
if err != nil {
return nil, fmt.Errorf("getting user nodes: %w", err)
}

findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP))
if err != nil {
return nil, fmt.Errorf("finding client location: %w", err)
}

nodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo {
userNodes := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo {
return UploadNodeInfo{
Node: node,
IsSameLocation: node.LocationID == findCliLocResp.Location.LocationID,
IsSameLocation: node.LocationID == stgglb.Local.LocationID,
}
})

var ecInfo cdssdk.ECRedundancyInfo
if ecInfo, err = getPkgResp.Package.Redundancy.ToECInfo(); err != nil {
return nil, fmt.Errorf("get ec redundancy info: %w", err)
}

getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecInfo.ECName))
if err != nil {
return nil, fmt.Errorf("getting ec: %w", err)
}

// 给上传节点的IPFS加锁
ipfsReqBlder := reqbuilder.NewBuilder()
// 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁
if stgglb.Local.NodeID != nil {
ipfsReqBlder.IPFS().CreateAnyRep(*stgglb.Local.NodeID)
}
for _, node := range nodeInfos {
for _, node := range userNodes {
if stgglb.Local.NodeID != nil && node.Node.NodeID == *stgglb.Local.NodeID {
continue
}
@@ -110,12 +95,12 @@ func (t *UpdateECPackage) Execute(ctx *UpdatePackageContext) (*UpdateECPackageRe
}
defer ipfsMutex.Unlock()

rets, err := uploadAndUpdateECPackage(t.packageID, t.objectIter, nodeInfos, ecInfo, getECResp.Config)
rets, err := uploadAndUpdatePackage(t.packageID, t.objectIter, userNodes, nil)
if err != nil {
return nil, err
}

return &UpdateECPackageResult{
return &UpdatePackageResult{
ObjectResults: rets,
}, nil
}

+ 0
- 128
common/pkgs/cmd/update_rep_package.go View File

@@ -1,128 +0,0 @@
package cmd

import (
"fmt"

"github.com/samber/lo"
mysort "gitlink.org.cn/cloudream/common/utils/sort"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"

stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

type UpdateRepPackage struct {
userID int64
packageID int64
objectIter iterator.UploadingObjectIterator
}

type UpdateNodeInfo struct {
UploadNodeInfo
HasOldObject bool
}

type UpdateRepPackageResult struct {
ObjectResults []RepObjectUploadResult
}

func NewUpdateRepPackage(userID int64, packageID int64, objectIter iterator.UploadingObjectIterator) *UpdateRepPackage {
return &UpdateRepPackage{
userID: userID,
packageID: packageID,
objectIter: objectIter,
}
}

func (t *UpdateRepPackage) Execute(ctx *UpdatePackageContext) (*UpdateRepPackageResult, error) {
defer t.objectIter.Close()

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}

reqBlder := reqbuilder.NewBuilder()
// 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁
if stgglb.Local.NodeID != nil {
reqBlder.IPFS().CreateAnyRep(*stgglb.Local.NodeID)
}
mutex, err := reqBlder.
Metadata().
// 用于查询可用的上传节点
Node().ReadAny().
// 用于创建包信息
Package().WriteOne(t.packageID).
// 用于创建包中的文件的信息
Object().CreateAny().
// 用于设置EC配置
ObjectBlock().CreateAny().
// 用于创建Cache记录
Cache().CreateAny().
MutexLock(ctx.Distlock)
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()
getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID))
if err != nil {
return nil, fmt.Errorf("getting user nodes: %w", err)
}

findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP))
if err != nil {
return nil, fmt.Errorf("finding client location: %w", err)
}

nodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UpdateNodeInfo {
return UpdateNodeInfo{
UploadNodeInfo: UploadNodeInfo{
Node: node,
IsSameLocation: node.LocationID == findCliLocResp.Location.LocationID,
},
}
})
// 上传文件的方式优先级:
// 1. 本地IPFS
// 2. 包含了旧文件,且与客户端在同地域的节点
// 3. 不在同地域,但包含了旧文件的节点
// 4. 同地域节点
// TODO 需要考虑在多文件的情况下的规则
uploadNode := t.chooseUploadNode(nodeInfos)

// 防止上传的副本被清除
ipfsMutex, err := reqbuilder.NewBuilder().
IPFS().CreateAnyRep(uploadNode.Node.NodeID).
MutexLock(ctx.Distlock)
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}
defer ipfsMutex.Unlock()

rets, err := uploadAndUpdateRepPackage(t.packageID, t.objectIter, uploadNode.UploadNodeInfo)
if err != nil {
return nil, err
}

return &UpdateRepPackageResult{
ObjectResults: rets,
}, nil
}

// chooseUploadNode 选择一个上传文件的节点
// 1. 从与当前客户端相同地域的节点中随机选一个
// 2. 没有用的话从所有节点中随机选一个
func (t *UpdateRepPackage) chooseUploadNode(nodes []UpdateNodeInfo) UpdateNodeInfo {
mysort.Sort(nodes, func(left, right UpdateNodeInfo) int {
v := -mysort.CmpBool(left.HasOldObject, right.HasOldObject)
if v != 0 {
return v
}

return -mysort.CmpBool(left.IsSameLocation, right.IsSameLocation)
})

return nodes[0]
}

+ 0
- 30
common/pkgs/db/ec.go View File

@@ -1,30 +0,0 @@
package db

import (
//"database/sql"

"github.com/jmoiron/sqlx"
//"gitlink.org.cn/cloudream/common/consts"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

type EcDB struct {
*DB
}

func (db *DB) Ec() *EcDB {
return &EcDB{DB: db}
}

// GetEc 查询纠删码参数
func (db *EcDB) GetEc(ctx SQLContext, ecName string) (model.Ec, error) {
var ret model.Ec
err := sqlx.Get(ctx, &ret, "select * from Ec where Name = ?", ecName)
return ret, err
}

func (db *EcDB) GetEcName(ctx SQLContext, objectID int) (string, error) {
var ret string
err := sqlx.Get(ctx, &ret, "select Redundancy from Object where ObjectID = ?")
return ret, err
}

+ 51
- 52
common/pkgs/db/model/model.go View File

@@ -4,28 +4,29 @@ import (
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
)

// TODO 可以考虑逐步迁移到cdssdk中。迁移思路:数据对象应该包含的字段都迁移到cdssdk中,内部使用的一些特殊字段则留在这里

type Node struct {
NodeID int64 `db:"NodeID" json:"nodeID"`
Name string `db:"Name" json:"name"`
LocalIP string `db:"LocalIP" json:"localIP"`
ExternalIP string `db:"ExternalIP" json:"externalIP"`
LocalGRPCPort int `db:"LocalGRPCPort" json:"localGRPCPort"`
ExternalGRPCPort int `db:"ExternalGRPCPort" json:"externalGRPCPort"`
LocationID int64 `db:"LocationID" json:"locationID"`
State string `db:"State" json:"state"`
LastReportTime *time.Time `db:"LastReportTime" json:"lastReportTime"`
NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"`
Name string `db:"Name" json:"name"`
LocalIP string `db:"LocalIP" json:"localIP"`
ExternalIP string `db:"ExternalIP" json:"externalIP"`
LocalGRPCPort int `db:"LocalGRPCPort" json:"localGRPCPort"`
ExternalGRPCPort int `db:"ExternalGRPCPort" json:"externalGRPCPort"`
LocationID cdssdk.LocationID `db:"LocationID" json:"locationID"`
State string `db:"State" json:"state"`
LastReportTime *time.Time `db:"LastReportTime" json:"lastReportTime"`
}

type Storage struct {
StorageID int64 `db:"StorageID" json:"storageID"`
Name string `db:"Name" json:"name"`
NodeID int64 `db:"NodeID" json:"nodeID"`
Directory string `db:"Directory" json:"directory"`
State string `db:"State" json:"state"`
StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"`
Name string `db:"Name" json:"name"`
NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"`
Directory string `db:"Directory" json:"directory"`
State string `db:"State" json:"state"`
}

type NodeDelay struct {
@@ -35,69 +36,67 @@ type NodeDelay struct {
}

type User struct {
UserID int64 `db:"UserID" json:"userID"`
Password string `db:"PassWord" json:"password"`
UserID cdssdk.UserID `db:"UserID" json:"userID"`
Password string `db:"PassWord" json:"password"`
}

type UserBucket struct {
UserID int64 `db:"UserID" json:"userID"`
BucketID int64 `db:"BucketID" json:"bucketID"`
UserID cdssdk.UserID `db:"UserID" json:"userID"`
BucketID cdssdk.BucketID `db:"BucketID" json:"bucketID"`
}

type UserNode struct {
UserID int64 `db:"UserID" json:"userID"`
NodeID int64 `db:"NodeID" json:"nodeID"`
UserID cdssdk.UserID `db:"UserID" json:"userID"`
NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"`
}

type UserStorage struct {
UserID int64 `db:"UserID" json:"userID"`
StorageID int64 `db:"StorageID" json:"storageID"`
UserID cdssdk.UserID `db:"UserID" json:"userID"`
StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"`
}

type Bucket struct {
BucketID int64 `db:"BucketID" json:"bucketID"`
Name string `db:"Name" json:"name"`
CreatorID int64 `db:"CreatorID" json:"creatorID"`
BucketID cdssdk.BucketID `db:"BucketID" json:"bucketID"`
Name string `db:"Name" json:"name"`
CreatorID cdssdk.UserID `db:"CreatorID" json:"creatorID"`
}

type Package = cdssdk.Package

type Object = cdssdk.Object

type ObjectRep struct {
ObjectID int64 `db:"ObjectID" json:"objectID"`
FileHash string `db:"FileHash" json:"fileHash"`
}

type ObjectBlock struct {
ObjectID int64 `db:"ObjectID" json:"objectID"`
Index int `db:"Index" json:"index"`
FileHash string `db:"FileHash" json:"fileHash"`
}
type ObjectBlock = stgmod.ObjectBlock

type Cache struct {
FileHash string `db:"FileHash" json:"fileHash"`
NodeID int64 `db:"NodeID" json:"nodeID"`
State string `db:"State" json:"state"`
CacheTime time.Time `db:"CacheTime" json:"cacheTime"`
Priority int `db:"Priority" json:"priority"`
FileHash string `db:"FileHash" json:"fileHash"`
NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"`
State string `db:"State" json:"state"`
CacheTime time.Time `db:"CacheTime" json:"cacheTime"`
Priority int `db:"Priority" json:"priority"`
}

const (
StoragePackageStateNormal = "Normal"
StoragePackageStateDeleted = "Deleted"
StoragePackageStateOutdated = "Outdated"
)

// Storage当前加载的Package
type StoragePackage struct {
PackageID int64 `db:"PackageID" json:"packageID"`
StorageID int64 `db:"StorageID" json:"storageID"`
UserID int64 `db:"UserID" json:"userID"`
State string `db:"State" json:"state"`
StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"`
PackageID cdssdk.PackageID `db:"PackageID" json:"packageID"`
UserID cdssdk.UserID `db:"UserID" json:"userID"`
State string `db:"State" json:"state"`
}

type Location struct {
LocationID int64 `db:"LocationID" json:"locationID"`
Name string `db:"Name" json:"name"`
type StoragePackageLog struct {
StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"`
PackageID cdssdk.PackageID `db:"PackageID" json:"packageID"`
UserID cdssdk.UserID `db:"UserID" json:"userID"`
CreateTime time.Time `db:"CreateTime" json:"createTime"`
}

type Ec struct {
EcID int `db:"EcID" json:"ecID"`
Name string `db:"Name" json:"name"`
EcK int `db:"EcK" json:"ecK"`
EcN int `db:"EcN" json:"ecN"`
type Location struct {
LocationID cdssdk.LocationID `db:"LocationID" json:"locationID"`
Name string `db:"Name" json:"name"`
}

+ 32
- 170
common/pkgs/db/object.go View File

@@ -4,7 +4,7 @@ import (
"fmt"

"github.com/jmoiron/sqlx"
"github.com/samber/lo"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)
@@ -17,15 +17,16 @@ func (db *DB) Object() *ObjectDB {
return &ObjectDB{DB: db}
}

func (db *ObjectDB) GetByID(ctx SQLContext, objectID int64) (model.Object, error) {
func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Object, error) {
var ret model.Object
err := sqlx.Get(ctx, &ret, "select * from Object where ObjectID = ?", objectID)
return ret, err
}

func (db *ObjectDB) Create(ctx SQLContext, packageID int64, path string, size int64) (int64, error) {
sql := "insert into Object(PackageID, Path, Size) values(?,?,?)"
ret, err := ctx.Exec(sql, packageID, path, size)
func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, redundancy cdssdk.Redundancy) (int64, error) {
sql := "insert into Object(PackageID, Path, Size, Redundancy) values(?,?,?,?)"

ret, err := ctx.Exec(sql, packageID, path, size, redundancy)
if err != nil {
return 0, fmt.Errorf("insert object failed, err: %w", err)
}
@@ -39,9 +40,10 @@ func (db *ObjectDB) Create(ctx SQLContext, packageID int64, path string, size in
}

// 创建或者更新记录,返回值true代表是创建,false代表是更新
func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID int64, path string, size int64) (int64, bool, error) {
sql := "insert into Object(PackageID, Path, Size) values(?,?,?) on duplicate key update Size = ?"
ret, err := ctx.Exec(sql, packageID, path, size, size)
func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, redundancy cdssdk.Redundancy) (cdssdk.ObjectID, bool, error) {
sql := "insert into Object(PackageID, Path, Size, Redundancy) values(?,?,?,?) on duplicate key update Size = ?, Redundancy = ?"

ret, err := ctx.Exec(sql, packageID, path, size, redundancy, size, redundancy)
if err != nil {
return 0, false, fmt.Errorf("insert object failed, err: %w", err)
}
@@ -57,10 +59,10 @@ func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID int64, path string,
if err != nil {
return 0, false, fmt.Errorf("get id of inserted object failed, err: %w", err)
}
return objectID, true, nil
return cdssdk.ObjectID(objectID), true, nil
}

var objID int64
var objID cdssdk.ObjectID
if err = sqlx.Get(ctx, &objID, "select ObjectID from Object where PackageID = ? and Path = ?", packageID, path); err != nil {
return 0, false, fmt.Errorf("getting object id: %w", err)
}
@@ -68,65 +70,7 @@ func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID int64, path string,
return objID, false, nil
}

func (db *ObjectDB) UpdateRepObject(ctx SQLContext, objectID int64, fileSize int64, nodeIDs []int64, fileHash string) error {
_, err := db.UpdateFileInfo(ctx, objectID, fileSize)
if err != nil {
if err != nil {
return fmt.Errorf("update rep object failed, err: %w", err)
}
}

objRep, err := db.ObjectRep().GetByID(ctx, objectID)
if err != nil {
return fmt.Errorf("get object rep failed, err: %w", err)
}

// 如果新文件与旧文件的Hash不同,则需要更新关联的FileHash,重新插入Cache记录
if objRep.FileHash != fileHash {
_, err := db.ObjectRep().Update(ctx, objectID, fileHash)
if err != nil {
return fmt.Errorf("update rep object file hash failed, err: %w", err)
}

for _, nodeID := range nodeIDs {
err := db.Cache().CreatePinned(ctx, fileHash, nodeID, 0) //priority = 0
if err != nil {
return fmt.Errorf("create cache failed, err: %w", err)
}
}

} else {
// 如果相同,则只增加Cache中不存在的记录
cachedNodes, err := db.Cache().GetCachingFileNodes(ctx, fileHash)
if err != nil {
return fmt.Errorf("find caching file nodes failed, err: %w", err)
}

// 筛选出不在cachedNodes中的id
newNodeIDs := lo.Filter(nodeIDs, func(id int64, index int) bool {
return lo.NoneBy(cachedNodes, func(node model.Node) bool {
return node.NodeID == id
})
})
for _, nodeID := range newNodeIDs {
err := db.Cache().CreatePinned(ctx, fileHash, nodeID, 0) //priority
if err != nil {
return fmt.Errorf("create cache failed, err: %w", err)
}
}
}

return nil
}

func (*ObjectDB) BatchGetAllEcObjectIDs(ctx SQLContext, start int, count int) ([]int64, error) {
var ret []int64
rep := "rep"
err := sqlx.Select(ctx, &ret, "SELECT ObjectID FROM object where Redundancy != ? limit ?, ?", rep, start, count)
return ret, err
}

func (*ObjectDB) UpdateFileInfo(ctx SQLContext, objectID int64, fileSize int64) (bool, error) {
func (*ObjectDB) UpdateFileInfo(ctx SQLContext, objectID cdssdk.ObjectID, fileSize int64) (bool, error) {
ret, err := ctx.Exec("update Object set FileSize = ? where ObjectID = ?", fileSize, objectID)
if err != nil {
return false, err
@@ -140,100 +84,17 @@ func (*ObjectDB) UpdateFileInfo(ctx SQLContext, objectID int64, fileSize int64)
return cnt > 0, nil
}

func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID int64) ([]model.Object, error) {
func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Object, error) {
var ret []model.Object
err := sqlx.Select(ctx, &ret, "select * from Object where PackageID = ? order by ObjectID asc", packageID)
return ret, err
}

func (db *ObjectDB) BatchAddRep(ctx SQLContext, packageID int64, objs []coormq.AddRepObjectInfo) ([]int64, error) {
var objIDs []int64
for _, obj := range objs {
// 创建对象的记录
objID, isCreate, err := db.CreateOrUpdate(ctx, packageID, obj.Path, obj.Size)
if err != nil {
return nil, fmt.Errorf("creating object: %w", err)
}

objIDs = append(objIDs, objID)

if isCreate {
if err := db.createRep(ctx, objID, obj); err != nil {
return nil, err
}
} else {
if err := db.updateRep(ctx, objID, obj); err != nil {
return nil, err
}
}
}

return objIDs, nil
}

func (db *ObjectDB) createRep(ctx SQLContext, objID int64, obj coormq.AddRepObjectInfo) error {
// 创建对象副本的记录
if err := db.ObjectRep().Create(ctx, objID, obj.FileHash); err != nil {
return fmt.Errorf("creating object rep: %w", err)
}

// 创建缓存记录
priority := 0 //优先级暂时设置为0
for _, nodeID := range obj.NodeIDs {
if err := db.Cache().CreatePinned(ctx, obj.FileHash, nodeID, priority); err != nil {
return fmt.Errorf("creating cache: %w", err)
}
}

return nil
}
func (db *ObjectDB) updateRep(ctx SQLContext, objID int64, obj coormq.AddRepObjectInfo) error {
objRep, err := db.ObjectRep().GetByID(ctx, objID)
if err != nil {
return fmt.Errorf("getting object rep: %w", err)
}

// 如果新文件与旧文件的Hash不同,则需要更新关联的FileHash,重新插入Cache记录
if objRep.FileHash != obj.FileHash {
_, err := db.ObjectRep().Update(ctx, objID, obj.FileHash)
if err != nil {
return fmt.Errorf("updating rep object file hash: %w", err)
}

for _, nodeID := range obj.NodeIDs {
if err := db.Cache().CreatePinned(ctx, obj.FileHash, nodeID, 0); err != nil {
return fmt.Errorf("creating cache: %w", err)
}
}

} else {
// 如果相同,则只增加Cache中不存在的记录
cachedNodes, err := db.Cache().GetCachingFileNodes(ctx, obj.FileHash)
if err != nil {
return fmt.Errorf("finding caching file nodes: %w", err)
}

// 筛选出不在cachedNodes中的id
newNodeIDs := lo.Filter(obj.NodeIDs, func(id int64, index int) bool {
return lo.NoneBy(cachedNodes, func(node model.Node) bool {
return node.NodeID == id
})
})
for _, nodeID := range newNodeIDs {
if err := db.Cache().CreatePinned(ctx, obj.FileHash, nodeID, 0); err != nil {
return fmt.Errorf("creating cache: %w", err)
}
}
}

return nil
}

func (db *ObjectDB) BatchAddEC(ctx SQLContext, packageID int64, objs []coormq.AddECObjectInfo) ([]int64, error) {
objIDs := make([]int64, 0, len(objs))
func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs []coormq.AddObjectInfo) ([]cdssdk.ObjectID, error) {
objIDs := make([]cdssdk.ObjectID, 0, len(objs))
for _, obj := range objs {
// 创建对象的记录
objID, isCreate, err := db.CreateOrUpdate(ctx, packageID, obj.Path, obj.Size)
objID, isCreate, err := db.CreateOrUpdate(ctx, packageID, obj.Path, obj.Size, obj.Redundancy)
if err != nil {
return nil, fmt.Errorf("creating object: %w", err)
}
@@ -245,23 +106,24 @@ func (db *ObjectDB) BatchAddEC(ctx SQLContext, packageID int64, objs []coormq.Ad
if err = db.ObjectBlock().DeleteObjectAll(ctx, objID); err != nil {
return nil, fmt.Errorf("deleting all object block: %w", err)
}

}

// 创建编码块的记录
for i := 0; i < len(obj.FileHashes); i++ {
err := db.ObjectBlock().Create(ctx, objID, i, obj.FileHashes[i])
if err != nil {
return nil, fmt.Errorf("creating object block: %w", err)
for _, b := range obj.Blocks {
for _, n := range b.NodeIDs {
err := db.ObjectBlock().Create(ctx, objID, b.Index, b.FileHash, n)
if err != nil {
return nil, fmt.Errorf("creating object block: %w", err)
}
}
}

// 创建缓存记录
priority := 0 //优先级暂时设置为0
for i, nodeID := range obj.NodeIDs {
err = db.Cache().CreatePinned(ctx, obj.FileHashes[i], nodeID, priority)
if err != nil {
return nil, fmt.Errorf("creating cache: %w", err)
// 创建缓存记录
priority := 0 //优先级暂时设置为0
for _, nodeID := range b.CachedNodeIDs {
err = db.Cache().CreatePinned(ctx, b.FileHash, nodeID, priority)
if err != nil {
return nil, fmt.Errorf("creating cache: %w", err)
}
}
}
}
@@ -269,12 +131,12 @@ func (db *ObjectDB) BatchAddEC(ctx SQLContext, packageID int64, objs []coormq.Ad
return objIDs, nil
}

func (*ObjectDB) BatchDelete(ctx SQLContext, ids []int64) error {
func (*ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error {
_, err := ctx.Exec("delete from Object where ObjectID in (?)", ids)
return err
}

func (*ObjectDB) DeleteInPackage(ctx SQLContext, packageID int64) error {
func (*ObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
_, err := ctx.Exec("delete from Object where PackageID = ?", packageID)
return err
}

+ 24
- 17
common/pkgs/db/object_block.go View File

@@ -8,7 +8,6 @@ import (

"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/consts"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)
@@ -42,7 +41,7 @@ func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (in
"select count(FileHash) from ObjectBlock, Object, Package where FileHash = ? and"+
" ObjectBlock.ObjectID = Object.ObjectID and"+
" Object.PackageID = Package.PackageID and"+
" Package.State = ?", fileHash, consts.PackageStateNormal)
" Package.State = ?", fileHash, cdssdk.PackageStateNormal)
if err == sql.ErrNoRows {
return 0, nil
}
@@ -50,47 +49,49 @@ func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (in
return cnt, err
}

func (db *ObjectBlockDB) GetWithNodeIDInPackage(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectECData, error) {
func (db *ObjectBlockDB) GetPackageBlockDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) {
var objs []model.Object
err := sqlx.Select(ctx, &objs, "select * from Object where PackageID = ? order by ObjectID asc", packageID)
if err != nil {
return nil, fmt.Errorf("query objectIDs: %w", err)
}

rets := make([]stgmod.ObjectECData, 0, len(objs))
rets := make([]stgmod.ObjectDetail, 0, len(objs))

for _, obj := range objs {
var tmpRets []struct {
Index int `db:"Index"`
FileHash string `db:"FileHash"`
NodeIDs *string `db:"NodeIDs"`
Index int `db:"Index"`
FileHashes string `db:"FileHashes"`
NodeIDs string `db:"NodeIDs"`
CachedNodeIDs *string `db:"CachedNodeIDs"`
}

err := sqlx.Select(ctx,
&tmpRets,
"select ObjectBlock.Index, ObjectBlock.FileHash, group_concat(NodeID) as NodeIDs from ObjectBlock"+
" left join Cache on ObjectBlock.FileHash = Cache.FileHash"+
" where ObjectID = ? group by ObjectBlock.Index, ObjectBlock.FileHash",
"select ObjectBlock.Index, group_concat(distinct ObjectBlock.FileHash) as FileHashes, group_concat(distinct ObjectBlock.NodeID) as NodeIDs, group_concat(distinct Cache.NodeID) as CachedNodeIDs"+
" from ObjectBlock left join Cache on ObjectBlock.FileHash = Cache.FileHash"+
" where ObjectID = ? group by ObjectBlock.Index",
obj.ObjectID,
)
if err != nil {
return nil, err
}

blocks := make([]stgmod.ObjectBlockData, 0, len(tmpRets))
blocks := make([]stgmod.ObjectBlockDetail, 0, len(tmpRets))
for _, tmp := range tmpRets {
var block stgmod.ObjectBlockData
var block stgmod.ObjectBlockDetail
block.Index = tmp.Index
block.FileHash = tmp.FileHash

if tmp.NodeIDs != nil {
block.CachedNodeIDs = splitIDStringUnsafe(*tmp.NodeIDs)
block.FileHash = splitConcatedToString(tmp.FileHashes)[0]
block.NodeIDs = splitConcatedToNodeID(tmp.NodeIDs)
if tmp.CachedNodeIDs != nil {
block.CachedNodeIDs = splitConcatedToNodeID(*tmp.CachedNodeIDs)
}

blocks = append(blocks, block)
}

rets = append(rets, stgmod.NewObjectECData(obj, blocks))
rets = append(rets, stgmod.NewObjectDetail(obj, blocks))
}

return rets, nil
@@ -98,7 +99,7 @@ func (db *ObjectBlockDB) GetWithNodeIDInPackage(ctx SQLContext, packageID cdssdk

// 按逗号切割字符串,并将每一个部分解析为一个int64的ID。
// 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式
func splitIDStringUnsafe(idStr string) []cdssdk.NodeID {
func splitConcatedToNodeID(idStr string) []cdssdk.NodeID {
idStrs := strings.Split(idStr, ",")
ids := make([]cdssdk.NodeID, 0, len(idStrs))

@@ -110,3 +111,9 @@ func splitIDStringUnsafe(idStr string) []cdssdk.NodeID {

return ids
}

// 按逗号切割字符串
func splitConcatedToString(idStr string) []string {
idStrs := strings.Split(idStr, ",")
return idStrs
}

+ 4
- 5
common/pkgs/db/package.go View File

@@ -8,7 +8,6 @@ import (
"github.com/jmoiron/sqlx"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/consts"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

@@ -92,7 +91,7 @@ func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name strin
}

sql := "insert into Package(Name, BucketID, State) values(?,?,?)"
r, err := ctx.Exec(sql, name, bucketID, consts.PackageStateNormal)
r, err := ctx.Exec(sql, name, bucketID, cdssdk.PackageStateNormal)
if err != nil {
return 0, fmt.Errorf("insert package failed, err: %w", err)
}
@@ -114,11 +113,11 @@ func (db *PackageDB) SoftDelete(ctx SQLContext, packageID cdssdk.PackageID) erro

// 不是正常状态的Package,则不删除
// TODO 未来可能有其他状态
if obj.State != consts.PackageStateNormal {
if obj.State != cdssdk.PackageStateNormal {
return nil
}

err = db.ChangeState(ctx, packageID, consts.PackageStateDeleted)
err = db.ChangeState(ctx, packageID, cdssdk.PackageStateDeleted)
if err != nil {
return fmt.Errorf("change package state failed, err: %w", err)
}
@@ -145,7 +144,7 @@ func (PackageDB) DeleteUnused(ctx SQLContext, packageID cdssdk.PackageID) error
_, err := ctx.Exec("delete from Package where PackageID = ? and State = ? and "+
"not exists(select StorageID from StoragePackage where PackageID = ?)",
packageID,
consts.PackageStateDeleted,
cdssdk.PackageStateDeleted,
packageID,
)



+ 6
- 7
common/pkgs/db/storage_package.go View File

@@ -5,7 +5,6 @@ import (

"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/consts"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

@@ -36,7 +35,7 @@ func (*StoragePackageDB) GetAllByStorageID(ctx SQLContext, storageID cdssdk.Stor
}

func (*StoragePackageDB) LoadPackage(ctx SQLContext, packageID cdssdk.PackageID, storageID cdssdk.StorageID, userID cdssdk.UserID) error {
_, err := ctx.Exec("insert into StoragePackage values(?,?,?,?)", packageID, storageID, userID, consts.StoragePackageStateNormal)
_, err := ctx.Exec("insert into StoragePackage values(?,?,?,?)", packageID, storageID, userID, model.StoragePackageStateNormal)
return err
}

@@ -48,11 +47,11 @@ func (*StoragePackageDB) ChangeState(ctx SQLContext, storageID cdssdk.StorageID,
// SetStateNormal 将状态设置为Normal,如果记录状态是Deleted,则不进行操作
func (*StoragePackageDB) SetStateNormal(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error {
_, err := ctx.Exec("update StoragePackage set State = ? where StorageID = ? and PackageID = ? and UserID = ? and State <> ?",
consts.StoragePackageStateNormal,
model.StoragePackageStateNormal,
storageID,
packageID,
userID,
consts.StoragePackageStateDeleted,
model.StoragePackageStateDeleted,
)
return err
}
@@ -80,8 +79,8 @@ func (*StoragePackageDB) SetAllPackageState(ctx SQLContext, packageID cdssdk.Pac
func (*StoragePackageDB) SetAllPackageOutdated(ctx SQLContext, packageID cdssdk.PackageID) (int64, error) {
ret, err := ctx.Exec(
"update StoragePackage set State = ? where State = ? and PackageID = ?",
consts.StoragePackageStateOutdated,
consts.StoragePackageStateNormal,
model.StoragePackageStateOutdated,
model.StoragePackageStateNormal,
packageID,
)
if err != nil {
@@ -97,7 +96,7 @@ func (*StoragePackageDB) SetAllPackageOutdated(ctx SQLContext, packageID cdssdk.
}

func (db *StoragePackageDB) SetAllPackageDeleted(ctx SQLContext, packageID cdssdk.PackageID) (int64, error) {
return db.SetAllPackageState(ctx, packageID, consts.StoragePackageStateDeleted)
return db.SetAllPackageState(ctx, packageID, model.StoragePackageStateDeleted)
}

func (*StoragePackageDB) Delete(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error {


+ 34
- 0
common/pkgs/db/storage_package_log.go View File

@@ -0,0 +1,34 @@
package db

import (
"time"

"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

type StoragePackageLogDB struct {
*DB
}

func (db *DB) StoragePackageLog() *StoragePackageLogDB {
return &StoragePackageLogDB{DB: db}
}

func (*StoragePackageLogDB) Get(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) (model.StoragePackage, error) {
var ret model.StoragePackage
err := sqlx.Get(ctx, &ret, "select * from StoragePackageLog where StorageID = ? and PackageID = ? and UserID = ?", storageID, packageID, userID)
return ret, err
}

func (*StoragePackageLogDB) Create(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID, createTime time.Time) (model.StoragePackage, error) {
var ret model.StoragePackage
err := sqlx.Get(ctx, &ret, "insert into StoragePackageLog values(?,?,?,?)", storageID, packageID, userID, createTime)
return ret, err
}

func (*StoragePackageLogDB) Delete(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error {
_, err := ctx.Exec("delete from StoragePackageLog where StorageID = ? and PackageID = ? and UserID = ?", storageID, packageID, userID)
return err
}

+ 216
- 74
common/pkgs/iterator/ec_object_iterator.go View File

@@ -4,62 +4,62 @@ import (
"fmt"
"io"
"math/rand"
"reflect"

"github.com/samber/lo"

"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

myio "gitlink.org.cn/cloudream/common/utils/io"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmodels "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

type ECObjectIterator struct {
type DownloadingObjectIterator = Iterator[*IterDownloadingObject]

type IterDownloadingObject struct {
Object model.Object
File io.ReadCloser
}

type DownloadNodeInfo struct {
Node model.Node
IsSameLocation bool
}

type DownloadContext struct {
Distlock *distlock.Service
}
type ObjectIterator struct {
OnClosing func()

objects []model.Object
objectECData []stgmodels.ObjectECData
currentIndex int
inited bool
objectDetails []stgmodels.ObjectDetail
currentIndex int

ecInfo cdssdk.ECRedundancyInfo
ec model.Ec
downloadCtx *DownloadContext
cliLocation model.Location
}

func NewECObjectIterator(objects []model.Object, objectECData []stgmodels.ObjectECData, ecInfo cdssdk.ECRedundancyInfo, ec model.Ec, downloadCtx *DownloadContext) *ECObjectIterator {
return &ECObjectIterator{
objects: objects,
objectECData: objectECData,
ecInfo: ecInfo,
ec: ec,
downloadCtx: downloadCtx,
func NewObjectIterator(objectDetails []stgmodels.ObjectDetail, downloadCtx *DownloadContext) *ObjectIterator {
return &ObjectIterator{
objectDetails: objectDetails,
downloadCtx: downloadCtx,
}
}

func (i *ECObjectIterator) MoveNext() (*IterDownloadingObject, error) {
// TODO 加锁
func (i *ObjectIterator) MoveNext() (*IterDownloadingObject, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

if !i.inited {
i.inited = true

findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP))
if err != nil {
return nil, fmt.Errorf("finding client location: %w", err)
}
i.cliLocation = findCliLocResp.Location
}

if i.currentIndex >= len(i.objects) {
if i.currentIndex >= len(i.objectDetails) {
return nil, ErrNoMoreItem
}

@@ -68,25 +68,63 @@ func (i *ECObjectIterator) MoveNext() (*IterDownloadingObject, error) {
return item, err
}

func (iter *ECObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) {
obj := iter.objects[iter.currentIndex]
ecData := iter.objectECData[iter.currentIndex]
func (iter *ObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) {
obj := iter.objectDetails[iter.currentIndex]

//采取直接读,优先选内网节点
var chosenNodes []DownloadNodeInfo
var chosenBlocks []stgmodels.ObjectBlockData
for i := range ecData.Blocks {
if len(chosenBlocks) == iter.ec.EcK {
break
switch red := obj.Object.Redundancy.(type) {
case *cdssdk.RepRedundancy:
reader, err := iter.downloadRepObject(coorCli, iter.downloadCtx, obj, red)
if err != nil {
return nil, fmt.Errorf("downloading rep object: %w", err)
}

// 块没有被任何节点缓存或者获取失败都没关系,只要能获取到k个块的信息就行
return &IterDownloadingObject{
Object: obj.Object,
File: reader,
}, nil

if len(ecData.Blocks[i].NodeIDs) == 0 {
continue
case *cdssdk.ECRedundancy:
reader, err := iter.downloadECObject(coorCli, iter.downloadCtx, obj, red)
if err != nil {
return nil, fmt.Errorf("downloading ec object: %w", err)
}

getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(ecData.Blocks[i].NodeIDs))
return &IterDownloadingObject{
Object: obj.Object,
File: reader,
}, nil
}

return nil, fmt.Errorf("unsupported redundancy type: %v", reflect.TypeOf(obj.Object.Redundancy))
}

func (i *ObjectIterator) Close() {
if i.OnClosing != nil {
i.OnClosing()
}
}

// chooseDownloadNode 选择一个下载节点
// 1. 从与当前客户端相同地域的节点中随机选一个
// 2. 没有用的话从所有节点中随机选一个
func (i *ObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) DownloadNodeInfo {
sameLocationEntries := lo.Filter(entries, func(e DownloadNodeInfo, i int) bool { return e.IsSameLocation })
if len(sameLocationEntries) > 0 {
return sameLocationEntries[rand.Intn(len(sameLocationEntries))]
}

return entries[rand.Intn(len(entries))]
}

func (iter *ObjectIterator) downloadRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, repRed *cdssdk.RepRedundancy) (io.ReadCloser, error) {
//采取直接读,优先选内网节点
var chosenNodes []DownloadNodeInfo
for i := range obj.Blocks {
if len(obj.Blocks[i].CachedNodeIDs) == 0 {
return nil, fmt.Errorf("no node has block %d", obj.Blocks[i].Index)
}

getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(obj.Blocks[i].CachedNodeIDs))
if err != nil {
continue
}
@@ -94,58 +132,78 @@ func (iter *ECObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingOb
downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo {
return DownloadNodeInfo{
Node: node,
IsSameLocation: node.LocationID == iter.cliLocation.LocationID,
IsSameLocation: node.LocationID == stgglb.Local.LocationID,
}
})

chosenBlocks = append(chosenBlocks, ecData.Blocks[i])
chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes))

}

if len(chosenBlocks) < iter.ec.EcK {
return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", iter.ec.EcK, len(chosenBlocks))
}
var fileStrs []io.ReadCloser

reader, err := iter.downloadEcObject(iter.downloadCtx, obj.Size, chosenNodes, chosenBlocks)
if err != nil {
return nil, fmt.Errorf("ec read failed, err: %w", err)
for i := range obj.Blocks {
str, err := downloadFile(ctx, chosenNodes[i], obj.Blocks[i].FileHash)
if err != nil {
for i -= 1; i >= 0; i-- {
fileStrs[i].Close()
}
return nil, fmt.Errorf("donwloading file: %w", err)
}

fileStrs = append(fileStrs, str)
}

return &IterDownloadingObject{
Object: obj,
File: reader,
}, nil
fileReaders, filesCloser := myio.ToReaders(fileStrs)
return myio.AfterReadClosed(myio.Length(myio.Join(fileReaders), obj.Object.Size), func(c io.ReadCloser) {
filesCloser()
}), nil
}

func (i *ECObjectIterator) Close() {
if i.OnClosing != nil {
i.OnClosing()
}
}
func (iter *ObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) {
//采取直接读,优先选内网节点
var chosenNodes []DownloadNodeInfo
var chosenBlocks []stgmodels.ObjectBlockDetail
for i := range obj.Blocks {
if len(chosenBlocks) == ecRed.K {
break
}

// 块没有被任何节点缓存或者获取失败都没关系,只要能获取到k个块的信息就行

if len(obj.Blocks[i].CachedNodeIDs) == 0 {
continue
}

getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(obj.Blocks[i].CachedNodeIDs))
if err != nil {
continue
}

downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo {
return DownloadNodeInfo{
Node: node,
IsSameLocation: node.LocationID == stgglb.Local.LocationID,
}
})

chosenBlocks = append(chosenBlocks, obj.Blocks[i])
chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes))

// chooseDownloadNode 选择一个下载节点
// 1. 从与当前客户端相同地域的节点中随机选一个
// 2. 没有用的话从所有节点中随机选一个
func (i *ECObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) DownloadNodeInfo {
sameLocationEntries := lo.Filter(entries, func(e DownloadNodeInfo, i int) bool { return e.IsSameLocation })
if len(sameLocationEntries) > 0 {
return sameLocationEntries[rand.Intn(len(sameLocationEntries))]
}

return entries[rand.Intn(len(entries))]
}
if len(chosenBlocks) < ecRed.K {
return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(chosenBlocks))
}

func (iter *ECObjectIterator) downloadEcObject(ctx *DownloadContext, fileSize int64, nodes []DownloadNodeInfo, blocks []stgmodels.ObjectBlockData) (io.ReadCloser, error) {
var fileStrs []io.ReadCloser

rs, err := ec.NewRs(iter.ec.EcK, iter.ec.EcN, iter.ecInfo.ChunkSize)
rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize)
if err != nil {
return nil, fmt.Errorf("new rs: %w", err)
}

for i := range blocks {
str, err := downloadFile(ctx, nodes[i], blocks[i].FileHash)
for i := range chosenBlocks {
str, err := downloadFile(ctx, chosenNodes[i], chosenBlocks[i].FileHash)
if err != nil {
for i -= 1; i >= 0; i-- {
fileStrs[i].Close()
@@ -159,13 +217,97 @@ func (iter *ECObjectIterator) downloadEcObject(ctx *DownloadContext, fileSize in
fileReaders, filesCloser := myio.ToReaders(fileStrs)

var indexes []int
for _, b := range blocks {
for _, b := range chosenBlocks {
indexes = append(indexes, b.Index)
}

outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes))
return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(iter.ecInfo.ChunkSize)), fileSize), func(c io.ReadCloser) {
return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) {
filesCloser()
outputsCloser()
}), nil
}

func downloadFile(ctx *DownloadContext, node DownloadNodeInfo, fileHash string) (io.ReadCloser, error) {
// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := node.Node.ExternalIP
grpcPort := node.Node.ExternalGRPCPort
if node.IsSameLocation {
nodeIP = node.Node.LocalIP
grpcPort = node.Node.LocalGRPCPort

logger.Infof("client and node %d are at the same location, use local ip", node.Node.NodeID)
}

if stgglb.IPFSPool != nil {
logger.Infof("try to use local IPFS to download file")

reader, err := downloadFromLocalIPFS(ctx, fileHash)
if err == nil {
return reader, nil
}

logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error())
}

return downloadFromNode(ctx, node.Node.NodeID, nodeIP, grpcPort, fileHash)
}

func downloadFromNode(ctx *DownloadContext, nodeID cdssdk.NodeID, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) {
// 二次获取锁
mutex, err := reqbuilder.NewBuilder().
// 用于从IPFS下载文件
IPFS().ReadOneRep(nodeID, fileHash).
MutexLock(ctx.Distlock)
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}

// 连接grpc
agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort)
if err != nil {
return nil, fmt.Errorf("new agent grpc client: %w", err)
}

reader, err := agtCli.GetIPFSFile(fileHash)
if err != nil {
return nil, fmt.Errorf("getting ipfs file: %w", err)
}

reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
mutex.Unlock()
})
return reader, nil
}

func downloadFromLocalIPFS(ctx *DownloadContext, fileHash string) (io.ReadCloser, error) {
onClosed := func() {}
if stgglb.Local.NodeID != nil {
// 二次获取锁
mutex, err := reqbuilder.NewBuilder().
// 用于从IPFS下载文件
IPFS().ReadOneRep(*stgglb.Local.NodeID, fileHash).
MutexLock(ctx.Distlock)
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}
onClosed = func() {
mutex.Unlock()
}
}

ipfsCli, err := stgglb.IPFSPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new ipfs client: %w", err)
}

reader, err := ipfsCli.OpenRead(fileHash)
if err != nil {
return nil, fmt.Errorf("read ipfs file failed, err: %w", err)
}

reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
onClosed()
})
return reader, nil
}

+ 0
- 212
common/pkgs/iterator/rep_object_iterator.go View File

@@ -1,212 +0,0 @@
package iterator

import (
"fmt"
"io"
"math/rand"

"github.com/samber/lo"

"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/logger"
myio "gitlink.org.cn/cloudream/common/utils/io"

stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

type DownloadingObjectIterator = Iterator[*IterDownloadingObject]

type RepObjectIterator struct {
OnClosing func()

objects []model.Object
objectRepData []stgmod.ObjectRepData
currentIndex int
inited bool

downloadCtx *DownloadContext
cliLocation model.Location
}

type IterDownloadingObject struct {
Object model.Object
File io.ReadCloser
}

type DownloadNodeInfo struct {
Node model.Node
IsSameLocation bool
}

type DownloadContext struct {
Distlock *distlock.Service
}

func NewRepObjectIterator(objects []model.Object, objectRepData []stgmod.ObjectRepData, downloadCtx *DownloadContext) *RepObjectIterator {
return &RepObjectIterator{
objects: objects,
objectRepData: objectRepData,
downloadCtx: downloadCtx,
}
}

func (i *RepObjectIterator) MoveNext() (*IterDownloadingObject, error) {
// TODO 加锁
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

if !i.inited {
i.inited = true

findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP))
if err != nil {
return nil, fmt.Errorf("finding client location: %w", err)
}
i.cliLocation = findCliLocResp.Location
}

if i.currentIndex >= len(i.objects) {
return nil, ErrNoMoreItem
}

item, err := i.doMove(coorCli)
i.currentIndex++
return item, err
}

func (i *RepObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) {
repData := i.objectRepData[i.currentIndex]
if len(repData.NodeIDs) == 0 {
return nil, fmt.Errorf("no node has this file %s", repData.FileHash)
}

getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(repData.NodeIDs))
if err != nil {
return nil, fmt.Errorf("getting nodes: %w", err)
}

downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo {
return DownloadNodeInfo{
Node: node,
IsSameLocation: node.LocationID == i.cliLocation.LocationID,
}
})

reader, err := downloadFile(i.downloadCtx, i.chooseDownloadNode(downloadNodes), repData.FileHash)
if err != nil {
return nil, fmt.Errorf("rep read failed, err: %w", err)
}
return &IterDownloadingObject{
Object: i.objects[i.currentIndex],
File: reader,
}, nil
}

func (i *RepObjectIterator) Close() {
if i.OnClosing != nil {
i.OnClosing()
}
}

// chooseDownloadNode 选择一个下载节点
// 1. 从与当前客户端相同地域的节点中随机选一个
// 2. 没有用的话从所有节点中随机选一个
func (i *RepObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) DownloadNodeInfo {
sameLocationEntries := lo.Filter(entries, func(e DownloadNodeInfo, i int) bool { return e.IsSameLocation })
if len(sameLocationEntries) > 0 {
return sameLocationEntries[rand.Intn(len(sameLocationEntries))]
}

return entries[rand.Intn(len(entries))]
}

func downloadFile(ctx *DownloadContext, node DownloadNodeInfo, fileHash string) (io.ReadCloser, error) {
// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := node.Node.ExternalIP
grpcPort := node.Node.ExternalGRPCPort
if node.IsSameLocation {
nodeIP = node.Node.LocalIP
grpcPort = node.Node.LocalGRPCPort

logger.Infof("client and node %d are at the same location, use local ip", node.Node.NodeID)
}

if stgglb.IPFSPool != nil {
logger.Infof("try to use local IPFS to download file")

reader, err := downloadFromLocalIPFS(ctx, fileHash)
if err == nil {
return reader, nil
}

logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error())
}

return downloadFromNode(ctx, node.Node.NodeID, nodeIP, grpcPort, fileHash)
}

func downloadFromNode(ctx *DownloadContext, nodeID int64, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) {
// 二次获取锁
mutex, err := reqbuilder.NewBuilder().
// 用于从IPFS下载文件
IPFS().ReadOneRep(nodeID, fileHash).
MutexLock(ctx.Distlock)
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}

// 连接grpc
agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort)
if err != nil {
return nil, fmt.Errorf("new agent grpc client: %w", err)
}

reader, err := agtCli.GetIPFSFile(fileHash)
if err != nil {
return nil, fmt.Errorf("getting ipfs file: %w", err)
}

reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
mutex.Unlock()
})
return reader, nil
}

func downloadFromLocalIPFS(ctx *DownloadContext, fileHash string) (io.ReadCloser, error) {
onClosed := func() {}
if stgglb.Local.NodeID != nil {
// 二次获取锁
mutex, err := reqbuilder.NewBuilder().
// 用于从IPFS下载文件
IPFS().ReadOneRep(*stgglb.Local.NodeID, fileHash).
MutexLock(ctx.Distlock)
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}
onClosed = func() {
mutex.Unlock()
}
}

ipfsCli, err := stgglb.IPFSPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new ipfs client: %w", err)
}

reader, err := ipfsCli.OpenRead(fileHash)
if err != nil {
return nil, fmt.Errorf("read ipfs file failed, err: %w", err)
}

reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
onClosed()
})
return reader, nil
}

+ 0
- 64
common/pkgs/mq/coordinator/common.go View File

@@ -1,64 +0,0 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

type CommonService interface {
FindClientLocation(msg *FindClientLocation) (*FindClientLocationResp, *mq.CodeMessage)

GetECConfig(msg *GetECConfig) (*GetECConfigResp, *mq.CodeMessage)
}

// 查询指定IP所属的地域
var _ = Register(Service.FindClientLocation)

type FindClientLocation struct {
mq.MessageBodyBase
IP string `json:"ip"`
}
type FindClientLocationResp struct {
mq.MessageBodyBase
Location model.Location `json:"location"`
}

func NewFindClientLocation(ip string) *FindClientLocation {
return &FindClientLocation{
IP: ip,
}
}
func NewFindClientLocationResp(location model.Location) *FindClientLocationResp {
return &FindClientLocationResp{
Location: location,
}
}
func (client *Client) FindClientLocation(msg *FindClientLocation) (*FindClientLocationResp, error) {
return mq.Request(Service.FindClientLocation, client.rabbitCli, msg)
}

// 获取EC具体配置
var _ = Register(Service.GetECConfig)

type GetECConfig struct {
mq.MessageBodyBase
ECName string `json:"ecName"`
}
type GetECConfigResp struct {
mq.MessageBodyBase
Config model.Ec `json:"config"`
}

func NewGetECConfig(ecName string) *GetECConfig {
return &GetECConfig{
ECName: ecName,
}
}
func NewGetECConfigResp(config model.Ec) *GetECConfigResp {
return &GetECConfigResp{
Config: config,
}
}
func (client *Client) GetECConfig(msg *GetECConfig) (*GetECConfigResp, error) {
return mq.Request(Service.GetECConfig, client.rabbitCli, msg)
}

+ 13
- 13
common/pkgs/mq/coordinator/object.go View File

@@ -11,7 +11,7 @@ import (
type ObjectService interface {
GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage)

GetPackageObjectECData(msg *GetPackageObjectECData) (*GetPackageObjectECDataResp, *mq.CodeMessage)
GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, *mq.CodeMessage)
}

// 查询Package中的所有Object,返回的Objects会按照ObjectID升序
@@ -42,28 +42,28 @@ func (client *Client) GetPackageObjects(msg *GetPackageObjects) (*GetPackageObje
return mq.Request(Service.GetPackageObjects, client.rabbitCli, msg)
}

// 获取指定Object的EC数据,返回的Objects会按照ObjectID升序
var _ = Register(Service.GetPackageObjectECData)
// 获取Package中所有Object以及它们的分块详细信息,返回的Objects会按照ObjectID升序
var _ = Register(Service.GetPackageObjectDetails)

type GetPackageObjectECData struct {
type GetPackageObjectDetails struct {
mq.MessageBodyBase
PackageID cdssdk.PackageID `json:"packageID"`
}
type GetPackageObjectECDataResp struct {
type GetPackageObjectDetailsResp struct {
mq.MessageBodyBase
Data []stgmod.ObjectECData `json:"data"`
Objects []stgmod.ObjectDetail `json:"objects"`
}

func NewGetPackageObjectECData(packageID cdssdk.PackageID) *GetPackageObjectECData {
return &GetPackageObjectECData{
func NewGetPackageObjectDetails(packageID cdssdk.PackageID) *GetPackageObjectDetails {
return &GetPackageObjectDetails{
PackageID: packageID,
}
}
func NewGetPackageObjectECDataResp(data []stgmod.ObjectECData) *GetPackageObjectECDataResp {
return &GetPackageObjectECDataResp{
Data: data,
func NewGetPackageObjectDetailsResp(objects []stgmod.ObjectDetail) *GetPackageObjectDetailsResp {
return &GetPackageObjectDetailsResp{
Objects: objects,
}
}
func (client *Client) GetPackageObjectECData(msg *GetPackageObjectECData) (*GetPackageObjectECDataResp, error) {
return mq.Request(Service.GetPackageObjectECData, client.rabbitCli, msg)
func (client *Client) GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, error) {
return mq.Request(Service.GetPackageObjectDetails, client.rabbitCli, msg)
}

+ 5
- 5
common/pkgs/mq/coordinator/package.go View File

@@ -93,10 +93,10 @@ type UpdatePackageResp struct {
mq.MessageBodyBase
}
type AddObjectInfo struct {
Path string `json:"path"`
Size int64 `json:"size,string"`
Redundancy cdssdk.Redundancy `json:"redundancy"`
Blocks []stgmod.ObjectBlockData `json:"blocks"`
Path string `json:"path"`
Size int64 `json:"size,string"`
Redundancy cdssdk.Redundancy `json:"redundancy"`
Blocks []stgmod.ObjectBlockDetail `json:"blocks"`
}

func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectInfo, deletes []cdssdk.ObjectID) *UpdatePackage {
@@ -109,7 +109,7 @@ func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectInfo, deletes
func NewUpdatePackageResp() *UpdatePackageResp {
return &UpdatePackageResp{}
}
func NewAddObjectInfo(path string, size int64, redundancy cdssdk.Redundancy, blocks []stgmod.ObjectBlockData) AddObjectInfo {
func NewAddObjectInfo(path string, size int64, redundancy cdssdk.Redundancy, blocks []stgmod.ObjectBlockDetail) AddObjectInfo {
return AddObjectInfo{
Path: path,
Size: size,


+ 0
- 2
common/pkgs/mq/coordinator/server.go View File

@@ -13,8 +13,6 @@ type Service interface {

CacheService

CommonService

NodeService

ObjectService


+ 4
- 2
common/utils/utils.go View File

@@ -3,9 +3,11 @@ package utils
import (
"path/filepath"
"strconv"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

// MakeStorageLoadPackagePath Load操作时,写入的文件夹的名称
func MakeStorageLoadPackagePath(stgDir string, userID int64, packageID int64) string {
return filepath.Join(stgDir, strconv.FormatInt(userID, 10), "packages", strconv.FormatInt(packageID, 10))
func MakeStorageLoadPackagePath(stgDir string, userID cdssdk.UserID, packageID cdssdk.PackageID) string {
return filepath.Join(stgDir, strconv.FormatInt(int64(userID), 10), "packages", strconv.FormatInt(int64(packageID), 10))
}

+ 0
- 30
coordinator/internal/services/conmmon.go View File

@@ -1,30 +0,0 @@
package services

import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

func (svc *Service) FindClientLocation(msg *coormq.FindClientLocation) (*coormq.FindClientLocationResp, *mq.CodeMessage) {
location, err := svc.db.Location().FindLocationByExternalIP(svc.db.SQLCtx(), msg.IP)
if err != nil {
logger.WithField("IP", msg.IP).
Warnf("finding location by external ip: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "query client location failed")
}

return mq.ReplyOK(coormq.NewFindClientLocationResp(location))
}

func (svc *Service) GetECConfig(msg *coormq.GetECConfig) (*coormq.GetECConfigResp, *mq.CodeMessage) {
ec, err := svc.db.Ec().GetEc(svc.db.SQLCtx(), msg.ECName)
if err != nil {
logger.WithField("ECName", msg.ECName).
Warnf("query ec failed, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "query ec failed")
}

return mq.ReplyOK(coormq.NewGetECConfigResp(ec))
}

Loading…
Cancel
Save