You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

aiScheduler.go 10 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package schedulers
  13. import (
  14. "context"
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "github.com/zeromicro/go-zero/core/logx"
  19. "gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
  20. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
  21. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common"
  22. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
  23. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
  24. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
  25. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
  26. "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
  27. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  28. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  29. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  30. "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
  31. "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
  32. "sync"
  33. )
  34. type AiScheduler struct {
  35. yamlString string
  36. task *response.TaskInfo
  37. *scheduler.Scheduler
  38. option *option.AiOption
  39. ctx context.Context
  40. }
  // AiResult describes the outcome of submitting one replica of an AI task to
  // one cluster. AssignTask fills in the identifying fields; convertType fills
  // in exactly one of JobId or Msg from the executor response.
  type AiResult struct {
  	// AdapterId is the adapter the task was submitted through.
  	AdapterId string
  	// TaskName is the task name taken from the submit option.
  	TaskName string
  	// JobId is the job identifier reported by the cluster on success.
  	JobId string
  	// ClusterId is the cluster that received the submission.
  	ClusterId string
  	// Strategy is the name of the scheduling strategy in effect.
  	Strategy string
  	// Replica is the replica count assigned to ClusterId.
  	Replica int32
  	// Card is the compute card recorded on the option used for the submission.
  	Card string
  	// Msg is the error message reported by the cluster; empty on success.
  	Msg string
  }
  51. func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) {
  52. return &AiScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option}, nil
  53. }
  54. func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
  55. ai := models.Ai{
  56. AdapterId: participantId,
  57. TaskId: task.TaskId,
  58. Status: "Saved",
  59. YamlString: as.yamlString,
  60. }
  61. utils.Convert(task.Metadata, &ai)
  62. return ai, nil
  63. }
  64. func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
  65. if as.option.ComputeCard != "" {
  66. m, ok := as.AiService.AiCollectorAdapterMap[as.option.AdapterId]
  67. if ok {
  68. for _, id := range as.option.ClusterIds {
  69. cm, ok := m[id]
  70. if ok {
  71. cards, err := cm.GetComputeCards(as.ctx)
  72. if err != nil {
  73. return nil, err
  74. }
  75. if common.Contains(cards, as.option.ComputeCard) {
  76. return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: id, Replicas: 1}}, nil
  77. }
  78. }
  79. }
  80. }
  81. }
  82. if len(as.option.ClusterIds) == 1 {
  83. return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
  84. }
  85. resources, err := as.findClustersWithResources()
  86. if err != nil {
  87. return nil, err
  88. }
  89. if len(resources) == 0 {
  90. return nil, errors.New("no cluster has resources")
  91. }
  92. if len(resources) == 1 {
  93. var cluster strategy.AssignedCluster
  94. cluster.ClusterId = resources[0].ClusterId
  95. cluster.Replicas = 1
  96. return &strategy.SingleAssignment{Cluster: &cluster}, nil
  97. }
  98. params := &param.Params{Resources: resources}
  99. switch as.option.StrategyName {
  100. case strategy.REPLICATION:
  101. var clusterIds []string
  102. for _, resource := range resources {
  103. if resource == nil {
  104. continue
  105. }
  106. clusterIds = append(clusterIds, resource.ClusterId)
  107. }
  108. strategy := strategy.NewReplicationStrategy(clusterIds, 1)
  109. return strategy, nil
  110. case strategy.RESOURCES_PRICING:
  111. strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
  112. return strategy, nil
  113. case strategy.DYNAMIC_RESOURCES:
  114. strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
  115. return strategy, nil
  116. case strategy.STATIC_WEIGHT:
  117. //todo resources should match cluster StaticWeightMap
  118. strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica)
  119. return strategy, nil
  120. case strategy.RANDOM:
  121. strategy := strategy.NewRandomStrategy(as.option.ClusterIds, as.option.Replica)
  122. return strategy, nil
  123. }
  124. return nil, errors.New("no strategy has been chosen")
  125. }
  126. func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) {
  127. if clusters == nil {
  128. return nil, errors.New("clusters is nil")
  129. }
  130. for i := len(clusters) - 1; i >= 0; i-- {
  131. if clusters[i].Replicas == 0 {
  132. clusters = append(clusters[:i], clusters[i+1:]...)
  133. }
  134. }
  135. if len(clusters) == 0 {
  136. return nil, errors.New("clusters is nil")
  137. }
  138. var wg sync.WaitGroup
  139. var results []*AiResult
  140. var mu sync.Mutex
  141. var errs []interface{}
  142. var taskNum int32
  143. for _, cluster := range clusters {
  144. taskNum += cluster.Replicas
  145. }
  146. var ch = make(chan *AiResult, taskNum)
  147. var errCh = make(chan interface{}, taskNum)
  148. executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId]
  149. for _, cluster := range clusters {
  150. c := cluster
  151. for i := 0; i < int(c.Replicas); i++ {
  152. wg.Add(1)
  153. go func() {
  154. opt, _ := cloneAiOption(as.option)
  155. resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt)
  156. if err != nil {
  157. e := struct {
  158. err error
  159. clusterId string
  160. }{
  161. err: err,
  162. clusterId: c.ClusterId,
  163. }
  164. errCh <- e
  165. wg.Done()
  166. return
  167. }
  168. result := &AiResult{}
  169. mu.Lock()
  170. result, _ = convertType(resp)
  171. mu.Unlock()
  172. result.AdapterId = opt.AdapterId
  173. result.TaskName = opt.TaskName
  174. result.Replica = c.Replicas
  175. result.ClusterId = c.ClusterId
  176. result.Strategy = as.option.StrategyName
  177. result.Card = opt.ComputeCard
  178. ch <- result
  179. wg.Done()
  180. }()
  181. }
  182. }
  183. wg.Wait()
  184. close(ch)
  185. close(errCh)
  186. for e := range errCh {
  187. errs = append(errs, e)
  188. }
  189. for s := range ch {
  190. results = append(results, s)
  191. }
  192. if len(errs) != 0 {
  193. var synergystatus int64
  194. if len(clusters) > 1 {
  195. synergystatus = 1
  196. }
  197. strategyCode, err := as.AiStorages.GetStrategyCode(as.option.StrategyName)
  198. taskId, err := as.AiStorages.SaveTask(as.option.TaskName, strategyCode, synergystatus)
  199. if err != nil {
  200. return nil, errors.New("database add failed: " + err.Error())
  201. }
  202. adapterName, err := as.AiStorages.GetAdapterNameById(as.option.AdapterId)
  203. if err != nil {
  204. return nil, err
  205. }
  206. var errmsg string
  207. for _, err := range errs {
  208. e := (err).(struct {
  209. err error
  210. clusterId string
  211. })
  212. msg := fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
  213. errmsg += msg
  214. clusterName, _ := as.AiStorages.GetClusterNameById(e.clusterId)
  215. err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, e.clusterId, clusterName, "", constants.Failed, msg)
  216. if err != nil {
  217. return nil, errors.New("database add failed: " + err.Error())
  218. }
  219. }
  220. for _, s := range results {
  221. as.option.ComputeCard = s.Card //execute card
  222. clusterName, _ := as.AiStorages.GetClusterNameById(s.ClusterId)
  223. if s.Msg != "" {
  224. msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
  225. errmsg += msg
  226. err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, "", constants.Failed, msg)
  227. if err != nil {
  228. return nil, errors.New("database add failed: " + err.Error())
  229. }
  230. } else {
  231. msg := fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId)
  232. errmsg += msg
  233. err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, s.JobId, constants.Saved, msg)
  234. if err != nil {
  235. return nil, errors.New("database add failed: " + err.Error())
  236. }
  237. }
  238. }
  239. logx.Errorf(errors.New(errmsg).Error())
  240. return nil, errors.New(errmsg)
  241. }
  242. return results, nil
  243. }
  244. func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
  245. var wg sync.WaitGroup
  246. var clustersNum = len(as.AiService.AiCollectorAdapterMap[as.option.AdapterId])
  247. var ch = make(chan *collector.ResourceStats, clustersNum)
  248. var errCh = make(chan interface{}, clustersNum)
  249. var resourceSpecs []*collector.ResourceStats
  250. var errs []interface{}
  251. for s, resourceCollector := range as.AiService.AiCollectorAdapterMap[as.option.AdapterId] {
  252. wg.Add(1)
  253. rc := resourceCollector
  254. id := s
  255. go func() {
  256. spec, err := rc.GetResourceStats(as.ctx)
  257. if err != nil {
  258. e := struct {
  259. err error
  260. clusterId string
  261. }{
  262. err: err,
  263. clusterId: id,
  264. }
  265. errCh <- e
  266. wg.Done()
  267. return
  268. }
  269. ch <- spec
  270. wg.Done()
  271. }()
  272. }
  273. wg.Wait()
  274. close(ch)
  275. close(errCh)
  276. for s := range ch {
  277. resourceSpecs = append(resourceSpecs, s)
  278. }
  279. for e := range errCh {
  280. errs = append(errs, e)
  281. }
  282. if len(errs) == clustersNum {
  283. return nil, errors.New("get resources failed")
  284. }
  285. if len(errs) != 0 {
  286. var msg string
  287. for _, err := range errs {
  288. e := (err).(struct {
  289. err error
  290. clusterId string
  291. })
  292. msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
  293. }
  294. return nil, errors.New(msg)
  295. }
  296. return resourceSpecs, nil
  297. }
  298. func convertType(in interface{}) (*AiResult, error) {
  299. var result AiResult
  300. switch (in).(type) {
  301. case *hpcAC.SubmitTaskAiResp:
  302. resp := (in).(*hpcAC.SubmitTaskAiResp)
  303. if resp.Code == "0" {
  304. result.JobId = resp.Data
  305. } else {
  306. result.Msg = resp.Msg
  307. }
  308. return &result, nil
  309. case *octopus.CreateTrainJobResp:
  310. resp := (in).(*octopus.CreateTrainJobResp)
  311. if resp.Success {
  312. result.JobId = resp.Payload.JobId
  313. } else {
  314. result.Msg = resp.Error.Message
  315. }
  316. return &result, nil
  317. case *modelartsservice.CreateTrainingJobResp:
  318. resp := (in).(*modelartsservice.CreateTrainingJobResp)
  319. if resp.ErrorMsg != "" {
  320. result.Msg = resp.ErrorMsg
  321. } else {
  322. result.JobId = resp.Metadata.Id
  323. }
  324. return &result, nil
  325. default:
  326. return nil, errors.New("ai task response failed")
  327. }
  328. }
  329. func cloneAiOption(opt *option.AiOption) (*option.AiOption, error) {
  330. origJSON, err := json.Marshal(opt)
  331. if err != nil {
  332. return nil, err
  333. }
  334. clone := option.AiOption{}
  335. if err = json.Unmarshal(origJSON, &clone); err != nil {
  336. return nil, err
  337. }
  338. return &clone, nil
  339. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.