Browse Source

调整下载器代码

gitlink
Sydonian 7 months ago
parent
commit
0b63bb345d
9 changed files with 224 additions and 149 deletions
  1. +17
    -17
      client/internal/config/config.go
  2. +40
    -0
      client/internal/db/db.go
  3. +52
    -0
      client/internal/db/object.go
  4. +13
    -25
      client/internal/downloader/downloader.go
  5. +14
    -18
      client/internal/downloader/iterator.go
  6. +4
    -4
      client/internal/downloader/lrc.go
  7. +2
    -2
      client/internal/downloader/lrc_strip_iterator.go
  8. +74
    -74
      client/internal/downloader/strategy/selector.go
  9. +8
    -9
      client/internal/downloader/strip_iterator.go

+ 17
- 17
client/internal/config/config.go View File

@@ -4,30 +4,30 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/config" "gitlink.org.cn/cloudream/common/utils/config"
stgmodels "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/client/internal/downloader"
"gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy"
"gitlink.org.cn/cloudream/storage2/client/types"
"gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity"
db "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/config" db "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/config"
"gitlink.org.cn/cloudream/storage2/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy"
agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent"
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
) )


type Config struct { type Config struct {
Local stgmodels.LocalMachineInfo `json:"local"`
AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"`
Logger logger.Config `json:"logger"`
RabbitMQ mq.Config `json:"rabbitMQ"`
DistLock distlock.Config `json:"distlock"`
Connectivity connectivity.Config `json:"connectivity"`
Downloader downloader.Config `json:"downloader"`
DownloadStrategy strategy.Config `json:"downloadStrategy"`
StorageID cdssdk.StorageID `json:"storageID"` // TODO 进行访问量统计时,当前客户端所属的存储ID。临时解决方案。
AuthAccessKey string `json:"authAccessKey"` // TODO 临时办法
AuthSecretKey string `json:"authSecretKey"`
MaxHTTPBodySize int64 `json:"maxHttpBodySize"`
DB db.Config `json:"db"`
Local types.LocalMachineInfo `json:"local"`
AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"`
Logger logger.Config `json:"logger"`
RabbitMQ mq.Config `json:"rabbitMQ"`
DistLock distlock.Config `json:"distlock"`
Connectivity connectivity.Config `json:"connectivity"`
Downloader downloader.Config `json:"downloader"`
DownloadStrategy strategy.Config `json:"downloadStrategy"`
StorageID cortypes.StorageID `json:"storageID"` // TODO 进行访问量统计时,当前客户端所属的存储ID。临时解决方案。
AuthAccessKey string `json:"authAccessKey"` // TODO 临时办法
AuthSecretKey string `json:"authSecretKey"`
MaxHTTPBodySize int64 `json:"maxHttpBodySize"`
DB db.Config `json:"db"`
} }


var cfg Config var cfg Config


+ 40
- 0
client/internal/db/db.go View File

@@ -29,6 +29,46 @@ func (db *DB) DoTx(do func(tx SQLContext) error) error {
}) })
} }


func DoTx02[R any](db *DB, do func(tx SQLContext) (R, error)) (R, error) {
var ret R
err := db.db.Transaction(func(tx *gorm.DB) error {
var err error
ret, err = do(SQLContext{tx})
return err
})
return ret, err
}

func DoTx12[T any, R any](db *DB, do func(tx SQLContext, t T) (R, error), t T) (R, error) {
var ret R
err := db.db.Transaction(func(tx *gorm.DB) error {
var err error
ret, err = do(SQLContext{tx}, t)
return err
})
return ret, err
}

func DoTx22[T1 any, T2 any, R any](db *DB, do func(tx SQLContext, t1 T1, t2 T2) (R, error), t1 T1, t2 T2) (R, error) {
var ret R
err := db.db.Transaction(func(tx *gorm.DB) error {
var err error
ret, err = do(SQLContext{tx}, t1, t2)
return err
})
return ret, err
}

func DoTx32[T1 any, T2 any, T3 any, R any](db *DB, do func(tx SQLContext, t1 T1, t2 T2, t3 T3) (R, error), t1 T1, t2 T2, t3 T3) (R, error) {
var ret R
err := db.db.Transaction(func(tx *gorm.DB) error {
var err error
ret, err = do(SQLContext{tx}, t1, t2, t3)
return err
})
return ret, err
}

