You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-'), and can be up to 35 characters long.

aiScheduler.go 14 kB

3 months ago
10 months ago
10 months ago
11 months ago
10 months ago
10 months ago
10 months ago
11 months ago
11 months ago
3 months ago
11 months ago
11 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package schedulers
  13. import (
  14. "context"
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "github.com/zeromicro/go-zero/core/logx"
  22. "gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
  23. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler"
  24. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
  25. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity"
  26. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  27. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
  28. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
  29. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs"
  30. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
  31. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy/param"
  32. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
  33. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  34. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  35. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/response"
  36. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  37. "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
  38. omodel "gitlink.org.cn/JointCloud/pcm-octopus/http/model"
  39. "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
  40. "gitlink.org.cn/JointCloud/pcm-openi/model"
  41. )
  42. type AiScheduler struct {
  43. yamlString string
  44. task *response.TaskInfo
  45. *scheduler.Scheduler
  46. option *option.AiOption
  47. ctx context.Context
  48. }
  49. type AiResult struct {
  50. AdapterId string
  51. TaskName string
  52. JobId string
  53. ClusterId string
  54. Strategy string
  55. Replica int32
  56. Card string
  57. Msg string
  58. Output string
  59. }
  // NewAiScheduler creates an AiScheduler bound to ctx, the raw task YAML (val), the shared
// scheduler and the submission options. The returned error is always nil.
func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) {
	as := new(AiScheduler)
	as.ctx = ctx
	as.yamlString = val
	as.Scheduler = scheduler
	as.option = option
	return as, nil
}
  63. func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
  64. ai := models.Ai{
  65. AdapterId: participantId,
  66. TaskId: task.TaskId,
  67. Status: "Saved",
  68. YamlString: as.yamlString,
  69. }
  70. utils.Convert(task.Metadata, &ai)
  71. return ai, nil
  72. }
  73. func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
  74. if as.option.ComputeCard != "" {
  75. m, ok := as.AiService.AiCollectorAdapterMap[as.option.AdapterId]
  76. if ok {
  77. for _, id := range as.option.ClusterIds {
  78. cm, ok := m[id]
  79. if ok {
  80. cards, err := cm.GetComputeCards(as.ctx)
  81. if err != nil {
  82. return nil, err
  83. }
  84. if common.Contains(cards, as.option.ComputeCard) {
  85. return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: id, Replicas: 1}}, nil
  86. }
  87. }
  88. }
  89. }
  90. }
  91. if len(as.option.ClusterIds) == 1 {
  92. return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
  93. }
  94. resources, err := as.findClustersWithResources()
  95. if err != nil {
  96. return nil, err
  97. }
  98. if len(resources) == 0 {
  99. return nil, errors.New("no cluster has resources")
  100. }
  101. if len(resources) == 1 {
  102. var cluster strategy.AssignedCluster
  103. cluster.ClusterId = resources[0].ClusterId
  104. cluster.Replicas = 1
  105. return &strategy.SingleAssignment{Cluster: &cluster}, nil
  106. }
  107. params := &param.Params{Resources: resources}
  108. switch as.option.StrategyName {
  109. case strategy.REPLICATION:
  110. var clusterIds []string
  111. for _, resource := range resources {
  112. if resource == nil {
  113. continue
  114. }
  115. clusterIds = append(clusterIds, resource.ClusterId)
  116. }
  117. strategy := strategy.NewReplicationStrategy(clusterIds, 1)
  118. return strategy, nil
  119. case strategy.RESOURCES_PRICING:
  120. strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
  121. return strategy, nil
  122. case strategy.DYNAMIC_RESOURCES:
  123. strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
  124. return strategy, nil
  125. case strategy.STATIC_WEIGHT:
  126. //todo resources should match cluster StaticWeightMap
  127. strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica)
  128. return strategy, nil
  129. case strategy.RANDOM:
  130. strategy := strategy.NewRandomStrategy(as.option.ClusterIds, as.option.Replica)
  131. return strategy, nil
  132. }
  133. return nil, errors.New("no strategy has been chosen")
  134. }
  // AssignTask submits the task in as.option to every assigned cluster, running one goroutine per
