Browse Source

fix: add ai Random RandomStrategy

Former-commit-id: bde21281fe
pull/190/head
qiwang 1 year ago
parent
commit
d4e18c8796
4 changed files with 86 additions and 29 deletions
  1. +1
    -0
      api/internal/logic/schedule/schedulesubmitlogic.go
  2. +24
    -24
      api/internal/scheduler/schedulers/aiScheduler.go
  3. +60
    -4
      api/internal/scheduler/strategy/random.go
  4. +1
    -1
      api/internal/scheduler/strategy/test/strategy_test.go

+ 1
- 0
api/internal/logic/schedule/schedulesubmitlogic.go View File

@@ -44,6 +44,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
Params: req.AiOption.Params,
Envs: req.AiOption.Envs,
Cmd: req.AiOption.Cmd,
ClusterIds: req.AiOption.AiClusterIds,
}
aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt)
if err != nil {


+ 24
- 24
api/internal/scheduler/schedulers/aiScheduler.go View File

@@ -25,7 +25,6 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
@@ -71,44 +70,45 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
}

resources, err := as.findClustersWithResources()
/* resources, err := as.findClustersWithResources()

if err != nil {
return nil, err
}
if len(resources) == 0 {
return nil, errors.New("no cluster has resources")
}
if err != nil {
return nil, err
}
if len(resources) == 0 {
return nil, errors.New("no cluster has resources")
}

if len(resources) == 1 {
var cluster strategy.AssignedCluster
cluster.ClusterId = resources[0].ClusterId
cluster.Replicas = 1
return &strategy.SingleAssignment{Cluster: &cluster}, nil
}
if len(resources) == 1 {
var cluster strategy.AssignedCluster
cluster.ClusterId = resources[0].ClusterId
cluster.Replicas = 1
return &strategy.SingleAssignment{Cluster: &cluster}, nil
}

params := &param.Params{Resources: resources}
params := &param.Params{Resources: resources}*/

switch as.option.StrategyName {
case strategy.REPLICATION:
var clusterIds []string
for _, resource := range resources {
/* for _, resource := range resources {
clusterIds = append(clusterIds, resource.ClusterId)
}
}*/
strategy := strategy.NewReplicationStrategy(clusterIds, 1)
return strategy, nil
case strategy.RESOURCES_PRICING:
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
return strategy, nil
case strategy.DYNAMIC_RESOURCES:
strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
return strategy, nil
/* case strategy.RESOURCES_PRICING:
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
return strategy, nil
case strategy.DYNAMIC_RESOURCES:
strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
return strategy, nil*/
case strategy.STATIC_WEIGHT:
//todo resources should match cluster StaticWeightMap
strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica)
return strategy, nil
case strategy.RANDOM:

strategy := strategy.NewRandomStrategy(as.option.ClusterIds, as.option.Replica)
return strategy, nil
}

return nil, errors.New("no strategy has been chosen")


+ 60
- 4
api/internal/scheduler/strategy/random.go View File

@@ -1,10 +1,66 @@
package strategy

/*type RandomStrategy struct {
import (
"github.com/pkg/errors"
"math/rand"
)

type RandomStrategy struct {
clusterIds []string
replicas int32
}

func NewRandomStrategy(clusterIds []string, replicas int32) *NewRandomStrategy {
return &RandomStrategy{clusterIds: clusterIds, replicas: replicas}
}*/
func NewRandomStrategy(clusterIds []string, replicas int32) *RandomStrategy {
return &RandomStrategy{clusterIds: clusterIds,
replicas: replicas,
}
}

func (s *RandomStrategy) Schedule() ([]*AssignedCluster, error) {

if s.replicas < 1 {
return nil, errors.New("replicas must be greater than 0")
}

if len(s.clusterIds) < 1 {
return nil, errors.New("cluster must be greater than 0")
}

if len(s.clusterIds) == 0 || s.clusterIds == nil {
return nil, errors.New("weight must be set")
}

// 创建一个切片来保存每个部分的数量
parts := make([]int32, len(s.clusterIds))

// 首先将每个部分都分配至少一个副本
for i := range parts {
parts[i] = 1
s.replicas--
}

// 剩余要分配的副本数
remaining := s.replicas

// 随机分配剩余的副本
for remaining > 0 {
// 随机选择一个部分(索引从0到numParts-1)
partIndex := rand.Intn(len(s.clusterIds))

// 如果该部分加上一个副本后不会超过总数,则分配一个副本
if parts[partIndex]+1 <= s.replicas {
parts[partIndex]++
remaining--
}
}

var results []*AssignedCluster
if len(s.clusterIds) == len(parts) {
for i, key := range s.clusterIds {
cluster := &AssignedCluster{ClusterId: key, Replicas: parts[i]}
results = append(results, cluster)
}
}
return results, nil

}

+ 1
- 1
api/internal/scheduler/strategy/test/strategy_test.go View File

@@ -202,7 +202,7 @@ func splitRandomParts(total int) []int {

func TestNumRandom(t *testing.T) {
total := 10 // 假设副本数是10
numParts := 3 // 假设要分成5个集群
numParts := 2 // 假设要分成5个集群

parts, err := splitIntoParts(total, numParts)
if err != nil {


Loading…
Cancel
Save