@@ -16,25 +16,26 @@ package scheduler
import (
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algo"
tool "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice"
"gorm.io/gorm"
"math/rand"
"strconv"
"strings"
"time"
)
type Replicas int64
type ParticipantId int64
type scheduler struct {
task *response.TaskInfo
participantIds []int64
scheduleService scheduleService
dbEngin *gorm.DB
result map[ParticipantId]Replicas
result map[int64]string //pID:子任务yamlstring 键值对
participantRpc participantservice.ParticipantService
}
@@ -44,48 +45,40 @@ func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB,
if err != nil {
return nil, errors.New("create scheduler failed : " + err.Error())
}
return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc, result: make(map[ParticipantId]Replicas , 0)}, nil
return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc, result: make(map[int64]string , 0)}, nil
}
func (s *scheduler) SepcifyClusters() {
func (s *scheduler) SpecifyClusters() {
// 如果已指定集群名,通过数据库查询后返回p端ip列表
if len(s.task.Clusters) != 0 {
s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", s.task.Clusters).Scan(&s.participantIds)
return
}
}
func (s *scheduler) MatchLabels() {
var ids []int64
count := 0
func (s *scheduler) SpecifyNsID() {
// 未指定集群名,只指定nsID
if len(s.task.Clusters) == 0 {
if len(s.task.NsID) != 0 {
var clusters string
s.dbEngin.Raw("select clusters from sc_tenant_info where `tenant_name` = ?", s.task.NsID).Scan(&clusters)
// 已指定 ParticipantId 直接不走标签匹配
if s.task.ParticipantId != 0 {
return
}
clusterArr := strings.Split(clusters, ",")
// 如果已指定集群名,通过数据库查询后返回p端ip列表
if len(s.task.Clusters) != 0 {
for i, _ := range s.task.Clusters {
clusterName := s.task.Clusters[i]
var participantId int64
s.dbEngin.Raw("select id from sc_participant_phy_info where `name` = ?", clusterName).Scan(&participantId)
s.participantIds = append(s.participantIds, participantId)
s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", clusterArr).Scan(&s.participantIds)
}
} else {
return
}
// 如果未指定集群名,通过用户名(nsID)筛选出用户对应的集群
if len(s.task.NsID) != 0 {
var clusters string
s.dbEngin.Raw("select clusters from sc_tenant_info where `tenant_name` = ?", s.task.NsID).Scan(&clusters)
}
clusterArr := strings.Split(clusters, ",")
func (s *scheduler) MatchLabels() {
for i, _ := range clusterArr {
clusterName := clusterArr[i]
var participantId int64
s.dbEngin.Raw("select id from sc_participant_phy_info where `name` = ?", clusterName).Scan(&participantId)
s.participantIds = append(s.participantIds, participantId)
}
} else {
//如果均未指定,则通过标签匹配
var ids []int64
count := 0
// 集群和nsID都未指定,则通过标签匹配
if len(s.task.Clusters) == 0 && len(s.task.NsID) == 0 {
//如果集群列表或nsID均未指定
for key := range s.task.MatchLabels {
var participantIds []int64
s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds)
@@ -96,19 +89,54 @@ func (s *scheduler) MatchLabels() {
count++
}
s.participantIds = ids
} else {
return
}
}
//todo 屏蔽调度算法,目前直接在matchlabel阶段随机分配到不同集群
replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
// TempAssign todo 屏蔽原调度算法
func (s *scheduler) TempAssign() error {
//需要判断task中的资源类型,针对metadata中的多个kind做不同处理
//输入副本数和集群列表,最终结果输出为pID对应副本数量列表,针对多个kind需要做拆分和重新拼接组合
var resources []interface{}
tool.Convert(s.task.Metadata, &resources)
for _, resource := range resources {
//如果是Deployment,需要对副本数做分发
if resource.(map[string]interface{})["kind"].(string) == "Deployment" || resource.(map[string]interface{})["kind"].(string) == "Replicaset" {
//replicas := int(resource.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64))
rand.Seed(time.Now().UnixNano())
//// 生成pID对应副本数 数组
//arrReplica := make(map[int64]int, len(s.participantIds))
//for i := 0; i < len(s.participantIds)-1; i++ {
// arrReplica[s.participantIds[i]] = rand.Intn(replicas)
// replicas -= arrReplica[s.participantIds[i]] // 更新剩余的和
//}
//arrReplica[s.participantIds[len(s.participantIds)-1]] = replicas
//
////将副本数依次写入新的yaml中并生成result数据
//yamlArray := make(map[int64]string, len(s.participantIds))
//
//for i := 0; i < len(s.participantIds)-1; i++ {
// //调整yaml
// yamlArray[s.participantIds[i]] = "sds"
//}
}
s.result[s.participantIds[0]] = ""
array := make([]int, len(s.participantIds))
for i := range array {
array[i] = int(replicas) / len(s.participantIds)
s.result[ParticipantId(s.participantIds[i])] = Replicas(array[i])
}
array[0] += int(replicas) % len(s.participantIds)
s.result[ParticipantId(s.participantIds[0])] = Replicas(array[0])
return
//var crd = s.task.Metadata
//for i := 0; i < len(s.task.Metadata); i++ {
//
//}
return nil
}
func (s *scheduler) AssignAndSchedule() error {
@@ -125,8 +153,8 @@ func (s *scheduler) AssignAndSchedule() error {
if len(s.participantIds) == 1 {
s.task.ParticipantId = s.participantIds[0]
replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
result := make(map[ParticipantId]Replicas )
result[ParticipantId(s.participantIds[0])] = Replicas(replicas )
result := make(map[int64]string )
result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64 )
s.result = result
return nil
@@ -162,7 +190,12 @@ func (s *scheduler) AssignAndSchedule() error {
func (s *scheduler) SaveToDb() error {
for key, value := range s.result {
structForDb, err := s.scheduleService.getNewStructForDb(s.task, int64(key), int32(value))
num, err := strconv.Atoi(value)
if err != nil {
fmt.Println("转换失败:", err)
}
structForDb, err := s.scheduleService.getNewStructForDb(s.task, int64(key), int32(num))
if err != nil {
return err
}
@@ -196,7 +229,7 @@ func (s *scheduler) assignReplicasToResult(strategy *algo.Strategy, providerList
if e == 0 {
continue
}
s.result[ParticipantId(providerList[i].Pid)] = Replicas(int64(e) )
s.result[providerList[i].Pid] = string(e )
}
if len(s.result) == 0 {