You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

aiScheduler.go 8.6 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package schedulers
  13. import (
  14. "context"
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
  19. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
  20. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
  21. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
  22. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
  23. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
  24. "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
  25. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  26. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  27. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  28. "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
  29. "sync"
  30. )
  31. type AiScheduler struct {
  32. yamlString string
  33. task *response.TaskInfo
  34. *scheduler.Scheduler
  35. option *option.AiOption
  36. ctx context.Context
  37. }
  // AiResult is the outcome of submitting an AI task to a single cluster.
  type AiResult struct {
  	// TaskId is the task id reported by the cluster; empty when it rejected the task.
  	TaskId string
  	// ClusterId identifies the cluster the task was submitted to.
  	ClusterId string
  	// Strategy is the name of the scheduling strategy that produced the assignment.
  	Strategy string
  	// Replica is the number of replicas assigned to this cluster.
  	Replica int32
  	// Msg holds the cluster's rejection message; empty when the submission succeeded.
  	Msg string
  }
  45. func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) {
  46. return &AiScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option}, nil
  47. }
  48. func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
  49. ai := models.Ai{
  50. AdapterId: participantId,
  51. TaskId: task.TaskId,
  52. Status: "Saved",
  53. YamlString: as.yamlString,
  54. }
  55. utils.Convert(task.Metadata, &ai)
  56. return ai, nil
  57. }
  58. func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
  59. if len(as.option.ClusterIds) == 1 {
  60. return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
  61. }
  62. resources, err := as.findClustersWithResources()
  63. if err != nil {
  64. return nil, err
  65. }
  66. if len(resources) == 0 {
  67. return nil, errors.New("no cluster has resources")
  68. }
  69. if len(resources) == 1 {
  70. var cluster strategy.AssignedCluster
  71. cluster.ClusterId = resources[0].ClusterId
  72. cluster.Replicas = 1
  73. return &strategy.SingleAssignment{Cluster: &cluster}, nil
  74. }
  75. params := &param.Params{Resources: resources}
  76. switch as.option.StrategyName {
  77. case strategy.REPLICATION:
  78. var clusterIds []string
  79. for _, resource := range resources {
  80. clusterIds = append(clusterIds, resource.ClusterId)
  81. }
  82. strategy := strategy.NewReplicationStrategy(clusterIds, 1)
  83. return strategy, nil
  84. case strategy.RESOURCES_PRICING:
  85. strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
  86. return strategy, nil
  87. case strategy.DYNAMIC_RESOURCES:
  88. strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
  89. return strategy, nil
  90. case strategy.STATIC_WEIGHT:
  91. //todo resources should match cluster StaticWeightMap
  92. strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, 1)
  93. return strategy, nil
  94. }
  95. return nil, errors.New("no strategy has been chosen")
  96. }
  97. func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) {
  98. if clusters == nil {
  99. return nil, errors.New("clusters is nil")
  100. }
  101. for i := len(clusters) - 1; i >= 0; i-- {
  102. if clusters[i].Replicas == 0 {
  103. clusters = append(clusters[:i], clusters[i+1:]...)
  104. }
  105. }
  106. if len(clusters) == 0 {
  107. return nil, errors.New("clusters is nil")
  108. }
  109. var wg sync.WaitGroup
  110. var results []*AiResult
  111. var errs []interface{}
  112. var ch = make(chan *AiResult, len(clusters))
  113. var errCh = make(chan interface{}, len(clusters))
  114. executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId]
  115. for _, cluster := range clusters {
  116. c := cluster
  117. wg.Add(1)
  118. go func() {
  119. opt, _ := cloneAiOption(as.option)
  120. resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt)
  121. if err != nil {
  122. e := struct {
  123. err error
  124. clusterId string
  125. }{
  126. err: err,
  127. clusterId: c.ClusterId,
  128. }
  129. errCh <- e
  130. wg.Done()
  131. return
  132. }
  133. result, _ := convertType(resp)
  134. result.Replica = c.Replicas
  135. result.ClusterId = c.ClusterId
  136. result.Strategy = as.option.StrategyName
  137. ch <- result
  138. wg.Done()
  139. }()
  140. }
  141. wg.Wait()
  142. close(ch)
  143. close(errCh)
  144. for e := range errCh {
  145. errs = append(errs, e)
  146. }
  147. for s := range ch {
  148. results = append(results, s)
  149. }
  150. if len(errs) != 0 {
  151. var synergystatus int64
  152. if len(clusters) > 1 {
  153. synergystatus = 1
  154. }
  155. strategyCode, err := as.AiStorages.GetStrategyCode(as.option.StrategyName)
  156. taskId, err := as.AiStorages.SaveTask(as.option.TaskName, strategyCode, synergystatus)
  157. if err != nil {
  158. return nil, errors.New("database add failed: " + err.Error())
  159. }
  160. var errmsg string
  161. for _, err := range errs {
  162. e := (err).(struct {
  163. err error
  164. clusterId string
  165. })
  166. msg := fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
  167. errmsg += msg
  168. err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, "", constants.Failed, msg)
  169. if err != nil {
  170. return nil, errors.New("database add failed: " + err.Error())
  171. }
  172. }
  173. for _, s := range results {
  174. if s.Msg != "" {
  175. msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
  176. errmsg += msg
  177. err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, "", constants.Failed, msg)
  178. if err != nil {
  179. return nil, errors.New("database add failed: " + err.Error())
  180. }
  181. } else {
  182. msg := fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId)
  183. errmsg += msg
  184. err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, s.TaskId, constants.Succeeded, msg)
  185. if err != nil {
  186. return nil, errors.New("database add failed: " + err.Error())
  187. }
  188. }
  189. }
  190. return nil, errors.New(errmsg)
  191. }
  192. return results, nil
  193. }
  194. func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
  195. var wg sync.WaitGroup
  196. var clustersNum = len(as.AiService.AiCollectorAdapterMap[as.option.AdapterId])
  197. var ch = make(chan *collector.ResourceStats, clustersNum)
  198. var errCh = make(chan interface{}, clustersNum)
  199. var resourceSpecs []*collector.ResourceStats
  200. var errs []interface{}
  201. for s, resourceCollector := range as.AiService.AiCollectorAdapterMap[as.option.AdapterId] {
  202. wg.Add(1)
  203. rc := resourceCollector
  204. id := s
  205. go func() {
  206. spec, err := rc.GetResourceStats(as.ctx)
  207. if err != nil {
  208. e := struct {
  209. err error
  210. clusterId string
  211. }{
  212. err: err,
  213. clusterId: id,
  214. }
  215. errCh <- e
  216. wg.Done()
  217. return
  218. }
  219. ch <- spec
  220. wg.Done()
  221. }()
  222. }
  223. wg.Wait()
  224. close(ch)
  225. close(errCh)
  226. for s := range ch {
  227. resourceSpecs = append(resourceSpecs, s)
  228. }
  229. for e := range errCh {
  230. errs = append(errs, e)
  231. }
  232. if len(errs) == clustersNum {
  233. return nil, errors.New("get resources failed")
  234. }
  235. if len(errs) != 0 {
  236. var msg string
  237. for _, err := range errs {
  238. e := (err).(struct {
  239. err error
  240. clusterId string
  241. })
  242. msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
  243. }
  244. return nil, errors.New(msg)
  245. }
  246. return resourceSpecs, nil
  247. }
  248. func convertType(in interface{}) (*AiResult, error) {
  249. var result AiResult
  250. switch (in).(type) {
  251. case *hpcAC.SubmitTaskAiResp:
  252. resp := (in).(*hpcAC.SubmitTaskAiResp)
  253. if resp.Code == "0" {
  254. result.TaskId = resp.Data
  255. } else {
  256. result.Msg = resp.Msg
  257. }
  258. return &result, nil
  259. case *octopus.CreateTrainJobResp:
  260. resp := (in).(*octopus.CreateTrainJobResp)
  261. if resp.Success {
  262. result.TaskId = resp.Payload.JobId
  263. } else {
  264. result.Msg = resp.Error.Message
  265. }
  266. return &result, nil
  267. default:
  268. return nil, errors.New("ai task response failed")
  269. }
  270. }
  271. func cloneAiOption(opt *option.AiOption) (*option.AiOption, error) {
  272. origJSON, err := json.Marshal(opt)
  273. if err != nil {
  274. return nil, err
  275. }
  276. clone := option.AiOption{}
  277. if err = json.Unmarshal(origJSON, &clone); err != nil {
  278. return nil, err
  279. }
  280. return &clone, nil
  281. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.