Browse Source

调度功能改名为一般化的存储系统写入功能

feature_gxh
Sydonian 6 months ago
parent
commit
cffbd89127
22 changed files with 131 additions and 420 deletions
  1. +4
    -2
      client/internal/http/http.go
  2. +1
    -1
      client/internal/http/object.go
  3. +14
    -14
      client/internal/http/package.go
  4. +1
    -1
      client/internal/http/presigned.go
  5. +4
    -4
      client/internal/http/server.go
  6. +7
    -7
      client/internal/http/user_space.go
  7. +1
    -1
      client/internal/services/user_space.go
  8. +4
    -4
      client/internal/ticktock/change_redundancy.go
  9. +42
    -42
      client/internal/ticktock/redundancy_recover.go
  10. +10
    -10
      client/internal/uploader/create_load.go
  11. +4
    -4
      client/internal/uploader/update.go
  12. +14
    -14
      client/internal/uploader/uploader.go
  13. +2
    -2
      client/sdk/api/object.go
  14. +10
    -10
      client/sdk/api/package.go
  15. +2
    -2
      client/sdk/api/presigned.go
  16. +1
    -1
      client/sdk/api/storage_test.go
  17. +8
    -8
      client/sdk/api/userspace.go
  18. +1
    -1
      common/pkgs/ioswitch2/parser/opt/multipart.go
  19. +1
    -1
      common/pkgs/ioswitch2/parser/parser.go
  20. +0
    -92
      hub/internal/task/cache_move_package.go
  21. +0
    -150
      hub/internal/task/create_package.go
  22. +0
    -49
      hub/internal/task/task.go

+ 4
- 2
client/internal/http/http.go View File

@@ -1,6 +1,8 @@
package http

import (
"fmt"

"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/mq"
)
@@ -19,10 +21,10 @@ func OK(data any) Response {
}
}

func Failed(code string, msg string) Response {
func Failed(code string, format string, args ...any) Response {
return Response{
Code: code,
Message: msg,
Message: fmt.Sprintf(format, args...),
}
}



+ 1
- 1
client/internal/http/object.go View File

@@ -83,7 +83,7 @@ func (s *ObjectService) Upload(ctx *gin.Context) {
return
}

up, err := s.svc.Uploader.BeginUpdate(req.Info.PackageID, req.Info.Affinity, req.Info.LoadTo, req.Info.LoadToPath)
up, err := s.svc.Uploader.BeginUpdate(req.Info.PackageID, req.Info.Affinity, req.Info.CopyTo, req.Info.CopyToPath)
if err != nil {
log.Warnf("begin update: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("begin update: %v", err)))


+ 14
- 14
client/internal/http/package.go View File

@@ -88,31 +88,31 @@ func (s *PackageService) Create(ctx *gin.Context) {
}))
}