type SQLContext struct { type SQLContext struct {
*gorm.DB *gorm.DB
} }


+ 52
- 0
client/internal/db/object.go View File

@@ -9,6 +9,7 @@ import (
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause" "gorm.io/gorm/clause"


"gitlink.org.cn/cloudream/common/utils/sort2"
"gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/client/types"
) )


@@ -584,3 +585,54 @@ func (db *ObjectDB) MoveByPrefix(ctx SQLContext, oldPkgID types.PackageID, oldPr
"Path": gorm.Expr("concat(?, substring(Path, ?))", newPrefix, len(oldPrefix)+1), "Path": gorm.Expr("concat(?, substring(Path, ?))", newPrefix, len(oldPrefix)+1),
}).Error }).Error
} }

func (db *ObjectDB) AppendPart(tx SQLContext, block types.ObjectBlock) error {
obj, err := db.Object().GetByID(tx, block.ObjectID)
if err != nil {
return fmt.Errorf("getting object by id: %w", err)
}

_, ok := obj.Redundancy.(*types.MultipartUploadRedundancy)
if !ok {
return fmt.Errorf("object is not a multipart upload object")
}

blks, err := db.ObjectBlock().BatchGetByObjectID(tx, []types.ObjectID{obj.ObjectID})
if err != nil {
return fmt.Errorf("batch getting object blocks: %w", err)
}

blks = lo.Reject(blks, func(blk types.ObjectBlock, idx int) bool { return blk.Index == block.Index })
blks = append(blks, block)

blks = sort2.Sort(blks, func(a, b types.ObjectBlock) int { return a.Index - b.Index })

totalSize := int64(0)
var hashes [][]byte
for _, blk := range blks {
totalSize += blk.Size
hashes = append(hashes, blk.FileHash.GetHashBytes())
}

newObjHash := types.CalculateCompositeHash(hashes)
obj.Size = totalSize
obj.FileHash = newObjHash
obj.UpdateTime = time.Now()

err = db.ObjectBlock().DeleteByObjectIDIndex(tx, block.ObjectID, block.Index)
if err != nil {
return fmt.Errorf("delete object block: %w", err)
}

err = db.ObjectBlock().Create(tx, block.ObjectID, block.Index, block.UserSpaceID, block.FileHash, block.Size)
if err != nil {
return fmt.Errorf("create object block: %w", err)
}

err = db.Object().BatchUpdate(tx, []types.Object{obj})
if err != nil {
return fmt.Errorf("update object: %w", err)
}

return nil
}

+ 13
- 25
client/internal/downloader/downloader.go View File

@@ -7,11 +7,9 @@ import (
lru "github.com/hashicorp/golang-lru/v2" lru "github.com/hashicorp/golang-lru/v2"
"gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/common/pkgs/iterator"
"gitlink.org.cn/cloudream/storage2/client/internal/db" "gitlink.org.cn/cloudream/storage2/client/internal/db"
"gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy"
"gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/client/types"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
"gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool"
) )


@@ -64,12 +62,6 @@ func NewDownloader(cfg Config, conn *connectivity.Collector, stgAgts *agtpool.Ag
} }


