You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

octopus.go 23 kB


  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package storeLink
  13. import (
  14. "context"
  15. "errors"
  16. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  17. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
  18. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
  19. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  20. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  21. "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
  22. "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
  23. "math"
  24. "strconv"
  25. "strings"
  26. "time"
  27. )
  28. type OctopusLink struct {
  29. octopusRpc octopusclient.Octopus
  30. pageIndex int32
  31. pageSize int32
  32. platform string
  33. participantId int64
  34. }
  35. const (
  36. IMG_NAME_PREFIX = "oct_"
  37. IMG_VERSION_PREFIX = "version_"
  38. TASK_NAME_PREFIX = "trainJob"
  39. RESOURCE_POOL = "common-pool"
  40. HANWUJI = "hanwuji"
  41. SUIYUAN = "suiyuan"
  42. SAILINGSI = "sailingsi"
  43. MLU = "MLU"
  44. BIV100 = "BI-V100"
  45. CAMBRICONMLU290 = 256
  46. GCU = "GCU"
  47. ENFLAME = "enflame"
  48. EnflameT20 = 128
  49. BASE_TOPS = 128
  50. CAMBRICON = "cambricon"
  51. ILUVATAR = "iluvatar"
  52. TRAIN_CMD = "cd /code; python train.py"
  53. VERSION = "V1"
  54. DOMAIN = "http://192.168.242.41:8001/"
  55. CAMBRICON_CN = "寒武纪290"
  56. ENFLAME_CN = "燧原T20"
  57. ILUVATAR_CN = "天数BI-V100"
  58. )
  59. var (
  60. cardAliasMap = map[string]string{
  61. MLU: CAMBRICON,
  62. GCU: ENFLAME,
  63. BIV100: ILUVATAR,
  64. }
  65. cardCnMap = map[string]string{
  66. MLU: CAMBRICON_CN,
  67. GCU: ENFLAME_CN,
  68. BIV100: ILUVATAR_CN,
  69. }
  70. cardTopsMap = map[string]float64{
  71. MLU: CAMBRICONMLU290,
  72. GCU: EnflameT20,
  73. }
  74. )
  75. func NewOctopusLink(octopusRpc octopusclient.Octopus, name string, id int64) *OctopusLink {
  76. return &OctopusLink{octopusRpc: octopusRpc, platform: name, participantId: id, pageIndex: 1, pageSize: 100}
  77. }
  78. func (o *OctopusLink) UploadImage(ctx context.Context, path string) (interface{}, error) {
  79. // octopus创建镜像
  80. createReq := &octopus.CreateImageReq{
  81. Platform: o.platform,
  82. CreateImage: &octopus.CreateImage{
  83. SourceType: 1,
  84. ImageName: IMG_NAME_PREFIX + utils.RandomString(7),
  85. ImageVersion: IMG_VERSION_PREFIX + utils.RandomString(7),
  86. },
  87. }
  88. createResp, err := o.octopusRpc.CreateImage(ctx, createReq)
  89. if err != nil {
  90. return nil, err
  91. }
  92. // octopus上传镜像
  93. uploadReq := &octopus.UploadImageReq{
  94. Platform: o.platform,
  95. ImageId: createResp.Payload.ImageId,
  96. Params: &octopus.UploadImageParam{
  97. Domain: "",
  98. FileName: "",
  99. },
  100. }
  101. uploadResp, err := o.octopusRpc.UploadImage(ctx, uploadReq)
  102. if err != nil {
  103. return nil, err
  104. }
  105. // Todo 实际上传
  106. return uploadResp, nil
  107. }
  108. func (o *OctopusLink) DeleteImage(ctx context.Context, imageId string) (interface{}, error) {
  109. // octopus删除镜像
  110. req := &octopus.DeleteImageReq{
  111. Platform: o.platform,
  112. ImageId: imageId,
  113. }
  114. resp, err := o.octopusRpc.DeleteImage(ctx, req)
  115. if err != nil {
  116. return nil, err
  117. }
  118. return resp, nil
  119. }
  120. func (o *OctopusLink) QueryImageList(ctx context.Context) (interface{}, error) {
  121. // octopus获取镜像列表
  122. req := &octopus.GetUserImageListReq{
  123. Platform: o.platform,
  124. PageIndex: o.pageIndex,
  125. PageSize: o.pageSize,
  126. }
  127. resp, err := o.octopusRpc.GetUserImageList(ctx, req)
  128. if err != nil {
  129. return nil, err
  130. }
  131. return resp, nil
  132. }
  133. func (o *OctopusLink) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
  134. // octopus提交任务
  135. // python参数
  136. var prms []*octopus.Parameters
  137. for _, param := range params {
  138. var p octopus.Parameters
  139. s := strings.Split(param, COMMA)
  140. p.Key = s[0]
  141. p.Value = s[1]
  142. prms = append(prms, &p)
  143. }
  144. //环境变量
  145. envMap := make(map[string]string)
  146. for _, env := range envs {
  147. s := strings.Split(env, COMMA)
  148. envMap[s[0]] = s[1]
  149. }
  150. req := &octopus.CreateTrainJobReq{
  151. Platform: o.platform,
  152. Params: &octopus.CreateTrainJobParam{
  153. ImageId: imageId,
  154. Name: TASK_NAME_PREFIX + UNDERSCORE + utils.RandomString(10),
  155. ResourcePool: RESOURCE_POOL,
  156. Config: []*octopus.Config{
  157. {
  158. Command: cmd,
  159. ResourceSpecId: resourceId,
  160. MinFailedTaskCount: 1,
  161. MinSucceededTaskCount: 1,
  162. TaskNumber: 1,
  163. Parameters: prms,
  164. Envs: envMap,
  165. },
  166. },
  167. DataSetId: datasetsId,
  168. DataSetVersion: VERSION,
  169. AlgorithmId: algorithmId,
  170. AlgorithmVersion: VERSION,
  171. },
  172. }
  173. resp, err := o.octopusRpc.CreateTrainJob(ctx, req)
  174. if err != nil {
  175. return nil, err
  176. }
  177. return resp, nil
  178. }
  179. func (o *OctopusLink) QueryTask(ctx context.Context, taskId string) (interface{}, error) {
  180. // octopus获取任务
  181. req := &octopus.GetTrainJobReq{
  182. Platform: o.platform,
  183. Id: taskId,
  184. }
  185. resp, err := o.octopusRpc.GetTrainJob(ctx, req)
  186. if err != nil {
  187. return nil, err
  188. }
  189. return resp, nil
  190. }
  191. func (o *OctopusLink) DeleteTask(ctx context.Context, taskId string) (interface{}, error) {
  192. // octopus删除任务
  193. req := &octopus.DeleteTrainJobReq{
  194. Platform: o.platform,
  195. JobIds: []string{taskId},
  196. }
  197. resp, err := o.octopusRpc.DeleteTrainJob(ctx, req)
  198. if err != nil {
  199. return nil, err
  200. }
  201. return resp, nil
  202. }
  203. func (o *OctopusLink) QuerySpecs(ctx context.Context) (interface{}, error) {
  204. // octopus查询资源规格
  205. req := &octopus.GetResourceSpecsReq{
  206. Platform: o.platform,
  207. ResourcePool: RESOURCE_POOL,
  208. }
  209. resp, err := o.octopusRpc.GetResourceSpecs(ctx, req)
  210. if err != nil {
  211. return nil, err
  212. }
  213. return resp, nil
  214. }
  215. func (o *OctopusLink) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
  216. req := &octopus.GetResourceSpecsReq{
  217. Platform: o.platform,
  218. ResourcePool: RESOURCE_POOL,
  219. }
  220. specResp, err := o.octopusRpc.GetResourceSpecs(ctx, req)
  221. if err != nil {
  222. return nil, err
  223. }
  224. if !specResp.Success {
  225. return nil, errors.New(specResp.Error.Message)
  226. }
  227. balanceReq := &octopus.GetUserBalanceReq{
  228. Platform: o.platform,
  229. }
  230. balanceResp, err := o.octopusRpc.GetUserBalance(ctx, balanceReq)
  231. if err != nil {
  232. return nil, err
  233. }
  234. if !balanceResp.Success {
  235. return nil, errors.New(balanceResp.Error.Message)
  236. }
  237. var cards []*collector.Card
  238. balance := float64(balanceResp.Payload.BillingUser.Amount)
  239. var cpuHours float64
  240. for _, spec := range specResp.TrainResourceSpecs {
  241. if spec.Price == 0 {
  242. ns := strings.Split(spec.Name, COMMA)
  243. if len(ns) == 2 {
  244. nss := strings.Split(ns[0], COLON)
  245. if nss[0] == CPU {
  246. cpuHours = -1
  247. }
  248. }
  249. }
  250. if spec.Price == 1 {
  251. ns := strings.Split(spec.Name, COMMA)
  252. cardSpecs := strings.Split(ns[0], STAR)
  253. cardTops, isMapContainsKey := cardTopsMap[cardSpecs[1]]
  254. if !isMapContainsKey {
  255. continue
  256. }
  257. card := &collector.Card{
  258. Platform: OCTOPUS,
  259. Type: CARD,
  260. Name: cardSpecs[1],
  261. TOpsAtFp16: cardTops,
  262. CardHours: balance / spec.Price,
  263. }
  264. cards = append(cards, card)
  265. }
  266. }
  267. resourceStats := &collector.ResourceStats{
  268. ClusterId: strconv.FormatInt(o.participantId, 10),
  269. Name: o.platform,
  270. Balance: balance,
  271. CardsAvail: cards,
  272. CpuCoreHours: cpuHours,
  273. }
  274. return resourceStats, nil
  275. }
  276. func (o *OctopusLink) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
  277. req := &octopus.GetMyDatasetListReq{
  278. Platform: o.platform,
  279. PageIndex: o.pageIndex,
  280. PageSize: o.pageSize,
  281. }
  282. resp, err := o.octopusRpc.GetMyDatasetList(ctx, req)
  283. if err != nil {
  284. return nil, err
  285. }
  286. if !resp.Success {
  287. return nil, errors.New(resp.Error.Message)
  288. }
  289. specs := []*collector.DatasetsSpecs{}
  290. for _, dataset := range resp.Payload.Datasets {
  291. spec := &collector.DatasetsSpecs{Name: dataset.Name}
  292. specs = append(specs, spec)
  293. }
  294. return specs, nil
  295. }
  296. func (o *OctopusLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
  297. var algorithms []*collector.Algorithm
  298. req := &octopus.GetMyAlgorithmListReq{
  299. Platform: o.platform,
  300. PageIndex: o.pageIndex,
  301. PageSize: o.pageSize,
  302. }
  303. resp, err := o.octopusRpc.GetMyAlgorithmList(ctx, req)
  304. if err != nil {
  305. return nil, err
  306. }
  307. if !resp.Success {
  308. return nil, errors.New("failed to get algorithms")
  309. }
  310. for _, a := range resp.Payload.Algorithms {
  311. algorithm := &collector.Algorithm{Name: a.AlgorithmName, Platform: OCTOPUS, TaskType: strings.ToLower(a.FrameworkName)}
  312. algorithms = append(algorithms, algorithm)
  313. }
  314. return algorithms, nil
  315. }
  316. func (o *OctopusLink) GetComputeCards(ctx context.Context) ([]string, error) {
  317. var cards []string
  318. for s, _ := range cardAliasMap {
  319. cards = append(cards, s)
  320. }
  321. return cards, nil
  322. }
  323. func (o *OctopusLink) GetUserBalance(ctx context.Context) (float64, error) {
  324. balanceReq := &octopus.GetUserBalanceReq{
  325. Platform: o.platform,
  326. }
  327. balanceResp, err := o.octopusRpc.GetUserBalance(ctx, balanceReq)
  328. if err != nil {
  329. return 0, err
  330. }
  331. if !balanceResp.Success {
  332. if balanceResp.Error != nil {
  333. return 0, errors.New(balanceResp.Error.Message)
  334. } else {
  335. return 0, errors.New("failed to get user balance")
  336. }
  337. }
  338. balance := float64(balanceResp.Payload.BillingUser.Amount)
  339. return balance, nil
  340. }
  341. func (o *OctopusLink) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
  342. var name string
  343. if resourceType == CARD {
  344. name = dataset + UNDERSCORE + algorithm + UNDERSCORE + card
  345. } else {
  346. name = dataset + UNDERSCORE + algorithm + UNDERSCORE + CPU
  347. }
  348. req := &octopus.GetMyAlgorithmListReq{
  349. Platform: o.platform,
  350. PageIndex: o.pageIndex,
  351. PageSize: o.pageSize,
  352. }
  353. resp, err := o.octopusRpc.GetMyAlgorithmList(ctx, req)
  354. if err != nil {
  355. return "", err
  356. }
  357. if !resp.Success {
  358. return "", errors.New("failed to get algorithmList")
  359. }
  360. var algorithmId string
  361. var algorithms []*octopus.Algorithms
  362. for _, a := range resp.Payload.Algorithms {
  363. if strings.ToLower(a.FrameworkName) != taskType {
  364. continue
  365. }
  366. if a.AlgorithmDescript == name {
  367. algorithms = append(algorithms, a)
  368. }
  369. }
  370. if len(algorithms) == 0 {
  371. return "", errors.New("algorithmId not found")
  372. }
  373. if len(algorithms) == 1 {
  374. algorithmId = algorithms[0].AlgorithmId
  375. }
  376. aLatest := &octopus.Algorithms{}
  377. for i, _ := range algorithms {
  378. if time.Unix(algorithms[i].CreatedAt, 0).After(time.Unix(aLatest.CreatedAt, 0)) {
  379. aLatest = algorithms[i]
  380. }
  381. }
  382. if aLatest.AlgorithmId == "" {
  383. return "", errors.New("algorithmId not found")
  384. }
  385. algorithmId = aLatest.AlgorithmId
  386. dcReq := &octopus.DownloadCompressReq{
  387. Platform: o.platform,
  388. Version: VERSION,
  389. AlgorithmId: algorithmId,
  390. }
  391. dcResp, err := o.octopusRpc.DownloadCompress(ctx, dcReq)
  392. if err != nil {
  393. return "", err
  394. }
  395. if !dcResp.Success {
  396. return "", errors.New(dcResp.Error.Message)
  397. }
  398. daReq := &octopus.DownloadAlgorithmReq{
  399. Platform: o.platform,
  400. Version: VERSION,
  401. AlgorithmId: algorithmId,
  402. CompressAt: dcResp.Payload.CompressAt,
  403. Domain: DOMAIN,
  404. }
  405. daResp, err := o.octopusRpc.DownloadAlgorithm(ctx, daReq)
  406. if err != nil {
  407. return "", err
  408. }
  409. if !daResp.Success {
  410. return "", errors.New(dcResp.Error.Message)
  411. }
  412. urlReq := &octopus.AlgorithmUrlReq{
  413. Platform: o.platform,
  414. Url: daResp.Payload.DownloadUrl,
  415. }
  416. urlResp, err := o.octopusRpc.DownloadAlgorithmUrl(ctx, urlReq)
  417. if err != nil {
  418. return "", err
  419. }
  420. return urlResp.Algorithm, nil
  421. }
  422. func (o *OctopusLink) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
  423. //var name string
  424. //if resourceType == CARD {
  425. // name = dataset + UNDERSCORE + algorithm + UNDERSCORE + card
  426. //} else {
  427. // name = dataset + UNDERSCORE + algorithm + UNDERSCORE + CPU
  428. //}
  429. //uploadReq := &octopus.UploadAlgorithmReq{}
  430. return nil
  431. }
  432. func (o *OctopusLink) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
  433. instance, err := strconv.ParseInt(instanceNum, 10, 32)
  434. if err != nil {
  435. return "", err
  436. }
  437. req := &octopus.GetTrainJobLogReq{
  438. Platform: o.platform,
  439. TaskId: taskId,
  440. TaskNum: "task0",
  441. Num: int32(instance),
  442. }
  443. resp, err := o.octopusRpc.GetTrainJobLog(ctx, req)
  444. if err != nil {
  445. return "", err
  446. }
  447. if strings.Contains(resp.Content, "404 Not Found") {
  448. resp.Content = "waiting for logs..."
  449. }
  450. return resp.Content, nil
  451. }
  452. func (o *OctopusLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
  453. resp, err := o.QueryTask(ctx, taskId)
  454. if err != nil {
  455. return nil, err
  456. }
  457. jobresp, ok := (resp).(*octopus.GetTrainJobResp)
  458. if !jobresp.Success || !ok {
  459. if jobresp.Error != nil {
  460. return nil, errors.New(jobresp.Error.Message)
  461. } else {
  462. return nil, errors.New("get training task failed, empty error returned")
  463. }
  464. }
  465. var task collector.Task
  466. task.Id = jobresp.Payload.TrainJob.Id
  467. if jobresp.Payload.TrainJob.StartedAt != 0 {
  468. task.Start = time.Unix(jobresp.Payload.TrainJob.StartedAt, 0).Format(constants.Layout)
  469. }
  470. if jobresp.Payload.TrainJob.CompletedAt != 0 {
  471. task.End = time.Unix(jobresp.Payload.TrainJob.CompletedAt, 0).Format(constants.Layout)
  472. }
  473. switch jobresp.Payload.TrainJob.Status {
  474. case "succeeded":
  475. task.Status = constants.Completed
  476. case "failed":
  477. task.Status = constants.Failed
  478. case "running":
  479. task.Status = constants.Running
  480. case "stopped":
  481. task.Status = constants.Stopped
  482. case "pending":
  483. task.Status = constants.Pending
  484. default:
  485. task.Status = "undefined"
  486. }
  487. return &task, nil
  488. }
  489. func (o *OctopusLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
  490. err := o.GenerateSubmitParams(ctx, option)
  491. if err != nil {
  492. return nil, err
  493. }
  494. task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
  495. if err != nil {
  496. return nil, err
  497. }
  498. return task, nil
  499. }
  500. func (o *OctopusLink) GenerateSubmitParams(ctx context.Context, option *option.AiOption) error {
  501. err := o.generateResourceId(ctx, option)
  502. if err != nil {
  503. return err
  504. }
  505. err = o.generateDatasetsId(ctx, option)
  506. if err != nil {
  507. return err
  508. }
  509. err = o.generateImageId(ctx, option)
  510. if err != nil {
  511. return err
  512. }
  513. err = o.generateAlgorithmId(ctx, option)
  514. if err != nil {
  515. return err
  516. }
  517. err = o.generateCmd(option)
  518. if err != nil {
  519. return err
  520. }
  521. err = o.generateEnv(option)
  522. if err != nil {
  523. return err
  524. }
  525. err = o.generateParams(option)
  526. if err != nil {
  527. return err
  528. }
  529. return nil
  530. }
  531. func (o *OctopusLink) generateResourceId(ctx context.Context, option *option.AiOption) error {
  532. if option.ResourceType == "" {
  533. return errors.New("ResourceType not set")
  534. }
  535. req := &octopus.GetResourceSpecsReq{
  536. Platform: o.platform,
  537. ResourcePool: RESOURCE_POOL,
  538. }
  539. specResp, err := o.octopusRpc.GetResourceSpecs(ctx, req)
  540. if err != nil {
  541. return err
  542. }
  543. if !specResp.Success {
  544. return errors.New(specResp.Error.Message)
  545. }
  546. if option.ResourceType == CPU {
  547. for _, spec := range specResp.TrainResourceSpecs {
  548. if spec.Price == 0 {
  549. option.ResourceId = spec.Id
  550. return nil
  551. }
  552. }
  553. }
  554. if option.ResourceType == CARD {
  555. if option.ComputeCard == "" {
  556. option.ComputeCard = GCU
  557. }
  558. err = setResourceIdByCard(option, specResp, option.ComputeCard)
  559. if err != nil {
  560. return err
  561. }
  562. return nil
  563. }
  564. return errors.New("failed to get ResourceId")
  565. }
  566. func (o *OctopusLink) generateDatasetsId(ctx context.Context, option *option.AiOption) error {
  567. if option.DatasetsName == "" {
  568. return errors.New("DatasetsName not set")
  569. }
  570. req := &octopus.GetMyDatasetListReq{
  571. Platform: o.platform,
  572. PageIndex: o.pageIndex,
  573. PageSize: o.pageSize,
  574. }
  575. resp, err := o.octopusRpc.GetMyDatasetList(ctx, req)
  576. if err != nil {
  577. return err
  578. }
  579. if !resp.Success {
  580. return errors.New("failed to get DatasetsId")
  581. }
  582. for _, dataset := range resp.Payload.Datasets {
  583. if dataset.Name == option.DatasetsName {
  584. option.DatasetsId = dataset.Id
  585. return nil
  586. }
  587. }
  588. return errors.New("failed to get DatasetsId")
  589. }
  590. func (o *OctopusLink) generateImageId(ctx context.Context, option *option.AiOption) error {
  591. if option.TaskType == "" {
  592. return errors.New("TaskType not set")
  593. }
  594. req := &octopus.GetUserImageListReq{
  595. Platform: o.platform,
  596. PageIndex: o.pageIndex,
  597. PageSize: o.pageSize,
  598. }
  599. resp, err := o.octopusRpc.GetUserImageList(ctx, req)
  600. if err != nil {
  601. return err
  602. }
  603. if !resp.Success {
  604. return errors.New("failed to get imageId")
  605. }
  606. if option.ResourceType == CPU {
  607. for _, img := range resp.Payload.Images {
  608. if img.Image.ImageName == "test-image" {
  609. option.ImageId = img.Image.Id
  610. return nil
  611. }
  612. }
  613. }
  614. preImgReq := &octopus.GetPresetImageListReq{
  615. Platform: o.platform,
  616. PageIndex: o.pageIndex,
  617. PageSize: o.pageSize,
  618. }
  619. preImgResp, err := o.octopusRpc.GetPresetImageList(ctx, preImgReq)
  620. if err != nil {
  621. return err
  622. }
  623. if !preImgResp.Success {
  624. return errors.New("failed to get PresetImages")
  625. }
  626. if option.ResourceType == CARD {
  627. for _, image := range preImgResp.Payload.Images {
  628. if strings.Contains(image.ImageName, cardAliasMap[strings.ToUpper(option.ComputeCard)]) {
  629. switch strings.ToUpper(option.ComputeCard) {
  630. case GCU:
  631. if strings.HasPrefix(image.ImageVersion, "t20_") {
  632. option.ImageId = image.Id
  633. return nil
  634. }
  635. case BIV100:
  636. if strings.HasPrefix(image.ImageVersion, "bi_") {
  637. option.ImageId = image.Id
  638. return nil
  639. }
  640. case MLU:
  641. option.ImageId = image.Id
  642. return nil
  643. }
  644. }
  645. }
  646. }
  647. return errors.New("failed to get ImageId")
  648. }
  649. func (o *OctopusLink) generateAlgorithmId(ctx context.Context, option *option.AiOption) error {
  650. req := &octopus.GetMyAlgorithmListReq{
  651. Platform: o.platform,
  652. PageIndex: o.pageIndex,
  653. PageSize: o.pageSize,
  654. }
  655. resp, err := o.octopusRpc.GetMyAlgorithmList(ctx, req)
  656. if err != nil {
  657. return err
  658. }
  659. if !resp.Success {
  660. return errors.New("failed to get algorithmId")
  661. }
  662. for _, algorithm := range resp.Payload.Algorithms {
  663. if algorithm.FrameworkName == strings.Title(option.TaskType) {
  664. ns := strings.Split(algorithm.AlgorithmName, UNDERSCORE)
  665. if ns[0] != option.DatasetsName {
  666. continue
  667. }
  668. if ns[1] != option.AlgorithmName {
  669. continue
  670. }
  671. switch option.ResourceType {
  672. case CPU:
  673. if ns[2] != CPU {
  674. continue
  675. }
  676. case CARD:
  677. if ns[2] != strings.ToLower(option.ComputeCard) {
  678. continue
  679. }
  680. }
  681. option.AlgorithmId = algorithm.AlgorithmId
  682. return nil
  683. }
  684. }
  685. if option.AlgorithmId == "" {
  686. return errors.New("Algorithm does not exist")
  687. }
  688. return errors.New("failed to get AlgorithmId")
  689. }
  690. func (o *OctopusLink) generateCmd(option *option.AiOption) error {
  691. if option.Cmd == "" {
  692. switch option.ComputeCard {
  693. case GCU:
  694. option.Cmd = "cd /code; python3 train.py"
  695. case MLU:
  696. option.Cmd = ". /torch/venv3/pytorch/bin/activate; cd /code; python train.py"
  697. default:
  698. option.Cmd = TRAIN_CMD
  699. }
  700. }
  701. return nil
  702. }
  703. func (o *OctopusLink) generateEnv(option *option.AiOption) error {
  704. return nil
  705. }
  706. func (o *OctopusLink) generateParams(option *option.AiOption) error {
  707. if len(option.Params) == 0 {
  708. epoch := "epoch" + COMMA + "1"
  709. option.Params = append(option.Params, epoch)
  710. }
  711. return nil
  712. }
  713. func setResourceIdByCard(option *option.AiOption, specs *octopus.GetResourceSpecsResp, computeCard string) error {
  714. if option.Tops == 0 {
  715. for _, spec := range specs.TrainResourceSpecs {
  716. if spec.Price == 1 {
  717. ns := strings.Split(spec.Name, COMMA)
  718. cardSpecs := strings.Split(ns[0], STAR)
  719. if cardSpecs[1] == cardCnMap[strings.ToUpper(computeCard)] {
  720. option.ResourceId = spec.Id
  721. option.ComputeCard = computeCard
  722. return nil
  723. }
  724. } else {
  725. continue
  726. }
  727. }
  728. } else {
  729. cardNum := math.Ceil(option.Tops / float64(BASE_TOPS))
  730. for _, spec := range specs.TrainResourceSpecs {
  731. if option.Tops < BASE_TOPS {
  732. if spec.Price == 1 {
  733. ns := strings.Split(spec.Name, COMMA)
  734. cardSpecs := strings.Split(ns[0], STAR)
  735. if cardSpecs[1] == cardCnMap[strings.ToUpper(computeCard)] {
  736. option.ResourceId = spec.Id
  737. option.ComputeCard = computeCard
  738. return nil
  739. }
  740. } else {
  741. continue
  742. }
  743. } else {
  744. ns := strings.Split(spec.Name, COMMA)
  745. if len(ns) != 4 {
  746. continue
  747. }
  748. cardSpecs := strings.Split(ns[0], STAR)
  749. if cardSpecs[1] != cardCnMap[strings.ToUpper(computeCard)] {
  750. continue
  751. }
  752. s, err := strconv.ParseFloat(cardSpecs[0], 64)
  753. if err != nil {
  754. return err
  755. }
  756. switch computeCard {
  757. case GCU:
  758. option.ComputeCard = computeCard
  759. if cardNum == s { // 1, 4, 8
  760. option.ResourceId = spec.Id
  761. return nil
  762. }
  763. if 1 < cardNum && cardNum <= 4 && s == 4 {
  764. option.ResourceId = spec.Id
  765. return nil
  766. }
  767. if 4 < cardNum && s == 8 {
  768. option.ResourceId = spec.Id
  769. return nil
  770. }
  771. case MLU: // 1, 2, 4
  772. option.ComputeCard = computeCard
  773. if cardNum/2 == s {
  774. option.ResourceId = spec.Id
  775. return nil
  776. }
  777. if 1 < cardNum/2 && cardNum/2 <= 2 && s == 2 {
  778. option.ResourceId = spec.Id
  779. return nil
  780. }
  781. if 2 < cardNum/2 && s == 4 {
  782. option.ResourceId = spec.Id
  783. return nil
  784. }
  785. }
  786. }
  787. }
  788. }
  789. return errors.New("set ResourceId error")
  790. }
  791. func (o *OctopusLink) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*inference.InferUrl, error) {
  792. req := &octopus.GetNotebookListReq{
  793. Platform: o.platform,
  794. PageIndex: o.pageIndex,
  795. PageSize: o.pageSize,
  796. }
  797. list, err := o.octopusRpc.GetNotebookList(ctx, req)
  798. if err != nil {
  799. return nil, err
  800. }
  801. var imageUrls []*inference.InferUrl
  802. for _, notebook := range list.Payload.GetNotebooks() {
  803. if strings.Contains(notebook.AlgorithmName, option.ModelName) && notebook.Status == "running" {
  804. url := strings.Replace(notebook.Tasks[0].Url, FORWARD_SLASH, "", -1)
  805. names := strings.Split(notebook.AlgorithmName, UNDERSCORE)
  806. imageUrl := &inference.InferUrl{
  807. Url: DOMAIN + url,
  808. Card: names[2],
  809. }
  810. imageUrls = append(imageUrls, imageUrl)
  811. } else {
  812. continue
  813. }
  814. }
  815. if len(imageUrls) == 0 {
  816. return nil, errors.New("no infer url available")
  817. }
  818. return imageUrls, nil
  819. }
  820. func (o *OctopusLink) GetInferDeployInstanceList(ctx context.Context, option *option.InferOption) ([]*inference.DeployInstance, error) {
  821. return nil, nil
  822. }

PCM is positioned as Software stack over Cloud, aiming to build the standards and ecology of heterogeneous cloud collaboration for JCC in a non intrusive and autonomous peer-to-peer manner.