Browse Source

增加复制Package接口

gitlink
Sydonian 11 months ago
parent
commit
4b88628c10
7 changed files with 156 additions and 11 deletions
  1. +22
    -0
      client/internal/http/package.go
  2. +1
    -0
      client/internal/http/server.go
  3. +15
    -0
      client/internal/services/package.go
  4. +10
    -0
      common/pkgs/db2/object_block.go
  5. +5
    -5
      common/pkgs/db2/package.go
  6. +35
    -0
      common/pkgs/mq/coordinator/package.go
  7. +68
    -6
      coordinator/internal/mq/package.go

+ 22
- 0
client/internal/http/package.go View File

@@ -172,6 +172,28 @@ func (s *PackageService) Delete(ctx *gin.Context) {
ctx.JSON(http.StatusOK, OK(nil)) ctx.JSON(http.StatusOK, OK(nil))
} }


func (s *PackageService) Clone(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.Clone")

var req cdsapi.PackageClone
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

pkg, err := s.svc.PackageSvc().Clone(req.UserID, req.PackageID, req.BucketID, req.Name)
if err != nil {
log.Warnf("cloning package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "clone package failed"))
return
}

ctx.JSON(http.StatusOK, OK(cdsapi.PackageCloneResp{
Package: pkg,
}))
}

func (s *PackageService) ListBucketPackages(ctx *gin.Context) { func (s *PackageService) ListBucketPackages(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.ListBucketPackages") log := logger.WithField("HTTP", "Package.ListBucketPackages")




+ 1
- 0
client/internal/http/server.go View File

@@ -59,6 +59,7 @@ func (s *Server) initRouters() {
rt.POST(cdsapi.PackageCreatePath, s.Package().Create) rt.POST(cdsapi.PackageCreatePath, s.Package().Create)
rt.POST(cdsapi.PackageCreateLoadPath, s.Package().CreateLoad) rt.POST(cdsapi.PackageCreateLoadPath, s.Package().CreateLoad)
rt.POST(cdsapi.PackageDeletePath, s.Package().Delete) rt.POST(cdsapi.PackageDeletePath, s.Package().Delete)
rt.POST(cdsapi.PackageClonePath, s.Package().Clone)
rt.GET(cdsapi.PackageListBucketPackagesPath, s.Package().ListBucketPackages) rt.GET(cdsapi.PackageListBucketPackagesPath, s.Package().ListBucketPackages)
rt.GET(cdsapi.PackageGetCachedStoragesPath, s.Package().GetCachedStorages) rt.GET(cdsapi.PackageGetCachedStoragesPath, s.Package().GetCachedStorages)
rt.GET(cdsapi.PackageGetLoadedStoragesPath, s.Package().GetLoadedStorages) rt.GET(cdsapi.PackageGetLoadedStoragesPath, s.Package().GetLoadedStorages)


+ 15
- 0
client/internal/services/package.go View File

@@ -106,6 +106,21 @@ func (svc *PackageService) DeletePackage(userID cdssdk.UserID, packageID cdssdk.
return nil return nil
} }


func (svc *PackageService) Clone(userID cdssdk.UserID, packageID cdssdk.PackageID, bucketID cdssdk.BucketID, name string) (cdssdk.Package, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return cdssdk.Package{}, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

resp, err := coorCli.ClonePackage(coormq.ReqClonePackage(userID, packageID, bucketID, name))
if err != nil {
return cdssdk.Package{}, fmt.Errorf("cloning package: %w", err)
}

return resp.Package, nil
}

// GetCachedStorages 获取指定包的缓存节点信息 // GetCachedStorages 获取指定包的缓存节点信息
func (svc *PackageService) GetCachedStorages(userID cdssdk.UserID, packageID cdssdk.PackageID) (cdssdk.PackageCachingInfo, error) { func (svc *PackageService) GetCachedStorages(userID cdssdk.UserID, packageID cdssdk.PackageID) (cdssdk.PackageCachingInfo, error) {
// 从协调器MQ池中获取客户端 // 从协调器MQ池中获取客户端


+ 10
- 0
common/pkgs/db2/object_block.go View File

@@ -33,6 +33,16 @@ func (db *ObjectBlockDB) BatchGetByObjectID(ctx SQLContext, objectIDs []cdssdk.O
return blocks, err return blocks, err
} }


func (*ObjectBlockDB) GetInPackageID(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectBlock, error) {
var rets []stgmod.ObjectBlock
err := ctx.Table("ObjectBlock").
Joins("INNER JOIN Object ON ObjectBlock.ObjectID = Object.ObjectID").
Where("Object.PackageID = ?", packageID).
Order("ObjectBlock.ObjectID, ObjectBlock.`Index` ASC").
Find(&rets).Error
return rets, err
}

func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index int, stgID cdssdk.StorageID, fileHash cdssdk.FileHash) error { func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index int, stgID cdssdk.StorageID, fileHash cdssdk.FileHash) error {
block := stgmod.ObjectBlock{ObjectID: objectID, Index: index, StorageID: stgID, FileHash: fileHash} block := stgmod.ObjectBlock{ObjectID: objectID, Index: index, StorageID: stgID, FileHash: fileHash}
return ctx.Table("ObjectBlock").Create(&block).Error return ctx.Table("ObjectBlock").Create(&block).Error


+ 5
- 5
common/pkgs/db2/package.go View File

@@ -110,7 +110,7 @@ func (*PackageDB) GetUserPackageByName(ctx SQLContext, userID cdssdk.UserID, buc
return ret, err return ret, err
} }


func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) {
func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name string) (cdssdk.Package, error) {
var packageID int64 var packageID int64
err := ctx.Table("Package"). err := ctx.Table("Package").
Select("PackageID"). Select("PackageID").
@@ -118,18 +118,18 @@ func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name strin
Scan(&packageID).Error Scan(&packageID).Error


if err != nil { if err != nil {
return 0, err
return cdssdk.Package{}, err
} }
if packageID != 0 { if packageID != 0 {
return 0, gorm.ErrDuplicatedKey
return cdssdk.Package{}, gorm.ErrDuplicatedKey
} }


newPackage := cdssdk.Package{Name: name, BucketID: bucketID, State: cdssdk.PackageStateNormal} newPackage := cdssdk.Package{Name: name, BucketID: bucketID, State: cdssdk.PackageStateNormal}
if err := ctx.Create(&newPackage).Error; err != nil { if err := ctx.Create(&newPackage).Error; err != nil {
return 0, fmt.Errorf("insert package failed, err: %w", err)
return cdssdk.Package{}, fmt.Errorf("insert package failed, err: %w", err)
} }


return newPackage.PackageID, nil
return newPackage, nil
} }


// SoftDelete 设置一个对象被删除,并将相关数据删除 // SoftDelete 设置一个对象被删除,并将相关数据删除


+ 35
- 0
common/pkgs/mq/coordinator/package.go View File

@@ -20,6 +20,8 @@ type PackageService interface {


DeletePackage(msg *DeletePackage) (*DeletePackageResp, *mq.CodeMessage) DeletePackage(msg *DeletePackage) (*DeletePackageResp, *mq.CodeMessage)


ClonePackage(msg *ClonePackage) (*ClonePackageResp, *mq.CodeMessage)

GetPackageCachedStorages(msg *GetPackageCachedStorages) (*GetPackageCachedStoragesResp, *mq.CodeMessage) GetPackageCachedStorages(msg *GetPackageCachedStorages) (*GetPackageCachedStoragesResp, *mq.CodeMessage)


GetPackageLoadedStorages(msg *GetPackageLoadedStorages) (*GetPackageLoadedStoragesResp, *mq.CodeMessage) GetPackageLoadedStorages(msg *GetPackageLoadedStorages) (*GetPackageLoadedStoragesResp, *mq.CodeMessage)
@@ -186,6 +188,39 @@ func (client *Client) DeletePackage(msg *DeletePackage) (*DeletePackageResp, err
return mq.Request(Service.DeletePackage, client.rabbitCli, msg) return mq.Request(Service.DeletePackage, client.rabbitCli, msg)
} }


// 克隆Package
var _ = Register(Service.ClonePackage)

type ClonePackage struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
BucketID cdssdk.BucketID `json:"bucketID"`
Name string `json:"name"`
}
type ClonePackageResp struct {
mq.MessageBodyBase
Package cdssdk.Package `json:"package"`
}

func ReqClonePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, bucketID cdssdk.BucketID, name string) *ClonePackage {
return &ClonePackage{
UserID: userID,
PackageID: packageID,
BucketID: bucketID,
Name: name,
}
}
func RespClonePackage(pkg cdssdk.Package) *ClonePackageResp {
return &ClonePackageResp{
Package: pkg,
}
}

func (client *Client) ClonePackage(msg *ClonePackage) (*ClonePackageResp, error) {
return mq.Request(Service.ClonePackage, client.rabbitCli, msg)
}

// 根据PackageID获取object分布情况 // 根据PackageID获取object分布情况
var _ = Register(Service.GetPackageCachedStorages) var _ = Register(Service.GetPackageCachedStorages)




+ 68
- 6
coordinator/internal/mq/package.go View File

@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"sort" "sort"


stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2" "gitlink.org.cn/cloudream/storage/common/pkgs/db2"
"gorm.io/gorm" "gorm.io/gorm"


@@ -59,16 +60,11 @@ func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePack
return fmt.Errorf("bucket is not avaiable to the user") return fmt.Errorf("bucket is not avaiable to the user")
} }


pkgID, err := svc.db2.Package().Create(tx, msg.BucketID, msg.Name)
pkg, err = svc.db2.Package().Create(tx, msg.BucketID, msg.Name)
if err != nil { if err != nil {
return fmt.Errorf("creating package: %w", err) return fmt.Errorf("creating package: %w", err)
} }