func (d *Downloader) DownloadObjects(reqs []DownloadReqeust) DownloadIterator { func (d *Downloader) DownloadObjects(reqs []DownloadReqeust) DownloadIterator {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return iterator.FuseError[*Downloading](fmt.Errorf("new coordinator client: %w", err))
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

objIDs := make([]types.ObjectID, len(reqs)) objIDs := make([]types.ObjectID, len(reqs))
for i, req := range reqs { for i, req := range reqs {
objIDs[i] = req.ObjectID objIDs[i] = req.ObjectID
@@ -79,19 +71,21 @@ func (d *Downloader) DownloadObjects(reqs []DownloadReqeust) DownloadIterator {
return iterator.Empty[*Downloading]() return iterator.Empty[*Downloading]()
} }


// objDetails, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails(objIDs))
// if err != nil {
// return iterator.FuseError[*Downloading](fmt.Errorf("request to coordinator: %w", err))
// }
objDetails, err := d.db.GetObjectDetails(objIDs)
objDetails, err := db.DoTx12(d.db, d.db.Object().BatchGetDetails, objIDs)
if err != nil { if err != nil {
return iterator.FuseError[*Downloading](fmt.Errorf("request to db: %w", err)) return iterator.FuseError[*Downloading](fmt.Errorf("request to db: %w", err))
} }


detailsMap := make(map[types.ObjectID]*types.ObjectDetail)
for _, detail := range objDetails {
d := detail
detailsMap[detail.Object.ObjectID] = &d
}

req2s := make([]downloadReqeust2, len(reqs)) req2s := make([]downloadReqeust2, len(reqs))
for i, req := range reqs { for i, req := range reqs {
req2s[i] = downloadReqeust2{ req2s[i] = downloadReqeust2{
Detail: objDetails.Objects[i],
Detail: detailsMap[req.ObjectID],
Raw: req, Raw: req,
} }
} }
@@ -114,19 +108,13 @@ func (d *Downloader) DownloadObjectByDetail(detail types.ObjectDetail, off int64
} }


func (d *Downloader) DownloadPackage(pkgID types.PackageID) DownloadIterator { func (d *Downloader) DownloadPackage(pkgID types.PackageID) DownloadIterator {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return iterator.FuseError[*Downloading](fmt.Errorf("new coordinator client: %w", err))
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

pkgDetail, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(pkgID))
details, err := db.DoTx12(d.db, d.db.Object().GetPackageObjectDetails, pkgID)
if err != nil { if err != nil {
return iterator.FuseError[*Downloading](fmt.Errorf("request to coordinator: %w", err))
return iterator.FuseError[*Downloading](fmt.Errorf("get package object details: %w", err))
} }


req2s := make([]downloadReqeust2, len(pkgDetail.Objects))
for i, objDetail := range pkgDetail.Objects {
req2s := make([]downloadReqeust2, len(details))
for i, objDetail := range details {
dt := objDetail dt := objDetail
req2s[i] = downloadReqeust2{ req2s[i] = downloadReqeust2{
Detail: &dt, Detail: &dt,


+ 14
- 18
client/internal/downloader/iterator.go View File

@@ -8,23 +8,23 @@ import (


"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"


"gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/math2"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy"
"gitlink.org.cn/cloudream/storage2/client/internal/publics"
"gitlink.org.cn/cloudream/storage2/client/types"
"gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser"
"gitlink.org.cn/cloudream/storage2/common/pkgs/iterator" "gitlink.org.cn/cloudream/storage2/common/pkgs/iterator"
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
) )


type downloadStorageInfo struct {
Storage stgmod.StorageDetail
type downloadSpaceInfo struct {
Space types.UserSpaceDetail
ObjectPinned bool ObjectPinned bool
Blocks []stgmod.ObjectBlock
Blocks []types.ObjectBlock
Distance float64 Distance float64
} }


@@ -59,16 +59,12 @@ func (i *DownloadObjectIterator) MoveNext() (*Downloading, error) {
}, nil }, nil
} }


destHub := cdssdk.HubID(0)
if stgglb.Local.HubID != nil {
destHub = *stgglb.Local.HubID
}

destHub := cortypes.HubID(0)
strg, err := i.downloader.selector.Select(strategy.Request{ strg, err := i.downloader.selector.Select(strategy.Request{
Detail: *req.Detail, Detail: *req.Detail,
Range: math2.NewRange(req.Raw.Offset, req.Raw.Length), Range: math2.NewRange(req.Raw.Offset, req.Raw.Length),
DestHub: destHub, DestHub: destHub,
DestLocation: stgglb.Local.LocationID,
DestLocation: publics.Local.LocationID,
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("selecting download strategy: %w", err) return nil, fmt.Errorf("selecting download strategy: %w", err)
@@ -113,7 +109,7 @@ func (i *DownloadObjectIterator) Close() {
} }


func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strategy.DirectStrategy) (io.ReadCloser, error) { func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strategy.DirectStrategy) (io.ReadCloser, error) {
logger.Debugf("downloading object %v from storage %v", req.Raw.ObjectID, strg.Storage.Storage.String())
logger.Debugf("downloading object %v from storage %v", req.Raw.ObjectID, strg.Space.Storage.String())


var strHandle *exec.DriverReadStream var strHandle *exec.DriverReadStream
ft := ioswitch2.NewFromTo() ft := ioswitch2.NewFromTo()
@@ -127,7 +123,7 @@ func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strat
toExec.Range.Length = &len toExec.Range.Length = &len
} }


ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, *strg.Storage.MasterHub, strg.Storage, ioswitch2.RawStream())).AddTo(toExec)
ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, *strg.Space.MasterHub, strg.Space, ioswitch2.RawStream())).AddTo(toExec)
strHandle = handle strHandle = handle


