Browse Source

Merge pull request '增加ClientPool,解除对config的依赖' (#3) from feature_gxh into master

gitlink
baohan 2 years ago
parent
commit
c0938b8ae0
23 changed files with 594 additions and 407 deletions
  1. +3
    -9
      internal/cmdline/commandline.go
  2. +3
    -3
      internal/cmdline/distlock.go
  3. +19
    -10
      internal/cmdline/package.go
  4. +6
    -6
      internal/cmdline/storage.go
  5. +11
    -10
      internal/config/config.go
  6. +70
    -0
      internal/http/object.go
  7. +0
    -50
      internal/http/package.go
  8. +2
    -2
      internal/http/server.go
  9. +12
    -12
      internal/http/storage.go
  10. +34
    -9
      internal/services/bucket.go
  11. +15
    -0
      internal/services/object.go
  12. +116
    -117
      internal/services/package.go
  13. +9
    -1
      internal/services/scanner.go
  14. +5
    -14
      internal/services/service.go
  15. +14
    -8
      internal/services/storage.go
  16. +38
    -0
      internal/task/create_ec_package.go
  17. +34
    -0
      internal/task/create_rep_package.go
  18. +108
    -0
      internal/task/storage_load_package.go
  19. +0
    -101
      internal/task/storage_move_package.go
  20. +3
    -9
      internal/task/task.go
  21. +38
    -0
      internal/task/update_ec_package.go
  22. +34
    -0
      internal/task/update_rep_package.go
  23. +20
    -46
      main.go

+ 3
- 9
internal/cmdline/commandline.go View File

@@ -5,8 +5,6 @@ import (
"os"

"gitlink.org.cn/cloudream/common/pkgs/cmdtrie"
distlocksvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
"gitlink.org.cn/cloudream/common/utils/ipfs"
"gitlink.org.cn/cloudream/storage-client/internal/services"
)

@@ -17,16 +15,12 @@ type CommandContext struct {
var commands cmdtrie.CommandTrie[CommandContext, error] = cmdtrie.NewCommandTrie[CommandContext, error]()

type Commandline struct {
Svc *services.Service
DistLock *distlocksvc.Service
IPFS *ipfs.IPFS
Svc *services.Service
}

func NewCommandline(svc *services.Service, distLock *distlocksvc.Service, ipfs *ipfs.IPFS) (*Commandline, error) {
func NewCommandline(svc *services.Service) (*Commandline, error) {
return &Commandline{
Svc: svc,
DistLock: distLock,
IPFS: ipfs,
Svc: svc,
}, nil
}



+ 3
- 3
internal/cmdline/distlock.go View File

@@ -6,8 +6,8 @@ import (

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/distlock/lockprovider"
"gitlink.org.cn/cloudream/common/pkgs/distlock/service"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

func DistLockLock(ctx CommandContext, lockData []string) error {
@@ -22,7 +22,7 @@ func DistLockLock(ctx CommandContext, lockData []string) error {
req.Locks = append(req.Locks, l)
}

reqID, err := ctx.Cmdline.DistLock.Acquire(req, service.AcquireOption{
reqID, err := ctx.Cmdline.Svc.DistLock.Acquire(req, service.AcquireOption{
RetryTimeMs: 5000,
})
if err != nil {
@@ -62,7 +62,7 @@ func parseOneLock(lockData string) (distlock.Lock, error) {
}

func DistLockUnlock(ctx CommandContext, reqID string) error {
return ctx.Cmdline.DistLock.Release(reqID)
return ctx.Cmdline.Svc.DistLock.Release(reqID)
}

func init() {


+ 19
- 10
internal/cmdline/package.go View File

@@ -9,6 +9,7 @@ import (

"github.com/jedib0t/go-pretty/v6/table"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

func PackageListBucketPackages(ctx CommandContext, bucketID int64) error {
@@ -46,17 +47,23 @@ func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int6
defer objIter.Close()

for {
objInfo, ok := objIter.MoveNext()
if !ok {
objInfo, err := objIter.MoveNext()
if err == iterator.ErrNoMoreItem {
break
}

if objInfo.Error != nil {
return objInfo.Error
if err != nil {
return err
}
defer objInfo.File.Close()

outputFile, err := os.Create(filepath.Join(outputDir, objInfo.Object.Path))
fullPath := filepath.Join(outputDir, objInfo.Object.Path)

dirPath := filepath.Dir(fullPath)
if err := os.MkdirAll(dirPath, 0755); err != nil {
return fmt.Errorf("creating object dir: %w", err)
}

outputFile, err := os.Create(fullPath)
if err != nil {
return fmt.Errorf("creating object file: %w", err)
}
@@ -72,6 +79,8 @@ func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int6
}

func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64, name string, repCount int) error {
rootPath = filepath.Clean(rootPath)

var uploadFilePathes []string
err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error {
if err != nil {
@@ -88,7 +97,7 @@ func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64
return fmt.Errorf("open directory %s failed, err: %w", rootPath, err)
}

objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes)
objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, models.NewRepRedundancyInfo(repCount))

if err != nil {
@@ -141,7 +150,7 @@ func PackageUpdateRepPackage(ctx CommandContext, packageID int64, rootPath strin
return fmt.Errorf("open directory %s failed, err: %w", rootPath, err)
}

objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes)
objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingRepPackage(0, packageID, objIter)
if err != nil {
return fmt.Errorf("update object %d failed, err: %w", packageID, err)
@@ -180,7 +189,7 @@ func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64,
return fmt.Errorf("open directory %s failed, err: %w", rootPath, err)
}

objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes)
objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, models.NewECRedundancyInfo(ecName))

if err != nil {
@@ -233,7 +242,7 @@ func PackageUpdateECPackage(ctx CommandContext, packageID int64, rootPath string
return fmt.Errorf("open directory %s failed, err: %w", rootPath, err)
}

objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes)
objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingECPackage(0, packageID, objIter)
if err != nil {
return fmt.Errorf("update package %d failed, err: %w", packageID, err)


+ 6
- 6
internal/cmdline/storage.go View File

@@ -7,14 +7,14 @@ import (
"gitlink.org.cn/cloudream/common/models"
)

func StorageMovePackage(ctx CommandContext, packageID int64, storageID int64) error {
taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageMovePackage(0, packageID, storageID)
func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) error {
taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(0, packageID, storageID)
if err != nil {
return fmt.Errorf("start moving package to storage: %w", err)
return fmt.Errorf("start loading package to storage: %w", err)
}

for {
complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageMovePackage(taskID, time.Second*10)
complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10)
if complete {
if err != nil {
return fmt.Errorf("moving complete with: %w", err)
@@ -54,7 +54,7 @@ func StorageCreateRepPackage(ctx CommandContext, bucketID int64, name string, st
}

func init() {
commands.MustAdd(StorageMovePackage, "storage", "move", "pkg")
commands.MustAdd(StorageLoadPackage, "stg", "load", "pkg")

commands.MustAdd(StorageCreateRepPackage, "storage", "upload", "rep")
commands.MustAdd(StorageCreateRepPackage, "stg", "upload", "rep")
}

+ 11
- 10
internal/config/config.go View File

@@ -2,22 +2,23 @@ package config

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/config"
"gitlink.org.cn/cloudream/common/utils/ipfs"
stgmodels "gitlink.org.cn/cloudream/storage-common/models"
agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent"
stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
)

type Config struct {
GRPCPort int `json:"grpcPort"`
ECPacketSize int64 `json:"ecPacketSize"`
MaxRepCount int `json:"maxRepCount"`
LocalIP string `json:"localIP"`
ExternalIP string `json:"externalIP"`
Logger logger.Config `json:"logger"`
RabbitMQ stgmq.Config `json:"rabbitMQ"`
IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon
DistLock distlock.Config `json:"distlock"`
Local stgmodels.LocalMachineInfo `json:"local"`
AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"`
ECPacketSize int64 `json:"ecPacketSize"`
MaxRepCount int `json:"maxRepCount"`
Logger logger.Config `json:"logger"`
RabbitMQ stgmq.Config `json:"rabbitMQ"`
IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon
DistLock distlock.Config `json:"distlock"`
}

var cfg Config


+ 70
- 0
internal/http/object.go View File

@@ -0,0 +1,70 @@
package http

import (
"io"
"net/http"

"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
myio "gitlink.org.cn/cloudream/common/utils/io"
)

type ObjectService struct {
*Server
}

func (s *Server) ObjectSvc() *ObjectService {
return &ObjectService{
Server: s,
}
}

type ObjectDownloadReq struct {
UserID *int64 `form:"userID" binding:"required"`
ObjectID *int64 `form:"objectID" binding:"required"`
}

func (s *ObjectService) Download(ctx *gin.Context) {
log := logger.WithField("HTTP", "Object.Download")

var req ObjectDownloadReq
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

file, err := s.svc.ObjectSvc().Download(*req.UserID, *req.ObjectID)
if err != nil {
log.Warnf("downloading object: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download object failed"))
return
}

ctx.Writer.WriteHeader(http.StatusOK)
// TODO 需要设置FileName
ctx.Header("Content-Disposition", "attachment; filename=filename")
ctx.Header("Content-Type", "application/octet-stream")

buf := make([]byte, 4096)
ctx.Stream(func(w io.Writer) bool {
rd, err := file.Read(buf)
if err == io.EOF {
return false
}

if err != nil {
log.Warnf("reading file data: %s", err.Error())
return false
}

err = myio.WriteAll(w, buf[:rd])
if err != nil {
log.Warnf("writing data to response: %s", err.Error())
return false
}

return true
})
}

+ 0
- 50
internal/http/package.go View File

@@ -1,7 +1,6 @@
package http

import (
"io"
"mime/multipart"
"net/http"
"time"
@@ -24,55 +23,6 @@ func (s *Server) PackageSvc() *PackageService {
}
}

type PackageDownloadReq struct {
UserID *int64 `form:"userID" binding:"required"`
PackageID *int64 `form:"packageID" binding:"required"`
}

func (s *PackageService) Download(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.Download")

var req PackageDownloadReq
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

file, err := s.svc.PackageSvc().DownloadPackage(*req.UserID, *req.PackageID)
if err != nil {
log.Warnf("downloading package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download package failed"))
return
}

ctx.Writer.WriteHeader(http.StatusOK)
// TODO 需要设置FileName
ctx.Header("Content-Disposition", "attachment; filename=filename")
ctx.Header("Content-Type", "application/octet-stream")

buf := make([]byte, 4096)
ctx.Stream(func(w io.Writer) bool {
rd, err := file.Read(buf)
if err == io.EOF {
return false
}

if err != nil {
log.Warnf("reading file data: %s", err.Error())
return false
}

err = myio.WriteAll(w, buf[:rd])
if err != nil {
log.Warnf("writing data to response: %s", err.Error())
return false
}

return true
})
}

type PackageUploadReq struct {
Info PackageUploadInfo `form:"info" binding:"required"`
Files []*multipart.FileHeader `form:"files"`


+ 2
- 2
internal/http/server.go View File

@@ -38,9 +38,9 @@ func (s *Server) Serve() error {
}

func (s *Server) initRouters() {
s.engine.GET("/package/download", s.PackageSvc().Download)
s.engine.GET("/object/download", s.ObjectSvc().Download)
s.engine.POST("/package/upload", s.PackageSvc().Upload)
s.engine.POST("/package/delete", s.PackageSvc().Delete)

s.engine.POST("/storage/movePackage", s.StorageSvc().MovePackage)
s.engine.POST("/storage/loadPackage", s.StorageSvc().LoadPackage)
}

+ 12
- 12
internal/http/storage.go View File

@@ -19,35 +19,35 @@ func (s *Server) StorageSvc() *StorageService {
}
}

type StorageMovePackageReq struct {
type StorageLoadPackageReq struct {
UserID *int64 `json:"userID" binding:"required"`
PackageID *int64 `json:"packageID" binding:"required"`
StorageID *int64 `json:"storageID" binding:"required"`
}

func (s *StorageService) MovePackage(ctx *gin.Context) {
log := logger.WithField("HTTP", "Storage.MovePackage")
func (s *StorageService) LoadPackage(ctx *gin.Context) {
log := logger.WithField("HTTP", "Storage.LoadPackage")

var req StorageMovePackageReq
var req StorageLoadPackageReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

taskID, err := s.svc.StorageSvc().StartStorageMovePackage(*req.UserID, *req.PackageID, *req.StorageID)
taskID, err := s.svc.StorageSvc().StartStorageLoadPackage(*req.UserID, *req.PackageID, *req.StorageID)
if err != nil {
log.Warnf("start storage move package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move package failed"))
log.Warnf("start storage load package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage load package failed"))
return
}

for {
complete, err := s.svc.StorageSvc().WaitStorageMovePackage(taskID, time.Second*10)
complete, err := s.svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10)
if complete {
if err != nil {
log.Warnf("moving complete with: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move package failed"))
log.Warnf("loading complete with: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage load package failed"))
return
}

@@ -56,8 +56,8 @@ func (s *StorageService) MovePackage(ctx *gin.Context) {
}

if err != nil {
log.Warnf("wait moving: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move package failed"))
log.Warnf("wait loadding: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage load package failed"))
return
}
}


+ 34
- 9
internal/services/bucket.go View File

@@ -3,8 +3,9 @@ package services
import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

@@ -22,7 +23,13 @@ func (svc *BucketService) GetBucket(userID int64, bucketID int64) (model.Bucket,
}

func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) {
resp, err := svc.coordinator.GetUserBuckets(coormq.NewGetUserBuckets(userID))
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

resp, err := coorCli.GetUserBuckets(coormq.NewGetUserBuckets(userID))
if err != nil {
return nil, fmt.Errorf("get user buckets failed, err: %w", err)
}
@@ -31,7 +38,13 @@ func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) {
}

func (svc *BucketService) GetBucketPackages(userID int64, bucketID int64) ([]model.Package, error) {
resp, err := svc.coordinator.GetBucketPackages(coormq.NewGetBucketPackages(userID, bucketID))
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

resp, err := coorCli.GetBucketPackages(coormq.NewGetBucketPackages(userID, bucketID))
if err != nil {
return nil, fmt.Errorf("get bucket packages failed, err: %w", err)
}
@@ -40,6 +53,12 @@ func (svc *BucketService) GetBucketPackages(userID int64, bucketID int64) ([]mod
}

func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, error) {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return 0, fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

// TODO 只有阅读了系统操作的源码,才能知道要加哪些锁,但用户的命令可能会调用不止一个系统操作。
// 因此加锁的操作还是必须在用户命令里完成,但具体加锁的内容,则需要被封装起来与系统操作放到一起,方便管理,避免分散改动。

@@ -47,13 +66,13 @@ func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64,
Metadata().Bucket().CreateOne(userID, bucketName).
// TODO 可以考虑二次加锁,加的更精确
UserBucket().CreateAny().
MutexLock(svc.distlock)
MutexLock(svc.DistLock)
if err != nil {
return 0, fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

resp, err := svc.coordinator.CreateBucket(coormq.NewCreateBucket(userID, bucketName))
resp, err := coorCli.CreateBucket(coormq.NewCreateBucket(userID, bucketName))
if err != nil {
return 0, fmt.Errorf("creating bucket: %w", err)
}
@@ -62,24 +81,30 @@ func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64,
}

func (svc *BucketService) DeleteBucket(userID int64, bucketID int64) error {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

// TODO 检查用户是否有删除这个Bucket的权限。检查的时候可以只上UserBucket的Read锁

mutex, err := reqbuilder.NewBuilder().
Metadata().
UserBucket().WriteAny().
Bucket().WriteOne(bucketID).
// TODO2
Package().WriteAny().
Object().WriteAny().
ObjectRep().WriteAny().
ObjectBlock().WriteAny().
StorageObject().WriteAny().
MutexLock(svc.distlock)
StoragePackage().WriteAny().
MutexLock(svc.DistLock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

_, err = svc.coordinator.DeleteBucket(coormq.NewDeleteBucket(userID, bucketID))
_, err = coorCli.DeleteBucket(coormq.NewDeleteBucket(userID, bucketID))
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}


+ 15
- 0
internal/services/object.go View File

@@ -0,0 +1,15 @@
package services

import "io"

type ObjectService struct {
*Service
}

func (svc *Service) ObjectSvc() *ObjectService {
return &ObjectService{Service: svc}
}

func (svc *ObjectService) Download(userID int64, objectID int64) (io.ReadCloser, error) {
panic("not implement yet!")
}

+ 116
- 117
internal/services/package.go View File

@@ -8,7 +8,10 @@ import (
"gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/storage-client/internal/config"
mytask "gitlink.org.cn/cloudream/storage-client/internal/task"
"gitlink.org.cn/cloudream/storage-common/globals"
agtcmd "gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)
@@ -22,197 +25,193 @@ func (svc *Service) PackageSvc() *PackageService {
}

func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (iterator.DownloadingObjectIterator, error) {
/*
TODO2
// TODO zkx 需要梳理EC锁涉及的锁,补充下面漏掉的部分
mutex, err := reqbuilder.NewBuilder().
// 用于判断用户是否有对象权限
Metadata().UserBucket().ReadAny().
// 用于查询可用的下载节点
Node().ReadAny().
// 用于读取文件信息
Object().ReadOne(objectID).
// 用于查询Rep配置
ObjectRep().ReadOne(objectID).
// 用于查询Block配置
ObjectBlock().ReadAny().
// 用于查询包含了副本的节点
Cache().ReadAny().
MutexLock(svc.distlock)
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}
*/
getPkgResp, err := svc.coordinator.GetPackage(coormq.NewGetPackage(userID, packageID))
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

mutex, err := reqbuilder.NewBuilder().
// 用于判断用户是否有对象权限
Metadata().UserBucket().ReadAny().
// 用于查询可用的下载节点
Node().ReadAny().
// 用于读取包信息
Package().ReadOne(packageID).
// 用于读取包内的文件信息
Object().ReadAny().
// 用于查询Rep配置
ObjectRep().ReadAny().
// 用于查询Block配置
ObjectBlock().ReadAny().
// 用于查询包含了副本的节点
Cache().ReadAny().
MutexLock(svc.DistLock)
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}

getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(userID, packageID))
if err != nil {
return nil, fmt.Errorf("getting package: %w", err)
}

getObjsResp, err := svc.coordinator.GetPackageObjects(coormq.NewGetPackageObjects(userID, packageID))
getObjsResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(userID, packageID))
if err != nil {
return nil, fmt.Errorf("getting package objects: %w", err)
}

if getPkgResp.Redundancy.Type == models.RedundancyRep {
getObjRepDataResp, err := svc.coordinator.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(packageID))
iter, err := svc.downloadRepPackage(packageID, getObjsResp.Objects, coorCli)

if err != nil {
mutex.Unlock()
return nil, err
}

iter.OnClosing = func() {
mutex.Unlock()
}

return iter, nil
} else {
iter, err := svc.downloadECPackage(getPkgResp.Package, getObjsResp.Objects, coorCli)

if err != nil {
return nil, fmt.Errorf("getting package object rep data: %w", err)
mutex.Unlock()
return nil, err
}

iter := iterator.NewRepObjectIterator(getObjsResp.Objects, getObjRepDataResp.Data, svc.coordinator, svc.distlock, iterator.DownloadConfig{
LocalIPFS: svc.ipfs,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})
iter.OnClosing = func() {
mutex.Unlock()
}

return iter, nil
}
}

getObjECDataResp, err := svc.coordinator.GetPackageObjectECData(coormq.NewGetPackageObjectECData(packageID))
func (svc *PackageService) downloadRepPackage(packageID int64, objects []model.Object, coorCli *coormq.PoolClient) (*iterator.RepObjectIterator, error) {
getObjRepDataResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(packageID))
if err != nil {
return nil, fmt.Errorf("getting package object rep data: %w", err)
}

iter := iterator.NewRepObjectIterator(objects, getObjRepDataResp.Data, &iterator.DownloadContext{
Distlock: svc.DistLock,
})

return iter, nil
}
func (svc *PackageService) downloadECPackage(pkg model.Package, objects []model.Object, coorCli *coormq.PoolClient) (*iterator.ECObjectIterator, error) {
getObjECDataResp, err := coorCli.GetPackageObjectECData(coormq.NewGetPackageObjectECData(pkg.PackageID))
if err != nil {
return nil, fmt.Errorf("getting package object ec data: %w", err)
}

var ecRed models.ECRedundancyInfo
if err := serder.AnyToAny(getPkgResp.Package.Redundancy.Info, &ecRed); err != nil {
if err := serder.AnyToAny(pkg.Redundancy.Info, &ecRed); err != nil {
return nil, fmt.Errorf("get ec redundancy info: %w", err)
}

getECResp, err := svc.coordinator.GetECConfig(coormq.NewGetECConfig(ecRed.ECName))
getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecRed.ECName))
if err != nil {
return nil, fmt.Errorf("getting ec: %w", err)
}

iter := iterator.NewECObjectIterator(getObjsResp.Objects, getObjECDataResp.Data, svc.coordinator, svc.distlock, getECResp.Config, config.Cfg().ECPacketSize, iterator.DownloadConfig{
LocalIPFS: svc.ipfs,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
iter := iterator.NewECObjectIterator(objects, getObjECDataResp.Data, getECResp.Config, &iterator.ECDownloadContext{
DownloadContext: &iterator.DownloadContext{
Distlock: svc.DistLock,
},
ECPacketSize: config.Cfg().ECPacketSize,
})

return iter, nil
}

func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo models.RepRedundancyInfo) (string, error) {
tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext](
agtcmd.NewCreateRepPackage(
userID, bucketID, name, objIter,
repInfo,
agtcmd.UploadConfig{
LocalIPFS: svc.ipfs,
LocalNodeID: nil,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})))
tsk := svc.TaskMgr.StartNew(mytask.NewCreateRepPackage(userID, bucketID, name, objIter, repInfo))
return tsk.ID(), nil
}

func (svc *PackageService) WaitCreatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateRepPackageResult, error) {
tsk := svc.taskMgr.FindByID(taskID)
func (svc *PackageService) WaitCreatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *mytask.CreateRepPackageResult, error) {
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
cteatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.CreateRepPackage)
return true, &cteatePkgTask.Result, tsk.Error()
cteatePkgTask := tsk.Body().(*mytask.CreateRepPackage)
return true, cteatePkgTask.Result, tsk.Error()
}
return false, nil, nil
}

