Browse Source

增加控制调度时文件下载并发数量的参数

master
Sydonian 3 months ago
parent
commit
5a08be6e30
5 changed files with 27 additions and 26 deletions
  1. +2
    -2
      client/internal/http/v1/user_space.go
  2. +13
    -8
      client/internal/services/user_space.go
  3. +1
    -1
      client/sdk/api/v1/storage_test.go
  4. +4
    -3
      client/sdk/api/v1/user_space.go
  5. +7
    -12
      jcsctl/cmd/userspace/getp.go

+ 2
- 2
client/internal/http/v1/user_space.go View File

@@ -25,14 +25,14 @@ func (s *Server) UserSpace() *UserSpaceService {
func (s *UserSpaceService) DownloadPackage(ctx *gin.Context) { func (s *UserSpaceService) DownloadPackage(ctx *gin.Context) {
log := logger.WithField("HTTP", "UserSpace.DownloadPackage") log := logger.WithField("HTTP", "UserSpace.DownloadPackage")


var req cliapi.UserSpaceDownloadPackageReq
var req cliapi.UserSpaceDownloadPackage
if err := ctx.ShouldBindJSON(&req); err != nil { if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error()) log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "missing argument or invalid argument")) ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "missing argument or invalid argument"))
return return
} }


err := s.svc.UserSpaceSvc().DownloadPackage(req.PackageID, req.UserSpaceID, req.RootPath)
err := s.svc.UserSpaceSvc().DownloadPackage(req)
if err != nil { if err != nil {
log.Warnf("downloading package: %s", err.Error()) log.Warnf("downloading package: %s", err.Error())
ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "%v", err)) ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "%v", err))


+ 13
- 8
client/internal/services/user_space.go View File

@@ -188,32 +188,37 @@ func (svc *UserSpaceService) Test(req cliapi.UserSpaceTest) (*cliapi.UserSpaceTe
return &cliapi.UserSpaceTestResp{}, nil return &cliapi.UserSpaceTestResp{}, nil
} }


func (svc *UserSpaceService) DownloadPackage(packageID jcstypes.PackageID, userspaceID jcstypes.UserSpaceID, rootPath string) error {
destSpace := svc.UserSpaceMeta.Get(userspaceID)
func (svc *UserSpaceService) DownloadPackage(req cliapi.UserSpaceDownloadPackage) error {
destSpace := svc.UserSpaceMeta.Get(req.UserSpaceID)
if destSpace == nil { if destSpace == nil {
return fmt.Errorf("userspace not found: %d", userspaceID)
return fmt.Errorf("userspace not found: %d", req.UserSpaceID)
} }


details, err := db.DoTx11(svc.DB, svc.DB.Object().GetPackageObjectDetails, packageID)
details, err := db.DoTx11(svc.DB, svc.DB.Object().GetPackageObjectDetails, req.PackageID)
if err != nil { if err != nil {
return err return err
} }


mutex, err := svc.PubLock.BeginMutex(). mutex, err := svc.PubLock.BeginMutex().
UserSpace().Buzy(userspaceID).End().
UserSpace().Buzy(req.UserSpaceID).End().
Lock() Lock()
if err != nil { if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err) return fmt.Errorf("acquire locks failed, err: %w", err)
} }
defer mutex.Unlock() defer mutex.Unlock()


rootJPath := jcstypes.PathFromJcsPathString(rootPath)
rootJPath := jcstypes.PathFromJcsPathString(req.RootPath)

concy := req.Concurrency
if concy == 0 {
concy = 5
}


