diff --git a/client/internal/config/config.go b/client/internal/config/config.go index 1751947..b69a910 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -4,30 +4,30 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/config" - stgmodels "gitlink.org.cn/cloudream/storage2/common/models" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy" + "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" db "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/config" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type Config struct { - Local stgmodels.LocalMachineInfo `json:"local"` - AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` - Logger logger.Config `json:"logger"` - RabbitMQ mq.Config `json:"rabbitMQ"` - DistLock distlock.Config `json:"distlock"` - Connectivity connectivity.Config `json:"connectivity"` - Downloader downloader.Config `json:"downloader"` - DownloadStrategy strategy.Config `json:"downloadStrategy"` - StorageID cdssdk.StorageID `json:"storageID"` // TODO 进行访问量统计时,当前客户端所属的存储ID。临时解决方案。 - AuthAccessKey string `json:"authAccessKey"` // TODO 临时办法 - AuthSecretKey string `json:"authSecretKey"` - MaxHTTPBodySize int64 `json:"maxHttpBodySize"` - DB db.Config `json:"db"` + Local types.LocalMachineInfo `json:"local"` + AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` + Logger logger.Config `json:"logger"` + RabbitMQ mq.Config `json:"rabbitMQ"` + DistLock distlock.Config `json:"distlock"` + Connectivity connectivity.Config `json:"connectivity"` + Downloader downloader.Config `json:"downloader"` + DownloadStrategy strategy.Config `json:"downloadStrategy"` + StorageID cortypes.StorageID `json:"storageID"` // TODO 进行访问量统计时,当前客户端所属的存储ID。临时解决方案。 + AuthAccessKey string `json:"authAccessKey"` // TODO 临时办法 + AuthSecretKey string `json:"authSecretKey"` + MaxHTTPBodySize int64 `json:"maxHttpBodySize"` + DB db.Config `json:"db"` } var cfg Config diff --git a/client/internal/db/db.go b/client/internal/db/db.go index 3d0fded..c9d6fbd 100644 --- a/client/internal/db/db.go +++ b/client/internal/db/db.go @@ -29,6 +29,46 @@ func (db *DB) DoTx(do func(tx SQLContext) error) error { }) } +func DoTx02[R any](db *DB, do func(tx SQLContext) (R, error)) (R, error) { + var ret R + err := db.db.Transaction(func(tx *gorm.DB) error { + var err error + ret, err = do(SQLContext{tx}) + return err + }) + return ret, err +} + +func DoTx12[T any, R any](db *DB, do func(tx SQLContext, t T) (R, error), t T) (R, error) { + var ret R + err := db.db.Transaction(func(tx *gorm.DB) error { + var err error + ret, err = do(SQLContext{tx}, t) + return err + }) + return ret, err +} + +func DoTx22[T1 any, T2 any, R any](db *DB, do func(tx SQLContext, t1 T1, t2 T2) (R, error), t1 T1, t2 T2) (R, error) { + var ret R + err := db.db.Transaction(func(tx *gorm.DB) error { + var err error + ret, err = do(SQLContext{tx}, t1, t2) + return err + }) + return ret, err +} + +func DoTx32[T1 any, T2 any, T3 any, R any](db *DB, do func(tx SQLContext, t1 T1, t2 T2, t3 T3) (R, error), t1 T1, t2 T2, t3 T3) (R, error) { + var ret R + err := db.db.Transaction(func(tx *gorm.DB) error { + var err error + ret, err = do(SQLContext{tx}, t1, t2, t3) + return err + }) + return ret, err +} + type SQLContext struct { *gorm.DB } diff --git a/client/internal/db/object.go b/client/internal/db/object.go index 236481f..b7b675a 100644 --- a/client/internal/db/object.go +++ b/client/internal/db/object.go @@ -9,6 +9,7 @@ import ( "gorm.io/gorm" "gorm.io/gorm/clause" + "gitlink.org.cn/cloudream/common/utils/sort2" "gitlink.org.cn/cloudream/storage2/client/types" ) @@ -584,3 +585,54 @@ func (db *ObjectDB) MoveByPrefix(ctx SQLContext, oldPkgID types.PackageID, oldPr "Path": gorm.Expr("concat(?, substring(Path, ?))", newPrefix, len(oldPrefix)+1), }).Error } + +func (db *ObjectDB) AppendPart(tx SQLContext, block types.ObjectBlock) error { + obj, err := db.Object().GetByID(tx, block.ObjectID) + if err != nil { + return fmt.Errorf("getting object by id: %w", err) + } + + _, ok := obj.Redundancy.(*types.MultipartUploadRedundancy) + if !ok { + return fmt.Errorf("object is not a multipart upload object") + } + + blks, err := db.ObjectBlock().BatchGetByObjectID(tx, []types.ObjectID{obj.ObjectID}) + if err != nil { + return fmt.Errorf("batch getting object blocks: %w", err) + } + + blks = lo.Reject(blks, func(blk types.ObjectBlock, idx int) bool { return blk.Index == block.Index }) + blks = append(blks, block) + + blks = sort2.Sort(blks, func(a, b types.ObjectBlock) int { return a.Index - b.Index }) + + totalSize := int64(0) + var hashes [][]byte + for _, blk := range blks { + totalSize += blk.Size + hashes = append(hashes, blk.FileHash.GetHashBytes()) + } + + newObjHash := types.CalculateCompositeHash(hashes) + obj.Size = totalSize + obj.FileHash = newObjHash + obj.UpdateTime = time.Now() + + err = db.ObjectBlock().DeleteByObjectIDIndex(tx, block.ObjectID, block.Index) + if err != nil { + return fmt.Errorf("delete object block: %w", err) + } + + err = db.ObjectBlock().Create(tx, block.ObjectID, block.Index, block.UserSpaceID, block.FileHash, block.Size) + if err != nil { + return fmt.Errorf("create object block: %w", err) + } + + err = db.Object().BatchUpdate(tx, []types.Object{obj}) + if err != nil { + return fmt.Errorf("update object: %w", err) + } + + return nil +} diff --git a/client/internal/downloader/downloader.go b/client/internal/downloader/downloader.go index 9e728c2..433e6d6 100644 --- a/client/internal/downloader/downloader.go +++ b/client/internal/downloader/downloader.go @@ -7,11 +7,9 @@ import ( lru "github.com/hashicorp/golang-lru/v2" "gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/storage2/client/internal/db" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy" "gitlink.org.cn/cloudream/storage2/client/types" - stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" ) @@ -64,12 +62,6 @@ func NewDownloader(cfg Config, conn *connectivity.Collector, stgAgts *agtpool.Ag } func (d *Downloader) DownloadObjects(reqs []DownloadReqeust) DownloadIterator { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return iterator.FuseError[*Downloading](fmt.Errorf("new coordinator client: %w", err)) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - objIDs := make([]types.ObjectID, len(reqs)) for i, req := range reqs { objIDs[i] = req.ObjectID @@ -79,19 +71,21 @@ func (d *Downloader) DownloadObjects(reqs []DownloadReqeust) DownloadIterator { return iterator.Empty[*Downloading]() } - // objDetails, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails(objIDs)) - // if err != nil { - // return iterator.FuseError[*Downloading](fmt.Errorf("request to coordinator: %w", err)) - // } - objDetails, err := d.db.GetObjectDetails(objIDs) + objDetails, err := db.DoTx12(d.db, d.db.Object().BatchGetDetails, objIDs) if err != nil { return iterator.FuseError[*Downloading](fmt.Errorf("request to db: %w", err)) } + detailsMap := make(map[types.ObjectID]*types.ObjectDetail) + for _, detail := range objDetails { + d := detail + detailsMap[detail.Object.ObjectID] = &d + } + req2s := make([]downloadReqeust2, len(reqs)) for i, req := range reqs { req2s[i] = downloadReqeust2{ - Detail: objDetails.Objects[i], + Detail: detailsMap[req.ObjectID], Raw: req, } } @@ -114,19 +108,13 @@ func (d *Downloader) DownloadObjectByDetail(detail types.ObjectDetail, off int64 } func (d *Downloader) DownloadPackage(pkgID types.PackageID) DownloadIterator { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return iterator.FuseError[*Downloading](fmt.Errorf("new coordinator client: %w", err)) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - pkgDetail, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(pkgID)) + details, err := db.DoTx12(d.db, d.db.Object().GetPackageObjectDetails, pkgID) if err != nil { - return iterator.FuseError[*Downloading](fmt.Errorf("request to coordinator: %w", err)) + return iterator.FuseError[*Downloading](fmt.Errorf("get package object details: %w", err)) } - req2s := make([]downloadReqeust2, len(pkgDetail.Objects)) - for i, objDetail := range pkgDetail.Objects { + req2s := make([]downloadReqeust2, len(details)) + for i, objDetail := range details { dt := objDetail req2s[i] = downloadReqeust2{ Detail: &dt, diff --git a/client/internal/downloader/iterator.go b/client/internal/downloader/iterator.go index 2046bd6..df90cb1 100644 --- a/client/internal/downloader/iterator.go +++ b/client/internal/downloader/iterator.go @@ -8,23 +8,23 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" - stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy" + "gitlink.org.cn/cloudream/storage2/client/internal/publics" + "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser" "gitlink.org.cn/cloudream/storage2/common/pkgs/iterator" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) -type downloadStorageInfo struct { - Storage stgmod.StorageDetail +type downloadSpaceInfo struct { + Space types.UserSpaceDetail ObjectPinned bool - Blocks []stgmod.ObjectBlock + Blocks []types.ObjectBlock Distance float64 } @@ -59,16 +59,12 @@ func (i *DownloadObjectIterator) MoveNext() (*Downloading, error) { }, nil } - destHub := cdssdk.HubID(0) - if stgglb.Local.HubID != nil { - destHub = *stgglb.Local.HubID - } - + destHub := cortypes.HubID(0) strg, err := i.downloader.selector.Select(strategy.Request{ Detail: *req.Detail, Range: math2.NewRange(req.Raw.Offset, req.Raw.Length), DestHub: destHub, - DestLocation: stgglb.Local.LocationID, + DestLocation: publics.Local.LocationID, }) if err != nil { return nil, fmt.Errorf("selecting download strategy: %w", err) @@ -113,7 +109,7 @@ func (i *DownloadObjectIterator) Close() { } func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strategy.DirectStrategy) (io.ReadCloser, error) { - logger.Debugf("downloading object %v from storage %v", req.Raw.ObjectID, strg.Storage.Storage.String()) + logger.Debugf("downloading object %v from storage %v", req.Raw.ObjectID, strg.Space.Storage.String()) var strHandle *exec.DriverReadStream ft := ioswitch2.NewFromTo() @@ -127,7 +123,7 @@ func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strat toExec.Range.Length = &len } - ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, *strg.Storage.MasterHub, strg.Storage, ioswitch2.RawStream())).AddTo(toExec) + ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, *strg.Space.MasterHub, strg.Space, ioswitch2.RawStream())).AddTo(toExec) strHandle = handle plans := exec.NewPlanBuilder() @@ -150,15 +146,15 @@ func (i *DownloadObjectIterator) downloadECReconstruct(req downloadReqeust2, str logStrs = append(logStrs, ", ") } - logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Index, strg.Storages[i].Storage.String())) + logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Index, strg.Spaces[i].Storage.String())) } logger.Debug(logStrs...) downloadBlks := make([]downloadBlock, len(strg.Blocks)) for i, b := range strg.Blocks { downloadBlks[i] = downloadBlock{ - Block: b, - Storage: strg.Storages[i], + Block: b, + Space: strg.Spaces[i], } } diff --git a/client/internal/downloader/lrc.go b/client/internal/downloader/lrc.go index 4a1c007..bdafa9a 100644 --- a/client/internal/downloader/lrc.go +++ b/client/internal/downloader/lrc.go @@ -8,7 +8,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy" ) func (iter *DownloadObjectIterator) downloadLRCReconstruct(req downloadReqeust2, strg strategy.LRCReconstructStrategy) (io.ReadCloser, error) { @@ -18,15 +18,15 @@ func (iter *DownloadObjectIterator) downloadLRCReconstruct(req downloadReqeust2, logStrs = append(logStrs, ", ") } - logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Index, strg.Storages[i].Storage.String())) + logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Index, strg.Spaces[i].Storage.String())) } logger.Debug(logStrs...) downloadBlks := make([]downloadBlock, len(strg.Blocks)) for i, b := range strg.Blocks { downloadBlks[i] = downloadBlock{ - Block: b, - Storage: strg.Storages[i], + Block: b, + Space: strg.Spaces[i], } } diff --git a/client/internal/downloader/lrc_strip_iterator.go b/client/internal/downloader/lrc_strip_iterator.go index df40aca..26cad4f 100644 --- a/client/internal/downloader/lrc_strip_iterator.go +++ b/client/internal/downloader/lrc_strip_iterator.go @@ -8,8 +8,8 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/math2" + cdssdk "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc/parser" ) @@ -98,7 +98,7 @@ func (s *LRCStripIterator) Close() { func (s *LRCStripIterator) downloading() { var froms []ioswitchlrc.From for _, b := range s.blocks { - stg := b.Storage + stg := b.Space froms = append(froms, ioswitchlrc.NewFromStorage(b.Block.FileHash, *stg.MasterHub, stg.Storage, b.Block.Index)) } diff --git a/client/internal/downloader/strategy/selector.go b/client/internal/downloader/strategy/selector.go index 92570cb..d0964ac 100644 --- a/client/internal/downloader/strategy/selector.go +++ b/client/internal/downloader/strategy/selector.go @@ -8,55 +8,55 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/bitmap" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/sort2" + "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/consts" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" "gitlink.org.cn/cloudream/storage2/common/pkgs/metacache" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type Request struct { - Detail stgmod.ObjectDetail + Detail types.ObjectDetail Range math2.Range - DestHub cdssdk.HubID // 可以为0。此字段不为0时,DestLocation字段无意义。 - DestLocation cdssdk.LocationID // 可以为0 + DestHub cortypes.HubID // 可以为0。此字段不为0时,DestLocation字段无意义。 + DestLocation cortypes.LocationID // 可以为0 } type Strategy interface { - GetDetail() stgmod.ObjectDetail + GetDetail() types.ObjectDetail } // 直接下载完整对象 type DirectStrategy struct { - Detail stgmod.ObjectDetail - Storage stgmod.StorageDetail + Detail types.ObjectDetail + Space types.UserSpaceDetail } -func (s *DirectStrategy) GetDetail() stgmod.ObjectDetail { +func (s *DirectStrategy) GetDetail() types.ObjectDetail { return s.Detail } // 从指定对象重建对象 type ECReconstructStrategy struct { - Detail stgmod.ObjectDetail - Redundancy cdssdk.ECRedundancy - Blocks []stgmod.ObjectBlock - Storages []stgmod.StorageDetail + Detail types.ObjectDetail + Redundancy types.ECRedundancy + Blocks []types.ObjectBlock + Spaces []types.UserSpaceDetail } -func (s *ECReconstructStrategy) GetDetail() stgmod.ObjectDetail { +func (s *ECReconstructStrategy) GetDetail() types.ObjectDetail { return s.Detail } type LRCReconstructStrategy struct { - Detail stgmod.ObjectDetail - Redundancy cdssdk.LRCRedundancy - Blocks []stgmod.ObjectBlock - Storages []stgmod.StorageDetail + Detail types.ObjectDetail + Redundancy types.LRCRedundancy + Blocks []types.ObjectBlock + Spaces []types.UserSpaceDetail } -func (s *LRCReconstructStrategy) GetDetail() stgmod.ObjectDetail { +func (s *LRCReconstructStrategy) GetDetail() types.ObjectDetail { return s.Detail } @@ -88,39 +88,39 @@ func (s *Selector) Select(req Request) (Strategy, error) { } switch red := req.Detail.Object.Redundancy.(type) { - case *cdssdk.NoneRedundancy: + case *types.NoneRedundancy: return s.selectForNoneOrRep(req2) - case *cdssdk.RepRedundancy: + case *types.RepRedundancy: return s.selectForNoneOrRep(req2) - case *cdssdk.ECRedundancy: + case *types.ECRedundancy: return s.selectForEC(req2, *red) - case *cdssdk.LRCRedundancy: + case *types.LRCRedundancy: return s.selectForLRC(req2, *red) } return nil, fmt.Errorf("unsupported redundancy type: %v of object %v", reflect.TypeOf(req.Detail.Object.Redundancy), req.Detail.Object.ObjectID) } -type downloadStorageInfo struct { - Storage stgmod.StorageDetail +type downloadSpaceInfo struct { + Space types.UserSpaceDetail ObjectPinned bool - Blocks []stgmod.ObjectBlock + Blocks []types.ObjectBlock Distance float64 } type downloadBlock struct { - Storage stgmod.StorageDetail - Block stgmod.ObjectBlock + Space types.UserSpaceDetail + Block types.ObjectBlock } type request2 struct { - Detail stgmod.ObjectDetail + Detail types.ObjectDetail Range math2.Range - DestHub *cdssdk.Hub - DestLocation cdssdk.LocationID + DestHub *cortypes.Hub + DestLocation cortypes.LocationID } func (s *Selector) selectForNoneOrRep(req request2) (Strategy, error) { @@ -135,12 +135,12 @@ func (s *Selector) selectForNoneOrRep(req request2) (Strategy, error) { } return &DirectStrategy{ - Detail: req.Detail, - Storage: sortedStgs[0].Storage, + Detail: req.Detail, + Space: sortedStgs[0].Space, }, nil } -func (s *Selector) selectForEC(req request2, red cdssdk.ECRedundancy) (Strategy, error) { +func (s *Selector) selectForEC(req request2, red types.ECRedundancy) (Strategy, error) { sortedStgs := s.sortDownloadStorages(req) if len(sortedStgs) == 0 { return nil, fmt.Errorf("no storage available for download") @@ -150,18 +150,18 @@ func (s *Selector) selectForEC(req request2, red cdssdk.ECRedundancy) (Strategy, osc, stg := s.getMinReadingObjectSolution(sortedStgs, red.K) if bsc < osc { - bs := make([]stgmod.ObjectBlock, len(blocks)) - ss := make([]stgmod.StorageDetail, len(blocks)) + bs := make([]types.ObjectBlock, len(blocks)) + ss := make([]types.UserSpaceDetail, len(blocks)) for i, b := range blocks { bs[i] = b.Block - ss[i] = b.Storage + ss[i] = b.Space } return &ECReconstructStrategy{ Detail: req.Detail, Redundancy: red, Blocks: bs, - Storages: ss, + Spaces: ss, }, nil } @@ -171,12 +171,12 @@ func (s *Selector) selectForEC(req request2, red cdssdk.ECRedundancy) (Strategy, } return &DirectStrategy{ - Detail: req.Detail, - Storage: stg, + Detail: req.Detail, + Space: stg, }, nil } -func (s *Selector) selectForLRC(req request2, red cdssdk.LRCRedundancy) (Strategy, error) { +func (s *Selector) selectForLRC(req request2, red types.LRCRedundancy) (Strategy, error) { sortedStgs := s.sortDownloadStorages(req) if len(sortedStgs) == 0 { return nil, fmt.Errorf("no storage available for download") @@ -190,8 +190,8 @@ func (s *Selector) selectForLRC(req request2, red cdssdk.LRCRedundancy) (Strateg continue } blocks = append(blocks, downloadBlock{ - Storage: stg.Storage, - Block: b, + Space: stg.Space, + Block: b, }) selectedBlkIdx[b.Index] = true } @@ -200,78 +200,78 @@ func (s *Selector) selectForLRC(req request2, red cdssdk.LRCRedundancy) (Strateg return nil, fmt.Errorf("not enough blocks to download lrc object") } - bs := make([]stgmod.ObjectBlock, len(blocks)) - ss := make([]stgmod.StorageDetail, len(blocks)) + bs := make([]types.ObjectBlock, len(blocks)) + ss := make([]types.UserSpaceDetail, len(blocks)) for i, b := range blocks { bs[i] = b.Block - ss[i] = b.Storage + ss[i] = b.Space } return &LRCReconstructStrategy{ Detail: req.Detail, Redundancy: red, Blocks: bs, - Storages: ss, + Spaces: ss, }, nil } -func (s *Selector) sortDownloadStorages(req request2) []*downloadStorageInfo { - var stgIDs []cdssdk.StorageID +func (s *Selector) sortDownloadStorages(req request2) []*downloadSpaceInfo { + var spaceIDs []types.UserSpaceID for _, id := range req.Detail.PinnedAt { - if !lo.Contains(stgIDs, id) { - stgIDs = append(stgIDs, id) + if !lo.Contains(spaceIDs, id) { + spaceIDs = append(spaceIDs, id) } } for _, b := range req.Detail.Blocks { - if !lo.Contains(stgIDs, b.StorageID) { - stgIDs = append(stgIDs, b.StorageID) + if !lo.Contains(spaceIDs, b.UserSpaceID) { + spaceIDs = append(spaceIDs, b.UserSpaceID) } } - downloadStorageMap := make(map[cdssdk.StorageID]*downloadStorageInfo) + downloadSpaceMap := make(map[types.UserSpaceID]*downloadSpaceInfo) for _, id := range req.Detail.PinnedAt { - storage, ok := downloadStorageMap[id] + storage, ok := downloadSpaceMap[id] if !ok { mod := s.storageMeta.Get(id) if mod == nil || mod.MasterHub == nil { continue } - storage = &downloadStorageInfo{ - Storage: *mod, + storage = &downloadSpaceInfo{ + Space: *mod, ObjectPinned: true, Distance: s.getStorageDistance(req, *mod), } - downloadStorageMap[id] = storage + downloadSpaceMap[id] = storage } storage.ObjectPinned = true } for _, b := range req.Detail.Blocks { - storage, ok := downloadStorageMap[b.StorageID] + space, ok := downloadSpaceMap[b.UserSpaceID] if !ok { - mod := s.storageMeta.Get(b.StorageID) + mod := s.storageMeta.Get(b.UserSpaceID) if mod == nil || mod.MasterHub == nil { continue } - storage = &downloadStorageInfo{ - Storage: *mod, + space = &downloadSpaceInfo{ + Space: *mod, Distance: s.getStorageDistance(req, *mod), } - downloadStorageMap[b.StorageID] = storage + downloadSpaceMap[b.UserSpaceID] = space } - storage.Blocks = append(storage.Blocks, b) + space.Blocks = append(space.Blocks, b) } - return sort2.Sort(lo.Values(downloadStorageMap), func(left, right *downloadStorageInfo) int { + return sort2.Sort(lo.Values(downloadSpaceMap), func(left, right *downloadSpaceInfo) int { return sort2.Cmp(left.Distance, right.Distance) }) } -func (s *Selector) getStorageDistance(req request2, src stgmod.StorageDetail) float64 { +func (s *Selector) getStorageDistance(req request2, src types.UserSpaceDetail) float64 { if req.DestHub != nil { if src.MasterHub.HubID == req.DestHub.HubID { return consts.StorageDistanceSameStorage @@ -298,7 +298,7 @@ func (s *Selector) getStorageDistance(req request2, src stgmod.StorageDetail) fl return consts.StorageDistanceOther } -func (s *Selector) getMinReadingBlockSolution(sortedStgs []*downloadStorageInfo, k int) (float64, []downloadBlock) { +func (s *Selector) getMinReadingBlockSolution(sortedStgs []*downloadSpaceInfo, k int) (float64, []downloadBlock) { gotBlocksMap := bitmap.Bitmap64(0) var gotBlocks []downloadBlock dist := float64(0.0) @@ -306,8 +306,8 @@ func (s *Selector) getMinReadingBlockSolution(sortedStgs []*downloadStorageInfo, for _, b := range n.Blocks { if !gotBlocksMap.Get(b.Index) { gotBlocks = append(gotBlocks, downloadBlock{ - Storage: n.Storage, - Block: b, + Space: n.Space, + Block: b, }) gotBlocksMap.Set(b.Index, true) dist += n.Distance @@ -322,16 +322,16 @@ func (s *Selector) getMinReadingBlockSolution(sortedStgs []*downloadStorageInfo, return math.MaxFloat64, gotBlocks } -func (s *Selector) getMinReadingObjectSolution(sortedStgs []*downloadStorageInfo, k int) (float64, stgmod.StorageDetail) { +func (s *Selector) getMinReadingObjectSolution(sortedStgs []*downloadSpaceInfo, k int) (float64, types.UserSpaceDetail) { dist := math.MaxFloat64 - var downloadStg stgmod.StorageDetail + var downloadSpace types.UserSpaceDetail for _, n := range sortedStgs { if n.ObjectPinned && float64(k)*n.Distance < dist { dist = float64(k) * n.Distance - stg := n.Storage - downloadStg = stg + stg := n.Space + downloadSpace = stg } } - return dist, downloadStg + return dist, downloadSpace } diff --git a/client/internal/downloader/strip_iterator.go b/client/internal/downloader/strip_iterator.go index 0ae904f..45aa6f7 100644 --- a/client/internal/downloader/strip_iterator.go +++ b/client/internal/downloader/strip_iterator.go @@ -8,16 +8,15 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/math2" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser" ) type downloadBlock struct { - Storage stgmod.StorageDetail - Block stgmod.ObjectBlock + Space types.UserSpaceDetail + Block types.ObjectBlock } type Strip struct { @@ -27,9 +26,9 @@ type Strip struct { type StripIterator struct { downloader *Downloader - object cdssdk.Object + object types.Object blocks []downloadBlock - red cdssdk.ECRedundancy + red types.ECRedundancy curStripIndex int64 cache *StripCache dataChan chan dataChanEntry @@ -47,7 +46,7 @@ type dataChanEntry struct { Error error } -func NewStripIterator(downloader *Downloader, object cdssdk.Object, blocks []downloadBlock, red cdssdk.ECRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *StripIterator { +func NewStripIterator(downloader *Downloader, object types.Object, blocks []downloadBlock, red types.ECRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *StripIterator { if maxPrefetch <= 0 { maxPrefetch = 1 } @@ -202,8 +201,8 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) { ft := ioswitch2.NewFromTo() ft.ECParam = &s.red for _, b := range s.blocks { - stg := b.Storage - ft.AddFrom(ioswitch2.NewFromShardstore(b.Block.FileHash, *stg.MasterHub, stg, ioswitch2.ECStream(b.Block.Index))) + space := b.Space + ft.AddFrom(ioswitch2.NewFromShardstore(b.Block.FileHash, *space.MasterHub, space, ioswitch2.ECStream(b.Block.Index))) } toExec, hd := ioswitch2.NewToDriverWithRange(ioswitch2.RawStream(), math2.Range{