// replica, and returns the collected results as a []*AiResult wrapped in interface{}.
//
// Nil entries and clusters with a non-positive replica count are skipped; the caller's clusters
// slice is left untouched. Each replica gets its own copy of the option, adjusted for mode by
// updateAiOptionByMode. Per-replica failures (cloning the option, missing executor, execution,
// or response conversion) are collected and passed with the successful results to handleErrors,
// whose error, if any, is returned instead of the results.
func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster, mode int) (interface{}, error) {
	if clusters == nil {
		return nil, errors.New("clusters is nil")
	}

	// Filter into a fresh slice: deleting in place with append(clusters[:i], clusters[i+1:]...)
	// shifts elements inside the caller's backing array.
	active := make([]*strategy.AssignedCluster, 0, len(clusters))
	for _, c := range clusters {
		if c != nil && c.Replicas > 0 {
			active = append(active, c)
		}
	}
	if len(active) == 0 {
		return nil, errors.New("clusters is nil")
	}

	var taskNum int
	for _, c := range active {
		taskNum += int(c.Replicas)
	}
	// One buffered slot per replica on each channel, so no goroutine blocks on send before
	// wg.Wait returns.
	resultCh := make(chan *AiResult, taskNum)
	errCh := make(chan interface{}, taskNum)
	executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId]

	var wg sync.WaitGroup
	for _, cluster := range active {
		c := cluster // per-iteration copy captured by the goroutines
		for i := 0; i < int(c.Replicas); i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				// handleErrors type-asserts on exactly this anonymous struct shape.
				fail := func(err error) {
					errCh <- struct {
						err       error
						clusterId string
					}{err: err, clusterId: c.ClusterId}
				}

				opt, err := cloneAiOption(as.option)
				if err != nil {
					fail(err)
					return
				}
				// decide opt params by mode
				updateAiOptionByMode(c, opt, mode)

				exec, ok := executorMap[c.ClusterId]
				if !ok {
					fail(fmt.Errorf("no executor registered for cluster %v", c.ClusterId))
					return
				}
				resp, err := exec.Execute(as.ctx, opt, mode)
				if err != nil {
					fail(err)
					return
				}
				result, err := convertType(resp)
				if err != nil {
					fail(err)
					return
				}

				result.AdapterId = opt.AdapterId
				result.TaskName = opt.TaskName
				result.Replica = c.Replicas
				result.ClusterId = c.ClusterId
				result.Strategy = as.option.StrategyName
				result.Card = opt.ComputeCard
				result.Output = opt.Output
				resultCh <- result
			}()
		}
	}
	wg.Wait()
	close(resultCh)
	close(errCh)

	var errs []interface{}
	for e := range errCh {
		errs = append(errs, e)
	}
	var results []*AiResult
	for r := range resultCh {
		results = append(results, r)
	}

	if err := as.handleErrors(errs, active, results, mode); err != nil {
		return nil, err
	}
	return results, nil
}
  // handleErrors persists and reports the outcome of a submission round in which at least one
