You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

scheduler.go 5.8 kB

(line numbers 1–188)
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package scheduler
  13. import (
  14. "encoding/json"
  15. "github.com/pkg/errors"
  16. "github.com/zeromicro/go-zero/core/logx"
  17. "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/common"
  18. "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/database"
  19. "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
  20. "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/executor"
  21. "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
  22. "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice"
  23. "gorm.io/gorm"
  24. "sigs.k8s.io/yaml"
  25. "strings"
  26. )
  27. type Scheduler struct {
  28. task *response.TaskInfo
  29. participantIds []int64
  30. subSchedule common.SubSchedule
  31. dbEngin *gorm.DB
  32. result []string //pID:子任务yamlstring 键值对
  33. participantRpc participantservice.ParticipantService
  34. ResourceCollector *map[string]collector.ResourceCollector
  35. Storages database.Storage
  36. AiExecutor *map[string]executor.AiExecutor
  37. }
  // NewScheduler decodes the JSON-encoded task in val and returns a Scheduler
// bound to the given sub-scheduler, database handle and participant RPC client.
//
// It returns an error if val is not valid JSON or if it decodes to a null
// task. A JSON decoding error is wrapped rather than flattened, so callers can
// still inspect the underlying cause.
func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) {
	var task *response.TaskInfo
	if err := json.Unmarshal([]byte(val), &task); err != nil {
		return nil, errors.Wrap(err, "create scheduler failed")
	}
	// The JSON literal "null" decodes without error but leaves task nil.
	// Every later s.task access would then panic, so reject it here.
	if task == nil {
		return nil, errors.New("create scheduler failed: task is null")
	}
	return &Scheduler{
		task:           task,
		subSchedule:    subSchedule,
		dbEngin:        dbEngin,
		participantRpc: participantRpc,
	}, nil
}
  46. func NewScheduler2(resourceCollector *map[string]collector.ResourceCollector, storages database.Storage, aiExecutor *map[string]executor.AiExecutor) *Scheduler {
  47. return &Scheduler{ResourceCollector: resourceCollector, Storages: storages, AiExecutor: aiExecutor}
  48. }
  49. func (s *Scheduler) SpecifyClusters() {
  50. // 如果已指定集群名,通过数据库查询后返回p端ip列表
  51. if len(s.task.Clusters) != 0 {
  52. s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", s.task.Clusters).Scan(&s.participantIds)
  53. return
  54. }
  55. }
  // SpecifyNsID handles tasks that name a tenant (s.task.NsID) but no explicit
// clusters. It reads the tenant's comma-separated cluster list from
// sc_tenant_info, resolves those cluster names to IDs via
// sc_participant_phy_info, and stores the IDs in s.participantIds.
//
// It does nothing when clusters were named explicitly or NsID is empty. If
// the tenant has no cluster list, s.participantIds is cleared. The method has
// no error return, so lookup errors are logged.
func (s *Scheduler) SpecifyNsID() {
	if len(s.task.Clusters) != 0 || len(s.task.NsID) == 0 {
		return
	}
	var clusters string
	err := s.dbEngin.Raw("select clusters from sc_tenant_info where `tenant_name` = ?", s.task.NsID).
		Scan(&clusters).Error
	if err != nil {
		logx.Errorf("query clusters for tenant %q: %v", s.task.NsID, err)
		return
	}
	// strings.Split("", ",") yields [""], which would only issue a useless
	// `name in ('')` query; treat an empty list as "no participants".
	if clusters == "" {
		s.participantIds = nil
		return
	}
	clusterArr := strings.Split(clusters, ",")
	err = s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", clusterArr).
		Scan(&s.participantIds).Error
	if err != nil {
		logx.Errorf("query participant ids for tenant %q clusters %v: %v", s.task.NsID, clusterArr, err)
	}
}
  69. func (s *Scheduler) MatchLabels() {
  70. var ids []int64
  71. count := 0
  72. // 集群和nsID都未指定,则通过标签匹配
  73. if len(s.task.Clusters) == 0 && len(s.task.NsID) == 0 {
  74. //如果集群列表或nsID均未指定
  75. for key := range s.task.MatchLabels {
  76. var participantIds []int64
  77. s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds)
  78. if count == 0 {
  79. ids = participantIds
  80. }
  81. ids = common.Intersect(ids, participantIds)
  82. count++
  83. }
  84. s.participantIds = ids
  85. } else {
  86. return
  87. }
  88. }
  // TempAssign converts every manifest in s.task.Metadata from YAML to JSON.
// It is a temporary stand-in: the original (Chinese) TODO says the original
// scheduling algorithm is masked for now.
//
// Each entry is decoded as a YAML mapping and re-encoded as JSON. If any entry
// fails to convert, an error naming its index is returned and s.task.Metadata
// is left unchanged. Previously, malformed entries were silently turned into
// the JSON literal "null".
func (s *Scheduler) TempAssign() error {
	// TODO: inspect the resource kinds in the metadata and handle each kind
	// differently. Given replica counts and the cluster list, produce a
	// per-pID replica assignment, splitting and recombining multi-kind
	// manifests as needed.
	var meData []string
	for i, yamlString := range s.task.Metadata {
		var data map[string]interface{}
		if err := yaml.Unmarshal([]byte(yamlString), &data); err != nil {
			return errors.Wrapf(err, "parse task metadata[%d] as yaml", i)
		}
		jsonData, err := json.Marshal(data)
		if err != nil {
			return errors.Wrapf(err, "encode task metadata[%d] as json", i)
		}
		meData = append(meData, string(jsonData))
	}
	s.task.Metadata = meData
	return nil
}
  107. func (s *Scheduler) AssignAndSchedule(ss common.SubSchedule) error {
  108. //// 已指定 ParticipantId
  109. //if s.task.ParticipantId != 0 {
  110. // return nil
  111. //}
  112. //// 标签匹配以及后,未找到ParticipantIds
  113. //if len(s.participantIds) == 0 {
  114. // return errors.New("未找到匹配的ParticipantIds")
  115. //}
  116. //
  117. //// 指定或者标签匹配的结果只有一个集群,给任务信息指定
  118. //if len(s.participantIds) == 1 {
  119. // s.task.ParticipantId = s.participantIds[0]
  120. // //replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
  121. // //result := make(map[int64]string)
  122. // //result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64)
  123. // //s.result = result
  124. //
  125. // return nil
  126. //}
  127. strategy, err := ss.PickOptimalStrategy()
  128. if err != nil {
  129. return err
  130. }
  131. clusters, err := strategy.Schedule()
  132. if err != nil {
  133. return err
  134. }
  135. //集群数量不满足,指定到标签匹配后第一个集群
  136. //if len(providerList) < 2 {
  137. // s.task.ParticipantId = s.participantIds[0]
  138. // return nil
  139. //}
  140. err = ss.AssignTask(clusters)
  141. if err != nil {
  142. return err
  143. }
  144. return nil
  145. }
  146. func (s *Scheduler) SaveToDb() error {
  147. for _, participantId := range s.participantIds {
  148. for _, resource := range s.task.Metadata {
  149. structForDb, err := s.subSchedule.GetNewStructForDb(s.task, resource, participantId)
  150. if err != nil {
  151. return err
  152. }
  153. tx := s.dbEngin.Create(structForDb)
  154. if tx.Error != nil {
  155. logx.Error(tx.Error)
  156. return tx.Error
  157. }
  158. }
  159. }
  160. return nil
  161. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.