plans := exec.NewPlanBuilder() plans := exec.NewPlanBuilder()
@@ -150,15 +146,15 @@ func (i *DownloadObjectIterator) downloadECReconstruct(req downloadReqeust2, str
logStrs = append(logStrs, ", ") logStrs = append(logStrs, ", ")
} }


logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Index, strg.Storages[i].Storage.String()))
logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Index, strg.Spaces[i].Storage.String()))
} }
logger.Debug(logStrs...) logger.Debug(logStrs...)


downloadBlks := make([]downloadBlock, len(strg.Blocks)) downloadBlks := make([]downloadBlock, len(strg.Blocks))
for i, b := range strg.Blocks { for i, b := range strg.Blocks {
downloadBlks[i] = downloadBlock{ downloadBlks[i] = downloadBlock{
Block: b,
Storage: strg.Storages[i],
Block: b,
Space: strg.Spaces[i],
} }
} }




+ 4
- 4
client/internal/downloader/lrc.go View File

@@ -8,7 +8,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy"
"gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy"
) )


func (iter *DownloadObjectIterator) downloadLRCReconstruct(req downloadReqeust2, strg strategy.LRCReconstructStrategy) (io.ReadCloser, error) { func (iter *DownloadObjectIterator) downloadLRCReconstruct(req downloadReqeust2, strg strategy.LRCReconstructStrategy) (io.ReadCloser, error) {
@@ -18,15 +18,15 @@ func (iter *DownloadObjectIterator) downloadLRCReconstruct(req downloadReqeust2,
logStrs = append(logStrs, ", ") logStrs = append(logStrs, ", ")
} }


logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Index, strg.Storages[i].Storage.String()))
logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Index, strg.Spaces[i].Storage.String()))
} }
logger.Debug(logStrs...) logger.Debug(logStrs...)


downloadBlks := make([]downloadBlock, len(strg.Blocks)) downloadBlks := make([]downloadBlock, len(strg.Blocks))
for i, b := range strg.Blocks { for i, b := range strg.Blocks {
downloadBlks[i] = downloadBlock{ downloadBlks[i] = downloadBlock{
Block: b,
Storage: strg.Storages[i],
Block: b,
Space: strg.Spaces[i],
} }
} }




+ 2
- 2
client/internal/downloader/lrc_strip_iterator.go View File

