You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

aiScheduler.go 11 kB

10 months ago
10 months ago
11 months ago
10 months ago
10 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package schedulers
  13. import (
  14. "context"
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "github.com/zeromicro/go-zero/core/logx"
  19. "gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
  20. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler"
  21. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
  22. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  23. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
  24. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
  25. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
  26. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy/param"
  27. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  28. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  29. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/response"
  30. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  31. "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
  32. "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
  33. "gitlink.org.cn/JointCloud/pcm-openi/model"
  34. "strconv"
  35. "sync"
  36. )
  37. type AiScheduler struct {
  38. yamlString string
  39. task *response.TaskInfo
  40. *scheduler.Scheduler
  41. option *option.AiOption
  42. ctx context.Context
  43. }
  // AiResult describes the outcome of submitting one AI job to one cluster.
  //
  // convertType fills JobId when the backend reports success and Msg when it
  // reports a failure; AssignTask fills the remaining fields from the option
  // and the assigned cluster.
  type AiResult struct {
  	// Identification of the submission: owning adapter, task name, the job id
  	// returned by the backend, the target cluster and the strategy name.
  	AdapterId, TaskName, JobId, ClusterId, Strategy string
  	// Replica is the replica count assigned to the cluster.
  	Replica int32
  	// Card is the compute card used for the submission; Msg is the backend's
  	// failure message and is empty when the submission succeeded.
  	Card, Msg string
  }
  54. func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) {
  55. return &AiScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option}, nil
  56. }
  57. func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
  58. ai := models.Ai{
  59. AdapterId: participantId,
  60. TaskId: task.TaskId,
  61. Status: "Saved",
  62. YamlString: as.yamlString,
  63. }
  64. utils.Convert(task.Metadata, &ai)
  65. return ai, nil
  66. }
  67. func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
  68. if as.option.ComputeCard != "" {
  69. m, ok := as.AiService.AiCollectorAdapterMap[as.option.AdapterId]
  70. if ok {
  71. for _, id := range as.option.ClusterIds {
  72. cm, ok := m[id]
  73. if ok {
  74. cards, err := cm.GetComputeCards(as.ctx)
  75. if err != nil {
  76. return nil, err
  77. }
  78. if common.Contains(cards, as.option.ComputeCard) {
  79. return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: id, Replicas: 1}}, nil
  80. }
  81. }
  82. }
  83. }
  84. }
  85. if len(as.option.ClusterIds) == 1 {
  86. return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
  87. }
  88. resources, err := as.findClustersWithResources()
  89. if err != nil {
  90. return nil, err
  91. }
  92. if len(resources) == 0 {
  93. return nil, errors.New("no cluster has resources")
  94. }
  95. if len(resources) == 1 {
  96. var cluster strategy.AssignedCluster
  97. cluster.ClusterId = resources[0].ClusterId
  98. cluster.Replicas = 1
  99. return &strategy.SingleAssignment{Cluster: &cluster}, nil
  100. }
  101. params := &param.Params{Resources: resources}
  102. switch as.option.StrategyName {
  103. case strategy.REPLICATION:
  104. var clusterIds []string
  105. for _, resource := range resources {
  106. if resource == nil {
  107. continue
  108. }
  109. clusterIds = append(clusterIds, resource.ClusterId)
  110. }
  111. strategy := strategy.NewReplicationStrategy(clusterIds, 1)
  112. return strategy, nil
  113. case strategy.RESOURCES_PRICING:
  114. strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
  115. return strategy, nil
  116. case strategy.DYNAMIC_RESOURCES:
  117. strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
  118. return strategy, nil
  119. case strategy.STATIC_WEIGHT:
  120. //todo resources should match cluster StaticWeightMap
  121. strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica)
  122. return strategy, nil
  123. case strategy.RANDOM:
  124. strategy := strategy.NewRandomStrategy(as.option.ClusterIds, as.option.Replica)
  125. return strategy, nil
  126. }
  127. return nil, errors.New("no strategy has been chosen")
  128. }
  129. func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster, mode int) (interface{}, error) {
  130. if clusters == nil {
  131. return nil, errors.New("clusters is nil")
  132. }
  133. for i := len(clusters) - 1; i >= 0; i-- {
  134. if clusters[i].Replicas == 0 {
  135. clusters = append(clusters[:i], clusters[i+1:]...)
  136. }
  137. }
  138. if len(clusters) == 0 {
  139. return nil, errors.New("clusters is nil")
  140. }
  141. var wg sync.WaitGroup
  142. var results []*AiResult
  143. var mu sync.Mutex
  144. var errs []interface{}
  145. var taskNum int32
  146. for _, cluster := range clusters {
  147. taskNum += cluster.Replicas
  148. }
  149. var ch = make(chan *AiResult, taskNum)
  150. var errCh = make(chan interface{}, taskNum)
  151. executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId]
  152. for _, cluster := range clusters {
  153. c := cluster
  154. for i := 0; i < int(c.Replicas); i++ {
  155. wg.Add(1)
  156. go func() {
  157. opt, _ := cloneAiOption(as.option)
  158. // decide opt params by mode
  159. updateAiOptionByMode(c, opt, mode)
  160. resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt, mode)
  161. if err != nil {
  162. e := struct {
  163. err error
  164. clusterId string
  165. }{
  166. err: err,
  167. clusterId: c.ClusterId,
  168. }
  169. errCh <- e
  170. wg.Done()
  171. return
  172. }
  173. result := &AiResult{}
  174. mu.Lock()
  175. result, _ = convertType(resp)
  176. mu.Unlock()
  177. result.AdapterId = opt.AdapterId
  178. result.TaskName = opt.TaskName
  179. result.Replica = c.Replicas
  180. result.ClusterId = c.ClusterId
  181. result.Strategy = as.option.StrategyName
  182. result.Card = opt.ComputeCard
  183. ch <- result
  184. wg.Done()
  185. }()
  186. }
  187. }
  188. wg.Wait()
  189. close(ch)
  190. close(errCh)
  191. for e := range errCh {
  192. errs = append(errs, e)
  193. }
  194. for s := range ch {
  195. results = append(results, s)
  196. }
  197. if len(errs) != 0 {
  198. var synergystatus int64
  199. if len(clusters) > 1 {
  200. synergystatus = 1
  201. }
  202. taskId, err := as.CreateTask(as.option.TaskName, synergystatus, as.option.StrategyName, "")
  203. if err != nil {
  204. return nil, err
  205. }
  206. // aiTasks
  207. adapterName, err := as.AiStorages.GetAdapterNameById(as.option.AdapterId)
  208. if err != nil {
  209. return nil, err
  210. }
  211. var errmsg string
  212. for _, err := range errs {
  213. e := (err).(struct {
  214. err error
  215. clusterId string
  216. })
  217. msg := fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
  218. errmsg += msg
  219. clusterName, _ := as.AiStorages.GetClusterNameById(e.clusterId)
  220. err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, e.clusterId, clusterName, "", constants.Failed, msg)
  221. if err != nil {
  222. return nil, errors.New("database add failed: " + err.Error())
  223. }
  224. }
  225. for _, s := range results {
  226. as.option.ComputeCard = s.Card //execute card
  227. clusterName, _ := as.AiStorages.GetClusterNameById(s.ClusterId)
  228. if s.Msg != "" {
  229. msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
  230. errmsg += msg
  231. err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, "", constants.Failed, msg)
  232. if err != nil {
  233. return nil, errors.New("database add failed: " + err.Error())
  234. }
  235. } else {
  236. msg := fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId)
  237. errmsg += msg
  238. err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, s.JobId, constants.Saved, msg)
  239. if err != nil {
  240. return nil, errors.New("database add failed: " + err.Error())
  241. }
  242. }
  243. }
  244. logx.Errorf(errors.New(errmsg).Error())
  245. return nil, errors.New(errmsg)
  246. }
  247. return results, nil
  248. }
  249. func updateAiOptionByMode(cluster *strategy.AssignedCluster, opt *option.AiOption, mode int) {
  250. switch mode {
  251. case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
  252. opt.Cmd = cluster.Cmd
  253. opt.Envs = cluster.Envs
  254. opt.Params = cluster.Params
  255. opt.ImageId = cluster.ImageId
  256. opt.AlgorithmId = cluster.CodeId
  257. opt.DatasetsId = cluster.DatasetId
  258. opt.ResourcesRequired = cluster.ResourcesRequired
  259. default:
  260. }
  261. }
  262. func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
  263. var wg sync.WaitGroup
  264. var clustersNum = len(as.AiService.AiCollectorAdapterMap[as.option.AdapterId])
  265. var ch = make(chan *collector.ResourceStats, clustersNum)
  266. var errCh = make(chan interface{}, clustersNum)
  267. var resourceSpecs []*collector.ResourceStats
  268. var errs []interface{}
  269. for s, resourceCollector := range as.AiService.AiCollectorAdapterMap[as.option.AdapterId] {
  270. wg.Add(1)
  271. rc := resourceCollector
  272. id := s
  273. go func() {
  274. spec, err := rc.GetResourceStats(as.ctx)
  275. if err != nil {
  276. e := struct {
  277. err error
  278. clusterId string
  279. }{
  280. err: err,
  281. clusterId: id,
  282. }
  283. errCh <- e
  284. wg.Done()
  285. return
  286. }
  287. ch <- spec
  288. wg.Done()
  289. }()
  290. }
  291. wg.Wait()
  292. close(ch)
  293. close(errCh)
  294. for s := range ch {
  295. resourceSpecs = append(resourceSpecs, s)
  296. }
  297. for e := range errCh {
  298. errs = append(errs, e)
  299. }
  300. if len(errs) == clustersNum {
  301. return nil, errors.New("get resources failed")
  302. }
  303. if len(errs) != 0 {
  304. var msg string
  305. for _, err := range errs {
  306. e := (err).(struct {
  307. err error
  308. clusterId string
  309. })
  310. msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
  311. }
  312. //return nil, errors.New(msg)
  313. }
  314. return resourceSpecs, nil
  315. }
  316. func convertType(in interface{}) (*AiResult, error) {
  317. var result AiResult
  318. switch (in).(type) {
  319. case *hpcAC.SubmitTaskAiResp:
  320. resp := (in).(*hpcAC.SubmitTaskAiResp)
  321. if resp.Code == "0" {
  322. result.JobId = resp.Data
  323. } else {
  324. result.Msg = resp.Msg
  325. }
  326. return &result, nil
  327. case *octopus.CreateTrainJobResp:
  328. resp := (in).(*octopus.CreateTrainJobResp)
  329. if resp.Success {
  330. result.JobId = resp.Payload.JobId
  331. } else {
  332. result.Msg = resp.Error.Message
  333. }
  334. return &result, nil
  335. case *modelartsservice.CreateTrainingJobResp:
  336. resp := (in).(*modelartsservice.CreateTrainingJobResp)
  337. if resp.ErrorMsg != "" {
  338. result.Msg = resp.ErrorMsg
  339. } else {
  340. result.JobId = resp.Metadata.Id
  341. }
  342. return &result, nil
  343. case model.CreateTask:
  344. resp := (in).(model.CreateTask)
  345. if resp.Code != 0 {
  346. result.Msg = resp.Msg
  347. } else {
  348. result.JobId = strconv.Itoa(resp.Data.Id)
  349. }
  350. return &result, nil
  351. default:
  352. return nil, errors.New("ai task response failed")
  353. }
  354. }
  355. func cloneAiOption(opt *option.AiOption) (*option.AiOption, error) {
  356. origJSON, err := json.Marshal(opt)
  357. if err != nil {
  358. return nil, err
  359. }
  360. clone := option.AiOption{}
  361. if err = json.Unmarshal(origJSON, &clone); err != nil {
  362. return nil, err
  363. }
  364. return &clone, nil
  365. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.