You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

scheduler.go 6.0 kB

11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package scheduler
  13. import (
  14. "encoding/json"
  15. "github.com/pkg/errors"
  16. "github.com/zeromicro/go-zero/core/logx"
  17. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
  18. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
  19. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service"
  20. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
  21. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  22. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/response"
  23. "gorm.io/gorm"
  24. "sigs.k8s.io/yaml"
  25. "strings"
  26. "sync"
  27. )
  // Scheduling modes understood by Scheduler.AssignAndSchedule.
  const (
  	// JOINT_CLOUD_MODE lets the sub-scheduler pick a strategy, which then
  	// computes the target clusters itself.
  	JOINT_CLOUD_MODE = 1
  	// STORAGE_SCHEDULE_MODE assigns the task to clusters supplied by the
  	// caller (as []*types.JobClusterInfo), one replica per cluster.
  	STORAGE_SCHEDULE_MODE = 2
  )
  32. type Scheduler struct {
  33. task *response.TaskInfo
  34. participantIds []int64
  35. subSchedule SubSchedule
  36. dbEngin *gorm.DB
  37. result []string //pID:子任务yamlstring 键值对
  38. AiStorages *database.AiStorage
  39. AiService *service.AiService
  40. mu sync.RWMutex
  41. }
  42. type SubSchedule interface {
  43. GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error)
  44. PickOptimalStrategy() (strategy.Strategy, error)
  45. AssignTask(clusters []*strategy.AssignedCluster, mode int) (interface{}, error)
  46. }
  // NewScheduler decodes val, a JSON-encoded response.TaskInfo, and returns a
  // Scheduler for that task. The Scheduler uses subSchedule for
  // task-type-specific work and dbEngin for database access.
  //
  // It returns an error if val is not valid JSON for a TaskInfo. It also
  // returns an error if val decodes to no task at all (the JSON literal null),
  // because every other method dereferences the task.
  func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB) (*Scheduler, error) {
  	var task *response.TaskInfo
  	if err := json.Unmarshal([]byte(val), &task); err != nil {
  		return nil, errors.New("create scheduler failed : " + err.Error())
  	}
  	if task == nil {
  		// json.Unmarshal sets a pointer to nil for the input "null" without
  		// reporting an error, so this case has to be rejected explicitly.
  		return nil, errors.New("create scheduler failed : task is null")
  	}
  	return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin}, nil
  }
  55. func NewSchdlr(aiService *service.AiService, storages *database.AiStorage) *Scheduler {
  56. return &Scheduler{AiService: aiService, AiStorages: storages}
  57. }
  58. func (s *Scheduler) SpecifyClusters() {
  59. // 如果已指定集群名,通过数据库查询后返回p端ip列表
  60. if len(s.task.Clusters) != 0 {
  61. s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", s.task.Clusters).Scan(&s.participantIds)
  62. return
  63. }
  64. }
  // SpecifyNsID resolves the task's nsID (a tenant name) into participant IDs.
  // It only applies when no clusters were listed explicitly. It reads the
  // tenant's comma-separated cluster list from sc_tenant_info, then looks those
  // cluster names up in sc_participant_phy_info. The result is stored in
  // s.participantIds.
  //
  // Blank entries and surrounding spaces in the cluster list are ignored.
  // Query failures are logged, because the method has no error return.
  func (s *Scheduler) SpecifyNsID() {
  	// Explicitly listed clusters take precedence. Without an nsID there is
  	// nothing to resolve.
  	if len(s.task.Clusters) != 0 || len(s.task.NsID) == 0 {
  		return
  	}

  	var clusters string
  	tx := s.dbEngin.Raw("select clusters from sc_tenant_info where `tenant_name` = ?", s.task.NsID).Scan(&clusters)
  	if tx.Error != nil {
  		logx.Error(tx.Error)
  		return
  	}

  	var names []string
  	for _, name := range strings.Split(clusters, ",") {
  		// Tolerate "a, b" and trailing commas in the stored list.
  		if name = strings.TrimSpace(name); name != "" {
  			names = append(names, name)
  		}
  	}
  	if len(names) == 0 {
  		// The tenant has no clusters configured, so no participant matches.
  		s.participantIds = nil
  		return
  	}

  	tx = s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", names).Scan(&s.participantIds)
  	if tx.Error != nil {
  		logx.Error(tx.Error)
  	}
  }
  78. func (s *Scheduler) MatchLabels() {
  79. var ids []int64
  80. count := 0
  81. // 集群和nsID都未指定,则通过标签匹配
  82. if len(s.task.Clusters) == 0 && len(s.task.NsID) == 0 {
  83. //如果集群列表或nsID均未指定
  84. for key := range s.task.MatchLabels {
  85. var participantIds []int64
  86. s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds)
  87. if count == 0 {
  88. ids = participantIds
  89. }
  90. ids = common.Intersect(ids, participantIds)
  91. count++
  92. }
  93. s.participantIds = ids
  94. } else {
  95. return
  96. }
  97. }
  // TempAssign converts every entry of the task's Metadata from YAML to JSON in
  // place.
  // TODO (carried over): this temporarily bypasses the original scheduling
  // algorithm.
  //
  // If any entry fails to parse as YAML or to re-encode as JSON, it returns an
  // error and leaves s.task.Metadata unchanged. Previously such entries were
  // silently replaced with "null".
  func (s *Scheduler) TempAssign() error {
  	// TODO (carried over): the resource kinds in the task still need to be
  	// inspected, with each kind in the metadata handled differently. Given a
  	// replica count and a cluster list, the final output should be a list of
  	// replica counts per participant ID; multiple kinds need to be split and
  	// recombined.
  	var converted []string
  	for _, yamlString := range s.task.Metadata {
  		var data map[string]interface{}
  		if err := yaml.Unmarshal([]byte(yamlString), &data); err != nil {
  			return errors.New("parse task metadata yaml failed : " + err.Error())
  		}
  		jsonData, err := json.Marshal(data)
  		if err != nil {
  			return errors.New("encode task metadata json failed : " + err.Error())
  		}
  		converted = append(converted, string(jsonData))
  	}
  	s.task.Metadata = converted
  	return nil
  }
  // AssignAndSchedule determines the target clusters according to mode and
  // hands the task to ss.AssignTask:
  //
  //   - JOINT_CLOUD_MODE: ss picks a strategy, the strategy computes the
  //     clusters, and assignedClusters is ignored.
  //   - STORAGE_SCHEDULE_MODE: assignedClusters must be a
  //     []*types.JobClusterInfo; each listed cluster gets one replica.
  //
  // It returns the value produced by ss.AssignTask. Any error from strategy
  // selection, scheduling, the type conversion or the assignment is returned
  // as-is. An unrecognised mode is reported as an error rather than silently
  // yielding (nil, nil).
  func (s *Scheduler) AssignAndSchedule(ss SubSchedule, mode int, assignedClusters interface{}) (interface{}, error) {
  	var clusters []*strategy.AssignedCluster
  	switch mode {
  	case JOINT_CLOUD_MODE:
  		// Named strat so the strategy package is not shadowed.
  		strat, err := ss.PickOptimalStrategy()
  		if err != nil {
  			return nil, err
  		}
  		clusters, err = strat.Schedule()
  		if err != nil {
  			return nil, err
  		}
  	case STORAGE_SCHEDULE_MODE:
  		jobClusterInfos, ok := assignedClusters.([]*types.JobClusterInfo)
  		if !ok {
  			return nil, errors.New("converting JobClusterInfos fails")
  		}
  		for _, info := range jobClusterInfos {
  			clusters = append(clusters, &strategy.AssignedCluster{ClusterId: info.ClusterID, Replicas: 1})
  		}
  	default:
  		return nil, errors.New("unsupported schedule mode")
  	}

  	resp, err := ss.AssignTask(clusters, mode)
  	if err != nil {
  		return nil, err
  	}
  	return resp, nil
  }
  155. func (s *Scheduler) SaveToDb() error {
  156. for _, participantId := range s.participantIds {
  157. for _, resource := range s.task.Metadata {
  158. structForDb, err := s.subSchedule.GetNewStructForDb(s.task, resource, participantId)
  159. if err != nil {
  160. return err
  161. }
  162. tx := s.dbEngin.Create(structForDb)
  163. if tx.Error != nil {
  164. logx.Error(tx.Error)
  165. return tx.Error
  166. }
  167. }
  168. }
  169. return nil
  170. }
  171. func (s *Scheduler) CreateTask(taskName string, synergyCode int64, strategyName string, yaml string) (int64, error) {
  172. strategyCode, err := s.AiStorages.GetStrategyCode(strategyName)
  173. if err != nil {
  174. return 0, err
  175. }
  176. id, err := s.AiStorages.SaveTask(taskName, strategyCode, synergyCode, "10", yaml)
  177. if err != nil {
  178. return 0, err
  179. }
  180. return id, nil
  181. }

PCM is positioned as a software stack over the cloud. It aims to build the standards and ecosystem for heterogeneous cloud collaboration in JCC, in a non-intrusive, autonomous, peer-to-peer manner.