@@ -8,8 +8,8 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/common/pkgs/iterator"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/math2"
cdssdk "gitlink.org.cn/cloudream/storage2/client/types"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc/parser" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc/parser"
) )
@@ -98,7 +98,7 @@ func (s *LRCStripIterator) Close() {
func (s *LRCStripIterator) downloading() { func (s *LRCStripIterator) downloading() {
var froms []ioswitchlrc.From var froms []ioswitchlrc.From
for _, b := range s.blocks { for _, b := range s.blocks {
stg := b.Storage
stg := b.Space
froms = append(froms, ioswitchlrc.NewFromStorage(b.Block.FileHash, *stg.MasterHub, stg.Storage, b.Block.Index)) froms = append(froms, ioswitchlrc.NewFromStorage(b.Block.FileHash, *stg.MasterHub, stg.Storage, b.Block.Index))
} }




+ 74
- 74
client/internal/downloader/strategy/selector.go View File

@@ -8,55 +8,55 @@ import (


"github.com/samber/lo" "github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/bitmap" "gitlink.org.cn/cloudream/common/pkgs/bitmap"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/common/utils/sort2" "gitlink.org.cn/cloudream/common/utils/sort2"
"gitlink.org.cn/cloudream/storage2/client/types"
"gitlink.org.cn/cloudream/storage2/common/consts" "gitlink.org.cn/cloudream/storage2/common/consts"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/metacache" "gitlink.org.cn/cloudream/storage2/common/pkgs/metacache"
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
) )


type Request struct { type Request struct {
Detail stgmod.ObjectDetail
Detail types.ObjectDetail
Range math2.Range Range math2.Range
DestHub cdssdk.HubID // 可以为0。此字段不为0时,DestLocation字段无意义。
DestLocation cdssdk.LocationID // 可以为0
DestHub cortypes.HubID // 可以为0。此字段不为0时,DestLocation字段无意义。
DestLocation cortypes.LocationID // 可以为0
} }


type Strategy interface { type Strategy interface {
GetDetail() stgmod.ObjectDetail
GetDetail() types.ObjectDetail
} }


// 直接下载完整对象 // 直接下载完整对象
type DirectStrategy struct { type DirectStrategy struct {
Detail stgmod.ObjectDetail
Storage stgmod.StorageDetail
Detail types.ObjectDetail
Space types.UserSpaceDetail
} }


func (s *DirectStrategy) GetDetail() stgmod.ObjectDetail {
func (s *DirectStrategy) GetDetail() types.ObjectDetail {
return s.Detail return s.Detail
} }


// 从指定对象重建对象 // 从指定对象重建对象
type ECReconstructStrategy struct { type ECReconstructStrategy struct {
Detail stgmod.ObjectDetail
Redundancy cdssdk.ECRedundancy
Blocks []stgmod.ObjectBlock
Storages []stgmod.StorageDetail
Detail types.ObjectDetail
Redundancy types.ECRedundancy
Blocks []types.ObjectBlock
Spaces []types.UserSpaceDetail
} }


func (s *ECReconstructStrategy) GetDetail() stgmod.ObjectDetail {
func (s *ECReconstructStrategy) GetDetail() types.ObjectDetail {
return s.Detail return s.Detail
} }


type LRCReconstructStrategy struct { type LRCReconstructStrategy struct {
Detail stgmod.ObjectDetail
Redundancy cdssdk.LRCRedundancy
Blocks []stgmod.ObjectBlock
Storages []stgmod.StorageDetail
Detail types.ObjectDetail
Redundancy types.LRCRedundancy
Blocks []types.ObjectBlock
Spaces []types.UserSpaceDetail
} }


func (s *LRCReconstructStrategy) GetDetail() stgmod.ObjectDetail {
func (s *LRCReconstructStrategy) GetDetail() types.ObjectDetail {
return s.Detail return s.Detail
} }


@@ -88,39 +88,39 @@ func (s *Selector) Select(req Request) (Strategy, error) {
} }


switch red := req.Detail.Object.Redundancy.(type) { switch red := req.Detail.Object.Redundancy.(type) {
case *cdssdk.NoneRedundancy:
case *types.NoneRedundancy:
return s.selectForNoneOrRep(req2) return s.selectForNoneOrRep(req2)


case *cdssdk.RepRedundancy:
case *types.RepRedundancy:
return s.selectForNoneOrRep(req2) return s.selectForNoneOrRep(req2)


case *cdssdk.ECRedundancy:
case *types.ECRedundancy:
return s.selectForEC(req2, *red) return s.selectForEC(req2, *red)


case *cdssdk.LRCRedundancy:
case *types.LRCRedundancy:
return s.selectForLRC(req2, *red) return s.selectForLRC(req2, *red)
} }


return nil, fmt.Errorf("unsupported redundancy type: %v of object %v", reflect.TypeOf(req.Detail.Object.Redundancy), req.Detail.Object.ObjectID) return nil, fmt.Errorf("unsupported redundancy type: %v of object %v", reflect.TypeOf(req.Detail.Object.Redundancy), req.Detail.Object.ObjectID)
} }


type downloadStorageInfo struct {
Storage stgmod.StorageDetail
type downloadSpaceInfo struct {
Space types.UserSpaceDetail
ObjectPinned bool ObjectPinned bool
Blocks []stgmod.ObjectBlock
Blocks []types.ObjectBlock
Distance float64 Distance float64
} }


type downloadBlock struct { type downloadBlock struct {
Storage stgmod.StorageDetail
Block stgmod.ObjectBlock
Space types.UserSpaceDetail
Block types.ObjectBlock
} }


type request2 struct { type request2 struct {
Detail stgmod.ObjectDetail
Detail types.ObjectDetail
Range math2.Range Range math2.Range
DestHub *cdssdk.Hub
DestLocation cdssdk.LocationID
DestHub *cortypes.Hub
DestLocation cortypes.LocationID
} }