func (svc *PackageService) StartUpdatingRepPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) (string, error) {
tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext](
agtcmd.NewUpdateRepPackage(
userID, packageID, objIter,
agtcmd.UploadConfig{
LocalIPFS: svc.ipfs,
LocalNodeID: nil,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})))
tsk := svc.TaskMgr.StartNew(mytask.NewUpdateRepPackage(userID, packageID, objIter))
return tsk.ID(), nil
}

func (svc *PackageService) WaitUpdatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdateRepPackageResult, error) {
tsk := svc.taskMgr.FindByID(taskID)
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
updatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.UpdateRepPackage)
return true, &updatePkgTask.Result, tsk.Error()
updatePkgTask := tsk.Body().(*mytask.UpdateRepPackage)
return true, updatePkgTask.Result, tsk.Error()
}
return false, nil, nil
}

func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo models.ECRedundancyInfo) (string, error) {
tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext](
agtcmd.NewCreateECPackage(
userID, bucketID, name, objIter,
ecInfo,
config.Cfg().ECPacketSize,
agtcmd.UploadConfig{
LocalIPFS: svc.ipfs,
LocalNodeID: nil,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})))
tsk := svc.TaskMgr.StartNew(mytask.NewCreateECPackage(userID, bucketID, name, objIter, ecInfo))
return tsk.ID(), nil
}

