You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

aiScheduler.go 13 kB

10 months ago
10 months ago
11 months ago
10 months ago
10 months ago
10 months ago
11 months ago
11 months ago
6 months ago
11 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package schedulers
  13. import (
  14. "context"
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "github.com/zeromicro/go-zero/core/logx"
  19. "gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
  20. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler"
  21. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
  22. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  23. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
  24. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
  25. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs"
  26. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
  27. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy/param"
  28. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
  29. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  30. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  31. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/response"
  32. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  33. "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
  34. "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
  35. "gitlink.org.cn/JointCloud/pcm-openi/model"
  36. "strconv"
  37. "strings"
  38. "sync"
  39. )
  40. type AiScheduler struct {
  41. yamlString string
  42. task *response.TaskInfo
  43. *scheduler.Scheduler
  44. option *option.AiOption
  45. ctx context.Context
  46. }
  // AiResult describes the outcome of submitting one replica of an AI task to a
  // single cluster.
  type AiResult struct {
  	// AdapterId identifies the adapter the task was submitted through.
  	AdapterId string
  	// TaskName is the name of the submitted task.
  	TaskName string
  	// JobId is the job identifier returned by the cluster on success.
  	JobId string
  	// ClusterId identifies the cluster that received the submission.
  	ClusterId string
  	// Strategy is the name of the scheduling strategy that was in effect.
  	Strategy string
  	// Replica is the replica count assigned to the cluster.
  	Replica int32
  	// Card is the compute card the task was executed with.
  	Card string
  	// Msg carries the cluster's error message; it is empty on success.
  	Msg string
  	// Output is the output location configured for the task.
  	Output string
  }
  58. func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) {
  59. return &AiScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option}, nil
  60. }
  61. func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
  62. ai := models.Ai{
  63. AdapterId: participantId,
  64. TaskId: task.TaskId,
  65. Status: "Saved",
  66. YamlString: as.yamlString,
  67. }
  68. utils.Convert(task.Metadata, &ai)
  69. return ai, nil
  70. }
  71. func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
  72. if as.option.ComputeCard != "" {
  73. m, ok := as.AiService.AiCollectorAdapterMap[as.option.AdapterId]
  74. if ok {
  75. for _, id := range as.option.ClusterIds {
  76. cm, ok := m[id]
  77. if ok {
  78. cards, err := cm.GetComputeCards(as.ctx)
  79. if err != nil {
  80. return nil, err
  81. }
  82. if common.Contains(cards, as.option.ComputeCard) {
  83. return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: id, Replicas: 1}}, nil
  84. }
  85. }
  86. }
  87. }
  88. }
  89. if len(as.option.ClusterIds) == 1 {
  90. return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
  91. }
  92. resources, err := as.findClustersWithResources()
  93. if err != nil {
  94. return nil, err
  95. }
  96. if len(resources) == 0 {
  97. return nil, errors.New("no cluster has resources")
  98. }
  99. if len(resources) == 1 {
  100. var cluster strategy.AssignedCluster
  101. cluster.ClusterId = resources[0].ClusterId
  102. cluster.Replicas = 1
  103. return &strategy.SingleAssignment{Cluster: &cluster}, nil
  104. }
  105. params := &param.Params{Resources: resources}
  106. switch as.option.StrategyName {
  107. case strategy.REPLICATION:
  108. var clusterIds []string
  109. for _, resource := range resources {
  110. if resource == nil {
  111. continue
  112. }
  113. clusterIds = append(clusterIds, resource.ClusterId)
  114. }
  115. strategy := strategy.NewReplicationStrategy(clusterIds, 1)
  116. return strategy, nil
  117. case strategy.RESOURCES_PRICING:
  118. strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
  119. return strategy, nil
  120. case strategy.DYNAMIC_RESOURCES:
  121. strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
  122. return strategy, nil
  123. case strategy.STATIC_WEIGHT:
  124. //todo resources should match cluster StaticWeightMap
  125. strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica)
  126. return strategy, nil
  127. case strategy.RANDOM:
  128. strategy := strategy.NewRandomStrategy(as.option.ClusterIds, as.option.Replica)
  129. return strategy, nil
  130. }
  131. return nil, errors.New("no strategy has been chosen")
  132. }
  133. func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster, mode int) (interface{}, error) {
  134. if clusters == nil {
  135. return nil, errors.New("clusters is nil")
  136. }
  137. for i := len(clusters) - 1; i >= 0; i-- {
  138. if clusters[i].Replicas == 0 {
  139. clusters = append(clusters[:i], clusters[i+1:]...)
  140. }
  141. }
  142. if len(clusters) == 0 {
  143. return nil, errors.New("clusters is nil")
  144. }
  145. var wg sync.WaitGroup
  146. var results []*AiResult
  147. var mu sync.Mutex
  148. var errs []interface{}
  149. var taskNum int32
  150. for _, cluster := range clusters {
  151. taskNum += cluster.Replicas
  152. }
  153. var ch = make(chan *AiResult, taskNum)
  154. var errCh = make(chan interface{}, taskNum)
  155. executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId]
  156. for _, cluster := range clusters {
  157. c := cluster
  158. for i := 0; i < int(c.Replicas); i++ {
  159. wg.Add(1)
  160. go func() {
  161. opt, _ := cloneAiOption(as.option)
  162. // decide opt params by mode
  163. updateAiOptionByMode(c, opt, mode)
  164. resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt, mode)
  165. if err != nil {
  166. e := struct {
  167. err error
  168. clusterId string
  169. }{
  170. err: err,
  171. clusterId: c.ClusterId,
  172. }
  173. errCh <- e
  174. wg.Done()
  175. return
  176. }
  177. result := &AiResult{}
  178. mu.Lock()
  179. result, _ = convertType(resp)
  180. mu.Unlock()
  181. result.AdapterId = opt.AdapterId
  182. result.TaskName = opt.TaskName
  183. result.Replica = c.Replicas
  184. result.ClusterId = c.ClusterId
  185. result.Strategy = as.option.StrategyName
  186. result.Card = opt.ComputeCard
  187. result.Output = opt.Output
  188. ch <- result
  189. wg.Done()
  190. }()
  191. }
  192. }
  193. wg.Wait()
  194. close(ch)
  195. close(errCh)
  196. for e := range errCh {
  197. errs = append(errs, e)
  198. }
  199. for s := range ch {
  200. results = append(results, s)
  201. }
  202. err := as.handleErrors(errs, clusters, results, mode)
  203. if err != nil {
  204. return nil, err
  205. }
  206. return results, nil
  207. }
  208. func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.AssignedCluster, results []*AiResult, mode int) error {
  209. if len(errs) != 0 {
  210. var synergystatus int64
  211. if len(clusters) > 1 {
  212. synergystatus = 1
  213. }
  214. var taskId int64
  215. switch mode {
  216. case executor.SUBMIT_MODE_JOINT_CLOUD:
  217. tid, err := as.CreateTask(as.option.TaskName, "", 0, synergystatus, as.option.StrategyName, "", "", "", nil)
  218. if err != nil {
  219. return err
  220. }
  221. taskId = tid
  222. case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
  223. taskId = as.option.TaskId
  224. }
  225. // aiTasks
  226. adapterName, err := as.AiStorages.GetAdapterNameById(as.option.AdapterId)
  227. if err != nil {
  228. return err
  229. }
  230. //report msg
  231. report := &jcs.JobStatusReportReq{
  232. TaskName: "",
  233. TaskID: strconv.FormatInt(taskId, 10),
  234. Messages: make([]*jcs.ReportMessage, 0),
  235. }
  236. var errmsg string
  237. for _, err := range errs {
  238. e := (err).(struct {
  239. err error
  240. clusterId string
  241. })
  242. msg := fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
  243. errmsg += msg
  244. clusterName, _ := as.AiStorages.GetClusterNameById(e.clusterId)
  245. err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, e.clusterId, clusterName, "", constants.Failed, msg)
  246. if err != nil {
  247. return errors.New("database add failed: " + err.Error())
  248. }
  249. //add report msg
  250. jobMsg := &jcs.ReportMessage{
  251. Status: false,
  252. Message: msg,
  253. ClusterID: e.clusterId,
  254. Output: "",
  255. }
  256. report.Messages = append(report.Messages, jobMsg)
  257. }
  258. for _, s := range results {
  259. as.option.ComputeCard = s.Card //execute card
  260. clusterName, _ := as.AiStorages.GetClusterNameById(s.ClusterId)
  261. if s.Msg != "" {
  262. msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
  263. errmsg += msg
  264. err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, "", constants.Failed, msg)
  265. if err != nil {
  266. return errors.New("database add failed: " + err.Error())
  267. }
  268. } else {
  269. msg := fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId)
  270. errmsg += msg
  271. err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, s.JobId, constants.Saved, msg)
  272. if err != nil {
  273. return errors.New("database add failed: " + err.Error())
  274. }
  275. }
  276. //add report msg
  277. jobMsg := &jcs.ReportMessage{
  278. Status: false,
  279. Message: s.Msg,
  280. ClusterID: s.ClusterId,
  281. Output: "",
  282. }
  283. report.Messages = append(report.Messages, jobMsg)
  284. }
  285. //report status
  286. _ = jcs.StatusReport(as.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report)
  287. logx.Errorf(errors.New(errmsg).Error())
  288. return errors.New(errmsg)
  289. }
  290. return nil
  291. }
  292. func updateAiOptionByMode(cluster *strategy.AssignedCluster, opt *option.AiOption, mode int) {
  293. switch mode {
  294. case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
  295. opt.Cmd = cluster.Cmd
  296. opt.Envs = cluster.Envs
  297. opt.Params = cluster.Params
  298. opt.ImageId = cluster.ImageId
  299. opt.AlgorithmId = cluster.CodeId
  300. opt.DatasetsId = cluster.DatasetId
  301. opt.ResourcesRequired = cluster.ResourcesRequired
  302. // assume params include output for modelarts
  303. for _, param := range cluster.Params {
  304. s := strings.Split(param, storeLink.COMMA)
  305. if s[0] == "output" {
  306. opt.Output = s[1]
  307. }
  308. }
  309. default:
  310. }
  311. }
  312. func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
  313. var wg sync.WaitGroup
  314. var clustersNum = len(as.AiService.AiCollectorAdapterMap[as.option.AdapterId])
  315. var ch = make(chan *collector.ResourceStats, clustersNum)
  316. var errCh = make(chan interface{}, clustersNum)
  317. var resourceSpecs []*collector.ResourceStats
  318. var errs []interface{}
  319. for s, resourceCollector := range as.AiService.AiCollectorAdapterMap[as.option.AdapterId] {
  320. wg.Add(1)
  321. rc := resourceCollector
  322. id := s
  323. go func() {
  324. spec, err := rc.GetResourceStats(as.ctx)
  325. if err != nil {
  326. e := struct {
  327. err error
  328. clusterId string
  329. }{
  330. err: err,
  331. clusterId: id,
  332. }
  333. errCh <- e
  334. wg.Done()
  335. return
  336. }
  337. ch <- spec
  338. wg.Done()
  339. }()
  340. }
  341. wg.Wait()
  342. close(ch)
  343. close(errCh)
  344. for s := range ch {
  345. resourceSpecs = append(resourceSpecs, s)
  346. }
  347. for e := range errCh {
  348. errs = append(errs, e)
  349. }
  350. if len(errs) == clustersNum {
  351. return nil, errors.New("get resources failed")
  352. }
  353. if len(errs) != 0 {
  354. var msg string
  355. for _, err := range errs {
  356. e := (err).(struct {
  357. err error
  358. clusterId string
  359. })
  360. msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
  361. }
  362. //return nil, errors.New(msg)
  363. }
  364. return resourceSpecs, nil
  365. }
  366. func convertType(in interface{}) (*AiResult, error) {
  367. var result AiResult
  368. switch (in).(type) {
  369. case *hpcAC.SubmitTaskAiResp:
  370. resp := (in).(*hpcAC.SubmitTaskAiResp)
  371. if resp.Code == "0" {
  372. result.JobId = resp.Data
  373. } else {
  374. result.Msg = resp.Msg
  375. }
  376. return &result, nil
  377. case *octopus.CreateTrainJobResp:
  378. resp := (in).(*octopus.CreateTrainJobResp)
  379. if resp.Success {
  380. result.JobId = resp.Payload.JobId
  381. } else {
  382. result.Msg = resp.Error.Message
  383. }
  384. return &result, nil
  385. case *modelartsservice.CreateTrainingJobResp:
  386. resp := (in).(*modelartsservice.CreateTrainingJobResp)
  387. if resp.ErrorMsg != "" {
  388. result.Msg = resp.ErrorMsg
  389. } else {
  390. result.JobId = resp.Metadata.Id
  391. }
  392. return &result, nil
  393. case model.CreateTask:
  394. resp := (in).(model.CreateTask)
  395. if resp.Code != 0 {
  396. result.Msg = resp.Msg
  397. } else {
  398. result.JobId = strconv.Itoa(resp.Data.Id)
  399. }
  400. return &result, nil
  401. default:
  402. return nil, errors.New("ai task response failed")
  403. }
  404. }
  405. func cloneAiOption(opt *option.AiOption) (*option.AiOption, error) {
  406. origJSON, err := json.Marshal(opt)
  407. if err != nil {
  408. return nil, err
  409. }
  410. clone := option.AiOption{}
  411. if err = json.Unmarshal(origJSON, &clone); err != nil {
  412. return nil, err
  413. }
  414. return &clone, nil
  415. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive and autonomous peer-to-peer manner.