func (s *Selector) selectForNoneOrRep(req request2) (Strategy, error) { func (s *Selector) selectForNoneOrRep(req request2) (Strategy, error) {
@@ -135,12 +135,12 @@ func (s *Selector) selectForNoneOrRep(req request2) (Strategy, error) {
} }


return &DirectStrategy{ return &DirectStrategy{
Detail: req.Detail,
Storage: sortedStgs[0].Storage,
Detail: req.Detail,
Space: sortedStgs[0].Space,
}, nil }, nil
} }


func (s *Selector) selectForEC(req request2, red cdssdk.ECRedundancy) (Strategy, error) {
func (s *Selector) selectForEC(req request2, red types.ECRedundancy) (Strategy, error) {
sortedStgs := s.sortDownloadStorages(req) sortedStgs := s.sortDownloadStorages(req)
if len(sortedStgs) == 0 { if len(sortedStgs) == 0 {
return nil, fmt.Errorf("no storage available for download") return nil, fmt.Errorf("no storage available for download")
@@ -150,18 +150,18 @@ func (s *Selector) selectForEC(req request2, red cdssdk.ECRedundancy) (Strategy,
osc, stg := s.getMinReadingObjectSolution(sortedStgs, red.K) osc, stg := s.getMinReadingObjectSolution(sortedStgs, red.K)


if bsc < osc { if bsc < osc {
bs := make([]stgmod.ObjectBlock, len(blocks))
ss := make([]stgmod.StorageDetail, len(blocks))
bs := make([]types.ObjectBlock, len(blocks))
ss := make([]types.UserSpaceDetail, len(blocks))
for i, b := range blocks { for i, b := range blocks {
bs[i] = b.Block bs[i] = b.Block
ss[i] = b.Storage
ss[i] = b.Space
} }


return &ECReconstructStrategy{ return &ECReconstructStrategy{
Detail: req.Detail, Detail: req.Detail,
Redundancy: red, Redundancy: red,
Blocks: bs, Blocks: bs,
Storages: ss,
Spaces: ss,
}, nil }, nil
} }


@@ -171,12 +171,12 @@ func (s *Selector) selectForEC(req request2, red cdssdk.ECRedundancy) (Strategy,
} }


return &DirectStrategy{ return &DirectStrategy{
Detail: req.Detail,
Storage: stg,
Detail: req.Detail,
Space: stg,
}, nil }, nil
} }


func (s *Selector) selectForLRC(req request2, red cdssdk.LRCRedundancy) (Strategy, error) {
func (s *Selector) selectForLRC(req request2, red types.LRCRedundancy) (Strategy, error) {
sortedStgs := s.sortDownloadStorages(req) sortedStgs := s.sortDownloadStorages(req)
if len(sortedStgs) == 0 { if len(sortedStgs) == 0 {
return nil, fmt.Errorf("no storage available for download") return nil, fmt.Errorf("no storage available for download")
@@ -190,8 +190,8 @@ func (s *Selector) selectForLRC(req request2, red cdssdk.LRCRedundancy) (Strateg
continue continue
} }
blocks = append(blocks, downloadBlock{ blocks = append(blocks, downloadBlock{
Storage: stg.Storage,
Block: b,
Space: stg.Space,
Block: b,
}) })
selectedBlkIdx[b.Index] = true selectedBlkIdx[b.Index] = true
} }
@@ -200,78 +200,78 @@ func (s *Selector) selectForLRC(req request2, red cdssdk.LRCRedundancy) (Strateg
return nil, fmt.Errorf("not enough blocks to download lrc object") return nil, fmt.Errorf("not enough blocks to download lrc object")
} }


bs := make([]stgmod.ObjectBlock, len(blocks))
ss := make([]stgmod.StorageDetail, len(blocks))
bs := make([]types.ObjectBlock, len(blocks))
ss := make([]types.UserSpaceDetail, len(blocks))
for i, b := range blocks { for i, b := range blocks {
bs[i] = b.Block bs[i] = b.Block
ss[i] = b.Storage
ss[i] = b.Space
} }


return &LRCReconstructStrategy{ return &LRCReconstructStrategy{
Detail: req.Detail, Detail: req.Detail,
Redundancy: red, Redundancy: red,
Blocks: bs, Blocks: bs,
Storages: ss,
Spaces: ss,
}, nil }, nil
} }