// replica failed. It returns nil when errs is empty.
//
// Otherwise it determines the task id: a new task is created for SUBMIT_MODE_JOINT_CLOUD, marked
// as synergy when more than one cluster was used, and as.option.TaskId is reused for
// SUBMIT_MODE_STORAGE_SCHEDULE. It then saves an AI-task record and sends a JCS status report
// for every error, and for every result (failed or successful). Finally it logs and returns one
// error summarising all clusters. A database write failure aborts early with
// "database add failed: ...".
//
// Note: as.option.ComputeCard is overwritten with each result's card before saving.
func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.AssignedCluster, results []*AiResult, mode int) error {
	if len(errs) == 0 {
		return nil
	}

	var synergyStatus int64
	if len(clusters) > 1 {
		synergyStatus = 1
	}

	var taskId int64
	switch mode {
	case executor.SUBMIT_MODE_JOINT_CLOUD:
		tid, err := as.CreateTask(as.option.TaskName, "", 0, synergyStatus, as.option.StrategyName, "", "", "", nil, "0")
		if err != nil {
			return err
		}
		taskId = tid
	case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
		taskId = as.option.TaskId
	}

	// aiTasks
	adapterName, err := as.AiStorages.GetAdapterNameById(as.option.AdapterId)
	if err != nil {
		return err
	}

	reportUrl := as.AiService.Conf.JcsMiddleware.JobStatusReportUrl
	report := func(clusterId, message string) {
		req := &jcs.JobStatusReportReq{}
		reportMsg := &jcs.TrainReportMessage{
			Type:      "Train",
			TaskName:  "",
			TaskID:    strconv.FormatInt(taskId, 10),
			Status:    false,
			Message:   message,
			ClusterID: clusterId,
			Output:    "",
		}
		req.Report = reportMsg
		// Status reporting is best-effort; a failed report must not mask the submission outcome.
		_ = jcs.StatusReport(reportUrl, req)
	}

	var errmsg strings.Builder

	for _, item := range errs {
		e, ok := item.(struct {
			err       error
			clusterId string
		})
		if !ok {
			// Cannot be attributed to a cluster; keep it in the summary instead of panicking.
			fmt.Fprintf(&errmsg, "error: %v \n", item)
			continue
		}
		msg := fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err)
		errmsg.WriteString(msg)
		clusterName, _ := as.AiStorages.GetClusterNameById(e.clusterId)
		if err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, e.clusterId, clusterName, "", constants.Failed, msg); err != nil {
			return errors.New("database add failed: " + err.Error())
		}
		report(e.clusterId, msg)
	}

	// Previously unreachable: the loop above returned after its first iteration, so these
	// results were never persisted.
	for _, s := range results {
		if s == nil {
			continue
		}
		as.option.ComputeCard = s.Card // record the card the job was executed on
		clusterName, _ := as.AiStorages.GetClusterNameById(s.ClusterId)
		if s.Msg != "" {
			msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
			errmsg.WriteString(msg)
			if err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, "", constants.Failed, msg); err != nil {
				return errors.New("database add failed: " + err.Error())
			}
		} else {
			msg := fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId)
			errmsg.WriteString(msg)
			if err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, s.JobId, constants.Saved, msg); err != nil {
				return errors.New("database add failed: " + err.Error())
			}
		}
		report(s.ClusterId, s.Msg)
	}

	summary := errmsg.String()
	// Pass the text as an argument, not as the format string: it may contain '%'.
	logx.Errorf("%s", summary)
	return errors.New(summary)
}
  // updateAiOptionByMode copies per-cluster submission settings from cluster into opt for the
// given mode. Only executor.SUBMIT_MODE_STORAGE_SCHEDULE changes opt; other modes leave it as is.
//
// In storage-schedule mode, command, envs, params, image, code (algorithm), dataset and
// required resources come from cluster. If any param has the form
// "output<COMMA>value[<COMMA>...]" (COMMA = storeLink.COMMA), opt.Output is set to value. The
// last such param wins; an "output" param with no value is ignored.
func updateAiOptionByMode(cluster *strategy.AssignedCluster, opt *option.AiOption, mode int) {
	switch mode {
	case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
		opt.Cmd = cluster.Cmd
		opt.Envs = cluster.Envs
		opt.Params = cluster.Params
		opt.ImageId = cluster.ImageId
		opt.AlgorithmId = cluster.CodeId
		opt.DatasetsId = cluster.DatasetId
		opt.ResourcesRequired = cluster.ResourcesRequired
		// assume params include output for modelarts
		for _, p := range cluster.Params { // `p`, not `param`, to avoid shadowing the param package
			kv := strings.Split(p, storeLink.COMMA)
			// Guard the index: a bare "output" param used to panic on kv[1].
			if len(kv) > 1 && kv[0] == "output" {
				opt.Output = kv[1]
			}
		}
	default:
	}
}
  // findClustersWithResources queries every resource collector registered for as.option.AdapterId
