You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

aiScheduler.go 11 kB

11 months ago
10 months ago
10 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
10 months ago
10 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package schedulers
  13. import (
  14. "context"
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "github.com/zeromicro/go-zero/core/logx"
  19. "gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
  20. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler"
  21. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
  22. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  23. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
  24. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
  25. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy/param"
  26. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  27. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  28. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/response"
  29. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  30. "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
  31. "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
  32. "sync"
  33. )
// AiScheduler schedules an AI task onto the clusters of one adapter. It embeds
// *scheduler.Scheduler, through which the methods in this file reach
// AiService, AiStorages and CreateTask.
type AiScheduler struct {
	// yamlString is the raw task definition; it is copied into the database
	// record built by GetNewStructForDb.
	yamlString string
	// task is not read by any code in this file.
	task *response.TaskInfo
	*scheduler.Scheduler
	// option carries the user-supplied scheduling options (adapter, cluster
	// ids, strategy name, compute card, ...).
	option *option.AiOption
	// ctx is forwarded to collector and executor calls.
	ctx context.Context
}
// AiResult describes the outcome of submitting one task replica to a cluster.
type AiResult struct {
	// AdapterId is the adapter the task was submitted through.
	AdapterId string
	// TaskName is the name of the submitted task.
	TaskName string
	// JobId is the job identifier taken from a successful submit response.
	JobId string
	// ClusterId is the cluster the replica was submitted to.
	ClusterId string
	// Strategy is the name of the scheduling strategy in effect.
	Strategy string
	// Replica is the replica count assigned to the cluster.
	Replica int32
	// Card is the compute card used for the submission.
	Card string
	// Msg holds the error message of a submit response that reported failure;
	// it is empty on success.
	Msg string
}
  51. func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) {
  52. return &AiScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option}, nil
  53. }
  54. func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
  55. ai := models.Ai{
  56. AdapterId: participantId,
  57. TaskId: task.TaskId,
  58. Status: "Saved",
  59. YamlString: as.yamlString,
  60. }
  61. utils.Convert(task.Metadata, &ai)
  62. return ai, nil
  63. }
  64. func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
  65. if as.option.ComputeCard != "" {
  66. m, ok := as.AiService.AiCollectorAdapterMap[as.option.AdapterId]
  67. if ok {
  68. for _, id := range as.option.ClusterIds {
  69. cm, ok := m[id]
  70. if ok {
  71. cards, err := cm.GetComputeCards(as.ctx)
  72. if err != nil {
  73. return nil, err
  74. }
  75. if common.Contains(cards, as.option.ComputeCard) {
  76. return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: id, Replicas: 1}}, nil
  77. }
  78. }
  79. }
  80. }
  81. }
  82. if len(as.option.ClusterIds) == 1 {
  83. return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
  84. }
  85. resources, err := as.findClustersWithResources()
  86. if err != nil {
  87. return nil, err
  88. }
  89. if len(resources) == 0 {
  90. return nil, errors.New("no cluster has resources")
  91. }
  92. if len(resources) == 1 {
  93. var cluster strategy.AssignedCluster
  94. cluster.ClusterId = resources[0].ClusterId
  95. cluster.Replicas = 1
  96. return &strategy.SingleAssignment{Cluster: &cluster}, nil
  97. }
  98. params := &param.Params{Resources: resources}
  99. switch as.option.StrategyName {
  100. case strategy.REPLICATION:
  101. var clusterIds []string
  102. for _, resource := range resources {
  103. if resource == nil {
  104. continue
  105. }
  106. clusterIds = append(clusterIds, resource.ClusterId)
  107. }
  108. strategy := strategy.NewReplicationStrategy(clusterIds, 1)
  109. return strategy, nil
  110. case strategy.RESOURCES_PRICING:
  111. strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
  112. return strategy, nil
  113. case strategy.DYNAMIC_RESOURCES:
  114. strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
  115. return strategy, nil
  116. case strategy.STATIC_WEIGHT:
  117. //todo resources should match cluster StaticWeightMap
  118. strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica)
  119. return strategy, nil
  120. case strategy.RANDOM:
  121. strategy := strategy.NewRandomStrategy(as.option.ClusterIds, as.option.Replica)
  122. return strategy, nil
  123. }
  124. return nil, errors.New("no strategy has been chosen")
  125. }
  126. func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster, mode int) (interface{}, error) {
  127. if clusters == nil {
  128. return nil, errors.New("clusters is nil")
  129. }
  130. for i := len(clusters) - 1; i >= 0; i-- {
  131. if clusters[i].Replicas == 0 {
  132. clusters = append(clusters[:i], clusters[i+1:]...)
  133. }
  134. }
  135. if len(clusters) == 0 {
  136. return nil, errors.New("clusters is nil")
  137. }
  138. var wg sync.WaitGroup
  139. var results []*AiResult
  140. var mu sync.Mutex
  141. var errs []interface{}
  142. var taskNum int32
  143. for _, cluster := range clusters {
  144. taskNum += cluster.Replicas
  145. }
  146. var ch = make(chan *AiResult, taskNum)
  147. var errCh = make(chan interface{}, taskNum)
  148. executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId]
  149. for _, cluster := range clusters {
  150. c := cluster
  151. for i := 0; i < int(c.Replicas); i++ {
  152. wg.Add(1)
  153. go func() {
  154. opt, _ := cloneAiOption(as.option)
  155. // decide opt params by mode
  156. updateAiOptionByMode(c, opt, mode)
  157. resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt, mode)
  158. if err != nil {
  159. e := struct {
  160. err error
  161. clusterId string
  162. }{
  163. err: err,
  164. clusterId: c.ClusterId,
  165. }
  166. errCh <- e
  167. wg.Done()
  168. return
  169. }
  170. result := &AiResult{}
  171. mu.Lock()
  172. result, _ = convertType(resp)
  173. mu.Unlock()
  174. result.AdapterId = opt.AdapterId
  175. result.TaskName = opt.TaskName
  176. result.Replica = c.Replicas
  177. result.ClusterId = c.ClusterId
  178. result.Strategy = as.option.StrategyName
  179. result.Card = opt.ComputeCard
  180. ch <- result
  181. wg.Done()
  182. }()
  183. }
  184. }
  185. wg.Wait()
  186. close(ch)
  187. close(errCh)
  188. for e := range errCh {
  189. errs = append(errs, e)
  190. }
  191. for s := range ch {
  192. results = append(results, s)
  193. }
  194. if len(errs) != 0 {
  195. var synergystatus int64
  196. if len(clusters) > 1 {
  197. synergystatus = 1
  198. }
  199. taskId, err := as.CreateTask(as.option.TaskName, synergystatus, as.option.StrategyName, "")
  200. if err != nil {
  201. return nil, err
  202. }
  203. // aiTasks
  204. adapterName, err := as.AiStorages.GetAdapterNameById(as.option.AdapterId)
  205. if err != nil {
  206. return nil, err
  207. }
  208. var errmsg string
  209. for _, err := range errs {
  210. e := (err).(struct {
  211. err error
  212. clusterId string
  213. })
  214. msg := fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
  215. errmsg += msg
  216. clusterName, _ := as.AiStorages.GetClusterNameById(e.clusterId)
  217. err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, e.clusterId, clusterName, "", constants.Failed, msg)
  218. if err != nil {
  219. return nil, errors.New("database add failed: " + err.Error())
  220. }
  221. }
  222. for _, s := range results {
  223. as.option.ComputeCard = s.Card //execute card
  224. clusterName, _ := as.AiStorages.GetClusterNameById(s.ClusterId)
  225. if s.Msg != "" {
  226. msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
  227. errmsg += msg
  228. err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, "", constants.Failed, msg)
  229. if err != nil {
  230. return nil, errors.New("database add failed: " + err.Error())
  231. }
  232. } else {
  233. msg := fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId)
  234. errmsg += msg
  235. err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, s.JobId, constants.Saved, msg)
  236. if err != nil {
  237. return nil, errors.New("database add failed: " + err.Error())
  238. }
  239. }
  240. }
  241. logx.Errorf(errors.New(errmsg).Error())
  242. return nil, errors.New(errmsg)
  243. }
  244. return results, nil
  245. }
  246. func updateAiOptionByMode(cluster *strategy.AssignedCluster, opt *option.AiOption, mode int) {
  247. switch mode {
  248. case scheduler.SUBMIT_MODE_STORAGE_SCHEDULE:
  249. opt.Cmd = cluster.Cmd
  250. opt.Envs = cluster.Envs
  251. opt.Params = cluster.Params
  252. opt.ImageId = cluster.ImageId
  253. opt.AlgorithmId = cluster.CodeId
  254. opt.DatasetsId = cluster.DatasetId
  255. opt.ResourcesRequired = cluster.ResourcesRequired
  256. default:
  257. }
  258. }
  259. func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
  260. var wg sync.WaitGroup
  261. var clustersNum = len(as.AiService.AiCollectorAdapterMap[as.option.AdapterId])
  262. var ch = make(chan *collector.ResourceStats, clustersNum)
  263. var errCh = make(chan interface{}, clustersNum)
  264. var resourceSpecs []*collector.ResourceStats
  265. var errs []interface{}
  266. for s, resourceCollector := range as.AiService.AiCollectorAdapterMap[as.option.AdapterId] {
  267. wg.Add(1)
  268. rc := resourceCollector
  269. id := s
  270. go func() {
  271. spec, err := rc.GetResourceStats(as.ctx)
  272. if err != nil {
  273. e := struct {
  274. err error
  275. clusterId string
  276. }{
  277. err: err,
  278. clusterId: id,
  279. }
  280. errCh <- e
  281. wg.Done()
  282. return
  283. }
  284. ch <- spec
  285. wg.Done()
  286. }()
  287. }
  288. wg.Wait()
  289. close(ch)
  290. close(errCh)
  291. for s := range ch {
  292. resourceSpecs = append(resourceSpecs, s)
  293. }
  294. for e := range errCh {
  295. errs = append(errs, e)
  296. }
  297. if len(errs) == clustersNum {
  298. return nil, errors.New("get resources failed")
  299. }
  300. if len(errs) != 0 {
  301. var msg string
  302. for _, err := range errs {
  303. e := (err).(struct {
  304. err error
  305. clusterId string
  306. })
  307. msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
  308. }
  309. return nil, errors.New(msg)
  310. }
  311. return resourceSpecs, nil
  312. }
  313. func convertType(in interface{}) (*AiResult, error) {
  314. var result AiResult
  315. switch (in).(type) {
  316. case *hpcAC.SubmitTaskAiResp:
  317. resp := (in).(*hpcAC.SubmitTaskAiResp)
  318. if resp.Code == "0" {
  319. result.JobId = resp.Data
  320. } else {
  321. result.Msg = resp.Msg
  322. }
  323. return &result, nil
  324. case *octopus.CreateTrainJobResp:
  325. resp := (in).(*octopus.CreateTrainJobResp)
  326. if resp.Success {
  327. result.JobId = resp.Payload.JobId
  328. } else {
  329. result.Msg = resp.Error.Message
  330. }
  331. return &result, nil
  332. case *modelartsservice.CreateTrainingJobResp:
  333. resp := (in).(*modelartsservice.CreateTrainingJobResp)
  334. if resp.ErrorMsg != "" {
  335. result.Msg = resp.ErrorMsg
  336. } else {
  337. result.JobId = resp.Metadata.Id
  338. }
  339. return &result, nil
  340. default:
  341. return nil, errors.New("ai task response failed")
  342. }
  343. }
  344. func cloneAiOption(opt *option.AiOption) (*option.AiOption, error) {
  345. origJSON, err := json.Marshal(opt)
  346. if err != nil {
  347. return nil, err
  348. }
  349. clone := option.AiOption{}
  350. if err = json.Unmarshal(origJSON, &clone); err != nil {
  351. return nil, err
  352. }
  353. return &clone, nil
  354. }

PCM is positioned as a software stack over Cloud, aiming to build the standards and ecology of heterogeneous cloud collaboration for JCC in a non-intrusive and autonomous peer-to-peer manner.