func (s *Selector) sortDownloadStorages(req request2) []*downloadStorageInfo {
var stgIDs []cdssdk.StorageID
func (s *Selector) sortDownloadStorages(req request2) []*downloadSpaceInfo {
var spaceIDs []types.UserSpaceID
for _, id := range req.Detail.PinnedAt { for _, id := range req.Detail.PinnedAt {
if !lo.Contains(stgIDs, id) {
stgIDs = append(stgIDs, id)
if !lo.Contains(spaceIDs, id) {
spaceIDs = append(spaceIDs, id)
} }
} }
for _, b := range req.Detail.Blocks { for _, b := range req.Detail.Blocks {
if !lo.Contains(stgIDs, b.StorageID) {
stgIDs = append(stgIDs, b.StorageID)
if !lo.Contains(spaceIDs, b.UserSpaceID) {
spaceIDs = append(spaceIDs, b.UserSpaceID)
} }
} }


downloadStorageMap := make(map[cdssdk.StorageID]*downloadStorageInfo)
downloadSpaceMap := make(map[types.UserSpaceID]*downloadSpaceInfo)
for _, id := range req.Detail.PinnedAt { for _, id := range req.Detail.PinnedAt {
storage, ok := downloadStorageMap[id]
storage, ok := downloadSpaceMap[id]
if !ok { if !ok {
mod := s.storageMeta.Get(id) mod := s.storageMeta.Get(id)
if mod == nil || mod.MasterHub == nil { if mod == nil || mod.MasterHub == nil {
continue continue
} }


storage = &downloadStorageInfo{
Storage: *mod,
storage = &downloadSpaceInfo{
Space: *mod,
ObjectPinned: true, ObjectPinned: true,
Distance: s.getStorageDistance(req, *mod), Distance: s.getStorageDistance(req, *mod),
} }
downloadStorageMap[id] = storage
downloadSpaceMap[id] = storage
} }


storage.ObjectPinned = true storage.ObjectPinned = true
} }


for _, b := range req.Detail.Blocks { for _, b := range req.Detail.Blocks {
storage, ok := downloadStorageMap[b.StorageID]
space, ok := downloadSpaceMap[b.UserSpaceID]
if !ok { if !ok {
mod := s.storageMeta.Get(b.StorageID)
mod := s.storageMeta.Get(b.UserSpaceID)
if mod == nil || mod.MasterHub == nil { if mod == nil || mod.MasterHub == nil {
continue continue
} }


storage = &downloadStorageInfo{
Storage: *mod,
space = &downloadSpaceInfo{
Space: *mod,
Distance: s.getStorageDistance(req, *mod), Distance: s.getStorageDistance(req, *mod),
} }
downloadStorageMap[b.StorageID] = storage
downloadSpaceMap[b.UserSpaceID] = space
} }


storage.Blocks = append(storage.Blocks, b)
space.Blocks = append(space.Blocks, b)
} }


return sort2.Sort(lo.Values(downloadStorageMap), func(left, right *downloadStorageInfo) int {
return sort2.Sort(lo.Values(downloadSpaceMap), func(left, right *downloadSpaceInfo) int {
return sort2.Cmp(left.Distance, right.Distance) return sort2.Cmp(left.Distance, right.Distance)
}) })
} }


