diff --git a/agent/internal/services/grpc/service.go b/agent/internal/services/grpc/service.go index 8944fae..1f060d2 100644 --- a/agent/internal/services/grpc/service.go +++ b/agent/internal/services/grpc/service.go @@ -6,7 +6,7 @@ import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" myio "gitlink.org.cn/cloudream/common/utils/io" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" ) @@ -21,7 +21,7 @@ func NewService() *Service { func (s *Service) SendIPFSFile(server agentserver.Agent_SendIPFSFileServer) error { log.Debugf("client upload file") - ipfsCli, err := globals.IPFSPool.Acquire() + ipfsCli, err := stgglb.IPFSPool.Acquire() if err != nil { log.Warnf("new ipfs client: %s", err.Error()) return fmt.Errorf("new ipfs client: %w", err) @@ -84,7 +84,7 @@ func (s *Service) SendIPFSFile(server agentserver.Agent_SendIPFSFileServer) erro func (s *Service) GetIPFSFile(req *agentserver.GetIPFSFileReq, server agentserver.Agent_GetIPFSFileServer) error { log.WithField("FileHash", req.FileHash).Debugf("client download file") - ipfsCli, err := globals.IPFSPool.Acquire() + ipfsCli, err := stgglb.IPFSPool.Acquire() if err != nil { log.Warnf("new ipfs client: %s", err.Error()) return fmt.Errorf("new ipfs client: %w", err) diff --git a/agent/internal/services/mq/agent.go b/agent/internal/services/mq/agent.go index 0c52ba7..b442beb 100644 --- a/agent/internal/services/mq/agent.go +++ b/agent/internal/services/mq/agent.go @@ -4,14 +4,14 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage/common/consts" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" ) func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) { var ipfsState string - ipfsCli, err := globals.IPFSPool.Acquire() + ipfsCli, err := stgglb.IPFSPool.Acquire() if err != nil { logger.Warnf("new ipfs client: %s", err.Error()) ipfsState = consts.IPFSStateUnavailable diff --git a/agent/internal/services/mq/cache.go b/agent/internal/services/mq/cache.go index cd053d3..13f6e57 100644 --- a/agent/internal/services/mq/cache.go +++ b/agent/internal/services/mq/cache.go @@ -12,12 +12,12 @@ import ( "gitlink.org.cn/cloudream/storage/agent/internal/task" mytask "gitlink.org.cn/cloudream/storage/agent/internal/task" "gitlink.org.cn/cloudream/storage/common/consts" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" ) func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *mq.CodeMessage) { - ipfsCli, err := globals.IPFSPool.Acquire() + ipfsCli, err := stgglb.IPFSPool.Acquire() if err != nil { logger.Warnf("new ipfs client: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "new ipfs client failed") diff --git a/agent/internal/services/mq/storage.go b/agent/internal/services/mq/storage.go index 6498a97..e82cae9 100644 --- a/agent/internal/services/mq/storage.go +++ b/agent/internal/services/mq/storage.go @@ -13,7 +13,7 @@ import ( "gitlink.org.cn/cloudream/storage/agent/internal/config" mytask "gitlink.org.cn/cloudream/storage/agent/internal/task" "gitlink.org.cn/cloudream/storage/common/consts" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -21,7 +21,7 @@ import ( ) func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) (*agtmq.StartStorageLoadPackageResp, *mq.CodeMessage) { - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { logger.Warnf("new coordinator client: %s", err.Error()) @@ -157,7 +157,7 @@ func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, dirInfos []fs. } func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePackage) (*agtmq.StartStorageCreatePackageResp, *mq.CodeMessage) { - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { logger.Warnf("new coordinator client: %s", err.Error()) diff --git a/agent/internal/task/cache_move_package.go b/agent/internal/task/cache_move_package.go index 4793f0a..28e92fa 100644 --- a/agent/internal/task/cache_move_package.go +++ b/agent/internal/task/cache_move_package.go @@ -4,10 +4,10 @@ import ( "fmt" "time" - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/task" - "gitlink.org.cn/cloudream/storage/common/globals" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -17,7 +17,7 @@ type CacheMovePackage struct { userID int64 packageID int64 - ResultCacheInfos []models.ObjectCacheInfo + ResultCacheInfos []stgsdk.ObjectCacheInfo } func NewCacheMovePackage(userID int64, packageID int64) *CacheMovePackage { @@ -50,14 +50,14 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { Cache().CreateAny(). IPFS(). // pin文件 - CreateAnyRep(*globals.Local.NodeID). + CreateAnyRep(*stgglb.Local.NodeID). MutexLock(ctx.distlock) if err != nil { return fmt.Errorf("acquiring distlock: %w", err) } defer mutex.Unlock() - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return fmt.Errorf("new coordinator client: %w", err) } @@ -82,7 +82,7 @@ func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.PoolClient, return fmt.Errorf("getting package object rep data: %w", err) } - ipfsCli, err := globals.IPFSPool.Acquire() + ipfsCli, err := stgglb.IPFSPool.Acquire() if err != nil { return fmt.Errorf("new ipfs client: %w", err) } @@ -95,10 +95,10 @@ func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.PoolClient, } fileHashes = append(fileHashes, rep.FileHash) - t.ResultCacheInfos = append(t.ResultCacheInfos, models.NewObjectCacheInfo(rep.Object.ObjectID, rep.FileHash)) + t.ResultCacheInfos = append(t.ResultCacheInfos, stgsdk.NewObjectCacheInfo(rep.Object.ObjectID, rep.FileHash)) } - _, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(pkg.PackageID, *globals.Local.NodeID, fileHashes)) + _, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(pkg.PackageID, *stgglb.Local.NodeID, fileHashes)) if err != nil { return fmt.Errorf("reporting cache package moved: %w", err) } diff --git a/agent/internal/task/create_ec_package.go b/agent/internal/task/create_ec_package.go index b651e95..c61bd58 100644 --- a/agent/internal/task/create_ec_package.go +++ b/agent/internal/task/create_ec_package.go @@ -3,9 +3,9 @@ package task import ( "time" - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/task" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -18,7 +18,7 @@ type CreateECPackage struct { Result *CreateECPackageResult } -func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { +func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy stgsdk.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { return &CreateECPackage{ cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), } diff --git a/agent/internal/task/create_rep_package.go b/agent/internal/task/create_rep_package.go index 1fc7b59..7650bf1 100644 --- a/agent/internal/task/create_rep_package.go +++ b/agent/internal/task/create_rep_package.go @@ -3,9 +3,9 @@ package task import ( "time" - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/task" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -18,7 +18,7 @@ type CreateRepPackage struct { Result *CreateRepPackageResult } -func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { +func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy stgsdk.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { return &CreateRepPackage{ cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), } diff --git a/agent/internal/task/ipfs_pin.go b/agent/internal/task/ipfs_pin.go index 29255c8..4807c99 100644 --- a/agent/internal/task/ipfs_pin.go +++ b/agent/internal/task/ipfs_pin.go @@ -6,7 +6,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/task" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" ) type IPFSPin struct { @@ -33,7 +33,7 @@ func (t *IPFSPin) Execute(task *task.Task[TaskContext], ctx TaskContext, complet log.Debugf("begin with %v", logger.FormatStruct(t)) defer log.Debugf("end") - ipfsCli, err := globals.IPFSPool.Acquire() + ipfsCli, err := stgglb.IPFSPool.Acquire() if err != nil { err := fmt.Errorf("new ipfs client: %w", err) log.Warn(err.Error()) diff --git a/agent/internal/task/ipfs_read.go b/agent/internal/task/ipfs_read.go index bbce93a..aab48bf 100644 --- a/agent/internal/task/ipfs_read.go +++ b/agent/internal/task/ipfs_read.go @@ -9,7 +9,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/task" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" ) type IPFSRead struct { @@ -63,7 +63,7 @@ func (t *IPFSRead) Execute(task *task.Task[TaskContext], ctx TaskContext, comple } defer outputFile.Close() - ipfsCli, err := globals.IPFSPool.Acquire() + ipfsCli, err := stgglb.IPFSPool.Acquire() if err != nil { err := fmt.Errorf("new ipfs client: %w", err) log.Warn(err.Error()) diff --git a/agent/main.go b/agent/main.go index aeab3b9..188c850 100644 --- a/agent/main.go +++ b/agent/main.go @@ -10,7 +10,7 @@ import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage/agent/internal/config" "gitlink.org.cn/cloudream/storage/agent/internal/task" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" @@ -45,12 +45,12 @@ func main() { os.Exit(1) } - globals.InitLocal(&config.Cfg().Local) - globals.InitMQPool(&config.Cfg().RabbitMQ) - globals.InitAgentRPCPool(&agtrpc.PoolConfig{ + stgglb.InitLocal(&config.Cfg().Local) + stgglb.InitMQPool(&config.Cfg().RabbitMQ) + stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{ Port: config.Cfg().GRPC.Port, }) - globals.InitIPFSPool(&config.Cfg().IPFS) + stgglb.InitIPFSPool(&config.Cfg().IPFS) distlock, err := distlock.NewService(&config.Cfg().DistLock) if err != nil { diff --git a/client/internal/cmdline/package.go b/client/internal/cmdline/package.go index 50db31f..b6cb836 100644 --- a/client/internal/cmdline/package.go +++ b/client/internal/cmdline/package.go @@ -8,7 +8,7 @@ import ( "time" "github.com/jedib0t/go-pretty/v6/table" - "gitlink.org.cn/cloudream/common/models" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/client/internal/config" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -112,7 +112,7 @@ func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64 } objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) - taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, models.NewRepRedundancyInfo(repCount), nodeAff) + taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, stgsdk.NewRepRedundancyInfo(repCount), nodeAff) if err != nil { return fmt.Errorf("upload file data failed, err: %w", err) @@ -209,7 +209,7 @@ func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, } objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) - taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, models.NewECRedundancyInfo(ecName, config.Cfg().ECPacketSize), nodeAff) + taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, stgsdk.NewECRedundancyInfo(ecName, config.Cfg().ECPacketSize), nodeAff) if err != nil { return fmt.Errorf("upload file data failed, err: %w", err) diff --git a/client/internal/cmdline/storage.go b/client/internal/cmdline/storage.go index 38e6e23..f727068 100644 --- a/client/internal/cmdline/storage.go +++ b/client/internal/cmdline/storage.go @@ -4,7 +4,7 @@ import ( "fmt" "time" - "gitlink.org.cn/cloudream/common/models" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" ) func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) error { @@ -31,7 +31,7 @@ func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) er func StorageCreateRepPackage(ctx CommandContext, bucketID int64, name string, storageID int64, path string, repCount int) error { nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(0, bucketID, name, storageID, path, - models.NewTypedRepRedundancyInfo(repCount), nil) + stgsdk.NewTypedRepRedundancyInfo(repCount), nil) if err != nil { return fmt.Errorf("start storage uploading rep package: %w", err) } diff --git a/client/internal/http/cacah.go b/client/internal/http/cacah.go index a8635cd..c196751 100644 --- a/client/internal/http/cacah.go +++ b/client/internal/http/cacah.go @@ -6,8 +6,8 @@ import ( "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" ) type CacheService struct { @@ -26,7 +26,7 @@ type CacheMovePackageReq struct { NodeID *int64 `json:"nodeID" binding:"required"` } type CacheMovePackageResp struct { - CacheInfos []models.ObjectCacheInfo `json:"cacheInfos"` + CacheInfos []stgsdk.ObjectCacheInfo `json:"cacheInfos"` } func (s *CacheService) MovePackage(ctx *gin.Context) { diff --git a/client/internal/http/package.go b/client/internal/http/package.go index da764a6..5a86f48 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -7,9 +7,9 @@ import ( "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/common/pkgs/logger" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" stgiter "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -33,7 +33,7 @@ type PackageUploadInfo struct { UserID *int64 `json:"userID" binding:"required"` BucketID *int64 `json:"bucketID" binding:"required"` Name string `json:"name" binding:"required"` - Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"` + Redundancy stgsdk.TypedRedundancyInfo `json:"redundancy" binding:"required"` NodeAffinity *int64 `json:"nodeAffinity"` } @@ -68,7 +68,7 @@ func (s *PackageService) uploadRep(ctx *gin.Context, req *PackageUploadReq) { log := logger.WithField("HTTP", "Package.Upload") var err error - var repInfo models.RepRedundancyInfo + var repInfo stgsdk.RepRedundancyInfo if repInfo, err = req.Info.Redundancy.ToRepInfo(); err != nil { log.Warnf("parsing rep redundancy config: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config")) @@ -112,7 +112,7 @@ func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) { log := logger.WithField("HTTP", "Package.Upload") var err error - var ecInfo models.ECRedundancyInfo + var ecInfo stgsdk.ECRedundancyInfo if ecInfo, err = req.Info.Redundancy.ToECInfo(); err != nil { log.Warnf("parsing ec redundancy config: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config")) @@ -182,7 +182,7 @@ type GetCachedNodesReq struct { PackageID *int64 `json:"packageID" binding:"required"` } type GetCachedNodesResp struct { - models.PackageCachingInfo + stgsdk.PackageCachingInfo } func (s *PackageService) GetCachedNodes(ctx *gin.Context) { diff --git a/client/internal/http/storage.go b/client/internal/http/storage.go index 60a5587..d16ff68 100644 --- a/client/internal/http/storage.go +++ b/client/internal/http/storage.go @@ -6,8 +6,8 @@ import ( "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" ) type StorageService struct { @@ -70,7 +70,7 @@ type StorageCreatePackageReq struct { Path string `json:"path" binding:"required"` BucketID *int64 `json:"bucketID" binding:"required"` Name string `json:"name" binding:"required"` - Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"` + Redundancy stgsdk.TypedRedundancyInfo `json:"redundancy" binding:"required"` NodeAffinity *int64 `json:"nodeAffinity"` } diff --git a/client/internal/services/bucket.go b/client/internal/services/bucket.go index 3c21676..6ef0932 100644 --- a/client/internal/services/bucket.go +++ b/client/internal/services/bucket.go @@ -3,7 +3,7 @@ package services import ( "fmt" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -23,7 +23,7 @@ func (svc *BucketService) GetBucket(userID int64, bucketID int64) (model.Bucket, } func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) { - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } @@ -38,7 +38,7 @@ func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) { } func (svc *BucketService) GetBucketPackages(userID int64, bucketID int64) ([]model.Package, error) { - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } @@ -53,7 +53,7 @@ func (svc *BucketService) GetBucketPackages(userID int64, bucketID int64) ([]mod } func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, error) { - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return 0, fmt.Errorf("new coordinator client: %w", err) } @@ -81,7 +81,7 @@ func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, } func (svc *BucketService) DeleteBucket(userID int64, bucketID int64) error { - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return fmt.Errorf("new coordinator client: %w", err) } diff --git a/client/internal/services/cacah.go b/client/internal/services/cacah.go index 9091520..415089c 100644 --- a/client/internal/services/cacah.go +++ b/client/internal/services/cacah.go @@ -4,8 +4,9 @@ import ( "fmt" "time" - "gitlink.org.cn/cloudream/common/models" - "gitlink.org.cn/cloudream/storage/common/globals" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + + stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" ) @@ -18,7 +19,7 @@ func (svc *Service) CacheSvc() *CacheService { } func (svc *CacheService) StartCacheMovePackage(userID int64, packageID int64, nodeID int64) (string, error) { - agentCli, err := globals.AgentMQPool.Acquire(nodeID) + agentCli, err := stgglb.AgentMQPool.Acquire(nodeID) if err != nil { return "", fmt.Errorf("new agent client: %w", err) } @@ -32,8 +33,8 @@ func (svc *CacheService) StartCacheMovePackage(userID int64, packageID int64, no return startResp.TaskID, nil } -func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, []models.ObjectCacheInfo, error) { - agentCli, err := globals.AgentMQPool.Acquire(nodeID) +func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, []stgsdk.ObjectCacheInfo, error) { + agentCli, err := stgglb.AgentMQPool.Acquire(nodeID) if err != nil { return true, nil, fmt.Errorf("new agent client: %w", err) } diff --git a/client/internal/services/package.go b/client/internal/services/package.go index f65907f..4f5ba28 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -4,9 +4,10 @@ import ( "fmt" "time" - "gitlink.org.cn/cloudream/common/models" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + mytask "gitlink.org.cn/cloudream/storage/client/internal/task" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtcmd "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" @@ -23,7 +24,7 @@ func (svc *Service) PackageSvc() *PackageService { } func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (iterator.DownloadingObjectIterator, error) { - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } @@ -106,7 +107,7 @@ func (svc *PackageService) downloadECPackage(pkg model.Package, objects []model. return nil, fmt.Errorf("getting package object ec data: %w", err) } - var ecInfo models.ECRedundancyInfo + var ecInfo stgsdk.ECRedundancyInfo if ecInfo, err = pkg.Redundancy.ToECInfo(); err != nil { return nil, fmt.Errorf("get ec redundancy info: %w", err) } @@ -123,7 +124,7 @@ func (svc *PackageService) downloadECPackage(pkg model.Package, objects []model. return iter, nil } -func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo models.RepRedundancyInfo, nodeAffinity *int64) (string, error) { +func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo stgsdk.RepRedundancyInfo, nodeAffinity *int64) (string, error) { tsk := svc.TaskMgr.StartNew(mytask.NewCreateRepPackage(userID, bucketID, name, objIter, repInfo, nodeAffinity)) return tsk.ID(), nil } @@ -151,7 +152,7 @@ func (svc *PackageService) WaitUpdatingRepPackage(taskID string, waitTimeout tim return false, nil, nil } -func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo models.ECRedundancyInfo, nodeAffinity *int64) (string, error) { +func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo stgsdk.ECRedundancyInfo, nodeAffinity *int64) (string, error) { tsk := svc.TaskMgr.StartNew(mytask.NewCreateECPackage(userID, bucketID, name, objIter, ecInfo, nodeAffinity)) return tsk.ID(), nil } @@ -180,7 +181,7 @@ func (svc *PackageService) WaitUpdatingECPackage(taskID string, waitTimeout time } func (svc *PackageService) DeletePackage(userID int64, packageID int64) error { - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return fmt.Errorf("new coordinator client: %w", err) } @@ -214,19 +215,19 @@ func (svc *PackageService) DeletePackage(userID int64, packageID int64) error { return nil } -func (svc *PackageService) GetCachedNodes(userID int64, packageID int64) (models.PackageCachingInfo, error) { - coorCli, err := globals.CoordinatorMQPool.Acquire() +func (svc *PackageService) GetCachedNodes(userID int64, packageID int64) (stgsdk.PackageCachingInfo, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { - return models.PackageCachingInfo{}, fmt.Errorf("new coordinator client: %w", err) + return stgsdk.PackageCachingInfo{}, fmt.Errorf("new coordinator client: %w", err) } defer coorCli.Close() resp, err := coorCli.GetPackageCachedNodes(coormq.NewGetPackageCachedNodes(userID, packageID)) if err != nil { - return models.PackageCachingInfo{}, fmt.Errorf("get package cached nodes: %w", err) + return stgsdk.PackageCachingInfo{}, fmt.Errorf("get package cached nodes: %w", err) } - tmp := models.PackageCachingInfo{ + tmp := stgsdk.PackageCachingInfo{ NodeInfos: resp.NodeInfos, PackageSize: resp.PackageSize, RedunancyType: resp.RedunancyType, @@ -235,7 +236,7 @@ func (svc *PackageService) GetCachedNodes(userID int64, packageID int64) (models } func (svc *PackageService) GetLoadedNodes(userID int64, packageID int64) ([]int64, error) { - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } diff --git a/client/internal/services/scanner.go b/client/internal/services/scanner.go index 2ad0869..1e060f3 100644 --- a/client/internal/services/scanner.go +++ b/client/internal/services/scanner.go @@ -3,7 +3,7 @@ package services import ( "fmt" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" ) @@ -17,7 +17,7 @@ func (svc *Service) ScannerSvc() *ScannerService { } func (svc *ScannerService) PostEvent(event scevt.Event, isEmergency bool, dontMerge bool) error { - scCli, err := globals.ScannerMQPool.Acquire() + scCli, err := stgglb.ScannerMQPool.Acquire() if err != nil { return fmt.Errorf("new scacnner client: %w", err) } diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index 3083ed6..585eaa5 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -4,9 +4,10 @@ import ( "fmt" "time" - "gitlink.org.cn/cloudream/common/models" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/client/internal/task" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -38,8 +39,8 @@ func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, s } // 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID -func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo, nodeAffinity *int64) (int64, string, error) { - coorCli, err := globals.CoordinatorMQPool.Acquire() +func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy stgsdk.TypedRedundancyInfo, nodeAffinity *int64) (int64, string, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return 0, "", fmt.Errorf("new coordinator client: %w", err) } @@ -50,7 +51,7 @@ func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int6 return 0, "", fmt.Errorf("getting storage info: %w", err) } - agentCli, err := globals.AgentMQPool.Acquire(stgResp.NodeID) + agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.NodeID) if err != nil { return 0, "", fmt.Errorf("new agent client: %w", err) } @@ -65,7 +66,7 @@ func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int6 } func (svc *StorageService) WaitStorageCreatePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, int64, error) { - agentCli, err := globals.AgentMQPool.Acquire(nodeID) + agentCli, err := stgglb.AgentMQPool.Acquire(nodeID) if err != nil { // TODO 失败是否要当做任务已经结束? return true, 0, fmt.Errorf("new agent client: %w", err) diff --git a/client/internal/task/create_ec_package.go b/client/internal/task/create_ec_package.go index 2452676..1b35c14 100644 --- a/client/internal/task/create_ec_package.go +++ b/client/internal/task/create_ec_package.go @@ -3,8 +3,8 @@ package task import ( "time" - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/task" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -17,7 +17,7 @@ type CreateECPackage struct { Result *CreateECPackageResult } -func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { +func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy stgsdk.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { return &CreateECPackage{ cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), } diff --git a/client/internal/task/create_rep_package.go b/client/internal/task/create_rep_package.go index 30c6131..9dff26b 100644 --- a/client/internal/task/create_rep_package.go +++ b/client/internal/task/create_rep_package.go @@ -3,8 +3,8 @@ package task import ( "time" - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/task" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -17,7 +17,7 @@ type CreateRepPackage struct { Result *CreateRepPackageResult } -func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { +func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy stgsdk.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { return &CreateRepPackage{ cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), } diff --git a/client/internal/task/storage_load_package.go b/client/internal/task/storage_load_package.go index 73c88bb..65dd3cf 100644 --- a/client/internal/task/storage_load_package.go +++ b/client/internal/task/storage_load_package.go @@ -5,7 +5,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/task" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -58,7 +58,7 @@ func (t *StorageLoadPackage) do(ctx TaskContext) error { } defer mutex.Unlock() - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return fmt.Errorf("new coordinator client: %w", err) } @@ -70,7 +70,7 @@ func (t *StorageLoadPackage) do(ctx TaskContext) error { } // 然后向代理端发送移动文件的请求 - agentClient, err := globals.AgentMQPool.Acquire(getStgResp.NodeID) + agentClient, err := stgglb.AgentMQPool.Acquire(getStgResp.NodeID) if err != nil { return fmt.Errorf("create agent client to %d failed, err: %w", getStgResp.NodeID, err) } diff --git a/client/main.go b/client/main.go index 8c2bdfe..9fe03d5 100644 --- a/client/main.go +++ b/client/main.go @@ -11,7 +11,7 @@ import ( "gitlink.org.cn/cloudream/storage/client/internal/config" "gitlink.org.cn/cloudream/storage/client/internal/services" "gitlink.org.cn/cloudream/storage/client/internal/task" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" ) @@ -28,13 +28,13 @@ func main() { os.Exit(1) } - globals.InitLocal(&config.Cfg().Local) - globals.InitMQPool(&config.Cfg().RabbitMQ) - globals.InitAgentRPCPool(&config.Cfg().AgentGRPC) + stgglb.InitLocal(&config.Cfg().Local) + stgglb.InitMQPool(&config.Cfg().RabbitMQ) + stgglb.InitAgentRPCPool(&config.Cfg().AgentGRPC) if config.Cfg().IPFS != nil { logger.Infof("IPFS config is not empty, so create a ipfs client") - globals.InitIPFSPool(config.Cfg().IPFS) + stgglb.InitIPFSPool(config.Cfg().IPFS) } distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) diff --git a/common/globals/globals.go b/common/globals/globals.go index 8a74977..9bb2013 100644 --- a/common/globals/globals.go +++ b/common/globals/globals.go @@ -1,4 +1,4 @@ -package globals +package stgglb import ( stgmodels "gitlink.org.cn/cloudream/storage/common/models" diff --git a/common/globals/pools.go b/common/globals/pools.go index 466791a..9b4ef5e 100644 --- a/common/globals/pools.go +++ b/common/globals/pools.go @@ -1,4 +1,4 @@ -package globals +package stgglb import ( "gitlink.org.cn/cloudream/common/pkgs/ipfs" diff --git a/common/models/models.go b/common/models/models.go index 60e4f54..5ba1a08 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -1,4 +1,4 @@ -package models +package stgmod import "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" diff --git a/common/pkgs/cmd/create_ec_package.go b/common/pkgs/cmd/create_ec_package.go index c6148c4..c6ea857 100644 --- a/common/pkgs/cmd/create_ec_package.go +++ b/common/pkgs/cmd/create_ec_package.go @@ -9,9 +9,10 @@ import ( "sync" "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/models" - "gitlink.org.cn/cloudream/storage/common/globals" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" @@ -24,7 +25,7 @@ type CreateECPackage struct { bucketID int64 name string objectIter iterator.UploadingObjectIterator - redundancy models.ECRedundancyInfo + redundancy stgsdk.ECRedundancyInfo nodeAffinity *int64 } @@ -39,7 +40,7 @@ type ECObjectUploadResult struct { ObjectID int64 } -func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { +func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy stgsdk.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { return &CreateECPackage{ userID: userID, bucketID: bucketID, @@ -53,7 +54,7 @@ func NewCreateECPackage(userID int64, bucketID int64, name string, objIter itera func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageResult, error) { defer t.objectIter.Close() - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } @@ -79,7 +80,7 @@ func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageRe defer mutex.Unlock() createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name, - models.NewTypedRedundancyInfo(t.redundancy))) + stgsdk.NewTypedRedundancyInfo(t.redundancy))) if err != nil { return nil, fmt.Errorf("creating package: %w", err) } @@ -89,7 +90,7 @@ func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageRe return nil, fmt.Errorf("getting user nodes: %w", err) } - findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP)) + findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP)) if err != nil { return nil, fmt.Errorf("finding client location: %w", err) } @@ -109,11 +110,11 @@ func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageRe // 给上传节点的IPFS加锁 ipfsReqBlder := reqbuilder.NewBuilder() // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 - if globals.Local.NodeID != nil { - ipfsReqBlder.IPFS().CreateAnyRep(*globals.Local.NodeID) + if stgglb.Local.NodeID != nil { + ipfsReqBlder.IPFS().CreateAnyRep(*stgglb.Local.NodeID) } for _, node := range uploadNodeInfos { - if globals.Local.NodeID != nil && node.Node.NodeID == *globals.Local.NodeID { + if stgglb.Local.NodeID != nil && node.Node.NodeID == *stgglb.Local.NodeID { continue } @@ -138,8 +139,8 @@ func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageRe }, nil } -func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObjectIterator, uploadNodes []UploadNodeInfo, ecInfo models.ECRedundancyInfo, ec model.Ec) ([]ECObjectUploadResult, error) { - coorCli, err := globals.CoordinatorMQPool.Acquire() +func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObjectIterator, uploadNodes []UploadNodeInfo, ecInfo stgsdk.ECRedundancyInfo, ec model.Ec) ([]ECObjectUploadResult, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } @@ -184,7 +185,7 @@ func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObje } // 上传文件 -func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ecInfo models.ECRedundancyInfo, ec model.Ec) ([]string, []int64, error) { +func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ecInfo stgsdk.ECRedundancyInfo, ec model.Ec) ([]string, []int64, error) { //生成纠删码的写入节点序列 nodes := make([]UploadNodeInfo, ec.EcN) numNodes := len(uploadNodes) diff --git a/common/pkgs/cmd/create_rep_package.go b/common/pkgs/cmd/create_rep_package.go index 9814b14..9b31f16 100644 --- a/common/pkgs/cmd/create_rep_package.go +++ b/common/pkgs/cmd/create_rep_package.go @@ -7,13 +7,13 @@ import ( "time" "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/models" distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -29,7 +29,7 @@ type CreateRepPackage struct { bucketID int64 name string objectIter iterator.UploadingObjectIterator - redundancy models.RepRedundancyInfo + redundancy stgsdk.RepRedundancyInfo nodeAffinity *int64 } @@ -49,7 +49,7 @@ type RepObjectUploadResult struct { ObjectID int64 } -func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { +func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy stgsdk.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { return &CreateRepPackage{ userID: userID, bucketID: bucketID, @@ -63,15 +63,15 @@ func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iter func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackageResult, error) { defer t.objectIter.Close() - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } reqBlder := reqbuilder.NewBuilder() // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 - if globals.Local.NodeID != nil { - reqBlder.IPFS().CreateAnyRep(*globals.Local.NodeID) + if stgglb.Local.NodeID != nil { + reqBlder.IPFS().CreateAnyRep(*stgglb.Local.NodeID) } mutex, err := reqBlder. Metadata(). @@ -94,7 +94,7 @@ func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackage defer mutex.Unlock() createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name, - models.NewTypedRedundancyInfo(t.redundancy))) + stgsdk.NewTypedRedundancyInfo(t.redundancy))) if err != nil { return nil, fmt.Errorf("creating package: %w", err) } @@ -104,7 +104,7 @@ func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackage return nil, fmt.Errorf("getting user nodes: %w", err) } - findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP)) + findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP)) if err != nil { return nil, fmt.Errorf("finding client location: %w", err) } @@ -138,7 +138,7 @@ func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackage } func uploadAndUpdateRepPackage(packageID int64, objectIter iterator.UploadingObjectIterator, uploadNode UploadNodeInfo) ([]RepObjectUploadResult, error) { - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } @@ -185,11 +185,11 @@ func uploadAndUpdateRepPackage(packageID int64, objectIter iterator.UploadingObj // 上传文件 func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) { // 本地有IPFS,则直接从本地IPFS上传 - if globals.IPFSPool != nil { + if stgglb.IPFSPool != nil { logger.Infof("try to use local IPFS to upload file") // 只有本地IPFS不是存储系统中的一个节点,才需要Pin文件 - fileHash, err := uploadToLocalIPFS(file, uploadNode.Node.NodeID, globals.Local.NodeID == nil) + fileHash, err := uploadToLocalIPFS(file, uploadNode.Node.NodeID, stgglb.Local.NodeID == nil) if err == nil { return fileHash, nil @@ -236,7 +236,7 @@ func (t *CreateRepPackage) chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity } func uploadToNode(file io.Reader, nodeIP string) (string, error) { - rpcCli, err := globals.AgentRPCPool.Acquire(nodeIP) + rpcCli, err := stgglb.AgentRPCPool.Acquire(nodeIP) if err != nil { return "", fmt.Errorf("new agent rpc client: %w", err) } @@ -246,7 +246,7 @@ func uploadToNode(file io.Reader, nodeIP string) (string, error) { } func uploadToLocalIPFS(file io.Reader, nodeID int64, shouldPin bool) (string, error) { - ipfsCli, err := globals.IPFSPool.Acquire() + ipfsCli, err := stgglb.IPFSPool.Acquire() if err != nil { return "", fmt.Errorf("new ipfs client: %w", err) } @@ -271,7 +271,7 @@ func uploadToLocalIPFS(file io.Reader, nodeID int64, shouldPin bool) (string, er } func pinIPFSFile(nodeID int64, fileHash string) error { - agtCli, err := globals.AgentMQPool.Acquire(nodeID) + agtCli, err := stgglb.AgentMQPool.Acquire(nodeID) if err != nil { return fmt.Errorf("new agent client: %w", err) } diff --git a/common/pkgs/cmd/download_package.go b/common/pkgs/cmd/download_package.go index 93cf999..b7c6d3a 100644 --- a/common/pkgs/cmd/download_package.go +++ b/common/pkgs/cmd/download_package.go @@ -6,9 +6,10 @@ import ( "os" "path/filepath" - "gitlink.org.cn/cloudream/common/models" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -33,7 +34,7 @@ func NewDownloadPackage(userID int64, packageID int64, outputPath string) *Downl } func (t *DownloadPackage) Execute(ctx *DownloadPackageContext) error { - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return fmt.Errorf("new coordinator client: %w", err) } @@ -60,7 +61,7 @@ func (t *DownloadPackage) Execute(ctx *DownloadPackageContext) error { } func (t *DownloadPackage) downloadRep(ctx *DownloadPackageContext) (iterator.DownloadingObjectIterator, error) { - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } @@ -84,7 +85,7 @@ func (t *DownloadPackage) downloadRep(ctx *DownloadPackageContext) (iterator.Dow } func (t *DownloadPackage) downloadEC(ctx *DownloadPackageContext, pkg model.Package) (iterator.DownloadingObjectIterator, error) { - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } @@ -100,7 +101,7 @@ func (t *DownloadPackage) downloadEC(ctx *DownloadPackageContext, pkg model.Pack return nil, fmt.Errorf("getting package object ec data: %w", err) } - var ecInfo models.ECRedundancyInfo + var ecInfo stgsdk.ECRedundancyInfo if ecInfo, err = pkg.Redundancy.ToECInfo(); err != nil { return nil, fmt.Errorf("get ec redundancy info: %w", err) } diff --git a/common/pkgs/cmd/update_ec_package.go b/common/pkgs/cmd/update_ec_package.go index 680cbdd..efcd073 100644 --- a/common/pkgs/cmd/update_ec_package.go +++ b/common/pkgs/cmd/update_ec_package.go @@ -4,9 +4,10 @@ import ( "fmt" "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/models" - "gitlink.org.cn/cloudream/storage/common/globals" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" @@ -34,7 +35,7 @@ func NewUpdateECPackage(userID int64, packageID int64, objIter iterator.Uploadin func (t *UpdateECPackage) Execute(ctx *UpdatePackageContext) (*UpdateECPackageResult, error) { defer t.objectIter.Close() - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } @@ -67,7 +68,7 @@ func (t *UpdateECPackage) Execute(ctx *UpdatePackageContext) (*UpdateECPackageRe return nil, fmt.Errorf("getting user nodes: %w", err) } - findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP)) + findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP)) if err != nil { return nil, fmt.Errorf("finding client location: %w", err) } @@ -79,7 +80,7 @@ func (t *UpdateECPackage) Execute(ctx *UpdatePackageContext) (*UpdateECPackageRe } }) - var ecInfo models.ECRedundancyInfo + var ecInfo stgsdk.ECRedundancyInfo if ecInfo, err = getPkgResp.Package.Redundancy.ToECInfo(); err != nil { return nil, fmt.Errorf("get ec redundancy info: %w", err) } @@ -92,11 +93,11 @@ func (t *UpdateECPackage) Execute(ctx *UpdatePackageContext) (*UpdateECPackageRe // 给上传节点的IPFS加锁 ipfsReqBlder := reqbuilder.NewBuilder() // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 - if globals.Local.NodeID != nil { - ipfsReqBlder.IPFS().CreateAnyRep(*globals.Local.NodeID) + if stgglb.Local.NodeID != nil { + ipfsReqBlder.IPFS().CreateAnyRep(*stgglb.Local.NodeID) } for _, node := range nodeInfos { - if globals.Local.NodeID != nil && node.Node.NodeID == *globals.Local.NodeID { + if stgglb.Local.NodeID != nil && node.Node.NodeID == *stgglb.Local.NodeID { continue } diff --git a/common/pkgs/cmd/update_rep_package.go b/common/pkgs/cmd/update_rep_package.go index df0acf0..a42530b 100644 --- a/common/pkgs/cmd/update_rep_package.go +++ b/common/pkgs/cmd/update_rep_package.go @@ -7,7 +7,7 @@ import ( mysort "gitlink.org.cn/cloudream/common/utils/sort" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -39,15 +39,15 @@ func NewUpdateRepPackage(userID int64, packageID int64, objectIter iterator.Uplo func (t *UpdateRepPackage) Execute(ctx *UpdatePackageContext) (*UpdateRepPackageResult, error) { defer t.objectIter.Close() - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } reqBlder := reqbuilder.NewBuilder() // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 - if globals.Local.NodeID != nil { - reqBlder.IPFS().CreateAnyRep(*globals.Local.NodeID) + if stgglb.Local.NodeID != nil { + reqBlder.IPFS().CreateAnyRep(*stgglb.Local.NodeID) } mutex, err := reqBlder. Metadata(). @@ -71,7 +71,7 @@ func (t *UpdateRepPackage) Execute(ctx *UpdatePackageContext) (*UpdateRepPackage return nil, fmt.Errorf("getting user nodes: %w", err) } - findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP)) + findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP)) if err != nil { return nil, fmt.Errorf("finding client location: %w", err) } diff --git a/common/pkgs/db/model/model.go b/common/pkgs/db/model/model.go index 2de791d..7e421f1 100644 --- a/common/pkgs/db/model/model.go +++ b/common/pkgs/db/model/model.go @@ -3,7 +3,7 @@ package model import ( "time" - "gitlink.org.cn/cloudream/common/models" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" ) type Node struct { @@ -61,7 +61,7 @@ type Package struct { Name string `db:"Name" json:"name"` BucketID int64 `db:"BucketID" json:"bucketID"` State string `db:"State" json:"state"` - Redundancy models.TypedRedundancyInfo `db:"Redundancy" json:"redundancy"` + Redundancy stgsdk.TypedRedundancyInfo `db:"Redundancy" json:"redundancy"` } type Object struct { diff --git a/common/pkgs/db/object_block.go b/common/pkgs/db/object_block.go index 96c1f4c..d35bedf 100644 --- a/common/pkgs/db/object_block.go +++ b/common/pkgs/db/object_block.go @@ -6,7 +6,7 @@ import ( "github.com/jmoiron/sqlx" "gitlink.org.cn/cloudream/storage/common/consts" - "gitlink.org.cn/cloudream/storage/common/models" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -84,14 +84,14 @@ func (db *ObjectBlockDB) GetBatchBlocksNodes(ctx SQLContext, hashs [][]string) ( return nodes, err } -func (db *ObjectBlockDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) ([]models.ObjectECData, error) { +func (db *ObjectBlockDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) ([]stgmod.ObjectECData, error) { var objs []model.Object err := sqlx.Select(ctx, &objs, "select * from Object where PackageID = ? order by ObjectID asc", packageID) if err != nil { return nil, fmt.Errorf("query objectIDs: %w", err) } - rets := make([]models.ObjectECData, 0, len(objs)) + rets := make([]stgmod.ObjectECData, 0, len(objs)) for _, obj := range objs { var tmpRets []struct { @@ -111,9 +111,9 @@ func (db *ObjectBlockDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) return nil, err } - blocks := make([]models.ObjectBlockData, 0, len(tmpRets)) + blocks := make([]stgmod.ObjectBlockData, 0, len(tmpRets)) for _, tmp := range tmpRets { - var block models.ObjectBlockData + var block stgmod.ObjectBlockData block.Index = tmp.Index block.FileHash = tmp.FileHash @@ -124,7 +124,7 @@ func (db *ObjectBlockDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) blocks = append(blocks, block) } - rets = append(rets, models.NewObjectECData(obj, blocks)) + rets = append(rets, stgmod.NewObjectECData(obj, blocks)) } return rets, nil diff --git a/common/pkgs/db/object_rep.go b/common/pkgs/db/object_rep.go index e7cf0ae..a3ae054 100644 --- a/common/pkgs/db/object_rep.go +++ b/common/pkgs/db/object_rep.go @@ -8,7 +8,7 @@ import ( "github.com/jmoiron/sqlx" "gitlink.org.cn/cloudream/storage/common/consts" - "gitlink.org.cn/cloudream/storage/common/models" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -79,7 +79,7 @@ func (db *ObjectRepDB) GetFileMaxRepCount(ctx SQLContext, fileHash string) (int, return *maxRepCnt, err } -func (db *ObjectRepDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) ([]models.ObjectRepData, error) { +func (db *ObjectRepDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) ([]stgmod.ObjectRepData, error) { var tmpRets []struct { model.Object FileHash *string `db:"FileHash"` @@ -97,9 +97,9 @@ func (db *ObjectRepDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) ( if err != nil { return nil, err } - rets := make([]models.ObjectRepData, 0, len(tmpRets)) + rets := make([]stgmod.ObjectRepData, 0, len(tmpRets)) for _, tmp := range tmpRets { - var repData models.ObjectRepData + var repData stgmod.ObjectRepData repData.Object = tmp.Object if tmp.FileHash != nil { diff --git a/common/pkgs/db/package.go b/common/pkgs/db/package.go index 23d9a74..bf51c5a 100644 --- a/common/pkgs/db/package.go +++ b/common/pkgs/db/package.go @@ -6,8 +6,10 @@ import ( "fmt" "github.com/jmoiron/sqlx" - "gitlink.org.cn/cloudream/common/models" + + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/serder" + "gitlink.org.cn/cloudream/storage/common/consts" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -78,7 +80,7 @@ func (db *PackageDB) GetUserPackage(ctx SQLContext, userID int64, packageID int6 return ret, err } -func (db *PackageDB) Create(ctx SQLContext, bucketID int64, name string, redundancy models.TypedRedundancyInfo) (int64, error) { +func (db *PackageDB) Create(ctx SQLContext, bucketID int64, name string, redundancy stgsdk.TypedRedundancyInfo) (int64, error) { // 根据packagename和bucketid查询,若不存在则插入,若存在则返回错误 var packageID int64 err := sqlx.Get(ctx, &packageID, "select PackageID from Package where Name = ? AND BucketID = ?", name, bucketID) diff --git a/common/pkgs/iterator/ec_object_iterator.go b/common/pkgs/iterator/ec_object_iterator.go index 8ae5b1e..b9b94a1 100644 --- a/common/pkgs/iterator/ec_object_iterator.go +++ b/common/pkgs/iterator/ec_object_iterator.go @@ -7,9 +7,11 @@ import ( "os" "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/storage/common/globals" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + + stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmodels "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" @@ -24,13 +26,13 @@ type ECObjectIterator struct { currentIndex int inited bool - ecInfo models.ECRedundancyInfo + ecInfo stgsdk.ECRedundancyInfo ec model.Ec downloadCtx *DownloadContext cliLocation model.Location } -func NewECObjectIterator(objects []model.Object, objectECData []stgmodels.ObjectECData, ecInfo models.ECRedundancyInfo, ec model.Ec, downloadCtx *DownloadContext) *ECObjectIterator { +func NewECObjectIterator(objects []model.Object, objectECData []stgmodels.ObjectECData, ecInfo stgsdk.ECRedundancyInfo, ec model.Ec, downloadCtx *DownloadContext) *ECObjectIterator { return &ECObjectIterator{ objects: objects, objectECData: objectECData, @@ -42,7 +44,7 @@ func NewECObjectIterator(objects []model.Object, objectECData []stgmodels.Object func (i *ECObjectIterator) MoveNext() (*IterDownloadingObject, error) { // TODO 加锁 - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } @@ -51,7 +53,7 @@ func (i *ECObjectIterator) MoveNext() (*IterDownloadingObject, error) { if !i.inited { i.inited = true - findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP)) + findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP)) if err != nil { return nil, fmt.Errorf("finding client location: %w", err) } diff --git a/common/pkgs/iterator/rep_object_iterator.go b/common/pkgs/iterator/rep_object_iterator.go index 540eff0..3ae9f28 100644 --- a/common/pkgs/iterator/rep_object_iterator.go +++ b/common/pkgs/iterator/rep_object_iterator.go @@ -6,11 +6,13 @@ import ( "math/rand" "github.com/samber/lo" + distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" "gitlink.org.cn/cloudream/common/pkgs/logger" myio "gitlink.org.cn/cloudream/common/utils/io" - "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/models" + + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -22,7 +24,7 @@ type RepObjectIterator struct { OnClosing func() objects []model.Object - objectRepData []models.ObjectRepData + objectRepData []stgmod.ObjectRepData currentIndex int inited bool @@ -44,7 +46,7 @@ type DownloadContext struct { Distlock *distsvc.Service } -func NewRepObjectIterator(objects []model.Object, objectRepData []models.ObjectRepData, downloadCtx *DownloadContext) *RepObjectIterator { +func NewRepObjectIterator(objects []model.Object, objectRepData []stgmod.ObjectRepData, downloadCtx *DownloadContext) *RepObjectIterator { return &RepObjectIterator{ objects: objects, objectRepData: objectRepData, @@ -54,7 +56,7 @@ func NewRepObjectIterator(objects []model.Object, objectRepData []models.ObjectR func (i *RepObjectIterator) MoveNext() (*IterDownloadingObject, error) { // TODO 加锁 - coorCli, err := globals.CoordinatorMQPool.Acquire() + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } @@ -63,7 +65,7 @@ func (i *RepObjectIterator) MoveNext() (*IterDownloadingObject, error) { if !i.inited { i.inited = true - findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP)) + findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP)) if err != nil { return nil, fmt.Errorf("finding client location: %w", err) } @@ -137,7 +139,7 @@ func (i *RepObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) Downl } func downloadFile(ctx *DownloadContext, nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) { - if globals.IPFSPool != nil { + if stgglb.IPFSPool != nil { logger.Infof("try to use local IPFS to download file") reader, err := downloadFromLocalIPFS(ctx, fileHash) @@ -162,7 +164,7 @@ func downloadFromNode(ctx *DownloadContext, nodeID int64, nodeIP string, fileHas } // 连接grpc - agtCli, err := globals.AgentRPCPool.Acquire(nodeIP) + agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP) if err != nil { return nil, fmt.Errorf("new agent grpc client: %w", err) } @@ -180,11 +182,11 @@ func downloadFromNode(ctx *DownloadContext, nodeID int64, nodeIP string, fileHas func downloadFromLocalIPFS(ctx *DownloadContext, fileHash string) (io.ReadCloser, error) { onClosed := func() {} - if globals.Local.NodeID != nil { + if stgglb.Local.NodeID != nil { // 二次获取锁 mutex, err := reqbuilder.NewBuilder(). // 用于从IPFS下载文件 - IPFS().ReadOneRep(*globals.Local.NodeID, fileHash). + IPFS().ReadOneRep(*stgglb.Local.NodeID, fileHash). MutexLock(ctx.Distlock) if err != nil { return nil, fmt.Errorf("acquire locks failed, err: %w", err) @@ -194,7 +196,7 @@ func downloadFromLocalIPFS(ctx *DownloadContext, fileHash string) (io.ReadCloser } } - ipfsCli, err := globals.IPFSPool.Acquire() + ipfsCli, err := stgglb.IPFSPool.Acquire() if err != nil { return nil, fmt.Errorf("new ipfs client: %w", err) } diff --git a/common/pkgs/mq/agent/cache.go b/common/pkgs/mq/agent/cache.go index 1fb24f3..fd194e7 100644 --- a/common/pkgs/mq/agent/cache.go +++ b/common/pkgs/mq/agent/cache.go @@ -1,8 +1,8 @@ package agent import ( - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/mq" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -96,7 +96,7 @@ type WaitCacheMovePackageResp struct { mq.MessageBodyBase IsComplete bool `json:"isComplete"` Error string `json:"error"` - CacheInfos []models.ObjectCacheInfo `json:"cacheInfos"` + CacheInfos []stgsdk.ObjectCacheInfo `json:"cacheInfos"` } func NewWaitCacheMovePackage(taskID string, waitTimeoutMs int64) *WaitCacheMovePackage { @@ -105,7 +105,7 @@ func NewWaitCacheMovePackage(taskID string, waitTimeoutMs int64) *WaitCacheMoveP WaitTimeoutMs: waitTimeoutMs, } } -func NewWaitCacheMovePackageResp(isComplete bool, err string, cacheInfos []models.ObjectCacheInfo) *WaitCacheMovePackageResp { +func NewWaitCacheMovePackageResp(isComplete bool, err string, cacheInfos []stgsdk.ObjectCacheInfo) *WaitCacheMovePackageResp { return &WaitCacheMovePackageResp{ IsComplete: isComplete, Error: err, diff --git a/common/pkgs/mq/agent/storage.go b/common/pkgs/mq/agent/storage.go index 9929b14..d5f692b 100644 --- a/common/pkgs/mq/agent/storage.go +++ b/common/pkgs/mq/agent/storage.go @@ -1,8 +1,9 @@ package agent import ( - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/mq" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -139,7 +140,7 @@ type StartStorageCreatePackage struct { Name string `json:"name"` StorageID int64 `json:"storageID"` Path string `json:"path"` - Redundancy models.TypedRedundancyInfo `json:"redundancy"` + Redundancy stgsdk.TypedRedundancyInfo `json:"redundancy"` NodeAffinity *int64 `json:"nodeAffinity"` } type StartStorageCreatePackageResp struct { @@ -147,7 +148,7 @@ type StartStorageCreatePackageResp struct { TaskID string `json:"taskID"` } -func NewStartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo, nodeAffinity *int64) *StartStorageCreatePackage { +func NewStartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy stgsdk.TypedRedundancyInfo, nodeAffinity *int64) *StartStorageCreatePackage { return &StartStorageCreatePackage{ UserID: userID, BucketID: bucketID, diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 047d6d5..ae04e68 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -2,7 +2,8 @@ package coordinator import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - "gitlink.org.cn/cloudream/storage/common/models" + + stgmod "gitlink.org.cn/cloudream/storage/common/models" ) type ObjectService interface { @@ -20,7 +21,7 @@ type GetPackageObjectRepData struct { } type GetPackageObjectRepDataResp struct { mq.MessageBodyBase - Data []models.ObjectRepData `json:"data"` + Data []stgmod.ObjectRepData `json:"data"` } func NewGetPackageObjectRepData(packageID int64) *GetPackageObjectRepData { @@ -28,7 +29,7 @@ func NewGetPackageObjectRepData(packageID int64) *GetPackageObjectRepData { PackageID: packageID, } } -func NewGetPackageObjectRepDataResp(data []models.ObjectRepData) *GetPackageObjectRepDataResp { +func NewGetPackageObjectRepDataResp(data []stgmod.ObjectRepData) *GetPackageObjectRepDataResp { return &GetPackageObjectRepDataResp{ Data: data, } @@ -46,7 +47,7 @@ type GetPackageObjectECData struct { } type GetPackageObjectECDataResp struct { mq.MessageBodyBase - Data []models.ObjectECData `json:"data"` + Data []stgmod.ObjectECData `json:"data"` } func NewGetPackageObjectECData(packageID int64) *GetPackageObjectECData { @@ -54,7 +55,7 @@ func NewGetPackageObjectECData(packageID int64) *GetPackageObjectECData { PackageID: packageID, } } -func NewGetPackageObjectECDataResp(data []models.ObjectECData) *GetPackageObjectECDataResp { +func NewGetPackageObjectECDataResp(data []stgmod.ObjectECData) *GetPackageObjectECDataResp { return &GetPackageObjectECDataResp{ Data: data, } diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index 69771c9..70b0999 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -1,8 +1,9 @@ package coordinator import ( - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/mq" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -88,14 +89,14 @@ type CreatePackage struct { UserID int64 `json:"userID"` BucketID int64 `json:"bucketID"` Name string `json:"name"` - Redundancy models.TypedRedundancyInfo `json:"redundancy"` + Redundancy stgsdk.TypedRedundancyInfo `json:"redundancy"` } type CreatePackageResp struct { mq.MessageBodyBase PackageID int64 `json:"packageID"` } -func NewCreatePackage(userID int64, bucketID int64, name string, redundancy models.TypedRedundancyInfo) *CreatePackage { +func NewCreatePackage(userID int64, bucketID int64, name string, redundancy stgsdk.TypedRedundancyInfo) *CreatePackage { return &CreatePackage{ UserID: userID, BucketID: bucketID, @@ -236,7 +237,7 @@ type PackageCachedNodeInfo struct { type GetPackageCachedNodesResp struct { mq.MessageBodyBase - models.PackageCachingInfo + stgsdk.PackageCachingInfo } func NewGetPackageCachedNodes(userID int64, packageID int64) *GetPackageCachedNodes { @@ -246,9 +247,9 @@ func NewGetPackageCachedNodes(userID int64, packageID int64) *GetPackageCachedNo } } -func NewGetPackageCachedNodesResp(nodeInfos []models.NodePackageCachingInfo, packageSize int64, redunancyType string) *GetPackageCachedNodesResp { +func NewGetPackageCachedNodesResp(nodeInfos []stgsdk.NodePackageCachingInfo, packageSize int64, redunancyType string) *GetPackageCachedNodesResp { return &GetPackageCachedNodesResp{ - PackageCachingInfo: models.PackageCachingInfo{ + PackageCachingInfo: stgsdk.PackageCachingInfo{ NodeInfos: nodeInfos, PackageSize: packageSize, RedunancyType: redunancyType, diff --git a/coordinator/internal/services/package.go b/coordinator/internal/services/package.go index 9da79c3..679d37d 100644 --- a/coordinator/internal/services/package.go +++ b/coordinator/internal/services/package.go @@ -7,9 +7,9 @@ import ( "github.com/jmoiron/sqlx" "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" @@ -215,7 +215,7 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c } var packageSize int64 - nodeInfoMap := make(map[int64]*models.NodePackageCachingInfo) + nodeInfoMap := make(map[int64]*stgsdk.NodePackageCachingInfo) if pkg.Redundancy.IsRepInfo() { // 备份方式为rep objectRepDatas, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) @@ -231,7 +231,7 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c nodeInfo, exists := nodeInfoMap[nodeID] if !exists { - nodeInfo = &models.NodePackageCachingInfo{ + nodeInfo = &stgsdk.NodePackageCachingInfo{ NodeID: nodeID, FileSize: data.Object.Size, ObjectCount: 1, @@ -259,7 +259,7 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c nodeInfo, exists := nodeInfoMap[nodeID] if !exists { - nodeInfo = &models.NodePackageCachingInfo{ + nodeInfo = &stgsdk.NodePackageCachingInfo{ NodeID: nodeID, FileSize: ecData.Object.Size, ObjectCount: 1, @@ -278,7 +278,7 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c return nil, mq.Failed(errorcode.OperationFailed, "redundancy type is wrong") } - var nodeInfos []models.NodePackageCachingInfo + var nodeInfos []stgsdk.NodePackageCachingInfo for _, nodeInfo := range nodeInfoMap { nodeInfos = append(nodeInfos, *nodeInfo) } diff --git a/scanner/internal/event/agent_check_cache.go b/scanner/internal/event/agent_check_cache.go index b003141..ec71eb9 100644 --- a/scanner/internal/event/agent_check_cache.go +++ b/scanner/internal/event/agent_check_cache.go @@ -7,7 +7,7 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" @@ -126,7 +126,7 @@ func (t *AgentCheckCache) startCheck(execCtx ExecuteContext, isComplete bool, ca log := logger.WithType[AgentCheckCache]("Event") // 然后向代理端发送移动文件的请求 - agentClient, err := globals.AgentMQPool.Acquire(t.NodeID) + agentClient, err := stgglb.AgentMQPool.Acquire(t.NodeID) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error()) return diff --git a/scanner/internal/event/agent_check_state.go b/scanner/internal/event/agent_check_state.go index 2f66425..ac2c6ac 100644 --- a/scanner/internal/event/agent_check_state.go +++ b/scanner/internal/event/agent_check_state.go @@ -8,7 +8,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage/common/consts" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" @@ -61,7 +61,7 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { return } - agentClient, err := globals.AgentMQPool.Acquire(t.NodeID) + agentClient, err := stgglb.AgentMQPool.Acquire(t.NodeID) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error()) return diff --git a/scanner/internal/event/agent_check_storage.go b/scanner/internal/event/agent_check_storage.go index 3f37ce2..e6ee7d1 100644 --- a/scanner/internal/event/agent_check_storage.go +++ b/scanner/internal/event/agent_check_storage.go @@ -8,7 +8,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage/common/consts" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" @@ -143,7 +143,7 @@ func (t *AgentCheckStorage) startCheck(execCtx ExecuteContext, stg model.Storage log := logger.WithType[AgentCheckStorage]("Event") // 投递任务 - agentClient, err := globals.AgentMQPool.Acquire(stg.NodeID) + agentClient, err := stgglb.AgentMQPool.Acquire(stg.NodeID) if err != nil { log.WithField("NodeID", stg.NodeID).Warnf("create agent client failed, err: %s", err.Error()) return diff --git a/scanner/main.go b/scanner/main.go index 09130c0..173af98 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -6,7 +6,7 @@ import ( "sync" "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/storage/common/globals" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" @@ -34,7 +34,7 @@ func main() { logger.Fatalf("new db failed, err: %s", err.Error()) } - globals.InitMQPool(&config.Cfg().RabbitMQ) + stgglb.InitMQPool(&config.Cfg().RabbitMQ) wg := sync.WaitGroup{} wg.Add(3)