You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

aiScheduler.go 7.7 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package schedulers
  13. import (
  14. "context"
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
  19. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
  20. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
  21. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
  22. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
  23. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
  24. "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
  25. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  26. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  27. "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
  28. "sync"
  29. )
  30. type AiScheduler struct {
  31. yamlString string
  32. task *response.TaskInfo
  33. *scheduler.Scheduler
  34. option *option.AiOption
  35. ctx context.Context
  36. }
  // AiResult describes the outcome of submitting a task to one cluster.
// convertType sets TaskId when the cluster accepted the task and Msg when it
// rejected it; the remaining fields are filled in by the submitting code.
type AiResult struct {
	TaskId    string // task ID reported by the cluster on acceptance
	ClusterId string // cluster the task was submitted to
	Strategy  string // name of the scheduling strategy in effect
	Replica   int32  // number of replicas assigned to this cluster
	Msg       string // cluster-provided message on rejection
}
  44. func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) {
  45. return &AiScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option}, nil
  46. }
  47. func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
  48. ai := models.Ai{
  49. ParticipantId: participantId,
  50. TaskId: task.TaskId,
  51. Status: "Saved",
  52. YamlString: as.yamlString,
  53. }
  54. utils.Convert(task.Metadata, &ai)
  55. return ai, nil
  56. }
  57. func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
  58. if len(as.option.ClusterIds) == 1 {
  59. return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
  60. }
  61. resources, err := as.findClustersWithResources()
  62. if err != nil {
  63. return nil, err
  64. }
  65. if len(resources) == 0 {
  66. return nil, errors.New("no cluster has resources")
  67. }
  68. if len(resources) == 1 {
  69. var cluster strategy.AssignedCluster
  70. cluster.ClusterId = resources[0].ClusterId
  71. cluster.Replicas = 1
  72. return &strategy.SingleAssignment{Cluster: &cluster}, nil
  73. }
  74. params := &param.Params{Resources: resources}
  75. switch as.option.StrategyName {
  76. case strategy.REPLICATION:
  77. var clusterIds []string
  78. for _, resource := range resources {
  79. clusterIds = append(clusterIds, resource.ClusterId)
  80. }
  81. strategy := strategy.NewReplicationStrategy(clusterIds, 1)
  82. return strategy, nil
  83. case strategy.RESOURCES_PRICING:
  84. strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
  85. return strategy, nil
  86. case strategy.DYNAMIC_RESOURCES:
  87. strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
  88. return strategy, nil
  89. case strategy.STATIC_WEIGHT:
  90. //todo resources should match cluster StaticWeightMap
  91. strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, 1)
  92. return strategy, nil
  93. }
  94. return nil, errors.New("no strategy has been chosen")
  95. }
  96. func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) {
  97. if clusters == nil {
  98. return nil, errors.New("clusters is nil")
  99. }
  100. for i := len(clusters) - 1; i >= 0; i-- {
  101. if clusters[i].Replicas == 0 {
  102. clusters = append(clusters[:i], clusters[i+1:]...)
  103. }
  104. }
  105. if len(clusters) == 0 {
  106. return nil, errors.New("clusters is nil")
  107. }
  108. var wg sync.WaitGroup
  109. var results []*AiResult
  110. var errs []interface{}
  111. var ch = make(chan *AiResult, len(clusters))
  112. var errCh = make(chan interface{}, len(clusters))
  113. executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId]
  114. for _, cluster := range clusters {
  115. c := cluster
  116. wg.Add(1)
  117. go func() {
  118. opt, _ := cloneAiOption(as.option)
  119. resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt)
  120. if err != nil {
  121. e := struct {
  122. err error
  123. clusterId string
  124. }{
  125. err: err,
  126. clusterId: c.ClusterId,
  127. }
  128. errCh <- e
  129. wg.Done()
  130. return
  131. }
  132. result, _ := convertType(resp)
  133. result.Replica = c.Replicas
  134. result.ClusterId = c.ClusterId
  135. result.Strategy = as.option.StrategyName
  136. ch <- result
  137. wg.Done()
  138. }()
  139. }
  140. wg.Wait()
  141. close(ch)
  142. close(errCh)
  143. for e := range errCh {
  144. errs = append(errs, e)
  145. }
  146. if len(errs) == len(clusters) {
  147. return nil, errors.New("submit task failed")
  148. }
  149. if len(errs) != 0 {
  150. var msg string
  151. for _, err := range errs {
  152. e := (err).(struct {
  153. err error
  154. clusterId string
  155. })
  156. msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
  157. }
  158. for s := range ch {
  159. if s.Msg != "" {
  160. msg += fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
  161. } else {
  162. msg += fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId)
  163. }
  164. }
  165. return nil, errors.New(msg)
  166. }
  167. for s := range ch {
  168. // TODO: database operation
  169. results = append(results, s)
  170. }
  171. return results, nil
  172. }
  173. func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
  174. var wg sync.WaitGroup
  175. var clustersNum = len(as.AiService.AiCollectorAdapterMap[as.option.AdapterId])
  176. var ch = make(chan *collector.ResourceStats, clustersNum)
  177. var errCh = make(chan interface{}, clustersNum)
  178. var resourceSpecs []*collector.ResourceStats
  179. var errs []interface{}
  180. for s, resourceCollector := range as.AiService.AiCollectorAdapterMap[as.option.AdapterId] {
  181. wg.Add(1)
  182. rc := resourceCollector
  183. id := s
  184. go func() {
  185. spec, err := rc.GetResourceStats(as.ctx)
  186. if err != nil {
  187. e := struct {
  188. err error
  189. clusterId string
  190. }{
  191. err: err,
  192. clusterId: id,
  193. }
  194. errCh <- e
  195. wg.Done()
  196. return
  197. }
  198. ch <- spec
  199. wg.Done()
  200. }()
  201. }
  202. wg.Wait()
  203. close(ch)
  204. close(errCh)
  205. for s := range ch {
  206. resourceSpecs = append(resourceSpecs, s)
  207. }
  208. for e := range errCh {
  209. errs = append(errs, e)
  210. }
  211. if len(errs) == clustersNum {
  212. return nil, errors.New("get resources failed")
  213. }
  214. if len(errs) != 0 {
  215. var msg string
  216. for _, err := range errs {
  217. e := (err).(struct {
  218. err error
  219. clusterId string
  220. })
  221. msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
  222. }
  223. return nil, errors.New(msg)
  224. }
  225. return resourceSpecs, nil
  226. }
  227. func convertType(in interface{}) (*AiResult, error) {
  228. var result AiResult
  229. switch (in).(type) {
  230. case *hpcAC.SubmitTaskAiResp:
  231. resp := (in).(*hpcAC.SubmitTaskAiResp)
  232. if resp.Code == "0" {
  233. result.TaskId = resp.Data
  234. } else {
  235. result.Msg = resp.Msg
  236. }
  237. return &result, nil
  238. case *octopus.CreateTrainJobResp:
  239. resp := (in).(*octopus.CreateTrainJobResp)
  240. if resp.Success {
  241. result.TaskId = resp.Payload.JobId
  242. } else {
  243. result.Msg = resp.Error.Message
  244. }
  245. return &result, nil
  246. default:
  247. return nil, errors.New("ai task response failed")
  248. }
  249. }
  250. func cloneAiOption(opt *option.AiOption) (*option.AiOption, error) {
  251. origJSON, err := json.Marshal(opt)
  252. if err != nil {
  253. return nil, err
  254. }
  255. clone := option.AiOption{}
  256. if err = json.Unmarshal(origJSON, &clone); err != nil {
  257. return nil, err
  258. }
  259. return &clone, nil
  260. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.