type PackageCreateLoad struct {
Info cliapi.PackageCreateLoadInfo `form:"info" binding:"required"`
Files []*multipart.FileHeader `form:"files"`
type PackageCreateUpload struct {
Info cliapi.PackageCreateUploadInfo `form:"info" binding:"required"`
Files []*multipart.FileHeader `form:"files"`
}

func (s *PackageService) CreateLoad(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.CreateLoad")
log := logger.WithField("HTTP", "Package.CreateUpload")

var req PackageCreateLoad
var req PackageCreateUpload
if err := ctx.ShouldBind(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

if len(req.Info.LoadTo) != len(req.Info.LoadToPath) {
log.Warnf("load to and load to path count not match")
ctx.JSON(http.StatusOK, Failed(errorcode.BadArgument, "load to and load to path count not match"))
if len(req.Info.CopyTo) != len(req.Info.CopyToPath) {
log.Warnf("CopyTo and CopyToPath count not match")
ctx.JSON(http.StatusOK, Failed(errorcode.BadArgument, "CopyTo and CopyToPath count not match"))
return
}

up, err := s.svc.Uploader.BeginCreateLoad(req.Info.BucketID, req.Info.Name, req.Info.LoadTo, req.Info.LoadToPath)
up, err := s.svc.Uploader.BeginCreateUpload(req.Info.BucketID, req.Info.Name, req.Info.CopyTo, req.Info.CopyToPath)
if err != nil {
log.Warnf("begin package create load: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("begin package create load: %v", err)))
log.Warnf("begin package create upload: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "%v", err))
return
}
defer up.Abort()
@@ -145,8 +145,8 @@ func (s *PackageService) CreateLoad(ctx *gin.Context) {

ret, err := up.Commit()
if err != nil {
log.Warnf("commit create load: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("commit create load: %v", err)))
log.Warnf("commit create upload: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("commit create upload: %v", err)))
return
}

@@ -155,7 +155,7 @@ func (s *PackageService) CreateLoad(ctx *gin.Context) {
objs[i] = ret.Objects[pathes[i]]
}

ctx.JSON(http.StatusOK, OK(cliapi.PackageCreateLoadResp{Package: ret.Package, Objects: objs}))
ctx.JSON(http.StatusOK, OK(cliapi.PackageCreateUploadResp{Package: ret.Package, Objects: objs}))

}
func (s *PackageService) Delete(ctx *gin.Context) {


+ 1
- 1
client/internal/http/presigned.go View File

@@ -154,7 +154,7 @@ func (s *PresignedService) ObjectUpload(ctx *gin.Context) {
return
}

up, err := s.svc.Uploader.BeginUpdate(req.PackageID, req.Affinity, req.LoadTo, req.LoadToPath)
up, err := s.svc.Uploader.BeginUpdate(req.PackageID, req.Affinity, req.CopyTo, req.CopyToPath)
if err != nil {
log.Warnf("begin update: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("begin update: %v", err)))


+ 4
- 4
client/internal/http/server.go View File

@@ -91,13 +91,13 @@ func (s *Server) initRouters(engine *gin.Engine) {
rt.GET(cliapi.PackageGetPath, s.Package().Get)
rt.GET(cliapi.PackageGetByFullNamePath, s.Package().GetByFullName)
rt.POST(cliapi.PackageCreatePath, s.Package().Create)
rt.POST(cliapi.PackageCreateLoadPath, s.Package().CreateLoad)
rt.POST(cliapi.PackageCreateUploadPath, s.Package().CreateLoad)
rt.POST(cliapi.PackageDeletePath, s.Package().Delete)
rt.POST(cliapi.PackageClonePath, s.Package().Clone)
rt.GET(cliapi.PackageListBucketPackagesPath, s.Package().ListBucketPackages)
// rt.GET(cdsapi.PackageGetCachedStoragesPath, s.Package().GetCachedStorages)

rt.POST(cliapi.UserSpaceLoadPackagePath, s.UserSpace().LoadPackage)
rt.POST(cliapi.UserSpaceDownloadPackagePath, s.UserSpace().DownloadPackage)
rt.POST(cliapi.UserSpaceCreatePackagePath, s.UserSpace().CreatePackage)
rt.GET(cliapi.UserSpaceGetPath, s.UserSpace().Get)

@@ -130,13 +130,13 @@ func (s *Server) routeV1(eg *gin.Engine, rt gin.IRoutes) {
v1.GET(cliapi.PackageGetPath, awsAuth.Auth, s.Package().Get)
v1.GET(cliapi.PackageGetByFullNamePath, awsAuth.Auth, s.Package().GetByFullName)
v1.POST(cliapi.PackageCreatePath, awsAuth.Auth, s.Package().Create)
v1.POST(cliapi.PackageCreateLoadPath, awsAuth.Auth, s.Package().CreateLoad)
v1.POST(cliapi.PackageCreateUploadPath, awsAuth.Auth, s.Package().CreateLoad)
v1.POST(cliapi.PackageDeletePath, awsAuth.Auth, s.Package().Delete)
v1.POST(cliapi.PackageClonePath, awsAuth.Auth, s.Package().Clone)
v1.GET(cliapi.PackageListBucketPackagesPath, awsAuth.Auth, s.Package().ListBucketPackages)
// v1.GET(cdsapi.PackageGetCachedStoragesPath, awsAuth.Auth, s.Package().GetCachedStorages)

v1.POST(cliapi.UserSpaceLoadPackagePath, awsAuth.Auth, s.UserSpace().LoadPackage)
v1.POST(cliapi.UserSpaceDownloadPackagePath, awsAuth.Auth, s.UserSpace().DownloadPackage)
v1.POST(cliapi.UserSpaceCreatePackagePath, awsAuth.Auth, s.UserSpace().CreatePackage)
v1.GET(cliapi.UserSpaceGetPath, awsAuth.Auth, s.UserSpace().Get)
rt.POST(cliapi.UserSpaceSpaceToSpacePath, s.UserSpace().SpaceToSpace)


+ 7
- 7
client/internal/http/user_space.go View File

@@ -20,24 +20,24 @@ func (s *Server) UserSpace() *UserSpaceService {
}
}

func (s *UserSpaceService) LoadPackage(ctx *gin.Context) {
log := logger.WithField("HTTP", "UserSpace.LoadPackage")
func (s *UserSpaceService) DownloadPackage(ctx *gin.Context) {
log := logger.WithField("HTTP", "UserSpace.DownloadPackage")

var req cliapi.UserSpaceLoadPackageReq
var req cliapi.UserSpaceDownloadPackageReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

err := s.svc.UserSpaceSvc().LoadPackage(req.PackageID, req.UserSpaceID, req.RootPath)
err := s.svc.UserSpaceSvc().DownloadPackage(req.PackageID, req.UserSpaceID, req.RootPath)
if err != nil {
log.Warnf("loading package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "loading package failed"))
log.Warnf("downloading package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "%v", err))
return
}

ctx.JSON(http.StatusOK, OK(cliapi.UserSpaceLoadPackageResp{}))
ctx.JSON(http.StatusOK, OK(cliapi.UserSpaceDownloadPackageResp{}))
}

func (s *UserSpaceService) CreatePackage(ctx *gin.Context) {


+ 1
- 1
client/internal/services/user_space.go View File

@@ -39,7 +39,7 @@ func (svc *UserSpaceService) GetByName(name string) (clitypes.UserSpace, error)
return svc.DB.UserSpace().GetByName(svc.DB.DefCtx(), name)
}

func (svc *UserSpaceService) LoadPackage(packageID clitypes.PackageID, userspaceID clitypes.UserSpaceID, rootPath string) error {
func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, userspaceID clitypes.UserSpaceID, rootPath string) error {
coorCli := stgglb.CoordinatorRPCPool.Get()
defer coorCli.Release()



+ 4
- 4
client/internal/ticktock/change_redundancy.go View File

@@ -34,7 +34,7 @@ func (j *ChangeRedundancy) Execute(t *TickTock) {

ctx := &changeRedundancyContext{
ticktock: t,
allUserSpaces: make(map[clitypes.UserSpaceID]*userSpaceLoadInfo),
allUserSpaces: make(map[clitypes.UserSpaceID]*userSpaceUsageInfo),
}

spaceIDs, err := t.db.UserSpace().GetAllIDs(t.db.DefCtx())
@@ -52,7 +52,7 @@ func (j *ChangeRedundancy) Execute(t *TickTock) {
continue
}

ctx.allUserSpaces[space.UserSpace.UserSpaceID] = &userSpaceLoadInfo{
ctx.allUserSpaces[space.UserSpace.UserSpaceID] = &userSpaceUsageInfo{
UserSpace: space,
}
}
@@ -85,11 +85,11 @@ func (j *ChangeRedundancy) Execute(t *TickTock) {

type changeRedundancyContext struct {
ticktock *TickTock
allUserSpaces map[clitypes.UserSpaceID]*userSpaceLoadInfo
allUserSpaces map[clitypes.UserSpaceID]*userSpaceUsageInfo
mostBlockStgIDs []clitypes.UserSpaceID
}

type userSpaceLoadInfo struct {
type userSpaceUsageInfo struct {
UserSpace *clitypes.UserSpaceDetail
AccessAmount float64
}


+ 42
- 42
client/internal/ticktock/redundancy_recover.go View File

@@ -21,7 +21,7 @@ import (
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
)

func (t *ChangeRedundancy) chooseRedundancy(ctx *changeRedundancyContext, obj clitypes.ObjectDetail) (clitypes.Redundancy, []*userSpaceLoadInfo) {
func (t *ChangeRedundancy) chooseRedundancy(ctx *changeRedundancyContext, obj clitypes.ObjectDetail) (clitypes.Redundancy, []*userSpaceUsageInfo) {
switch obj.Object.Redundancy.(type) {
case *clitypes.NoneRedundancy:
if obj.Object.Size > ctx.ticktock.cfg.ECFileSizeThreshold {
@@ -53,7 +53,7 @@ func (t *ChangeRedundancy) chooseRedundancy(ctx *changeRedundancyContext, obj cl
return nil, nil
}

func (t *ChangeRedundancy) doChangeRedundancy(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, newRed clitypes.Redundancy, selectedUserSpaces []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
func (t *ChangeRedundancy) doChangeRedundancy(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, newRed clitypes.Redundancy, selectedUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
log := logger.WithType[ChangeRedundancy]("TickTock")

var updating *db.UpdatingObjectRedundancy
@@ -145,41 +145,41 @@ func (t *ChangeRedundancy) summaryRepObjectBlockUserSpaces(ctx *changeRedundancy
return ids
}

func (t *ChangeRedundancy) chooseNewUserSpacesForRep(ctx *changeRedundancyContext, red *clitypes.RepRedundancy) []*userSpaceLoadInfo {
sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceLoadInfo, right *userSpaceLoadInfo) int {
func (t *ChangeRedundancy) chooseNewUserSpacesForRep(ctx *changeRedundancyContext, red *clitypes.RepRedundancy) []*userSpaceUsageInfo {
sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int {
return sort2.Cmp(right.AccessAmount, left.AccessAmount)
})

return t.chooseSoManyUserSpaces(red.RepCount, sortedUserSpaces)
}

func (t *ChangeRedundancy) chooseNewUserSpacesForEC(ctx *changeRedundancyContext, red *clitypes.ECRedundancy) []*userSpaceLoadInfo {
sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceLoadInfo, right *userSpaceLoadInfo) int {
func (t *ChangeRedundancy) chooseNewUserSpacesForEC(ctx *changeRedundancyContext, red *clitypes.ECRedundancy) []*userSpaceUsageInfo {
sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int {
return sort2.Cmp(right.AccessAmount, left.AccessAmount)
})

return t.chooseSoManyUserSpaces(red.N, sortedUserSpaces)
}

func (t *ChangeRedundancy) chooseNewUserSpacesForLRC(ctx *changeRedundancyContext, red *clitypes.LRCRedundancy) []*userSpaceLoadInfo {
sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceLoadInfo, right *userSpaceLoadInfo) int {
func (t *ChangeRedundancy) chooseNewUserSpacesForLRC(ctx *changeRedundancyContext, red *clitypes.LRCRedundancy) []*userSpaceUsageInfo {
sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int {
return sort2.Cmp(right.AccessAmount, left.AccessAmount)
})

return t.chooseSoManyUserSpaces(red.N, sortedUserSpaces)
}

func (t *ChangeRedundancy) chooseNewUserSpacesForSeg(ctx *changeRedundancyContext, segCount int) []*userSpaceLoadInfo {
sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceLoadInfo, right *userSpaceLoadInfo) int {
func (t *ChangeRedundancy) chooseNewUserSpacesForSeg(ctx *changeRedundancyContext, segCount int) []*userSpaceUsageInfo {
sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int {
return sort2.Cmp(right.AccessAmount, left.AccessAmount)
})

return t.chooseSoManyUserSpaces(segCount, sortedUserSpaces)
}

func (t *ChangeRedundancy) rechooseUserSpacesForRep(ctx *changeRedundancyContext, red *clitypes.RepRedundancy) []*userSpaceLoadInfo {
func (t *ChangeRedundancy) rechooseUserSpacesForRep(ctx *changeRedundancyContext, red *clitypes.RepRedundancy) []*userSpaceUsageInfo {
type rechooseUserSpace struct {
*userSpaceLoadInfo
*userSpaceUsageInfo
HasBlock bool
}

@@ -194,8 +194,8 @@ func (t *ChangeRedundancy) rechooseUserSpacesForRep(ctx *changeRedundancyContext
}

rechooseStgs = append(rechooseStgs, &rechooseUserSpace{
userSpaceLoadInfo: stg,
HasBlock: hasBlock,
userSpaceUsageInfo: stg,
HasBlock: hasBlock,
})
}

@@ -209,12 +209,12 @@ func (t *ChangeRedundancy) rechooseUserSpacesForRep(ctx *changeRedundancyContext
return sort2.Cmp(right.AccessAmount, left.AccessAmount)
})

return t.chooseSoManyUserSpaces(red.RepCount, lo.Map(sortedStgs, func(userspace *rechooseUserSpace, idx int) *userSpaceLoadInfo { return userspace.userSpaceLoadInfo }))
return t.chooseSoManyUserSpaces(red.RepCount, lo.Map(sortedStgs, func(userspace *rechooseUserSpace, idx int) *userSpaceUsageInfo { return userspace.userSpaceUsageInfo }))
}

func (t *ChangeRedundancy) rechooseUserSpacesForEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy) []*userSpaceLoadInfo {
func (t *ChangeRedundancy) rechooseUserSpacesForEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy) []*userSpaceUsageInfo {
type rechooseStg struct {
*userSpaceLoadInfo
*userSpaceUsageInfo
CachedBlockIndex int
}

@@ -229,8 +229,8 @@ func (t *ChangeRedundancy) rechooseUserSpacesForEC(ctx *changeRedundancyContext,
}

rechooseStgs = append(rechooseStgs, &rechooseStg{
userSpaceLoadInfo: stg,
CachedBlockIndex: cachedBlockIndex,
userSpaceUsageInfo: stg,
CachedBlockIndex: cachedBlockIndex,
})
}

@@ -245,12 +245,12 @@ func (t *ChangeRedundancy) rechooseUserSpacesForEC(ctx *changeRedundancyContext,
})

// TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择
return t.chooseSoManyUserSpaces(red.N, lo.Map(sortedStgs, func(userspace *rechooseStg, idx int) *userSpaceLoadInfo { return userspace.userSpaceLoadInfo }))
return t.chooseSoManyUserSpaces(red.N, lo.Map(sortedStgs, func(userspace *rechooseStg, idx int) *userSpaceUsageInfo { return userspace.userSpaceUsageInfo }))
}

func (t *ChangeRedundancy) rechooseUserSpacesForLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.LRCRedundancy) []*userSpaceLoadInfo {
func (t *ChangeRedundancy) rechooseUserSpacesForLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.LRCRedundancy) []*userSpaceUsageInfo {
type rechooseStg struct {
*userSpaceLoadInfo
*userSpaceUsageInfo
CachedBlockIndex int
}

@@ -265,8 +265,8 @@ func (t *ChangeRedundancy) rechooseUserSpacesForLRC(ctx *changeRedundancyContext
}

rechooseStgs = append(rechooseStgs, &rechooseStg{
userSpaceLoadInfo: stg,
CachedBlockIndex: cachedBlockIndex,
userSpaceUsageInfo: stg,
CachedBlockIndex: cachedBlockIndex,
})
}

@@ -281,12 +281,12 @@ func (t *ChangeRedundancy) rechooseUserSpacesForLRC(ctx *changeRedundancyContext
})

// TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择
return t.chooseSoManyUserSpaces(red.N, lo.Map(sortedStgs, func(userspace *rechooseStg, idx int) *userSpaceLoadInfo { return userspace.userSpaceLoadInfo }))
return t.chooseSoManyUserSpaces(red.N, lo.Map(sortedStgs, func(userspace *rechooseStg, idx int) *userSpaceUsageInfo { return userspace.userSpaceUsageInfo }))
}

func (t *ChangeRedundancy) chooseSoManyUserSpaces(count int, stgs []*userSpaceLoadInfo) []*userSpaceLoadInfo {
func (t *ChangeRedundancy) chooseSoManyUserSpaces(count int, stgs []*userSpaceUsageInfo) []*userSpaceUsageInfo {
repeateCount := (count + len(stgs) - 1) / len(stgs)
extendStgs := make([]*userSpaceLoadInfo, repeateCount*len(stgs))
extendStgs := make([]*userSpaceUsageInfo, repeateCount*len(stgs))

// 使用复制的方式将节点数扩充到要求的数量
// 复制之后的结构:ABCD -> AAABBBCCCDDD
@@ -298,7 +298,7 @@ func (t *ChangeRedundancy) chooseSoManyUserSpaces(count int, stgs []*userSpaceLo
}
extendStgs = extendStgs[:count]

var chosen []*userSpaceLoadInfo
var chosen []*userSpaceUsageInfo
for len(chosen) < count {
// 在每一轮内都选不同地区的节点,如果节点数不够,那么就再来一轮
chosenLocations := make(map[cortypes.LocationID]bool)
@@ -320,7 +320,7 @@ func (t *ChangeRedundancy) chooseSoManyUserSpaces(count int, stgs []*userSpaceLo
return chosen
}

func (t *ChangeRedundancy) noneToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.RepRedundancy, uploadStgs []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
func (t *ChangeRedundancy) noneToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.RepRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
if len(obj.Blocks) == 0 {
return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to rep")
}
@@ -334,7 +334,7 @@ func (t *ChangeRedundancy) noneToRep(ctx *changeRedundancyContext, obj clitypes.
}

// 如果选择的备份节点都是同一个,那么就只要上传一次
uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceLoadInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })
uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })

ft := ioswitch2.NewFromTo()
ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.UserSpace.MasterHub, *srcStg.UserSpace, ioswitch2.RawStream()))
@@ -394,7 +394,7 @@ func (t *ChangeRedundancy) noneToRep(ctx *changeRedundancyContext, obj clitypes.
}, nil
}

func (t *ChangeRedundancy) noneToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy, uploadStgs []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
func (t *ChangeRedundancy) noneToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
if len(obj.Blocks) == 0 {
return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to ec")
}
@@ -479,7 +479,7 @@ func (t *ChangeRedundancy) noneToEC(ctx *changeRedundancyContext, obj clitypes.O
}, nil
}

func (t *ChangeRedundancy) noneToLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.LRCRedundancy, uploadStgs []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
func (t *ChangeRedundancy) noneToLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.LRCRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
if len(obj.Blocks) == 0 {
return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to ec")
}
@@ -564,7 +564,7 @@ func (t *ChangeRedundancy) noneToLRC(ctx *changeRedundancyContext, obj clitypes.
nil
}

func (t *ChangeRedundancy) noneToSeg(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.SegmentRedundancy, uploadStgs []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
func (t *ChangeRedundancy) noneToSeg(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.SegmentRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
if len(obj.Blocks) == 0 {
return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to rep")
}
@@ -578,7 +578,7 @@ func (t *ChangeRedundancy) noneToSeg(ctx *changeRedundancyContext, obj clitypes.
}

// 如果选择的备份节点都是同一个,那么就只要上传一次
uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceLoadInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })
uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })

ft := ioswitch2.NewFromTo()
ft.SegmentParam = red
@@ -655,7 +655,7 @@ func (t *ChangeRedundancy) noneToSeg(ctx *changeRedundancyContext, obj clitypes.
nil
}

func (t *ChangeRedundancy) repToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.RepRedundancy, uploadStgs []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
func (t *ChangeRedundancy) repToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.RepRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
if len(obj.Blocks) == 0 {
return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to rep")
}
@@ -669,7 +669,7 @@ func (t *ChangeRedundancy) repToRep(ctx *changeRedundancyContext, obj clitypes.O
}

// 如果选择的备份节点都是同一个,那么就只要上传一次
uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceLoadInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })
uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })

ft := ioswitch2.NewFromTo()
ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.UserSpace.MasterHub, *srcStg.UserSpace, ioswitch2.RawStream()))
@@ -731,11 +731,11 @@ func (t *ChangeRedundancy) repToRep(ctx *changeRedundancyContext, obj clitypes.O
nil
}

func (t *ChangeRedundancy) repToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy, uploadUserSpaces []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
func (t *ChangeRedundancy) repToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
return t.noneToEC(ctx, obj, red, uploadUserSpaces)
}

func (t *ChangeRedundancy) ecToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.ECRedundancy, tarRed *clitypes.RepRedundancy, uploadStgs []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
func (t *ChangeRedundancy) ecToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.ECRedundancy, tarRed *clitypes.RepRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
var chosenBlocks []clitypes.GrouppedObjectBlock
var chosenBlockIndexes []int
var chosenBlockStg []clitypes.UserSpaceDetail
@@ -765,7 +765,7 @@ func (t *ChangeRedundancy) ecToRep(ctx *changeRedundancyContext, obj clitypes.Ob
}

// 如果选择的备份节点都是同一个,那么就只要上传一次
uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceLoadInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })
uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })

planBlder := exec.NewPlanBuilder()
ft := ioswitch2.NewFromTo()
@@ -863,7 +863,7 @@ func (t *ChangeRedundancy) ecToRep(ctx *changeRedundancyContext, obj clitypes.Ob
nil
}

func (t *ChangeRedundancy) ecToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.ECRedundancy, tarRed *clitypes.ECRedundancy, uploadUserSpaces []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
func (t *ChangeRedundancy) ecToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.ECRedundancy, tarRed *clitypes.ECRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
grpBlocks := obj.GroupBlocks()

var chosenBlocks []clitypes.GrouppedObjectBlock
@@ -1016,7 +1016,7 @@ func (t *ChangeRedundancy) ecToEC(ctx *changeRedundancyContext, obj clitypes.Obj
nil
}

func (t *ChangeRedundancy) lrcToLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.LRCRedundancy, tarRed *clitypes.LRCRedundancy, uploadUserSpaces []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
func (t *ChangeRedundancy) lrcToLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.LRCRedundancy, tarRed *clitypes.LRCRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {

blocksGrpByIndex := obj.GroupBlocks()

@@ -1122,7 +1122,7 @@ TODO2 修复这一块的代码
}, nil
}
*/
func (t *ChangeRedundancy) reconstructLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, grpBlocks []clitypes.GrouppedObjectBlock, red *clitypes.LRCRedundancy, uploadUserSpaces []*userSpaceLoadInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
func (t *ChangeRedundancy) reconstructLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, grpBlocks []clitypes.GrouppedObjectBlock, red *clitypes.LRCRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
var chosenBlocks []clitypes.GrouppedObjectBlock
var chosenBlockStg []clitypes.UserSpaceDetail



