You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

aiScheduler.go 9.1 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package schedulers
  13. import (
  14. "context"
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "github.com/zeromicro/go-zero/core/logx"
  19. "gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
  20. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
  21. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
  22. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
  23. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
  24. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
  25. "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
  26. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  27. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  28. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  29. "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
  30. "sync"
  31. )
  32. type AiScheduler struct {
  33. yamlString string
  34. task *response.TaskInfo
  35. *scheduler.Scheduler
  36. option *option.AiOption
  37. ctx context.Context
  38. }
  // AiResult describes the outcome of submitting one AI job to one cluster.
  type AiResult struct {
  	// JobId is the identifier returned by the cluster when the submission succeeded.
  	JobId string
  	// ClusterId identifies the cluster the job was submitted to.
  	ClusterId string
  	// Strategy is the name of the scheduling strategy that was in effect.
  	Strategy string
  	// Replica is the replica count assigned to the cluster.
  	Replica int32
  	// Card is the compute card recorded for the submission.
  	Card string
  	// Msg holds the failure message reported by the cluster; empty on success.
  	Msg string
  }
  47. func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) {
  48. return &AiScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option}, nil
  49. }
  50. func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
  51. ai := models.Ai{
  52. AdapterId: participantId,
  53. TaskId: task.TaskId,
  54. Status: "Saved",
  55. YamlString: as.yamlString,
  56. }
  57. utils.Convert(task.Metadata, &ai)
  58. return ai, nil
  59. }
  60. func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
  61. if len(as.option.ClusterIds) == 1 {
  62. return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
  63. }
  64. resources, err := as.findClustersWithResources()
  65. if err != nil {
  66. return nil, err
  67. }
  68. if len(resources) == 0 {
  69. return nil, errors.New("no cluster has resources")
  70. }
  71. if len(resources) == 1 {
  72. var cluster strategy.AssignedCluster
  73. cluster.ClusterId = resources[0].ClusterId
  74. cluster.Replicas = 1
  75. return &strategy.SingleAssignment{Cluster: &cluster}, nil
  76. }
  77. params := &param.Params{Resources: resources}
  78. switch as.option.StrategyName {
  79. case strategy.REPLICATION:
  80. var clusterIds []string
  81. for _, resource := range resources {
  82. clusterIds = append(clusterIds, resource.ClusterId)
  83. }
  84. strategy := strategy.NewReplicationStrategy(clusterIds, 1)
  85. return strategy, nil
  86. case strategy.RESOURCES_PRICING:
  87. strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
  88. return strategy, nil
  89. case strategy.DYNAMIC_RESOURCES:
  90. strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
  91. return strategy, nil
  92. case strategy.STATIC_WEIGHT:
  93. //todo resources should match cluster StaticWeightMap
  94. strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica)
  95. return strategy, nil
  96. }
  97. return nil, errors.New("no strategy has been chosen")
  98. }
  99. func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) {
  100. if clusters == nil {
  101. return nil, errors.New("clusters is nil")
  102. }
  103. for i := len(clusters) - 1; i >= 0; i-- {
  104. if clusters[i].Replicas == 0 {
  105. clusters = append(clusters[:i], clusters[i+1:]...)
  106. }
  107. }
  108. if len(clusters) == 0 {
  109. return nil, errors.New("clusters is nil")
  110. }
  111. var wg sync.WaitGroup
  112. var results []*AiResult
  113. var errs []interface{}
  114. var taskNum int32
  115. for _, cluster := range clusters {
  116. taskNum += cluster.Replicas
  117. }
  118. var ch = make(chan *AiResult, taskNum)
  119. var errCh = make(chan interface{}, taskNum)
  120. executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId]
  121. for _, cluster := range clusters {
  122. c := cluster
  123. for i := 0; i < int(c.Replicas); i++ {
  124. wg.Add(1)
  125. go func() {
  126. opt, _ := cloneAiOption(as.option)
  127. resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt)
  128. if err != nil {
  129. e := struct {
  130. err error
  131. clusterId string
  132. }{
  133. err: err,
  134. clusterId: c.ClusterId,
  135. }
  136. errCh <- e
  137. wg.Done()
  138. return
  139. }
  140. result, _ := convertType(resp)
  141. result.Replica = c.Replicas
  142. result.ClusterId = c.ClusterId
  143. result.Strategy = as.option.StrategyName
  144. result.Card = opt.ComputeCard
  145. ch <- result
  146. wg.Done()
  147. }()
  148. }
  149. }
  150. wg.Wait()
  151. close(ch)
  152. close(errCh)
  153. for e := range errCh {
  154. errs = append(errs, e)
  155. }
  156. for s := range ch {
  157. results = append(results, s)
  158. }
  159. if len(errs) != 0 {
  160. var synergystatus int64
  161. if len(clusters) > 1 {
  162. synergystatus = 1
  163. }
  164. strategyCode, err := as.AiStorages.GetStrategyCode(as.option.StrategyName)
  165. taskId, err := as.AiStorages.SaveTask(as.option.TaskName, strategyCode, synergystatus)
  166. if err != nil {
  167. return nil, errors.New("database add failed: " + err.Error())
  168. }
  169. var errmsg string
  170. for _, err := range errs {
  171. e := (err).(struct {
  172. err error
  173. clusterId string
  174. })
  175. msg := fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
  176. errmsg += msg
  177. clusterName, _ := as.AiStorages.GetClusterNameById(e.clusterId)
  178. err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, clusterName, "", constants.Failed, msg)
  179. if err != nil {
  180. return nil, errors.New("database add failed: " + err.Error())
  181. }
  182. }
  183. for _, s := range results {
  184. as.option.ComputeCard = s.Card //execute card
  185. clusterName, _ := as.AiStorages.GetClusterNameById(s.ClusterId)
  186. if s.Msg != "" {
  187. msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
  188. errmsg += msg
  189. err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, "", constants.Failed, msg)
  190. if err != nil {
  191. return nil, errors.New("database add failed: " + err.Error())
  192. }
  193. } else {
  194. msg := fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId)
  195. errmsg += msg
  196. err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, s.JobId, constants.Saved, msg)
  197. if err != nil {
  198. return nil, errors.New("database add failed: " + err.Error())
  199. }
  200. }
  201. }
  202. logx.Errorf(errors.New(errmsg).Error())
  203. return nil, errors.New(errmsg)
  204. }
  205. return results, nil
  206. }
  207. func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
  208. var wg sync.WaitGroup
  209. var clustersNum = len(as.AiService.AiCollectorAdapterMap[as.option.AdapterId])
  210. var ch = make(chan *collector.ResourceStats, clustersNum)
  211. var errCh = make(chan interface{}, clustersNum)
  212. var resourceSpecs []*collector.ResourceStats
  213. var errs []interface{}
  214. for s, resourceCollector := range as.AiService.AiCollectorAdapterMap[as.option.AdapterId] {
  215. wg.Add(1)
  216. rc := resourceCollector
  217. id := s
  218. go func() {
  219. spec, err := rc.GetResourceStats(as.ctx)
  220. if err != nil {
  221. e := struct {
  222. err error
  223. clusterId string
  224. }{
  225. err: err,
  226. clusterId: id,
  227. }
  228. errCh <- e
  229. wg.Done()
  230. return
  231. }
  232. ch <- spec
  233. wg.Done()
  234. }()
  235. }
  236. wg.Wait()
  237. close(ch)
  238. close(errCh)
  239. for s := range ch {
  240. resourceSpecs = append(resourceSpecs, s)
  241. }
  242. for e := range errCh {
  243. errs = append(errs, e)
  244. }
  245. if len(errs) == clustersNum {
  246. return nil, errors.New("get resources failed")
  247. }
  248. if len(errs) != 0 {
  249. var msg string
  250. for _, err := range errs {
  251. e := (err).(struct {
  252. err error
  253. clusterId string
  254. })
  255. msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
  256. }
  257. return nil, errors.New(msg)
  258. }
  259. return resourceSpecs, nil
  260. }
  261. func convertType(in interface{}) (*AiResult, error) {
  262. var result AiResult
  263. switch (in).(type) {
  264. case *hpcAC.SubmitTaskAiResp:
  265. resp := (in).(*hpcAC.SubmitTaskAiResp)
  266. if resp.Code == "0" {
  267. result.JobId = resp.Data
  268. } else {
  269. result.Msg = resp.Msg
  270. }
  271. return &result, nil
  272. case *octopus.CreateTrainJobResp:
  273. resp := (in).(*octopus.CreateTrainJobResp)
  274. if resp.Success {
  275. result.JobId = resp.Payload.JobId
  276. } else {
  277. result.Msg = resp.Error.Message
  278. }
  279. return &result, nil
  280. default:
  281. return nil, errors.New("ai task response failed")
  282. }
  283. }
  284. func cloneAiOption(opt *option.AiOption) (*option.AiOption, error) {
  285. origJSON, err := json.Marshal(opt)
  286. if err != nil {
  287. return nil, err
  288. }
  289. clone := option.AiOption{}
  290. if err = json.Unmarshal(origJSON, &clone); err != nil {
  291. return nil, err
  292. }
  293. return &clone, nil
  294. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecology of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.