diff --git a/agent/internal/task/create_ec_package.go b/agent/internal/task/create_ec_package.go index 139467c..8b3df9b 100644 --- a/agent/internal/task/create_ec_package.go +++ b/agent/internal/task/create_ec_package.go @@ -18,9 +18,9 @@ type CreateECPackage struct { Result *CreateECPackageResult } -func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy cdssdk.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { +func NewCreateECPackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreateECPackage { return &CreateECPackage{ - cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), + cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, nodeAffinity), } } diff --git a/agent/internal/task/create_rep_package.go b/agent/internal/task/create_rep_package.go deleted file mode 100644 index be011fe..0000000 --- a/agent/internal/task/create_rep_package.go +++ /dev/null @@ -1,40 +0,0 @@ -package task - -import ( - "time" - - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/task" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" -) - -type CreateRepPackageResult = cmd.CreateRepPackageResult - -type CreateRepPackage struct { - cmd cmd.CreateRepPackage - - Result *CreateRepPackageResult -} - -func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy cdssdk.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { - return &CreateRepPackage{ - cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), - } -} - -func (t *CreateRepPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - log := logger.WithType[CreateRepPackage]("Task") - log.Debugf("begin") - defer log.Debugf("end") - - ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ - Distlock: ctx.distlock, - }) - t.Result = ret - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} diff --git a/agent/internal/task/storage_load_package.go b/agent/internal/task/storage_load_package.go index ef7da0f..79e0b70 100644 --- a/agent/internal/task/storage_load_package.go +++ b/agent/internal/task/storage_load_package.go @@ -4,6 +4,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/task" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" ) @@ -12,7 +13,7 @@ type StorageLoadPackage struct { FullPath string } -func NewStorageLoadPackage(userID int64, packageID int64, outputPath string) *StorageLoadPackage { +func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, outputPath string) *StorageLoadPackage { return &StorageLoadPackage{ cmd: cmd.NewDownloadPackage(userID, packageID, outputPath), FullPath: outputPath, diff --git a/client/internal/http/cacah.go b/client/internal/http/cacah.go index 5c9342b..1435239 100644 --- a/client/internal/http/cacah.go +++ b/client/internal/http/cacah.go @@ -21,9 +21,9 @@ func (s *Server) CacheSvc() *CacheService { } type CacheMovePackageReq struct { - UserID *int64 `json:"userID" binding:"required"` - PackageID *int64 `json:"packageID" binding:"required"` - NodeID *int64 `json:"nodeID" binding:"required"` + UserID *cdssdk.UserID `json:"userID" binding:"required"` + PackageID *cdssdk.PackageID `json:"packageID" binding:"required"` + NodeID *cdssdk.NodeID `json:"nodeID" binding:"required"` } type CacheMovePackageResp struct { CacheInfos []cdssdk.ObjectCacheInfo `json:"cacheInfos"` @@ -70,8 +70,8 @@ func (s *CacheService) MovePackage(ctx *gin.Context) { } type CacheGetPackageObjectCacheInfosReq struct { - UserID *int64 `form:"userID" binding:"required"` - PackageID *int64 `form:"packageID" binding:"required"` + UserID *cdssdk.UserID `form:"userID" binding:"required"` + PackageID *cdssdk.PackageID `form:"packageID" binding:"required"` } type CacheGetPackageObjectCacheInfosResp = cdssdk.CacheGetPackageObjectCacheInfosResp diff --git a/client/internal/http/object.go b/client/internal/http/object.go index ad84d6d..458b609 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -7,6 +7,7 @@ import ( "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" myio "gitlink.org.cn/cloudream/common/utils/io" ) @@ -21,8 +22,8 @@ func (s *Server) ObjectSvc() *ObjectService { } type ObjectDownloadReq struct { - UserID *int64 `form:"userID" binding:"required"` - ObjectID *int64 `form:"objectID" binding:"required"` + UserID *cdssdk.UserID `form:"userID" binding:"required"` + ObjectID *cdssdk.ObjectID `form:"objectID" binding:"required"` } func (s *ObjectService) Download(ctx *gin.Context) { diff --git a/client/internal/http/package.go b/client/internal/http/package.go index 1fa3694..c259f1f 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -26,8 +26,8 @@ func (s *Server) PackageSvc() *PackageService { } type PackageGetReq struct { - UserID *int64 `form:"userID" binding:"required"` - PackageID *int64 `form:"packageID" binding:"required"` + UserID *cdssdk.UserID `form:"userID" binding:"required"` + PackageID *cdssdk.PackageID `form:"packageID" binding:"required"` } type PackageGetResp struct { model.Package @@ -59,15 +59,14 @@ type PackageUploadReq struct { } type PackageUploadInfo struct { - UserID *int64 `json:"userID" binding:"required"` - BucketID *int64 `json:"bucketID" binding:"required"` - Name string `json:"name" binding:"required"` - Redundancy cdssdk.TypedRedundancyInfo `json:"redundancy" binding:"required"` - NodeAffinity *int64 `json:"nodeAffinity"` + UserID *cdssdk.UserID `json:"userID" binding:"required"` + BucketID *cdssdk.BucketID `json:"bucketID" binding:"required"` + Name string `json:"name" binding:"required"` + NodeAffinity *cdssdk.NodeID `json:"nodeAffinity"` } type PackageUploadResp struct { - PackageID int64 `json:"packageID,string"` + PackageID cdssdk.PackageID `json:"packageID,string"` } func (s *PackageService) Upload(ctx *gin.Context) { @@ -80,77 +79,17 @@ func (s *PackageService) Upload(ctx *gin.Context) { return } - if req.Info.Redundancy.IsRepInfo() { - s.uploadRep(ctx, &req) - return - } - - if req.Info.Redundancy.IsECInfo() { - s.uploadEC(ctx, &req) - return - } - - ctx.JSON(http.StatusForbidden, Failed(errorcode.OperationFailed, "not supported yet")) -} - -func (s *PackageService) uploadRep(ctx *gin.Context, req *PackageUploadReq) { - log := logger.WithField("HTTP", "Package.Upload") - - var err error - var repInfo cdssdk.RepRedundancyInfo - if repInfo, err = req.Info.Redundancy.ToRepInfo(); err != nil { - log.Warnf("parsing rep redundancy config: %s", err.Error()) - ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config")) - return - } - - objIter := mapMultiPartFileToUploadingObject(req.Files) - - taskID, err := s.svc.PackageSvc().StartCreatingRepPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, repInfo, req.Info.NodeAffinity) - - if err != nil { - log.Warnf("start uploading rep package task: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed")) - return - } - - for { - complete, createResult, err := s.svc.PackageSvc().WaitCreatingRepPackage(taskID, time.Second*5) - if complete { - if err != nil { - log.Warnf("uploading rep package: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading rep package failed")) - return - } - - ctx.JSON(http.StatusOK, OK(PackageUploadResp{ - PackageID: createResult.PackageID, - })) - return - } - - if err != nil { - log.Warnf("waiting task: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed")) - return - } - } + s.uploadEC(ctx, &req) } func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) { log := logger.WithField("HTTP", "Package.Upload") var err error - var ecInfo cdssdk.ECRedundancyInfo - if ecInfo, err = req.Info.Redundancy.ToECInfo(); err != nil { - log.Warnf("parsing ec redundancy config: %s", err.Error()) - ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config")) - return - } objIter := mapMultiPartFileToUploadingObject(req.Files) - taskID, err := s.svc.PackageSvc().StartCreatingECPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, ecInfo, req.Info.NodeAffinity) + taskID, err := s.svc.PackageSvc().StartCreatingECPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, req.Info.NodeAffinity) if err != nil { log.Warnf("start uploading ec package task: %s", err.Error()) @@ -182,8 +121,8 @@ func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) { } type PackageDeleteReq struct { - UserID *int64 `json:"userID" binding:"required"` - PackageID *int64 `json:"packageID" binding:"required"` + UserID *cdssdk.UserID `json:"userID" binding:"required"` + PackageID *cdssdk.PackageID `json:"packageID" binding:"required"` } func (s *PackageService) Delete(ctx *gin.Context) { @@ -207,8 +146,8 @@ func (s *PackageService) Delete(ctx *gin.Context) { } type GetCachedNodesReq struct { - UserID *int64 `json:"userID" binding:"required"` - PackageID *int64 `json:"packageID" binding:"required"` + UserID *cdssdk.UserID `json:"userID" binding:"required"` + PackageID *cdssdk.PackageID `json:"packageID" binding:"required"` } type GetCachedNodesResp struct { cdssdk.PackageCachingInfo @@ -235,12 +174,12 @@ func (s *PackageService) GetCachedNodes(ctx *gin.Context) { } type GetLoadedNodesReq struct { - UserID *int64 `json:"userID" binding:"required"` - PackageID *int64 `json:"packageID" binding:"required"` + UserID *cdssdk.UserID `json:"userID" binding:"required"` + PackageID *cdssdk.PackageID `json:"packageID" binding:"required"` } type GetLoadedNodesResp struct { - NodeIDs []int64 `json:"nodeIDs"` + NodeIDs []cdssdk.NodeID `json:"nodeIDs"` } func (s *PackageService) GetLoadedNodes(ctx *gin.Context) { diff --git a/client/internal/http/storage.go b/client/internal/http/storage.go index eed8771..d3d90e5 100644 --- a/client/internal/http/storage.go +++ b/client/internal/http/storage.go @@ -21,9 +21,9 @@ func (s *Server) StorageSvc() *StorageService { } type StorageLoadPackageReq struct { - UserID *int64 `json:"userID" binding:"required"` - PackageID *int64 `json:"packageID" binding:"required"` - StorageID *int64 `json:"storageID" binding:"required"` + UserID *cdssdk.UserID `json:"userID" binding:"required"` + PackageID *cdssdk.PackageID `json:"packageID" binding:"required"` + StorageID *cdssdk.StorageID `json:"storageID" binding:"required"` } type StorageLoadPackageResp struct { @@ -73,17 +73,16 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) { } type StorageCreatePackageReq struct { - UserID *int64 `json:"userID" binding:"required"` - StorageID *int64 `json:"storageID" binding:"required"` - Path string `json:"path" binding:"required"` - BucketID *int64 `json:"bucketID" binding:"required"` - Name string `json:"name" binding:"required"` - Redundancy cdssdk.TypedRedundancyInfo `json:"redundancy" binding:"required"` - NodeAffinity *int64 `json:"nodeAffinity"` + UserID *cdssdk.UserID `json:"userID" binding:"required"` + StorageID *cdssdk.StorageID `json:"storageID" binding:"required"` + Path string `json:"path" binding:"required"` + BucketID *cdssdk.BucketID `json:"bucketID" binding:"required"` + Name string `json:"name" binding:"required"` + NodeAffinity *cdssdk.NodeID `json:"nodeAffinity"` } type StorageCreatePackageResp struct { - PackageID int64 `json:"packageID"` + PackageID cdssdk.PackageID `json:"packageID"` } func (s *StorageService) CreatePackage(ctx *gin.Context) { @@ -97,7 +96,7 @@ func (s *StorageService) CreatePackage(ctx *gin.Context) { } nodeID, taskID, err := s.svc.StorageSvc().StartStorageCreatePackage( - *req.UserID, *req.BucketID, req.Name, *req.StorageID, req.Path, req.Redundancy, req.NodeAffinity) + *req.UserID, *req.BucketID, req.Name, *req.StorageID, req.Path, req.NodeAffinity) if err != nil { log.Warnf("start storage create package: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed")) @@ -128,8 +127,8 @@ func (s *StorageService) CreatePackage(ctx *gin.Context) { } type StorageGetInfoReq struct { - UserID *int64 `form:"userID" binding:"required"` - StorageID *int64 `form:"storageID" binding:"required"` + UserID *cdssdk.UserID `form:"userID" binding:"required"` + StorageID *cdssdk.StorageID `form:"storageID" binding:"required"` } type StorageGetInfoResp struct { diff --git a/client/internal/services/bucket.go b/client/internal/services/bucket.go index 8085496..9e05423 100644 --- a/client/internal/services/bucket.go +++ b/client/internal/services/bucket.go @@ -3,6 +3,7 @@ package services import ( "fmt" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" @@ -17,12 +18,12 @@ func (svc *Service) BucketSvc() *BucketService { return &BucketService{Service: svc} } -func (svc *BucketService) GetBucket(userID int64, bucketID int64) (model.Bucket, error) { +func (svc *BucketService) GetBucket(userID cdssdk.UserID, bucketID cdssdk.BucketID) (model.Bucket, error) { // TODO panic("not implement yet") } -func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) { +func (svc *BucketService) GetUserBuckets(userID cdssdk.UserID) ([]model.Bucket, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -37,7 +38,7 @@ func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) { return resp.Buckets, nil } -func (svc *BucketService) GetBucketPackages(userID int64, bucketID int64) ([]model.Package, error) { +func (svc *BucketService) GetBucketPackages(userID cdssdk.UserID, bucketID cdssdk.BucketID) ([]model.Package, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -52,7 +53,7 @@ func (svc *BucketService) GetBucketPackages(userID int64, bucketID int64) ([]mod return resp.Packages, nil } -func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, error) { +func (svc *BucketService) CreateBucket(userID cdssdk.UserID, bucketName string) (cdssdk.BucketID, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return 0, fmt.Errorf("new coordinator client: %w", err) @@ -80,7 +81,7 @@ func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, return resp.BucketID, nil } -func (svc *BucketService) DeleteBucket(userID int64, bucketID int64) error { +func (svc *BucketService) DeleteBucket(userID cdssdk.UserID, bucketID cdssdk.BucketID) error { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return fmt.Errorf("new coordinator client: %w", err) diff --git a/client/internal/services/cacah.go b/client/internal/services/cacah.go index bc5c757..1e199c0 100644 --- a/client/internal/services/cacah.go +++ b/client/internal/services/cacah.go @@ -19,7 +19,7 @@ func (svc *Service) CacheSvc() *CacheService { return &CacheService{Service: svc} } -func (svc *CacheService) StartCacheMovePackage(userID int64, packageID int64, nodeID int64) (string, error) { +func (svc *CacheService) StartCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) (string, error) { agentCli, err := stgglb.AgentMQPool.Acquire(nodeID) if err != nil { return "", fmt.Errorf("new agent client: %w", err) @@ -34,7 +34,7 @@ func (svc *CacheService) StartCacheMovePackage(userID int64, packageID int64, no return startResp.TaskID, nil } -func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, []cdssdk.ObjectCacheInfo, error) { +func (svc *CacheService) WaitCacheMovePackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, []cdssdk.ObjectCacheInfo, error) { agentCli, err := stgglb.AgentMQPool.Acquire(nodeID) if err != nil { return true, nil, fmt.Errorf("new agent client: %w", err) @@ -57,7 +57,7 @@ func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitT return true, waitResp.CacheInfos, nil } -func (svc *CacheService) GetPackageObjectCacheInfos(userID int64, packageID int64) ([]cdssdk.ObjectCacheInfo, error) { +func (svc *CacheService) GetPackageObjectCacheInfos(userID cdssdk.UserID, packageID cdssdk.PackageID) ([]cdssdk.ObjectCacheInfo, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 220fecb..2b988cf 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -4,6 +4,7 @@ import ( "fmt" "io" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -17,11 +18,11 @@ func (svc *Service) ObjectSvc() *ObjectService { return &ObjectService{Service: svc} } -func (svc *ObjectService) Download(userID int64, objectID int64) (io.ReadCloser, error) { +func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (io.ReadCloser, error) { panic("not implement yet!") } -func (svc *ObjectService) GetPackageObjects(userID int64, packageID int64) ([]model.Object, error) { +func (svc *ObjectService) GetPackageObjects(userID cdssdk.UserID, packageID cdssdk.PackageID) ([]model.Object, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index 71a847c..8977f92 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -21,7 +21,7 @@ func (svc *Service) StorageSvc() *StorageService { return &StorageService{Service: svc} } -func (svc *StorageService) StartStorageLoadPackage(userID int64, packageID int64, storageID int64) (string, error) { +func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) (string, error) { tsk := svc.TaskMgr.StartNew(task.NewStorageLoadPackage(userID, packageID, storageID)) return tsk.ID(), nil } @@ -41,7 +41,7 @@ func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, s } // 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID -func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy cdssdk.TypedRedundancyInfo, nodeAffinity *int64) (int64, string, error) { +func (svc *StorageService) StartStorageCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, storageID cdssdk.StorageID, path string, nodeAffinity *cdssdk.NodeID) (cdssdk.NodeID, string, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return 0, "", fmt.Errorf("new coordinator client: %w", err) @@ -59,7 +59,7 @@ func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int6 } defer stgglb.AgentMQPool.Release(agentCli) - startResp, err := agentCli.StartStorageCreatePackage(agtmq.NewStartStorageCreatePackage(userID, bucketID, name, storageID, path, redundancy, nodeAffinity)) + startResp, err := agentCli.StartStorageCreatePackage(agtmq.NewStartStorageCreatePackage(userID, bucketID, name, storageID, path, nodeAffinity)) if err != nil { return 0, "", fmt.Errorf("start storage upload package: %w", err) } @@ -67,7 +67,7 @@ func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int6 return stgResp.NodeID, startResp.TaskID, nil } -func (svc *StorageService) WaitStorageCreatePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, int64, error) { +func (svc *StorageService) WaitStorageCreatePackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, cdssdk.PackageID, error) { agentCli, err := stgglb.AgentMQPool.Acquire(nodeID) if err != nil { // TODO 失败是否要当做任务已经结束? @@ -92,7 +92,7 @@ func (svc *StorageService) WaitStorageCreatePackage(nodeID int64, taskID string, return true, waitResp.PackageID, nil } -func (svc *StorageService) GetInfo(userID int64, storageID int64) (*model.Storage, error) { +func (svc *StorageService) GetInfo(userID cdssdk.UserID, storageID cdssdk.StorageID) (*model.Storage, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) diff --git a/client/internal/task/create_ec_package.go b/client/internal/task/create_ec_package.go index 26dc61e..4eaecc4 100644 --- a/client/internal/task/create_ec_package.go +++ b/client/internal/task/create_ec_package.go @@ -17,9 +17,9 @@ type CreateECPackage struct { Result *CreateECPackageResult } -func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy cdssdk.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { +func NewCreateECPackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreateECPackage { return &CreateECPackage{ - cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), + cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, nodeAffinity), } } diff --git a/client/internal/task/create_rep_package.go b/client/internal/task/create_rep_package.go deleted file mode 100644 index cce5b32..0000000 --- a/client/internal/task/create_rep_package.go +++ /dev/null @@ -1,35 +0,0 @@ -package task - -import ( - "time" - - "gitlink.org.cn/cloudream/common/pkgs/task" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" -) - -type CreateRepPackageResult = cmd.CreateRepPackageResult - -type CreateRepPackage struct { - cmd cmd.CreateRepPackage - - Result *CreateRepPackageResult -} - -func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy cdssdk.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { - return &CreateRepPackage{ - cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity), - } -} - -func (t *CreateRepPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ - Distlock: ctx.distlock, - }) - t.Result = ret - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} diff --git a/client/internal/task/storage_load_package.go b/client/internal/task/storage_load_package.go index 42e9a77..35ad828 100644 --- a/client/internal/task/storage_load_package.go +++ b/client/internal/task/storage_load_package.go @@ -5,6 +5,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/task" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" @@ -13,14 +14,14 @@ import ( // TODO 可以考虑不用Task来实现这些逻辑 type StorageLoadPackage struct { - userID int64 - packageID int64 - storageID int64 + userID cdssdk.UserID + packageID cdssdk.PackageID + storageID cdssdk.StorageID ResultFullPath string } -func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *StorageLoadPackage { +func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *StorageLoadPackage { return &StorageLoadPackage{ userID: userID, packageID: packageID, @@ -39,7 +40,7 @@ func (t *StorageLoadPackage) do(ctx TaskContext) error { mutex, err := reqbuilder.NewBuilder(). Metadata(). // 用于判断用户是否有Storage权限 - UserStorage().ReadOne(t.packageID, t.storageID). + UserStorage().ReadOne(t.userID, t.storageID). // 用于判断用户是否有对象权限 UserBucket().ReadAny(). // 用于读取包信息 diff --git a/client/internal/task/update_ec_package.go b/client/internal/task/update_ec_package.go index 02faaf3..15bf9f0 100644 --- a/client/internal/task/update_ec_package.go +++ b/client/internal/task/update_ec_package.go @@ -4,6 +4,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/task" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -16,7 +17,7 @@ type UpdateECPackage struct { Result *UpdateECPackageResult } -func NewUpdateECPackage(userID int64, packageID int64, objectIter iterator.UploadingObjectIterator) *UpdateECPackage { +func NewUpdateECPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator) *UpdateECPackage { return &UpdateECPackage{ cmd: *cmd.NewUpdateECPackage(userID, packageID, objectIter), } diff --git a/client/internal/task/update_rep_package.go b/client/internal/task/update_rep_package.go deleted file mode 100644 index 2a83c51..0000000 --- a/client/internal/task/update_rep_package.go +++ /dev/null @@ -1,35 +0,0 @@ -package task - -import ( - "time" - - "gitlink.org.cn/cloudream/common/pkgs/task" - "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" -) - -type UpdateRepPackageResult = cmd.UpdateRepPackageResult - -type UpdateRepPackage struct { - cmd cmd.UpdateRepPackage - - Result *UpdateRepPackageResult -} - -func NewUpdateRepPackage(userID int64, packageID int64, objectIter iterator.UploadingObjectIterator) *UpdateRepPackage { - return &UpdateRepPackage{ - cmd: *cmd.NewUpdateRepPackage(userID, packageID, objectIter), - } -} - -func (t *UpdateRepPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ - Distlock: ctx.distlock, - }) - - t.Result = ret - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} diff --git a/common/models/models.go b/common/models/models.go index 641ee70..2a223fd 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -1,50 +1,24 @@ package stgmod -import "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" +import ( + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" +) /// TODO 将分散在各处的公共结构体定义集中到这里来 -type EC struct { - ID int64 `json:"id"` - K int `json:"k"` - N int `json:"n"` - ChunkSize int `json:"chunkSize"` -} - -func NewEc(id int64, k int, n int, chunkSize int) EC { - return EC{ - ID: id, - K: k, - N: n, - ChunkSize: chunkSize, - } -} - type ObjectBlockData struct { - Index int `json:"index"` - FileHash string `json:"fileHash"` - NodeIDs []int64 `json:"nodeIDs"` + Index int `json:"index"` + FileHash string `json:"fileHash"` + NodeID cdssdk.NodeID `json:"nodeID"` + CachedNodeIDs []cdssdk.NodeID `json:"nodeIDs"` } -func NewObjectBlockData(index int, fileHash string, nodeIDs []int64) ObjectBlockData { +func NewObjectBlockData(index int, fileHash string, nodeID cdssdk.NodeID, cachedNodeIDs []cdssdk.NodeID) ObjectBlockData { return ObjectBlockData{ - Index: index, - FileHash: fileHash, - NodeIDs: nodeIDs, - } -} - -type ObjectRepData struct { - Object model.Object `json:"object"` - FileHash string `json:"fileHash"` - NodeIDs []int64 `json:"nodeIDs"` -} - -func NewObjectRepData(object model.Object, fileHash string, nodeIDs []int64) ObjectRepData { - return ObjectRepData{ - Object: object, - FileHash: fileHash, - NodeIDs: nodeIDs, + Index: index, + FileHash: fileHash, + CachedNodeIDs: cachedNodeIDs, } } @@ -61,7 +35,7 @@ func NewObjectECData(object model.Object, blocks []ObjectBlockData) ObjectECData } type LocalMachineInfo struct { - NodeID *int64 `json:"nodeID"` - ExternalIP string `json:"externalIP"` - LocalIP string `json:"localIP"` + NodeID *cdssdk.NodeID `json:"nodeID"` + ExternalIP string `json:"externalIP"` + LocalIP string `json:"localIP"` } diff --git a/common/pkgs/db/bucket.go b/common/pkgs/db/bucket.go index c7b65b8..cc1d208 100644 --- a/common/pkgs/db/bucket.go +++ b/common/pkgs/db/bucket.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/jmoiron/sqlx" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -34,7 +35,7 @@ func (db *BucketDB) GetIDByName(bucketName string) (int64, error) { } // IsAvailable 判断用户是否有指定Bucekt的权限 -func (db *BucketDB) IsAvailable(ctx SQLContext, bucketID int64, userID int64) (bool, error) { +func (db *BucketDB) IsAvailable(ctx SQLContext, bucketID cdssdk.BucketID, userID cdssdk.UserID) (bool, error) { _, err := db.GetUserBucket(ctx, userID, bucketID) if errors.Is(err, sql.ErrNoRows) { return false, nil @@ -47,7 +48,7 @@ func (db *BucketDB) IsAvailable(ctx SQLContext, bucketID int64, userID int64) (b return true, nil } -func (*BucketDB) GetUserBucket(ctx SQLContext, userID int64, bucketID int64) (model.Bucket, error) { +func (*BucketDB) GetUserBucket(ctx SQLContext, userID cdssdk.UserID, bucketID cdssdk.BucketID) (model.Bucket, error) { var ret model.Bucket err := sqlx.Get(ctx, &ret, "select Bucket.* from UserBucket, Bucket where UserID = ? and"+ @@ -56,13 +57,13 @@ func (*BucketDB) GetUserBucket(ctx SQLContext, userID int64, bucketID int64) (mo return ret, err } -func (*BucketDB) GetUserBuckets(ctx SQLContext, userID int64) ([]model.Bucket, error) { +func (*BucketDB) GetUserBuckets(ctx SQLContext, userID cdssdk.UserID) ([]model.Bucket, error) { var ret []model.Bucket err := sqlx.Select(ctx, &ret, "select Bucket.* from UserBucket, Bucket where UserID = ? and UserBucket.BucketID = Bucket.BucketID", userID) return ret, err } -func (db *BucketDB) Create(ctx SQLContext, userID int64, bucketName string) (int64, error) { +func (db *BucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketName string) (cdssdk.BucketID, error) { var bucketID int64 err := sqlx.Get(ctx, &bucketID, "select Bucket.BucketID from UserBucket, Bucket where UserBucket.UserID = ? and UserBucket.BucketID = Bucket.BucketID and Bucket.Name = ?", userID, bucketName) if err == nil { @@ -88,10 +89,10 @@ func (db *BucketDB) Create(ctx SQLContext, userID int64, bucketName string) (int return 0, fmt.Errorf("insert into user bucket failed, err: %w", err) } - return bucketID, err + return cdssdk.BucketID(bucketID), err } -func (db *BucketDB) Delete(ctx SQLContext, bucketID int64) error { +func (db *BucketDB) Delete(ctx SQLContext, bucketID cdssdk.BucketID) error { _, err := ctx.Exec("delete from UserBucket where BucketID = ?", bucketID) if err != nil { return fmt.Errorf("delete user bucket failed, err: %w", err) @@ -103,15 +104,15 @@ func (db *BucketDB) Delete(ctx SQLContext, bucketID int64) error { } // 删除Bucket内的Package - var objIDs []int64 - err = sqlx.Select(ctx, &objIDs, "select PackageID from Package where BucketID = ?", bucketID) + var pkgIDs []cdssdk.PackageID + err = sqlx.Select(ctx, &pkgIDs, "select PackageID from Package where BucketID = ?", bucketID) if err != nil { return fmt.Errorf("query package failed, err: %w", err) } - for _, objID := range objIDs { + for _, pkgID := range pkgIDs { // TODO 不一定所有的错误都要中断后续过程 - err = db.Package().SoftDelete(ctx, objID) + err = db.Package().SoftDelete(ctx, pkgID) if err != nil { return fmt.Errorf("set package seleted failed, err: %w", err) } diff --git a/common/pkgs/db/cache.go b/common/pkgs/db/cache.go index 26449d2..c9bef4a 100644 --- a/common/pkgs/db/cache.go +++ b/common/pkgs/db/cache.go @@ -4,6 +4,7 @@ import ( "time" "github.com/jmoiron/sqlx" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/consts" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -16,7 +17,7 @@ func (db *DB) Cache() *CacheDB { return &CacheDB{DB: db} } -func (*CacheDB) Get(ctx SQLContext, fileHash string, nodeID int64) (model.Cache, error) { +func (*CacheDB) Get(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID) (model.Cache, error) { var ret model.Cache err := sqlx.Get(ctx, &ret, "select * from Cache where FileHash = ? and NodeID = ?", fileHash, nodeID) return ret, err @@ -28,14 +29,14 @@ func (*CacheDB) BatchGetAllFileHashes(ctx SQLContext, start int, count int) ([]s return ret, err } -func (*CacheDB) GetNodeCaches(ctx SQLContext, nodeID int64) ([]model.Cache, error) { +func (*CacheDB) GetNodeCaches(ctx SQLContext, nodeID cdssdk.NodeID) ([]model.Cache, error) { var ret []model.Cache err := sqlx.Select(ctx, &ret, "select * from Cache where NodeID = ?", nodeID) return ret, err } // CreateNew 创建一条新的缓存记录 -func (*CacheDB) CreateNew(ctx SQLContext, fileHash string, nodeID int64) error { +func (*CacheDB) CreateNew(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID) error { _, err := ctx.Exec("insert into Cache values(?,?,?,?)", fileHash, nodeID, consts.CacheStatePinned, time.Now()) if err != nil { return err @@ -45,12 +46,12 @@ func (*CacheDB) CreateNew(ctx SQLContext, fileHash string, nodeID int64) error { } // CreatePinned 创建一条缓存记录,如果已存在,但不是pinned状态,则将其设置为pin状态 -func (*CacheDB) CreatePinned(ctx SQLContext, fileHash string, nodeID int64, priority int) error { +func (*CacheDB) CreatePinned(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID, priority int) error { _, err := ctx.Exec("replace into Cache values(?,?,?,?,?)", fileHash, nodeID, consts.CacheStatePinned, time.Now(), priority) return err } -func (*CacheDB) BatchCreatePinned(ctx SQLContext, fileHashes []string, nodeID int64, priority int) error { +func (*CacheDB) BatchCreatePinned(ctx SQLContext, fileHashes []string, nodeID cdssdk.NodeID, priority int) error { var caches []model.Cache var nowTime = time.Now() for _, hash := range fileHashes { @@ -71,7 +72,7 @@ func (*CacheDB) BatchCreatePinned(ctx SQLContext, fileHashes []string, nodeID in } // Create 创建一条Temp状态的缓存记录,如果已存在则不产生效果 -func (*CacheDB) CreateTemp(ctx SQLContext, fileHash string, nodeID int64) error { +func (*CacheDB) CreateTemp(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID) error { _, err := ctx.Exec("insert ignore into Cache values(?,?,?,?)", fileHash, nodeID, consts.CacheStateTemp, time.Now()) return err } @@ -85,19 +86,19 @@ func (*CacheDB) GetCachingFileNodes(ctx SQLContext, fileHash string) ([]model.No } // DeleteTemp 删除一条Temp状态的记录 -func (*CacheDB) DeleteTemp(ctx SQLContext, fileHash string, nodeID int64) error { +func (*CacheDB) DeleteTemp(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID) error { _, err := ctx.Exec("delete from Cache where FileHash = ? and NodeID = ? and State = ?", fileHash, nodeID, consts.CacheStateTemp) return err } // DeleteNodeAll 删除一个节点所有的记录 -func (*CacheDB) DeleteNodeAll(ctx SQLContext, nodeID int64) error { +func (*CacheDB) DeleteNodeAll(ctx SQLContext, nodeID cdssdk.NodeID) error { _, err := ctx.Exec("delete from Cache where NodeID = ?", nodeID) return err } // FindCachingFileUserNodes 在缓存表中查询指定数据所在的节点 -func (*CacheDB) FindCachingFileUserNodes(ctx SQLContext, userID int64, fileHash string) ([]model.Node, error) { +func (*CacheDB) FindCachingFileUserNodes(ctx SQLContext, userID cdssdk.NodeID, fileHash string) ([]model.Node, error) { var x []model.Node err := sqlx.Select(ctx, &x, "select Node.* from Cache, UserNode, Node where"+ @@ -106,7 +107,7 @@ func (*CacheDB) FindCachingFileUserNodes(ctx SQLContext, userID int64, fileHash return x, err } -func (*CacheDB) SetTemp(ctx SQLContext, fileHash string, nodeID int64) error { +func (*CacheDB) SetTemp(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID) error { _, err := ctx.Exec("update Cache set State = ?, CacheTime = ? where FileHash = ? and NodeID = ?", consts.CacheStateTemp, time.Now(), diff --git a/common/pkgs/db/node.go b/common/pkgs/db/node.go index b43129f..c600348 100644 --- a/common/pkgs/db/node.go +++ b/common/pkgs/db/node.go @@ -4,6 +4,7 @@ import ( "time" "github.com/jmoiron/sqlx" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -15,7 +16,7 @@ func (db *DB) Node() *NodeDB { return &NodeDB{DB: db} } -func (db *NodeDB) GetByID(ctx SQLContext, nodeID int64) (model.Node, error) { +func (db *NodeDB) GetByID(ctx SQLContext, nodeID cdssdk.NodeID) (model.Node, error) { var ret model.Node err := sqlx.Get(ctx, &ret, "select * from Node where NodeID = ?", nodeID) return ret, err @@ -28,14 +29,14 @@ func (db *NodeDB) GetAllNodes(ctx SQLContext) ([]model.Node, error) { } // GetUserNodes 根据用户id查询可用node -func (db *NodeDB) GetUserNodes(ctx SQLContext, userID int64) ([]model.Node, error) { +func (db *NodeDB) GetUserNodes(ctx SQLContext, userID cdssdk.UserID) ([]model.Node, error) { var nodes []model.Node err := sqlx.Select(ctx, &nodes, "select Node.* from UserNode, Node where UserNode.NodeID = Node.NodeID and UserNode.UserID=?", userID) return nodes, err } // UpdateState 更新状态,并且设置上次上报时间为现在 -func (db *NodeDB) UpdateState(ctx SQLContext, nodeID int64, state string) error { +func (db *NodeDB) UpdateState(ctx SQLContext, nodeID cdssdk.NodeID, state string) error { _, err := ctx.Exec("update Node set State = ?, LastReportTime = ? where NodeID = ?", state, time.Now(), nodeID) return err } diff --git a/common/pkgs/db/object_block.go b/common/pkgs/db/object_block.go index 55ee3e5..dd0ab58 100644 --- a/common/pkgs/db/object_block.go +++ b/common/pkgs/db/object_block.go @@ -3,8 +3,11 @@ package db import ( "database/sql" "fmt" + "strconv" + "strings" "github.com/jmoiron/sqlx" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/consts" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" @@ -18,17 +21,17 @@ func (db *DB) ObjectBlock() *ObjectBlockDB { return &ObjectBlockDB{DB: db} } -func (db *ObjectBlockDB) Create(ctx SQLContext, objectID int64, index int, fileHash string) error { - _, err := ctx.Exec("insert into ObjectBlock values(?,?,?)", objectID, index, fileHash) +func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index int, fileHash string, nodeID cdssdk.NodeID) error { + _, err := ctx.Exec("insert into ObjectBlock values(?,?,?,?)", objectID, index, fileHash, nodeID) return err } -func (db *ObjectBlockDB) DeleteObjectAll(ctx SQLContext, objectID int64) error { +func (db *ObjectBlockDB) DeleteObjectAll(ctx SQLContext, objectID cdssdk.ObjectID) error { _, err := ctx.Exec("delete from ObjectBlock where ObjectID = ?", objectID) return err } -func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID int64) error { +func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error { _, err := ctx.Exec("delete ObjectBlock from ObjectBlock inner join Object on ObjectBlock.ObjectID = Object.ObjectID where PackageID = ?", packageID) return err } @@ -47,44 +50,7 @@ func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (in return cnt, err } -func (db *ObjectBlockDB) GetBatchObjectBlocks(ctx SQLContext, objectIDs []int64) ([][]string, error) { - blocks := make([][]string, len(objectIDs)) - var err error - for i, objectID := range objectIDs { - var x []model.ObjectBlock - sql := "select * from ObjectBlock where ObjectID=?" - err = db.d.Select(&x, sql, objectID) - xx := make([]string, len(x)) - for ii := 0; ii < len(x); ii++ { - xx[x[ii].Index] = x[ii].FileHash - } - blocks[i] = xx - } - return blocks, err -} - -func (db *ObjectBlockDB) GetBatchBlocksNodes(ctx SQLContext, hashs [][]string) ([][][]int64, error) { - nodes := make([][][]int64, len(hashs)) - var err error - for i, hs := range hashs { - fileNodes := make([][]int64, len(hs)) - for j, h := range hs { - var x []model.Node - err = sqlx.Select(ctx, &x, - "select Node.* from Cache, Node where"+ - " Cache.FileHash=? and Cache.NodeID = Node.NodeID and Cache.State=?", h, consts.CacheStatePinned) - xx := make([]int64, len(x)) - for ii := 0; ii < len(x); ii++ { - xx[ii] = x[ii].NodeID - } - fileNodes[j] = xx - } - nodes[i] = fileNodes - } - return nodes, err -} - -func (db *ObjectBlockDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) ([]stgmod.ObjectECData, error) { +func (db *ObjectBlockDB) GetWithNodeIDInPackage(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectECData, error) { var objs []model.Object err := sqlx.Select(ctx, &objs, "select * from Object where PackageID = ? order by ObjectID asc", packageID) if err != nil { @@ -118,7 +84,7 @@ func (db *ObjectBlockDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) block.FileHash = tmp.FileHash if tmp.NodeIDs != nil { - block.NodeIDs = splitIDStringUnsafe(*tmp.NodeIDs) + block.CachedNodeIDs = splitIDStringUnsafe(*tmp.NodeIDs) } blocks = append(blocks, block) @@ -129,3 +95,18 @@ func (db *ObjectBlockDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) return rets, nil } + +// 按逗号切割字符串,并将每一个部分解析为一个int64的ID。 +// 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式 +func splitIDStringUnsafe(idStr string) []cdssdk.NodeID { + idStrs := strings.Split(idStr, ",") + ids := make([]cdssdk.NodeID, 0, len(idStrs)) + + for _, str := range idStrs { + // 假设传入的ID是正确的数字格式 + id, _ := strconv.ParseInt(str, 10, 64) + ids = append(ids, cdssdk.NodeID(id)) + } + + return ids +} diff --git a/common/pkgs/db/object_rep.go b/common/pkgs/db/object_rep.go deleted file mode 100644 index 62407cb..0000000 --- a/common/pkgs/db/object_rep.go +++ /dev/null @@ -1,154 +0,0 @@ -package db - -import ( - "database/sql" - "fmt" - "strconv" - "strings" - - "github.com/jmoiron/sqlx" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/consts" - stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" -) - -type ObjectRepDB struct { - *DB -} - -func (db *DB) ObjectRep() *ObjectRepDB { - return &ObjectRepDB{DB: db} -} - -// GetObjectRep 查询对象副本表 -func (db *ObjectRepDB) GetByID(ctx SQLContext, objectID int64) (model.ObjectRep, error) { - var ret model.ObjectRep - err := sqlx.Get(ctx, &ret, "select * from ObjectRep where ObjectID = ?", objectID) - return ret, err -} - -func (db *ObjectRepDB) Create(ctx SQLContext, objectID int64, fileHash string) error { - _, err := ctx.Exec("insert into ObjectRep(ObjectID, FileHash) values(?,?)", objectID, fileHash) - return err -} - -func (db *ObjectRepDB) Update(ctx SQLContext, objectID int64, fileHash string) (int64, error) { - ret, err := ctx.Exec("update ObjectRep set FileHash = ? where ObjectID = ?", fileHash, objectID) - if err != nil { - return 0, err - } - - cnt, err := ret.RowsAffected() - if err != nil { - return 0, fmt.Errorf("get affected rows failed, err: %w", err) - } - - return cnt, nil -} - -func (db *ObjectRepDB) Delete(ctx SQLContext, objectID int64) error { - _, err := ctx.Exec("delete from ObjectRep where ObjectID = ?", objectID) - return err -} - -func (db *ObjectRepDB) DeleteInPackage(ctx SQLContext, packageID int64) error { - _, err := ctx.Exec("delete ObjectRep from ObjectRep inner join Object on ObjectRep.ObjectID = Object.ObjectID where PackageID = ?", packageID) - return err -} - -func (db *ObjectRepDB) GetFileMaxRepCount(ctx SQLContext, fileHash string) (int, error) { - var maxRepCnt *int - err := sqlx.Get(ctx, &maxRepCnt, - "select json_extract(Redundancy, '$.info.repCount') from ObjectRep, Object, Package where FileHash = ? and"+ - " ObjectRep.ObjectID = Object.ObjectID and"+ - " Object.PackageID = Package.PackageID and"+ - " Package.State = ?", fileHash, consts.PackageStateNormal) - - if err == sql.ErrNoRows { - return 0, nil - } - - if err != nil { - return 0, err - } - - if maxRepCnt == nil { - return 0, nil - } - - return *maxRepCnt, err -} - -func (db *ObjectRepDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) ([]stgmod.ObjectRepData, error) { - var tmpRets []struct { - model.Object - FileHash *string `db:"FileHash"` - NodeIDs *string `db:"NodeIDs"` - } - - err := sqlx.Select(ctx, - &tmpRets, - "select Object.*, ObjectRep.FileHash, group_concat(NodeID) as NodeIDs from Object"+ - " left join ObjectRep on Object.ObjectID = ObjectRep.ObjectID"+ - " left join Cache on ObjectRep.FileHash = Cache.FileHash"+ - " where PackageID = ? group by Object.ObjectID order by Object.ObjectID asc", - packageID, - ) - if err != nil { - return nil, err - } - rets := make([]stgmod.ObjectRepData, 0, len(tmpRets)) - for _, tmp := range tmpRets { - var repData stgmod.ObjectRepData - repData.Object = tmp.Object - - if tmp.FileHash != nil { - repData.FileHash = *tmp.FileHash - } - - if tmp.NodeIDs != nil { - repData.NodeIDs = splitIDStringUnsafe(*tmp.NodeIDs) - } - - rets = append(rets, repData) - } - - return rets, nil -} - -func (db *ObjectRepDB) GetPackageObjectCacheInfos(ctx SQLContext, packageID int64) ([]cdssdk.ObjectCacheInfo, error) { - var tmpRet []struct { - cdssdk.Object - FileHash string `db:"FileHash"` - } - - err := sqlx.Select(ctx, &tmpRet, "select Object.*, ObjectRep.FileHash from Object"+ - " left join ObjectRep on Object.ObjectID = ObjectRep.ObjectID"+ - " where Object.PackageID = ? order by Object.ObjectID asc", packageID) - if err != nil { - return nil, err - } - - ret := make([]cdssdk.ObjectCacheInfo, len(tmpRet)) - for i, r := range tmpRet { - ret[i] = cdssdk.NewObjectCacheInfo(r.Object, r.FileHash) - } - - return ret, nil -} - -// 按逗号切割字符串,并将每一个部分解析为一个int64的ID。 -// 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式 -func splitIDStringUnsafe(idStr string) []int64 { - idStrs := strings.Split(idStr, ",") - ids := make([]int64, 0, len(idStrs)) - - for _, str := range idStrs { - // 假设传入的ID是正确的数字格式 - id, _ := strconv.ParseInt(str, 10, 64) - ids = append(ids, id) - } - - return ids -} diff --git a/common/pkgs/db/package.go b/common/pkgs/db/package.go index f7f40d7..58c6b28 100644 --- a/common/pkgs/db/package.go +++ b/common/pkgs/db/package.go @@ -8,8 +8,6 @@ import ( "github.com/jmoiron/sqlx" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/utils/serder" - "gitlink.org.cn/cloudream/storage/common/consts" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -22,35 +20,35 @@ func (db *DB) Package() *PackageDB { return &PackageDB{DB: db} } -func (db *PackageDB) GetByID(ctx SQLContext, packageID int64) (model.Package, error) { +func (db *PackageDB) GetByID(ctx SQLContext, packageID cdssdk.PackageID) (model.Package, error) { var ret model.Package err := sqlx.Get(ctx, &ret, "select * from Package where PackageID = ?", packageID) return ret, err } -func (db *PackageDB) GetByName(ctx SQLContext, bucketID int64, name string) (model.Package, error) { +func (db *PackageDB) GetByName(ctx SQLContext, bucketID cdssdk.BucketID, name string) (model.Package, error) { var ret model.Package err := sqlx.Get(ctx, &ret, "select * from Package where BucketID = ? and Name = ?", bucketID, name) return ret, err } -func (*PackageDB) BatchGetAllPackageIDs(ctx SQLContext, start int, count int) ([]int64, error) { - var ret []int64 +func (*PackageDB) BatchGetAllPackageIDs(ctx SQLContext, start int, count int) ([]cdssdk.PackageID, error) { + var ret []cdssdk.PackageID err := sqlx.Select(ctx, &ret, "select PackageID from Package limit ?, ?", start, count) return ret, err } -func (db *PackageDB) GetBucketPackages(ctx SQLContext, userID int64, bucketID int64) ([]model.Package, error) { +func (db *PackageDB) GetBucketPackages(ctx SQLContext, userID cdssdk.UserID, bucketID cdssdk.BucketID) ([]model.Package, error) { var ret []model.Package err := sqlx.Select(ctx, &ret, "select Package.* from UserBucket, Package where UserID = ? and UserBucket.BucketID = ? and UserBucket.BucketID = Package.BucketID", userID, bucketID) return ret, err } // IsAvailable 判断一个用户是否拥有指定对象 -func (db *PackageDB) IsAvailable(ctx SQLContext, userID int64, packageID int64) (bool, error) { - var objID int64 +func (db *PackageDB) IsAvailable(ctx SQLContext, userID cdssdk.UserID, packageID cdssdk.PackageID) (bool, error) { + var pkgID cdssdk.PackageID // 先根据PackageID找到Package,然后判断此Package所在的Bucket是不是归此用户所有 - err := sqlx.Get(ctx, &objID, + err := sqlx.Get(ctx, &pkgID, "select Package.PackageID from Package, UserBucket where "+ "Package.PackageID = ? and "+ "Package.BucketID = UserBucket.BucketID and "+ @@ -69,7 +67,7 @@ func (db *PackageDB) IsAvailable(ctx SQLContext, userID int64, packageID int64) } // GetUserPackage 获得Package,如果用户没有权限访问,则不会获得结果 -func (db *PackageDB) GetUserPackage(ctx SQLContext, userID int64, packageID int64) (model.Package, error) { +func (db *PackageDB) GetUserPackage(ctx SQLContext, userID cdssdk.UserID, packageID cdssdk.PackageID) (model.Package, error) { var ret model.Package err := sqlx.Get(ctx, &ret, "select Package.* from Package, UserBucket where"+ @@ -80,7 +78,7 @@ func (db *PackageDB) GetUserPackage(ctx SQLContext, userID int64, packageID int6 return ret, err } -func (db *PackageDB) Create(ctx SQLContext, bucketID int64, name string, redundancy cdssdk.TypedRedundancyInfo) (int64, error) { +func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) { // 根据packagename和bucketid查询,若不存在则插入,若存在则返回错误 var packageID int64 err := sqlx.Get(ctx, &packageID, "select PackageID from Package where Name = ? AND BucketID = ?", name, bucketID) @@ -93,13 +91,8 @@ func (db *PackageDB) Create(ctx SQLContext, bucketID int64, name string, redunda return 0, fmt.Errorf("query Package by PackageName and BucketID failed, err: %w", err) } - redundancyJSON, err := serder.ObjectToJSON(redundancy) - if err != nil { - return 0, fmt.Errorf("redundancy to json: %w", err) - } - - sql := "insert into Package(Name, BucketID, State, Redundancy) values(?,?,?,?)" - r, err := ctx.Exec(sql, name, bucketID, consts.PackageStateNormal, redundancyJSON) + sql := "insert into Package(Name, BucketID, State) values(?,?,?)" + r, err := ctx.Exec(sql, name, bucketID, consts.PackageStateNormal) if err != nil { return 0, fmt.Errorf("insert package failed, err: %w", err) } @@ -109,11 +102,11 @@ func (db *PackageDB) Create(ctx SQLContext, bucketID int64, name string, redunda return 0, fmt.Errorf("get id of inserted package failed, err: %w", err) } - return packageID, nil + return cdssdk.PackageID(packageID), nil } // SoftDelete 设置一个对象被删除,并将相关数据删除 -func (db *PackageDB) SoftDelete(ctx SQLContext, packageID int64) error { +func (db *PackageDB) SoftDelete(ctx SQLContext, packageID cdssdk.PackageID) error { obj, err := db.GetByID(ctx, packageID) if err != nil { return fmt.Errorf("get package failed, err: %w", err) @@ -130,16 +123,9 @@ func (db *PackageDB) SoftDelete(ctx SQLContext, packageID int64) error { return fmt.Errorf("change package state failed, err: %w", err) } - if obj.Redundancy.IsRepInfo() { - err = db.ObjectRep().DeleteInPackage(ctx, packageID) - if err != nil { - return fmt.Errorf("delete from object rep failed, err: %w", err) - } - } else { - err = db.ObjectBlock().DeleteInPackage(ctx, packageID) - if err != nil { - return fmt.Errorf("delete from object rep failed, err: %w", err) - } + err = db.ObjectBlock().DeleteInPackage(ctx, packageID) + if err != nil { + return fmt.Errorf("delete from object rep failed, err: %w", err) } if err := db.Object().DeleteInPackage(ctx, packageID); err != nil { @@ -155,7 +141,7 @@ func (db *PackageDB) SoftDelete(ctx SQLContext, packageID int64) error { } // DeleteUnused 删除一个已经是Deleted状态,且不再被使用的对象。目前可能被使用的地方只有StoragePackage -func (PackageDB) DeleteUnused(ctx SQLContext, packageID int64) error { +func (PackageDB) DeleteUnused(ctx SQLContext, packageID cdssdk.PackageID) error { _, err := ctx.Exec("delete from Package where PackageID = ? and State = ? and "+ "not exists(select StorageID from StoragePackage where PackageID = ?)", packageID, @@ -166,7 +152,7 @@ func (PackageDB) DeleteUnused(ctx SQLContext, packageID int64) error { return err } -func (*PackageDB) ChangeState(ctx SQLContext, packageID int64, state string) error { +func (*PackageDB) ChangeState(ctx SQLContext, packageID cdssdk.PackageID, state string) error { _, err := ctx.Exec("update Package set State = ? where PackageID = ?", state, packageID) return err } diff --git a/common/pkgs/db/storage.go b/common/pkgs/db/storage.go index efd2133..93582d0 100644 --- a/common/pkgs/db/storage.go +++ b/common/pkgs/db/storage.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/jmoiron/sqlx" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -16,19 +17,19 @@ func (db *DB) Storage() *StorageDB { return &StorageDB{DB: db} } -func (db *StorageDB) GetByID(ctx SQLContext, stgID int64) (model.Storage, error) { +func (db *StorageDB) GetByID(ctx SQLContext, stgID cdssdk.StorageID) (model.Storage, error) { var stg model.Storage err := sqlx.Get(ctx, &stg, "select * from Storage where StorageID = ?", stgID) return stg, err } -func (db *StorageDB) BatchGetAllStorageIDs(ctx SQLContext, start int, count int) ([]int64, error) { - var ret []int64 +func (db *StorageDB) BatchGetAllStorageIDs(ctx SQLContext, start int, count int) ([]cdssdk.StorageID, error) { + var ret []cdssdk.StorageID err := sqlx.Select(ctx, &ret, "select StorageID from Storage limit ?, ?", start, count) return ret, err } -func (db *StorageDB) IsAvailable(ctx SQLContext, userID int64, storageID int64) (bool, error) { +func (db *StorageDB) IsAvailable(ctx SQLContext, userID cdssdk.UserID, storageID cdssdk.StorageID) (bool, error) { var stgID int64 err := sqlx.Get(ctx, &stgID, "select Storage.StorageID from Storage, UserStorage where"+ @@ -48,7 +49,7 @@ func (db *StorageDB) IsAvailable(ctx SQLContext, userID int64, storageID int64) return true, nil } -func (db *StorageDB) GetUserStorage(ctx SQLContext, userID int64, storageID int64) (model.Storage, error) { +func (db *StorageDB) GetUserStorage(ctx SQLContext, userID cdssdk.UserID, storageID cdssdk.StorageID) (model.Storage, error) { var stg model.Storage err := sqlx.Get(ctx, &stg, "select Storage.* from UserStorage, Storage where UserID = ? and UserStorage.StorageID = ? and UserStorage.StorageID = Storage.StorageID", @@ -58,7 +59,7 @@ func (db *StorageDB) GetUserStorage(ctx SQLContext, userID int64, storageID int6 return stg, err } -func (db *StorageDB) ChangeState(ctx SQLContext, storageID int64, state string) error { +func (db *StorageDB) ChangeState(ctx SQLContext, storageID cdssdk.StorageID, state string) error { _, err := ctx.Exec("update Storage set State = ? where StorageID = ?", state, storageID) return err } diff --git a/common/pkgs/db/storage_package.go b/common/pkgs/db/storage_package.go index 550bcea..f201634 100644 --- a/common/pkgs/db/storage_package.go +++ b/common/pkgs/db/storage_package.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/jmoiron/sqlx" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/consts" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -16,36 +17,36 @@ func (db *DB) StoragePackage() *StoragePackageDB { return &StoragePackageDB{DB: db} } -func (*StoragePackageDB) Get(ctx SQLContext, storageID int64, packageID int64, userID int64) (model.StoragePackage, error) { +func (*StoragePackageDB) Get(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) (model.StoragePackage, error) { var ret model.StoragePackage err := sqlx.Get(ctx, &ret, "select * from StoragePackage where StorageID = ? and PackageID = ? and UserID = ?", storageID, packageID, userID) return ret, err } -func (*StoragePackageDB) GetAllByStorageAndPackageID(ctx SQLContext, storageID int64, packageID int64) ([]model.StoragePackage, error) { +func (*StoragePackageDB) GetAllByStorageAndPackageID(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID) ([]model.StoragePackage, error) { var ret []model.StoragePackage err := sqlx.Select(ctx, &ret, "select * from StoragePackage where StorageID = ? and PackageID = ?", storageID, packageID) return ret, err } -func (*StoragePackageDB) GetAllByStorageID(ctx SQLContext, storageID int64) ([]model.StoragePackage, error) { +func (*StoragePackageDB) GetAllByStorageID(ctx SQLContext, storageID cdssdk.StorageID) ([]model.StoragePackage, error) { var ret []model.StoragePackage err := sqlx.Select(ctx, &ret, "select * from StoragePackage where StorageID = ?", storageID) return ret, err } -func (*StoragePackageDB) LoadPackage(ctx SQLContext, packageID int64, storageID int64, userID int64) error { +func (*StoragePackageDB) LoadPackage(ctx SQLContext, packageID cdssdk.PackageID, storageID cdssdk.StorageID, userID cdssdk.UserID) error { _, err := ctx.Exec("insert into StoragePackage values(?,?,?,?)", packageID, storageID, userID, consts.StoragePackageStateNormal) return err } -func (*StoragePackageDB) ChangeState(ctx SQLContext, storageID int64, packageID int64, userID int64, state string) error { +func (*StoragePackageDB) ChangeState(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID, state string) error { _, err := ctx.Exec("update StoragePackage set State = ? where StorageID = ? and PackageID = ? and UserID = ?", state, storageID, packageID, userID) return err } // SetStateNormal 将状态设置为Normal,如果记录状态是Deleted,则不进行操作 -func (*StoragePackageDB) SetStateNormal(ctx SQLContext, storageID int64, packageID int64, userID int64) error { +func (*StoragePackageDB) SetStateNormal(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error { _, err := ctx.Exec("update StoragePackage set State = ? where StorageID = ? and PackageID = ? and UserID = ? and State <> ?", consts.StoragePackageStateNormal, storageID, @@ -56,7 +57,7 @@ func (*StoragePackageDB) SetStateNormal(ctx SQLContext, storageID int64, package return err } -func (*StoragePackageDB) SetAllPackageState(ctx SQLContext, packageID int64, state string) (int64, error) { +func (*StoragePackageDB) SetAllPackageState(ctx SQLContext, packageID cdssdk.PackageID, state string) (int64, error) { ret, err := ctx.Exec( "update StoragePackage set State = ? where PackageID = ?", state, @@ -76,7 +77,7 @@ func (*StoragePackageDB) SetAllPackageState(ctx SQLContext, packageID int64, sta // SetAllPackageOutdated 将Storage中指定对象设置为已过期。 // 注:只会设置Normal状态的对象 -func (*StoragePackageDB) SetAllPackageOutdated(ctx SQLContext, packageID int64) (int64, error) { +func (*StoragePackageDB) SetAllPackageOutdated(ctx SQLContext, packageID cdssdk.PackageID) (int64, error) { ret, err := ctx.Exec( "update StoragePackage set State = ? where State = ? and PackageID = ?", consts.StoragePackageStateOutdated, @@ -95,17 +96,17 @@ func (*StoragePackageDB) SetAllPackageOutdated(ctx SQLContext, packageID int64) return cnt, nil } -func (db *StoragePackageDB) SetAllPackageDeleted(ctx SQLContext, packageID int64) (int64, error) { +func (db *StoragePackageDB) SetAllPackageDeleted(ctx SQLContext, packageID cdssdk.PackageID) (int64, error) { return db.SetAllPackageState(ctx, packageID, consts.StoragePackageStateDeleted) } -func (*StoragePackageDB) Delete(ctx SQLContext, storageID int64, packageID int64, userID int64) error { +func (*StoragePackageDB) Delete(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error { _, err := ctx.Exec("delete from StoragePackage where StorageID = ? and PackageID = ? and UserID = ?", storageID, packageID, userID) return err } // FindPackageStorages 查询存储了指定对象的Storage -func (*StoragePackageDB) FindPackageStorages(ctx SQLContext, packageID int64) ([]model.Storage, error) { +func (*StoragePackageDB) FindPackageStorages(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Storage, error) { var ret []model.Storage err := sqlx.Select(ctx, &ret, "select Storage.* from StoragePackage, Storage where PackageID = ? and"+ diff --git a/common/pkgs/distlock/reqbuilder/ipfs.go b/common/pkgs/distlock/reqbuilder/ipfs.go index d452860..a33c785 100644 --- a/common/pkgs/distlock/reqbuilder/ipfs.go +++ b/common/pkgs/distlock/reqbuilder/ipfs.go @@ -4,6 +4,7 @@ import ( "strconv" "gitlink.org.cn/cloudream/common/pkgs/distlock" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/lockprovider" ) @@ -14,7 +15,7 @@ type IPFSLockReqBuilder struct { func (b *LockRequestBuilder) IPFS() *IPFSLockReqBuilder { return &IPFSLockReqBuilder{LockRequestBuilder: b} } -func (b *IPFSLockReqBuilder) ReadOneRep(nodeID int64, fileHash string) *IPFSLockReqBuilder { +func (b *IPFSLockReqBuilder) ReadOneRep(nodeID cdssdk.NodeID, fileHash string) *IPFSLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(nodeID), Name: lockprovider.IPFS_ELEMENT_READ_LOCK, @@ -23,7 +24,7 @@ func (b *IPFSLockReqBuilder) ReadOneRep(nodeID int64, fileHash string) *IPFSLock return b } -func (b *IPFSLockReqBuilder) WriteOneRep(nodeID int64, fileHash string) *IPFSLockReqBuilder { +func (b *IPFSLockReqBuilder) WriteOneRep(nodeID cdssdk.NodeID, fileHash string) *IPFSLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(nodeID), Name: lockprovider.IPFS_ELEMENT_WRITE_LOCK, @@ -32,7 +33,7 @@ func (b *IPFSLockReqBuilder) WriteOneRep(nodeID int64, fileHash string) *IPFSLoc return b } -func (b *IPFSLockReqBuilder) ReadAnyRep(nodeID int64) *IPFSLockReqBuilder { +func (b *IPFSLockReqBuilder) ReadAnyRep(nodeID cdssdk.NodeID) *IPFSLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(nodeID), Name: lockprovider.IPFS_SET_READ_LOCK, @@ -41,7 +42,7 @@ func (b *IPFSLockReqBuilder) ReadAnyRep(nodeID int64) *IPFSLockReqBuilder { return b } -func (b *IPFSLockReqBuilder) WriteAnyRep(nodeID int64) *IPFSLockReqBuilder { +func (b *IPFSLockReqBuilder) WriteAnyRep(nodeID cdssdk.NodeID) *IPFSLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(nodeID), Name: lockprovider.IPFS_SET_WRITE_LOCK, @@ -50,7 +51,7 @@ func (b *IPFSLockReqBuilder) WriteAnyRep(nodeID int64) *IPFSLockReqBuilder { return b } -func (b *IPFSLockReqBuilder) CreateAnyRep(nodeID int64) *IPFSLockReqBuilder { +func (b *IPFSLockReqBuilder) CreateAnyRep(nodeID cdssdk.NodeID) *IPFSLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(nodeID), Name: lockprovider.IPFS_SET_CREATE_LOCK, @@ -59,6 +60,6 @@ func (b *IPFSLockReqBuilder) CreateAnyRep(nodeID int64) *IPFSLockReqBuilder { return b } -func (b *IPFSLockReqBuilder) makePath(nodeID int64) []string { - return []string{lockprovider.IPFSLockPathPrefix, strconv.FormatInt(nodeID, 10)} +func (b *IPFSLockReqBuilder) makePath(nodeID cdssdk.NodeID) []string { + return []string{lockprovider.IPFSLockPathPrefix, strconv.FormatInt(int64(nodeID), 10)} } diff --git a/common/pkgs/distlock/reqbuilder/metadata_bucket.go b/common/pkgs/distlock/reqbuilder/metadata_bucket.go index 622ed02..c06d003 100644 --- a/common/pkgs/distlock/reqbuilder/metadata_bucket.go +++ b/common/pkgs/distlock/reqbuilder/metadata_bucket.go @@ -2,6 +2,7 @@ package reqbuilder import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/lockprovider" ) @@ -13,7 +14,7 @@ func (b *MetadataLockReqBuilder) Bucket() *MetadataBucketLockReqBuilder { return &MetadataBucketLockReqBuilder{MetadataLockReqBuilder: b} } -func (b *MetadataBucketLockReqBuilder) ReadOne(bucketID int64) *MetadataBucketLockReqBuilder { +func (b *MetadataBucketLockReqBuilder) ReadOne(bucketID cdssdk.BucketID) *MetadataBucketLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("Bucket"), Name: lockprovider.METADATA_ELEMENT_READ_LOCK, @@ -21,7 +22,7 @@ func (b *MetadataBucketLockReqBuilder) ReadOne(bucketID int64) *MetadataBucketLo }) return b } -func (b *MetadataBucketLockReqBuilder) WriteOne(bucketID int64) *MetadataBucketLockReqBuilder { +func (b *MetadataBucketLockReqBuilder) WriteOne(bucketID cdssdk.BucketID) *MetadataBucketLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("Bucket"), Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, @@ -29,7 +30,7 @@ func (b *MetadataBucketLockReqBuilder) WriteOne(bucketID int64) *MetadataBucketL }) return b } -func (b *MetadataBucketLockReqBuilder) CreateOne(userID int64, bucketName string) *MetadataBucketLockReqBuilder { +func (b *MetadataBucketLockReqBuilder) CreateOne(userID cdssdk.UserID, bucketName string) *MetadataBucketLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("Bucket"), Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, diff --git a/common/pkgs/distlock/reqbuilder/metadata_cache.go b/common/pkgs/distlock/reqbuilder/metadata_cache.go index e8f9d1b..9109057 100644 --- a/common/pkgs/distlock/reqbuilder/metadata_cache.go +++ b/common/pkgs/distlock/reqbuilder/metadata_cache.go @@ -2,6 +2,7 @@ package reqbuilder import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/lockprovider" ) @@ -13,7 +14,7 @@ func (b *MetadataLockReqBuilder) Cache() *MetadataCacheLockReqBuilder { return &MetadataCacheLockReqBuilder{MetadataLockReqBuilder: b} } -func (b *MetadataCacheLockReqBuilder) ReadOne(nodeID int64, fileHash string) *MetadataCacheLockReqBuilder { +func (b *MetadataCacheLockReqBuilder) ReadOne(nodeID cdssdk.NodeID, fileHash string) *MetadataCacheLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("Cache"), Name: lockprovider.METADATA_ELEMENT_READ_LOCK, @@ -21,7 +22,7 @@ func (b *MetadataCacheLockReqBuilder) ReadOne(nodeID int64, fileHash string) *Me }) return b } -func (b *MetadataCacheLockReqBuilder) WriteOne(nodeID int64, fileHash string) *MetadataCacheLockReqBuilder { +func (b *MetadataCacheLockReqBuilder) WriteOne(nodeID cdssdk.NodeID, fileHash string) *MetadataCacheLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("Cache"), Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, @@ -29,7 +30,7 @@ func (b *MetadataCacheLockReqBuilder) WriteOne(nodeID int64, fileHash string) *M }) return b } -func (b *MetadataCacheLockReqBuilder) CreateOne(nodeID int64, fileHash string) *MetadataCacheLockReqBuilder { +func (b *MetadataCacheLockReqBuilder) CreateOne(nodeID cdssdk.NodeID, fileHash string) *MetadataCacheLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("Cache"), Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, diff --git a/common/pkgs/distlock/reqbuilder/metadata_node.go b/common/pkgs/distlock/reqbuilder/metadata_node.go index e0572e9..bf09061 100644 --- a/common/pkgs/distlock/reqbuilder/metadata_node.go +++ b/common/pkgs/distlock/reqbuilder/metadata_node.go @@ -2,6 +2,7 @@ package reqbuilder import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/lockprovider" ) @@ -13,7 +14,7 @@ func (b *MetadataLockReqBuilder) Node() *MetadataNodeLockReqBuilder { return &MetadataNodeLockReqBuilder{MetadataLockReqBuilder: b} } -func (b *MetadataNodeLockReqBuilder) ReadOne(nodeID int64) *MetadataNodeLockReqBuilder { +func (b *MetadataNodeLockReqBuilder) ReadOne(nodeID cdssdk.NodeID) *MetadataNodeLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("Node"), Name: lockprovider.METADATA_ELEMENT_READ_LOCK, @@ -21,7 +22,7 @@ func (b *MetadataNodeLockReqBuilder) ReadOne(nodeID int64) *MetadataNodeLockReqB }) return b } -func (b *MetadataNodeLockReqBuilder) WriteOne(nodeID int64) *MetadataNodeLockReqBuilder { +func (b *MetadataNodeLockReqBuilder) WriteOne(nodeID cdssdk.NodeID) *MetadataNodeLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("Node"), Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, diff --git a/common/pkgs/distlock/reqbuilder/metadata_package.go b/common/pkgs/distlock/reqbuilder/metadata_package.go index 37e576e..1f8eb1b 100644 --- a/common/pkgs/distlock/reqbuilder/metadata_package.go +++ b/common/pkgs/distlock/reqbuilder/metadata_package.go @@ -2,6 +2,7 @@ package reqbuilder import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/lockprovider" ) @@ -13,7 +14,7 @@ func (b *MetadataLockReqBuilder) Package() *MetadataPackageLockReqBuilder { return &MetadataPackageLockReqBuilder{MetadataLockReqBuilder: b} } -func (b *MetadataPackageLockReqBuilder) ReadOne(packageID int64) *MetadataPackageLockReqBuilder { +func (b *MetadataPackageLockReqBuilder) ReadOne(packageID cdssdk.PackageID) *MetadataPackageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("Package"), Name: lockprovider.METADATA_ELEMENT_READ_LOCK, @@ -21,7 +22,7 @@ func (b *MetadataPackageLockReqBuilder) ReadOne(packageID int64) *MetadataPackag }) return b } -func (b *MetadataPackageLockReqBuilder) WriteOne(packageID int64) *MetadataPackageLockReqBuilder { +func (b *MetadataPackageLockReqBuilder) WriteOne(packageID cdssdk.PackageID) *MetadataPackageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("Package"), Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, @@ -29,7 +30,7 @@ func (b *MetadataPackageLockReqBuilder) WriteOne(packageID int64) *MetadataPacka }) return b } -func (b *MetadataPackageLockReqBuilder) CreateOne(bucketID int64, packageName string) *MetadataPackageLockReqBuilder { +func (b *MetadataPackageLockReqBuilder) CreateOne(bucketID cdssdk.BucketID, packageName string) *MetadataPackageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("Package"), Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, diff --git a/common/pkgs/distlock/reqbuilder/metadata_storage_package.go b/common/pkgs/distlock/reqbuilder/metadata_storage_package.go index 964c70d..bb53f3c 100644 --- a/common/pkgs/distlock/reqbuilder/metadata_storage_package.go +++ b/common/pkgs/distlock/reqbuilder/metadata_storage_package.go @@ -2,6 +2,7 @@ package reqbuilder import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/lockprovider" ) @@ -13,7 +14,7 @@ func (b *MetadataLockReqBuilder) StoragePackage() *MetadataStoragePackageLockReq return &MetadataStoragePackageLockReqBuilder{MetadataLockReqBuilder: b} } -func (b *MetadataStoragePackageLockReqBuilder) ReadOne(storageID int64, userID int64, packageID int64) *MetadataStoragePackageLockReqBuilder { +func (b *MetadataStoragePackageLockReqBuilder) ReadOne(storageID cdssdk.StorageID, userID cdssdk.UserID, packageID cdssdk.PackageID) *MetadataStoragePackageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("StoragePackage"), Name: lockprovider.METADATA_ELEMENT_READ_LOCK, @@ -21,7 +22,7 @@ func (b *MetadataStoragePackageLockReqBuilder) ReadOne(storageID int64, userID i }) return b } -func (b *MetadataStoragePackageLockReqBuilder) WriteOne(storageID int64, userID int64, packageID int64) *MetadataStoragePackageLockReqBuilder { +func (b *MetadataStoragePackageLockReqBuilder) WriteOne(storageID cdssdk.StorageID, userID cdssdk.UserID, packageID cdssdk.PackageID) *MetadataStoragePackageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("StoragePackage"), Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, @@ -29,7 +30,7 @@ func (b *MetadataStoragePackageLockReqBuilder) WriteOne(storageID int64, userID }) return b } -func (b *MetadataStoragePackageLockReqBuilder) CreateOne(storageID int64, userID int64, packageID int64) *MetadataStoragePackageLockReqBuilder { +func (b *MetadataStoragePackageLockReqBuilder) CreateOne(storageID cdssdk.StorageID, userID cdssdk.UserID, packageID cdssdk.PackageID) *MetadataStoragePackageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("StoragePackage"), Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, diff --git a/common/pkgs/distlock/reqbuilder/metadata_user_bucket.go b/common/pkgs/distlock/reqbuilder/metadata_user_bucket.go index 846f434..a7220b4 100644 --- a/common/pkgs/distlock/reqbuilder/metadata_user_bucket.go +++ b/common/pkgs/distlock/reqbuilder/metadata_user_bucket.go @@ -2,6 +2,7 @@ package reqbuilder import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/lockprovider" ) @@ -13,7 +14,7 @@ func (b *MetadataLockReqBuilder) UserBucket() *MetadataUserBucketLockReqBuilder return &MetadataUserBucketLockReqBuilder{MetadataLockReqBuilder: b} } -func (b *MetadataUserBucketLockReqBuilder) ReadOne(userID int64, bucketID int64) *MetadataUserBucketLockReqBuilder { +func (b *MetadataUserBucketLockReqBuilder) ReadOne(userID cdssdk.UserID, bucketID cdssdk.BucketID) *MetadataUserBucketLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("UserBucket"), Name: lockprovider.METADATA_ELEMENT_READ_LOCK, @@ -21,7 +22,7 @@ func (b *MetadataUserBucketLockReqBuilder) ReadOne(userID int64, bucketID int64) }) return b } -func (b *MetadataUserBucketLockReqBuilder) WriteOne(userID int64, bucketID int64) *MetadataUserBucketLockReqBuilder { +func (b *MetadataUserBucketLockReqBuilder) WriteOne(userID cdssdk.UserID, bucketID cdssdk.BucketID) *MetadataUserBucketLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("UserBucket"), Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, @@ -29,7 +30,7 @@ func (b *MetadataUserBucketLockReqBuilder) WriteOne(userID int64, bucketID int64 }) return b } -func (b *MetadataUserBucketLockReqBuilder) CreateOne(userID int64, bucketID int64) *MetadataUserBucketLockReqBuilder { +func (b *MetadataUserBucketLockReqBuilder) CreateOne(userID cdssdk.UserID, bucketID cdssdk.BucketID) *MetadataUserBucketLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("UserBucket"), Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, diff --git a/common/pkgs/distlock/reqbuilder/metadata_user_storage.go b/common/pkgs/distlock/reqbuilder/metadata_user_storage.go index 7a8b35d..74cda6e 100644 --- a/common/pkgs/distlock/reqbuilder/metadata_user_storage.go +++ b/common/pkgs/distlock/reqbuilder/metadata_user_storage.go @@ -2,6 +2,7 @@ package reqbuilder import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/lockprovider" ) @@ -13,7 +14,7 @@ func (b *MetadataLockReqBuilder) UserStorage() *MetadataUserStorageLockReqBuilde return &MetadataUserStorageLockReqBuilder{MetadataLockReqBuilder: b} } -func (b *MetadataUserStorageLockReqBuilder) ReadOne(userID int64, storageID int64) *MetadataUserStorageLockReqBuilder { +func (b *MetadataUserStorageLockReqBuilder) ReadOne(userID cdssdk.UserID, storageID cdssdk.StorageID) *MetadataUserStorageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("UserStorage"), Name: lockprovider.METADATA_ELEMENT_READ_LOCK, @@ -21,7 +22,7 @@ func (b *MetadataUserStorageLockReqBuilder) ReadOne(userID int64, storageID int6 }) return b } -func (b *MetadataUserStorageLockReqBuilder) WriteOne(userID int64, storageID int64) *MetadataUserStorageLockReqBuilder { +func (b *MetadataUserStorageLockReqBuilder) WriteOne(userID cdssdk.UserID, storageID cdssdk.StorageID) *MetadataUserStorageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("UserStorage"), Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, @@ -29,7 +30,7 @@ func (b *MetadataUserStorageLockReqBuilder) WriteOne(userID int64, storageID int }) return b } -func (b *MetadataUserStorageLockReqBuilder) CreateOne(userID int64, storageID int64) *MetadataUserStorageLockReqBuilder { +func (b *MetadataUserStorageLockReqBuilder) CreateOne(userID cdssdk.UserID, storageID cdssdk.StorageID) *MetadataUserStorageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("UserStorage"), Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, diff --git a/common/pkgs/distlock/reqbuilder/storage.go b/common/pkgs/distlock/reqbuilder/storage.go index f19eb4c..e143743 100644 --- a/common/pkgs/distlock/reqbuilder/storage.go +++ b/common/pkgs/distlock/reqbuilder/storage.go @@ -4,6 +4,7 @@ import ( "strconv" "gitlink.org.cn/cloudream/common/pkgs/distlock" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/lockprovider" ) @@ -15,7 +16,7 @@ func (b *LockRequestBuilder) Storage() *StorageLockReqBuilder { return &StorageLockReqBuilder{LockRequestBuilder: b} } -func (b *StorageLockReqBuilder) ReadOnePackage(storageID int64, userID int64, packageID int64) *StorageLockReqBuilder { +func (b *StorageLockReqBuilder) ReadOnePackage(storageID cdssdk.StorageID, userID cdssdk.UserID, packageID cdssdk.PackageID) *StorageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(storageID), Name: lockprovider.STORAGE_ELEMENT_READ_LOCK, @@ -24,7 +25,7 @@ func (b *StorageLockReqBuilder) ReadOnePackage(storageID int64, userID int64, pa return b } -func (b *StorageLockReqBuilder) WriteOnePackage(storageID int64, userID int64, packageID int64) *StorageLockReqBuilder { +func (b *StorageLockReqBuilder) WriteOnePackage(storageID cdssdk.StorageID, userID cdssdk.UserID, packageID cdssdk.PackageID) *StorageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(storageID), Name: lockprovider.STORAGE_ELEMENT_WRITE_LOCK, @@ -33,7 +34,7 @@ func (b *StorageLockReqBuilder) WriteOnePackage(storageID int64, userID int64, p return b } -func (b *StorageLockReqBuilder) CreateOnePackage(storageID int64, userID int64, packageID int64) *StorageLockReqBuilder { +func (b *StorageLockReqBuilder) CreateOnePackage(storageID cdssdk.StorageID, userID cdssdk.UserID, packageID cdssdk.PackageID) *StorageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(storageID), Name: lockprovider.STORAGE_ELEMENT_WRITE_LOCK, @@ -42,7 +43,7 @@ func (b *StorageLockReqBuilder) CreateOnePackage(storageID int64, userID int64, return b } -func (b *StorageLockReqBuilder) ReadAnyPackage(storageID int64) *StorageLockReqBuilder { +func (b *StorageLockReqBuilder) ReadAnyPackage(storageID cdssdk.StorageID) *StorageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(storageID), Name: lockprovider.STORAGE_SET_READ_LOCK, @@ -51,7 +52,7 @@ func (b *StorageLockReqBuilder) ReadAnyPackage(storageID int64) *StorageLockReqB return b } -func (b *StorageLockReqBuilder) WriteAnyPackage(storageID int64) *StorageLockReqBuilder { +func (b *StorageLockReqBuilder) WriteAnyPackage(storageID cdssdk.StorageID) *StorageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(storageID), Name: lockprovider.STORAGE_SET_WRITE_LOCK, @@ -60,7 +61,7 @@ func (b *StorageLockReqBuilder) WriteAnyPackage(storageID int64) *StorageLockReq return b } -func (b *StorageLockReqBuilder) CreateAnyPackage(storageID int64) *StorageLockReqBuilder { +func (b *StorageLockReqBuilder) CreateAnyPackage(storageID cdssdk.StorageID) *StorageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(storageID), Name: lockprovider.STORAGE_SET_CREATE_LOCK, @@ -69,6 +70,6 @@ func (b *StorageLockReqBuilder) CreateAnyPackage(storageID int64) *StorageLockRe return b } -func (b *StorageLockReqBuilder) makePath(storageID int64) []string { - return []string{lockprovider.StorageLockPathPrefix, strconv.FormatInt(storageID, 10)} +func (b *StorageLockReqBuilder) makePath(storageID cdssdk.StorageID) []string { + return []string{lockprovider.StorageLockPathPrefix, strconv.FormatInt(int64(storageID), 10)} } diff --git a/common/pkgs/ioswitch/ops/ec.go b/common/pkgs/ioswitch/ops/ec.go index bfeb776..613bd2b 100644 --- a/common/pkgs/ioswitch/ops/ec.go +++ b/common/pkgs/ioswitch/ops/ec.go @@ -5,14 +5,14 @@ import ( "io" "sync" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" myio "gitlink.org.cn/cloudream/common/utils/io" - stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) type ECCompute struct { - EC stgmod.EC `json:"ec"` + EC cdssdk.ECRedundancy `json:"ec"` InputIDs []ioswitch.StreamID `json:"inputIDs"` OutputIDs []ioswitch.StreamID `json:"outputIDs"` InputBlockIndexes []int `json:"inputBlockIndexes"` @@ -55,7 +55,7 @@ func (o *ECCompute) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { } type ECReconstruct struct { - EC stgmod.EC `json:"ec"` + EC cdssdk.ECRedundancy `json:"ec"` InputIDs []ioswitch.StreamID `json:"inputIDs"` OutputIDs []ioswitch.StreamID `json:"outputIDs"` InputBlockIndexes []int `json:"inputBlockIndexes"` diff --git a/common/pkgs/ioswitch/plans/agent_plan.go b/common/pkgs/ioswitch/plans/agent_plan.go index fed7efb..8bdcf0e 100644 --- a/common/pkgs/ioswitch/plans/agent_plan.go +++ b/common/pkgs/ioswitch/plans/agent_plan.go @@ -1,7 +1,7 @@ package plans import ( - stgmod "gitlink.org.cn/cloudream/storage/common/models" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops" @@ -102,7 +102,7 @@ func (b *AgentStream) FileWrite(filePath string) { }) } -func (b *AgentPlanBuilder) ECCompute(ec stgmod.EC, inBlockIndexes []int, outBlockIndexes []int, streams ...*AgentStream) *MultiStream { +func (b *AgentPlanBuilder) ECCompute(ec cdssdk.ECRedundancy, inBlockIndexes []int, outBlockIndexes []int, streams ...*AgentStream) *MultiStream { mstr := &MultiStream{} var inputStrIDs []ioswitch.StreamID @@ -131,7 +131,7 @@ func (b *AgentPlanBuilder) ECCompute(ec stgmod.EC, inBlockIndexes []int, outBloc return mstr } -func (b *AgentPlanBuilder) ECReconstruct(ec stgmod.EC, inBlockIndexes []int, streams ...*AgentStream) *MultiStream { +func (b *AgentPlanBuilder) ECReconstruct(ec cdssdk.ECRedundancy, inBlockIndexes []int, streams ...*AgentStream) *MultiStream { mstr := &MultiStream{} var inputStrIDs []ioswitch.StreamID diff --git a/common/pkgs/ioswitch/plans/plan_builder.go b/common/pkgs/ioswitch/plans/plan_builder.go index e6207cd..771eb4a 100644 --- a/common/pkgs/ioswitch/plans/plan_builder.go +++ b/common/pkgs/ioswitch/plans/plan_builder.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/google/uuid" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) @@ -14,7 +15,7 @@ type StreamInfo struct { type PlanBuilder struct { streams []*StreamInfo - agentPlans map[int64]*AgentPlanBuilder + agentPlans map[cdssdk.NodeID]*AgentPlanBuilder } func (b *PlanBuilder) Build() (*ComposedPlan, error) { @@ -48,7 +49,7 @@ func (b *PlanBuilder) newStream() *StreamInfo { func NewPlanBuilder() PlanBuilder { return PlanBuilder{ - agentPlans: make(map[int64]*AgentPlanBuilder), + agentPlans: make(map[cdssdk.NodeID]*AgentPlanBuilder), } } diff --git a/common/pkgs/mq/agent/cache.go b/common/pkgs/mq/agent/cache.go index 96cdfdd..6976807 100644 --- a/common/pkgs/mq/agent/cache.go +++ b/common/pkgs/mq/agent/cache.go @@ -61,15 +61,15 @@ var _ = Register(Service.StartCacheMovePackage) type StartCacheMovePackage struct { mq.MessageBodyBase - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` } type StartCacheMovePackageResp struct { mq.MessageBodyBase TaskID string `json:"taskID"` } -func NewStartCacheMovePackage(userID int64, packageID int64) *StartCacheMovePackage { +func NewStartCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID) *StartCacheMovePackage { return &StartCacheMovePackage{ UserID: userID, PackageID: packageID, diff --git a/common/pkgs/mq/agent/client.go b/common/pkgs/mq/agent/client.go index de7d134..d984e47 100644 --- a/common/pkgs/mq/agent/client.go +++ b/common/pkgs/mq/agent/client.go @@ -2,16 +2,17 @@ package agent import ( "gitlink.org.cn/cloudream/common/pkgs/mq" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) type Client struct { rabbitCli *mq.RabbitMQTransport - id int64 + id cdssdk.NodeID } -func NewClient(id int64, cfg *stgmq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQTransport(cfg.MakeConnectingURL(), stgmq.MakeAgentQueueName(id), "") +func NewClient(id cdssdk.NodeID, cfg *stgmq.Config) (*Client, error) { + rabbitCli, err := mq.NewRabbitMQTransport(cfg.MakeConnectingURL(), stgmq.MakeAgentQueueName(int64(id)), "") if err != nil { return nil, err } @@ -27,7 +28,7 @@ func (c *Client) Close() { } type Pool interface { - Acquire(id int64) (*Client, error) + Acquire(id cdssdk.NodeID) (*Client, error) Release(cli *Client) } @@ -40,7 +41,7 @@ func NewPool(mqcfg *stgmq.Config) Pool { mqcfg: mqcfg, } } -func (p *pool) Acquire(id int64) (*Client, error) { +func (p *pool) Acquire(id cdssdk.NodeID) (*Client, error) { return NewClient(id, p.mqcfg) } diff --git a/common/pkgs/mq/agent/storage.go b/common/pkgs/mq/agent/storage.go index a90349f..f31e956 100644 --- a/common/pkgs/mq/agent/storage.go +++ b/common/pkgs/mq/agent/storage.go @@ -24,16 +24,16 @@ var _ = Register(Service.StartStorageLoadPackage) type StartStorageLoadPackage struct { mq.MessageBodyBase - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` - StorageID int64 `json:"storageID"` + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` + StorageID cdssdk.StorageID `json:"storageID"` } type StartStorageLoadPackageResp struct { mq.MessageBodyBase TaskID string `json:"taskID"` } -func NewStartStorageLoadPackage(userID int64, packageID int64, storageID int64) *StartStorageLoadPackage { +func NewStartStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *StartStorageLoadPackage { return &StartStorageLoadPackage{ UserID: userID, PackageID: packageID, @@ -91,7 +91,7 @@ const ( type StorageCheck struct { mq.MessageBodyBase - StorageID int64 `json:"storageID"` + StorageID cdssdk.StorageID `json:"storageID"` Directory string `json:"directory"` IsComplete bool `json:"isComplete"` Packages []model.StoragePackage `json:"packages"` @@ -102,12 +102,12 @@ type StorageCheckResp struct { Entries []StorageCheckRespEntry `json:"entries"` } type StorageCheckRespEntry struct { - PackageID int64 `json:"packageID"` - UserID int64 `json:"userID"` - Operation string `json:"operation"` + PackageID cdssdk.PackageID `json:"packageID"` + UserID cdssdk.UserID `json:"userID"` + Operation string `json:"operation"` } -func NewStorageCheck(storageID int64, directory string, isComplete bool, packages []model.StoragePackage) *StorageCheck { +func NewStorageCheck(storageID cdssdk.StorageID, directory string, isComplete bool, packages []model.StoragePackage) *StorageCheck { return &StorageCheck{ StorageID: storageID, Directory: directory, @@ -121,7 +121,7 @@ func NewStorageCheckResp(dirState string, entries []StorageCheckRespEntry) *Stor Entries: entries, } } -func NewStorageCheckRespEntry(packageID int64, userID int64, op string) StorageCheckRespEntry { +func NewStorageCheckRespEntry(packageID cdssdk.PackageID, userID cdssdk.UserID, op string) StorageCheckRespEntry { return StorageCheckRespEntry{ PackageID: packageID, UserID: userID, @@ -137,27 +137,25 @@ var _ = Register(Service.StartStorageCreatePackage) type StartStorageCreatePackage struct { mq.MessageBodyBase - UserID int64 `json:"userID"` - BucketID int64 `json:"bucketID"` - Name string `json:"name"` - StorageID int64 `json:"storageID"` - Path string `json:"path"` - Redundancy cdssdk.TypedRedundancyInfo `json:"redundancy"` - NodeAffinity *int64 `json:"nodeAffinity"` + UserID cdssdk.UserID `json:"userID"` + BucketID cdssdk.BucketID `json:"bucketID"` + Name string `json:"name"` + StorageID cdssdk.StorageID `json:"storageID"` + Path string `json:"path"` + NodeAffinity *cdssdk.NodeID `json:"nodeAffinity"` } type StartStorageCreatePackageResp struct { mq.MessageBodyBase TaskID string `json:"taskID"` } -func NewStartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy cdssdk.TypedRedundancyInfo, nodeAffinity *int64) *StartStorageCreatePackage { +func NewStartStorageCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, storageID cdssdk.StorageID, path string, nodeAffinity *cdssdk.NodeID) *StartStorageCreatePackage { return &StartStorageCreatePackage{ UserID: userID, BucketID: bucketID, Name: name, StorageID: storageID, Path: path, - Redundancy: redundancy, NodeAffinity: nodeAffinity, } } @@ -180,9 +178,9 @@ type WaitStorageCreatePackage struct { } type WaitStorageCreatePackageResp struct { mq.MessageBodyBase - IsComplete bool `json:"isComplete"` - Error string `json:"error"` - PackageID int64 `json:"packageID"` + IsComplete bool `json:"isComplete"` + Error string `json:"error"` + PackageID cdssdk.PackageID `json:"packageID"` } func NewWaitStorageCreatePackage(taskID string, waitTimeoutMs int64) *WaitStorageCreatePackage { @@ -191,7 +189,7 @@ func NewWaitStorageCreatePackage(taskID string, waitTimeoutMs int64) *WaitStorag WaitTimeoutMs: waitTimeoutMs, } } -func NewWaitStorageCreatePackageResp(isComplete bool, err string, packageID int64) *WaitStorageCreatePackageResp { +func NewWaitStorageCreatePackageResp(isComplete bool, err string, packageID cdssdk.PackageID) *WaitStorageCreatePackageResp { return &WaitStorageCreatePackageResp{ IsComplete: isComplete, Error: err, diff --git a/common/pkgs/mq/coordinator/bucket.go b/common/pkgs/mq/coordinator/bucket.go index 08828b0..a129bac 100644 --- a/common/pkgs/mq/coordinator/bucket.go +++ b/common/pkgs/mq/coordinator/bucket.go @@ -2,6 +2,7 @@ package coordinator import ( "gitlink.org.cn/cloudream/common/pkgs/mq" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -20,14 +21,14 @@ var _ = Register(Service.GetUserBuckets) type GetUserBuckets struct { mq.MessageBodyBase - UserID int64 `json:"userID"` + UserID cdssdk.UserID `json:"userID"` } type GetUserBucketsResp struct { mq.MessageBodyBase Buckets []model.Bucket `json:"buckets"` } -func NewGetUserBuckets(userID int64) *GetUserBuckets { +func NewGetUserBuckets(userID cdssdk.UserID) *GetUserBuckets { return &GetUserBuckets{ UserID: userID, } @@ -46,15 +47,15 @@ var _ = Register(Service.GetBucketPackages) type GetBucketPackages struct { mq.MessageBodyBase - UserID int64 `json:"userID"` - BucketID int64 `json:"bucketID"` + UserID cdssdk.UserID `json:"userID"` + BucketID cdssdk.BucketID `json:"bucketID"` } type GetBucketPackagesResp struct { mq.MessageBodyBase Packages []model.Package `json:"packages"` } -func NewGetBucketPackages(userID int64, bucketID int64) *GetBucketPackages { +func NewGetBucketPackages(userID cdssdk.UserID, bucketID cdssdk.BucketID) *GetBucketPackages { return &GetBucketPackages{ UserID: userID, BucketID: bucketID, @@ -74,21 +75,21 @@ var _ = Register(Service.CreateBucket) type CreateBucket struct { mq.MessageBodyBase - UserID int64 `json:"userID"` - BucketName string `json:"bucketName"` + UserID cdssdk.UserID `json:"userID"` + BucketName string `json:"bucketName"` } type CreateBucketResp struct { mq.MessageBodyBase - BucketID int64 `json:"bucketID"` + BucketID cdssdk.BucketID `json:"bucketID"` } -func NewCreateBucket(userID int64, bucketName string) *CreateBucket { +func NewCreateBucket(userID cdssdk.UserID, bucketName string) *CreateBucket { return &CreateBucket{ UserID: userID, BucketName: bucketName, } } -func NewCreateBucketResp(bucketID int64) *CreateBucketResp { +func NewCreateBucketResp(bucketID cdssdk.BucketID) *CreateBucketResp { return &CreateBucketResp{ BucketID: bucketID, } @@ -102,14 +103,14 @@ var _ = Register(Service.DeleteBucket) type DeleteBucket struct { mq.MessageBodyBase - UserID int64 `json:"userID"` - BucketID int64 `json:"bucketID"` + UserID cdssdk.UserID `json:"userID"` + BucketID cdssdk.BucketID `json:"bucketID"` } type DeleteBucketResp struct { mq.MessageBodyBase } -func NewDeleteBucket(userID int64, bucketID int64) *DeleteBucket { +func NewDeleteBucket(userID cdssdk.UserID, bucketID cdssdk.BucketID) *DeleteBucket { return &DeleteBucket{ UserID: userID, BucketID: bucketID, diff --git a/common/pkgs/mq/coordinator/cache.go b/common/pkgs/mq/coordinator/cache.go index 6683f2a..facda76 100644 --- a/common/pkgs/mq/coordinator/cache.go +++ b/common/pkgs/mq/coordinator/cache.go @@ -16,15 +16,15 @@ var _ = Register(Service.CachePackageMoved) type CachePackageMoved struct { mq.MessageBodyBase - PackageID int64 `json:"packageID"` - NodeID int64 `json:"nodeID"` - FileHashes []string `json:"fileHashes"` + PackageID cdssdk.PackageID `json:"packageID"` + NodeID cdssdk.NodeID `json:"nodeID"` + FileHashes []string `json:"fileHashes"` } type CachePackageMovedResp struct { mq.MessageBodyBase } -func NewCachePackageMoved(packageID int64, nodeID int64, fileHashes []string) *CachePackageMoved { +func NewCachePackageMoved(packageID cdssdk.PackageID, nodeID cdssdk.NodeID, fileHashes []string) *CachePackageMoved { return &CachePackageMoved{ PackageID: packageID, NodeID: nodeID, @@ -43,15 +43,15 @@ var _ = Register(Service.GetPackageObjectCacheInfos) type GetPackageObjectCacheInfos struct { mq.MessageBodyBase - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` } type GetPackageObjectCacheInfosResp struct { mq.MessageBodyBase Infos []cdssdk.ObjectCacheInfo } -func NewGetPackageObjectCacheInfos(userID int64, packageID int64) *GetPackageObjectCacheInfos { +func NewGetPackageObjectCacheInfos(userID cdssdk.UserID, packageID cdssdk.PackageID) *GetPackageObjectCacheInfos { return &GetPackageObjectCacheInfos{ UserID: userID, PackageID: packageID, diff --git a/common/pkgs/mq/coordinator/node.go b/common/pkgs/mq/coordinator/node.go index 530341f..dd3b042 100644 --- a/common/pkgs/mq/coordinator/node.go +++ b/common/pkgs/mq/coordinator/node.go @@ -2,6 +2,7 @@ package coordinator import ( "gitlink.org.cn/cloudream/common/pkgs/mq" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -16,14 +17,14 @@ var _ = Register(Service.GetUserNodes) type GetUserNodes struct { mq.MessageBodyBase - UserID int64 `json:"userID"` + UserID cdssdk.UserID `json:"userID"` } type GetUserNodesResp struct { mq.MessageBodyBase Nodes []model.Node `json:"nodes"` } -func NewGetUserNodes(userID int64) *GetUserNodes { +func NewGetUserNodes(userID cdssdk.UserID) *GetUserNodes { return &GetUserNodes{ UserID: userID, } @@ -42,14 +43,14 @@ var _ = Register(Service.GetNodes) type GetNodes struct { mq.MessageBodyBase - NodeIDs []int64 `json:"nodeIDs"` + NodeIDs []cdssdk.NodeID `json:"nodeIDs"` } type GetNodesResp struct { mq.MessageBodyBase Nodes []model.Node `json:"nodes"` } -func NewGetNodes(nodeIDs []int64) *GetNodes { +func NewGetNodes(nodeIDs []cdssdk.NodeID) *GetNodes { return &GetNodes{ NodeIDs: nodeIDs, } diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 29eac75..c355a01 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -2,6 +2,7 @@ package coordinator import ( "gitlink.org.cn/cloudream/common/pkgs/mq" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" @@ -10,8 +11,6 @@ import ( type ObjectService interface { GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage) - GetPackageObjectRepData(msg *GetPackageObjectRepData) (*GetPackageObjectRepDataResp, *mq.CodeMessage) - GetPackageObjectECData(msg *GetPackageObjectECData) (*GetPackageObjectECDataResp, *mq.CodeMessage) } @@ -20,15 +19,15 @@ var _ = Register(Service.GetPackageObjects) type GetPackageObjects struct { mq.MessageBodyBase - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` } type GetPackageObjectsResp struct { mq.MessageBodyBase Objects []model.Object `json:"objects"` } -func NewGetPackageObjects(userID int64, packageID int64) *GetPackageObjects { +func NewGetPackageObjects(userID cdssdk.UserID, packageID cdssdk.PackageID) *GetPackageObjects { return &GetPackageObjects{ UserID: userID, PackageID: packageID, @@ -43,45 +42,19 @@ func (client *Client) GetPackageObjects(msg *GetPackageObjects) (*GetPackageObje return mq.Request(Service.GetPackageObjects, client.rabbitCli, msg) } -// 获取指定Object的Rep数据,返回的Objects会按照ObjectID升序 -var _ = Register(Service.GetPackageObjectRepData) - -type GetPackageObjectRepData struct { - mq.MessageBodyBase - PackageID int64 `json:"packageID"` -} -type GetPackageObjectRepDataResp struct { - mq.MessageBodyBase - Data []stgmod.ObjectRepData `json:"data"` -} - -func NewGetPackageObjectRepData(packageID int64) *GetPackageObjectRepData { - return &GetPackageObjectRepData{ - PackageID: packageID, - } -} -func NewGetPackageObjectRepDataResp(data []stgmod.ObjectRepData) *GetPackageObjectRepDataResp { - return &GetPackageObjectRepDataResp{ - Data: data, - } -} -func (client *Client) GetPackageObjectRepData(msg *GetPackageObjectRepData) (*GetPackageObjectRepDataResp, error) { - return mq.Request(Service.GetPackageObjectRepData, client.rabbitCli, msg) -} - // 获取指定Object的EC数据,返回的Objects会按照ObjectID升序 var _ = Register(Service.GetPackageObjectECData) type GetPackageObjectECData struct { mq.MessageBodyBase - PackageID int64 `json:"packageID"` + PackageID cdssdk.PackageID `json:"packageID"` } type GetPackageObjectECDataResp struct { mq.MessageBodyBase Data []stgmod.ObjectECData `json:"data"` } -func NewGetPackageObjectECData(packageID int64) *GetPackageObjectECData { +func NewGetPackageObjectECData(packageID cdssdk.PackageID) *GetPackageObjectECData { return &GetPackageObjectECData{ PackageID: packageID, } diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index 56caf6e..d9e402e 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -4,6 +4,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -12,9 +13,7 @@ type PackageService interface { CreatePackage(msg *CreatePackage) (*CreatePackageResp, *mq.CodeMessage) - UpdateRepPackage(msg *UpdateRepPackage) (*UpdateRepPackageResp, *mq.CodeMessage) - - UpdateECPackage(msg *UpdateECPackage) (*UpdateECPackageResp, *mq.CodeMessage) + UpdateECPackage(msg *UpdatePackage) (*UpdatePackageResp, *mq.CodeMessage) DeletePackage(msg *DeletePackage) (*DeletePackageResp, *mq.CodeMessage) @@ -28,15 +27,15 @@ var _ = Register(Service.GetPackage) type GetPackage struct { mq.MessageBodyBase - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` } type GetPackageResp struct { mq.MessageBodyBase model.Package } -func NewGetPackage(userID int64, packageID int64) *GetPackage { +func NewGetPackage(userID cdssdk.UserID, packageID cdssdk.PackageID) *GetPackage { return &GetPackage{ UserID: userID, PackageID: packageID, @@ -56,25 +55,23 @@ var _ = Register(Service.CreatePackage) type CreatePackage struct { mq.MessageBodyBase - UserID int64 `json:"userID"` - BucketID int64 `json:"bucketID"` - Name string `json:"name"` - Redundancy cdssdk.TypedRedundancyInfo `json:"redundancy"` + UserID cdssdk.UserID `json:"userID"` + BucketID cdssdk.BucketID `json:"bucketID"` + Name string `json:"name"` } type CreatePackageResp struct { mq.MessageBodyBase - PackageID int64 `json:"packageID"` + PackageID cdssdk.PackageID `json:"packageID"` } -func NewCreatePackage(userID int64, bucketID int64, name string, redundancy cdssdk.TypedRedundancyInfo) *CreatePackage { +func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string) *CreatePackage { return &CreatePackage{ - UserID: userID, - BucketID: bucketID, - Name: name, - Redundancy: redundancy, + UserID: userID, + BucketID: bucketID, + Name: name, } } -func NewCreatePackageResp(packageID int64) *CreatePackageResp { +func NewCreatePackageResp(packageID cdssdk.PackageID) *CreatePackageResp { return &CreatePackageResp{ PackageID: packageID, } @@ -83,85 +80,44 @@ func (client *Client) CreatePackage(msg *CreatePackage) (*CreatePackageResp, err return mq.Request(Service.CreatePackage, client.rabbitCli, msg) } -// 更新Rep备份模式的Package -var _ = Register(Service.UpdateRepPackage) - -type UpdateRepPackage struct { - mq.MessageBodyBase - PackageID int64 `json:"packageID"` - Adds []AddRepObjectInfo `json:"objects"` - Deletes []int64 `json:"deletes"` -} -type UpdateRepPackageResp struct { - mq.MessageBodyBase -} -type AddRepObjectInfo struct { - Path string `json:"path"` - Size int64 `json:"size,string"` - FileHash string `json:"fileHash"` - NodeIDs []int64 `json:"nodeIDs"` -} - -func NewUpdateRepPackage(packageID int64, adds []AddRepObjectInfo, deletes []int64) *UpdateRepPackage { - return &UpdateRepPackage{ - PackageID: packageID, - Adds: adds, - Deletes: deletes, - } -} -func NewUpdateRepPackageResp() *UpdateRepPackageResp { - return &UpdateRepPackageResp{} -} -func NewAddRepObjectInfo(path string, size int64, fileHash string, nodeIDs []int64) AddRepObjectInfo { - return AddRepObjectInfo{ - Path: path, - Size: size, - FileHash: fileHash, - NodeIDs: nodeIDs, - } -} -func (client *Client) UpdateRepPackage(msg *UpdateRepPackage) (*UpdateRepPackageResp, error) { - return mq.Request(Service.UpdateRepPackage, client.rabbitCli, msg) -} - // 更新EC备份模式的Package var _ = Register(Service.UpdateECPackage) -type UpdateECPackage struct { +type UpdatePackage struct { mq.MessageBodyBase - PackageID int64 `json:"packageID"` - Adds []AddECObjectInfo `json:"objects"` - Deletes []int64 `json:"deletes"` + PackageID cdssdk.PackageID `json:"packageID"` + Adds []AddObjectInfo `json:"objects"` + Deletes []cdssdk.ObjectID `json:"deletes"` } -type UpdateECPackageResp struct { +type UpdatePackageResp struct { mq.MessageBodyBase } -type AddECObjectInfo struct { - Path string `json:"path"` - Size int64 `json:"size,string"` - FileHashes []string `json:"fileHashes"` - NodeIDs []int64 `json:"nodeIDs"` +type AddObjectInfo struct { + Path string `json:"path"` + Size int64 `json:"size,string"` + Redundancy cdssdk.Redundancy `json:"redundancy"` + Blocks []stgmod.ObjectBlockData `json:"blocks"` } -func NewUpdateECPackage(packageID int64, adds []AddECObjectInfo, deletes []int64) *UpdateECPackage { - return &UpdateECPackage{ +func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectInfo, deletes []cdssdk.ObjectID) *UpdatePackage { + return &UpdatePackage{ PackageID: packageID, Adds: adds, Deletes: deletes, } } -func NewUpdateECPackageResp() *UpdateECPackageResp { - return &UpdateECPackageResp{} +func NewUpdatePackageResp() *UpdatePackageResp { + return &UpdatePackageResp{} } -func NewAddECObjectInfo(path string, size int64, fileHashes []string, nodeIDs []int64) AddECObjectInfo { - return AddECObjectInfo{ +func NewAddObjectInfo(path string, size int64, redundancy cdssdk.Redundancy, blocks []stgmod.ObjectBlockData) AddObjectInfo { + return AddObjectInfo{ Path: path, Size: size, - FileHashes: fileHashes, - NodeIDs: nodeIDs, + Redundancy: redundancy, + Blocks: blocks, } } -func (client *Client) UpdateECPackage(msg *UpdateECPackage) (*UpdateECPackageResp, error) { +func (client *Client) UpdateECPackage(msg *UpdatePackage) (*UpdatePackageResp, error) { return mq.Request(Service.UpdateECPackage, client.rabbitCli, msg) } @@ -170,14 +126,14 @@ var _ = Register(Service.DeletePackage) type DeletePackage struct { mq.MessageBodyBase - UserID int64 `db:"userID"` - PackageID int64 `db:"packageID"` + UserID cdssdk.UserID `db:"userID"` + PackageID cdssdk.PackageID `db:"packageID"` } type DeletePackageResp struct { mq.MessageBodyBase } -func NewDeletePackage(userID int64, packageID int64) *DeletePackage { +func NewDeletePackage(userID cdssdk.UserID, packageID cdssdk.PackageID) *DeletePackage { return &DeletePackage{ UserID: userID, PackageID: packageID, @@ -195,8 +151,8 @@ var _ = Register(Service.GetPackageCachedNodes) type GetPackageCachedNodes struct { mq.MessageBodyBase - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` } type PackageCachedNodeInfo struct { @@ -210,7 +166,7 @@ type GetPackageCachedNodesResp struct { cdssdk.PackageCachingInfo } -func NewGetPackageCachedNodes(userID int64, packageID int64) *GetPackageCachedNodes { +func NewGetPackageCachedNodes(userID cdssdk.UserID, packageID cdssdk.PackageID) *GetPackageCachedNodes { return &GetPackageCachedNodes{ UserID: userID, PackageID: packageID, @@ -236,23 +192,23 @@ var _ = Register(Service.GetPackageLoadedNodes) type GetPackageLoadedNodes struct { mq.MessageBodyBase - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` } type GetPackageLoadedNodesResp struct { mq.MessageBodyBase - NodeIDs []int64 `json:"nodeIDs"` + NodeIDs []cdssdk.NodeID `json:"nodeIDs"` } -func NewGetPackageLoadedNodes(userID int64, packageID int64) *GetPackageLoadedNodes { +func NewGetPackageLoadedNodes(userID cdssdk.UserID, packageID cdssdk.PackageID) *GetPackageLoadedNodes { return &GetPackageLoadedNodes{ UserID: userID, PackageID: packageID, } } -func NewGetPackageLoadedNodesResp(nodeIDs []int64) *GetPackageLoadedNodesResp { +func NewGetPackageLoadedNodesResp(nodeIDs []cdssdk.NodeID) *GetPackageLoadedNodesResp { return &GetPackageLoadedNodesResp{ NodeIDs: nodeIDs, } diff --git a/common/pkgs/mq/coordinator/storage.go b/common/pkgs/mq/coordinator/storage.go index d36f164..f6ff828 100644 --- a/common/pkgs/mq/coordinator/storage.go +++ b/common/pkgs/mq/coordinator/storage.go @@ -2,6 +2,7 @@ package coordinator import ( "gitlink.org.cn/cloudream/common/pkgs/mq" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -16,21 +17,21 @@ var _ = Register(Service.GetStorageInfo) type GetStorageInfo struct { mq.MessageBodyBase - UserID int64 `json:"userID"` - StorageID int64 `json:"storageID"` + UserID cdssdk.UserID `json:"userID"` + StorageID cdssdk.StorageID `json:"storageID"` } type GetStorageInfoResp struct { mq.MessageBodyBase model.Storage } -func NewGetStorageInfo(userID int64, storageID int64) *GetStorageInfo { +func NewGetStorageInfo(userID cdssdk.UserID, storageID cdssdk.StorageID) *GetStorageInfo { return &GetStorageInfo{ UserID: userID, StorageID: storageID, } } -func NewGetStorageInfoResp(storageID int64, name string, nodeID int64, dir string, state string) *GetStorageInfoResp { +func NewGetStorageInfoResp(storageID cdssdk.StorageID, name string, nodeID cdssdk.NodeID, dir string, state string) *GetStorageInfoResp { return &GetStorageInfoResp{ Storage: model.Storage{ StorageID: storageID, @@ -50,15 +51,15 @@ var _ = Register(Service.StoragePackageLoaded) type StoragePackageLoaded struct { mq.MessageBodyBase - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` - StorageID int64 `json:"storageID"` + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` + StorageID cdssdk.StorageID `json:"storageID"` } type StoragePackageLoadedResp struct { mq.MessageBodyBase } -func NewStoragePackageLoaded(userID int64, packageID int64, stgID int64) *StoragePackageLoaded { +func NewStoragePackageLoaded(userID cdssdk.UserID, packageID cdssdk.PackageID, stgID cdssdk.StorageID) *StoragePackageLoaded { return &StoragePackageLoaded{ UserID: userID, PackageID: packageID, diff --git a/common/pkgs/mq/scanner/event/agent_check_cache.go b/common/pkgs/mq/scanner/event/agent_check_cache.go index 4cdaec2..19d44a1 100644 --- a/common/pkgs/mq/scanner/event/agent_check_cache.go +++ b/common/pkgs/mq/scanner/event/agent_check_cache.go @@ -1,12 +1,14 @@ package event +import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + type AgentCheckCache struct { EventBase - NodeID int64 `json:"nodeID"` - FileHashes []string `json:"fileHashes"` // 需要检查的FileHash列表,如果为nil(不是为空),则代表进行全量检查 + NodeID cdssdk.NodeID `json:"nodeID"` + FileHashes []string `json:"fileHashes"` // 需要检查的FileHash列表,如果为nil(不是为空),则代表进行全量检查 } -func NewAgentCheckCache(nodeID int64, fileHashes []string) *AgentCheckCache { +func NewAgentCheckCache(nodeID cdssdk.NodeID, fileHashes []string) *AgentCheckCache { return &AgentCheckCache{ NodeID: nodeID, FileHashes: fileHashes, diff --git a/common/pkgs/mq/scanner/event/agent_check_state.go b/common/pkgs/mq/scanner/event/agent_check_state.go index 91d34be..3e06f26 100644 --- a/common/pkgs/mq/scanner/event/agent_check_state.go +++ b/common/pkgs/mq/scanner/event/agent_check_state.go @@ -1,11 +1,13 @@ package event +import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + type AgentCheckState struct { EventBase - NodeID int64 `json:"nodeID"` + NodeID cdssdk.NodeID `json:"nodeID"` } -func NewAgentCheckState(nodeID int64) *AgentCheckState { +func NewAgentCheckState(nodeID cdssdk.NodeID) *AgentCheckState { return &AgentCheckState{ NodeID: nodeID, } diff --git a/common/pkgs/mq/scanner/event/agent_check_storage.go b/common/pkgs/mq/scanner/event/agent_check_storage.go index aeaffa1..f76ebc8 100644 --- a/common/pkgs/mq/scanner/event/agent_check_storage.go +++ b/common/pkgs/mq/scanner/event/agent_check_storage.go @@ -1,12 +1,14 @@ package event +import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + type AgentCheckStorage struct { EventBase - StorageID int64 `json:"storageID"` - PackageIDs []int64 `json:"packageIDs"` // 需要检查的Package文件列表,如果为nil(不是为空),则代表进行全量检查 + StorageID cdssdk.StorageID `json:"storageID"` + PackageIDs []cdssdk.PackageID `json:"packageIDs"` // 需要检查的Package文件列表,如果为nil(不是为空),则代表进行全量检查 } -func NewAgentCheckStorage(storageID int64, packageIDs []int64) *AgentCheckStorage { +func NewAgentCheckStorage(storageID cdssdk.StorageID, packageIDs []cdssdk.PackageID) *AgentCheckStorage { return &AgentCheckStorage{ StorageID: storageID, PackageIDs: packageIDs, diff --git a/common/pkgs/mq/scanner/event/check_cache.go b/common/pkgs/mq/scanner/event/check_cache.go index d6f7633..8fa4036 100644 --- a/common/pkgs/mq/scanner/event/check_cache.go +++ b/common/pkgs/mq/scanner/event/check_cache.go @@ -1,11 +1,13 @@ package event +import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + type CheckCache struct { EventBase - NodeID int64 `json:"nodeID"` + NodeID cdssdk.NodeID `json:"nodeID"` } -func NewCheckCache(nodeID int64) *CheckCache { +func NewCheckCache(nodeID cdssdk.NodeID) *CheckCache { return &CheckCache{ NodeID: nodeID, } diff --git a/common/pkgs/mq/scanner/event/check_package.go b/common/pkgs/mq/scanner/event/check_package.go index 50fb664..f6bdea8 100644 --- a/common/pkgs/mq/scanner/event/check_package.go +++ b/common/pkgs/mq/scanner/event/check_package.go @@ -1,11 +1,13 @@ package event +import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + type CheckPackage struct { EventBase - PackageIDs []int64 `json:"packageIDs"` + PackageIDs []cdssdk.PackageID `json:"packageIDs"` } -func NewCheckPackage(packageIDs []int64) *CheckPackage { +func NewCheckPackage(packageIDs []cdssdk.PackageID) *CheckPackage { return &CheckPackage{ PackageIDs: packageIDs, } diff --git a/coordinator/internal/services/bucket.go b/coordinator/internal/services/bucket.go index ba9f13d..e755dbf 100644 --- a/coordinator/internal/services/bucket.go +++ b/coordinator/internal/services/bucket.go @@ -7,11 +7,12 @@ import ( "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) -func (svc *Service) GetBucket(userID int, bucketID int) (model.Bucket, error) { +func (svc *Service) GetBucket(userID cdssdk.UserID, bucketID cdssdk.BucketID) (model.Bucket, error) { // TODO panic("not implement yet") } @@ -42,7 +43,7 @@ func (svc *Service) GetBucketPackages(msg *coormq.GetBucketPackages) (*coormq.Ge } func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucketResp, *mq.CodeMessage) { - var bucketID int64 + var bucketID cdssdk.BucketID var err error svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { // 这里用的是外部的err diff --git a/scanner/internal/event/agent_check_cache.go b/scanner/internal/event/agent_check_cache.go index bda0c42..e18dbd7 100644 --- a/scanner/internal/event/agent_check_cache.go +++ b/scanner/internal/event/agent_check_cache.go @@ -7,6 +7,7 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" @@ -19,7 +20,7 @@ type AgentCheckCache struct { *scevt.AgentCheckCache } -func NewAgentCheckCache(nodeID int64, fileHashes []string) *AgentCheckCache { +func NewAgentCheckCache(nodeID cdssdk.NodeID, fileHashes []string) *AgentCheckCache { return &AgentCheckCache{ AgentCheckCache: scevt.NewAgentCheckCache(nodeID, fileHashes), } diff --git a/scanner/internal/event/agent_check_state.go b/scanner/internal/event/agent_check_state.go index bd24769..e16f091 100644 --- a/scanner/internal/event/agent_check_state.go +++ b/scanner/internal/event/agent_check_state.go @@ -7,6 +7,7 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/consts" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" @@ -20,7 +21,7 @@ type AgentCheckState struct { *scevt.AgentCheckState } -func NewAgentCheckState(nodeID int64) *AgentCheckState { +func NewAgentCheckState(nodeID cdssdk.NodeID) *AgentCheckState { return &AgentCheckState{ AgentCheckState: scevt.NewAgentCheckState(nodeID), } diff --git a/scanner/internal/event/agent_check_storage.go b/scanner/internal/event/agent_check_storage.go index 02854e4..55d8277 100644 --- a/scanner/internal/event/agent_check_storage.go +++ b/scanner/internal/event/agent_check_storage.go @@ -7,6 +7,7 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/consts" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" @@ -19,7 +20,7 @@ type AgentCheckStorage struct { *scevt.AgentCheckStorage } -func NewAgentCheckStorage(storageID int64, packageIDs []int64) *AgentCheckStorage { +func NewAgentCheckStorage(storageID cdssdk.StorageID, packageIDs []cdssdk.PackageID) *AgentCheckStorage { return &AgentCheckStorage{ AgentCheckStorage: scevt.NewAgentCheckStorage(storageID, packageIDs), } @@ -157,7 +158,7 @@ func (t *AgentCheckStorage) startCheck(execCtx ExecuteContext, stg model.Storage } // 根据返回结果修改数据库 - var chkObjIDs []int64 + var chkObjIDs []cdssdk.PackageID for _, entry := range checkResp.Entries { switch entry.Operation { case agtmq.CHECK_STORAGE_RESP_OP_DELETE: diff --git a/scanner/internal/event/check_cache.go b/scanner/internal/event/check_cache.go index b4e95d2..ee94132 100644 --- a/scanner/internal/event/check_cache.go +++ b/scanner/internal/event/check_cache.go @@ -5,6 +5,7 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/consts" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" @@ -15,7 +16,7 @@ type CheckCache struct { *scevt.CheckCache } -func NewCheckCache(nodeID int64) *CheckCache { +func NewCheckCache(nodeID cdssdk.NodeID) *CheckCache { return &CheckCache{ CheckCache: scevt.NewCheckCache(nodeID), } diff --git a/scanner/internal/event/check_package.go b/scanner/internal/event/check_package.go index e93dc7a..b4edcf6 100644 --- a/scanner/internal/event/check_package.go +++ b/scanner/internal/event/check_package.go @@ -3,6 +3,7 @@ package event import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" ) @@ -11,9 +12,9 @@ type CheckPackage struct { *scevt.CheckPackage } -func NewCheckPackage(objIDs []int64) *CheckPackage { +func NewCheckPackage(pkgIDs []cdssdk.PackageID) *CheckPackage { return &CheckPackage{ - CheckPackage: scevt.NewCheckPackage(objIDs), + CheckPackage: scevt.NewCheckPackage(pkgIDs), } } diff --git a/scanner/internal/tickevent/batch_all_agent_check_cache.go b/scanner/internal/tickevent/batch_all_agent_check_cache.go index 52c6546..0018e40 100644 --- a/scanner/internal/tickevent/batch_all_agent_check_cache.go +++ b/scanner/internal/tickevent/batch_all_agent_check_cache.go @@ -3,6 +3,7 @@ package tickevent import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/scanner/internal/event" ) @@ -10,7 +11,7 @@ import ( const AGENT_CHECK_CACHE_BATCH_SIZE = 2 type BatchAllAgentCheckCache struct { - nodeIDs []int64 + nodeIDs []cdssdk.NodeID } func NewBatchAllAgentCheckCache() *BatchAllAgentCheckCache { @@ -29,7 +30,7 @@ func (e *BatchAllAgentCheckCache) Execute(ctx ExecuteContext) { return } - e.nodeIDs = lo.Map(nodes, func(node model.Node, index int) int64 { return node.NodeID }) + e.nodeIDs = lo.Map(nodes, func(node model.Node, index int) cdssdk.NodeID { return node.NodeID }) log.Debugf("new check start, get all nodes") }