func (svc *PackageService) WaitCreatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateRepPackageResult, error) {
tsk := svc.taskMgr.FindByID(taskID)
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
cteatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.CreateRepPackage)
return true, &cteatePkgTask.Result, tsk.Error()
cteatePkgTask := tsk.Body().(*mytask.CreateRepPackage)
return true, cteatePkgTask.Result, tsk.Error()
}
return false, nil, nil
}

func (svc *PackageService) StartUpdatingECPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) (string, error) {
tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext](
agtcmd.NewUpdateECPackage(
userID, packageID, objIter,
config.Cfg().ECPacketSize,
agtcmd.UploadConfig{
LocalIPFS: svc.ipfs,
LocalNodeID: nil,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})))
tsk := svc.TaskMgr.StartNew(mytask.NewUpdateECPackage(userID, packageID, objIter))
return tsk.ID(), nil
}

func (svc *PackageService) WaitUpdatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdateECPackageResult, error) {
tsk := svc.taskMgr.FindByID(taskID)
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
updatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.UpdateECPackage)
return true, &updatePkgTask.Result, tsk.Error()
updatePkgTask := tsk.Body().(*mytask.UpdateECPackage)
return true, updatePkgTask.Result, tsk.Error()
}
return false, nil, nil
}

func (svc *PackageService) DeletePackage(userID int64, packageID int64) error {
/*
// TODO2
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有对象的权限
UserBucket().ReadAny().
// 用于读取、修改对象信息
Object().WriteOne(objectID).
// 用于删除Rep配置
ObjectRep().WriteOne(objectID).
// 用于删除Block配置
ObjectBlock().WriteAny().
// 用于修改Move此Object的记录的状态
StorageObject().WriteAny().
MutexLock(svc.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()
*/
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有对象的权限
UserBucket().ReadAny().
// 用于读取、修改包信息
Package().WriteOne(packageID).
// 用于删除包内的所有文件
Object().WriteAny().
// 用于删除Rep配置
ObjectRep().WriteAny().
// 用于删除Block配置
ObjectBlock().WriteAny().
// 用于修改Move此Object的记录的状态
StoragePackage().WriteAny().
MutexLock(svc.DistLock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

_, err := svc.coordinator.DeletePackage(coormq.NewDeletePackage(userID, packageID))
_, err = coorCli.DeletePackage(coormq.NewDeletePackage(userID, packageID))
if err != nil {
return fmt.Errorf("deleting package: %w", err)
}


+ 9
- 1
internal/services/scanner.go View File

@@ -2,6 +2,8 @@ package services

import (
"fmt"

"gitlink.org.cn/cloudream/storage-common/globals"
)

type ScannerService struct {
@@ -13,7 +15,13 @@ func (svc *Service) ScannerSvc() *ScannerService {
}

func (svc *ScannerService) PostEvent(event any, isEmergency bool, dontMerge bool) error {
err := svc.scanner.PostEvent(event, isEmergency, dontMerge)
scCli, err := globals.ScannerMQPool.Acquire()
if err != nil {
return fmt.Errorf("new scacnner client: %w", err)
}
defer scCli.Close()

err = scCli.PostEvent(event, isEmergency, dontMerge)
if err != nil {
return fmt.Errorf("request to scanner failed, err: %w", err)
}


+ 5
- 14
internal/services/service.go View File

@@ -2,26 +2,17 @@ package services

import (
distlock "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
"gitlink.org.cn/cloudream/common/utils/ipfs"
"gitlink.org.cn/cloudream/storage-client/internal/task"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner"
)

type Service struct {
coordinator *coormq.Client
ipfs *ipfs.IPFS
scanner *scmq.Client
distlock *distlock.Service
taskMgr *task.Manager
DistLock *distlock.Service
TaskMgr *task.Manager
}

func NewService(coorClient *coormq.Client, ipfsClient *ipfs.IPFS, scanner *scmq.Client, distlock *distlock.Service, taskMgr *task.Manager) (*Service, error) {
func NewService(distlock *distlock.Service, taskMgr *task.Manager) (*Service, error) {
return &Service{
coordinator: coorClient,
ipfs: ipfsClient,
scanner: scanner,
distlock: distlock,
taskMgr: taskMgr,
DistLock: distlock,
TaskMgr: taskMgr,
}, nil
}

+ 14
- 8
internal/services/storage.go View File

@@ -5,8 +5,8 @@ import (
"time"

"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/storage-client/internal/config"
"gitlink.org.cn/cloudream/storage-client/internal/task"
"gitlink.org.cn/cloudream/storage-common/globals"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)
@@ -19,13 +19,13 @@ func (svc *Service) StorageSvc() *StorageService {
return &StorageService{Service: svc}
}

func (svc *StorageService) StartStorageMovePackage(userID int64, packageID int64, storageID int64) (string, error) {
tsk := svc.taskMgr.StartNew(task.NewStorageMovePackage(userID, packageID, storageID))
func (svc *StorageService) StartStorageLoadPackage(userID int64, packageID int64, storageID int64) (string, error) {
tsk := svc.TaskMgr.StartNew(task.NewStorageLoadPackage(userID, packageID, storageID))
return tsk.ID(), nil
}

func (svc *StorageService) WaitStorageMovePackage(taskID string, waitTimeout time.Duration) (bool, error) {
tsk := svc.taskMgr.FindByID(taskID)
func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout time.Duration) (bool, error) {
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
return true, tsk.Error()
}
@@ -40,12 +40,18 @@ func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, s

// 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID
func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo) (int64, string, error) {
stgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID))
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return 0, "", fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

stgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID))
if err != nil {
return 0, "", fmt.Errorf("getting storage info: %w", err)
}

agentCli, err := agtmq.NewClient(stgResp.NodeID, &config.Cfg().RabbitMQ)
agentCli, err := globals.AgentMQPool.Acquire(stgResp.NodeID)
if err != nil {
return 0, "", fmt.Errorf("new agent client: %w", err)
}
@@ -60,7 +66,7 @@ func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int6
}

func (svc *StorageService) WaitStorageCreatePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, int64, error) {
agentCli, err := agtmq.NewClient(nodeID, &config.Cfg().RabbitMQ)
agentCli, err := globals.AgentMQPool.Acquire(nodeID)
if err != nil {
// TODO 失败是否要当做任务已经结束?
return true, 0, fmt.Errorf("new agent client: %w", err)


+ 38
- 0
internal/task/create_ec_package.go View File

@@ -0,0 +1,38 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/storage-client/internal/config"
"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

type CreateECPackageResult = cmd.CreateECPackageResult

type CreateECPackage struct {
cmd cmd.CreateECPackage

Result *CreateECPackageResult
}

func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage {
return &CreateECPackage{
cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy),
}
}

func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UpdateECPackageContext{
UpdatePackageContext: &cmd.UpdatePackageContext{
Distlock: ctx.distlock,
},
ECPacketSize: config.Cfg().ECPacketSize,
})
t.Result = ret

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 34
- 0
internal/task/create_rep_package.go View File