// concurrently and returns the non-nil resource stats they report.
//
// It fails with "get resources failed" only when every collector fails; that includes the case
// where the adapter has no collectors. Partial failures are tolerated, and the clusters that
// failed are logged.
func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
	collectors := as.AiService.AiCollectorAdapterMap[as.option.AdapterId]
	clustersNum := len(collectors)

	// Buffered for one value per collector so senders never block before wg.Wait returns.
	specCh := make(chan *collector.ResourceStats, clustersNum)
	errCh := make(chan string, clustersNum)

	var wg sync.WaitGroup
	for clusterId, rc := range collectors {
		id, c := clusterId, rc // per-iteration copies captured by the goroutine
		wg.Add(1)
		go func() {
			defer wg.Done()
			spec, err := c.GetResourceStats(as.ctx)
			if err != nil {
				errCh <- fmt.Sprintf("clusterId: %v , error: %v \n", id, err)
				return
			}
			specCh <- spec
		}()
	}
	wg.Wait()
	close(specCh)
	close(errCh)

	var resourceSpecs []*collector.ResourceStats
	for spec := range specCh {
		if spec != nil {
			resourceSpecs = append(resourceSpecs, spec)
		}
	}

	var failed int
	var msg strings.Builder
	for m := range errCh {
		failed++
		msg.WriteString(m)
	}
	if failed == clustersNum {
		return nil, errors.New("get resources failed")
	}
	if failed > 0 {
		// Partial failure is deliberately not fatal, but it used to be built and then discarded.
		logx.Errorf("%s", msg.String())
	}
	return resourceSpecs, nil
}
  // convertType normalises a cluster-specific submission response into an AiResult.
//
// On a successful response JobId holds the job id reported by the cluster. Otherwise Msg holds
// the cluster's error message. Supported responses are *hpcAC.SubmitTaskAiResp,
// *octopus.CreateTrainJobResp, *modelartsservice.CreateTrainingJobResp, model.CreateTask and
// *entity.OctResp. Any other type, or a nil pointer of a supported type, yields an error. For
// *entity.OctResp, errors from re-encoding or decoding resp.Data are returned.
func convertType(in interface{}) (*AiResult, error) {
	var result AiResult
	switch resp := in.(type) {
	case *hpcAC.SubmitTaskAiResp:
		if resp == nil {
			return nil, errors.New("ai task response is nil")
		}
		if resp.Code == "0" {
			result.JobId = resp.Data
		} else {
			result.Msg = resp.Msg
		}
	case *octopus.CreateTrainJobResp:
		if resp == nil {
			return nil, errors.New("ai task response is nil")
		}
		if resp.Success {
			result.JobId = resp.Payload.JobId
		} else {
			result.Msg = resp.Error.Message
		}
	case *modelartsservice.CreateTrainingJobResp:
		if resp == nil {
			return nil, errors.New("ai task response is nil")
		}
		if resp.ErrorMsg != "" {
			result.Msg = resp.ErrorMsg
		} else {
			result.JobId = resp.Metadata.Id
		}
	case model.CreateTask:
		if resp.Code != 0 {
			result.Msg = resp.Msg
		} else {
			result.JobId = strconv.Itoa(resp.Data.Id)
		}
	case *entity.OctResp:
		if resp == nil {
			return nil, errors.New("ai task response is nil")
		}
		// resp.Data is loosely typed; round-trip through JSON to decode it into the expected shape.
		raw, err := json.Marshal(resp.Data)
		if err != nil {
			return nil, err
		}
		if resp.Code != 200 {
			var oErr omodel.Error
			if err := json.Unmarshal(raw, &oErr); err != nil {
				return nil, err
			}
			result.Msg = resp.Msg + storeLink.COMMA + oErr.SubMessage
		} else {
			var cj omodel.OctCreateJob
			if err := json.Unmarshal(raw, &cj); err != nil {
				return nil, err
			}
			result.JobId = cj.JobId
		}
	default:
		return nil, errors.New("ai task response failed")
	}
	return &result, nil
}
  // cloneAiOption returns a deep copy of opt, made by encoding it to JSON and decoding the result
// into a fresh option.AiOption. Only JSON-visible fields survive the copy. Any encoding or
// decoding error is returned with a nil option.
func cloneAiOption(opt *option.AiOption) (*option.AiOption, error) {
	clone := new(option.AiOption)
	raw, err := json.Marshal(opt)
	if err == nil {
		err = json.Unmarshal(raw, clone)
	}
	if err != nil {
		return nil, err
	}
	return clone, nil
}

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem for heterogeneous cloud collaboration in JCC in a non-intrusive, autonomous, peer-to-peer manner.