dIndex := 0 dIndex := 0
var pinned []jcstypes.PinnedObject var pinned []jcstypes.PinnedObject
for dIndex < len(details) { for dIndex < len(details) {
plans := exec.NewPlanBuilder() plans := exec.NewPlanBuilder()
for i := 0; i < 10 && dIndex < len(details); i++ {
for i := 0; i < concy && dIndex < len(details); i++ {
strg, err := svc.StrategySelector.Select(strategy.Request{ strg, err := svc.StrategySelector.Select(strategy.Request{
Detail: details[dIndex], Detail: details[dIndex],
DestLocation: destSpace.UserSpace.Storage.GetLocation(), DestLocation: destSpace.UserSpace.Storage.GetLocation(),
@@ -280,7 +285,7 @@ func (svc *UserSpaceService) DownloadPackage(packageID jcstypes.PackageID, users


// 记录访问统计 // 记录访问统计
for _, obj := range details { for _, obj := range details {
svc.AccessStat.AddAccessCounter(obj.Object.ObjectID, packageID, userspaceID, 1)
svc.AccessStat.AddAccessCounter(obj.Object.ObjectID, req.PackageID, req.UserSpaceID, 1)
} }
exeCtx := exec.NewExecContext() exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, svc.StgPool) exec.SetValueByType(exeCtx, svc.StgPool)


+ 1
- 1
client/sdk/api/v1/storage_test.go View File

@@ -169,7 +169,7 @@ func Test_Storage(t *testing.T) {
}) })
So(err, ShouldBeNil) So(err, ShouldBeNil)


_, err = cli.UserSpace().DownloadPackage(UserSpaceDownloadPackageReq{
_, err = cli.UserSpace().DownloadPackage(UserSpaceDownloadPackage{
PackageID: createResp.Package.PackageID, PackageID: createResp.Package.PackageID,
UserSpaceID: 1, UserSpaceID: 1,
}) })


+ 4
- 3
client/sdk/api/v1/user_space.go View File

@@ -17,13 +17,14 @@ func (c *Client) UserSpace() *UserSpaceService {


const UserSpaceDownloadPackagePath = "/userSpace/downloadPackage" const UserSpaceDownloadPackagePath = "/userSpace/downloadPackage"


type UserSpaceDownloadPackageReq struct {
type UserSpaceDownloadPackage struct {
PackageID jcstypes.PackageID `json:"packageID" binding:"required"` PackageID jcstypes.PackageID `json:"packageID" binding:"required"`
UserSpaceID jcstypes.UserSpaceID `json:"userSpaceID" binding:"required"` UserSpaceID jcstypes.UserSpaceID `json:"userSpaceID" binding:"required"`
RootPath string `json:"rootPath"` RootPath string `json:"rootPath"`
Concurrency int `json:"concurrency"`
} }


func (r *UserSpaceDownloadPackageReq) MakeParam() *sdks.RequestParam {
func (r *UserSpaceDownloadPackage) MakeParam() *sdks.RequestParam {
return sdks.MakeJSONParam(http.MethodPost, UserSpaceDownloadPackagePath, r) return sdks.MakeJSONParam(http.MethodPost, UserSpaceDownloadPackagePath, r)
} }


@@ -33,7 +34,7 @@ func (r *UserSpaceDownloadPackageResp) ParseResponse(resp *http.Response) error
return sdks.ParseCodeDataJSONResponse(resp, r) return sdks.ParseCodeDataJSONResponse(resp, r)
} }


func (c *UserSpaceService) DownloadPackage(req UserSpaceDownloadPackageReq) (*UserSpaceDownloadPackageResp, error) {
func (c *UserSpaceService) DownloadPackage(req UserSpaceDownloadPackage) (*UserSpaceDownloadPackageResp, error) {
return JSONAPI(&c.cfg, c.httpCli, &req, &UserSpaceDownloadPackageResp{}) return JSONAPI(&c.cfg, c.httpCli, &req, &UserSpaceDownloadPackageResp{})
} }




+ 7
- 12
jcsctl/cmd/userspace/getp.go View File

@@ -11,7 +11,7 @@ import (
) )


func init() { func init() {
var opt option
var opt getpOpt
c := &cobra.Command{ c := &cobra.Command{
Use: "getp <bucket_name>/<package_name> <space_name>:<root_path>", Use: "getp <bucket_name>/<package_name> <space_name>:<root_path>",
Short: "download package all files to user space", Short: "download package all files to user space",
@@ -21,21 +21,15 @@ func init() {
return getp(c, ctx, opt, args) return getp(c, ctx, opt, args)
}, },
} }
// c.Flags().StringVar(&opt.Prefix, "prefix", "", "download objects with this prefix")
// c.Flags().StringVar(&opt.NewPrefix, "new", "", "replace prefix specified by --prefix with this prefix")
// c.Flags().BoolVar(&opt.Zip, "zip", false, "download as zip file")
// c.Flags().StringVarP(&opt.Output, "output", "o", "", "output zip file name")
c.Flags().IntVarP(&opt.Concurrency, "concurrency", "c", 5, "concurrency of download files")
UserSpaceCmd.AddCommand(c) UserSpaceCmd.AddCommand(c)
} }


type option struct {
// Prefix string
// NewPrefix string
// Zip bool
// Output string
type getpOpt struct {
Concurrency int
} }


func getp(c *cobra.Command, ctx *cmd.CommandContext, opt option, args []string) error {
func getp(c *cobra.Command, ctx *cmd.CommandContext, opt getpOpt, args []string) error {
comps := strings.Split(args[0], "/") comps := strings.Split(args[0], "/")
if len(comps) != 2 { if len(comps) != 2 {
return fmt.Errorf("invalid package name: %s", args[0]) return fmt.Errorf("invalid package name: %s", args[0])
@@ -67,10 +61,11 @@ func getp(c *cobra.Command, ctx *cmd.CommandContext, opt option, args []string)


startTime := time.Now() startTime := time.Now()


_, err = ctx.Client.UserSpace().DownloadPackage(cliapi.UserSpaceDownloadPackageReq{
_, err = ctx.Client.UserSpace().DownloadPackage(cliapi.UserSpaceDownloadPackage{
PackageID: getPkg.Package.PackageID, PackageID: getPkg.Package.PackageID,
UserSpaceID: getSpace.UserSpace.UserSpaceID, UserSpaceID: getSpace.UserSpace.UserSpaceID,
RootPath: rootPath, RootPath: rootPath,
Concurrency: opt.Concurrency,
}) })
if err != nil { if err != nil {
return fmt.Errorf("download package %v to user space %v: %w", args[0], spaceName, err) return fmt.Errorf("download package %v to user space %v: %w", args[0], spaceName, err)


Loading…
Cancel
Save