+ 10
- 10
client/internal/uploader/create_load.go View File

@@ -17,10 +17,10 @@ import (
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
)

type CreateLoadUploader struct {
type CreateUploader struct {
pkg types.Package
targetSpaces []types.UserSpaceDetail
loadRoots []string
copyRoots []string
uploader *Uploader
pubLock *distlock.Mutex
successes []db.AddObjectEntry
@@ -28,12 +28,12 @@ type CreateLoadUploader struct {
commited bool
}

type CreateLoadResult struct {
type CreateUploadResult struct {
Package types.Package
Objects map[string]types.Object
}

func (u *CreateLoadUploader) Upload(pa string, stream io.Reader, opts ...UploadOption) error {
func (u *CreateUploader) Upload(pa string, stream io.Reader, opts ...UploadOption) error {
opt := UploadOption{}
if len(opts) > 0 {
opt = opts[0]
@@ -50,7 +50,7 @@ func (u *CreateLoadUploader) Upload(pa string, stream io.Reader, opts ...UploadO
ft.AddFrom(fromExec)
for i, space := range u.targetSpaces {
ft.AddTo(ioswitch2.NewToShardStore(*space.MasterHub, space, ioswitch2.RawStream(), "shardInfo"))
ft.AddTo(ioswitch2.NewToPublicStore(*space.MasterHub, space, path.Join(u.loadRoots[i], pa)))
ft.AddTo(ioswitch2.NewToPublicStore(*space.MasterHub, space, path.Join(u.copyRoots[i], pa)))
spaceIDs = append(spaceIDs, space.UserSpace.UserSpaceID)
}

@@ -84,12 +84,12 @@ func (u *CreateLoadUploader) Upload(pa string, stream io.Reader, opts ...UploadO
return nil
}

func (u *CreateLoadUploader) Commit() (CreateLoadResult, error) {
func (u *CreateUploader) Commit() (CreateUploadResult, error) {
u.lock.Lock()
defer u.lock.Unlock()

if u.commited {
return CreateLoadResult{}, fmt.Errorf("package already commited")
return CreateUploadResult{}, fmt.Errorf("package already commited")
}
u.commited = true

@@ -102,10 +102,10 @@ func (u *CreateLoadUploader) Commit() (CreateLoadResult, error) {
return err
})
if err != nil {
return CreateLoadResult{}, fmt.Errorf("adding objects: %w", err)
return CreateUploadResult{}, fmt.Errorf("adding objects: %w", err)
}

ret := CreateLoadResult{
ret := CreateUploadResult{
Package: u.pkg,
Objects: make(map[string]types.Object),
}
@@ -117,7 +117,7 @@ func (u *CreateLoadUploader) Commit() (CreateLoadResult, error) {
return ret, nil
}

func (u *CreateLoadUploader) Abort() {
func (u *CreateUploader) Abort() {
u.lock.Lock()
defer u.lock.Unlock()



+ 4
- 4
client/internal/uploader/update.go View File

@@ -23,8 +23,8 @@ type UpdateUploader struct {
pkgID types.PackageID
targetSpace types.UserSpaceDetail
pubLock *distlock.Mutex
loadToSpaces []types.UserSpaceDetail
loadToPath []string
copyToSpaces []types.UserSpaceDetail
copyToPath []string
successes []db.AddObjectEntry
lock sync.Mutex
commited bool
@@ -60,8 +60,8 @@ func (w *UpdateUploader) Upload(pat string, stream io.Reader, opts ...UploadOpti
ft.AddFrom(fromExec).
AddTo(ioswitch2.NewToShardStore(*w.targetSpace.MasterHub, w.targetSpace, ioswitch2.RawStream(), "shardInfo"))

for i, space := range w.loadToSpaces {
ft.AddTo(ioswitch2.NewToPublicStore(*space.MasterHub, space, path.Join(w.loadToPath[i], pat)))
for i, space := range w.copyToSpaces {
ft.AddTo(ioswitch2.NewToPublicStore(*space.MasterHub, space, path.Join(w.copyToPath[i], pat)))
}

plans := exec.NewPlanBuilder()


+ 14
- 14
client/internal/uploader/uploader.go View File

@@ -43,7 +43,7 @@ func NewUploader(pubLock *distlock.Service, connectivity *connectivity.Collector
}
}

func (u *Uploader) BeginUpdate(pkgID clitypes.PackageID, affinity clitypes.UserSpaceID, loadTo []clitypes.UserSpaceID, loadToPath []string) (*UpdateUploader, error) {
func (u *Uploader) BeginUpdate(pkgID clitypes.PackageID, affinity clitypes.UserSpaceID, copyTo []clitypes.UserSpaceID, copyToPath []string) (*UpdateUploader, error) {
spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx())
if err != nil {
return nil, fmt.Errorf("getting user space ids: %w", err)
@@ -77,19 +77,19 @@ func (u *Uploader) BeginUpdate(pkgID clitypes.PackageID, affinity clitypes.UserS
return nil, fmt.Errorf("user no available userspaces")
}

loadToSpaces := make([]clitypes.UserSpaceDetail, len(loadTo))
for i, spaceID := range loadTo {
copyToSpaces := make([]clitypes.UserSpaceDetail, len(copyTo))
for i, spaceID := range copyTo {
space, ok := lo.Find(spaceDetails, func(space *clitypes.UserSpaceDetail) bool {
return space.UserSpace.UserSpaceID == spaceID
})
if !ok {
return nil, fmt.Errorf("load to storage %v not found", spaceID)
return nil, fmt.Errorf("user space %v not found", spaceID)
}
if space.MasterHub == nil {
return nil, fmt.Errorf("load to storage %v has no master hub", spaceID)
return nil, fmt.Errorf("user space %v has no master hub", spaceID)
}

loadToSpaces[i] = *space
copyToSpaces[i] = *space
}

target := u.chooseUploadStorage(uploadSpaces, affinity)
@@ -105,8 +105,8 @@ func (u *Uploader) BeginUpdate(pkgID clitypes.PackageID, affinity clitypes.UserS
pkgID: pkgID,
targetSpace: target.Space,
pubLock: pubLock,
loadToSpaces: loadToSpaces,
loadToPath: loadToPath,
copyToSpaces: copyToSpaces,
copyToPath: copyToPath,
}, nil
}

@@ -133,13 +133,13 @@ func (w *Uploader) chooseUploadStorage(spaces []UploadSpaceInfo, spaceAffinity c
return spaces[0]
}

func (u *Uploader) BeginCreateLoad(bktID clitypes.BucketID, pkgName string, loadTo []clitypes.UserSpaceID, loadToPath []string) (*CreateLoadUploader, error) {
getSpaces := u.spaceMeta.GetMany(loadTo)
func (u *Uploader) BeginCreateUpload(bktID clitypes.BucketID, pkgName string, copyTo []clitypes.UserSpaceID, copyToPath []string) (*CreateUploader, error) {
getSpaces := u.spaceMeta.GetMany(copyTo)

spacesStgs := make([]clitypes.UserSpaceDetail, len(loadTo))
spacesStgs := make([]clitypes.UserSpaceDetail, len(copyTo))
for i, stg := range getSpaces {
if stg == nil {
return nil, fmt.Errorf("storage %v not found", loadTo[i])
return nil, fmt.Errorf("storage %v not found", copyTo[i])
}
spacesStgs[i] = *stg
}
@@ -165,10 +165,10 @@ func (u *Uploader) BeginCreateLoad(bktID clitypes.BucketID, pkgName string, load
return nil, fmt.Errorf("acquire lock: %w", err)
}

return &CreateLoadUploader{
return &CreateUploader{
pkg: pkg,
targetSpaces: spacesStgs,
loadRoots: loadToPath,
copyRoots: copyToPath,
uploader: u,
pubLock: lock,
}, nil


+ 2
- 2
client/sdk/api/object.go View File

@@ -92,8 +92,8 @@ type ObjectUpload struct {
type ObjectUploadInfo struct {
PackageID types.PackageID `json:"packageID" binding:"required"`
Affinity types.UserSpaceID `json:"affinity"`
LoadTo []types.UserSpaceID `json:"loadTo"`
LoadToPath []string `json:"loadToPath"`
CopyTo []types.UserSpaceID `json:"copyTo"`
CopyToPath []string `json:"copyToPath"`
}

type UploadingObject struct {


+ 10
- 10
client/sdk/api/package.go View File

@@ -89,25 +89,25 @@ func (s *PackageService) Create(req PackageCreate) (*PackageCreateResp, error) {
return JSONAPI(s.cfg, http.DefaultClient, &req, &PackageCreateResp{})
}

const PackageCreateLoadPath = "/package/createLoad"
const PackageCreateUploadPath = "/package/createUpload"

type PackageCreateLoad struct {
PackageCreateLoadInfo
type PackageCreateUpload struct {
PackageCreateUploadInfo
Files UploadObjectIterator `json:"-"`
}
type PackageCreateLoadInfo struct {
type PackageCreateUploadInfo struct {
BucketID clitypes.BucketID `json:"bucketID" binding:"required"`
Name string `json:"name" binding:"required"`
LoadTo []clitypes.UserSpaceID `json:"loadTo"`
LoadToPath []string `json:"loadToPath"`
CopyTo []clitypes.UserSpaceID `json:"copyTo"`
CopyToPath []string `json:"copyToPath"`
}
type PackageCreateLoadResp struct {
type PackageCreateUploadResp struct {
Package clitypes.Package `json:"package"`
Objects []clitypes.Object `json:"objects"`
}

func (c *PackageService) CreateLoad(req PackageCreateLoad) (*PackageCreateLoadResp, error) {
url, err := url.JoinPath(c.cfg.URL, PackageCreateLoadPath)
func (c *PackageService) CreateUpload(req PackageCreateUpload) (*PackageCreateUploadResp, error) {
url, err := url.JoinPath(c.cfg.URL, PackageCreateUploadPath)
if err != nil {
return nil, err
}
@@ -130,7 +130,7 @@ func (c *PackageService) CreateLoad(req PackageCreateLoad) (*PackageCreateLoadRe
return nil, err
}

codeResp, err := ParseJSONResponse[response[PackageCreateLoadResp]](resp)
codeResp, err := ParseJSONResponse[response[PackageCreateUploadResp]](resp)
if err != nil {
return nil, err
}


+ 2
- 2
client/sdk/api/presigned.go View File

@@ -64,8 +64,8 @@ type PresignedObjectUpload struct {
PackageID clitypes.PackageID `form:"packageID" binding:"required" url:"packageID"`
Path string `form:"path" binding:"required" url:"path"`
Affinity clitypes.UserSpaceID `form:"affinity" url:"affinity,omitempty"`
LoadTo []clitypes.UserSpaceID `form:"loadTo" url:"loadTo,omitempty"`
LoadToPath []string `form:"loadToPath" url:"loadToPath,omitempty"`
CopyTo []clitypes.UserSpaceID `form:"copyTo" url:"copyTo,omitempty"`
CopyToPath []string `form:"copyToPath" url:"copyToPath,omitempty"`
}

type PresignedObjectUploadResp struct {


+ 1
- 1
client/sdk/api/storage_test.go View File

@@ -168,7 +168,7 @@ func Test_Storage(t *testing.T) {
})
So(err, ShouldBeNil)

_, err = cli.UserSpaceLoadPackage(UserSpaceLoadPackageReq{
_, err = cli.UserSpaceDownloadPackage(UserSpaceDownloadPackageReq{
PackageID: createResp.Package.PackageID,
UserSpaceID: 1,
})


+ 8
- 8
client/sdk/api/userspace.go View File

@@ -7,26 +7,26 @@ import (
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
)

const UserSpaceLoadPackagePath = "/userspace/loadPackage"
const UserSpaceDownloadPackagePath = "/userspace/downloadPackage"

type UserSpaceLoadPackageReq struct {
type UserSpaceDownloadPackageReq struct {
PackageID clitypes.PackageID `json:"packageID" binding:"required"`
UserSpaceID clitypes.UserSpaceID `json:"userSpaceID" binding:"required"`
RootPath string `json:"rootPath"`
}

func (r *UserSpaceLoadPackageReq) MakeParam() *sdks.RequestParam {
return sdks.MakeJSONParam(http.MethodPost, UserSpaceLoadPackagePath, r)
func (r *UserSpaceDownloadPackageReq) MakeParam() *sdks.RequestParam {
return sdks.MakeJSONParam(http.MethodPost, UserSpaceDownloadPackagePath, r)
}

type UserSpaceLoadPackageResp struct{}
type UserSpaceDownloadPackageResp struct{}

func (r *UserSpaceLoadPackageResp) ParseResponse(resp *http.Response) error {
func (r *UserSpaceDownloadPackageResp) ParseResponse(resp *http.Response) error {
return sdks.ParseCodeDataJSONResponse(resp, r)
}

func (c *Client) UserSpaceLoadPackage(req UserSpaceLoadPackageReq) (*UserSpaceLoadPackageResp, error) {
return JSONAPI(c.cfg, http.DefaultClient, &req, &UserSpaceLoadPackageResp{})
func (c *Client) UserSpaceDownloadPackage(req UserSpaceDownloadPackageReq) (*UserSpaceDownloadPackageResp, error) {
return JSONAPI(c.cfg, http.DefaultClient, &req, &UserSpaceDownloadPackageResp{})
}

const UserSpaceCreatePackagePath = "/userspace/createPackage"


+ 1
- 1
common/pkgs/ioswitch2/parser/opt/multipart.go View File

@@ -9,7 +9,7 @@ import (
)

// 将SegmentJoin指令替换成分片上传指令
func UseMultipartUploadToShardStore(ctx *state.GenerateState) {
func UseMultipartUpcopyToShardStore(ctx *state.GenerateState) {
dag.WalkOnlyType[*ops2.SegmentJoinNode](ctx.DAG.Graph, func(joinNode *ops2.SegmentJoinNode) bool {
if joinNode.Joined().Dst.Len() != 1 {
return true


+ 1
- 1
common/pkgs/ioswitch2/parser/parser.go View File

@@ -79,7 +79,7 @@ func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error {
opt.RemoveUnusedFromNode(state)
opt.UseECMultiplier(state)
opt.UseS2STransfer(state)
opt.UseMultipartUploadToShardStore(state)
opt.UseMultipartUpcopyToShardStore(state)
opt.DropUnused(state)
opt.StoreShardWriteResult(state)
opt.GenerateRange(state)


+ 0
- 92
hub/internal/task/cache_move_package.go View File

@@ -1,92 +0,0 @@
package task

/*
import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/iterator"
coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator"
)

type CacheMovePackage struct {
userID cdssdk.UserID
packageID cdssdk.PackageID
storageID cdssdk.StorageID
}

func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *CacheMovePackage {
return &CacheMovePackage{
userID: userID,
packageID: packageID,
storageID: storageID,
}
}

func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *CacheMovePackage) do(ctx TaskContext) error {
log := logger.WithType[CacheMovePackage]("Task")
log.Debugf("begin with %v", logger.FormatStruct(t))
defer log.Debugf("end")

store, err := ctx.stgHubs.GetShardStore(t.storageID)
if err != nil {
return fmt.Errorf("get shard store of storage %v: %w", t.storageID, err)
}

mutex, err := reqbuilder.NewBuilder().
// 保护解码出来的Object数据
Shard().Buzy(t.storageID).
MutexLock(ctx.distlock)
if err != nil {
return fmt.Errorf("acquiring distlock: %w", err)
}
defer mutex.Unlock()

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

// TODO 可以考虑优化,比如rep类型的直接pin就可以
objIter := ctx.downloader.DownloadPackage(t.packageID)
defer objIter.Close()

for {
obj, err := objIter.MoveNext()
if err != nil {
if err == iterator.ErrNoMoreItem {
break
}
return err
}
defer obj.File.Close()

_, err = store.Create(obj.File)
if err != nil {
return fmt.Errorf("writing to store: %w", err)
}

ctx.accessStat.AddAccessCounter(obj.Object.ObjectID, t.packageID, t.storageID, 1)
}

_, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(t.packageID, t.storageID))
if err != nil {
return fmt.Errorf("request to coordinator: %w", err)
}

return nil
}
*/

+ 0
- 150
hub/internal/task/create_package.go View File

@@ -1,150 +0,0 @@
package task

/*
import (
"fmt"
"path/filepath"
"time"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/iterator"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator"
)

// CreatePackageResult 定义创建包的结果结构
// 包含包的ID和上传的对象列表
type CreatePackageResult struct {
PackageID cdssdk.PackageID
Objects []cdssdk.Object
}

// CreatePackage 定义创建包的任务结构
// 包含用户ID、存储桶ID、包名称、上传对象的迭代器、节点亲和性以及任务结果
type CreatePackage struct {
userID cdssdk.UserID
bucketID cdssdk.BucketID
name string
objIter iterator.UploadingObjectIterator
stgAffinity cdssdk.StorageID
Result CreatePackageResult
}

// NewCreatePackage 创建一个新的CreatePackage实例
// userID: 用户ID
// bucketID: 存储桶ID
// name: 包名称
// objIter: 上传对象的迭代器
// stgAffinity: 节点亲和性,指定包应该创建在哪个节点上(可选)
// 返回CreatePackage实例的指针
func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, stgAffinity cdssdk.StorageID) *CreatePackage {
return &CreatePackage{
userID: userID,
bucketID: bucketID,
name: name,
objIter: objIter,
stgAffinity: stgAffinity,
}
}

// Execute 执行创建包的任务
// task: 任务实例,携带任务上下文
// ctx: 任务上下文,包含分布式锁和网络连接性等信息
// complete: 任务完成的回调函数
func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
// 获取任务日志记录器
log := logger.WithType[CreatePackage]("Task")

log.Debugf("begin")
defer log.Debugf("end")

// 从MQ池中获取协调器客户端
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
err = fmt.Errorf("new coordinator client: %w", err)
log.Warn(err.Error())
// 完成任务并设置移除延迟
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
return
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

// 向协调器创建包
createResp, err := coorCli.CreatePackage(coordinator.NewCreatePackage(t.userID, t.bucketID, t.name))
if err != nil {
err = fmt.Errorf("creating package: %w", err)
log.Error(err.Error())
// 完成任务并设置移除延迟
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
return
}

up, err := ctx.uploader.BeginUpdate(t.userID, createResp.Package.PackageID, t.stgAffinity, nil, nil)
if err != nil {
err = fmt.Errorf("begin update: %w", err)
log.Error(err.Error())
// 完成任务并设置移除延迟
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
return
}
defer up.Abort()

for {
obj, err := t.objIter.MoveNext()
if err == iterator.ErrNoMoreItem {
break
}
if err != nil {
log.Error(err.Error())
// 完成任务并设置移除延迟
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
return
}
path := filepath.ToSlash(obj.Path)

// 上传对象
err = up.Upload(path, obj.File)
if err != nil {
err = fmt.Errorf("uploading object: %w", err)
log.Error(err.Error())
// 完成任务并设置移除延迟
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
return
}

}

// 结束上传
uploadRet, err := up.Commit()
if err != nil {
err = fmt.Errorf("uploading objects: %w", err)
log.Error(err.Error())
// 完成任务并设置移除延迟
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
return
}

t.Result.PackageID = createResp.Package.PackageID
t.Result.Objects = lo.Values(uploadRet.Objects)

// 完成任务并设置移除延迟
complete(nil, CompleteOption{
RemovingDelay: time.Minute,
})
}
*/

+ 0
- 49
hub/internal/task/task.go View File

@@ -1,49 +0,0 @@
package task

/*
import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/accessstat"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/downloader"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/hubpool"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/uploader"
)

// TaskContext 定义了任务执行的上下文环境,包含分布式锁服务、IO开关和网络连接状态收集器
type TaskContext struct {
distlock *distlock.Service
connectivity *connectivity.Collector
downloader *downloader.Downloader
accessStat *accessstat.AccessStat
stgHubs *hubpool.HubPool
uploader *uploader.Uploader
}

// CompleteFn 类型定义了任务完成时需要执行的函数,用于设置任务的执行结果
type CompleteFn = task.CompleteFn

// Manager 类型代表任务管理器,用于创建、管理和调度任务
type Manager = task.Manager[TaskContext]

// TaskBody 类型定义了任务体,包含了任务的具体执行逻辑
type TaskBody = task.TaskBody[TaskContext]

// Task 类型代表一个具体的任务,包含了任务的上下文、执行体和其它相关信息
type Task = task.Task[TaskContext]

// CompleteOption 类型定义了任务完成时的选项,可用于定制化任务完成的处理方式
type CompleteOption = task.CompleteOption

func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgHubs *hubpool.HubPool, uploader *uploader.Uploader) Manager {
return task.NewManager(TaskContext{
distlock: distlock,
connectivity: connectivity,
downloader: downloader,
accessStat: accessStat,
stgHubs: stgHubs,
uploader: uploader,
})
}
*/

Loading…
Cancel
Save