From a2feb16160b2cfb90c34e65ddee9d8ae63b09431 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 22 Aug 2023 10:18:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0ClientPool=EF=BC=8C=E8=A7=A3?= =?UTF-8?q?=E9=99=A4=E5=AF=B9config=E7=9A=84=E4=BE=9D=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/cmdline/commandline.go | 12 +-- internal/cmdline/distlock.go | 4 +- internal/cmdline/package.go | 18 ++-- internal/config/config.go | 21 ++--- internal/http/object.go | 70 ++++++++++++++++ internal/http/package.go | 50 ----------- internal/http/server.go | 2 +- internal/services/bucket.go | 37 ++++++-- internal/services/object.go | 15 ++++ internal/services/package.go | 116 ++++++++++---------------- internal/services/scanner.go | 10 ++- internal/services/service.go | 19 ++--- internal/services/storage.go | 18 ++-- internal/task/create_ec_package.go | 38 +++++++++ internal/task/create_rep_package.go | 34 ++++++++ internal/task/storage_move_package.go | 14 +++- internal/task/task.go | 12 +-- internal/task/update_ec_package.go | 38 +++++++++ internal/task/update_rep_package.go | 34 ++++++++ main.go | 60 ++++--------- 20 files changed, 385 insertions(+), 237 deletions(-) create mode 100644 internal/http/object.go create mode 100644 internal/services/object.go create mode 100644 internal/task/create_ec_package.go create mode 100644 internal/task/create_rep_package.go create mode 100644 internal/task/update_ec_package.go create mode 100644 internal/task/update_rep_package.go diff --git a/internal/cmdline/commandline.go b/internal/cmdline/commandline.go index 4d825d2..0533bd1 100644 --- a/internal/cmdline/commandline.go +++ b/internal/cmdline/commandline.go @@ -5,8 +5,6 @@ import ( "os" "gitlink.org.cn/cloudream/common/pkgs/cmdtrie" - distlocksvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" - "gitlink.org.cn/cloudream/common/utils/ipfs" "gitlink.org.cn/cloudream/storage-client/internal/services" ) @@ -17,16 +15,12 @@ type CommandContext struct { var commands cmdtrie.CommandTrie[CommandContext, error] = cmdtrie.NewCommandTrie[CommandContext, error]() type Commandline struct { - Svc *services.Service - DistLock *distlocksvc.Service - IPFS *ipfs.IPFS + Svc *services.Service } -func NewCommandline(svc *services.Service, distLock *distlocksvc.Service, ipfs *ipfs.IPFS) (*Commandline, error) { +func NewCommandline(svc *services.Service) (*Commandline, error) { return &Commandline{ - Svc: svc, - DistLock: distLock, - IPFS: ipfs, + Svc: svc, }, nil } diff --git a/internal/cmdline/distlock.go b/internal/cmdline/distlock.go index d0c1bfd..5fd701f 100644 --- a/internal/cmdline/distlock.go +++ b/internal/cmdline/distlock.go @@ -22,7 +22,7 @@ func DistLockLock(ctx CommandContext, lockData []string) error { req.Locks = append(req.Locks, l) } - reqID, err := ctx.Cmdline.DistLock.Acquire(req, service.AcquireOption{ + reqID, err := ctx.Cmdline.Svc.DistLock.Acquire(req, service.AcquireOption{ RetryTimeMs: 5000, }) if err != nil { @@ -62,7 +62,7 @@ func parseOneLock(lockData string) (distlock.Lock, error) { } func DistLockUnlock(ctx CommandContext, reqID string) error { - return ctx.Cmdline.DistLock.Release(reqID) + return ctx.Cmdline.Svc.DistLock.Release(reqID) } func init() { diff --git a/internal/cmdline/package.go b/internal/cmdline/package.go index c169a97..6fbd135 100644 --- a/internal/cmdline/package.go +++ b/internal/cmdline/package.go @@ -9,6 +9,7 @@ import ( "github.com/jedib0t/go-pretty/v6/table" "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" ) func PackageListBucketPackages(ctx CommandContext, bucketID int64) error { @@ -46,13 +47,12 @@ func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int6 defer objIter.Close() for { - objInfo, ok := objIter.MoveNext() - if !ok { + objInfo, err := objIter.MoveNext() + if err == iterator.ErrNoMoreItem { break } - - if objInfo.Error != nil { - return objInfo.Error + if err != nil { + return err } defer objInfo.File.Close() @@ -88,7 +88,7 @@ func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64 return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) } - objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes) + objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, models.NewRepRedundancyInfo(repCount)) if err != nil { @@ -141,7 +141,7 @@ func PackageUpdateRepPackage(ctx CommandContext, packageID int64, rootPath strin return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) } - objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes) + objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingRepPackage(0, packageID, objIter) if err != nil { return fmt.Errorf("update object %d failed, err: %w", packageID, err) @@ -180,7 +180,7 @@ func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) } - objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes) + objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, models.NewECRedundancyInfo(ecName)) if err != nil { @@ -233,7 +233,7 @@ func PackageUpdateECPackage(ctx CommandContext, packageID int64, rootPath string return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) } - objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes) + objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingECPackage(0, packageID, objIter) if err != nil { return fmt.Errorf("update package %d failed, err: %w", packageID, err) diff --git a/internal/config/config.go b/internal/config/config.go index c65dad9..e4847eb 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -2,22 +2,23 @@ package config import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/config" - "gitlink.org.cn/cloudream/common/utils/ipfs" + stgmodels "gitlink.org.cn/cloudream/storage-common/models" + agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent" stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" ) type Config struct { - GRPCPort int `json:"grpcPort"` - ECPacketSize int64 `json:"ecPacketSize"` - MaxRepCount int `json:"maxRepCount"` - LocalIP string `json:"localIP"` - ExternalIP string `json:"externalIP"` - Logger logger.Config `json:"logger"` - RabbitMQ stgmq.Config `json:"rabbitMQ"` - IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon - DistLock distlock.Config `json:"distlock"` + Local stgmodels.LocalMachineInfo `json:"local"` + AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` + ECPacketSize int64 `json:"ecPacketSize"` + MaxRepCount int `json:"maxRepCount"` + Logger logger.Config `json:"logger"` + RabbitMQ stgmq.Config `json:"rabbitMQ"` + IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon + DistLock distlock.Config `json:"distlock"` } var cfg Config diff --git a/internal/http/object.go b/internal/http/object.go new file mode 100644 index 0000000..671dc26 --- /dev/null +++ b/internal/http/object.go @@ -0,0 +1,70 @@ +package http + +import ( + "io" + "net/http" + + "github.com/gin-gonic/gin" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + myio "gitlink.org.cn/cloudream/common/utils/io" +) + +type ObjectService struct { + *Server +} + +func (s *Server) ObjectSvc() *ObjectService { + return &ObjectService{ + Server: s, + } +} + +type ObjectDownloadReq struct { + UserID *int64 `form:"userID" binding:"required"` + ObjectID *int64 `form:"objectID" binding:"required"` +} + +func (s *ObjectService) Download(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.Download") + + var req ObjectDownloadReq + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + file, err := s.svc.ObjectSvc().Download(*req.UserID, *req.ObjectID) + if err != nil { + log.Warnf("downloading object: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download object failed")) + return + } + + ctx.Writer.WriteHeader(http.StatusOK) + // TODO 需要设置FileName + ctx.Header("Content-Disposition", "attachment; filename=filename") + ctx.Header("Content-Type", "application/octet-stream") + + buf := make([]byte, 4096) + ctx.Stream(func(w io.Writer) bool { + rd, err := file.Read(buf) + if err == io.EOF { + return false + } + + if err != nil { + log.Warnf("reading file data: %s", err.Error()) + return false + } + + err = myio.WriteAll(w, buf[:rd]) + if err != nil { + log.Warnf("writing data to response: %s", err.Error()) + return false + } + + return true + }) +} diff --git a/internal/http/package.go b/internal/http/package.go index d2890bc..bae4170 100644 --- a/internal/http/package.go +++ b/internal/http/package.go @@ -1,7 +1,6 @@ package http import ( - "io" "mime/multipart" "net/http" "time" @@ -24,55 +23,6 @@ func (s *Server) PackageSvc() *PackageService { } } -type PackageDownloadReq struct { - UserID *int64 `form:"userID" binding:"required"` - PackageID *int64 `form:"packageID" binding:"required"` -} - -func (s *PackageService) Download(ctx *gin.Context) { - log := logger.WithField("HTTP", "Package.Download") - - var req PackageDownloadReq - if err := ctx.ShouldBindQuery(&req); err != nil { - log.Warnf("binding body: %s", err.Error()) - ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) - return - } - - file, err := s.svc.PackageSvc().DownloadPackage(*req.UserID, *req.PackageID) - if err != nil { - log.Warnf("downloading package: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download package failed")) - return - } - - ctx.Writer.WriteHeader(http.StatusOK) - // TODO 需要设置FileName - ctx.Header("Content-Disposition", "attachment; filename=filename") - ctx.Header("Content-Type", "application/octet-stream") - - buf := make([]byte, 4096) - ctx.Stream(func(w io.Writer) bool { - rd, err := file.Read(buf) - if err == io.EOF { - return false - } - - if err != nil { - log.Warnf("reading file data: %s", err.Error()) - return false - } - - err = myio.WriteAll(w, buf[:rd]) - if err != nil { - log.Warnf("writing data to response: %s", err.Error()) - return false - } - - return true - }) -} - type PackageUploadReq struct { Info PackageUploadInfo `form:"info" binding:"required"` Files []*multipart.FileHeader `form:"files"` diff --git a/internal/http/server.go b/internal/http/server.go index 72738e0..182ca7b 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -38,7 +38,7 @@ func (s *Server) Serve() error { } func (s *Server) initRouters() { - s.engine.GET("/package/download", s.PackageSvc().Download) + s.engine.GET("/object/download", s.ObjectSvc().Download) s.engine.POST("/package/upload", s.PackageSvc().Upload) s.engine.POST("/package/delete", s.PackageSvc().Delete) diff --git a/internal/services/bucket.go b/internal/services/bucket.go index 5b843ab..5b0549e 100644 --- a/internal/services/bucket.go +++ b/internal/services/bucket.go @@ -4,6 +4,7 @@ import ( "fmt" "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" + "gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) @@ -22,7 +23,13 @@ func (svc *BucketService) GetBucket(userID int64, bucketID int64) (model.Bucket, } func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) { - resp, err := svc.coordinator.GetUserBuckets(coormq.NewGetUserBuckets(userID)) + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + + resp, err := coorCli.GetUserBuckets(coormq.NewGetUserBuckets(userID)) if err != nil { return nil, fmt.Errorf("get user buckets failed, err: %w", err) } @@ -31,7 +38,13 @@ func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) { } func (svc *BucketService) GetBucketPackages(userID int64, bucketID int64) ([]model.Package, error) { - resp, err := svc.coordinator.GetBucketPackages(coormq.NewGetBucketPackages(userID, bucketID)) + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + + resp, err := coorCli.GetBucketPackages(coormq.NewGetBucketPackages(userID, bucketID)) if err != nil { return nil, fmt.Errorf("get bucket packages failed, err: %w", err) } @@ -40,6 +53,12 @@ func (svc *BucketService) GetBucketPackages(userID int64, bucketID int64) ([]mod } func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, error) { + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return 0, fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + // TODO 只有阅读了系统操作的源码,才能知道要加哪些锁,但用户的命令可能会调用不止一个系统操作。 // 因此加锁的操作还是必须在用户命令里完成,但具体加锁的内容,则需要被封装起来与系统操作放到一起,方便管理,避免分散改动。 @@ -47,13 +66,13 @@ func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, Metadata().Bucket().CreateOne(userID, bucketName). // TODO 可以考虑二次加锁,加的更精确 UserBucket().CreateAny(). - MutexLock(svc.distlock) + MutexLock(svc.DistLock) if err != nil { return 0, fmt.Errorf("acquire locks failed, err: %w", err) } defer mutex.Unlock() - resp, err := svc.coordinator.CreateBucket(coormq.NewCreateBucket(userID, bucketName)) + resp, err := coorCli.CreateBucket(coormq.NewCreateBucket(userID, bucketName)) if err != nil { return 0, fmt.Errorf("creating bucket: %w", err) } @@ -62,6 +81,12 @@ func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, } func (svc *BucketService) DeleteBucket(userID int64, bucketID int64) error { + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + // TODO 检查用户是否有删除这个Bucket的权限。检查的时候可以只上UserBucket的Read锁 mutex, err := reqbuilder.NewBuilder(). @@ -73,13 +98,13 @@ func (svc *BucketService) DeleteBucket(userID int64, bucketID int64) error { ObjectRep().WriteAny(). ObjectBlock().WriteAny(). StorageObject().WriteAny(). - MutexLock(svc.distlock) + MutexLock(svc.DistLock) if err != nil { return fmt.Errorf("acquire locks failed, err: %w", err) } defer mutex.Unlock() - _, err = svc.coordinator.DeleteBucket(coormq.NewDeleteBucket(userID, bucketID)) + _, err = coorCli.DeleteBucket(coormq.NewDeleteBucket(userID, bucketID)) if err != nil { return fmt.Errorf("request to coordinator failed, err: %w", err) } diff --git a/internal/services/object.go b/internal/services/object.go new file mode 100644 index 0000000..94c6de7 --- /dev/null +++ b/internal/services/object.go @@ -0,0 +1,15 @@ +package services + +import "io" + +type ObjectService struct { + *Service +} + +func (svc *Service) ObjectSvc() *ObjectService { + return &ObjectService{Service: svc} +} + +func (svc *ObjectService) Download(userID int64, objectID int64) (io.ReadCloser, error) { + panic("not implement yet!") +} diff --git a/internal/services/package.go b/internal/services/package.go index c7ef8e6..df4efb1 100644 --- a/internal/services/package.go +++ b/internal/services/package.go @@ -8,6 +8,7 @@ import ( "gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/storage-client/internal/config" mytask "gitlink.org.cn/cloudream/storage-client/internal/task" + "gitlink.org.cn/cloudream/storage-common/globals" agtcmd "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" @@ -22,6 +23,12 @@ func (svc *Service) PackageSvc() *PackageService { } func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (iterator.DownloadingObjectIterator, error) { + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + /* TODO2 // TODO zkx 需要梳理EC锁涉及的锁,补充下面漏掉的部分 @@ -43,33 +50,30 @@ func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (itera return nil, fmt.Errorf("acquire locks failed, err: %w", err) } */ - getPkgResp, err := svc.coordinator.GetPackage(coormq.NewGetPackage(userID, packageID)) + getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(userID, packageID)) if err != nil { return nil, fmt.Errorf("getting package: %w", err) } - getObjsResp, err := svc.coordinator.GetPackageObjects(coormq.NewGetPackageObjects(userID, packageID)) + getObjsResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(userID, packageID)) if err != nil { return nil, fmt.Errorf("getting package objects: %w", err) } if getPkgResp.Redundancy.Type == models.RedundancyRep { - getObjRepDataResp, err := svc.coordinator.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(packageID)) + getObjRepDataResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(packageID)) if err != nil { return nil, fmt.Errorf("getting package object rep data: %w", err) } - iter := iterator.NewRepObjectIterator(getObjsResp.Objects, getObjRepDataResp.Data, svc.coordinator, svc.distlock, iterator.DownloadConfig{ - LocalIPFS: svc.ipfs, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, + iter := iterator.NewRepObjectIterator(getObjsResp.Objects, getObjRepDataResp.Data, &iterator.DownloadContext{ + Distlock: svc.DistLock, }) return iter, nil } - getObjECDataResp, err := svc.coordinator.GetPackageObjectECData(coormq.NewGetPackageObjectECData(packageID)) + getObjECDataResp, err := coorCli.GetPackageObjectECData(coormq.NewGetPackageObjectECData(packageID)) if err != nil { return nil, fmt.Errorf("getting package object ec data: %w", err) } @@ -79,118 +83,84 @@ func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (itera return nil, fmt.Errorf("get ec redundancy info: %w", err) } - getECResp, err := svc.coordinator.GetECConfig(coormq.NewGetECConfig(ecRed.ECName)) + getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecRed.ECName)) if err != nil { return nil, fmt.Errorf("getting ec: %w", err) } - iter := iterator.NewECObjectIterator(getObjsResp.Objects, getObjECDataResp.Data, svc.coordinator, svc.distlock, getECResp.Config, config.Cfg().ECPacketSize, iterator.DownloadConfig{ - LocalIPFS: svc.ipfs, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, + iter := iterator.NewECObjectIterator(getObjsResp.Objects, getObjECDataResp.Data, getECResp.Config, &iterator.ECDownloadContext{ + DownloadContext: &iterator.DownloadContext{ + Distlock: svc.DistLock, + }, + ECPacketSize: config.Cfg().ECPacketSize, }) return iter, nil } func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo models.RepRedundancyInfo) (string, error) { - tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext]( - agtcmd.NewCreateRepPackage( - userID, bucketID, name, objIter, - repInfo, - agtcmd.UploadConfig{ - LocalIPFS: svc.ipfs, - LocalNodeID: nil, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, - }))) + tsk := svc.TaskMgr.StartNew(mytask.NewCreateRepPackage(userID, bucketID, name, objIter, repInfo)) return tsk.ID(), nil } -func (svc *PackageService) WaitCreatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateRepPackageResult, error) { - tsk := svc.taskMgr.FindByID(taskID) +func (svc *PackageService) WaitCreatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *mytask.CreateRepPackageResult, error) { + tsk := svc.TaskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { - cteatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.CreateRepPackage) - return true, &cteatePkgTask.Result, tsk.Error() + cteatePkgTask := tsk.Body().(*mytask.CreateRepPackage) + return true, cteatePkgTask.Result, tsk.Error() } return false, nil, nil } func (svc *PackageService) StartUpdatingRepPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) (string, error) { - tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext]( - agtcmd.NewUpdateRepPackage( - userID, packageID, objIter, - agtcmd.UploadConfig{ - LocalIPFS: svc.ipfs, - LocalNodeID: nil, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, - }))) + tsk := svc.TaskMgr.StartNew(mytask.NewUpdateRepPackage(userID, packageID, objIter)) return tsk.ID(), nil } func (svc *PackageService) WaitUpdatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdateRepPackageResult, error) { - tsk := svc.taskMgr.FindByID(taskID) + tsk := svc.TaskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { - updatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.UpdateRepPackage) - return true, &updatePkgTask.Result, tsk.Error() + updatePkgTask := tsk.Body().(*mytask.UpdateRepPackage) + return true, updatePkgTask.Result, tsk.Error() } return false, nil, nil } func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo models.ECRedundancyInfo) (string, error) { - tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext]( - agtcmd.NewCreateECPackage( - userID, bucketID, name, objIter, - ecInfo, - config.Cfg().ECPacketSize, - agtcmd.UploadConfig{ - LocalIPFS: svc.ipfs, - LocalNodeID: nil, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, - }))) + tsk := svc.TaskMgr.StartNew(mytask.NewCreateECPackage(userID, bucketID, name, objIter, ecInfo)) return tsk.ID(), nil } func (svc *PackageService) WaitCreatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateRepPackageResult, error) { - tsk := svc.taskMgr.FindByID(taskID) + tsk := svc.TaskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { - cteatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.CreateRepPackage) - return true, &cteatePkgTask.Result, tsk.Error() + cteatePkgTask := tsk.Body().(*mytask.CreateRepPackage) + return true, cteatePkgTask.Result, tsk.Error() } return false, nil, nil } func (svc *PackageService) StartUpdatingECPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) (string, error) { - tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext]( - agtcmd.NewUpdateECPackage( - userID, packageID, objIter, - config.Cfg().ECPacketSize, - agtcmd.UploadConfig{ - LocalIPFS: svc.ipfs, - LocalNodeID: nil, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, - }))) + tsk := svc.TaskMgr.StartNew(mytask.NewUpdateECPackage(userID, packageID, objIter)) return tsk.ID(), nil } func (svc *PackageService) WaitUpdatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdateECPackageResult, error) { - tsk := svc.taskMgr.FindByID(taskID) + tsk := svc.TaskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { - updatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.UpdateECPackage) - return true, &updatePkgTask.Result, tsk.Error() + updatePkgTask := tsk.Body().(*mytask.UpdateECPackage) + return true, updatePkgTask.Result, tsk.Error() } return false, nil, nil } func (svc *PackageService) DeletePackage(userID int64, packageID int64) error { + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + /* // TODO2 mutex, err := reqbuilder.NewBuilder(). @@ -212,7 +182,7 @@ func (svc *PackageService) DeletePackage(userID int64, packageID int64) error { defer mutex.Unlock() */ - _, err := svc.coordinator.DeletePackage(coormq.NewDeletePackage(userID, packageID)) + _, err = coorCli.DeletePackage(coormq.NewDeletePackage(userID, packageID)) if err != nil { return fmt.Errorf("deleting package: %w", err) } diff --git a/internal/services/scanner.go b/internal/services/scanner.go index 58e568e..3743c60 100644 --- a/internal/services/scanner.go +++ b/internal/services/scanner.go @@ -2,6 +2,8 @@ package services import ( "fmt" + + "gitlink.org.cn/cloudream/storage-common/globals" ) type ScannerService struct { @@ -13,7 +15,13 @@ func (svc *Service) ScannerSvc() *ScannerService { } func (svc *ScannerService) PostEvent(event any, isEmergency bool, dontMerge bool) error { - err := svc.scanner.PostEvent(event, isEmergency, dontMerge) + scCli, err := globals.ScannerMQPool.Acquire() + if err != nil { + return fmt.Errorf("new scacnner client: %w", err) + } + defer scCli.Close() + + err = scCli.PostEvent(event, isEmergency, dontMerge) if err != nil { return fmt.Errorf("request to scanner failed, err: %w", err) } diff --git a/internal/services/service.go b/internal/services/service.go index 638fd3f..3f21616 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -2,26 +2,17 @@ package services import ( distlock "gitlink.org.cn/cloudream/common/pkgs/distlock/service" - "gitlink.org.cn/cloudream/common/utils/ipfs" "gitlink.org.cn/cloudream/storage-client/internal/task" - coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" - scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner" ) type Service struct { - coordinator *coormq.Client - ipfs *ipfs.IPFS - scanner *scmq.Client - distlock *distlock.Service - taskMgr *task.Manager + DistLock *distlock.Service + TaskMgr *task.Manager } -func NewService(coorClient *coormq.Client, ipfsClient *ipfs.IPFS, scanner *scmq.Client, distlock *distlock.Service, taskMgr *task.Manager) (*Service, error) { +func NewService(distlock *distlock.Service, taskMgr *task.Manager) (*Service, error) { return &Service{ - coordinator: coorClient, - ipfs: ipfsClient, - scanner: scanner, - distlock: distlock, - taskMgr: taskMgr, + DistLock: distlock, + TaskMgr: taskMgr, }, nil } diff --git a/internal/services/storage.go b/internal/services/storage.go index 34a439e..89f3270 100644 --- a/internal/services/storage.go +++ b/internal/services/storage.go @@ -5,8 +5,8 @@ import ( "time" "gitlink.org.cn/cloudream/common/models" - "gitlink.org.cn/cloudream/storage-client/internal/config" "gitlink.org.cn/cloudream/storage-client/internal/task" + "gitlink.org.cn/cloudream/storage-common/globals" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) @@ -20,12 +20,12 @@ func (svc *Service) StorageSvc() *StorageService { } func (svc *StorageService) StartStorageMovePackage(userID int64, packageID int64, storageID int64) (string, error) { - tsk := svc.taskMgr.StartNew(task.NewStorageMovePackage(userID, packageID, storageID)) + tsk := svc.TaskMgr.StartNew(task.NewStorageMovePackage(userID, packageID, storageID)) return tsk.ID(), nil } func (svc *StorageService) WaitStorageMovePackage(taskID string, waitTimeout time.Duration) (bool, error) { - tsk := svc.taskMgr.FindByID(taskID) + tsk := svc.TaskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { return true, tsk.Error() } @@ -40,12 +40,18 @@ func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, s // 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo) (int64, string, error) { - stgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID)) + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return 0, "", fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + + stgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID)) if err != nil { return 0, "", fmt.Errorf("getting storage info: %w", err) } - agentCli, err := agtmq.NewClient(stgResp.NodeID, &config.Cfg().RabbitMQ) + agentCli, err := globals.AgentMQPool.Acquire(stgResp.NodeID) if err != nil { return 0, "", fmt.Errorf("new agent client: %w", err) } @@ -60,7 +66,7 @@ func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int6 } func (svc *StorageService) WaitStorageCreatePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, int64, error) { - agentCli, err := agtmq.NewClient(nodeID, &config.Cfg().RabbitMQ) + agentCli, err := globals.AgentMQPool.Acquire(nodeID) if err != nil { // TODO 失败是否要当做任务已经结束? return true, 0, fmt.Errorf("new agent client: %w", err) diff --git a/internal/task/create_ec_package.go b/internal/task/create_ec_package.go new file mode 100644 index 0000000..005abd4 --- /dev/null +++ b/internal/task/create_ec_package.go @@ -0,0 +1,38 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/storage-client/internal/config" + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" +) + +type CreateECPackageResult = cmd.CreateECPackageResult + +type CreateECPackage struct { + cmd cmd.CreateECPackage + + Result *CreateECPackageResult +} + +func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage { + return &CreateECPackage{ + cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy), + } +} + +func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) { + ret, err := t.cmd.Execute(&cmd.UpdateECPackageContext{ + UpdatePackageContext: &cmd.UpdatePackageContext{ + Distlock: ctx.distlock, + }, + ECPacketSize: config.Cfg().ECPacketSize, + }) + t.Result = ret + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/internal/task/create_rep_package.go b/internal/task/create_rep_package.go new file mode 100644 index 0000000..7b15c64 --- /dev/null +++ b/internal/task/create_rep_package.go @@ -0,0 +1,34 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" +) + +type CreateRepPackageResult = cmd.CreateRepPackageResult + +type CreateRepPackage struct { + cmd cmd.CreateRepPackage + + Result *CreateRepPackageResult +} + +func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage { + return &CreateRepPackage{ + cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy), + } +} + +func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) { + ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ + Distlock: ctx.distlock, + }) + t.Result = ret + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/internal/task/storage_move_package.go b/internal/task/storage_move_package.go index 9f34b34..984ddbf 100644 --- a/internal/task/storage_move_package.go +++ b/internal/task/storage_move_package.go @@ -4,7 +4,7 @@ import ( "fmt" "time" - "gitlink.org.cn/cloudream/storage-client/internal/config" + "gitlink.org.cn/cloudream/storage-common/globals" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) @@ -56,13 +56,19 @@ func (t *StorageMovePackage) do(ctx TaskContext) error { } defer mutex.Unlock() */ - getStgResp, err := ctx.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID)) + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + + getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID)) if err != nil { return fmt.Errorf("getting storage info: %w", err) } // 然后向代理端发送移动文件的请求 - agentClient, err := agtmq.NewClient(getStgResp.NodeID, &config.Cfg().RabbitMQ) + agentClient, err := globals.AgentMQPool.Acquire(getStgResp.NodeID) if err != nil { return fmt.Errorf("create agent client to %d failed, err: %w", getStgResp.NodeID, err) } @@ -93,7 +99,7 @@ func (t *StorageMovePackage) do(ctx TaskContext) error { } } - _, err = ctx.Coordinator().PackageMovedToStorage(coormq.NewPackageMovedToStorage(t.userID, t.packageID, t.storageID)) + _, err = coorCli.PackageMovedToStorage(coormq.NewPackageMovedToStorage(t.userID, t.packageID, t.storageID)) if err != nil { return fmt.Errorf("moving package to storage: %w", err) } diff --git a/internal/task/task.go b/internal/task/task.go index 69466e3..3653981 100644 --- a/internal/task/task.go +++ b/internal/task/task.go @@ -3,14 +3,10 @@ package task import ( distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" "gitlink.org.cn/cloudream/common/pkgs/task" - "gitlink.org.cn/cloudream/common/utils/ipfs" - coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) type TaskContext struct { - ipfs *ipfs.IPFS - distLock *distsvc.Service - coordinator *coormq.Client + distlock *distsvc.Service } // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, @@ -25,10 +21,8 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(ipfs *ipfs.IPFS, distlock *distsvc.Service, coorCli *coormq.Client) Manager { +func NewManager(distlock *distsvc.Service) Manager { return task.NewManager(TaskContext{ - ipfs: ipfs, - distLock: distlock, - coordinator: coorCli, + distlock: distlock, }) } diff --git a/internal/task/update_ec_package.go b/internal/task/update_ec_package.go new file mode 100644 index 0000000..15035db --- /dev/null +++ b/internal/task/update_ec_package.go @@ -0,0 +1,38 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/storage-client/internal/config" + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" +) + +type UpdateECPackageResult = cmd.UpdateECPackageResult + +type UpdateECPackage struct { + cmd cmd.UpdateECPackage + + Result *UpdateECPackageResult +} + +func NewUpdateECPackage(userID int64, packageID int64, objectIter iterator.UploadingObjectIterator) *UpdateECPackage { + return &UpdateECPackage{ + cmd: *cmd.NewUpdateECPackage(userID, packageID, objectIter), + } +} + +func (t *UpdateECPackage) Execute(ctx TaskContext, complete CompleteFn) { + ret, err := t.cmd.Execute(&cmd.UpdateECPackageContext{ + UpdatePackageContext: &cmd.UpdatePackageContext{ + Distlock: ctx.distlock, + }, + ECPacketSize: config.Cfg().ECPacketSize, + }) + + t.Result = ret + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/internal/task/update_rep_package.go b/internal/task/update_rep_package.go new file mode 100644 index 0000000..18646d1 --- /dev/null +++ b/internal/task/update_rep_package.go @@ -0,0 +1,34 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" +) + +type UpdateRepPackageResult = cmd.UpdateRepPackageResult + +type UpdateRepPackage struct { + cmd cmd.UpdateRepPackage + + Result *UpdateRepPackageResult +} + +func NewUpdateRepPackage(userID int64, packageID int64, objectIter iterator.UploadingObjectIterator) *UpdateRepPackage { + return &UpdateRepPackage{ + cmd: *cmd.NewUpdateRepPackage(userID, packageID, objectIter), + } +} + +func (t *UpdateRepPackage) Execute(ctx TaskContext, complete CompleteFn) { + ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ + Distlock: ctx.distlock, + }) + + t.Result = ret + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/main.go b/main.go index 35fa37f..0f9a07b 100644 --- a/main.go +++ b/main.go @@ -7,14 +7,12 @@ import ( _ "google.golang.org/grpc/balancer/grpclb" distlocksvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" - log "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/utils/ipfs" + "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage-client/internal/cmdline" "gitlink.org.cn/cloudream/storage-client/internal/config" "gitlink.org.cn/cloudream/storage-client/internal/services" "gitlink.org.cn/cloudream/storage-client/internal/task" - coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" - scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner" + "gitlink.org.cn/cloudream/storage-common/globals" ) func main() { @@ -24,77 +22,53 @@ func main() { os.Exit(1) } - err = log.Init(&config.Cfg().Logger) + err = logger.Init(&config.Cfg().Logger) if err != nil { fmt.Printf("init logger failed, err: %s", err.Error()) os.Exit(1) } - coorClient, err := coormq.NewClient(&config.Cfg().RabbitMQ) - if err != nil { - log.Warnf("new coordinator client failed, err: %s", err.Error()) - os.Exit(1) - } - - scanner, err := scmq.NewClient(&config.Cfg().RabbitMQ) - if err != nil { - log.Warnf("new scanner client failed, err: %s", err.Error()) - os.Exit(1) - } - - var ipfsCli *ipfs.IPFS + globals.InitLocal(&config.Cfg().Local) + globals.InitMQPool(&config.Cfg().RabbitMQ) + globals.InitAgentRPCPool(&config.Cfg().AgentGRPC) if config.Cfg().IPFS != nil { - log.Infof("IPFS config is not empty, so create a ipfs client") + logger.Infof("IPFS config is not empty, so create a ipfs client") - ipfsCli, err = ipfs.NewIPFS(config.Cfg().IPFS) - if err != nil { - log.Warnf("new ipfs client failed, err: %s", err.Error()) - os.Exit(1) - } + globals.InitIPFSPool(config.Cfg().IPFS) } distlockSvc, err := distlocksvc.NewService(&config.Cfg().DistLock) if err != nil { - log.Warnf("new distlock service failed, err: %s", err.Error()) + logger.Warnf("new distlock service failed, err: %s", err.Error()) os.Exit(1) } go serveDistLock(distlockSvc) - taskMgr := task.NewManager(ipfsCli, distlockSvc, coorClient) + taskMgr := task.NewManager(distlockSvc) - svc, err := services.NewService(coorClient, ipfsCli, scanner, distlockSvc, &taskMgr) + svc, err := services.NewService(distlockSvc, &taskMgr) if err != nil { - log.Warnf("new services failed, err: %s", err.Error()) + logger.Warnf("new services failed, err: %s", err.Error()) os.Exit(1) } - cmds, err := cmdline.NewCommandline(svc, distlockSvc, ipfsCli) + cmds, err := cmdline.NewCommandline(svc) if err != nil { - log.Warnf("new command line failed, err: %s", err.Error()) + logger.Warnf("new command line failed, err: %s", err.Error()) os.Exit(1) } cmds.DispatchCommand(os.Args[1:]) - /* - TO DO future: - 1. ls命令,显示用户指定桶下的所有对象,及相关的元数据 - 2. rm命令,用户指定bucket和object名,执行删除操作 - 3. update命令,用户发起对象更新命令,查询元数据,判断对象的冗余方式,删除旧对象(unpin所有的副本或编码块),写入新对象 - 4. ipfsStat命令,查看本地有无ipfsdaemon,ipfs目录的使用率 - 5. ipfsFlush命令,unpin本地ipfs目录中的所有cid(block) - 6. 改为交互式client,输入用户名及秘钥后进入交互界面 - 7. 支持纯缓存类型的IPFS节点,数据一律存在后端存储服务中 - */ } func serveDistLock(svc *distlocksvc.Service) { - log.Info("start serving distlock") + logger.Info("start serving distlock") err := svc.Serve() if err != nil { - log.Errorf("distlock stopped with error: %s", err.Error()) + logger.Errorf("distlock stopped with error: %s", err.Error()) } - log.Info("distlock stopped") + logger.Info("distlock stopped") }