diff --git a/client/internal/cmdline/scanner.go b/client/internal/cmdline/scanner.go index cbbfcbf..e5aa0a8 100644 --- a/client/internal/cmdline/scanner.go +++ b/client/internal/cmdline/scanner.go @@ -39,5 +39,7 @@ func init() { parseScannerEventCmdTrie.MustAdd(scevt.NewCheckPackageRedundancy, myreflect.TypeNameOf[scevt.CheckPackageRedundancy]()) + parseScannerEventCmdTrie.MustAdd(scevt.NewCleanPinned, myreflect.TypeNameOf[scevt.CleanPinned]()) + commands.MustAdd(ScannerPostEvent, "scanner", "event") } diff --git a/common/models/models.go b/common/models/models.go index 6430e7b..f3f56bb 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -13,14 +13,16 @@ type ObjectBlock struct { } type ObjectDetail struct { - Object cdssdk.Object `json:"object"` - Blocks []ObjectBlock `json:"blocks"` + Object cdssdk.Object `json:"object"` + PinnedAt []cdssdk.NodeID `json:"pinnedAt"` + Blocks []ObjectBlock `json:"blocks"` } -func NewObjectDetail(object cdssdk.Object, blocks []ObjectBlock) ObjectDetail { +func NewObjectDetail(object cdssdk.Object, pinnedAt []cdssdk.NodeID, blocks []ObjectBlock) ObjectDetail { return ObjectDetail{ - Object: object, - Blocks: blocks, + Object: object, + PinnedAt: pinnedAt, + Blocks: blocks, } } diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index 9816f78..af4369e 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -6,6 +6,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/samber/lo" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -94,6 +95,38 @@ func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ( return lo.Map(ret, func(o model.TempObject, idx int) model.Object { return o.ToObject() }), err } +func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) { + var objs []model.TempObject + err := sqlx.Select(ctx, &objs, "select * from Object where PackageID = ? order by ObjectID asc", packageID) + if err != nil { + return nil, fmt.Errorf("getting objects: %w", err) + } + + rets := make([]stgmod.ObjectDetail, 0, len(objs)) + + for _, obj := range objs { + var blocks []stgmod.ObjectBlock + err = sqlx.Select(ctx, + &blocks, + "select * from ObjectBlock where ObjectID = ? order by `Index`", + obj.ObjectID, + ) + if err != nil { + return nil, err + } + + var pinnedAt []cdssdk.NodeID + err = sqlx.Select(ctx, &pinnedAt, "select NodeID from PinnedObject where ObjectID = ?", obj.ObjectID) + if err != nil { + return nil, err + } + + rets = append(rets, stgmod.NewObjectDetail(obj.ToObject(), pinnedAt, blocks)) + } + + return rets, nil +} + func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs []coormq.AddObjectEntry) ([]cdssdk.ObjectID, error) { objIDs := make([]cdssdk.ObjectID, 0, len(objs)) for _, obj := range objs { @@ -151,7 +184,6 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb } for _, block := range obj.Blocks { - // 首次上传默认使用不分块的rep模式 err = db.ObjectBlock().Create(ctx, obj.ObjectID, block.Index, block.NodeID, block.FileHash) if err != nil { return fmt.Errorf("creating object block: %w", err) @@ -163,6 +195,11 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb return fmt.Errorf("creating cache: %w", err) } } + + err = db.PinnedObject().ObjectBatchCreate(ctx, obj.ObjectID, obj.PinnedAt) + if err != nil { + return fmt.Errorf("creating pinned object: %w", err) + } } return nil diff --git a/common/pkgs/db/object_block.go b/common/pkgs/db/object_block.go index 9abcc76..e560e03 100644 --- a/common/pkgs/db/object_block.go +++ b/common/pkgs/db/object_block.go @@ -2,14 +2,12 @@ package db import ( "database/sql" - "fmt" "strconv" "strings" "github.com/jmoiron/sqlx" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) type ObjectBlockDB struct { @@ -73,32 +71,6 @@ func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (in return cnt, err } -func (db *ObjectBlockDB) GetPackageBlockDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) { - var objs []model.TempObject - err := sqlx.Select(ctx, &objs, "select * from Object where PackageID = ? order by ObjectID asc", packageID) - if err != nil { - return nil, fmt.Errorf("getting objects: %w", err) - } - - rets := make([]stgmod.ObjectDetail, 0, len(objs)) - - for _, obj := range objs { - var blocks []stgmod.ObjectBlock - err = sqlx.Select(ctx, - &blocks, - "select * from ObjectBlock where ObjectID = ? order by `Index`", - obj.ObjectID, - ) - if err != nil { - return nil, err - } - - rets = append(rets, stgmod.NewObjectDetail(obj.ToObject(), blocks)) - } - - return rets, nil -} - // 按逗号切割字符串,并将每一个部分解析为一个int64的ID。 // 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式 func splitConcatedNodeID(idStr string) []cdssdk.NodeID { diff --git a/common/pkgs/db/pinned_object.go b/common/pkgs/db/pinned_object.go index 1e71872..026e5ca 100644 --- a/common/pkgs/db/pinned_object.go +++ b/common/pkgs/db/pinned_object.go @@ -34,6 +34,11 @@ func (*PinnedObjectDB) Create(ctx SQLContext, nodeID cdssdk.NodeID, objectID cds return err } +func (*PinnedObjectDB) TryCreate(ctx SQLContext, nodeID cdssdk.NodeID, objectID cdssdk.ObjectID, createTime time.Time) error { + _, err := ctx.Exec("insert ignore into PinnedObject values(?,?,?)", nodeID, objectID, createTime) + return err +} + func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { _, err := ctx.Exec( "insert ignore into PinnedObject(NodeID, ObjectID, CreateTime) select ? as NodeID, ObjectID, ? as CreateTime from Object where PackageID = ?", @@ -44,6 +49,16 @@ func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.Packag return err } +func (db *PinnedObjectDB) ObjectBatchCreate(ctx SQLContext, objectID cdssdk.ObjectID, nodeIDs []cdssdk.NodeID) error { + for _, id := range nodeIDs { + err := db.TryCreate(ctx, id, objectID, time.Now()) + if err != nil { + return err + } + } + return nil +} + func (*PinnedObjectDB) Delete(ctx SQLContext, nodeID cdssdk.NodeID, objectID cdssdk.ObjectID) error { _, err := ctx.Exec("delete from PinnedObject where NodeID = ? and ObjectID = ?", nodeID, objectID) return err diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 543d4f4..6c665d6 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -83,6 +83,7 @@ type ChangeObjectRedundancyResp struct { type ChangeObjectRedundancyEntry struct { ObjectID cdssdk.ObjectID `json:"objectID"` Redundancy cdssdk.Redundancy `json:"redundancy"` + PinnedAt []cdssdk.NodeID `json:"pinnedAt"` Blocks []stgmod.ObjectBlock `json:"blocks"` } diff --git a/common/pkgs/mq/scanner/event/clean_pinned.go b/common/pkgs/mq/scanner/event/clean_pinned.go new file mode 100644 index 0000000..a2ce976 --- /dev/null +++ b/common/pkgs/mq/scanner/event/clean_pinned.go @@ -0,0 +1,18 @@ +package event + +import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + +type CleanPinned struct { + EventBase + PackageID cdssdk.PackageID `json:"nodeID"` +} + +func NewCleanPinned(packageID cdssdk.PackageID) *CleanPinned { + return &CleanPinned{ + PackageID: packageID, + } +} + +func init() { + Register[*CleanPinned]() +} diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index 876644c..2af1ccc 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -35,7 +35,7 @@ func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) return fmt.Errorf("getting package by id: %w", err) } - details, err = svc.db.ObjectBlock().GetPackageBlockDetails(tx, msg.PackageID) + details, err = svc.db.Object().GetPackageObjectDetails(tx, msg.PackageID) if err != nil { return fmt.Errorf("getting package block details: %w", err) } diff --git a/coordinator/internal/mq/package.go b/coordinator/internal/mq/package.go index d7bf332..ddab4ee 100644 --- a/coordinator/internal/mq/package.go +++ b/coordinator/internal/mq/package.go @@ -131,7 +131,7 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c } // 这个函数只是统计哪些节点缓存了Package中的数据,不需要多么精确,所以可以不用事务 - objDetails, err := svc.db.ObjectBlock().GetPackageBlockDetails(svc.db.SQLCtx(), msg.PackageID) + objDetails, err := svc.db.Object().GetPackageObjectDetails(svc.db.SQLCtx(), msg.PackageID) if err != nil { logger.WithField("PackageID", msg.PackageID). Warnf("get package block details: %s", err.Error()) diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go new file mode 100644 index 0000000..fbe15f1 --- /dev/null +++ b/scanner/internal/event/clean_pinned.go @@ -0,0 +1,754 @@ +package event + +import ( + "fmt" + "math" + "math/rand" + "strconv" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + mylo "gitlink.org.cn/cloudream/common/utils/lo" + mymath "gitlink.org.cn/cloudream/common/utils/math" + myref "gitlink.org.cn/cloudream/common/utils/reflect" + mysort "gitlink.org.cn/cloudream/common/utils/sort" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" +) + +type CleanPinned struct { + *scevt.CleanPinned +} + +func NewCleanPinned(evt *scevt.CleanPinned) *CleanPinned { + return &CleanPinned{ + CleanPinned: evt, + } +} + +func (t *CleanPinned) TryMerge(other Event) bool { + event, ok := other.(*CleanPinned) + if !ok { + return false + } + + return t.PackageID == event.PackageID +} + +func (t *CleanPinned) Execute(execCtx ExecuteContext) { + log := logger.WithType[CleanPinned]("Event") + log.Debugf("begin with %v", logger.FormatStruct(t.CleanPinned)) + defer log.Debugf("end") + + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + log.Warnf("new coordinator client: %s", err.Error()) + return + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getObjs, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.PackageID)) + if err != nil { + log.Warnf("getting package objects: %s", err.Error()) + return + } + + getLoadLog, err := coorCli.GetPackageLoadLogDetails(coormq.ReqGetPackageLoadLogDetails(t.PackageID)) + if err != nil { + log.Warnf("getting package load log details: %s", err.Error()) + return + } + readerNodeIDs := lo.Map(getLoadLog.Logs, func(item coormq.PackageLoadLogDetail, idx int) cdssdk.NodeID { return item.Storage.NodeID }) + + var changeRedEntries []coormq.ChangeObjectRedundancyEntry + for _, obj := range getObjs.Objects { + entry, err := t.doOne(execCtx, readerNodeIDs, coorCli, obj) + if err != nil { + log.WithField("PackageID", obj).Warn(err.Error()) + continue + } + if entry != nil { + changeRedEntries = append(changeRedEntries, *entry) + } + } + + if len(changeRedEntries) > 0 { + _, err = coorCli.ChangeObjectRedundancy(coormq.ReqChangeObjectRedundancy(changeRedEntries)) + if err != nil { + log.Warnf("changing object redundancy: %s", err.Error()) + return + } + } +} + +type doingContext struct { + execCtx ExecuteContext + readerNodeIDs []cdssdk.NodeID // 近期可能访问此对象的节点 + nodesSortedByReader map[cdssdk.NodeID][]nodeDist // 拥有数据的节点到每个可能访问对象的节点按距离排序 + nodeInfos map[cdssdk.NodeID]*model.Node + blockList []objectBlock // 排序后的块分布情况 + nodeBlockBitmaps map[cdssdk.NodeID]*bitmap // 用位图的形式表示每一个节点上有哪些块 + allBlockTypeCount int // object总共被分成了几块 + minBlockTypeCount int // 最少要几块才能恢复出完整的object + nodeCombTree combinatorialTree // 节点组合树,用于加速计算容灾度 + + maxScore float64 // 搜索过程中得到过的最大分数 + maxScoreRmBlocks []bool // 最大分数对应的删除方案 + + rmBlocks []bool // 当前删除方案 + inversedIndex int // 当前删除方案是从上一次的方案改动哪个flag而来的 + lastScore float64 // 上一次方案的分数 +} + +type objectBlock struct { + Index int + NodeID cdssdk.NodeID + HasEntity bool // 节点拥有实际的文件数据块 + HasShadow bool // 如果节点拥有完整文件数据,那么认为这个节点拥有所有块,这些块被称为影子块 + FileHash string // 只有在拥有实际文件数据块时,这个字段才有值 +} + +type nodeDist struct { + NodeID cdssdk.NodeID + Distance float64 +} + +type combinatorialTree struct { + nodes []combinatorialTreeNode + blocksMaps map[int]bitmap + nodeIDToLocalNodeID map[cdssdk.NodeID]int + localNodeIDToNodeID []cdssdk.NodeID +} + +const ( + iterActionNone = 0 + iterActionSkip = 1 + iterActionBreak = 2 +) + +func newCombinatorialTree(nodeBlocksMaps map[cdssdk.NodeID]*bitmap) combinatorialTree { + tree := combinatorialTree{ + blocksMaps: make(map[int]bitmap), + nodeIDToLocalNodeID: make(map[cdssdk.NodeID]int), + } + + tree.nodes = make([]combinatorialTreeNode, (1 << len(nodeBlocksMaps))) + for id, mp := range nodeBlocksMaps { + tree.nodeIDToLocalNodeID[id] = len(tree.localNodeIDToNodeID) + tree.blocksMaps[len(tree.localNodeIDToNodeID)] = *mp + tree.localNodeIDToNodeID = append(tree.localNodeIDToNodeID, id) + } + + tree.nodes[0].localNodeID = -1 + index := 1 + tree.initNode(0, &tree.nodes[0], &index) + + return tree +} + +func (t *combinatorialTree) initNode(minAvaiLocalNodeID int, parent *combinatorialTreeNode, index *int) { + for i := minAvaiLocalNodeID; i < len(t.nodeIDToLocalNodeID); i++ { + curIndex := *index + *index++ + bitMp := t.blocksMaps[i] + bitMp.Or(&parent.blocksBitmap) + + t.nodes[curIndex] = combinatorialTreeNode{ + localNodeID: i, + parent: parent, + blocksBitmap: bitMp, + } + t.initNode(i+1, &t.nodes[curIndex], index) + } +} + +// 获得索引指定的节点所在的层 +func (t *combinatorialTree) GetDepth(index int) int { + depth := 0 + + // 反复判断节点在哪个子树。从左到右,子树节点的数量呈现8 4 2的变化,由此可以得到每个子树的索引值的范围 + subTreeCount := 1 << len(t.nodeIDToLocalNodeID) + for index > 0 { + if index < subTreeCount { + // 定位到一个子树后,深度+1,然后进入这个子树,使用同样的方法再进行定位。 + // 进入子树后需要将索引值-1,因为要去掉子树的根节点 + index-- + depth++ + } else { + // 如果索引值不在这个子树范围内,则将值减去子树的节点数量, + // 这样每一次都可以视为使用同样的逻辑对不同大小的树进行判断。 + index -= subTreeCount + } + subTreeCount >>= 1 + } + + return depth +} + +// 更新某一个算力中心节点的块分布位图,同时更新它对应组合树节点的所有子节点。 +// 如果更新到某个节点时,已有K个块,那么就不会再更新它的子节点 +func (t *combinatorialTree) UpdateBitmap(nodeID cdssdk.NodeID, mp bitmap, k int) { + t.blocksMaps[t.nodeIDToLocalNodeID[nodeID]] = mp + // 首先定义两种遍历树节点时的移动方式: + // 1. 竖直移动(深度增加):从一个节点移动到它最左边的子节点。每移动一步,index+1 + // 2. 水平移动:从一个节点移动到它右边的兄弟节点。每移动一步,根据它所在的深度,index+8,+4,+2 + // LocalNodeID从0开始,将其+1后得到移动步数steps。 + // 将移动步数拆成多部分,分配到上述的两种移动方式上,并进行任意组合,且保证第一次为至少进行一次的竖直移动,移动之后的节点都会是同一个计算中心节点。 + steps := t.nodeIDToLocalNodeID[nodeID] + 1 + for d := 1; d <= steps; d++ { + t.iterCombBits(len(t.nodeIDToLocalNodeID)-1, steps-d, 0, func(i int) { + index := d + i + node := &t.nodes[index] + + newMp := t.blocksMaps[node.localNodeID] + newMp.Or(&node.parent.blocksBitmap) + node.blocksBitmap = newMp + if newMp.Weight() >= k { + return + } + + t.iterChildren(index, func(index, parentIndex, depth int) int { + curNode := &t.nodes[index] + parentNode := t.nodes[parentIndex] + + newMp := t.blocksMaps[curNode.localNodeID] + newMp.Or(&parentNode.blocksBitmap) + curNode.blocksBitmap = newMp + if newMp.Weight() >= k { + return iterActionSkip + } + + return iterActionNone + }) + }) + } +} + +// 遍历树,找到至少拥有K个块的树节点的最大深度 +func (t *combinatorialTree) FindKBlocksMaxDepth(k int) int { + maxDepth := -1 + t.iterChildren(0, func(index, parentIndex, depth int) int { + if t.nodes[index].blocksBitmap.Weight() >= k { + if maxDepth < depth { + maxDepth = depth + } + return iterActionSkip + } + // 如果到了叶子节点,还没有找到K个块,那就认为要满足K个块,至少需要再多一个节点,即深度+1。 + // 由于遍历时采用的是深度优先的算法,因此遍历到这个叶子节点时,叶子节点再加一个节点的组合已经在前面搜索过, + // 所以用当前叶子节点深度+1来作为当前分支的结果就可以,即使当前情况下增加任意一个节点依然不够K块, + // 可以使用同样的思路去递推到当前叶子节点增加两个块的情况。 + if t.nodes[index].localNodeID == len(t.nodeIDToLocalNodeID)-1 { + if maxDepth < depth+1 { + maxDepth = depth + 1 + } + } + + return iterActionNone + }) + + if maxDepth == -1 || maxDepth > len(t.nodeIDToLocalNodeID) { + return len(t.nodeIDToLocalNodeID) + } + + return maxDepth +} + +func (t *combinatorialTree) iterCombBits(width int, count int, offset int, callback func(int)) { + if count == 0 { + callback(offset) + return + } + + for b := width; b >= count; b-- { + t.iterCombBits(b-1, count-1, offset+(1<>= 1 + } +} + +func (t *combinatorialTree) itering(index int, parentIndex int, depth int, do func(index int, parentIndex int, depth int) int) int { + act := do(index, parentIndex, depth) + if act == iterActionBreak { + return act + } + if act == iterActionSkip { + return iterActionNone + } + + curNode := &t.nodes[index] + childIndex := index + 1 + + childCounts := len(t.nodeIDToLocalNodeID) - 1 - curNode.localNodeID + if childCounts == 0 { + return iterActionNone + } + + childTreeNodeCnt := 1 << (childCounts - 1) + for c := 0; c < childCounts; c++ { + act = t.itering(childIndex, index, depth+1, do) + if act == iterActionBreak { + return act + } + + childIndex += childTreeNodeCnt + childTreeNodeCnt >>= 1 + } + + return iterActionNone +} + +type combinatorialTreeNode struct { + localNodeID int + parent *combinatorialTreeNode + blocksBitmap bitmap // 选择了这个中心之后,所有中心一共包含多少种块 +} + +type bitmap uint64 + +func (b *bitmap) Set(index int, val bool) { + if val { + *b |= 1 << index + } else { + *b &= ^(1 << index) + } +} + +func (b *bitmap) Or(other *bitmap) { + *b |= *other +} + +func (b *bitmap) Weight() int { + v := *b + cnt := 0 + for v > 0 { + cnt++ + v &= (v - 1) + } + return cnt +} + +func (t *CleanPinned) doOne(execCtx ExecuteContext, readerNodeIDs []cdssdk.NodeID, coorCli *coormq.Client, obj stgmod.ObjectDetail) (*coormq.ChangeObjectRedundancyEntry, error) { + if len(obj.PinnedAt) == 0 && len(obj.Blocks) == 0 { + return nil, nil + } + + ctx := doingContext{ + execCtx: execCtx, + readerNodeIDs: readerNodeIDs, + nodesSortedByReader: make(map[cdssdk.NodeID][]nodeDist), + nodeInfos: make(map[cdssdk.NodeID]*model.Node), + nodeBlockBitmaps: make(map[cdssdk.NodeID]*bitmap), + } + + err := t.getNodeInfos(&ctx, coorCli, obj) + if err != nil { + return nil, err + } + + err = t.makeBlockList(&ctx, obj) + if err != nil { + return nil, err + } + + if ctx.blockList == nil { + return nil, nil + } + + t.makeNodeBlockBitmap(&ctx) + + t.sortNodeByReaderDistance(&ctx) + + ctx.rmBlocks = make([]bool, len(ctx.blockList)) + ctx.inversedIndex = -1 + ctx.nodeCombTree = newCombinatorialTree(ctx.nodeBlockBitmaps) + + ctx.lastScore = t.calcScore(&ctx) + ctx.maxScore = ctx.lastScore + ctx.maxScoreRmBlocks = mylo.ArrayClone(ctx.rmBlocks) + + // 模拟退火算法的温度 + curTemp := ctx.lastScore + // 结束温度 + finalTemp := curTemp * 0.2 + // 冷却率 + coolingRate := 0.95 + + for curTemp > finalTemp { + ctx.inversedIndex = rand.Intn(len(ctx.rmBlocks)) + block := ctx.blockList[ctx.inversedIndex] + ctx.rmBlocks[ctx.inversedIndex] = !ctx.rmBlocks[ctx.inversedIndex] + ctx.nodeBlockBitmaps[block.NodeID].Set(block.Index, !ctx.rmBlocks[ctx.inversedIndex]) + ctx.nodeCombTree.UpdateBitmap(block.NodeID, *ctx.nodeBlockBitmaps[block.NodeID], ctx.minBlockTypeCount) + + curScore := t.calcScore(&ctx) + + dScore := curScore - ctx.lastScore + // 如果新方案比旧方案得分低,且没有要求强制接受新方案,那么就将变化改回去 + if curScore == 0 || (dScore < 0 && !t.alwaysAccept(curTemp, dScore, coolingRate)) { + ctx.rmBlocks[ctx.inversedIndex] = !ctx.rmBlocks[ctx.inversedIndex] + ctx.nodeBlockBitmaps[block.NodeID].Set(block.Index, !ctx.rmBlocks[ctx.inversedIndex]) + ctx.nodeCombTree.UpdateBitmap(block.NodeID, *ctx.nodeBlockBitmaps[block.NodeID], ctx.minBlockTypeCount) + fmt.Printf("\n") + } else { + fmt.Printf(" accept!\n") + ctx.lastScore = curScore + if ctx.maxScore < curScore { + ctx.maxScore = ctx.lastScore + ctx.maxScoreRmBlocks = mylo.ArrayClone(ctx.rmBlocks) + } + } + curTemp *= coolingRate + } + + return t.applySolution(ctx, obj) +} + +func (t *CleanPinned) getNodeInfos(ctx *doingContext, coorCli *coormq.Client, obj stgmod.ObjectDetail) error { + var nodeIDs []cdssdk.NodeID + for _, b := range obj.Blocks { + nodeIDs = append(nodeIDs, b.NodeID) + } + nodeIDs = append(nodeIDs, obj.PinnedAt...) + + nodeIDs = append(nodeIDs, ctx.readerNodeIDs...) + + getNode, err := coorCli.GetNodes(coormq.NewGetNodes(lo.Uniq(nodeIDs))) + if err != nil { + return fmt.Errorf("requesting to coordinator: %w", err) + } + + for _, n := range getNode.Nodes { + ctx.nodeInfos[n.NodeID] = &n + } + + return nil +} + +func (t *CleanPinned) makeBlockList(ctx *doingContext, obj stgmod.ObjectDetail) error { + blockCnt := 1 + minBlockCnt := 1 + switch red := obj.Object.Redundancy.(type) { + case *cdssdk.NoneRedundancy: + return nil + case *cdssdk.RepRedundancy: + blockCnt = 1 + minBlockCnt = 1 + case *cdssdk.ECRedundancy: + blockCnt = red.N + minBlockCnt = red.K + default: + return fmt.Errorf("unknow redundancy type: %v", myref.TypeOfValue(obj.Object.Redundancy)) + } + + blocksMap := make(map[cdssdk.NodeID][]objectBlock) + + // 先生成所有的影子块 + for _, pinned := range obj.PinnedAt { + blocks := make([]objectBlock, 0, blockCnt) + for i := 0; i < blockCnt; i++ { + blocks = append(blocks, objectBlock{ + Index: i, + NodeID: pinned, + HasShadow: true, + }) + } + blocksMap[pinned] = blocks + } + + // 再填充实际块 + for _, b := range obj.Blocks { + blocks := blocksMap[b.NodeID] + + has := false + for i := range blocks { + if blocks[i].Index == b.Index { + blocks[i].HasEntity = true + blocks[i].FileHash = b.FileHash + has = true + break + } + } + + if has { + continue + } + + blocks = append(blocks, objectBlock{ + Index: b.Index, + NodeID: b.NodeID, + HasEntity: true, + FileHash: b.FileHash, + }) + blocksMap[b.NodeID] = blocks + } + + var sortedBlocks []objectBlock + for _, bs := range blocksMap { + sortedBlocks = append(sortedBlocks, bs...) + } + sortedBlocks = mysort.Sort(sortedBlocks, func(left objectBlock, right objectBlock) int { + d := left.NodeID - right.NodeID + if d != 0 { + return int(d) + } + + return left.Index - right.Index + }) + + ctx.allBlockTypeCount = blockCnt + ctx.minBlockTypeCount = minBlockCnt + ctx.blockList = sortedBlocks + return nil +} + +func (t *CleanPinned) makeNodeBlockBitmap(ctx *doingContext) { + for _, b := range ctx.blockList { + mp, ok := ctx.nodeBlockBitmaps[b.NodeID] + if !ok { + nb := bitmap(0) + mp = &nb + ctx.nodeBlockBitmaps[b.NodeID] = mp + } + mp.Set(b.Index, true) + } +} + +func (t *CleanPinned) sortNodeByReaderDistance(ctx *doingContext) { + for _, r := range ctx.readerNodeIDs { + var nodeDists []nodeDist + + for n := range ctx.nodeBlockBitmaps { + if r == n { + // 同节点时距离视为0.1 + nodeDists = append(nodeDists, nodeDist{ + NodeID: n, + Distance: 0.1, + }) + } else if ctx.nodeInfos[r].LocationID == ctx.nodeInfos[n].LocationID { + // 同地区时距离视为1 + nodeDists = append(nodeDists, nodeDist{ + NodeID: n, + Distance: 1, + }) + } else { + // 不同地区时距离视为5 + nodeDists = append(nodeDists, nodeDist{ + NodeID: n, + Distance: 5, + }) + } + } + + ctx.nodesSortedByReader[r] = mysort.Sort(nodeDists, func(left, right nodeDist) int { return mysort.Cmp(left.Distance, right.Distance) }) + } +} + +func (t *CleanPinned) calcScore(ctx *doingContext) float64 { + dt := t.calcDisasterTolerance(ctx) + ac := t.calcMinAccessCost(ctx) + sc := t.calcSpaceCost(ctx) + + dtSc := 1.0 + if dt < 1 { + dtSc = 0 + } else if dt >= 2 { + dtSc = 1.5 + } + + newSc := 0.0 + if dt == 0 || ac == 0 { + newSc = 0 + } else { + newSc = dtSc / (sc * ac) + } + + fmt.Printf("solu: %v, cur: %v, dt: %v, ac: %v, sc: %v ", ctx.rmBlocks, newSc, dt, ac, sc) + return newSc +} + +// 计算容灾度 +func (t *CleanPinned) calcDisasterTolerance(ctx *doingContext) float64 { + if ctx.inversedIndex != -1 { + node := ctx.blockList[ctx.inversedIndex] + ctx.nodeCombTree.UpdateBitmap(node.NodeID, *ctx.nodeBlockBitmaps[node.NodeID], ctx.minBlockTypeCount) + } + return float64(len(ctx.nodeBlockBitmaps) - ctx.nodeCombTree.FindKBlocksMaxDepth(ctx.minBlockTypeCount)) +} + +// 计算最小访问数据的代价 +func (t *CleanPinned) calcMinAccessCost(ctx *doingContext) float64 { + cost := math.MaxFloat64 + for _, reader := range ctx.readerNodeIDs { + tarNodes := ctx.nodesSortedByReader[reader] + gotBlocks := bitmap(0) + thisCost := 0.0 + + for _, tar := range tarNodes { + tarNodeMp := ctx.nodeBlockBitmaps[tar.NodeID] + + // 只需要从目的节点上获得缺少的块 + curWeigth := gotBlocks.Weight() + // 下面的if会在拿到k个块之后跳出循环,所以or多了块也没关系 + gotBlocks.Or(tarNodeMp) + willGetBlocks := mymath.Min(gotBlocks.Weight()-curWeigth, ctx.minBlockTypeCount-curWeigth) + thisCost += float64(willGetBlocks) * float64(tar.Distance) + + if gotBlocks.Weight() >= ctx.minBlockTypeCount { + break + } + } + if gotBlocks.Weight() >= ctx.minBlockTypeCount { + cost = math.Min(cost, thisCost) + } + } + + return cost +} + +// 计算冗余度 +func (t *CleanPinned) calcSpaceCost(ctx *doingContext) float64 { + blockCount := 0 + for i, b := range ctx.blockList { + if ctx.rmBlocks[i] { + continue + } + + if b.HasEntity { + blockCount++ + } + if b.HasShadow { + blockCount++ + } + } + // 所有算力中心上拥有的块的总数 / 一个对象被分成了几个块 + return float64(blockCount) / float64(ctx.minBlockTypeCount) +} + +// 如果新方案得分比旧方案小,那么在一定概率内也接受新方案 +func (t *CleanPinned) alwaysAccept(curTemp float64, dScore float64, coolingRate float64) bool { + v := math.Exp(dScore / curTemp / coolingRate) + fmt.Printf(" -- chance: %v, temp: %v", v, curTemp) + return v > rand.Float64() +} + +func (t *CleanPinned) applySolution(ctx doingContext, obj stgmod.ObjectDetail) (*coormq.ChangeObjectRedundancyEntry, error) { + entry := coormq.ChangeObjectRedundancyEntry{ + ObjectID: obj.Object.ObjectID, + Redundancy: obj.Object.Redundancy, + } + fmt.Printf("final solu: %v, score: %v\n", ctx.maxScoreRmBlocks, ctx.maxScore) + + reconstrct := make(map[cdssdk.NodeID]*[]int) + for i, f := range ctx.maxScoreRmBlocks { + block := ctx.blockList[i] + if !f { + entry.Blocks = append(entry.Blocks, stgmod.ObjectBlock{ + ObjectID: obj.Object.ObjectID, + Index: block.Index, + NodeID: block.NodeID, + FileHash: block.FileHash, + }) + + // 如果这个块是影子块,那么就要从完整对象里重建这个块 + if !block.HasEntity { + re, ok := reconstrct[block.NodeID] + if !ok { + re = &[]int{} + reconstrct[block.NodeID] = re + } + + *re = append(*re, block.Index) + } + } + } + + bld := reqbuilder.NewBuilder() + for id := range reconstrct { + bld.IPFS().Buzy(id) + } + + mutex, err := bld.MutexLock(ctx.execCtx.Args.DistLock) + if err != nil { + return nil, fmt.Errorf("acquiring distlock: %w", err) + } + defer mutex.Unlock() + + if ecRed, ok := obj.Object.Redundancy.(*cdssdk.ECRedundancy); ok { + for id, idxs := range reconstrct { + bld := plans.NewPlanBuilder() + agt := bld.AtAgent(*ctx.nodeInfos[id]) + + strs := agt.IPFSRead(obj.Object.FileHash).ChunkedSplit(ecRed.ChunkSize, ecRed.K, true) + ss := agt.ECReconstructAny(*ecRed, lo.Range(ecRed.K), *idxs, strs.Streams...) + for i, s := range ss.Streams { + s.IPFSWrite(fmt.Sprintf("%d", (*idxs)[i])) + } + + plan, err := bld.Build() + if err != nil { + return nil, fmt.Errorf("building io switch plan: %w", err) + } + + exec, err := plans.Execute(*plan) + if err != nil { + return nil, fmt.Errorf("executing io switch plan: %w", err) + } + ret, err := exec.Wait() + if err != nil { + return nil, fmt.Errorf("executing io switch plan: %w", err) + } + + for k, v := range ret.ResultValues { + idx, err := strconv.ParseInt(k, 10, 32) + if err != nil { + return nil, fmt.Errorf("parsing plan result: %w", err) + } + + for i := range entry.Blocks { + if entry.Blocks[i].NodeID == id && entry.Blocks[i].Index == int(idx) { + entry.Blocks[i].FileHash = v.(string) + } + } + } + + } + } else if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok { + // rep模式不分块,所以每一个Block的FileHash就是完整文件的FileHash + for i := range entry.Blocks { + entry.Blocks[i].FileHash = obj.Object.FileHash + } + } + + return &entry, nil +} + +func init() { + RegisterMessageConvertor(NewCleanPinned) +} diff --git a/scanner/internal/event/clean_pinned_test.go b/scanner/internal/event/clean_pinned_test.go new file mode 100644 index 0000000..6066c2a --- /dev/null +++ b/scanner/internal/event/clean_pinned_test.go @@ -0,0 +1,263 @@ +package event + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) + +func newTreeTest(nodeBlocksMap []bitmap) combinatorialTree { + tree := combinatorialTree{ + blocksMaps: make(map[int]bitmap), + nodeIDToLocalNodeID: make(map[cdssdk.NodeID]int), + } + + tree.nodes = make([]combinatorialTreeNode, (1 << len(nodeBlocksMap))) + for id, mp := range nodeBlocksMap { + tree.nodeIDToLocalNodeID[cdssdk.NodeID(id)] = len(tree.localNodeIDToNodeID) + tree.blocksMaps[len(tree.localNodeIDToNodeID)] = mp + tree.localNodeIDToNodeID = append(tree.localNodeIDToNodeID, cdssdk.NodeID(id)) + } + + tree.nodes[0].localNodeID = -1 + index := 1 + tree.initNode(0, &tree.nodes[0], &index) + + return tree +} + +func Test_iterCombBits(t *testing.T) { + testcases := []struct { + title string + width int + count int + expectedValues []int + }{ + { + title: "1 of 4", + width: 4, + count: 1, + expectedValues: []int{16, 8, 4, 2}, + }, + + { + title: "2 of 4", + width: 4, + count: 2, + expectedValues: []int{24, 20, 18, 12, 10, 6}, + }, + + { + title: "3 of 4", + width: 4, + count: 3, + expectedValues: []int{28, 26, 22, 14}, + }, + + { + title: "4 of 4", + width: 4, + count: 4, + expectedValues: []int{30}, + }, + } + + for _, test := range testcases { + Convey(test.title, t, func() { + var ret []int + var t combinatorialTree + t.iterCombBits(test.width, test.count, 0, func(i int) { + ret = append(ret, i) + }) + + So(ret, ShouldResemble, test.expectedValues) + }) + } +} + +func Test_newCombinatorialTree(t *testing.T) { + testcases := []struct { + title string + nodeBlocks []bitmap + expectedTreeNodeLocalIDs []int + expectedTreeNodeBitmaps []int + }{ + { + title: "1个节点", + nodeBlocks: []bitmap{1}, + expectedTreeNodeLocalIDs: []int{-1, 0}, + expectedTreeNodeBitmaps: []int{0, 1}, + }, + { + title: "2个节点", + nodeBlocks: []bitmap{1, 0}, + expectedTreeNodeLocalIDs: []int{-1, 0, 1, 1}, + expectedTreeNodeBitmaps: []int{0, 1, 1, 0}, + }, + { + title: "4个节点", + nodeBlocks: []bitmap{1, 2, 4, 8}, + expectedTreeNodeLocalIDs: []int{-1, 0, 1, 2, 3, 3, 2, 3, 3, 1, 2, 3, 3, 2, 3, 3}, + expectedTreeNodeBitmaps: []int{0, 1, 3, 7, 15, 11, 5, 13, 9, 2, 6, 14, 10, 4, 12, 8}, + }, + } + + for _, test := range testcases { + Convey(test.title, t, func() { + t := newTreeTest(test.nodeBlocks) + + var localIDs []int + var bitmaps []int + for _, n := range t.nodes { + localIDs = append(localIDs, n.localNodeID) + bitmaps = append(bitmaps, int(n.blocksBitmap)) + } + + So(localIDs, ShouldResemble, test.expectedTreeNodeLocalIDs) + So(bitmaps, ShouldResemble, test.expectedTreeNodeBitmaps) + }) + } +} + +func Test_UpdateBitmap(t *testing.T) { + testcases := []struct { + title string + nodeBlocks []bitmap + updatedNodeID cdssdk.NodeID + updatedBitmap bitmap + k int + expectedTreeNodeBitmaps []int + }{ + + { + title: "4个节点,更新但值不变", + nodeBlocks: []bitmap{1, 2, 4, 8}, + updatedNodeID: cdssdk.NodeID(0), + updatedBitmap: bitmap(1), + k: 4, + expectedTreeNodeBitmaps: []int{0, 1, 3, 7, 15, 11, 5, 13, 9, 2, 6, 14, 10, 4, 12, 8}, + }, + + { + title: "4个节点,更新0", + nodeBlocks: []bitmap{1, 2, 4, 8}, + updatedNodeID: cdssdk.NodeID(0), + updatedBitmap: bitmap(2), + k: 4, + expectedTreeNodeBitmaps: []int{0, 2, 2, 6, 14, 10, 6, 14, 10, 2, 6, 14, 10, 4, 12, 8}, + }, + + { + title: "4个节点,更新1", + nodeBlocks: []bitmap{1, 2, 4, 8}, + updatedNodeID: cdssdk.NodeID(1), + updatedBitmap: bitmap(1), + k: 4, + expectedTreeNodeBitmaps: []int{0, 1, 1, 5, 13, 9, 5, 13, 9, 1, 5, 13, 9, 4, 12, 8}, + }, + + { + title: "4个节点,更新2", + nodeBlocks: []bitmap{1, 2, 4, 8}, + updatedNodeID: cdssdk.NodeID(2), + updatedBitmap: bitmap(1), + k: 4, + expectedTreeNodeBitmaps: []int{0, 1, 3, 3, 11, 11, 1, 9, 9, 2, 3, 11, 10, 1, 9, 8}, + }, + + { + title: "4个节点,更新3", + nodeBlocks: []bitmap{1, 2, 4, 8}, + updatedNodeID: cdssdk.NodeID(3), + updatedBitmap: bitmap(1), + k: 4, + expectedTreeNodeBitmaps: []int{0, 1, 3, 7, 7, 3, 5, 5, 1, 2, 6, 7, 3, 4, 5, 1}, + }, + + { + title: "4个节点,k<4,更新0,0之前没有k个块,现在拥有", + nodeBlocks: []bitmap{1, 2, 4, 8}, + updatedNodeID: cdssdk.NodeID(0), + updatedBitmap: bitmap(3), + k: 2, + expectedTreeNodeBitmaps: []int{0, 3, 3, 7, 15, 11, 5, 13, 9, 2, 6, 14, 10, 4, 12, 8}, + }, + { + title: "4个节点,k<4,更新0,0之前有k个块,现在没有", + nodeBlocks: []bitmap{3, 4, 0, 0}, + updatedNodeID: cdssdk.NodeID(0), + updatedBitmap: bitmap(0), + k: 2, + expectedTreeNodeBitmaps: []int{0, 0, 4, 4, 4, 4, 0, 0, 0, 4, 4, 4, 4, 0, 0, 0}, + }, + } + + for _, test := range testcases { + Convey(test.title, t, func() { + t := newTreeTest(test.nodeBlocks) + t.UpdateBitmap(test.updatedNodeID, test.updatedBitmap, test.k) + + var bitmaps []int + for _, n := range t.nodes { + bitmaps = append(bitmaps, int(n.blocksBitmap)) + } + + So(bitmaps, ShouldResemble, test.expectedTreeNodeBitmaps) + }) + } +} + +func Test_FindKBlocksMaxDepth(t *testing.T) { + testcases := []struct { + title string + nodeBlocks []bitmap + k int + expected int + }{ + { + title: "每个节点各有一个块", + nodeBlocks: []bitmap{1, 2, 4, 8}, + k: 2, + expected: 2, + }, + { + title: "所有节点加起来块数不足", + nodeBlocks: []bitmap{1, 1, 1, 1}, + k: 2, + expected: 4, + }, + { + title: "不同节点有相同块", + nodeBlocks: []bitmap{1, 1, 2, 4}, + k: 2, + expected: 3, + }, + { + title: "一个节点就拥有所有块", + nodeBlocks: []bitmap{3, 6, 12, 24}, + k: 2, + expected: 1, + }, + { + title: "只有一块,且只在某一个节点1", + nodeBlocks: []bitmap{1, 0}, + k: 1, + expected: 2, + }, + { + title: "只有一块,且只在某一个节点2", + nodeBlocks: []bitmap{0, 1}, + k: 1, + expected: 2, + }, + } + + for _, test := range testcases { + Convey(test.title, t, func() { + t := newTreeTest(test.nodeBlocks) + ret := t.FindKBlocksMaxDepth(test.k) + So(ret, ShouldResemble, test.expected) + }) + } +} diff --git a/scanner/internal/tickevent/batch_clean_pinned.go b/scanner/internal/tickevent/batch_clean_pinned.go new file mode 100644 index 0000000..00e2f5c --- /dev/null +++ b/scanner/internal/tickevent/batch_clean_pinned.go @@ -0,0 +1,48 @@ +package tickevent + +import ( + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" + evt "gitlink.org.cn/cloudream/storage/scanner/internal/event" +) + +type BatchCleanPinned struct { + lastCheckStart int +} + +func NewBatchCleanPinned() *BatchCleanPinned { + return &BatchCleanPinned{} +} + +func (e *BatchCleanPinned) Execute(ctx ExecuteContext) { + log := logger.WithType[BatchCleanPinned]("TickEvent") + log.Debugf("begin") + defer log.Debugf("end") + + // TODO 更好的策略 + nowHour := time.Now().Hour() + if nowHour > 6 { + return + } + + packageIDs, err := ctx.Args.DB.Package().BatchGetAllPackageIDs(ctx.Args.DB.SQLCtx(), e.lastCheckStart, CheckPackageBatchSize) + if err != nil { + log.Warnf("batch get package ids failed, err: %s", err.Error()) + return + } + + for _, id := range packageIDs { + ctx.Args.EventExecutor.Post(evt.NewCleanPinned(event.NewCleanPinned(id))) + } + + // 如果结果的长度小于预期的长度,则认为已经查询了所有,下次从头再来 + if len(packageIDs) < CheckPackageBatchSize { + e.lastCheckStart = 0 + log.Debugf("all package clean pinned, next time will start check at offset 0") + + } else { + e.lastCheckStart += CheckPackageBatchSize + } +} diff --git a/scanner/main.go b/scanner/main.go index f3aee3d..88d6a5a 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -125,5 +125,7 @@ func startTickEvent(tickExecutor *tickevent.Executor) { tickExecutor.Start(tickevent.NewCheckAgentState(), 5*60*1000, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) - tickExecutor.Start(tickevent.NewBatchCheckPackageRedundancy(), interval, tickevent.StartOption{RandomStartDelayMs: 10 * 60 * 1000}) + tickExecutor.Start(tickevent.NewBatchCheckPackageRedundancy(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000}) + + tickExecutor.Start(tickevent.NewBatchCleanPinned(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000}) }