From 03a3701b47eb26ba2c3b23f5bde58ca0435399e4 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 14 Jul 2025 11:00:36 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=86=97=E4=BD=99=E7=AD=96?= =?UTF-8?q?=E7=95=A5=E8=B0=83=E6=95=B4=E7=9A=84=E6=97=B6=E9=95=BF=E6=8E=A7?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/ticktock/change_redundancy.go | 11 ++++-- client/internal/ticktock/redundancy_shrink.go | 39 ++++++++++++------- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/client/internal/ticktock/change_redundancy.go b/client/internal/ticktock/change_redundancy.go index c57e824..1495ac7 100644 --- a/client/internal/ticktock/change_redundancy.go +++ b/client/internal/ticktock/change_redundancy.go @@ -59,6 +59,8 @@ func (j *ChangeRedundancy) Execute(t *TickTock) { } lastPkgID := clitypes.PackageID(0) + +loop: for { pkgs, err := db.DoTx21(t.db, t.db.Package().BatchGetDetailPaged, lastPkgID, BatchGetPackageDetailCount) if err != nil { @@ -71,6 +73,11 @@ func (j *ChangeRedundancy) Execute(t *TickTock) { lastPkgID = pkgs[len(pkgs)-1].Package.PackageID for _, p := range pkgs { + // 如果执行超过两个小时,则停止 + if time.Since(startTime) > time.Hour*2 { + break loop + } + err := j.changeOne(ctx, p) if err != nil { log.Warnf("change redundancy: %v", err) @@ -78,10 +85,6 @@ func (j *ChangeRedundancy) Execute(t *TickTock) { } } - // 如果执行超过两个小时,则停止 - if time.Since(startTime) > time.Hour*2 { - break - } } } diff --git a/client/internal/ticktock/redundancy_shrink.go b/client/internal/ticktock/redundancy_shrink.go index b032d2c..4be3c11 100644 --- a/client/internal/ticktock/redundancy_shrink.go +++ b/client/internal/ticktock/redundancy_shrink.go @@ -47,7 +47,6 @@ func (t *ChangeRedundancy) doRedundancyShrink(execCtx *changeRedundancyContext, } } - planBld := exec.NewPlanBuilder() planningStgIDs := make(map[clitypes.UserSpaceID]bool) var sysEvents []datamap.SysEventBody @@ -61,14 +60,26 @@ func (t *ChangeRedundancy) doRedundancyShrink(execCtx *changeRedundancyContext, pinnedAt: repMostHubIDs, blocks: nil, }) - for _, obj := range repObjects { - repObjectsUpdating = append(repObjectsUpdating, t.makePlansForRepObject(execCtx, solu, obj, planBld, planningStgIDs)) - sysEvents = append(sysEvents, t.generateSysEventForRepObject(solu, obj)...) + + iRepObj := 0 + for iRepObj < len(repObjects) { + planBld := exec.NewPlanBuilder() + for c := 0; c < 10 && iRepObj < len(repObjects); c++ { + repObjectsUpdating = append(repObjectsUpdating, t.makePlansForRepObject(execCtx, solu, repObjects[iRepObj], planBld, planningStgIDs)) + sysEvents = append(sysEvents, t.generateSysEventForRepObject(solu, repObjects[iRepObj])...) + iRepObj++ + } + + _, err := t.executePlans(execCtx, planBld, planningStgIDs, reen) + if err != nil { + log.Warn(err.Error()) + return nil, nil, fmt.Errorf("execute plans: %w", err) + } } // 对于ec对象,则每个对象单独进行退火算法 var ecObjectsUpdating []db.UpdatingObjectRedundancy - for _, obj := range ecObjects { + for i, obj := range ecObjects { ecRed := obj.Object.Redundancy.(*clitypes.ECRedundancy) solu := t.startAnnealing(execCtx, readerStgIDs, annealingObject{ totalBlockCount: ecRed.N, @@ -76,19 +87,19 @@ func (t *ChangeRedundancy) doRedundancyShrink(execCtx *changeRedundancyContext, pinnedAt: obj.PinnedAt, blocks: obj.Blocks, }) + + planBld := exec.NewPlanBuilder() ecObjectsUpdating = append(ecObjectsUpdating, t.makePlansForECObject(execCtx, solu, obj, planBld, planningStgIDs)) sysEvents = append(sysEvents, t.generateSysEventForECObject(solu, obj)...) - } - ioSwRets, err := t.executePlans(execCtx, planBld, planningStgIDs, reen) - if err != nil { - log.Warn(err.Error()) - return nil, nil, fmt.Errorf("execute plans: %w", err) - } + ioSwRets, err := t.executePlans(execCtx, planBld, planningStgIDs, reen) + if err != nil { + log.Warn(err.Error()) + return nil, nil, fmt.Errorf("execute plans: %w", err) + } - // 根据按照方案进行调整的结果,填充更新元数据的命令 - for i := range ecObjectsUpdating { - t.populateECObjectEntry(&ecObjectsUpdating[i], ecObjects[i], ioSwRets) + // 根据按照方案进行调整的结果,填充更新元数据的命令 + t.populateECObjectEntry(&ecObjectsUpdating[i], obj, ioSwRets) } return append(repObjectsUpdating, ecObjectsUpdating...), sysEvents, nil