func (s *Selector) getStorageDistance(req request2, src stgmod.StorageDetail) float64 {
func (s *Selector) getStorageDistance(req request2, src types.UserSpaceDetail) float64 {
if req.DestHub != nil { if req.DestHub != nil {
if src.MasterHub.HubID == req.DestHub.HubID { if src.MasterHub.HubID == req.DestHub.HubID {
return consts.StorageDistanceSameStorage return consts.StorageDistanceSameStorage
@@ -298,7 +298,7 @@ func (s *Selector) getStorageDistance(req request2, src stgmod.StorageDetail) fl
return consts.StorageDistanceOther return consts.StorageDistanceOther
} }


func (s *Selector) getMinReadingBlockSolution(sortedStgs []*downloadStorageInfo, k int) (float64, []downloadBlock) {
func (s *Selector) getMinReadingBlockSolution(sortedStgs []*downloadSpaceInfo, k int) (float64, []downloadBlock) {
gotBlocksMap := bitmap.Bitmap64(0) gotBlocksMap := bitmap.Bitmap64(0)
var gotBlocks []downloadBlock var gotBlocks []downloadBlock
dist := float64(0.0) dist := float64(0.0)
@@ -306,8 +306,8 @@ func (s *Selector) getMinReadingBlockSolution(sortedStgs []*downloadStorageInfo,
for _, b := range n.Blocks { for _, b := range n.Blocks {
if !gotBlocksMap.Get(b.Index) { if !gotBlocksMap.Get(b.Index) {
gotBlocks = append(gotBlocks, downloadBlock{ gotBlocks = append(gotBlocks, downloadBlock{
Storage: n.Storage,
Block: b,
Space: n.Space,
Block: b,
}) })
gotBlocksMap.Set(b.Index, true) gotBlocksMap.Set(b.Index, true)
dist += n.Distance dist += n.Distance
@@ -322,16 +322,16 @@ func (s *Selector) getMinReadingBlockSolution(sortedStgs []*downloadStorageInfo,
return math.MaxFloat64, gotBlocks return math.MaxFloat64, gotBlocks
} }


func (s *Selector) getMinReadingObjectSolution(sortedStgs []*downloadStorageInfo, k int) (float64, stgmod.StorageDetail) {
func (s *Selector) getMinReadingObjectSolution(sortedStgs []*downloadSpaceInfo, k int) (float64, types.UserSpaceDetail) {
dist := math.MaxFloat64 dist := math.MaxFloat64
var downloadStg stgmod.StorageDetail
var downloadSpace types.UserSpaceDetail
for _, n := range sortedStgs { for _, n := range sortedStgs {
if n.ObjectPinned && float64(k)*n.Distance < dist { if n.ObjectPinned && float64(k)*n.Distance < dist {
dist = float64(k) * n.Distance dist = float64(k) * n.Distance
stg := n.Storage
downloadStg = stg
stg := n.Space
downloadSpace = stg
} }
} }


return dist, downloadStg
return dist, downloadSpace
} }

+ 8
- 9
client/internal/downloader/strip_iterator.go View File

@@ -8,16 +8,15 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/common/pkgs/iterator"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/math2"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/client/types"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser"
) )


type downloadBlock struct { type downloadBlock struct {
Storage stgmod.StorageDetail
Block stgmod.ObjectBlock
Space types.UserSpaceDetail
Block types.ObjectBlock
} }


type Strip struct { type Strip struct {
@@ -27,9 +26,9 @@ type Strip struct {


type StripIterator struct { type StripIterator struct {
downloader *Downloader downloader *Downloader
object cdssdk.Object
object types.Object
blocks []downloadBlock blocks []downloadBlock
red cdssdk.ECRedundancy
red types.ECRedundancy
curStripIndex int64 curStripIndex int64
cache *StripCache cache *StripCache
dataChan chan dataChanEntry dataChan chan dataChanEntry
@@ -47,7 +46,7 @@ type dataChanEntry struct {
Error error Error error
} }


func NewStripIterator(downloader *Downloader, object cdssdk.Object, blocks []downloadBlock, red cdssdk.ECRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *StripIterator {
func NewStripIterator(downloader *Downloader, object types.Object, blocks []downloadBlock, red types.ECRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *StripIterator {
if maxPrefetch <= 0 { if maxPrefetch <= 0 {
maxPrefetch = 1 maxPrefetch = 1
} }
@@ -202,8 +201,8 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) {
ft := ioswitch2.NewFromTo() ft := ioswitch2.NewFromTo()
ft.ECParam = &s.red ft.ECParam = &s.red
for _, b := range s.blocks { for _, b := range s.blocks {
stg := b.Storage
ft.AddFrom(ioswitch2.NewFromShardstore(b.Block.FileHash, *stg.MasterHub, stg, ioswitch2.ECStream(b.Block.Index)))
space := b.Space
ft.AddFrom(ioswitch2.NewFromShardstore(b.Block.FileHash, *space.MasterHub, space, ioswitch2.ECStream(b.Block.Index)))
} }


toExec, hd := ioswitch2.NewToDriverWithRange(ioswitch2.RawStream(), math2.Range{ toExec, hd := ioswitch2.NewToDriverWithRange(ioswitch2.RawStream(), math2.Range{


Loading…
Cancel
Save