pkg, err = svc.db2.Package().GetByID(tx, pkgID)
if err != nil {
return fmt.Errorf("getting package by id: %w", err)
}

return nil return nil
}) })
if err != nil { if err != nil {
@@ -158,6 +154,72 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack
return mq.ReplyOK(coormq.NewDeletePackageResp()) return mq.ReplyOK(coormq.NewDeletePackageResp())
} }


func (svc *Service) ClonePackage(msg *coormq.ClonePackage) (*coormq.ClonePackageResp, *mq.CodeMessage) {
var pkg cdssdk.Package
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
var err error

isAvai, _ := svc.db2.Bucket().IsAvailable(tx, msg.BucketID, msg.UserID)
if !isAvai {
return fmt.Errorf("bucket is not avaiable to the user")
}

pkg, err = svc.db2.Package().Create(tx, msg.BucketID, msg.Name)
if err != nil {
return fmt.Errorf("creating package: %w", err)
}

objs, err := svc.db2.Object().GetPackageObjects(tx, msg.PackageID)
if err != nil {
return fmt.Errorf("getting package objects: %w", err)
}

objBlks, err := svc.db2.ObjectBlock().GetInPackageID(tx, msg.PackageID)
if err != nil {
return fmt.Errorf("getting object blocks: %w", err)
}

clonedObjs := make([]cdssdk.Object, len(objs))
for i, obj := range objs {
clonedObjs[i] = obj
clonedObjs[i].ObjectID = 0
clonedObjs[i].PackageID = pkg.PackageID
}

err = svc.db2.Object().BatchCreate(tx, &clonedObjs)
if err != nil {
return fmt.Errorf("batch creating objects: %w", err)
}

oldToNew := make(map[cdssdk.ObjectID]cdssdk.ObjectID)
for i, obj := range clonedObjs {
oldToNew[objs[i].ObjectID] = obj.ObjectID
}

clonedBlks := make([]stgmod.ObjectBlock, len(objBlks))
for i, blk := range objBlks {
clonedBlks[i] = blk
clonedBlks[i].ObjectID = oldToNew[blk.ObjectID]
}

err = svc.db2.ObjectBlock().BatchCreate(tx, clonedBlks)
if err != nil {
return fmt.Errorf("batch creating object blocks: %w", err)
}

return nil
})
if err != nil {
if errors.Is(err, gorm.ErrDuplicatedKey) {
return nil, mq.Failed(errorcode.DataExists, "package already exists")
}

return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

return mq.ReplyOK(coormq.RespClonePackage(pkg))
}

func (svc *Service) GetPackageCachedStorages(msg *coormq.GetPackageCachedStorages) (*coormq.GetPackageCachedStoragesResp, *mq.CodeMessage) { func (svc *Service) GetPackageCachedStorages(msg *coormq.GetPackageCachedStorages) (*coormq.GetPackageCachedStoragesResp, *mq.CodeMessage) {
isAva, err := svc.db2.Package().IsAvailable(svc.db2.DefCtx(), msg.UserID, msg.PackageID) isAva, err := svc.db2.Package().IsAvailable(svc.db2.DefCtx(), msg.UserID, msg.PackageID)
if err != nil { if err != nil {


Loading…
Cancel
Save