/* Copyright (c) [2023] [pcm] [pcm-coordinator] is licensed under Mulan PSL v2. You can use this software according to the terms and conditions of the Mulan PSL v2. You may obtain a copy of Mulan PSL v2 at: http://license.coscl.org.cn/MulanPSL2 THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. See the Mulan PSL v2 for more details. */ package scheduler import ( "encoding/json" "github.com/pkg/errors" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algo" tool "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gorm.io/gorm" "strconv" "strings" ) type scheduler struct { task *response.TaskInfo participantIds []int64 scheduleService scheduleService dbEngin *gorm.DB result []string //pID:子任务yamlstring 键值对 participantRpc participantservice.ParticipantService } func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*scheduler, error) { var task *response.TaskInfo err := json.Unmarshal([]byte(val), &task) if err != nil { return nil, errors.New("create scheduler failed : " + err.Error()) } return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil } func (s *scheduler) SpecifyClusters() { // 如果已指定集群名,通过数据库查询后返回p端ip列表 if len(s.task.Clusters) != 0 { s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", s.task.Clusters).Scan(&s.participantIds) return } } func (s *scheduler) SpecifyNsID() { // 未指定集群名,只指定nsID if len(s.task.Clusters) == 0 { if len(s.task.NsID) != 0 { var clusters string s.dbEngin.Raw("select clusters from sc_tenant_info where `tenant_name` = ?", s.task.NsID).Scan(&clusters) clusterArr := strings.Split(clusters, ",") s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", clusterArr).Scan(&s.participantIds) } } else { return } } func (s *scheduler) MatchLabels() { var ids []int64 count := 0 // 集群和nsID都未指定,则通过标签匹配 if len(s.task.Clusters) == 0 && len(s.task.NsID) == 0 { //如果集群列表或nsID均未指定 for key := range s.task.MatchLabels { var participantIds []int64 s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds) if count == 0 { ids = participantIds } ids = intersect(ids, participantIds) count++ } s.participantIds = ids } else { return } } // TempAssign todo 屏蔽原调度算法 func (s *scheduler) TempAssign() error { //需要判断task中的资源类型,针对metadata中的多个kind做不同处理 //输入副本数和集群列表,最终结果输出为pID对应副本数量列表,针对多个kind需要做拆分和重新拼接组合 var resources []interface{} tool.Convert(s.task.Metadata, &resources) for index := range resources { //如果是Deployment,需要对副本数做分发 if resources[index].(map[string]interface{})["kind"].(string) == "Deployment" || resources[index].(map[string]interface{})["kind"].(string) == "Replicaset" || resources[index].(map[string]interface{})["kind"].(string) == "StatefulSet" { resources[index].(map[string]interface{})["spec"].(map[string]interface{})["replicas"] = s.task.Replicas } } s.task.Metadata = resources return nil } func (s *scheduler) AssignAndSchedule() error { // 已指定 ParticipantId if s.task.ParticipantId != 0 { return nil } // 标签匹配以及后,未找到ParticipantIds if len(s.participantIds) == 0 { return errors.New("未找到匹配的ParticipantIds") } // 指定或者标签匹配的结果只有一个集群,给任务信息指定 if len(s.participantIds) == 1 { s.task.ParticipantId = s.participantIds[0] replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) result := make(map[int64]string) result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64) //s.result = result return nil } //生成算法所需参数 task, providerList, err := s.obtainParamsforStrategy() if err != nil { return err } //集群数量不满足,指定到标签匹配后第一个集群 if len(providerList) < 2 { s.task.ParticipantId = s.participantIds[0] return nil } //调度算法 strategy, err := s.scheduleService.pickOptimalStrategy(task, providerList...) if err != nil { return err } //调度结果 err = s.assignReplicasToResult(strategy, providerList) if err != nil { return err } return nil } func (s *scheduler) SaveToDb() error { for _, participantId := range s.participantIds { var resources []interface{} tool.Convert(s.task.Metadata, &resources) for _, resource := range resources { structForDb, err := s.scheduleService.getNewStructForDb(s.task, resource, int64(participantId)) if err != nil { return err } tx := s.dbEngin.Create(structForDb) if tx.Error != nil { logx.Error(tx.Error) return tx.Error } } } return nil } func (s *scheduler) obtainParamsforStrategy() (*algo.Task, []*algo.Provider, error) { task, providerList := s.scheduleService.genTaskAndProviders(s.task, s.dbEngin) if len(providerList) == 0 { return nil, nil, errors.New("获取集群失败") } return task, providerList, nil } func (s *scheduler) assignReplicasToResult(strategy *algo.Strategy, providerList []*algo.Provider) error { if len(strategy.Tasksolution) == 0 { return errors.New("调度失败, 未能获取调度结果") } for i, e := range strategy.Tasksolution { if e == 0 { continue } s.result[providerList[i].Pid] = string(e) } if len(s.result) == 0 { return errors.New("可用集群为空") } return nil }