You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

octopus.go 32 kB

11 months ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package storeLink
  13. import (
  14. "bufio"
  15. "context"
  16. "errors"
  17. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  18. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
  19. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
  20. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  21. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  22. "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
  23. "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
  24. "io"
  25. "math"
  26. "mime/multipart"
  27. "strconv"
  28. "strings"
  29. "time"
  30. )
  31. type OctopusLink struct {
  32. octopusRpc octopusclient.Octopus
  33. pageIndex int32
  34. pageSize int32
  35. platform string
  36. participantId int64
  37. }
  // Fixed names, prefixes and defaults used when building Octopus requests.
  const (
  	IMG_NAME_PREFIX    = "oct_"
  	IMG_VERSION_PREFIX = "version_"
  	TASK_NAME_PREFIX   = "trainJob"
  	RESOURCE_POOL      = "common-pool"
  	TRAIN_CMD          = "cd /code; python train.py"
  	VERSION            = "V1"
  	// NOTE(review): hard-coded endpoint; consider moving it to configuration.
  	DOMAIN = "http://192.168.242.41:8001/"
  )

  // Vendor identifiers.
  const (
  	HANWUJI   = "hanwuji"
  	SUIYUAN   = "suiyuan"
  	SAILINGSI = "sailingsi"
  	ENFLAME   = "enflame"
  	CAMBRICON = "cambricon"
  	ILUVATAR  = "iluvatar"
  )

  // Compute-card keys; these are the keys of cardAliasMap, cardCnMap and
  // cardTopsMap.
  const (
  	MLU    = "MLU"
  	GCU    = "GCU"
  	BIV100 = "BI-V100"
  )

  // TOPS figures: the per-card values stored in cardTopsMap, plus the base
  // value used to derive a card count from a requested TOPS amount.
  const (
  	CAMBRICONMLU290 = 256
  	EnflameT20      = 128
  	BASE_TOPS       = 128
  )

  // Chinese card names as they appear in Octopus resource-spec names.
  const (
  	CAMBRICON_CN = "寒武纪290"
  	ENFLAME_CN   = "燧原T20"
  	ILUVATAR_CN  = "天数BI-V100"
  )
  62. var (
  63. cardAliasMap = map[string]string{
  64. MLU: CAMBRICON,
  65. GCU: ENFLAME,
  66. BIV100: ILUVATAR,
  67. }
  68. cardCnMap = map[string]string{
  69. MLU: CAMBRICON_CN,
  70. GCU: ENFLAME_CN,
  71. BIV100: ILUVATAR_CN,
  72. }
  73. cardTopsMap = map[string]float64{
  74. MLU: CAMBRICONMLU290,
  75. GCU: EnflameT20,
  76. }
  77. CardModelNameCmdMap = map[string]map[string]string{
  78. BIV100: {"blip-image-captioning-base": "pip install -U transformers; pip install fastapi uvicorn[standard]; pip install python-multipart; cd /code; python infer_biv100.py",
  79. "imagenet_resnet50": "pip install -U transformers; pip install fastapi uvicorn[standard]; pip install python-multipart; cd /code/infer; python infer_biv100.py",
  80. "ChatGLM-6B": "su root; pip install transformers==4.33.2; pip install fastapi uvicorn[standard]; cd /code; python infer_biv100.py"},
  81. MLU: {"blip-image-captioning-base": "",
  82. "imagenet_resnet50": "su root; . /torch/venv3/pytorch/bin/activate; pip install fastapi uvicorn[standard]; pip install python-multipart; cd /code/infer; python infer_mlu.py"},
  83. }
  84. )
  85. func NewOctopusLink(octopusRpc octopusclient.Octopus, name string, id int64) *OctopusLink {
  86. return &OctopusLink{octopusRpc: octopusRpc, platform: name, participantId: id, pageIndex: 1, pageSize: 100}
  87. }
  88. func (o *OctopusLink) UploadImage(ctx context.Context, path string) (interface{}, error) {
  89. // octopus创建镜像
  90. createReq := &octopus.CreateImageReq{
  91. Platform: o.platform,
  92. CreateImage: &octopus.CreateImage{
  93. SourceType: 1,
  94. ImageName: IMG_NAME_PREFIX + utils.RandomString(7),
  95. ImageVersion: IMG_VERSION_PREFIX + utils.RandomString(7),
  96. },
  97. }
  98. createResp, err := o.octopusRpc.CreateImage(ctx, createReq)
  99. if err != nil {
  100. return nil, err
  101. }
  102. // octopus上传镜像
  103. uploadReq := &octopus.UploadImageReq{
  104. Platform: o.platform,
  105. ImageId: createResp.Payload.ImageId,
  106. Params: &octopus.UploadImageParam{
  107. Domain: "",
  108. FileName: "",
  109. },
  110. }
  111. uploadResp, err := o.octopusRpc.UploadImage(ctx, uploadReq)
  112. if err != nil {
  113. return nil, err
  114. }
  115. // Todo 实际上传
  116. return uploadResp, nil
  117. }
  118. func (o *OctopusLink) DeleteImage(ctx context.Context, imageId string) (interface{}, error) {
  119. // octopus删除镜像
  120. req := &octopus.DeleteImageReq{
  121. Platform: o.platform,
  122. ImageId: imageId,
  123. }
  124. resp, err := o.octopusRpc.DeleteImage(ctx, req)
  125. if err != nil {
  126. return nil, err
  127. }
  128. return resp, nil
  129. }
  130. func (o *OctopusLink) QueryImageList(ctx context.Context) (interface{}, error) {
  131. // octopus获取镜像列表
  132. req := &octopus.GetUserImageListReq{
  133. Platform: o.platform,
  134. PageIndex: o.pageIndex,
  135. PageSize: o.pageSize,
  136. }
  137. resp, err := o.octopusRpc.GetUserImageList(ctx, req)
  138. if err != nil {
  139. return nil, err
  140. }
  141. return resp, nil
  142. }
  143. func (o *OctopusLink) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
  144. // octopus提交任务
  145. // python参数
  146. var prms []*octopus.Parameters
  147. for _, param := range params {
  148. var p octopus.Parameters
  149. s := strings.Split(param, COMMA)
  150. p.Key = s[0]
  151. p.Value = s[1]
  152. prms = append(prms, &p)
  153. }
  154. //环境变量
  155. envMap := make(map[string]string)
  156. for _, env := range envs {
  157. s := strings.Split(env, COMMA)
  158. envMap[s[0]] = s[1]
  159. }
  160. req := &octopus.CreateTrainJobReq{
  161. Platform: o.platform,
  162. Params: &octopus.CreateTrainJobParam{
  163. ImageId: imageId,
  164. Name: TASK_NAME_PREFIX + UNDERSCORE + utils.RandomString(10),
  165. ResourcePool: RESOURCE_POOL,
  166. Config: []*octopus.Config{
  167. {
  168. Command: cmd,
  169. ResourceSpecId: resourceId,
  170. MinFailedTaskCount: 1,
  171. MinSucceededTaskCount: 1,
  172. TaskNumber: 1,
  173. Parameters: prms,
  174. Envs: envMap,
  175. },
  176. },
  177. DataSetId: datasetsId,
  178. DataSetVersion: VERSION,
  179. AlgorithmId: algorithmId,
  180. AlgorithmVersion: VERSION,
  181. },
  182. }
  183. resp, err := o.octopusRpc.CreateTrainJob(ctx, req)
  184. if err != nil {
  185. return nil, err
  186. }
  187. return resp, nil
  188. }
  189. func (o *OctopusLink) QueryTask(ctx context.Context, taskId string) (interface{}, error) {
  190. // octopus获取任务
  191. req := &octopus.GetTrainJobReq{
  192. Platform: o.platform,
  193. Id: taskId,
  194. }
  195. resp, err := o.octopusRpc.GetTrainJob(ctx, req)
  196. if err != nil {
  197. return nil, err
  198. }
  199. return resp, nil
  200. }
  201. func (o *OctopusLink) DeleteTask(ctx context.Context, taskId string) (interface{}, error) {
  202. // octopus删除任务
  203. req := &octopus.DeleteTrainJobReq{
  204. Platform: o.platform,
  205. JobIds: []string{taskId},
  206. }
  207. resp, err := o.octopusRpc.DeleteTrainJob(ctx, req)
  208. if err != nil {
  209. return nil, err
  210. }
  211. return resp, nil
  212. }
  213. func (o *OctopusLink) QuerySpecs(ctx context.Context) (interface{}, error) {
  214. // octopus查询资源规格
  215. req := &octopus.GetResourceSpecsReq{
  216. Platform: o.platform,
  217. ResourcePool: RESOURCE_POOL,
  218. }
  219. resp, err := o.octopusRpc.GetResourceSpecs(ctx, req)
  220. if err != nil {
  221. return nil, err
  222. }
  223. return resp, nil
  224. }
  225. func (o *OctopusLink) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
  226. req := &octopus.GetResourceSpecsReq{
  227. Platform: o.platform,
  228. ResourcePool: RESOURCE_POOL,
  229. }
  230. specResp, err := o.octopusRpc.GetResourceSpecs(ctx, req)
  231. if err != nil {
  232. return nil, err
  233. }
  234. if !specResp.Success {
  235. return nil, errors.New(specResp.Error.Message)
  236. }
  237. balanceReq := &octopus.GetUserBalanceReq{
  238. Platform: o.platform,
  239. }
  240. balanceResp, err := o.octopusRpc.GetUserBalance(ctx, balanceReq)
  241. if err != nil {
  242. return nil, err
  243. }
  244. if !balanceResp.Success {
  245. return nil, errors.New(balanceResp.Error.Message)
  246. }
  247. var cards []*collector.Card
  248. balance := float64(balanceResp.Payload.BillingUser.Amount)
  249. var cpuHours float64
  250. for _, spec := range specResp.TrainResourceSpecs {
  251. if spec.Price == 0 {
  252. ns := strings.Split(spec.Name, COMMA)
  253. if len(ns) == 2 {
  254. nss := strings.Split(ns[0], COLON)
  255. if nss[0] == CPU {
  256. cpuHours = -1
  257. }
  258. }
  259. }
  260. if spec.Price == 1 {
  261. ns := strings.Split(spec.Name, COMMA)
  262. cardSpecs := strings.Split(ns[0], STAR)
  263. cardTops, isMapContainsKey := cardTopsMap[cardSpecs[1]]
  264. if !isMapContainsKey {
  265. continue
  266. }
  267. card := &collector.Card{
  268. Platform: OCTOPUS,
  269. Type: CARD,
  270. Name: cardSpecs[1],
  271. TOpsAtFp16: cardTops,
  272. CardHours: balance / spec.Price,
  273. }
  274. cards = append(cards, card)
  275. }
  276. }
  277. resourceStats := &collector.ResourceStats{
  278. ClusterId: strconv.FormatInt(o.participantId, 10),
  279. Name: o.platform,
  280. Balance: balance,
  281. CardsAvail: cards,
  282. CpuCoreHours: cpuHours,
  283. }
  284. return resourceStats, nil
  285. }
  286. func (o *OctopusLink) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
  287. req := &octopus.GetMyDatasetListReq{
  288. Platform: o.platform,
  289. PageIndex: o.pageIndex,
  290. PageSize: o.pageSize,
  291. }
  292. resp, err := o.octopusRpc.GetMyDatasetList(ctx, req)
  293. if err != nil {
  294. return nil, err
  295. }
  296. if !resp.Success {
  297. return nil, errors.New(resp.Error.Message)
  298. }
  299. specs := []*collector.DatasetsSpecs{}
  300. for _, dataset := range resp.Payload.Datasets {
  301. spec := &collector.DatasetsSpecs{Name: dataset.Name}
  302. specs = append(specs, spec)
  303. }
  304. return specs, nil
  305. }
  306. func (o *OctopusLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
  307. var algorithms []*collector.Algorithm
  308. req := &octopus.GetMyAlgorithmListReq{
  309. Platform: o.platform,
  310. PageIndex: o.pageIndex,
  311. PageSize: o.pageSize,
  312. }
  313. resp, err := o.octopusRpc.GetMyAlgorithmList(ctx, req)
  314. if err != nil {
  315. return nil, err
  316. }
  317. if !resp.Success {
  318. return nil, errors.New("failed to get algorithms")
  319. }
  320. for _, a := range resp.Payload.Algorithms {
  321. algorithm := &collector.Algorithm{Name: a.AlgorithmName, Platform: OCTOPUS, TaskType: strings.ToLower(a.FrameworkName)}
  322. algorithms = append(algorithms, algorithm)
  323. }
  324. return algorithms, nil
  325. }
  326. func (o *OctopusLink) GetComputeCards(ctx context.Context) ([]string, error) {
  327. var cards []string
  328. for s, _ := range cardAliasMap {
  329. cards = append(cards, s)
  330. }
  331. return cards, nil
  332. }
  333. func (o *OctopusLink) GetUserBalance(ctx context.Context) (float64, error) {
  334. balanceReq := &octopus.GetUserBalanceReq{
  335. Platform: o.platform,
  336. }
  337. balanceResp, err := o.octopusRpc.GetUserBalance(ctx, balanceReq)
  338. if err != nil {
  339. return 0, err
  340. }
  341. if !balanceResp.Success {
  342. if balanceResp.Error != nil {
  343. return 0, errors.New(balanceResp.Error.Message)
  344. } else {
  345. return 0, errors.New("failed to get user balance")
  346. }
  347. }
  348. balance := float64(balanceResp.Payload.BillingUser.Amount)
  349. return balance, nil
  350. }
  351. func (o *OctopusLink) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
  352. var name string
  353. if resourceType == CARD {
  354. name = dataset + UNDERSCORE + algorithm + UNDERSCORE + card
  355. } else {
  356. name = dataset + UNDERSCORE + algorithm + UNDERSCORE + CPU
  357. }
  358. req := &octopus.GetMyAlgorithmListReq{
  359. Platform: o.platform,
  360. PageIndex: o.pageIndex,
  361. PageSize: o.pageSize,
  362. }
  363. resp, err := o.octopusRpc.GetMyAlgorithmList(ctx, req)
  364. if err != nil {
  365. return "", err
  366. }
  367. if !resp.Success {
  368. return "", errors.New("failed to get algorithmList")
  369. }
  370. var algorithmId string
  371. var algorithms []*octopus.Algorithms
  372. for _, a := range resp.Payload.Algorithms {
  373. if strings.ToLower(a.FrameworkName) != taskType {
  374. continue
  375. }
  376. if a.AlgorithmDescript == name {
  377. algorithms = append(algorithms, a)
  378. }
  379. }
  380. if len(algorithms) == 0 {
  381. return "", errors.New("algorithmId not found")
  382. }
  383. if len(algorithms) == 1 {
  384. algorithmId = algorithms[0].AlgorithmId
  385. }
  386. aLatest := &octopus.Algorithms{}
  387. for i, _ := range algorithms {
  388. if time.Unix(algorithms[i].CreatedAt, 0).After(time.Unix(aLatest.CreatedAt, 0)) {
  389. aLatest = algorithms[i]
  390. }
  391. }
  392. if aLatest.AlgorithmId == "" {
  393. return "", errors.New("algorithmId not found")
  394. }
  395. algorithmId = aLatest.AlgorithmId
  396. dcReq := &octopus.DownloadCompressReq{
  397. Platform: o.platform,
  398. Version: VERSION,
  399. AlgorithmId: algorithmId,
  400. }
  401. dcResp, err := o.octopusRpc.DownloadCompress(ctx, dcReq)
  402. if err != nil {
  403. return "", err
  404. }
  405. if !dcResp.Success {
  406. return "", errors.New(dcResp.Error.Message)
  407. }
  408. daReq := &octopus.DownloadAlgorithmReq{
  409. Platform: o.platform,
  410. Version: VERSION,
  411. AlgorithmId: algorithmId,
  412. CompressAt: dcResp.Payload.CompressAt,
  413. Domain: DOMAIN,
  414. }
  415. daResp, err := o.octopusRpc.DownloadAlgorithm(ctx, daReq)
  416. if err != nil {
  417. return "", err
  418. }
  419. if !daResp.Success {
  420. return "", errors.New(dcResp.Error.Message)
  421. }
  422. urlReq := &octopus.AlgorithmUrlReq{
  423. Platform: o.platform,
  424. Url: daResp.Payload.DownloadUrl,
  425. }
  426. urlResp, err := o.octopusRpc.DownloadAlgorithmUrl(ctx, urlReq)
  427. if err != nil {
  428. return "", err
  429. }
  430. return urlResp.Algorithm, nil
  431. }
  432. func (o *OctopusLink) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
  433. //var name string
  434. //if resourceType == CARD {
  435. // name = dataset + UNDERSCORE + algorithm + UNDERSCORE + card
  436. //} else {
  437. // name = dataset + UNDERSCORE + algorithm + UNDERSCORE + CPU
  438. //}
  439. //uploadReq := &octopus.UploadAlgorithmReq{}
  440. return nil
  441. }
  442. func (o *OctopusLink) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
  443. instance, err := strconv.ParseInt(instanceNum, 10, 32)
  444. if err != nil {
  445. return "", err
  446. }
  447. req := &octopus.GetTrainJobLogReq{
  448. Platform: o.platform,
  449. TaskId: taskId,
  450. TaskNum: "task0",
  451. Num: int32(instance),
  452. }
  453. resp, err := o.octopusRpc.GetTrainJobLog(ctx, req)
  454. if err != nil {
  455. return "", err
  456. }
  457. if strings.Contains(resp.Content, "404 Not Found") {
  458. resp.Content = "waiting for logs..."
  459. }
  460. return resp.Content, nil
  461. }
  462. func (o *OctopusLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
  463. resp, err := o.QueryTask(ctx, taskId)
  464. if err != nil {
  465. return nil, err
  466. }
  467. jobresp, ok := (resp).(*octopus.GetTrainJobResp)
  468. if !jobresp.Success || !ok {
  469. if jobresp.Error != nil {
  470. return nil, errors.New(jobresp.Error.Message)
  471. } else {
  472. return nil, errors.New("get training task failed, empty error returned")
  473. }
  474. }
  475. var task collector.Task
  476. task.Id = jobresp.Payload.TrainJob.Id
  477. if jobresp.Payload.TrainJob.StartedAt != 0 {
  478. task.Start = time.Unix(jobresp.Payload.TrainJob.StartedAt, 0).Format(constants.Layout)
  479. }
  480. if jobresp.Payload.TrainJob.CompletedAt != 0 {
  481. task.End = time.Unix(jobresp.Payload.TrainJob.CompletedAt, 0).Format(constants.Layout)
  482. }
  483. switch jobresp.Payload.TrainJob.Status {
  484. case "succeeded":
  485. task.Status = constants.Completed
  486. case "failed":
  487. task.Status = constants.Failed
  488. case "running":
  489. task.Status = constants.Running
  490. case "stopped":
  491. task.Status = constants.Stopped
  492. case "pending":
  493. task.Status = constants.Pending
  494. default:
  495. task.Status = "undefined"
  496. }
  497. return &task, nil
  498. }
  499. func (o *OctopusLink) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) {
  500. err := o.GenerateSubmitParams(ctx, option)
  501. if err != nil {
  502. return nil, err
  503. }
  504. task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
  505. if err != nil {
  506. return nil, err
  507. }
  508. return task, nil
  509. }
  510. func (o *OctopusLink) GenerateSubmitParams(ctx context.Context, option *option.AiOption) error {
  511. err := o.generateResourceId(ctx, option, nil)
  512. if err != nil {
  513. return err
  514. }
  515. err = o.generateDatasetsId(ctx, option)
  516. if err != nil {
  517. return err
  518. }
  519. err = o.generateImageId(ctx, option, nil)
  520. if err != nil {
  521. return err
  522. }
  523. err = o.generateAlgorithmId(ctx, option, nil)
  524. if err != nil {
  525. return err
  526. }
  527. err = o.generateCmd(option, nil)
  528. if err != nil {
  529. return err
  530. }
  531. err = o.generateEnv(option)
  532. if err != nil {
  533. return err
  534. }
  535. err = o.generateParams(option)
  536. if err != nil {
  537. return err
  538. }
  539. return nil
  540. }
  541. func (o *OctopusLink) generateResourceId(ctx context.Context, option *option.AiOption, ifoption *option.InferOption) error {
  542. req := &octopus.GetResourceSpecsReq{
  543. Platform: o.platform,
  544. ResourcePool: RESOURCE_POOL,
  545. }
  546. specResp, err := o.octopusRpc.GetResourceSpecs(ctx, req)
  547. if err != nil {
  548. return err
  549. }
  550. if !specResp.Success {
  551. return errors.New(specResp.Error.Message)
  552. }
  553. if option != nil {
  554. err = generateResourceIdForTraining(option, specResp)
  555. if err != nil {
  556. return err
  557. }
  558. return nil
  559. }
  560. if ifoption != nil {
  561. err = generateResourceIdForInferDeployInstance(ifoption, specResp)
  562. if err != nil {
  563. return err
  564. }
  565. return nil
  566. }
  567. return errors.New("failed to set ResourceId")
  568. }
  569. func generateResourceIdForTraining(option *option.AiOption, specResp *octopus.GetResourceSpecsResp) error {
  570. if option.ResourceType == "" {
  571. return errors.New("ResourceType not set")
  572. }
  573. if option.ResourceType == CPU {
  574. for _, spec := range specResp.TrainResourceSpecs {
  575. if spec.Price == 0 {
  576. option.ResourceId = spec.Id
  577. return nil
  578. }
  579. }
  580. }
  581. if option.ResourceType == CARD {
  582. if option.ComputeCard == "" {
  583. option.ComputeCard = GCU
  584. }
  585. err := setResourceIdByCard(option, specResp, option.ComputeCard)
  586. if err != nil {
  587. return err
  588. }
  589. return nil
  590. }
  591. return errors.New("ResourceType not set")
  592. }
  593. func generateResourceIdForInferDeployInstance(option *option.InferOption, specResp *octopus.GetResourceSpecsResp) error {
  594. // temporarily use bi-v100
  595. cardName, ok := cardCnMap[BIV100]
  596. if !ok {
  597. errors.New("computeCard not set")
  598. }
  599. // set computeCard
  600. option.ComputeCard = BIV100
  601. for _, spec := range specResp.TrainResourceSpecs {
  602. names := strings.Split(spec.Name, COMMA)
  603. if len(names) != 4 {
  604. continue
  605. }
  606. ns := strings.Split(names[0], STAR)
  607. if len(ns) != 2 {
  608. continue
  609. }
  610. if ns[0] == "1" && ns[1] == cardName {
  611. option.ResourceId = spec.Id
  612. return nil
  613. }
  614. }
  615. return errors.New("failed to set ResourceId")
  616. }
  617. func (o *OctopusLink) generateDatasetsId(ctx context.Context, option *option.AiOption) error {
  618. if option.DatasetsName == "" {
  619. return errors.New("DatasetsName not set")
  620. }
  621. req := &octopus.GetMyDatasetListReq{
  622. Platform: o.platform,
  623. PageIndex: o.pageIndex,
  624. PageSize: o.pageSize,
  625. }
  626. resp, err := o.octopusRpc.GetMyDatasetList(ctx, req)
  627. if err != nil {
  628. return err
  629. }
  630. if !resp.Success {
  631. return errors.New("failed to get DatasetsId")
  632. }
  633. for _, dataset := range resp.Payload.Datasets {
  634. if dataset.Name == option.DatasetsName {
  635. option.DatasetsId = dataset.Id
  636. return nil
  637. }
  638. }
  639. return errors.New("failed to get DatasetsId")
  640. }
  641. func (o *OctopusLink) generateImageId(ctx context.Context, option *option.AiOption, ifoption *option.InferOption) error {
  642. preImgReq := &octopus.GetPresetImageListReq{
  643. Platform: o.platform,
  644. PageIndex: o.pageIndex,
  645. PageSize: o.pageSize,
  646. }
  647. preImgResp, err := o.octopusRpc.GetPresetImageList(ctx, preImgReq)
  648. if err != nil {
  649. return err
  650. }
  651. if !preImgResp.Success {
  652. return errors.New("failed to get PresetImages")
  653. }
  654. if option != nil {
  655. if option.TaskType == "" {
  656. return errors.New("TaskType not set")
  657. }
  658. req := &octopus.GetUserImageListReq{
  659. Platform: o.platform,
  660. PageIndex: o.pageIndex,
  661. PageSize: o.pageSize,
  662. }
  663. resp, err := o.octopusRpc.GetUserImageList(ctx, req)
  664. if err != nil {
  665. return err
  666. }
  667. if !resp.Success {
  668. return errors.New("failed to get imageId")
  669. }
  670. if option.ResourceType == CPU {
  671. for _, img := range resp.Payload.Images {
  672. if img.Image.ImageName == "test-image" {
  673. option.ImageId = img.Image.Id
  674. return nil
  675. }
  676. }
  677. }
  678. err = generateImageIdForTraining(option, preImgResp)
  679. if err != nil {
  680. return err
  681. }
  682. return nil
  683. }
  684. if ifoption != nil {
  685. err = generateImageIdForInferDeployInstance(ifoption, preImgResp)
  686. if err != nil {
  687. return err
  688. }
  689. return nil
  690. }
  691. return errors.New("failed to get ImageId")
  692. }
  693. func generateImageIdForTraining(option *option.AiOption, preImgResp *octopus.GetPresetImageListResp) error {
  694. if option.ResourceType == CARD {
  695. for _, image := range preImgResp.Payload.Images {
  696. if strings.Contains(image.ImageName, cardAliasMap[strings.ToUpper(option.ComputeCard)]) {
  697. switch strings.ToUpper(option.ComputeCard) {
  698. case GCU:
  699. if strings.HasPrefix(image.ImageVersion, "t20_") {
  700. option.ImageId = image.Id
  701. return nil
  702. }
  703. case BIV100:
  704. if strings.HasPrefix(image.ImageVersion, "bi_") {
  705. option.ImageId = image.Id
  706. return nil
  707. }
  708. case MLU:
  709. option.ImageId = image.Id
  710. return nil
  711. }
  712. }
  713. }
  714. }
  715. return errors.New("failed to set ImageId")
  716. }
  717. func generateImageIdForInferDeployInstance(option *option.InferOption, preImgResp *octopus.GetPresetImageListResp) error {
  718. for _, image := range preImgResp.Payload.Images {
  719. // temporarily use bi-v100
  720. if strings.Contains(image.ImageName, cardAliasMap[strings.ToUpper(BIV100)]) {
  721. switch strings.ToUpper(BIV100) {
  722. case GCU:
  723. if strings.HasPrefix(image.ImageVersion, "t20_") {
  724. option.ImageId = image.Id
  725. return nil
  726. }
  727. case BIV100:
  728. if strings.HasPrefix(image.ImageVersion, "bi_") {
  729. option.ImageId = image.Id
  730. return nil
  731. }
  732. case MLU:
  733. option.ImageId = image.Id
  734. return nil
  735. }
  736. }
  737. }
  738. return errors.New("failed to set ImageId")
  739. }
  740. func (o *OctopusLink) generateAlgorithmId(ctx context.Context, option *option.AiOption, ifoption *option.InferOption) error {
  741. req := &octopus.GetMyAlgorithmListReq{
  742. Platform: o.platform,
  743. PageIndex: o.pageIndex,
  744. PageSize: o.pageSize,
  745. }
  746. resp, err := o.octopusRpc.GetMyAlgorithmList(ctx, req)
  747. if err != nil {
  748. return err
  749. }
  750. if !resp.Success {
  751. return errors.New("failed to get algorithmId")
  752. }
  753. if option != nil {
  754. err = generateAlgorithmIdForTraining(option, resp)
  755. if err != nil {
  756. return err
  757. }
  758. return nil
  759. }
  760. if ifoption != nil {
  761. err = generateAlgorithmIdForInferDeployInstance(ifoption, resp)
  762. if err != nil {
  763. return err
  764. }
  765. return nil
  766. }
  767. return errors.New("failed to set AlgorithmId")
  768. }
  769. func generateAlgorithmIdForTraining(option *option.AiOption, resp *octopus.GetMyAlgorithmListResp) error {
  770. for _, algorithm := range resp.Payload.Algorithms {
  771. if algorithm.FrameworkName == strings.Title(option.TaskType) {
  772. ns := strings.Split(algorithm.AlgorithmName, UNDERSCORE)
  773. if ns[0] != option.DatasetsName {
  774. continue
  775. }
  776. if ns[1] != option.AlgorithmName {
  777. continue
  778. }
  779. switch option.ResourceType {
  780. case CPU:
  781. if ns[2] != CPU {
  782. continue
  783. }
  784. case CARD:
  785. if ns[2] != strings.ToLower(option.ComputeCard) {
  786. continue
  787. }
  788. }
  789. option.AlgorithmId = algorithm.AlgorithmId
  790. return nil
  791. }
  792. }
  793. return errors.New("Algorithm does not exist")
  794. }
  795. func generateAlgorithmIdForInferDeployInstance(option *option.InferOption, resp *octopus.GetMyAlgorithmListResp) error {
  796. if option.ModelType == "" {
  797. return errors.New("ModelType not set")
  798. }
  799. if option.ModelName == "" {
  800. return errors.New("ModelName not set")
  801. }
  802. for _, algorithm := range resp.Payload.Algorithms {
  803. if strings.Contains(algorithm.AlgorithmName, option.ModelName) {
  804. option.AlgorithmId = algorithm.AlgorithmId
  805. return nil
  806. }
  807. }
  808. return errors.New("ModelName does not exist")
  809. }
  810. func (o *OctopusLink) generateCmd(option *option.AiOption, ifoption *option.InferOption) error {
  811. if option != nil {
  812. err := generateCmdForTraining(option)
  813. if err != nil {
  814. return err
  815. }
  816. return nil
  817. }
  818. if ifoption != nil {
  819. err := generateCmdForInferDeployInstance(ifoption)
  820. if err != nil {
  821. return err
  822. }
  823. return nil
  824. }
  825. return errors.New("failed to set cmd")
  826. }
  827. func generateCmdForTraining(option *option.AiOption) error {
  828. if option.Cmd == "" {
  829. switch option.ComputeCard {
  830. case GCU:
  831. option.Cmd = "cd /code; python3 train.py"
  832. case MLU:
  833. option.Cmd = ". /torch/venv3/pytorch/bin/activate; cd /code; python train.py"
  834. default:
  835. option.Cmd = TRAIN_CMD
  836. }
  837. }
  838. return nil
  839. }
  840. func generateCmdForInferDeployInstance(option *option.InferOption) error {
  841. if option.Cmd == "" {
  842. nameCmd, ok := CardModelNameCmdMap[option.ComputeCard]
  843. if !ok {
  844. return errors.New("failed to set cmd, ComputeCard not exist")
  845. }
  846. cmd, ok := nameCmd[option.ModelName]
  847. if !ok {
  848. return errors.New("failed to set cmd, ModelName not exist")
  849. }
  850. option.Cmd = cmd
  851. return nil
  852. }
  853. return nil
  854. }
  855. func (o *OctopusLink) generateEnv(option *option.AiOption) error {
  856. return nil
  857. }
  858. func (o *OctopusLink) generateParams(option *option.AiOption) error {
  859. if len(option.Params) == 0 {
  860. epoch := "epoch" + COMMA + "1"
  861. option.Params = append(option.Params, epoch)
  862. }
  863. return nil
  864. }
  865. func setResourceIdByCard(option *option.AiOption, specs *octopus.GetResourceSpecsResp, computeCard string) error {
  866. if option.Tops == 0 {
  867. for _, spec := range specs.TrainResourceSpecs {
  868. if spec.Price == 1 {
  869. ns := strings.Split(spec.Name, COMMA)
  870. cardSpecs := strings.Split(ns[0], STAR)
  871. if cardSpecs[1] == cardCnMap[strings.ToUpper(computeCard)] {
  872. option.ResourceId = spec.Id
  873. option.ComputeCard = computeCard
  874. return nil
  875. }
  876. } else {
  877. continue
  878. }
  879. }
  880. } else {
  881. cardNum := math.Ceil(option.Tops / float64(BASE_TOPS))
  882. for _, spec := range specs.TrainResourceSpecs {
  883. if option.Tops < BASE_TOPS {
  884. if spec.Price == 1 {
  885. ns := strings.Split(spec.Name, COMMA)
  886. cardSpecs := strings.Split(ns[0], STAR)
  887. if cardSpecs[1] == cardCnMap[strings.ToUpper(computeCard)] {
  888. option.ResourceId = spec.Id
  889. option.ComputeCard = computeCard
  890. return nil
  891. }
  892. } else {
  893. continue
  894. }
  895. } else {
  896. ns := strings.Split(spec.Name, COMMA)
  897. if len(ns) != 4 {
  898. continue
  899. }
  900. cardSpecs := strings.Split(ns[0], STAR)
  901. if cardSpecs[1] != cardCnMap[strings.ToUpper(computeCard)] {
  902. continue
  903. }
  904. s, err := strconv.ParseFloat(cardSpecs[0], 64)
  905. if err != nil {
  906. return err
  907. }
  908. switch computeCard {
  909. case GCU:
  910. option.ComputeCard = computeCard
  911. if cardNum == s { // 1, 4, 8
  912. option.ResourceId = spec.Id
  913. return nil
  914. }
  915. if 1 < cardNum && cardNum <= 4 && s == 4 {
  916. option.ResourceId = spec.Id
  917. return nil
  918. }
  919. if 4 < cardNum && s == 8 {
  920. option.ResourceId = spec.Id
  921. return nil
  922. }
  923. case MLU: // 1, 2, 4
  924. option.ComputeCard = computeCard
  925. if cardNum/2 == s {
  926. option.ResourceId = spec.Id
  927. return nil
  928. }
  929. if 1 < cardNum/2 && cardNum/2 <= 2 && s == 2 {
  930. option.ResourceId = spec.Id
  931. return nil
  932. }
  933. if 2 < cardNum/2 && s == 4 {
  934. option.ResourceId = spec.Id
  935. return nil
  936. }
  937. }
  938. }
  939. }
  940. }
  941. return errors.New("set ResourceId error")
  942. }
  943. func (o *OctopusLink) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
  944. req := &octopus.GetNotebookListReq{
  945. Platform: o.platform,
  946. PageIndex: o.pageIndex,
  947. PageSize: o.pageSize,
  948. SearchKey: DEPLOY_INSTANCE_PREFIEX,
  949. }
  950. list, err := o.octopusRpc.GetNotebookList(ctx, req)
  951. if err != nil {
  952. return nil, err
  953. }
  954. var imageUrls []*inference.InferUrl
  955. for _, notebook := range list.Payload.GetNotebooks() {
  956. if strings.Contains(notebook.Desc, option.ModelName) && notebook.Status == "running" {
  957. url := strings.Replace(notebook.Tasks[0].Url, FORWARD_SLASH, "", -1)
  958. names := strings.Split(notebook.Desc, FORWARD_SLASH)
  959. imageUrl := &inference.InferUrl{
  960. Url: DOMAIN + url,
  961. Card: names[2],
  962. }
  963. imageUrls = append(imageUrls, imageUrl)
  964. } else {
  965. continue
  966. }
  967. }
  968. if len(imageUrls) == 0 {
  969. return nil, errors.New("no infer url available")
  970. }
  971. clusterWithUrl := &inference.ClusterInferUrl{
  972. ClusterName: o.platform,
  973. ClusterType: TYPE_OCTOPUS,
  974. InferUrls: imageUrls,
  975. }
  976. return clusterWithUrl, nil
  977. }
  978. func (o *OctopusLink) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
  979. var insList []*inference.DeployInstance
  980. req := &octopus.GetNotebookListReq{
  981. Platform: o.platform,
  982. PageIndex: o.pageIndex,
  983. PageSize: o.pageSize,
  984. SearchKey: DEPLOY_INSTANCE_PREFIEX,
  985. }
  986. list, err := o.octopusRpc.GetNotebookList(ctx, req)
  987. if err != nil {
  988. return nil, err
  989. }
  990. if list.Error != nil {
  991. return nil, errors.New(list.Error.Message)
  992. }
  993. for _, notebook := range list.Payload.Notebooks {
  994. ins := &inference.DeployInstance{}
  995. ins.InstanceName = notebook.Name
  996. ins.InstanceId = notebook.Id
  997. ins.ClusterName = o.platform
  998. ins.Status = notebook.Status
  999. ins.ClusterType = TYPE_OCTOPUS
  1000. insList = append(insList, ins)
  1001. }
  1002. return insList, nil
  1003. }
  1004. func (o *OctopusLink) StartInferDeployInstance(ctx context.Context, id string) bool {
  1005. req := &octopus.StartNotebookReq{
  1006. Platform: o.platform,
  1007. Id: id,
  1008. }
  1009. resp, err := o.octopusRpc.StartNotebook(ctx, req)
  1010. if err != nil || !resp.Success {
  1011. return false
  1012. }
  1013. return resp.Success
  1014. }
  1015. func (o *OctopusLink) StopInferDeployInstance(ctx context.Context, id string) bool {
  1016. req := &octopus.StopNotebookReq{
  1017. Platform: o.platform,
  1018. Id: id,
  1019. }
  1020. resp, err := o.octopusRpc.StopNotebook(ctx, req)
  1021. if err != nil || !resp.Success {
  1022. return false
  1023. }
  1024. return resp.Success
  1025. }
  1026. func (o *OctopusLink) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) {
  1027. ins := &inference.DeployInstance{}
  1028. req := &octopus.GetNotebookReq{
  1029. Platform: o.platform,
  1030. Id: id,
  1031. }
  1032. resp, err := o.octopusRpc.GetNotebook(ctx, req)
  1033. if err != nil {
  1034. return nil, err
  1035. }
  1036. if resp.Payload == nil {
  1037. return nil, errors.New("instance does not exist")
  1038. }
  1039. url := strings.Replace(resp.Payload.Notebook.Tasks[0].Url, FORWARD_SLASH, "", -1)
  1040. inferUrl := DOMAIN + url
  1041. var modelType string
  1042. var modelName string
  1043. var card string
  1044. if resp.Payload.Notebook.Desc != "" {
  1045. str := strings.Split(resp.Payload.Notebook.Desc, FORWARD_SLASH)
  1046. if len(str) == 3 {
  1047. modelType = str[0]
  1048. modelName = str[1]
  1049. card = str[2]
  1050. }
  1051. }
  1052. ins.InstanceName = resp.Payload.Notebook.Name
  1053. ins.InstanceId = resp.Payload.Notebook.Id
  1054. ins.ClusterName = o.platform
  1055. ins.Status = resp.Payload.Notebook.Status
  1056. ins.ClusterType = TYPE_OCTOPUS
  1057. ins.ModelType = modelType
  1058. ins.ModelName = modelName
  1059. ins.InferUrl = inferUrl
  1060. ins.InferCard = card
  1061. return ins, nil
  1062. }
  1063. func (o *OctopusLink) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
  1064. stream, err := o.octopusRpc.GetInferResult(ctx)
  1065. if err != nil {
  1066. return "", err
  1067. }
  1068. buffer := make([]byte, 2048)
  1069. bufferedReader := bufio.NewReader(file)
  1070. for {
  1071. _, err = bufferedReader.Read(buffer)
  1072. if err != nil {
  1073. if err != io.EOF {
  1074. return "", err
  1075. }
  1076. break
  1077. }
  1078. err = stream.Send(&octopus.InferResultReq{
  1079. Platform: o.platform,
  1080. InferUrl: url,
  1081. FileName: fileName,
  1082. FileBytes: buffer,
  1083. })
  1084. }
  1085. recv, err := stream.CloseAndRecv()
  1086. if err != nil {
  1087. return "", err
  1088. }
  1089. return recv.Result, nil
  1090. }
  1091. func (o *OctopusLink) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) {
  1092. err := o.generateResourceId(ctx, nil, option)
  1093. if err != nil {
  1094. return "", err
  1095. }
  1096. err = o.generateAlgorithmId(ctx, nil, option)
  1097. if err != nil {
  1098. return "", err
  1099. }
  1100. err = o.generateImageId(ctx, nil, option)
  1101. if err != nil {
  1102. return "", err
  1103. }
  1104. err = o.generateCmd(nil, option)
  1105. if err != nil {
  1106. return "", err
  1107. }
  1108. desc := option.ModelType + FORWARD_SLASH + option.ModelName + FORWARD_SLASH + strings.ToLower(BIV100)
  1109. param := &octopus.CreateNotebookParam{
  1110. Name: DEPLOY_INSTANCE_PREFIEX + DASH + utils.TimeString(),
  1111. ResourcePool: RESOURCE_POOL,
  1112. ResourceSpecId: option.ResourceId,
  1113. AlgorithmId: option.AlgorithmId,
  1114. AlgorithmVersion: VERSION,
  1115. ImageId: option.ImageId,
  1116. DatasetId: "",
  1117. DatasetVersion: "",
  1118. Command: option.Cmd,
  1119. Desc: desc,
  1120. TaskNumber: 1,
  1121. }
  1122. req := &octopus.CreateNotebookReq{
  1123. Platform: o.platform,
  1124. Params: param,
  1125. }
  1126. resp, err := o.octopusRpc.CreateNotebook(ctx, req)
  1127. if err != nil {
  1128. return "", err
  1129. }
  1130. if !resp.Success {
  1131. return "", errors.New(resp.Error.Message)
  1132. }
  1133. return resp.Payload.Id, nil
  1134. }
  1135. func (o *OctopusLink) CheckModelExistence(ctx context.Context, name string, mtype string) bool {
  1136. ifoption := &option.InferOption{
  1137. ModelName: name,
  1138. ModelType: mtype,
  1139. }
  1140. err := o.generateAlgorithmId(ctx, nil, ifoption)
  1141. if err != nil {
  1142. return false
  1143. }
  1144. return true
  1145. }
  1146. func (o *OctopusLink) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
  1147. return nil, nil
  1148. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.