Browse Source

Merge branch 'feature_gxh'

gitlink
Sydonian 2 years ago
parent
commit
29377d53f3
26 changed files with 91 additions and 91 deletions
  1. +3
    -3
      agent/internal/task/cache_move_package.go
  2. +2
    -2
      agent/internal/task/create_ec_package.go
  3. +2
    -2
      agent/internal/task/create_rep_package.go
  4. +3
    -3
      client/internal/cmdline/package.go
  5. +2
    -2
      client/internal/cmdline/storage.go
  6. +3
    -3
      client/internal/http/cacah.go
  7. +5
    -5
      client/internal/http/package.go
  8. +6
    -6
      client/internal/http/storage.go
  9. +3
    -3
      client/internal/services/cacah.go
  10. +8
    -8
      client/internal/services/package.go
  11. +2
    -2
      client/internal/services/storage.go
  12. +2
    -2
      client/internal/task/create_ec_package.go
  13. +2
    -2
      client/internal/task/create_rep_package.go
  14. +6
    -6
      common/pkgs/cmd/create_ec_package.go
  15. +4
    -4
      common/pkgs/cmd/create_rep_package.go
  16. +2
    -2
      common/pkgs/cmd/download_package.go
  17. +2
    -2
      common/pkgs/cmd/update_ec_package.go
  18. +4
    -4
      common/pkgs/db/model/model.go
  19. +5
    -5
      common/pkgs/db/object_rep.go
  20. +2
    -2
      common/pkgs/db/package.go
  21. +3
    -3
      common/pkgs/iterator/ec_object_iterator.go
  22. +3
    -3
      common/pkgs/mq/agent/cache.go
  23. +3
    -3
      common/pkgs/mq/agent/storage.go
  24. +3
    -3
      common/pkgs/mq/coordinator/cache.go
  25. +6
    -6
      common/pkgs/mq/coordinator/package.go
  26. +5
    -5
      coordinator/internal/services/package.go

+ 3
- 3
agent/internal/task/cache_move_package.go View File

@@ -6,7 +6,7 @@ import (

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
@@ -17,7 +17,7 @@ type CacheMovePackage struct {
userID int64
packageID int64

ResultCacheInfos []stgsdk.ObjectCacheInfo
ResultCacheInfos []cdssdk.ObjectCacheInfo
}

func NewCacheMovePackage(userID int64, packageID int64) *CacheMovePackage {
@@ -96,7 +96,7 @@ func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.Client, pkg
}

fileHashes = append(fileHashes, rep.FileHash)
t.ResultCacheInfos = append(t.ResultCacheInfos, stgsdk.NewObjectCacheInfo(rep.Object, rep.FileHash))
t.ResultCacheInfos = append(t.ResultCacheInfos, cdssdk.NewObjectCacheInfo(rep.Object, rep.FileHash))
}

_, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(pkg.PackageID, *stgglb.Local.NodeID, fileHashes))


+ 2
- 2
agent/internal/task/create_ec_package.go View File

@@ -5,7 +5,7 @@ import (

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
)
@@ -18,7 +18,7 @@ type CreateECPackage struct {
Result *CreateECPackageResult
}

func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy stgsdk.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage {
func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy cdssdk.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage {
return &CreateECPackage{
cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity),
}


+ 2
- 2
agent/internal/task/create_rep_package.go View File

@@ -5,7 +5,7 @@ import (

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
)
@@ -18,7 +18,7 @@ type CreateRepPackage struct {
Result *CreateRepPackageResult
}

func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy stgsdk.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage {
func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy cdssdk.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage {
return &CreateRepPackage{
cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity),
}


+ 3
- 3
client/internal/cmdline/package.go View File

@@ -8,7 +8,7 @@ import (
"time"

"github.com/jedib0t/go-pretty/v6/table"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/client/internal/config"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
)
@@ -112,7 +112,7 @@ func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64
}

objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, stgsdk.NewRepRedundancyInfo(repCount), nodeAff)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, cdssdk.NewRepRedundancyInfo(repCount), nodeAff)

