You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

aiScheduler.go 13 kB

10 months ago
10 months ago
11 months ago
10 months ago
10 months ago
10 months ago
11 months ago
11 months ago
6 months ago
11 months ago
11 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package schedulers
  13. import (
  14. "context"
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "github.com/zeromicro/go-zero/core/logx"
  19. "gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
  20. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler"
  21. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
  22. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity"
  23. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  24. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
  25. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
  26. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs"
  27. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
  28. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy/param"
  29. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
  30. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  31. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  32. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/response"
  33. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  34. "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
  35. omodel "gitlink.org.cn/JointCloud/pcm-octopus/http/model"
  36. "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
  37. "gitlink.org.cn/JointCloud/pcm-openi/model"
  38. "strconv"
  39. "strings"
  40. "sync"
  41. )
  42. type AiScheduler struct {
  43. yamlString string
  44. task *response.TaskInfo
  45. *scheduler.Scheduler
  46. option *option.AiOption
  47. ctx context.Context
  48. }
  // AiResult is the outcome of submitting an AI task to one cluster.
// For the response types handled by convertType, JobId is filled on success
// and Msg carries the cluster's error message on failure.
type AiResult struct {
	// Identification of the submission: adapter, task, cluster job, cluster and strategy used.
	AdapterId, TaskName, JobId, ClusterId, Strategy string
	// Replica is the replica count that was assigned to ClusterId.
	Replica int32
	// Card is the compute card, Msg a failure message (empty on success), Output the output location.
	Card, Msg, Output string
}
  60. func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) {
  61. return &AiScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option}, nil
  62. }
  63. func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
  64. ai := models.Ai{
  65. AdapterId: participantId,
  66. TaskId: task.TaskId,
  67. Status: "Saved",
  68. YamlString: as.yamlString,
  69. }
  70. utils.Convert(task.Metadata, &ai)
  71. return ai, nil
  72. }
  73. func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
  74. if as.option.ComputeCard != "" {
  75. m, ok := as.AiService.AiCollectorAdapterMap[as.option.AdapterId]
  76. if ok {
  77. for _, id := range as.option.ClusterIds {
  78. cm, ok := m[id]
  79. if ok {
  80. cards, err := cm.GetComputeCards(as.ctx)
  81. if err != nil {
  82. return nil, err
  83. }
  84. if common.Contains(cards, as.option.ComputeCard) {
  85. return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: id, Replicas: 1}}, nil
  86. }
  87. }
  88. }
  89. }
  90. }
  91. if len(as.option.ClusterIds) == 1 {
  92. return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
  93. }
  94. resources, err := as.findClustersWithResources()
  95. if err != nil {
  96. return nil, err
  97. }
  98. if len(resources) == 0 {
  99. return nil, errors.New("no cluster has resources")
  100. }
  101. if len(resources) == 1 {
  102. var cluster strategy.AssignedCluster
  103. cluster.ClusterId = resources[0].ClusterId
  104. cluster.Replicas = 1
  105. return &strategy.SingleAssignment{Cluster: &cluster}, nil
  106. }
  107. params := &param.Params{Resources: resources}
  108. switch as.option.StrategyName {
  109. case strategy.REPLICATION:
  110. var clusterIds []string
  111. for _, resource := range resources {
  112. if resource == nil {
  113. continue
  114. }
  115. clusterIds = append(clusterIds, resource.ClusterId)
  116. }
  117. strategy := strategy.NewReplicationStrategy(clusterIds, 1)
  118. return strategy, nil
  119. case strategy.RESOURCES_PRICING:
  120. strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
  121. return strategy, nil
  122. case strategy.DYNAMIC_RESOURCES:
  123. strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
  124. return strategy, nil
  125. case strategy.STATIC_WEIGHT:
  126. //todo resources should match cluster StaticWeightMap
  127. strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica)
  128. return strategy, nil
  129. case strategy.RANDOM:
  130. strategy := strategy.NewRandomStrategy(as.option.ClusterIds, as.option.Replica)
  131. return strategy, nil
  132. }
  133. return nil, errors.New("no strategy has been chosen")
  134. }
  135. func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster, mode int) (interface{}, error) {
  136. if clusters == nil {
  137. return nil, errors.New("clusters is nil")
  138. }
  139. for i := len(clusters) - 1; i >= 0; i-- {
  140. if clusters[i].Replicas == 0 {
  141. clusters = append(clusters[:i], clusters[i+1:]...)
  142. }
  143. }
  144. if len(clusters) == 0 {
  145. return nil, errors.New("clusters is nil")
  146. }
  147. var wg sync.WaitGroup
  148. var results []*AiResult
  149. var mu sync.Mutex
  150. var errs []interface{}
  151. var taskNum int32
  152. for _, cluster := range clusters {
  153. taskNum += cluster.Replicas
  154. }
  155. var ch = make(chan *AiResult, taskNum)
  156. var errCh = make(chan interface{}, taskNum)
  157. executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId]
  158. for _, cluster := range clusters {
  159. c := cluster
  160. for i := 0; i < int(c.Replicas); i++ {
  161. wg.Add(1)
  162. go func() {
  163. opt, _ := cloneAiOption(as.option)
  164. // decide opt params by mode
  165. updateAiOptionByMode(c, opt, mode)
  166. resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt, mode)
  167. if err != nil {
  168. e := struct {
  169. err error
  170. clusterId string
  171. }{
  172. err: err,
  173. clusterId: c.ClusterId,
  174. }
  175. errCh <- e
  176. wg.Done()
  177. return
  178. }
  179. result := &AiResult{}
  180. mu.Lock()
  181. result, _ = convertType(resp)
  182. mu.Unlock()
  183. result.AdapterId = opt.AdapterId
  184. result.TaskName = opt.TaskName
  185. result.Replica = c.Replicas
  186. result.ClusterId = c.ClusterId
  187. result.Strategy = as.option.StrategyName
  188. result.Card = opt.ComputeCard
  189. result.Output = opt.Output
  190. ch <- result
  191. wg.Done()
  192. }()
  193. }
  194. }
  195. wg.Wait()
  196. close(ch)
  197. close(errCh)
  198. for e := range errCh {
  199. errs = append(errs, e)
  200. }
  201. for s := range ch {
  202. results = append(results, s)
  203. }
  204. err := as.handleErrors(errs, clusters, results, mode)
  205. if err != nil {
  206. return nil, err
  207. }
  208. return results, nil
  209. }
  210. func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.AssignedCluster, results []*AiResult, mode int) error {
  211. if len(errs) != 0 {
  212. var synergystatus int64
  213. if len(clusters) > 1 {
  214. synergystatus = 1
  215. }
  216. var taskId int64
  217. switch mode {
  218. case executor.SUBMIT_MODE_JOINT_CLOUD:
  219. tid, err := as.CreateTask(as.option.TaskName, "", 0, synergystatus, as.option.StrategyName, "", "", "", nil)
  220. if err != nil {
  221. return err
  222. }
  223. taskId = tid
  224. case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
  225. taskId = as.option.TaskId
  226. }
  227. // aiTasks
  228. adapterName, err := as.AiStorages.GetAdapterNameById(as.option.AdapterId)
  229. if err != nil {
  230. return err
  231. }
  232. var errmsg string
  233. for _, err := range errs {
  234. e := (err).(struct {
  235. err error
  236. clusterId string
  237. })
  238. msg := fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
  239. errmsg += msg
  240. clusterName, _ := as.AiStorages.GetClusterNameById(e.clusterId)
  241. err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, e.clusterId, clusterName, "", constants.Failed, msg)
  242. if err != nil {
  243. return errors.New("database add failed: " + err.Error())
  244. }
  245. //report msg
  246. report := &jcs.JobStatusReportReq{}
  247. reportMsg := &jcs.TrainReportMessage{
  248. Type: "Train",
  249. TaskName: "",
  250. TaskID: strconv.FormatInt(taskId, 10),
  251. Status: false,
  252. Message: msg,
  253. ClusterID: e.clusterId,
  254. Output: "",
  255. }
  256. report.Report = reportMsg
  257. //report status
  258. _ = jcs.StatusReport(as.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report)
  259. logx.Errorf(errors.New(errmsg).Error())
  260. return errors.New(errmsg)
  261. }
  262. for _, s := range results {
  263. as.option.ComputeCard = s.Card //execute card
  264. clusterName, _ := as.AiStorages.GetClusterNameById(s.ClusterId)
  265. if s.Msg != "" {
  266. msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
  267. errmsg += msg
  268. err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, "", constants.Failed, msg)
  269. if err != nil {
  270. return errors.New("database add failed: " + err.Error())
  271. }
  272. } else {
  273. msg := fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId)
  274. errmsg += msg
  275. err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, s.JobId, constants.Saved, msg)
  276. if err != nil {
  277. return errors.New("database add failed: " + err.Error())
  278. }
  279. }
  280. //add report msg
  281. report := &jcs.JobStatusReportReq{}
  282. reportMsg := &jcs.TrainReportMessage{
  283. Type: "Train",
  284. TaskName: "",
  285. TaskID: strconv.FormatInt(taskId, 10),
  286. Status: false,
  287. Message: s.Msg,
  288. ClusterID: s.ClusterId,
  289. Output: "",
  290. }
  291. report.Report = reportMsg
  292. //report status
  293. _ = jcs.StatusReport(as.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report)
  294. }
  295. logx.Errorf(errors.New(errmsg).Error())
  296. return errors.New(errmsg)
  297. }
  298. return nil
  299. }
  300. func updateAiOptionByMode(cluster *strategy.AssignedCluster, opt *option.AiOption, mode int) {
  301. switch mode {
  302. case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
  303. opt.Cmd = cluster.Cmd
  304. opt.Envs = cluster.Envs
  305. opt.Params = cluster.Params
  306. opt.ImageId = cluster.ImageId
  307. opt.AlgorithmId = cluster.CodeId
  308. opt.DatasetsId = cluster.DatasetId
  309. opt.ResourcesRequired = cluster.ResourcesRequired
  310. // assume params include output for modelarts
  311. for _, param := range cluster.Params {
  312. s := strings.Split(param, storeLink.COMMA)
  313. if s[0] == "output" {
  314. opt.Output = s[1]
  315. }
  316. }
  317. default:
  318. }
  319. }
  320. func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
  321. var wg sync.WaitGroup
  322. var clustersNum = len(as.AiService.AiCollectorAdapterMap[as.option.AdapterId])
  323. var ch = make(chan *collector.ResourceStats, clustersNum)
  324. var errCh = make(chan interface{}, clustersNum)
  325. var resourceSpecs []*collector.ResourceStats
  326. var errs []interface{}
  327. for s, resourceCollector := range as.AiService.AiCollectorAdapterMap[as.option.AdapterId] {
  328. wg.Add(1)
  329. rc := resourceCollector
  330. id := s
  331. go func() {
  332. spec, err := rc.GetResourceStats(as.ctx)
  333. if err != nil {
  334. e := struct {
  335. err error
  336. clusterId string
  337. }{
  338. err: err,
  339. clusterId: id,
  340. }
  341. errCh <- e
  342. wg.Done()
  343. return
  344. }
  345. ch <- spec
  346. wg.Done()
  347. }()
  348. }
  349. wg.Wait()
  350. close(ch)
  351. close(errCh)
  352. for s := range ch {
  353. resourceSpecs = append(resourceSpecs, s)
  354. }
  355. for e := range errCh {
  356. errs = append(errs, e)
  357. }
  358. if len(errs) == clustersNum {
  359. return nil, errors.New("get resources failed")
  360. }
  361. if len(errs) != 0 {
  362. var msg string
  363. for _, err := range errs {
  364. e := (err).(struct {
  365. err error
  366. clusterId string
  367. })
  368. msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
  369. }
  370. //return nil, errors.New(msg)
  371. }
  372. return resourceSpecs, nil
  373. }
  374. func convertType(in interface{}) (*AiResult, error) {
  375. var result AiResult
  376. switch (in).(type) {
  377. case *hpcAC.SubmitTaskAiResp:
  378. resp := (in).(*hpcAC.SubmitTaskAiResp)
  379. if resp.Code == "0" {
  380. result.JobId = resp.Data
  381. } else {
  382. result.Msg = resp.Msg
  383. }
  384. return &result, nil
  385. case *octopus.CreateTrainJobResp:
  386. resp := (in).(*octopus.CreateTrainJobResp)
  387. if resp.Success {
  388. result.JobId = resp.Payload.JobId
  389. } else {
  390. result.Msg = resp.Error.Message
  391. }
  392. return &result, nil
  393. case *modelartsservice.CreateTrainingJobResp:
  394. resp := (in).(*modelartsservice.CreateTrainingJobResp)
  395. if resp.ErrorMsg != "" {
  396. result.Msg = resp.ErrorMsg
  397. } else {
  398. result.JobId = resp.Metadata.Id
  399. }
  400. return &result, nil
  401. case model.CreateTask:
  402. resp := (in).(model.CreateTask)
  403. if resp.Code != 0 {
  404. result.Msg = resp.Msg
  405. } else {
  406. result.JobId = strconv.Itoa(resp.Data.Id)
  407. }
  408. return &result, nil
  409. case *entity.OctCreateJobResp:
  410. resp := (in).(entity.OctCreateJobResp)
  411. if resp.Code != 200 {
  412. result.Msg = resp.Msg
  413. } else {
  414. job := (resp.Data).(*omodel.OctCreateJob)
  415. result.JobId = job.JobId
  416. }
  417. return &result, nil
  418. default:
  419. return nil, errors.New("ai task response failed")
  420. }
  421. }
  422. func cloneAiOption(opt *option.AiOption) (*option.AiOption, error) {
  423. origJSON, err := json.Marshal(opt)
  424. if err != nil {
  425. return nil, err
  426. }
  427. clone := option.AiOption{}
  428. if err = json.Unmarshal(origJSON, &clone); err != nil {
  429. return nil, err
  430. }
  431. return &clone, nil
  432. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem for heterogeneous cloud collaboration in JCC in a non-intrusive, autonomous, peer-to-peer manner.