@@ -0,0 +1,34 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

type CreateRepPackageResult = cmd.CreateRepPackageResult

type CreateRepPackage struct {
cmd cmd.CreateRepPackage

Result *CreateRepPackageResult
}

func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage {
return &CreateRepPackage{
cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy),
}
}

func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{
Distlock: ctx.distlock,
})
t.Result = ret

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 108
- 0
internal/task/storage_load_package.go View File

@@ -0,0 +1,108 @@
package task

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

type StorageLoadPackage struct {
userID int64
packageID int64
storageID int64
}

func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *StorageLoadPackage {
return &StorageLoadPackage{
userID: userID,
packageID: packageID,
storageID: storageID,
}
}

func (t *StorageLoadPackage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *StorageLoadPackage) do(ctx TaskContext) error {
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有Storage权限
UserStorage().ReadOne(t.packageID, t.storageID).
// 用于判断用户是否有对象权限
UserBucket().ReadAny().
// 用于读取包信息
Package().ReadOne(t.packageID).
// 用于读取对象信息
Object().ReadAny().
// 用于查询Rep配置
ObjectRep().ReadAny().
// 用于查询Block配置
ObjectBlock().ReadAny().
// 用于创建Move记录
StoragePackage().CreateOne(t.storageID, t.userID, t.packageID).
Storage().
// 用于创建对象文件
CreateOnePackage(t.storageID, t.userID, t.packageID).
MutexLock(ctx.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID))
if err != nil {
return fmt.Errorf("getting storage info: %w", err)
}