if err != nil {
return fmt.Errorf("upload file data failed, err: %w", err)
@@ -209,7 +209,7 @@ func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64,
}

objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, stgsdk.NewECRedundancyInfo(ecName, config.Cfg().ECPacketSize), nodeAff)
taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, cdssdk.NewECRedundancyInfo(ecName, config.Cfg().ECPacketSize), nodeAff)

if err != nil {
return fmt.Errorf("upload file data failed, err: %w", err)


+ 2
- 2
client/internal/cmdline/storage.go View File

@@ -4,7 +4,7 @@ import (
"fmt"
"time"

stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) error {
@@ -32,7 +32,7 @@ func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) er

func StorageCreateRepPackage(ctx CommandContext, bucketID int64, name string, storageID int64, path string, repCount int) error {
nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(0, bucketID, name, storageID, path,
stgsdk.NewTypedRepRedundancyInfo(repCount), nil)
cdssdk.NewTypedRepRedundancyInfo(repCount), nil)
if err != nil {
return fmt.Errorf("start storage uploading rep package: %w", err)
}


+ 3
- 3
client/internal/http/cacah.go View File

@@ -7,7 +7,7 @@ import (
"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

type CacheService struct {
@@ -26,7 +26,7 @@ type CacheMovePackageReq struct {
NodeID *int64 `json:"nodeID" binding:"required"`
}
type CacheMovePackageResp struct {
CacheInfos []stgsdk.ObjectCacheInfo `json:"cacheInfos"`
CacheInfos []cdssdk.ObjectCacheInfo `json:"cacheInfos"`
}

func (s *CacheService) MovePackage(ctx *gin.Context) {
@@ -74,7 +74,7 @@ type CacheGetPackageObjectCacheInfosReq struct {
PackageID *int64 `form:"packageID" binding:"required"`
}

type CacheGetPackageObjectCacheInfosResp = stgsdk.CacheGetPackageObjectCacheInfosResp
type CacheGetPackageObjectCacheInfosResp = cdssdk.CacheGetPackageObjectCacheInfosResp

func (s *CacheService) GetPackageObjectCacheInfos(ctx *gin.Context) {
log := logger.WithField("HTTP", "Cache.GetPackageObjectCacheInfos")


+ 5
- 5
client/internal/http/package.go View File

@@ -9,7 +9,7 @@ import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/iterator"
"gitlink.org.cn/cloudream/common/pkgs/logger"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
stgiter "gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
@@ -62,7 +62,7 @@ type PackageUploadInfo struct {
UserID *int64 `json:"userID" binding:"required"`
BucketID *int64 `json:"bucketID" binding:"required"`
Name string `json:"name" binding:"required"`
Redundancy stgsdk.TypedRedundancyInfo `json:"redundancy" binding:"required"`
Redundancy cdssdk.TypedRedundancyInfo `json:"redundancy" binding:"required"`
NodeAffinity *int64 `json:"nodeAffinity"`
}

@@ -97,7 +97,7 @@ func (s *PackageService) uploadRep(ctx *gin.Context, req *PackageUploadReq) {
log := logger.WithField("HTTP", "Package.Upload")

var err error
var repInfo stgsdk.RepRedundancyInfo
var repInfo cdssdk.RepRedundancyInfo
if repInfo, err = req.Info.Redundancy.ToRepInfo(); err != nil {
log.Warnf("parsing rep redundancy config: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config"))
@@ -141,7 +141,7 @@ func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) {
log := logger.WithField("HTTP", "Package.Upload")

var err error
var ecInfo stgsdk.ECRedundancyInfo
var ecInfo cdssdk.ECRedundancyInfo
if ecInfo, err = req.Info.Redundancy.ToECInfo(); err != nil {
log.Warnf("parsing ec redundancy config: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid rep redundancy config"))
@@ -211,7 +211,7 @@ type GetCachedNodesReq struct {
PackageID *int64 `json:"packageID" binding:"required"`
}
type GetCachedNodesResp struct {
stgsdk.PackageCachingInfo
cdssdk.PackageCachingInfo
}

func (s *PackageService) GetCachedNodes(ctx *gin.Context) {


+ 6
- 6
client/internal/http/storage.go View File

@@ -7,7 +7,7 @@ import (
"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

type StorageService struct {
@@ -27,7 +27,7 @@ type StorageLoadPackageReq struct {
}

type StorageLoadPackageResp struct {
stgsdk.StorageLoadPackageResp
cdssdk.StorageLoadPackageResp
}

func (s *StorageService) LoadPackage(ctx *gin.Context) {
@@ -57,7 +57,7 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) {
}

ctx.JSON(http.StatusOK, OK(StorageLoadPackageResp{
StorageLoadPackageResp: stgsdk.StorageLoadPackageResp{
StorageLoadPackageResp: cdssdk.StorageLoadPackageResp{
FullPath: fullPath,
},
}))
@@ -78,7 +78,7 @@ type StorageCreatePackageReq struct {
Path string `json:"path" binding:"required"`
BucketID *int64 `json:"bucketID" binding:"required"`
Name string `json:"name" binding:"required"`
Redundancy stgsdk.TypedRedundancyInfo `json:"redundancy" binding:"required"`
Redundancy cdssdk.TypedRedundancyInfo `json:"redundancy" binding:"required"`
NodeAffinity *int64 `json:"nodeAffinity"`
}

@@ -133,7 +133,7 @@ type StorageGetInfoReq struct {
}

type StorageGetInfoResp struct {
stgsdk.StorageGetInfoResp
cdssdk.StorageGetInfoResp
}

func (s *StorageService) GetInfo(ctx *gin.Context) {
@@ -154,7 +154,7 @@ func (s *StorageService) GetInfo(ctx *gin.Context) {
}

ctx.JSON(http.StatusOK, OK(StorageGetInfoResp{
StorageGetInfoResp: stgsdk.StorageGetInfoResp{
StorageGetInfoResp: cdssdk.StorageGetInfoResp{
Name: info.Name,
NodeID: info.NodeID,
Directory: info.Directory,


+ 3
- 3
client/internal/services/cacah.go View File

@@ -4,7 +4,7 @@ import (
"fmt"
"time"

stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

stgglb "gitlink.org.cn/cloudream/storage/common/globals"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
@@ -34,7 +34,7 @@ func (svc *CacheService) StartCacheMovePackage(userID int64, packageID int64, no
return startResp.TaskID, nil
}

func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, []stgsdk.ObjectCacheInfo, error) {
func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, []cdssdk.ObjectCacheInfo, error) {
agentCli, err := stgglb.AgentMQPool.Acquire(nodeID)
if err != nil {
return true, nil, fmt.Errorf("new agent client: %w", err)
@@ -57,7 +57,7 @@ func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitT
return true, waitResp.CacheInfos, nil
}

func (svc *CacheService) GetPackageObjectCacheInfos(userID int64, packageID int64) ([]stgsdk.ObjectCacheInfo, error) {
func (svc *CacheService) GetPackageObjectCacheInfos(userID int64, packageID int64) ([]cdssdk.ObjectCacheInfo, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)


+ 8
- 8
client/internal/services/package.go View File

@@ -4,7 +4,7 @@ import (
"fmt"
"time"

stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

mytask "gitlink.org.cn/cloudream/storage/client/internal/task"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
@@ -122,7 +122,7 @@ func (svc *PackageService) downloadECPackage(pkg model.Package, objects []model.
return nil, fmt.Errorf("getting package object ec data: %w", err)
}

var ecInfo stgsdk.ECRedundancyInfo
var ecInfo cdssdk.ECRedundancyInfo
if ecInfo, err = pkg.Redundancy.ToECInfo(); err != nil {
return nil, fmt.Errorf("get ec redundancy info: %w", err)
}
@@ -139,7 +139,7 @@ func (svc *PackageService) downloadECPackage(pkg model.Package, objects []model.
return iter, nil
}

func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo stgsdk.RepRedundancyInfo, nodeAffinity *int64) (string, error) {
func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo cdssdk.RepRedundancyInfo, nodeAffinity *int64) (string, error) {
tsk := svc.TaskMgr.StartNew(mytask.NewCreateRepPackage(userID, bucketID, name, objIter, repInfo, nodeAffinity))
return tsk.ID(), nil
}
@@ -167,7 +167,7 @@ func (svc *PackageService) WaitUpdatingRepPackage(taskID string, waitTimeout tim
return false, nil, nil
}

func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo stgsdk.ECRedundancyInfo, nodeAffinity *int64) (string, error) {
func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo cdssdk.ECRedundancyInfo, nodeAffinity *int64) (string, error) {
tsk := svc.TaskMgr.StartNew(mytask.NewCreateECPackage(userID, bucketID, name, objIter, ecInfo, nodeAffinity))
return tsk.ID(), nil
}
@@ -230,19 +230,19 @@ func (svc *PackageService) DeletePackage(userID int64, packageID int64) error {
return nil
}

func (svc *PackageService) GetCachedNodes(userID int64, packageID int64) (stgsdk.PackageCachingInfo, error) {
func (svc *PackageService) GetCachedNodes(userID int64, packageID int64) (cdssdk.PackageCachingInfo, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return stgsdk.PackageCachingInfo{}, fmt.Errorf("new coordinator client: %w", err)
return cdssdk.PackageCachingInfo{}, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

resp, err := coorCli.GetPackageCachedNodes(coormq.NewGetPackageCachedNodes(userID, packageID))
if err != nil {
return stgsdk.PackageCachingInfo{}, fmt.Errorf("get package cached nodes: %w", err)
return cdssdk.PackageCachingInfo{}, fmt.Errorf("get package cached nodes: %w", err)
}

tmp := stgsdk.PackageCachingInfo{
tmp := cdssdk.PackageCachingInfo{
NodeInfos: resp.NodeInfos,
PackageSize: resp.PackageSize,
RedunancyType: resp.RedunancyType,


+ 2
- 2
client/internal/services/storage.go View File

@@ -4,7 +4,7 @@ import (
"fmt"
"time"

stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

"gitlink.org.cn/cloudream/storage/client/internal/task"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
@@ -41,7 +41,7 @@ func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, s
}

// 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID
func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy stgsdk.TypedRedundancyInfo, nodeAffinity *int64) (int64, string, error) {
func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy cdssdk.TypedRedundancyInfo, nodeAffinity *int64) (int64, string, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return 0, "", fmt.Errorf("new coordinator client: %w", err)


+ 2
- 2
client/internal/task/create_ec_package.go View File

@@ -4,7 +4,7 @@ import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/task"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
)
@@ -17,7 +17,7 @@ type CreateECPackage struct {
Result *CreateECPackageResult
}

func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy stgsdk.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage {
func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy cdssdk.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage {
return &CreateECPackage{
cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity),
}


+ 2
- 2
client/internal/task/create_rep_package.go View File

@@ -4,7 +4,7 @@ import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/task"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
)
@@ -17,7 +17,7 @@ type CreateRepPackage struct {
Result *CreateRepPackageResult
}

func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy stgsdk.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage {
func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy cdssdk.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage {
return &CreateRepPackage{
cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy, nodeAffinity),
}


+ 6
- 6
common/pkgs/cmd/create_ec_package.go View File

@@ -10,7 +10,7 @@ import (

"github.com/samber/lo"

stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
@@ -25,7 +25,7 @@ type CreateECPackage struct {
bucketID int64
name string
objectIter iterator.UploadingObjectIterator
redundancy stgsdk.ECRedundancyInfo
redundancy cdssdk.ECRedundancyInfo
nodeAffinity *int64
}

@@ -40,7 +40,7 @@ type ECObjectUploadResult struct {
ObjectID int64
}

func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy stgsdk.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage {
func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy cdssdk.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage {
return &CreateECPackage{
userID: userID,
bucketID: bucketID,
@@ -80,7 +80,7 @@ func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageRe
defer mutex.Unlock()

createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name,
stgsdk.NewTypedRedundancyInfo(t.redundancy)))
cdssdk.NewTypedRedundancyInfo(t.redundancy)))
if err != nil {
return nil, fmt.Errorf("creating package: %w", err)
}
@@ -139,7 +139,7 @@ func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageRe
}, nil
}

func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObjectIterator, uploadNodes []UploadNodeInfo, ecInfo stgsdk.ECRedundancyInfo, ec model.Ec) ([]ECObjectUploadResult, error) {
func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObjectIterator, uploadNodes []UploadNodeInfo, ecInfo cdssdk.ECRedundancyInfo, ec model.Ec) ([]ECObjectUploadResult, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -185,7 +185,7 @@ func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObje
}

// 上传文件
func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ecInfo stgsdk.ECRedundancyInfo, ec model.Ec) ([]string, []int64, error) {
func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ecInfo cdssdk.ECRedundancyInfo, ec model.Ec) ([]string, []int64, error) {
//生成纠删码的写入节点序列
nodes := make([]UploadNodeInfo, ec.EcN)
numNodes := len(uploadNodes)


+ 4
- 4
common/pkgs/cmd/create_rep_package.go View File

@@ -9,7 +9,7 @@ import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/logger"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
@@ -29,7 +29,7 @@ type CreateRepPackage struct {
bucketID int64
name string
objectIter iterator.UploadingObjectIterator
redundancy stgsdk.RepRedundancyInfo
redundancy cdssdk.RepRedundancyInfo
nodeAffinity *int64
}

@@ -49,7 +49,7 @@ type RepObjectUploadResult struct {
ObjectID int64
}

func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy stgsdk.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage {
func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy cdssdk.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage {
return &CreateRepPackage{
userID: userID,
bucketID: bucketID,
@@ -94,7 +94,7 @@ func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackage
defer mutex.Unlock()

createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name,
stgsdk.NewTypedRedundancyInfo(t.redundancy)))
cdssdk.NewTypedRedundancyInfo(t.redundancy)))
if err != nil {
return nil, fmt.Errorf("creating package: %w", err)
}


+ 2
- 2
common/pkgs/cmd/download_package.go View File

@@ -6,7 +6,7 @@ import (
"os"
"path/filepath"

stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

"gitlink.org.cn/cloudream/common/pkgs/distlock"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
@@ -101,7 +101,7 @@ func (t *DownloadPackage) downloadEC(ctx *DownloadPackageContext, pkg model.Pack
return nil, fmt.Errorf("getting package object ec data: %w", err)
}

var ecInfo stgsdk.ECRedundancyInfo
var ecInfo cdssdk.ECRedundancyInfo
if ecInfo, err = pkg.Redundancy.ToECInfo(); err != nil {
return nil, fmt.Errorf("get ec redundancy info: %w", err)
}


+ 2
- 2
common/pkgs/cmd/update_ec_package.go View File

@@ -5,7 +5,7 @@ import (

"github.com/samber/lo"

stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
@@ -80,7 +80,7 @@ func (t *UpdateECPackage) Execute(ctx *UpdatePackageContext) (*UpdateECPackageRe
}
})

var ecInfo stgsdk.ECRedundancyInfo
var ecInfo cdssdk.ECRedundancyInfo
if ecInfo, err = getPkgResp.Package.Redundancy.ToECInfo(); err != nil {
return nil, fmt.Errorf("get ec redundancy info: %w", err)
}


+ 4
- 4
common/pkgs/db/model/model.go View File

@@ -3,10 +3,10 @@ package model
import (
"time"

stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

// TODO 可以考虑逐步迁移到stgsdk中。迁移思路:数据对象应该包含的字段都迁移到stgsdk中,内部使用的一些特殊字段则留在这里
// TODO 可以考虑逐步迁移到cdssdk中。迁移思路:数据对象应该包含的字段都迁移到cdssdk中,内部使用的一些特殊字段则留在这里

type Node struct {
NodeID int64 `db:"NodeID" json:"nodeID"`
@@ -60,9 +60,9 @@ type Bucket struct {
CreatorID int64 `db:"CreatorID" json:"creatorID"`
}

type Package = stgsdk.Package
type Package = cdssdk.Package

type Object = stgsdk.Object
type Object = cdssdk.Object

type ObjectRep struct {
ObjectID int64 `db:"ObjectID" json:"objectID"`


+ 5
- 5
common/pkgs/db/object_rep.go View File

@@ -7,7 +7,7 @@ import (
"strings"

"github.com/jmoiron/sqlx"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/consts"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
@@ -117,9 +117,9 @@ func (db *ObjectRepDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) (
return rets, nil
}

func (db *ObjectRepDB) GetPackageObjectCacheInfos(ctx SQLContext, packageID int64) ([]stgsdk.ObjectCacheInfo, error) {
func (db *ObjectRepDB) GetPackageObjectCacheInfos(ctx SQLContext, packageID int64) ([]cdssdk.ObjectCacheInfo, error) {
var tmpRet []struct {
stgsdk.Object
cdssdk.Object
FileHash string `db:"FileHash"`
}

@@ -130,9 +130,9 @@ func (db *ObjectRepDB) GetPackageObjectCacheInfos(ctx SQLContext, packageID int6
return nil, err
}

ret := make([]stgsdk.ObjectCacheInfo, len(tmpRet))
ret := make([]cdssdk.ObjectCacheInfo, len(tmpRet))
for i, r := range tmpRet {
ret[i] = stgsdk.NewObjectCacheInfo(r.Object, r.FileHash)
ret[i] = cdssdk.NewObjectCacheInfo(r.Object, r.FileHash)
}

return ret, nil


+ 2
- 2
common/pkgs/db/package.go View File

@@ -7,7 +7,7 @@ import (

"github.com/jmoiron/sqlx"

stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/serder"

"gitlink.org.cn/cloudream/storage/common/consts"
@@ -80,7 +80,7 @@ func (db *PackageDB) GetUserPackage(ctx SQLContext, userID int64, packageID int6
return ret, err
}

func (db *PackageDB) Create(ctx SQLContext, bucketID int64, name string, redundancy stgsdk.TypedRedundancyInfo) (int64, error) {
func (db *PackageDB) Create(ctx SQLContext, bucketID int64, name string, redundancy cdssdk.TypedRedundancyInfo) (int64, error) {
// 根据packagename和bucketid查询,若不存在则插入,若存在则返回错误
var packageID int64
err := sqlx.Get(ctx, &packageID, "select PackageID from Package where Name = ? AND BucketID = ?", name, bucketID)


+ 3
- 3
common/pkgs/iterator/ec_object_iterator.go View File

@@ -9,7 +9,7 @@ import (
"github.com/samber/lo"

"gitlink.org.cn/cloudream/common/pkgs/logger"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmodels "gitlink.org.cn/cloudream/storage/common/models"
@@ -26,13 +26,13 @@ type ECObjectIterator struct {
currentIndex int
inited bool

ecInfo stgsdk.ECRedundancyInfo
ecInfo cdssdk.ECRedundancyInfo
ec model.Ec
downloadCtx *DownloadContext
cliLocation model.Location
}

func NewECObjectIterator(objects []model.Object, objectECData []stgmodels.ObjectECData, ecInfo stgsdk.ECRedundancyInfo, ec model.Ec, downloadCtx *DownloadContext) *ECObjectIterator {
func NewECObjectIterator(objects []model.Object, objectECData []stgmodels.ObjectECData, ecInfo cdssdk.ECRedundancyInfo, ec model.Ec, downloadCtx *DownloadContext) *ECObjectIterator {
return &ECObjectIterator{
objects: objects,
objectECData: objectECData,


+ 3
- 3
common/pkgs/mq/agent/cache.go View File

@@ -2,7 +2,7 @@ package agent

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

@@ -96,7 +96,7 @@ type WaitCacheMovePackageResp struct {
mq.MessageBodyBase
IsComplete bool `json:"isComplete"`
Error string `json:"error"`
CacheInfos []stgsdk.ObjectCacheInfo `json:"cacheInfos"`
CacheInfos []cdssdk.ObjectCacheInfo `json:"cacheInfos"`
}

func NewWaitCacheMovePackage(taskID string, waitTimeoutMs int64) *WaitCacheMovePackage {
@@ -105,7 +105,7 @@ func NewWaitCacheMovePackage(taskID string, waitTimeoutMs int64) *WaitCacheMoveP
WaitTimeoutMs: waitTimeoutMs,
}
}
func NewWaitCacheMovePackageResp(isComplete bool, err string, cacheInfos []stgsdk.ObjectCacheInfo) *WaitCacheMovePackageResp {
func NewWaitCacheMovePackageResp(isComplete bool, err string, cacheInfos []cdssdk.ObjectCacheInfo) *WaitCacheMovePackageResp {
return &WaitCacheMovePackageResp{
IsComplete: isComplete,
Error: err,


+ 3
- 3
common/pkgs/mq/agent/storage.go View File

@@ -2,7 +2,7 @@ package agent

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)
@@ -142,7 +142,7 @@ type StartStorageCreatePackage struct {
Name string `json:"name"`
StorageID int64 `json:"storageID"`
Path string `json:"path"`
Redundancy stgsdk.TypedRedundancyInfo `json:"redundancy"`
Redundancy cdssdk.TypedRedundancyInfo `json:"redundancy"`
NodeAffinity *int64 `json:"nodeAffinity"`
}
type StartStorageCreatePackageResp struct {
@@ -150,7 +150,7 @@ type StartStorageCreatePackageResp struct {
TaskID string `json:"taskID"`
}

func NewStartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy stgsdk.TypedRedundancyInfo, nodeAffinity *int64) *StartStorageCreatePackage {
func NewStartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy cdssdk.TypedRedundancyInfo, nodeAffinity *int64) *StartStorageCreatePackage {
return &StartStorageCreatePackage{
UserID: userID,
BucketID: bucketID,


+ 3
- 3
common/pkgs/mq/coordinator/cache.go View File

@@ -2,7 +2,7 @@ package coordinator

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

type CacheService interface {
@@ -48,7 +48,7 @@ type GetPackageObjectCacheInfos struct {
}
type GetPackageObjectCacheInfosResp struct {
mq.MessageBodyBase
Infos []stgsdk.ObjectCacheInfo
Infos []cdssdk.ObjectCacheInfo
}

func NewGetPackageObjectCacheInfos(userID int64, packageID int64) *GetPackageObjectCacheInfos {
@@ -57,7 +57,7 @@ func NewGetPackageObjectCacheInfos(userID int64, packageID int64) *GetPackageObj
PackageID: packageID,
}
}
func NewGetPackageObjectCacheInfosResp(infos []stgsdk.ObjectCacheInfo) *GetPackageObjectCacheInfosResp {
func NewGetPackageObjectCacheInfosResp(infos []cdssdk.ObjectCacheInfo) *GetPackageObjectCacheInfosResp {
return &GetPackageObjectCacheInfosResp{
Infos: infos,
}


+ 6
- 6
common/pkgs/mq/coordinator/package.go View File

@@ -2,7 +2,7 @@ package coordinator

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)
@@ -59,14 +59,14 @@ type CreatePackage struct {
UserID int64 `json:"userID"`
BucketID int64 `json:"bucketID"`
Name string `json:"name"`
Redundancy stgsdk.TypedRedundancyInfo `json:"redundancy"`
Redundancy cdssdk.TypedRedundancyInfo `json:"redundancy"`
}
type CreatePackageResp struct {
mq.MessageBodyBase
PackageID int64 `json:"packageID"`
}

func NewCreatePackage(userID int64, bucketID int64, name string, redundancy stgsdk.TypedRedundancyInfo) *CreatePackage {
func NewCreatePackage(userID int64, bucketID int64, name string, redundancy cdssdk.TypedRedundancyInfo) *CreatePackage {
return &CreatePackage{
UserID: userID,
BucketID: bucketID,
@@ -207,7 +207,7 @@ type PackageCachedNodeInfo struct {

type GetPackageCachedNodesResp struct {
mq.MessageBodyBase
stgsdk.PackageCachingInfo
cdssdk.PackageCachingInfo
}

func NewGetPackageCachedNodes(userID int64, packageID int64) *GetPackageCachedNodes {
@@ -217,9 +217,9 @@ func NewGetPackageCachedNodes(userID int64, packageID int64) *GetPackageCachedNo
}
}

func NewGetPackageCachedNodesResp(nodeInfos []stgsdk.NodePackageCachingInfo, packageSize int64, redunancyType string) *GetPackageCachedNodesResp {
func NewGetPackageCachedNodesResp(nodeInfos []cdssdk.NodePackageCachingInfo, packageSize int64, redunancyType string) *GetPackageCachedNodesResp {
return &GetPackageCachedNodesResp{
PackageCachingInfo: stgsdk.PackageCachingInfo{
PackageCachingInfo: cdssdk.PackageCachingInfo{
NodeInfos: nodeInfos,
PackageSize: packageSize,
RedunancyType: redunancyType,


+ 5
- 5
coordinator/internal/services/package.go View File

@@ -9,7 +9,7 @@ import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner"
scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
@@ -215,7 +215,7 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c
}

var packageSize int64
nodeInfoMap := make(map[int64]*stgsdk.NodePackageCachingInfo)
nodeInfoMap := make(map[int64]*cdssdk.NodePackageCachingInfo)
if pkg.Redundancy.IsRepInfo() {
// 备份方式为rep
objectRepDatas, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID)
@@ -231,7 +231,7 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c

nodeInfo, exists := nodeInfoMap[nodeID]
if !exists {
nodeInfo = &stgsdk.NodePackageCachingInfo{
nodeInfo = &cdssdk.NodePackageCachingInfo{
NodeID: nodeID,
FileSize: data.Object.Size,
ObjectCount: 1,
@@ -259,7 +259,7 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c

nodeInfo, exists := nodeInfoMap[nodeID]
if !exists {
nodeInfo = &stgsdk.NodePackageCachingInfo{
nodeInfo = &cdssdk.NodePackageCachingInfo{
NodeID: nodeID,
FileSize: ecData.Object.Size,
ObjectCount: 1,
@@ -278,7 +278,7 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c
return nil, mq.Failed(errorcode.OperationFailed, "redundancy type is wrong")
}

var nodeInfos []stgsdk.NodePackageCachingInfo
var nodeInfos []cdssdk.NodePackageCachingInfo
for _, nodeInfo := range nodeInfoMap {
nodeInfos = append(nodeInfos, *nodeInfo)
}


Loading…
Cancel
Save