You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

octopus.go 14 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package storeLink
  13. import (
  14. "context"
  15. "errors"
  16. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
  17. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
  18. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
  19. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  20. "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus"
  21. "math"
  22. "strconv"
  23. "strings"
  24. )
  25. type OctopusLink struct {
  26. ctx context.Context
  27. svcCtx *svc.ServiceContext
  28. pageIndex int32
  29. pageSize int32
  30. platform string
  31. participantId int64
  32. }
  33. const (
  34. IMG_NAME_PREFIX = "oct_"
  35. IMG_VERSION_PREFIX = "version_"
  36. TASK_NAME_PREFIX = "trainJob"
  37. RESOURCE_POOL = "common-pool"
  38. HANWUJI = "hanwuji"
  39. SUIYUAN = "suiyuan"
  40. SAILINGSI = "sailingsi"
  41. MLU = "MLU"
  42. CAMBRICONMLU290 = 256
  43. GCU = "GCU"
  44. ENFLAME = "enflame"
  45. EnflameT20 = 128
  46. BASE_TOPS = 128
  47. CAMBRICON = "cambricon"
  48. TRAIN_CMD = "cd /code; python train.py"
  49. VERSION = "V1"
  50. )
  51. var (
  52. cardAliasMap = map[string]string{
  53. MLU: CAMBRICON,
  54. GCU: ENFLAME,
  55. }
  56. )
  57. func NewOctopusLink(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *OctopusLink {
  58. return &OctopusLink{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id, pageIndex: 1, pageSize: 100}
  59. }
  60. func (o *OctopusLink) UploadImage(path string) (interface{}, error) {
  61. // octopus创建镜像
  62. createReq := &octopus.CreateImageReq{
  63. Platform: o.platform,
  64. CreateImage: &octopus.CreateImage{
  65. SourceType: 1,
  66. ImageName: IMG_NAME_PREFIX + utils.RandomString(7),
  67. ImageVersion: IMG_VERSION_PREFIX + utils.RandomString(7),
  68. },
  69. }
  70. createResp, err := o.svcCtx.OctopusRpc.CreateImage(o.ctx, createReq)
  71. if err != nil {
  72. return nil, err
  73. }
  74. // octopus上传镜像
  75. uploadReq := &octopus.UploadImageReq{
  76. Platform: o.platform,
  77. ImageId: createResp.Payload.ImageId,
  78. Params: &octopus.UploadImageParam{
  79. Domain: "",
  80. FileName: "",
  81. },
  82. }
  83. uploadResp, err := o.svcCtx.OctopusRpc.UploadImage(o.ctx, uploadReq)
  84. if err != nil {
  85. return nil, err
  86. }
  87. // Todo 实际上传
  88. return uploadResp, nil
  89. }
  90. func (o *OctopusLink) DeleteImage(imageId string) (interface{}, error) {
  91. // octopus删除镜像
  92. req := &octopus.DeleteImageReq{
  93. Platform: o.platform,
  94. ImageId: imageId,
  95. }
  96. resp, err := o.svcCtx.OctopusRpc.DeleteImage(o.ctx, req)
  97. if err != nil {
  98. return nil, err
  99. }
  100. return resp, nil
  101. }
  102. func (o *OctopusLink) QueryImageList() (interface{}, error) {
  103. // octopus获取镜像列表
  104. req := &octopus.GetUserImageListReq{
  105. Platform: o.platform,
  106. PageIndex: o.pageIndex,
  107. PageSize: o.pageSize,
  108. }
  109. resp, err := o.svcCtx.OctopusRpc.GetUserImageList(o.ctx, req)
  110. if err != nil {
  111. return nil, err
  112. }
  113. return resp, nil
  114. }
  115. func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
  116. // octopus提交任务
  117. // python参数
  118. var prms []*octopus.Parameters
  119. for _, param := range params {
  120. var p octopus.Parameters
  121. s := strings.Split(param, COMMA)
  122. p.Key = s[0]
  123. p.Value = s[1]
  124. prms = append(prms, &p)
  125. }
  126. //环境变量
  127. envMap := make(map[string]string)
  128. for _, env := range envs {
  129. s := strings.Split(env, COMMA)
  130. envMap[s[0]] = s[1]
  131. }
  132. req := &octopus.CreateTrainJobReq{
  133. Platform: o.platform,
  134. Params: &octopus.CreateTrainJobParam{
  135. ImageId: imageId,
  136. Name: TASK_NAME_PREFIX + UNDERSCORE + utils.RandomString(10),
  137. ResourcePool: RESOURCE_POOL,
  138. Config: []*octopus.Config{
  139. {
  140. Command: cmd,
  141. ResourceSpecId: resourceId,
  142. MinFailedTaskCount: 1,
  143. MinSucceededTaskCount: 1,
  144. TaskNumber: 1,
  145. Parameters: prms,
  146. Envs: envMap,
  147. },
  148. },
  149. DataSetId: datasetsId,
  150. DataSetVersion: VERSION,
  151. AlgorithmId: algorithmId,
  152. AlgorithmVersion: VERSION,
  153. },
  154. }
  155. resp, err := o.svcCtx.OctopusRpc.CreateTrainJob(o.ctx, req)
  156. if err != nil {
  157. return nil, err
  158. }
  159. return resp, nil
  160. }
  161. func (o *OctopusLink) QueryTask(taskId string) (interface{}, error) {
  162. // octopus获取任务
  163. req := &octopus.GetTrainJobReq{
  164. Platform: o.platform,
  165. Id: taskId,
  166. }
  167. resp, err := o.svcCtx.OctopusRpc.GetTrainJob(o.ctx, req)
  168. if err != nil {
  169. return nil, err
  170. }
  171. return resp, nil
  172. }
  173. func (o *OctopusLink) DeleteTask(taskId string) (interface{}, error) {
  174. // octopus删除任务
  175. req := &octopus.DeleteTrainJobReq{
  176. Platform: o.platform,
  177. JobIds: []string{taskId},
  178. }
  179. resp, err := o.svcCtx.OctopusRpc.DeleteTrainJob(o.ctx, req)
  180. if err != nil {
  181. return nil, err
  182. }
  183. return resp, nil
  184. }
  185. func (o *OctopusLink) QuerySpecs() (interface{}, error) {
  186. // octopus查询资源规格
  187. req := &octopus.GetResourceSpecsReq{
  188. Platform: o.platform,
  189. ResourcePool: RESOURCE_POOL,
  190. }
  191. resp, err := o.svcCtx.OctopusRpc.GetResourceSpecs(o.ctx, req)
  192. if err != nil {
  193. return nil, err
  194. }
  195. return resp, nil
  196. }
  197. func (o *OctopusLink) GetResourceStats() (*collector.ResourceStats, error) {
  198. req := &octopus.GetResourceSpecsReq{
  199. Platform: o.platform,
  200. ResourcePool: RESOURCE_POOL,
  201. }
  202. specResp, err := o.svcCtx.OctopusRpc.GetResourceSpecs(o.ctx, req)
  203. if err != nil {
  204. return nil, err
  205. }
  206. if !specResp.Success {
  207. return nil, errors.New(specResp.Error.Message)
  208. }
  209. balanceReq := &octopus.GetUserBalanceReq{
  210. Platform: o.platform,
  211. }
  212. balanceResp, err := o.svcCtx.OctopusRpc.GetUserBalance(o.ctx, balanceReq)
  213. if err != nil {
  214. return nil, err
  215. }
  216. if !balanceResp.Success {
  217. return nil, errors.New(balanceResp.Error.Message)
  218. }
  219. //resourceStat := collector.ResourceStats{}
  220. //
  221. //for _, spec := range specResp.TrainResourceSpecs {
  222. //
  223. //}
  224. return nil, nil
  225. }
  226. func (o *OctopusLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
  227. req := &octopus.GetMyDatasetListReq{
  228. Platform: o.platform,
  229. PageIndex: o.pageIndex,
  230. PageSize: o.pageSize,
  231. }
  232. resp, err := o.svcCtx.OctopusRpc.GetMyDatasetList(o.ctx, req)
  233. if err != nil {
  234. return nil, err
  235. }
  236. if !resp.Success {
  237. return nil, errors.New(resp.Error.Message)
  238. }
  239. specs := []*collector.DatasetsSpecs{}
  240. for _, dataset := range resp.Payload.Datasets {
  241. spec := &collector.DatasetsSpecs{Name: dataset.Name}
  242. specs = append(specs, spec)
  243. }
  244. return specs, nil
  245. }
  246. func (o *OctopusLink) Execute(option *option.AiOption) (interface{}, error) {
  247. err := o.GenerateSubmitParams(option)
  248. if err != nil {
  249. return nil, err
  250. }
  251. task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
  252. if err != nil {
  253. return nil, err
  254. }
  255. return task, nil
  256. }
  257. func (o *OctopusLink) GenerateSubmitParams(option *option.AiOption) error {
  258. err := o.generateResourceId(option)
  259. if err != nil {
  260. return err
  261. }
  262. err = o.generateDatasetsId(option)
  263. if err != nil {
  264. return err
  265. }
  266. err = o.generateImageId(option)
  267. if err != nil {
  268. return err
  269. }
  270. err = o.generateAlgorithmId(option)
  271. if err != nil {
  272. return err
  273. }
  274. err = o.generateCmd(option)
  275. if err != nil {
  276. return err
  277. }
  278. err = o.generateEnv(option)
  279. if err != nil {
  280. return err
  281. }
  282. err = o.generateParams(option)
  283. if err != nil {
  284. return err
  285. }
  286. return nil
  287. }
  288. func (o *OctopusLink) generateResourceId(option *option.AiOption) error {
  289. if option.ResourceType == "" {
  290. return errors.New("ResourceType not set")
  291. }
  292. req := &octopus.GetResourceSpecsReq{
  293. Platform: o.platform,
  294. ResourcePool: RESOURCE_POOL,
  295. }
  296. specResp, err := o.svcCtx.OctopusRpc.GetResourceSpecs(o.ctx, req)
  297. if err != nil {
  298. return err
  299. }
  300. if !specResp.Success {
  301. return errors.New(specResp.Error.Message)
  302. }
  303. if option.ResourceType == CPU {
  304. for _, spec := range specResp.TrainResourceSpecs {
  305. if spec.Price == 0 {
  306. option.ResourceId = spec.Id
  307. return nil
  308. }
  309. }
  310. }
  311. if option.ResourceType == CARD {
  312. err = setResourceIdByCard(option, specResp, GCU)
  313. if err != nil {
  314. return err
  315. }
  316. }
  317. return errors.New("failed to get ResourceId")
  318. }
  319. func (o *OctopusLink) generateDatasetsId(option *option.AiOption) error {
  320. if option.DatasetsName == "" {
  321. return errors.New("DatasetsName not set")
  322. }
  323. req := &octopus.GetMyDatasetListReq{
  324. Platform: o.platform,
  325. PageIndex: o.pageIndex,
  326. PageSize: o.pageSize,
  327. }
  328. resp, err := o.svcCtx.OctopusRpc.GetMyDatasetList(o.ctx, req)
  329. if err != nil {
  330. return err
  331. }
  332. if !resp.Success {
  333. return errors.New("failed to get DatasetsId")
  334. }
  335. for _, dataset := range resp.Payload.Datasets {
  336. if dataset.Name == option.DatasetsName {
  337. option.DatasetsId = dataset.Id
  338. return nil
  339. }
  340. }
  341. return errors.New("failed to get DatasetsId")
  342. }
  343. func (o *OctopusLink) generateImageId(option *option.AiOption) error {
  344. if option.TaskType == "" {
  345. return errors.New("TaskType not set")
  346. }
  347. req := &octopus.GetUserImageListReq{
  348. Platform: o.platform,
  349. PageIndex: o.pageIndex,
  350. PageSize: o.pageSize,
  351. }
  352. resp, err := o.svcCtx.OctopusRpc.GetUserImageList(o.ctx, req)
  353. if err != nil {
  354. return err
  355. }
  356. if !resp.Success {
  357. return errors.New("failed to get imageId")
  358. }
  359. if option.ResourceType == CPU {
  360. for _, img := range resp.Payload.Images {
  361. if img.Image.ImageName == "test-image" {
  362. option.ImageId = img.Image.Id
  363. return nil
  364. }
  365. }
  366. }
  367. preImgReq := &octopus.GetPresetImageListReq{
  368. Platform: o.platform,
  369. PageIndex: o.pageIndex,
  370. PageSize: o.pageSize,
  371. }
  372. preImgResp, err := o.svcCtx.OctopusRpc.GetPresetImageList(o.ctx, preImgReq)
  373. if err != nil {
  374. return err
  375. }
  376. if !preImgResp.Success {
  377. return errors.New("failed to get PresetImages")
  378. }
  379. if option.ResourceType == CARD {
  380. for _, image := range preImgResp.Payload.Images {
  381. if strings.Contains(image.ImageName, cardAliasMap[option.ComputeCard]) {
  382. option.ImageId = image.Id
  383. return nil
  384. }
  385. }
  386. }
  387. return errors.New("failed to get ImageId")
  388. }
  389. func (o *OctopusLink) generateAlgorithmId(option *option.AiOption) error {
  390. // temporarily set algorithm to cnn
  391. option.AlgorithmName = "cnn"
  392. req := &octopus.GetMyAlgorithmListReq{
  393. Platform: o.platform,
  394. PageIndex: o.pageIndex,
  395. PageSize: o.pageSize,
  396. }
  397. resp, err := o.svcCtx.OctopusRpc.GetMyAlgorithmList(o.ctx, req)
  398. if err != nil {
  399. return err
  400. }
  401. if !resp.Success {
  402. return errors.New("failed to get algorithmId")
  403. }
  404. for _, algorithm := range resp.Payload.Algorithms {
  405. if algorithm.FrameworkName == strings.Title(option.TaskType) {
  406. ns := strings.Split(algorithm.AlgorithmName, UNDERSCORE)
  407. if ns[0] != option.DatasetsName {
  408. continue
  409. }
  410. if ns[1] != option.AlgorithmName {
  411. continue
  412. }
  413. if ns[2] != option.ResourceType {
  414. continue
  415. }
  416. option.AlgorithmId = algorithm.AlgorithmId
  417. return nil
  418. }
  419. }
  420. return errors.New("failed to get AlgorithmId")
  421. }
  422. func (o *OctopusLink) generateCmd(option *option.AiOption) error {
  423. if option.Cmd == "" {
  424. switch option.ComputeCard {
  425. case GCU:
  426. option.Cmd = "cd /code; python3 train.py"
  427. default:
  428. option.Cmd = TRAIN_CMD
  429. }
  430. }
  431. return nil
  432. }
  433. func (o *OctopusLink) generateEnv(option *option.AiOption) error {
  434. return nil
  435. }
  436. func (o *OctopusLink) generateParams(option *option.AiOption) error {
  437. return nil
  438. }
  439. func setResourceIdByCard(option *option.AiOption, specs *octopus.GetResourceSpecsResp, computeCard string) error {
  440. if option.Tops == 0 {
  441. for _, spec := range specs.TrainResourceSpecs {
  442. if spec.Price == 1 {
  443. ns := strings.Split(spec.Name, COMMA)
  444. cardSpecs := strings.Split(ns[0], STAR)
  445. if cardSpecs[1] == computeCard {
  446. option.ResourceId = spec.Id
  447. option.ComputeCard = computeCard
  448. return nil
  449. }
  450. } else {
  451. continue
  452. }
  453. }
  454. } else {
  455. cardNum := math.Ceil(option.Tops / float64(BASE_TOPS))
  456. for _, spec := range specs.TrainResourceSpecs {
  457. if option.Tops < BASE_TOPS {
  458. if spec.Price == 1 {
  459. ns := strings.Split(spec.Name, COMMA)
  460. cardSpecs := strings.Split(ns[0], STAR)
  461. if cardSpecs[1] == computeCard {
  462. option.ResourceId = spec.Id
  463. option.ComputeCard = computeCard
  464. return nil
  465. }
  466. } else {
  467. continue
  468. }
  469. } else {
  470. ns := strings.Split(spec.Name, COMMA)
  471. if len(ns) != 4 {
  472. continue
  473. }
  474. cardSpecs := strings.Split(ns[0], STAR)
  475. if cardSpecs[1] != computeCard {
  476. continue
  477. }
  478. s, err := strconv.ParseFloat(cardSpecs[0], 64)
  479. if err != nil {
  480. return err
  481. }
  482. switch computeCard {
  483. case GCU:
  484. if cardNum == s { // 1, 4, 8
  485. option.ResourceId = spec.Id
  486. option.ComputeCard = computeCard
  487. return nil
  488. }
  489. if 1 < cardNum && cardNum <= 4 && s == 4 {
  490. option.ResourceId = spec.Id
  491. option.ComputeCard = computeCard
  492. return nil
  493. }
  494. if 4 < cardNum && s == 8 {
  495. option.ResourceId = spec.Id
  496. option.ComputeCard = computeCard
  497. return nil
  498. }
  499. case MLU: // 1, 2, 4
  500. if cardNum/2 == s {
  501. option.ResourceId = spec.Id
  502. option.ComputeCard = computeCard
  503. return nil
  504. }
  505. if 1 < cardNum/2 && cardNum/2 <= 2 && s == 2 {
  506. option.ResourceId = spec.Id
  507. option.ComputeCard = computeCard
  508. return nil
  509. }
  510. if 2 < cardNum/2 && s == 4 {
  511. option.ResourceId = spec.Id
  512. option.ComputeCard = computeCard
  513. return nil
  514. }
  515. }
  516. }
  517. }
  518. }
  519. return errors.New("set ResourceId error")
  520. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecology of heterogeneous cloud collaboration for JCC in a non-intrusive and autonomous peer-to-peer manner.