// 然后向代理端发送移动文件的请求
agentClient, err := globals.AgentMQPool.Acquire(getStgResp.NodeID)
if err != nil {
return fmt.Errorf("create agent client to %d failed, err: %w", getStgResp.NodeID, err)
}
defer agentClient.Close()

agentMoveResp, err := agentClient.StartStorageLoadPackage(
agtmq.NewStartStorageLoadPackage(
t.userID,
t.packageID,
t.storageID,
))
if err != nil {
return fmt.Errorf("start loading package to storage: %w", err)
}

for {
waitResp, err := agentClient.WaitStorageLoadPackage(agtmq.NewWaitStorageLoadPackage(agentMoveResp.TaskID, int64(time.Second)*5))
if err != nil {
return fmt.Errorf("wait loading package: %w", err)
}

if waitResp.IsComplete {
if waitResp.Error != "" {
return fmt.Errorf("agent loading package: %s", waitResp.Error)
}

break
}
}

_, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.packageID, t.storageID))
if err != nil {
return fmt.Errorf("loading package to storage: %w", err)
}
return nil
}

+ 0
- 101
internal/task/storage_move_package.go View File

@@ -1,101 +0,0 @@
package task

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/storage-client/internal/config"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

type StorageMovePackage struct {
userID int64
packageID int64
storageID int64
}

