Browse Source

增加冗余策略调整的时长控制

master
Sydonian 4 months ago
parent
commit
03a3701b47
2 changed files with 32 additions and 18 deletions
  1. +7
    -4
      client/internal/ticktock/change_redundancy.go
  2. +25
    -14
      client/internal/ticktock/redundancy_shrink.go

+ 7
- 4
client/internal/ticktock/change_redundancy.go View File

@@ -59,6 +59,8 @@ func (j *ChangeRedundancy) Execute(t *TickTock) {
}

lastPkgID := clitypes.PackageID(0)

loop:
for {
pkgs, err := db.DoTx21(t.db, t.db.Package().BatchGetDetailPaged, lastPkgID, BatchGetPackageDetailCount)
if err != nil {
@@ -71,6 +73,11 @@ func (j *ChangeRedundancy) Execute(t *TickTock) {
lastPkgID = pkgs[len(pkgs)-1].Package.PackageID

for _, p := range pkgs {
// 如果执行超过两个小时,则停止
if time.Since(startTime) > time.Hour*2 {
break loop
}

err := j.changeOne(ctx, p)
if err != nil {
log.Warnf("change redundancy: %v", err)
@@ -78,10 +85,6 @@ func (j *ChangeRedundancy) Execute(t *TickTock) {
}
}

// 如果执行超过两个小时,则停止
if time.Since(startTime) > time.Hour*2 {
break
}
}
}



+ 25
- 14
client/internal/ticktock/redundancy_shrink.go View File

@@ -47,7 +47,6 @@ func (t *ChangeRedundancy) doRedundancyShrink(execCtx *changeRedundancyContext,
}
}

planBld := exec.NewPlanBuilder()
planningStgIDs := make(map[clitypes.UserSpaceID]bool)

var sysEvents []datamap.SysEventBody
@@ -61,14 +60,26 @@ func (t *ChangeRedundancy) doRedundancyShrink(execCtx *changeRedundancyContext,
pinnedAt: repMostHubIDs,
blocks: nil,
})
for _, obj := range repObjects {
repObjectsUpdating = append(repObjectsUpdating, t.makePlansForRepObject(execCtx, solu, obj, planBld, planningStgIDs))
sysEvents = append(sysEvents, t.generateSysEventForRepObject(solu, obj)...)

iRepObj := 0
for iRepObj < len(repObjects) {
planBld := exec.NewPlanBuilder()
for c := 0; c < 10 && iRepObj < len(repObjects); c++ {
repObjectsUpdating = append(repObjectsUpdating, t.makePlansForRepObject(execCtx, solu, repObjects[iRepObj], planBld, planningStgIDs))
sysEvents = append(sysEvents, t.generateSysEventForRepObject(solu, repObjects[iRepObj])...)
iRepObj++
}

_, err := t.executePlans(execCtx, planBld, planningStgIDs, reen)
if err != nil {
log.Warn(err.Error())
return nil, nil, fmt.Errorf("execute plans: %w", err)
}
}

// 对于ec对象,则每个对象单独进行退火算法
var ecObjectsUpdating []db.UpdatingObjectRedundancy
for _, obj := range ecObjects {
for i, obj := range ecObjects {
ecRed := obj.Object.Redundancy.(*clitypes.ECRedundancy)
solu := t.startAnnealing(execCtx, readerStgIDs, annealingObject{
totalBlockCount: ecRed.N,
@@ -76,19 +87,19 @@ func (t *ChangeRedundancy) doRedundancyShrink(execCtx *changeRedundancyContext,
pinnedAt: obj.PinnedAt,
blocks: obj.Blocks,
})

planBld := exec.NewPlanBuilder()
ecObjectsUpdating = append(ecObjectsUpdating, t.makePlansForECObject(execCtx, solu, obj, planBld, planningStgIDs))
sysEvents = append(sysEvents, t.generateSysEventForECObject(solu, obj)...)
}

ioSwRets, err := t.executePlans(execCtx, planBld, planningStgIDs, reen)
if err != nil {
log.Warn(err.Error())
return nil, nil, fmt.Errorf("execute plans: %w", err)
}
ioSwRets, err := t.executePlans(execCtx, planBld, planningStgIDs, reen)
if err != nil {
log.Warn(err.Error())
return nil, nil, fmt.Errorf("execute plans: %w", err)
}

// 根据按照方案进行调整的结果,填充更新元数据的命令
for i := range ecObjectsUpdating {
t.populateECObjectEntry(&ecObjectsUpdating[i], ecObjects[i], ioSwRets)
// 根据按照方案进行调整的结果,填充更新元数据的命令
t.populateECObjectEntry(&ecObjectsUpdating[i], obj, ioSwRets)
}

return append(repObjectsUpdating, ecObjectsUpdating...), sysEvents, nil


Loading…
Cancel
Save