You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

openi.go 29 kB

11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
10 months ago
11 months ago
10 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
2 months ago
10 months ago
10 months ago
10 months ago
10 months ago
2 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
11 months ago
10 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
4 months ago
10 months ago
10 months ago
10 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago

  1. package storeLink
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "github.com/json-iterator/go"
  9. "github.com/rs/zerolog/log"
  10. openIcom "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
  13. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
  14. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
  15. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  16. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  17. "gitlink.org.cn/JointCloud/pcm-openi/common"
  18. "gitlink.org.cn/JointCloud/pcm-openi/model"
  19. "mime/multipart"
  20. "net/http"
  21. "net/url"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "sync/atomic"
  26. "time"
  27. )
  // Job types and fixed identifiers sent to the OpenI task API.
  const (
  	// Job type names.
  	DEBUG     = "DEBUG"
  	TRAIN     = "TRAIN"
  	INFERENCE = "INFERENCE"
  	// ONLINEINFERENCE is the job type used for online inference deployments.
  	ONLINEINFERENCE = "ONLINEINFERENCE"

  	// C2NET is the cluster type passed with every task request in this file.
  	C2NET = "C2Net"
  	// TESTREPO is the repository name used when a request needs a repo
  	// name but none is known (task detail and spec discovery calls).
  	TESTREPO = "testrepo"
  )
  // REST paths of the OpenI task API; each is appended to the configured host.
  const (
  	// Spec/creation prerequisites and task creation.
  	CreationRequirelUrl = "/api/v1/task/creationRequired"
  	TaskCreatelUrl      = "/api/v1/task/create"

  	// Repository listing.
  	ReposUrl = "/api/v1/user/repos"

  	// Task queries and control.
  	TaskListUrl        = "/api/v1/task/list"
  	TaskDetailsUrl     = "/api/v1/task/detail"
  	TaskLogUrl         = "/api/v1/task/log"
  	TaskStopUrl        = "/api/v1/task/stop"
  	TaskOnlineInferUrl = "/api/v1/task/onlineInferUrl"
  )
  // ComputeSource lists the compute resource types that are matched against
  // requested resources and queried when collecting resource specs.
  var ComputeSource = []string{
  	"GPU",
  	"NPU",
  	"GCU",
  	"MLU",
  	"DCU",
  	"CPU",
  	"ILUVATAR-GPGPU",
  	"METAX-GPGPU",
  }
  // ResourceSpecOpenI describes the accelerator requested for a task:
  // the compute resource type, the card name and the number of cards.
  type ResourceSpecOpenI struct {
  	ResType, Name string // compute resource type (one of ComputeSource) and card name
  	Number        int64  // requested number of cards
  }
  // OpenI is the adapter for one OpenI platform account. It holds the
  // connection settings used by every request issued from this file.
  type OpenI struct {
  	participantId int64 // id reported as ClusterId and appended to task params

  	platform, host, userName, accessToken string // platform name, API base URL, account name, access token query value
  }
  62. func NewOpenI(host string, id int64, name string, token string, platform string) *OpenI {
  63. return &OpenI{
  64. host: host,
  65. participantId: id,
  66. userName: name,
  67. accessToken: token,
  68. platform: platform,
  69. }
  70. }
  71. func (o *OpenI) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) {
  72. switch mode {
  73. case executor.SUBMIT_MODE_JOINT_CLOUD:
  74. case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
  75. var repoName string
  76. codePaths := strings.SplitN(option.AlgorithmId, FORWARD_SLASH, 3)
  77. if len(codePaths) != 3 {
  78. return nil, fmt.Errorf("algorithmId %s format is incorrect", option.AlgorithmId)
  79. }
  80. repoName = codePaths[0]
  81. spec := &ResourceSpecOpenI{}
  82. for _, res := range option.ResourcesRequired {
  83. typeName, ok := res["type"]
  84. if !ok {
  85. continue
  86. }
  87. name, ok := res["name"]
  88. if !ok {
  89. continue
  90. }
  91. for _, s := range ComputeSource {
  92. switch typeName {
  93. case s:
  94. num, ok := res["number"]
  95. if !ok {
  96. continue
  97. }
  98. n := openIcom.ConvertTypeToString(num)
  99. val, err := strconv.ParseInt(n, 10, 64)
  100. if err != nil {
  101. return nil, err
  102. }
  103. spec.ResType = s
  104. spec.Name = openIcom.ConvertTypeToString(name)
  105. spec.Number = val
  106. break
  107. }
  108. }
  109. }
  110. if spec.ResType == "" || spec.Name == "" {
  111. return nil, errors.New("resource spec not found")
  112. }
  113. creationRequirelUrl := o.host + CreationRequirelUrl
  114. param := model.TaskCreationRequiredParam{
  115. UserName: o.userName,
  116. RepoName: repoName,
  117. JobType: TRAIN,
  118. ComputeSource: spec.ResType,
  119. ClusterType: C2NET,
  120. }
  121. b, _ := json.Marshal(param)
  122. byt := bytes.NewBuffer(b)
  123. resp := struct {
  124. Code int `json:"code"`
  125. Msg string `json:"msg"`
  126. Data model.TaskCreationRequired `json:"data"`
  127. }{}
  128. req := common.GetRestyRequest(common.TIMEOUT)
  129. r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
  130. req.RawRequest = r
  131. req.URL = creationRequirelUrl
  132. _, err := req.
  133. SetHeader("Content-Type", "application/json").
  134. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  135. SetBody(byt).
  136. SetResult(&resp).
  137. Send()
  138. if err != nil {
  139. return nil, errors.New("failed to invoke TaskCreationRequired; " + err.Error())
  140. }
  141. if len(resp.Data.Data.Specs.All) == 0 {
  142. return nil, errors.New("TaskCreationRequired specs are empty")
  143. }
  144. for _, s := range resp.Data.Data.Specs.All {
  145. if spec.ResType == s.ComputeResource && spec.Name == s.AccCardType {
  146. if int(spec.Number) == s.AccCardsNum {
  147. option.ResourceId = strconv.Itoa(s.Id) + FORWARD_SLASH + spec.ResType
  148. break
  149. }
  150. }
  151. }
  152. if option.ResourceId == "" {
  153. return nil, errors.New("can not find spec Id")
  154. }
  155. option.ComputeCard = spec.Name
  156. }
  157. task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
  158. if err != nil {
  159. return nil, err
  160. }
  161. return task, nil
  162. }
  163. func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
  164. taskCreatelUrl := o.host + TaskCreatelUrl
  165. var repoName string
  166. var branchName string
  167. var bootFile string
  168. codePaths := strings.SplitN(algorithmId, FORWARD_SLASH, 3)
  169. if len(codePaths) != 3 {
  170. return nil, fmt.Errorf("algorithmId %s format is incorrect", algorithmId)
  171. }
  172. specs := strings.Split(resourceId, FORWARD_SLASH)
  173. specId, err := strconv.ParseInt(specs[0], 10, 0)
  174. if err != nil {
  175. return nil, err
  176. }
  177. computeSource := specs[1]
  178. repoName = codePaths[0]
  179. branchName = codePaths[1]
  180. bootFile = codePaths[2]
  181. // params
  182. var parameters struct {
  183. Parameter []struct {
  184. Label string `json:"label"`
  185. Value string `json:"value"`
  186. } `json:"parameter"`
  187. }
  188. // add default param
  189. current_id := strconv.Itoa(int(o.participantId))
  190. current_platform := CURRENT_PLATFORM + COMMA + current_id
  191. params = append(params, current_platform)
  192. for _, param := range params {
  193. s := strings.Split(param, COMMA)
  194. st := struct {
  195. Label string `json:"label"`
  196. Value string `json:"value"`
  197. }{
  198. Label: s[0],
  199. Value: s[1],
  200. }
  201. parameters.Parameter = append(parameters.Parameter, st)
  202. }
  203. paramStr, _ := json.Marshal(parameters)
  204. // choose imageId and imageUrl
  205. imgId, imgUrl, err := swapImageIdAndImageUrl(imageId)
  206. if err != nil {
  207. return nil, err
  208. }
  209. taskParam := &model.CreateTaskParam{
  210. Description: algorithmId, // temporarily set reponame contained in the algorithmId to desc for missing taskdetail's reponame
  211. JobType: TRAIN,
  212. Cluster: C2NET,
  213. DisplayJobName: strings.ToLower(TRAIN + UNDERSCORE + utils.RandomString(10)),
  214. ComputeSource: computeSource,
  215. SpecId: int(specId),
  216. BranchName: branchName,
  217. ImageId: imgId,
  218. ImageUrl: imgUrl,
  219. DatasetUuidStr: datasetsId,
  220. Params: string(paramStr),
  221. BootFile: bootFile,
  222. HasInternet: 2, // 0 不限制;1 不需要互联网;2 需要互联网
  223. WorkServerNumber: 1, // 运行节点数
  224. }
  225. param := model.CreateTaskReq{
  226. UserName: o.userName,
  227. RepoName: repoName,
  228. CreateTaskParam: taskParam,
  229. }
  230. resp := struct {
  231. Code int `json:"code"`
  232. Msg string `json:"msg"`
  233. Data model.CreateTask `json:"data"`
  234. }{}
  235. req := common.GetRestyRequest(common.TIMEOUT)
  236. _, err = req.
  237. SetHeader("Content-Type", "application/json").
  238. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  239. SetBody(&param).
  240. SetResult(&resp).
  241. Post(taskCreatelUrl)
  242. if err != nil {
  243. return nil, err
  244. }
  245. if resp.Code != 200 {
  246. return nil, errors.New(resp.Msg)
  247. }
  248. if resp.Data.Code != 0 {
  249. return nil, errors.New(resp.Msg)
  250. }
  251. if (resp.Data == model.CreateTask{}) {
  252. return nil, errors.New("failed to submit task, empty response")
  253. }
  254. return resp.Data, nil
  255. }
  256. func swapImageIdAndImageUrl(imageId string) (string, string, error) {
  257. if imageId == "" {
  258. return "", "", errors.New("imageId is empty")
  259. }
  260. var imgId string
  261. var imgUrl string
  262. parsedURL, err := url.Parse("http://" + imageId)
  263. if err != nil {
  264. return "", "", err
  265. }
  266. if utils.IsValidHostAddress(parsedURL.Host) {
  267. imgId = ""
  268. imgUrl = imageId
  269. } else {
  270. imgId = imageId
  271. imgUrl = ""
  272. }
  273. return imgId, imgUrl, nil
  274. }
  275. func (o *OpenI) Stop(ctx context.Context, id string) error {
  276. task, err := o.getTrainingTask(ctx, id)
  277. if err != nil {
  278. return err
  279. }
  280. codePaths := strings.SplitN(task.Data.Task.Description, FORWARD_SLASH, 3)
  281. if len(codePaths) != 3 {
  282. return errors.New("failed to stop, openI desc not set")
  283. }
  284. repoName := codePaths[0]
  285. taskStopUrl := o.host + TaskStopUrl
  286. param := model.StopTaskParam{
  287. UserName: o.userName,
  288. RepoName: repoName,
  289. Id: id,
  290. }
  291. resp := struct {
  292. Code int `json:"code"`
  293. Msg string `json:"msg"`
  294. Data model.StopTask `json:"data"`
  295. }{}
  296. req := common.GetRestyRequest(common.TIMEOUT)
  297. _, err = req.
  298. SetHeader("Content-Type", "application/json").
  299. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  300. SetBody(&param).
  301. SetResult(&resp).
  302. Post(taskStopUrl)
  303. if err != nil {
  304. return err
  305. }
  306. if resp.Code != http.StatusOK {
  307. return errors.New("failed to stop")
  308. }
  309. return nil
  310. }
  311. func (o *OpenI) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
  312. return nil, errors.New("failed to implement")
  313. }
  314. func (o *OpenI) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
  315. return nil, errors.New("failed to implement")
  316. }
  317. func (o *OpenI) StartInferDeployInstance(ctx context.Context, id string) bool {
  318. return false
  319. }
  320. func (o *OpenI) StopInferDeployInstance(ctx context.Context, id string) bool {
  321. err := o.Stop(ctx, id)
  322. if err != nil {
  323. return false
  324. }
  325. return true
  326. }
  327. func (o *OpenI) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) {
  328. task, err := o.getTrainingTask(ctx, id)
  329. if err != nil {
  330. return nil, err
  331. }
  332. description := task.Data.Task.Description
  333. //从描述中解析出repoName
  334. codePaths := strings.SplitN(description, FORWARD_SLASH, 3)
  335. if len(codePaths) != 3 {
  336. return nil, fmt.Errorf("algorithmId %s format is incorrect", description)
  337. }
  338. repoName := codePaths[0]
  339. var resp inference.DeployInstance
  340. resp.InstanceId = id
  341. resp.InstanceName = task.Data.Task.DisplayJobName
  342. resp.ModelName = task.Data.Task.PretrainModelName
  343. resp.ModelType = ""
  344. resp.InferCard = task.Data.Task.Spec.ComputeResource + "_" + task.Data.Task.Spec.AccCardType
  345. resp.ClusterName = o.platform
  346. resp.ClusterType = TYPE_OPENI
  347. //获取在线推理url
  348. var inferUrl string
  349. if task.Data.Task.Status == "RUNNING" {
  350. inferUrl, err = o.getOnlineInferUrl(ctx, id, repoName)
  351. if err != nil {
  352. return nil, err
  353. }
  354. }
  355. resp.InferUrl = inferUrl
  356. resp.Status = task.Data.Task.Status
  357. resp.CreatedTime = time.Unix(int64(task.Data.Task.CreatedUnix), 0).Format(constants.Layout)
  358. log.Debug().Msgf("func GetInferDeployInstance, resp: %v", resp)
  359. return &resp, nil
  360. }
  361. func (o *OpenI) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) {
  362. var repoName string
  363. codePaths := strings.SplitN(option.AlgorithmId, FORWARD_SLASH, 3)
  364. if len(codePaths) != 3 {
  365. return "", fmt.Errorf("algorithmId %s format is incorrect", option.AlgorithmId)
  366. }
  367. repoName = codePaths[0]
  368. spec := &ResourceSpecOpenI{}
  369. for _, res := range option.ResourcesRequired {
  370. typeName, ok := res["type"]
  371. if !ok {
  372. continue
  373. }
  374. name, ok := res["name"]
  375. if !ok {
  376. continue
  377. }
  378. for _, s := range ComputeSource {
  379. switch typeName {
  380. case s:
  381. num, ok := res["number"]
  382. if !ok {
  383. continue
  384. }
  385. n := openIcom.ConvertTypeToString(num)
  386. val, err := strconv.ParseInt(n, 10, 64)
  387. if err != nil {
  388. return "", err
  389. }
  390. spec.ResType = s
  391. spec.Name = openIcom.ConvertTypeToString(name)
  392. spec.Number = val
  393. break
  394. }
  395. }
  396. }
  397. if spec.ResType == "" || spec.Name == "" {
  398. return "", errors.New("resource spec not found")
  399. }
  400. creationRequirelUrl := o.host + CreationRequirelUrl
  401. param := model.TaskCreationRequiredParam{
  402. UserName: o.userName,
  403. RepoName: repoName,
  404. JobType: ONLINEINFERENCE,
  405. ComputeSource: spec.ResType,
  406. ClusterType: C2NET,
  407. }
  408. b, _ := json.Marshal(param)
  409. byt := bytes.NewBuffer(b)
  410. resp := struct {
  411. Code int `json:"code"`
  412. Msg string `json:"msg"`
  413. Data model.TaskCreationRequired `json:"data"`
  414. }{}
  415. req := common.GetRestyRequest(common.TIMEOUT)
  416. r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
  417. req.RawRequest = r
  418. req.URL = creationRequirelUrl
  419. _, err := req.
  420. SetHeader("Content-Type", "application/json").
  421. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  422. SetBody(byt).
  423. SetResult(&resp).
  424. Send()
  425. if err != nil {
  426. return "", errors.New("failed to invoke TaskCreationRequired")
  427. }
  428. if len(resp.Data.Data.Specs.All) == 0 {
  429. return "", errors.New("TaskCreationRequired specs are empty")
  430. }
  431. for _, s := range resp.Data.Data.Specs.All {
  432. if spec.ResType == s.ComputeResource && spec.Name == s.AccCardType {
  433. if int(spec.Number) == s.AccCardsNum {
  434. option.ResourceId = strconv.Itoa(s.Id) + FORWARD_SLASH + spec.ResType
  435. break
  436. }
  437. }
  438. }
  439. if option.ResourceId == "" {
  440. return "", errors.New("can not find spec Id")
  441. }
  442. option.ComputeCard = spec.Name
  443. task, err := o.SubmitInferTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AlgorithmId, option.ModelID)
  444. if err != nil {
  445. return "", err
  446. }
  447. ma, err := jsoniter.Marshal(task)
  448. if err != nil {
  449. return "", err
  450. }
  451. taskId := jsoniter.Get(ma, "data").Get("id").ToString()
  452. return taskId, nil
  453. }
  454. func (o *OpenI) SubmitInferTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, algorithmId string, modelId string) (interface{}, error) {
  455. taskCreatelUrl := o.host + TaskCreatelUrl
  456. var repoName string
  457. var branchName string
  458. var bootFile string
  459. //从描述中解析出repoName
  460. codePaths := strings.SplitN(algorithmId, FORWARD_SLASH, 3)
  461. if len(codePaths) != 3 {
  462. return nil, fmt.Errorf("algorithmId %s format is incorrect", algorithmId)
  463. }
  464. specs := strings.Split(resourceId, FORWARD_SLASH)
  465. specId, err := strconv.ParseInt(specs[0], 10, 0)
  466. if err != nil {
  467. return nil, err
  468. }
  469. computeSource := specs[1]
  470. repoName = codePaths[0]
  471. branchName = codePaths[1]
  472. bootFile = strings.Join(codePaths[2:], "/")
  473. log.Printf("repoName: %s, branchName: %s, bootFile: %s", repoName, branchName, bootFile)
  474. //params := "{\"parameter\":[{\"label\":\"a\",\"value\":\"1\"},{\"label\":\"b\",\"value\":\"2\"}]}"
  475. // choose imageId and imageUrl
  476. imgId, imgUrl, err := swapImageIdAndImageUrl(imageId)
  477. if err != nil {
  478. return nil, err
  479. }
  480. taskParam := &model.CreateTaskParam{
  481. Description: algorithmId, // temporarily set reponame contained in the algorithmId to desc for missing taskdetail's reponame
  482. JobType: ONLINEINFERENCE,
  483. Cluster: C2NET,
  484. DisplayJobName: strings.ToLower(ONLINEINFERENCE + UNDERSCORE + utils.RandomString(10)),
  485. ComputeSource: computeSource,
  486. SpecId: int(specId),
  487. BranchName: branchName,
  488. ImageId: imgId,
  489. ImageUrl: imgUrl,
  490. PretrainModelIdStr: modelId,
  491. BootFile: bootFile,
  492. HasInternet: 2, // 0 不限制;1 不需要互联网;2 需要互联网
  493. WorkServerNumber: 1, // 运行节点数
  494. }
  495. param := model.CreateTaskReq{
  496. UserName: o.userName,
  497. RepoName: repoName,
  498. CreateTaskParam: taskParam,
  499. }
  500. resp := struct {
  501. Code int `json:"code"`
  502. Msg string `json:"msg"`
  503. Data model.CreateTask `json:"data"`
  504. }{}
  505. req := common.GetRestyRequest(common.TIMEOUT)
  506. _, err = req.
  507. SetHeader("Content-Type", "application/json").
  508. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  509. SetBody(&param).
  510. SetResult(&resp).
  511. Post(taskCreatelUrl)
  512. if err != nil {
  513. return nil, err
  514. }
  515. if resp.Code != http.StatusOK {
  516. return nil, errors.New(resp.Msg)
  517. }
  518. if resp.Data.Data.Id == 0 {
  519. return nil, fmt.Errorf("failed to submit task, msg: [%s]", resp.Data.Msg)
  520. }
  521. return resp.Data, nil
  522. }
  523. func (o *OpenI) CheckModelExistence(ctx context.Context, modelName string, modelType string) bool {
  524. return false
  525. }
  526. func (o *OpenI) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
  527. return "", errors.New("failed to implement")
  528. }
  529. func (o *OpenI) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
  530. return nil, errors.New("failed to implement")
  531. }
  532. func (o *OpenI) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
  533. return nil, errors.New("failed to implement")
  534. }
  535. func (o *OpenI) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
  536. return nil, errors.New("failed to implement")
  537. }
  538. func (o *OpenI) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
  539. task, err := o.getTrainingTask(ctx, taskId)
  540. if err != nil {
  541. return "", err
  542. }
  543. codePaths := strings.SplitN(task.Data.Task.Description, FORWARD_SLASH, 3)
  544. if len(codePaths) != 3 {
  545. return "", errors.New("failed to get log, openI desc not set")
  546. }
  547. repoName := codePaths[0]
  548. tasklogurl := o.host + TaskLogUrl
  549. param := model.GetLogParam{
  550. UserName: o.userName,
  551. RepoName: repoName,
  552. Id: taskId,
  553. }
  554. b, _ := json.Marshal(param)
  555. byt := bytes.NewBuffer(b)
  556. resp := struct {
  557. Code int `json:"code"`
  558. Msg string `json:"msg"`
  559. Data string `json:"data"`
  560. }{}
  561. req := common.GetRestyRequest(common.TIMEOUT)
  562. r, _ := http.NewRequest("GET", tasklogurl, byt)
  563. req.RawRequest = r
  564. req.URL = tasklogurl
  565. _, err = req.
  566. SetHeader("Content-Type", "application/json").
  567. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  568. SetBody(byt).
  569. SetResult(&resp).
  570. Send()
  571. if err != nil {
  572. return "", err
  573. }
  574. if resp.Data == "" {
  575. return "waiting for logs", nil
  576. }
  577. return resp.Data, nil
  578. }
  579. func (o *OpenI) getTrainingTask(ctx context.Context, taskId string) (*model.TaskDetail, error) {
  580. taskDetailsUrl := o.host + TaskDetailsUrl
  581. param := model.TaskDetailParam{
  582. UserName: o.userName,
  583. RepoName: TESTREPO,
  584. Id: taskId,
  585. }
  586. b, _ := json.Marshal(param)
  587. byt := bytes.NewBuffer(b)
  588. resp := struct {
  589. Code int `json:"code"`
  590. Msg string `json:"msg"`
  591. Data model.TaskDetail `json:"data"`
  592. }{}
  593. req := common.GetRestyRequest(common.TIMEOUT)
  594. r, _ := http.NewRequest("GET", taskDetailsUrl, byt)
  595. req.RawRequest = r
  596. req.URL = taskDetailsUrl
  597. _, err := req.
  598. SetHeader("Content-Type", "application/json").
  599. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  600. SetBody(byt).
  601. SetResult(&resp).
  602. Send()
  603. if err != nil {
  604. return nil, errors.New("failed to invoke taskDetails")
  605. }
  606. if resp.Data.Code != 0 && resp.Data.Msg != "" {
  607. return nil, errors.New(resp.Data.Msg)
  608. }
  609. return &resp.Data, nil
  610. }
  611. func (o *OpenI) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
  612. task, err := o.getTrainingTask(ctx, taskId)
  613. if err != nil {
  614. return nil, err
  615. }
  616. var resp collector.Task
  617. resp.Id = strconv.Itoa(task.Data.Task.Id)
  618. if task.Data.Task.StartTime != 0 {
  619. resp.Start = time.Unix(int64(task.Data.Task.StartTime), 0).Format(constants.Layout)
  620. }
  621. if task.Data.Task.EndTime != 0 {
  622. resp.End = time.Unix(int64(task.Data.Task.EndTime), 0).Format(constants.Layout)
  623. }
  624. switch task.Data.Task.Status {
  625. case "SUCCEEDED":
  626. resp.Status = constants.Completed
  627. case "FAILED":
  628. resp.Status = constants.Failed
  629. case "CREATED_FAILED":
  630. resp.Status = constants.Failed
  631. case "RUNNING":
  632. resp.Status = constants.Running
  633. case "STOPPED":
  634. resp.Status = constants.Stopped
  635. case "PENDING":
  636. resp.Status = constants.Pending
  637. case "WAITING":
  638. resp.Status = constants.Waiting
  639. default:
  640. resp.Status = "undefined"
  641. }
  642. return &resp, nil
  643. }
  644. func (o *OpenI) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
  645. return "", errors.New("failed to implement")
  646. }
  647. func (o *OpenI) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
  648. return errors.New("failed to implement")
  649. }
  650. func (o *OpenI) GetComputeCards(ctx context.Context) ([]string, error) {
  651. return nil, errors.New("failed to implement")
  652. }
  653. func (o *OpenI) GetUserBalance(ctx context.Context) (float64, error) {
  654. return 0, errors.New("failed to implement")
  655. }
  656. func (o *OpenI) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
  657. var jobType string
  658. if resrcType == "Inference" {
  659. jobType = ONLINEINFERENCE
  660. } else if resrcType == "Train" {
  661. jobType = TRAIN
  662. }
  663. var resources []interface{}
  664. res := &collector.ResourceSpec{
  665. ClusterId: strconv.FormatInt(o.participantId, 10),
  666. Tag: resrcType,
  667. }
  668. //clres := &collector.ClusterResource{}
  669. creationRequirelUrl := o.host + CreationRequirelUrl
  670. reposUrl := o.host + ReposUrl
  671. taskListUrl := o.host + TaskListUrl
  672. var wg sync.WaitGroup
  673. var ch = make(chan *collector.ClusterResource)
  674. var once sync.Once
  675. wg.Add(2)
  676. go o.genComputeResources(&wg, ch, &once, jobType, creationRequirelUrl)
  677. go o.genRunningTaskNum(&wg, ch, reposUrl, taskListUrl)
  678. go func() {
  679. wg.Wait()
  680. close(ch)
  681. }()
  682. for v := range ch {
  683. resources = append(resources, v)
  684. }
  685. res.Resources = resources
  686. return res, nil
  687. }
  688. func (o *OpenI) genComputeResources(wg *sync.WaitGroup, ch chan *collector.ClusterResource, once *sync.Once, jobType string, creationRequirelUrl string) {
  689. defer wg.Done()
  690. for c := range ComputeSource {
  691. wg.Add(1)
  692. i := c
  693. go func() {
  694. defer wg.Done()
  695. param := model.TaskCreationRequiredParam{
  696. UserName: o.userName,
  697. RepoName: TESTREPO,
  698. JobType: jobType,
  699. ComputeSource: ComputeSource[i],
  700. ClusterType: C2NET,
  701. }
  702. b, _ := json.Marshal(param)
  703. byt := bytes.NewBuffer(b)
  704. resp := struct {
  705. Code int `json:"code"`
  706. Msg string `json:"msg"`
  707. Data model.TaskCreationRequired `json:"data"`
  708. }{}
  709. req := common.GetRestyRequest(common.TIMEOUT)
  710. r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
  711. req.RawRequest = r
  712. req.URL = creationRequirelUrl
  713. _, err := req.
  714. SetHeader("Content-Type", "application/json").
  715. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  716. SetBody(byt).
  717. SetResult(&resp).
  718. Send()
  719. if err != nil {
  720. return
  721. }
  722. if len(resp.Data.Data.Specs.All) == 0 {
  723. return
  724. }
  725. // balance
  726. var balanceCheck = func() {
  727. balance := resp.Data.Data.PointAccount.Balance
  728. bal := &collector.Usage{}
  729. bal.Type = strings.ToUpper(BALANCE)
  730. bal.Total = &collector.UnitValue{
  731. Unit: POINT,
  732. Value: balance,
  733. }
  734. ch <- &collector.ClusterResource{Resource: bal}
  735. //rate
  736. var v float64
  737. v = 1
  738. rate := &collector.Usage{
  739. Type: strings.ToUpper(RATE),
  740. Total: &collector.UnitValue{Unit: PERHOUR, Value: v},
  741. }
  742. ch <- &collector.ClusterResource{Resource: rate}
  743. }
  744. once.Do(balanceCheck)
  745. m := make(map[string]struct {
  746. Id int `json:"id"`
  747. AccCardsNum int `json:"acc_cards_num"`
  748. AccCardType string `json:"acc_card_type"`
  749. CpuCores int `json:"cpu_cores"`
  750. MemGiB int `json:"mem_gi_b"`
  751. GpuMemGiB int `json:"gpu_mem_gi_b"`
  752. ShareMemGiB int `json:"share_mem_gi_b"`
  753. ComputeResource string `json:"compute_resource"`
  754. UnitPrice int `json:"unit_price"`
  755. SourceSpecId string `json:"source_spec_id"`
  756. HasInternet int `json:"has_internet"`
  757. EnableVisualization bool `json:"enable_visualization"`
  758. })
  759. for _, s := range resp.Data.Data.Specs.All {
  760. e, ok := m[s.AccCardType]
  761. if ok {
  762. if s.AccCardsNum > e.AccCardsNum {
  763. m[s.AccCardType] = s
  764. }
  765. } else {
  766. m[s.AccCardType] = s
  767. }
  768. }
  769. for k, v := range m {
  770. bres := make([]*collector.Usage, 0)
  771. cres := &collector.ClusterResource{}
  772. card := &collector.Usage{
  773. Type: ComputeSource[i],
  774. Name: strings.ToUpper(k),
  775. Total: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
  776. Available: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
  777. }
  778. cpu := &collector.Usage{
  779. Type: strings.ToUpper(CPU),
  780. Name: strings.ToUpper(CPU),
  781. Total: &collector.UnitValue{Unit: CPUCORE, Value: v.CpuCores},
  782. Available: &collector.UnitValue{Unit: CPUCORE, Value: v.CpuCores},
  783. }
  784. mem := &collector.Usage{
  785. Type: strings.ToUpper(MEMORY),
  786. Name: strings.ToUpper(RAM),
  787. Total: &collector.UnitValue{Unit: GIGABYTE, Value: v.MemGiB},
  788. Available: &collector.UnitValue{Unit: GIGABYTE, Value: v.MemGiB},
  789. }
  790. vmem := &collector.Usage{
  791. Type: strings.ToUpper(MEMORY),
  792. Name: strings.ToUpper(VRAM),
  793. Total: &collector.UnitValue{Unit: GIGABYTE, Value: v.GpuMemGiB},
  794. Available: &collector.UnitValue{Unit: GIGABYTE, Value: v.GpuMemGiB},
  795. }
  796. //storage
  797. var s float64
  798. s = 1024
  799. storage := &collector.Usage{}
  800. storage.Type = STORAGE
  801. storage.Name = DISK
  802. storage.Total = &collector.UnitValue{
  803. Unit: GIGABYTE,
  804. Value: s,
  805. }
  806. storage.Available = &collector.UnitValue{
  807. Unit: GIGABYTE,
  808. Value: s,
  809. }
  810. bres = append(bres, storage)
  811. bres = append(bres, cpu)
  812. bres = append(bres, mem)
  813. bres = append(bres, vmem)
  814. cres.Resource = card
  815. cres.BaseResources = bres
  816. ch <- cres
  817. }
  818. }()
  819. }
  820. }
  821. func (o *OpenI) genRunningTaskNum(wg *sync.WaitGroup, ch chan *collector.ClusterResource, reposUrl string, taskListUrl string) {
  822. defer wg.Done()
  823. reporesp := struct {
  824. Code int `json:"code"`
  825. Msg string `json:"msg"`
  826. Data []model.Repo `json:"data"`
  827. }{}
  828. reporeq := common.GetRestyRequest(common.TIMEOUT)
  829. repor, _ := http.NewRequest("GET", reposUrl, nil)
  830. reporeq.RawRequest = repor
  831. reporeq.URL = reposUrl
  832. _, err := reporeq.
  833. SetHeader("Content-Type", "application/json").
  834. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  835. SetResult(&reporesp).
  836. Send()
  837. if err != nil {
  838. return
  839. }
  840. if len(reporesp.Data) == 0 {
  841. return
  842. }
  843. // tasklist
  844. var runningJobs atomic.Int64
  845. var jwg sync.WaitGroup
  846. var errs []error
  847. var ech = make(chan error)
  848. jwg.Add(1)
  849. go func() {
  850. defer jwg.Done()
  851. for _, datum := range reporesp.Data {
  852. jwg.Add(1)
  853. dat := datum
  854. go func() {
  855. defer jwg.Done()
  856. param := model.TaskListParam{
  857. UserName: o.userName,
  858. RepoName: dat.Name,
  859. }
  860. b, _ := json.Marshal(param)
  861. byt := bytes.NewBuffer(b)
  862. resp := struct {
  863. Code int `json:"code"`
  864. Msg string `json:"msg"`
  865. Data model.TaskList `json:"data"`
  866. }{}
  867. req := common.GetRestyRequest(common.TIMEOUT)
  868. r, _ := http.NewRequest("GET", taskListUrl, byt)
  869. req.RawRequest = r
  870. req.URL = taskListUrl
  871. _, err := req.
  872. SetHeader("Content-Type", "application/json").
  873. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  874. SetBody(byt).
  875. SetResult(&resp).
  876. Send()
  877. if err != nil {
  878. // assume occupied running tasks
  879. ech <- err
  880. return
  881. }
  882. if len(resp.Data.Data.Tasks) == 0 {
  883. return
  884. }
  885. for _, task := range resp.Data.Data.Tasks {
  886. if task.Task.Status == RUNNING {
  887. runningJobs.Add(1)
  888. }
  889. }
  890. }()
  891. }
  892. }()
  893. go func() {
  894. jwg.Wait()
  895. close(ech)
  896. }()
  897. for v := range ech {
  898. errs = append(errs, v)
  899. }
  900. // running tasks num
  901. var runningNum int64
  902. runningNum = runningJobs.Load()
  903. run := &collector.Usage{}
  904. run.Type = strings.ToUpper(RUNNINGTASK)
  905. if len(errs) == 0 {
  906. run.Total = &collector.UnitValue{
  907. Unit: NUMBER,
  908. Value: runningNum,
  909. }
  910. ch <- &collector.ClusterResource{Resource: run}
  911. } else {
  912. runningNum = int64(len(errs)) * 4
  913. run.Total = &collector.UnitValue{
  914. Unit: NUMBER,
  915. Value: runningNum,
  916. }
  917. ch <- &collector.ClusterResource{Resource: run}
  918. }
  919. }
  920. func (o *OpenI) getOnlineInferUrl(ctx context.Context, taskId string, repoName string) (string, error) {
  921. taskDetailsUrl := o.host + TaskOnlineInferUrl
  922. param := model.TaskDetailParam{
  923. UserName: o.userName,
  924. RepoName: repoName,
  925. Id: taskId,
  926. }
  927. b, _ := json.Marshal(param)
  928. byt := bytes.NewBuffer(b)
  929. resp := model.SelfEndpointUrlResp{}
  930. req := common.GetRestyRequest(common.TIMEOUT)
  931. r, _ := http.NewRequest("GET", taskDetailsUrl, byt)
  932. req.RawRequest = r
  933. req.URL = taskDetailsUrl
  934. _, err := req.
  935. SetHeader("Content-Type", "application/json").
  936. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  937. SetBody(byt).
  938. SetResult(&resp).
  939. Send()
  940. if err != nil {
  941. return "", err
  942. }
  943. if resp.Code != http.StatusOK {
  944. return "", errors.New(resp.Msg)
  945. }
  946. return resp.Data.Url, nil
  947. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive and autonomous peer-to-peer manner.