func NewStorageMovePackage(userID int64, packageID int64, storageID int64) *StorageMovePackage {
return &StorageMovePackage{
userID: userID,
packageID: packageID,
storageID: storageID,
}
}

func (t *StorageMovePackage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *StorageMovePackage) do(ctx TaskContext) error {
/*
TODO2
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 用于判断用户是否有Storage权限
UserStorage().ReadOne(t.packageID, t.storageID).
// 用于判断用户是否有对象权限
UserBucket().ReadAny().
// 用于读取对象信息
Object().ReadOne(t.packageID).
// 用于查询Rep配置
ObjectRep().ReadOne(t.packageID).
// 用于查询Block配置
ObjectBlock().ReadAny().
// 用于创建Move记录
StorageObject().CreateOne(t.storageID, t.userID, t.packageID).
Storage().
// 用于创建对象文件
CreateOneObject(t.storageID, t.userID, t.packageID).
MutexLock(ctx.DistLock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()
*/
getStgResp, err := ctx.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID))
if err != nil {
return fmt.Errorf("getting storage info: %w", err)
}

// 然后向代理端发送移动文件的请求
agentClient, err := agtmq.NewClient(getStgResp.NodeID, &config.Cfg().RabbitMQ)
if err != nil {
return fmt.Errorf("create agent client to %d failed, err: %w", getStgResp.NodeID, err)
}
defer agentClient.Close()

