|
- /*
-
- Copyright (c) [2023] [pcm]
- [pcm-coordinator] is licensed under Mulan PSL v2.
- You can use this software according to the terms and conditions of the Mulan PSL v2.
- You may obtain a copy of Mulan PSL v2 at:
- http://license.coscl.org.cn/MulanPSL2
- THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
- EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
- MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
- See the Mulan PSL v2 for more details.
-
- */
-
- package scheduler
-
- import (
- "encoding/json"
- "strings"
-
- "github.com/pkg/errors"
- "github.com/zeromicro/go-zero/core/logx"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/config"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/response"
- "gorm.io/gorm"
- "sigs.k8s.io/yaml"
- )
-
// Scheduler carries the state of one scheduling run and the storage/service
// handles it needs.
//
// It is built either by NewScheduler (task, subSchedule and dbEngin set) or by
// NewSchdlr (only the exported storage/service handles set).
type Scheduler struct {
	// task is the task being scheduled, decoded from JSON in NewScheduler.
	task *response.TaskInfo
	// participantIds holds the target participant IDs. It is filled by
	// SpecifyClusters, SpecifyNsID or MatchLabels and consumed by SaveToDb.
	participantIds []int64
	// subSchedule supplies the resource-specific steps (e.g. the DB record
	// built in SaveToDb).
	subSchedule SubSchedule
	// dbEngin is the gorm handle used for the raw lookups and inserts.
	dbEngin *gorm.DB
	// result is described as "pID : sub-task YAML string" key/value pairs,
	// although it is stored as a flat []string. NOTE(review): not read or
	// written anywhere in this file; confirm whether it is still used.
	result        []string
	AiStorages    *database.AiStorage    // AI task persistence; used by CreateTask.
	AiService     *service.AiService     // AI service layer; set by NewSchdlr.
	HpcStorages   *database.HpcStorage   // HPC persistence; set by NewSchdlr.
	HpcService    *service.HpcService    // HPC service layer; set by NewSchdlr.
	CloudStorages *database.CloudStorage // Cloud persistence; set by NewSchdlr.
	CloudService  *service.CloudService  // Cloud service layer; set by NewSchdlr.
}
-
- type SubSchedule interface {
- GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error)
- PickOptimalStrategy() (strategy.Strategy, error)
- AssignTask(clusters []*strategy.AssignedCluster, mode int) (interface{}, error)
- }
-
- func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB) (*Scheduler, error) {
- var task *response.TaskInfo
- err := json.Unmarshal([]byte(val), &task)
- if err != nil {
- return nil, errors.New("create scheduler failed : " + err.Error())
- }
- return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin}, nil
- }
-
- func NewSchdlr(aiService *service.AiService, storages *database.AiStorage, hpcStorage *database.HpcStorage, hpcService *service.HpcService, cloudStorage *database.CloudStorage, cloudService *service.CloudService) *Scheduler {
- return &Scheduler{AiService: aiService, AiStorages: storages, HpcStorages: hpcStorage, HpcService: hpcService, CloudStorages: cloudStorage, CloudService: cloudService}
- }
-
- func (s *Scheduler) SpecifyClusters() {
- // 如果已指定集群名,通过数据库查询后返回p端ip列表
- if len(s.task.Clusters) != 0 {
- s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", s.task.Clusters).Scan(&s.participantIds)
- return
- }
- }
-
- func (s *Scheduler) SpecifyNsID() {
- // 未指定集群名,只指定nsID
- if len(s.task.Clusters) == 0 {
- if len(s.task.NsID) != 0 {
- var clusters string
- s.dbEngin.Raw("select clusters from sc_tenant_info where `tenant_name` = ?", s.task.NsID).Scan(&clusters)
-
- clusterArr := strings.Split(clusters, ",")
-
- s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", clusterArr).Scan(&s.participantIds)
- }
- } else {
- return
- }
- }
-
- func (s *Scheduler) MatchLabels() {
-
- var ids []int64
- count := 0
- // 集群和nsID都未指定,则通过标签匹配
- if len(s.task.Clusters) == 0 && len(s.task.NsID) == 0 {
- //如果集群列表或nsID均未指定
- for key := range s.task.MatchLabels {
- var participantIds []int64
- s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds)
- if count == 0 {
- ids = participantIds
- }
- ids = common.Intersect(ids, participantIds)
- count++
- }
- s.participantIds = ids
- } else {
- return
- }
- }
-
- // TempAssign todo 屏蔽原调度算法
- func (s *Scheduler) TempAssign() error {
-
- //需要判断task中的资源类型,针对metadata中的多个kind做不同处理
- //输入副本数和集群列表,最终结果输出为pID对应副本数量列表,针对多个kind需要做拆分和重新拼接组合
- var meData []string
- for _, yamlString := range s.task.Metadata {
- var data map[string]interface{}
- err := yaml.Unmarshal([]byte(yamlString), &data)
- if err != nil {
- }
-
- jsonData, err := json.Marshal(data)
- if err != nil {
- }
- meData = append(meData, string(jsonData))
- }
- s.task.Metadata = meData
- return nil
- }
-
- func (s *Scheduler) AssignAndSchedule(ss SubSchedule, mode int, assignedClusters []*strategy.AssignedCluster) (interface{}, error) {
- var result interface{}
- switch mode {
- case executor.SUBMIT_MODE_JOINT_CLOUD:
- //choose strategy
- strategy, err := ss.PickOptimalStrategy()
- if err != nil {
- return nil, err
- }
-
- //schedule
- clusters, err := strategy.Schedule()
- if err != nil {
- return nil, err
- }
-
- //assign tasks to clusters
- resp, err := ss.AssignTask(clusters, mode)
- if err != nil {
- return nil, err
- }
-
- result = resp
-
- case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
-
- //assign tasks to clusters
- resp, err := ss.AssignTask(assignedClusters, mode)
- if err != nil {
- return nil, err
- }
-
- result = resp
- }
-
- return result, nil
- }
-
- func (s *Scheduler) SaveToDb() error {
-
- for _, participantId := range s.participantIds {
-
- for _, resource := range s.task.Metadata {
- structForDb, err := s.subSchedule.GetNewStructForDb(s.task, resource, participantId)
- if err != nil {
- return err
- }
-
- tx := s.dbEngin.Create(structForDb)
- if tx.Error != nil {
- logx.Error(tx.Error)
- return tx.Error
- }
- }
-
- }
- return nil
- }
-
- func (s *Scheduler) CreateTask(taskName string, desc string, userId int64, synergyCode int64, strategyName string, yaml string, token string, userIp string, config *config.Config, userName string) (int64, error) {
- strategyCode, err := s.AiStorages.GetStrategyCode(strategyName)
- if err != nil {
- return 0, err
- }
-
- id, err := s.AiStorages.SaveTask(taskName, desc, userId, strategyCode, synergyCode, "10", yaml, nil, userName)
- if err != nil {
- return 0, err
- }
-
- return id, nil
- }
|