agentMoveResp, err := agentClient.StartStorageMovePackage(
agtmq.NewStartStorageMovePackage(
t.userID,
t.packageID,
t.storageID,
))
if err != nil {
return fmt.Errorf("start moving package to storage: %w", err)
}

for {
waitResp, err := agentClient.WaitStorageMovePackage(agtmq.NewWaitStorageMovePackage(agentMoveResp.TaskID, int64(time.Second)*5))
if err != nil {
return fmt.Errorf("wait moving package: %w", err)
}

if waitResp.IsComplete {
if waitResp.Error != "" {
return fmt.Errorf("agent moving package: %s", waitResp.Error)
}

break
}
}

_, err = ctx.Coordinator().PackageMovedToStorage(coormq.NewPackageMovedToStorage(t.userID, t.packageID, t.storageID))
if err != nil {
return fmt.Errorf("moving package to storage: %w", err)
}
return nil
}

+ 3
- 9
internal/task/task.go View File

@@ -3,14 +3,10 @@ package task
import (
distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/common/utils/ipfs"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

type TaskContext struct {
ipfs *ipfs.IPFS
distLock *distsvc.Service
coordinator *coormq.Client
distlock *distsvc.Service
}

// 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用,
@@ -25,10 +21,8 @@ type Task = task.Task[TaskContext]

type CompleteOption = task.CompleteOption

func NewManager(ipfs *ipfs.IPFS, distlock *distsvc.Service, coorCli *coormq.Client) Manager {
func NewManager(distlock *distsvc.Service) Manager {
return task.NewManager(TaskContext{
ipfs: ipfs,
distLock: distlock,
coordinator: coorCli,
distlock: distlock,
})
}

+ 38
- 0
internal/task/update_ec_package.go View File

@@ -0,0 +1,38 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/storage-client/internal/config"
"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

type UpdateECPackageResult = cmd.UpdateECPackageResult

type UpdateECPackage struct {
cmd cmd.UpdateECPackage

Result *UpdateECPackageResult
}

func NewUpdateECPackage(userID int64, packageID int64, objectIter iterator.UploadingObjectIterator) *UpdateECPackage {
return &UpdateECPackage{
cmd: *cmd.NewUpdateECPackage(userID, packageID, objectIter),
}
}

func (t *UpdateECPackage) Execute(ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UpdateECPackageContext{
UpdatePackageContext: &cmd.UpdatePackageContext{
Distlock: ctx.distlock,
},
ECPacketSize: config.Cfg().ECPacketSize,
})

t.Result = ret

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 34
- 0
internal/task/update_rep_package.go View File

@@ -0,0 +1,34 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

type UpdateRepPackageResult = cmd.UpdateRepPackageResult

type UpdateRepPackage struct {
cmd cmd.UpdateRepPackage

Result *UpdateRepPackageResult
}

func NewUpdateRepPackage(userID int64, packageID int64, objectIter iterator.UploadingObjectIterator) *UpdateRepPackage {
return &UpdateRepPackage{
cmd: *cmd.NewUpdateRepPackage(userID, packageID, objectIter),
}
}

func (t *UpdateRepPackage) Execute(ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{
Distlock: ctx.distlock,
})

t.Result = ret

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 20
- 46
main.go View File

@@ -6,15 +6,13 @@ import (

_ "google.golang.org/grpc/balancer/grpclb"

distlocksvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/ipfs"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-client/internal/cmdline"
"gitlink.org.cn/cloudream/storage-client/internal/config"
"gitlink.org.cn/cloudream/storage-client/internal/services"
"gitlink.org.cn/cloudream/storage-client/internal/task"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner"
"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock"
)

func main() {
@@ -24,77 +22,53 @@ func main() {
os.Exit(1)
}

err = log.Init(&config.Cfg().Logger)
err = logger.Init(&config.Cfg().Logger)
if err != nil {
fmt.Printf("init logger failed, err: %s", err.Error())
os.Exit(1)
}

coorClient, err := coormq.NewClient(&config.Cfg().RabbitMQ)
if err != nil {
log.Warnf("new coordinator client failed, err: %s", err.Error())
os.Exit(1)
}

scanner, err := scmq.NewClient(&config.Cfg().RabbitMQ)
if err != nil {
log.Warnf("new scanner client failed, err: %s", err.Error())
os.Exit(1)
}

var ipfsCli *ipfs.IPFS
globals.InitLocal(&config.Cfg().Local)
globals.InitMQPool(&config.Cfg().RabbitMQ)
globals.InitAgentRPCPool(&config.Cfg().AgentGRPC)
if config.Cfg().IPFS != nil {
log.Infof("IPFS config is not empty, so create a ipfs client")
logger.Infof("IPFS config is not empty, so create a ipfs client")

ipfsCli, err = ipfs.NewIPFS(config.Cfg().IPFS)
if err != nil {
log.Warnf("new ipfs client failed, err: %s", err.Error())
os.Exit(1)
}
globals.InitIPFSPool(config.Cfg().IPFS)
}

distlockSvc, err := distlocksvc.NewService(&config.Cfg().DistLock)
distlockSvc, err := distlock.NewService(&config.Cfg().DistLock)
if err != nil {
log.Warnf("new distlock service failed, err: %s", err.Error())
logger.Warnf("new distlock service failed, err: %s", err.Error())
os.Exit(1)
}
go serveDistLock(distlockSvc)

taskMgr := task.NewManager(ipfsCli, distlockSvc, coorClient)
taskMgr := task.NewManager(distlockSvc)

svc, err := services.NewService(coorClient, ipfsCli, scanner, distlockSvc, &taskMgr)
svc, err := services.NewService(distlockSvc, &taskMgr)
if err != nil {
log.Warnf("new services failed, err: %s", err.Error())
logger.Warnf("new services failed, err: %s", err.Error())
os.Exit(1)
}

cmds, err := cmdline.NewCommandline(svc, distlockSvc, ipfsCli)
cmds, err := cmdline.NewCommandline(svc)
if err != nil {
log.Warnf("new command line failed, err: %s", err.Error())
logger.Warnf("new command line failed, err: %s", err.Error())
os.Exit(1)
}

cmds.DispatchCommand(os.Args[1:])
/*
TO DO future:
1. ls命令,显示用户指定桶下的所有对象,及相关的元数据
2. rm命令,用户指定bucket和object名,执行删除操作
3. update命令,用户发起对象更新命令,查询元数据,判断对象的冗余方式,删除旧对象(unpin所有的副本或编码块),写入新对象
4. ipfsStat命令,查看本地有无ipfsdaemon,ipfs目录的使用率
5. ipfsFlush命令,unpin本地ipfs目录中的所有cid(block)
6. 改为交互式client,输入用户名及秘钥后进入交互界面
7. 支持纯缓存类型的IPFS节点,数据一律存在后端存储服务中
*/
}

func serveDistLock(svc *distlocksvc.Service) {
log.Info("start serving distlock")
func serveDistLock(svc *distlock.Service) {
logger.Info("start serving distlock")

err := svc.Serve()

if err != nil {
log.Errorf("distlock stopped with error: %s", err.Error())
logger.Errorf("distlock stopped with error: %s", err.Error())
}

log.Info("distlock stopped")
logger.Info("distlock stopped")
}

Loading…
Cancel
Save