You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

openi.go 29 kB

11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
10 months ago
11 months ago
10 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
2 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
11 months ago
10 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
4 months ago
10 months ago
10 months ago
10 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago

  1. package storeLink
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "github.com/json-iterator/go"
  9. "github.com/rs/zerolog/log"
  10. openIcom "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
  13. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
  14. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
  15. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  16. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  17. "gitlink.org.cn/JointCloud/pcm-openi/common"
  18. "gitlink.org.cn/JointCloud/pcm-openi/model"
  19. "mime/multipart"
  20. "net/http"
  21. "net/url"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "sync/atomic"
  26. "time"
  27. )
  // Job type names. Within this file only TRAIN and ONLINEINFERENCE are sent
  // as a JobType value; DEBUG and INFERENCE are presumably further job types
  // accepted by the OpenI API (not referenced in the code visible here).
  const (
  	DEBUG           = "DEBUG"
  	TRAIN           = "TRAIN"
  	INFERENCE       = "INFERENCE"
  	ONLINEINFERENCE = "ONLINEINFERENCE" // online inference
  )

  // Fixed identifiers sent along with OpenI requests.
  const (
  	// C2NET is the cluster type / cluster name used for every task request
  	// built in this file.
  	C2NET = "C2Net"
  	// TESTREPO is the repository name used for requests that are not tied to
  	// a specific repository (task detail lookup, resource spec discovery).
  	TESTREPO = "testrepo"
  )
  // OpenI REST endpoint paths. Each one is appended to the client's host to
  // form the full request URL.
  const (
  	// Task lifecycle endpoints.
  	CreationRequirelUrl = "/api/v1/task/creationRequired"
  	TaskCreatelUrl      = "/api/v1/task/create"
  	TaskListUrl         = "/api/v1/task/list"
  	TaskDetailsUrl      = "/api/v1/task/detail"
  	TaskLogUrl          = "/api/v1/task/log"
  	TaskStopUrl         = "/api/v1/task/stop"
  	TaskOnlineInferUrl  = "/api/v1/task/onlineInferUrl"

  	// User endpoints.
  	ReposUrl = "/api/v1/user/repos"
  )
  // ComputeSource lists the compute resource type names this adapter
  // recognizes. A required resource is only considered when its "type" equals
  // one of these entries, and resource spec discovery queries each entry once.
  var ComputeSource = []string{
  	"GPU",
  	"NPU",
  	"GCU",
  	"MLU",
  	"DCU",
  	"CPU",
  	"ILUVATAR-GPGPU",
  	"METAX-GPGPU",
  }
  // ResourceSpecOpenI is the resource requirement extracted from a task's
  // required resources before it is matched against the specs OpenI offers.
  type ResourceSpecOpenI struct {
  	// ResType is the matched entry of ComputeSource; Name is the card name
  	// compared against a spec's AccCardType.
  	ResType, Name string
  	// Number is the requested card count, compared against a spec's AccCardsNum.
  	Number int64
  }
  // OpenI is a client for one OpenI platform endpoint. All fields are set by
  // NewOpenI and only read afterwards in this file.
  type OpenI struct {
  	// participantId is reported as the ClusterId of collected resource specs.
  	participantId int64
  	// platform is reported as the cluster name of deploy instances; host is
  	// the base URL every endpoint path is appended to; userName and
  	// accessToken identify the caller in each request.
  	platform, host, userName, accessToken string
  }
  62. func NewOpenI(host string, id int64, name string, token string, platform string) *OpenI {
  63. return &OpenI{
  64. host: host,
  65. participantId: id,
  66. userName: name,
  67. accessToken: token,
  68. platform: platform,
  69. }
  70. }
  71. func (o *OpenI) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) {
  72. switch mode {
  73. case executor.SUBMIT_MODE_JOINT_CLOUD:
  74. case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
  75. var repoName string
  76. codePaths := strings.SplitN(option.AlgorithmId, FORWARD_SLASH, 3)
  77. if len(codePaths) != 3 {
  78. return nil, fmt.Errorf("algorithmId %s format is incorrect", option.AlgorithmId)
  79. }
  80. repoName = codePaths[0]
  81. spec := &ResourceSpecOpenI{}
  82. for _, res := range option.ResourcesRequired {
  83. typeName, ok := res["type"]
  84. if !ok {
  85. continue
  86. }
  87. name, ok := res["name"]
  88. if !ok {
  89. continue
  90. }
  91. for _, s := range ComputeSource {
  92. switch typeName {
  93. case s:
  94. num, ok := res["number"]
  95. if !ok {
  96. continue
  97. }
  98. n := openIcom.ConvertTypeToString(num)
  99. val, err := strconv.ParseInt(n, 10, 64)
  100. if err != nil {
  101. return nil, err
  102. }
  103. spec.ResType = s
  104. spec.Name = openIcom.ConvertTypeToString(name)
  105. spec.Number = val
  106. break
  107. }
  108. }
  109. }
  110. if spec.ResType == "" || spec.Name == "" {
  111. return nil, errors.New("resource spec not found")
  112. }
  113. creationRequirelUrl := o.host + CreationRequirelUrl
  114. param := model.TaskCreationRequiredParam{
  115. UserName: o.userName,
  116. RepoName: repoName,
  117. JobType: TRAIN,
  118. ComputeSource: spec.ResType,
  119. ClusterType: C2NET,
  120. }
  121. b, _ := json.Marshal(param)
  122. byt := bytes.NewBuffer(b)
  123. resp := struct {
  124. Code int `json:"code"`
  125. Msg string `json:"msg"`
  126. Data model.TaskCreationRequired `json:"data"`
  127. }{}
  128. req := common.GetRestyRequest(common.TIMEOUT)
  129. r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
  130. req.RawRequest = r
  131. req.URL = creationRequirelUrl
  132. _, err := req.
  133. SetHeader("Content-Type", "application/json").
  134. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  135. SetBody(byt).
  136. SetResult(&resp).
  137. Send()
  138. if err != nil {
  139. return nil, errors.New("failed to invoke TaskCreationRequired; " + err.Error())
  140. }
  141. if len(resp.Data.Data.Specs.All) == 0 {
  142. return nil, errors.New("TaskCreationRequired specs are empty")
  143. }
  144. for _, s := range resp.Data.Data.Specs.All {
  145. if spec.ResType == s.ComputeResource && spec.Name == s.AccCardType {
  146. if int(spec.Number) == s.AccCardsNum {
  147. option.ResourceId = strconv.Itoa(s.Id) + FORWARD_SLASH + spec.ResType
  148. break
  149. }
  150. }
  151. }
  152. if option.ResourceId == "" {
  153. return nil, errors.New("can not find spec Id")
  154. }
  155. option.ComputeCard = spec.Name
  156. }
  157. task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
  158. if err != nil {
  159. return nil, err
  160. }
  161. return task, nil
  162. }
  163. func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
  164. taskCreatelUrl := o.host + TaskCreatelUrl
  165. var repoName string
  166. var branchName string
  167. var bootFile string
  168. codePaths := strings.SplitN(algorithmId, FORWARD_SLASH, 3)
  169. if len(codePaths) != 3 {
  170. return nil, fmt.Errorf("algorithmId %s format is incorrect", algorithmId)
  171. }
  172. specs := strings.Split(resourceId, FORWARD_SLASH)
  173. specId, err := strconv.ParseInt(specs[0], 10, 0)
  174. if err != nil {
  175. return nil, err
  176. }
  177. computeSource := specs[1]
  178. repoName = codePaths[0]
  179. branchName = codePaths[1]
  180. bootFile = codePaths[2]
  181. // params
  182. var parameters struct {
  183. Parameter []struct {
  184. Label string `json:"label"`
  185. Value string `json:"value"`
  186. } `json:"parameter"`
  187. }
  188. for _, param := range params {
  189. s := strings.Split(param, COMMA)
  190. st := struct {
  191. Label string `json:"label"`
  192. Value string `json:"value"`
  193. }{
  194. Label: s[0],
  195. Value: s[1],
  196. }
  197. parameters.Parameter = append(parameters.Parameter, st)
  198. }
  199. paramStr, _ := json.Marshal(parameters)
  200. // choose imageId and imageUrl
  201. imgId, imgUrl, err := swapImageIdAndImageUrl(imageId)
  202. if err != nil {
  203. return nil, err
  204. }
  205. taskParam := &model.CreateTaskParam{
  206. Description: algorithmId, // temporarily set reponame contained in the algorithmId to desc for missing taskdetail's reponame
  207. JobType: TRAIN,
  208. Cluster: C2NET,
  209. DisplayJobName: strings.ToLower(TRAIN + UNDERSCORE + utils.RandomString(10)),
  210. ComputeSource: computeSource,
  211. SpecId: int(specId),
  212. BranchName: branchName,
  213. ImageId: imgId,
  214. ImageUrl: imgUrl,
  215. DatasetUuidStr: datasetsId,
  216. Params: string(paramStr),
  217. BootFile: bootFile,
  218. HasInternet: 2, // 0 不限制;1 不需要互联网;2 需要互联网
  219. WorkServerNumber: 1, // 运行节点数
  220. }
  221. param := model.CreateTaskReq{
  222. UserName: o.userName,
  223. RepoName: repoName,
  224. CreateTaskParam: taskParam,
  225. }
  226. resp := struct {
  227. Code int `json:"code"`
  228. Msg string `json:"msg"`
  229. Data model.CreateTask `json:"data"`
  230. }{}
  231. req := common.GetRestyRequest(common.TIMEOUT)
  232. _, err = req.
  233. SetHeader("Content-Type", "application/json").
  234. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  235. SetBody(&param).
  236. SetResult(&resp).
  237. Post(taskCreatelUrl)
  238. if err != nil {
  239. return nil, err
  240. }
  241. if resp.Code != 200 {
  242. return nil, errors.New(resp.Msg)
  243. }
  244. if resp.Data.Code != 0 {
  245. return nil, errors.New(resp.Msg)
  246. }
  247. if (resp.Data == model.CreateTask{}) {
  248. return nil, errors.New("failed to submit task, empty response")
  249. }
  250. return resp.Data, nil
  251. }
  252. func swapImageIdAndImageUrl(imageId string) (string, string, error) {
  253. if imageId == "" {
  254. return "", "", errors.New("imageId is empty")
  255. }
  256. var imgId string
  257. var imgUrl string
  258. parsedURL, err := url.Parse("http://" + imageId)
  259. if err != nil {
  260. return "", "", err
  261. }
  262. if utils.IsValidHostAddress(parsedURL.Host) {
  263. imgId = ""
  264. imgUrl = imageId
  265. } else {
  266. imgId = imageId
  267. imgUrl = ""
  268. }
  269. return imgId, imgUrl, nil
  270. }
  271. func (o *OpenI) Stop(ctx context.Context, id string) error {
  272. task, err := o.getTrainingTask(ctx, id)
  273. if err != nil {
  274. return err
  275. }
  276. codePaths := strings.SplitN(task.Data.Task.Description, FORWARD_SLASH, 3)
  277. if len(codePaths) != 3 {
  278. return errors.New("failed to stop, openI desc not set")
  279. }
  280. repoName := codePaths[0]
  281. taskStopUrl := o.host + TaskStopUrl
  282. param := model.StopTaskParam{
  283. UserName: o.userName,
  284. RepoName: repoName,
  285. Id: id,
  286. }
  287. resp := struct {
  288. Code int `json:"code"`
  289. Msg string `json:"msg"`
  290. Data model.StopTask `json:"data"`
  291. }{}
  292. req := common.GetRestyRequest(common.TIMEOUT)
  293. _, err = req.
  294. SetHeader("Content-Type", "application/json").
  295. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  296. SetBody(&param).
  297. SetResult(&resp).
  298. Post(taskStopUrl)
  299. if err != nil {
  300. return err
  301. }
  302. if resp.Code != http.StatusOK {
  303. return errors.New("failed to stop")
  304. }
  305. return nil
  306. }
  307. func (o *OpenI) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
  308. return nil, errors.New("failed to implement")
  309. }
  310. func (o *OpenI) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
  311. return nil, errors.New("failed to implement")
  312. }
  313. func (o *OpenI) StartInferDeployInstance(ctx context.Context, id string) bool {
  314. return false
  315. }
  316. func (o *OpenI) StopInferDeployInstance(ctx context.Context, id string) bool {
  317. err := o.Stop(ctx, id)
  318. if err != nil {
  319. return false
  320. }
  321. return true
  322. }
  323. func (o *OpenI) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) {
  324. task, err := o.getTrainingTask(ctx, id)
  325. if err != nil {
  326. return nil, err
  327. }
  328. description := task.Data.Task.Description
  329. //从描述中解析出repoName
  330. codePaths := strings.SplitN(description, FORWARD_SLASH, 3)
  331. if len(codePaths) != 3 {
  332. return nil, fmt.Errorf("algorithmId %s format is incorrect", description)
  333. }
  334. repoName := codePaths[0]
  335. var resp inference.DeployInstance
  336. resp.InstanceId = id
  337. resp.InstanceName = task.Data.Task.DisplayJobName
  338. resp.ModelName = task.Data.Task.PretrainModelName
  339. resp.ModelType = ""
  340. resp.InferCard = task.Data.Task.Spec.ComputeResource + "_" + task.Data.Task.Spec.AccCardType
  341. resp.ClusterName = o.platform
  342. resp.ClusterType = TYPE_OPENI
  343. //获取在线推理url
  344. var inferUrl string
  345. if task.Data.Task.Status == "RUNNING" {
  346. inferUrl, err = o.getOnlineInferUrl(ctx, id, repoName)
  347. if err != nil {
  348. return nil, err
  349. }
  350. }
  351. resp.InferUrl = inferUrl
  352. resp.Status = task.Data.Task.Status
  353. resp.CreatedTime = time.Unix(int64(task.Data.Task.CreatedUnix), 0).Format(constants.Layout)
  354. log.Debug().Msgf("func GetInferDeployInstance, resp: %v", resp)
  355. return &resp, nil
  356. }
  357. func (o *OpenI) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) {
  358. var repoName string
  359. codePaths := strings.SplitN(option.AlgorithmId, FORWARD_SLASH, 3)
  360. if len(codePaths) != 3 {
  361. return "", fmt.Errorf("algorithmId %s format is incorrect", option.AlgorithmId)
  362. }
  363. repoName = codePaths[0]
  364. spec := &ResourceSpecOpenI{}
  365. for _, res := range option.ResourcesRequired {
  366. typeName, ok := res["type"]
  367. if !ok {
  368. continue
  369. }
  370. name, ok := res["name"]
  371. if !ok {
  372. continue
  373. }
  374. for _, s := range ComputeSource {
  375. switch typeName {
  376. case s:
  377. num, ok := res["number"]
  378. if !ok {
  379. continue
  380. }
  381. n := openIcom.ConvertTypeToString(num)
  382. val, err := strconv.ParseInt(n, 10, 64)
  383. if err != nil {
  384. return "", err
  385. }
  386. spec.ResType = s
  387. spec.Name = openIcom.ConvertTypeToString(name)
  388. spec.Number = val
  389. break
  390. }
  391. }
  392. }
  393. if spec.ResType == "" || spec.Name == "" {
  394. return "", errors.New("resource spec not found")
  395. }
  396. creationRequirelUrl := o.host + CreationRequirelUrl
  397. param := model.TaskCreationRequiredParam{
  398. UserName: o.userName,
  399. RepoName: repoName,
  400. JobType: ONLINEINFERENCE,
  401. ComputeSource: spec.ResType,
  402. ClusterType: C2NET,
  403. }
  404. b, _ := json.Marshal(param)
  405. byt := bytes.NewBuffer(b)
  406. resp := struct {
  407. Code int `json:"code"`
  408. Msg string `json:"msg"`
  409. Data model.TaskCreationRequired `json:"data"`
  410. }{}
  411. req := common.GetRestyRequest(common.TIMEOUT)
  412. r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
  413. req.RawRequest = r
  414. req.URL = creationRequirelUrl
  415. _, err := req.
  416. SetHeader("Content-Type", "application/json").
  417. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  418. SetBody(byt).
  419. SetResult(&resp).
  420. Send()
  421. if err != nil {
  422. return "", errors.New("failed to invoke TaskCreationRequired")
  423. }
  424. if len(resp.Data.Data.Specs.All) == 0 {
  425. return "", errors.New("TaskCreationRequired specs are empty")
  426. }
  427. for _, s := range resp.Data.Data.Specs.All {
  428. if spec.ResType == s.ComputeResource && spec.Name == s.AccCardType {
  429. if int(spec.Number) == s.AccCardsNum {
  430. option.ResourceId = strconv.Itoa(s.Id) + FORWARD_SLASH + spec.ResType
  431. break
  432. }
  433. }
  434. }
  435. if option.ResourceId == "" {
  436. return "", errors.New("can not find spec Id")
  437. }
  438. option.ComputeCard = spec.Name
  439. task, err := o.SubmitInferTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AlgorithmId, option.ModelID)
  440. if err != nil {
  441. return "", err
  442. }
  443. ma, err := jsoniter.Marshal(task)
  444. if err != nil {
  445. return "", err
  446. }
  447. taskId := jsoniter.Get(ma, "data").Get("id").ToString()
  448. return taskId, nil
  449. }
  450. func (o *OpenI) SubmitInferTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, algorithmId string, modelId string) (interface{}, error) {
  451. taskCreatelUrl := o.host + TaskCreatelUrl
  452. var repoName string
  453. var branchName string
  454. var bootFile string
  455. //从描述中解析出repoName
  456. codePaths := strings.SplitN(algorithmId, FORWARD_SLASH, 3)
  457. if len(codePaths) != 3 {
  458. return nil, fmt.Errorf("algorithmId %s format is incorrect", algorithmId)
  459. }
  460. specs := strings.Split(resourceId, FORWARD_SLASH)
  461. specId, err := strconv.ParseInt(specs[0], 10, 0)
  462. if err != nil {
  463. return nil, err
  464. }
  465. computeSource := specs[1]
  466. repoName = codePaths[0]
  467. branchName = codePaths[1]
  468. bootFile = strings.Join(codePaths[2:], "/")
  469. log.Printf("repoName: %s, branchName: %s, bootFile: %s", repoName, branchName, bootFile)
  470. //params := "{\"parameter\":[{\"label\":\"a\",\"value\":\"1\"},{\"label\":\"b\",\"value\":\"2\"}]}"
  471. // choose imageId and imageUrl
  472. imgId, imgUrl, err := swapImageIdAndImageUrl(imageId)
  473. if err != nil {
  474. return nil, err
  475. }
  476. taskParam := &model.CreateTaskParam{
  477. Description: algorithmId, // temporarily set reponame contained in the algorithmId to desc for missing taskdetail's reponame
  478. JobType: ONLINEINFERENCE,
  479. Cluster: C2NET,
  480. DisplayJobName: ONLINEINFERENCE + UNDERSCORE + utils.RandomString(10),
  481. ComputeSource: computeSource,
  482. SpecId: int(specId),
  483. BranchName: branchName,
  484. ImageId: imgId,
  485. ImageUrl: imgUrl,
  486. PretrainModelIdStr: modelId,
  487. BootFile: bootFile,
  488. HasInternet: 2, // 0 不限制;1 不需要互联网;2 需要互联网
  489. WorkServerNumber: 1, // 运行节点数
  490. }
  491. param := model.CreateTaskReq{
  492. UserName: o.userName,
  493. RepoName: repoName,
  494. CreateTaskParam: taskParam,
  495. }
  496. resp := struct {
  497. Code int `json:"code"`
  498. Msg string `json:"msg"`
  499. Data model.CreateTask `json:"data"`
  500. }{}
  501. req := common.GetRestyRequest(common.TIMEOUT)
  502. _, err = req.
  503. SetHeader("Content-Type", "application/json").
  504. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  505. SetBody(&param).
  506. SetResult(&resp).
  507. Post(taskCreatelUrl)
  508. if err != nil {
  509. return nil, err
  510. }
  511. if resp.Code != http.StatusOK {
  512. return nil, errors.New(resp.Msg)
  513. }
  514. if resp.Data.Data.Id == 0 {
  515. return nil, fmt.Errorf("failed to submit task, msg: [%s]", resp.Data.Msg)
  516. }
  517. return resp.Data, nil
  518. }
  519. func (o *OpenI) CheckModelExistence(ctx context.Context, modelName string, modelType string) bool {
  520. return false
  521. }
  522. func (o *OpenI) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
  523. return "", errors.New("failed to implement")
  524. }
  525. func (o *OpenI) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
  526. return nil, errors.New("failed to implement")
  527. }
  528. func (o *OpenI) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
  529. return nil, errors.New("failed to implement")
  530. }
  531. func (o *OpenI) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
  532. return nil, errors.New("failed to implement")
  533. }
  534. func (o *OpenI) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
  535. task, err := o.getTrainingTask(ctx, taskId)
  536. if err != nil {
  537. return "", err
  538. }
  539. codePaths := strings.SplitN(task.Data.Task.Description, FORWARD_SLASH, 3)
  540. if len(codePaths) != 3 {
  541. return "", errors.New("failed to get log, openI desc not set")
  542. }
  543. repoName := codePaths[0]
  544. tasklogurl := o.host + TaskLogUrl
  545. param := model.GetLogParam{
  546. UserName: o.userName,
  547. RepoName: repoName,
  548. Id: taskId,
  549. }
  550. b, _ := json.Marshal(param)
  551. byt := bytes.NewBuffer(b)
  552. resp := struct {
  553. Code int `json:"code"`
  554. Msg string `json:"msg"`
  555. Data string `json:"data"`
  556. }{}
  557. req := common.GetRestyRequest(common.TIMEOUT)
  558. r, _ := http.NewRequest("GET", tasklogurl, byt)
  559. req.RawRequest = r
  560. req.URL = tasklogurl
  561. _, err = req.
  562. SetHeader("Content-Type", "application/json").
  563. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  564. SetBody(byt).
  565. SetResult(&resp).
  566. Send()
  567. if err != nil {
  568. return "", err
  569. }
  570. if resp.Data == "" {
  571. return "waiting for logs", nil
  572. }
  573. return resp.Data, nil
  574. }
  575. func (o *OpenI) getTrainingTask(ctx context.Context, taskId string) (*model.TaskDetail, error) {
  576. taskDetailsUrl := o.host + TaskDetailsUrl
  577. param := model.TaskDetailParam{
  578. UserName: o.userName,
  579. RepoName: TESTREPO,
  580. Id: taskId,
  581. }
  582. b, _ := json.Marshal(param)
  583. byt := bytes.NewBuffer(b)
  584. resp := struct {
  585. Code int `json:"code"`
  586. Msg string `json:"msg"`
  587. Data model.TaskDetail `json:"data"`
  588. }{}
  589. req := common.GetRestyRequest(common.TIMEOUT)
  590. r, _ := http.NewRequest("GET", taskDetailsUrl, byt)
  591. req.RawRequest = r
  592. req.URL = taskDetailsUrl
  593. _, err := req.
  594. SetHeader("Content-Type", "application/json").
  595. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  596. SetBody(byt).
  597. SetResult(&resp).
  598. Send()
  599. if err != nil {
  600. return nil, errors.New("failed to invoke taskDetails")
  601. }
  602. if resp.Data.Code != 0 && resp.Data.Msg != "" {
  603. return nil, errors.New(resp.Data.Msg)
  604. }
  605. return &resp.Data, nil
  606. }
  607. func (o *OpenI) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
  608. task, err := o.getTrainingTask(ctx, taskId)
  609. if err != nil {
  610. return nil, err
  611. }
  612. var resp collector.Task
  613. resp.Id = strconv.Itoa(task.Data.Task.Id)
  614. if task.Data.Task.StartTime != 0 {
  615. resp.Start = time.Unix(int64(task.Data.Task.StartTime), 0).Format(constants.Layout)
  616. }
  617. if task.Data.Task.EndTime != 0 {
  618. resp.End = time.Unix(int64(task.Data.Task.EndTime), 0).Format(constants.Layout)
  619. }
  620. switch task.Data.Task.Status {
  621. case "SUCCEEDED":
  622. resp.Status = constants.Completed
  623. case "FAILED":
  624. resp.Status = constants.Failed
  625. case "CREATED_FAILED":
  626. resp.Status = constants.Failed
  627. case "RUNNING":
  628. resp.Status = constants.Running
  629. case "STOPPED":
  630. resp.Status = constants.Stopped
  631. case "PENDING":
  632. resp.Status = constants.Pending
  633. case "WAITING":
  634. resp.Status = constants.Waiting
  635. default:
  636. resp.Status = "undefined"
  637. }
  638. return &resp, nil
  639. }
  640. func (o *OpenI) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
  641. return "", errors.New("failed to implement")
  642. }
  643. func (o *OpenI) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
  644. return errors.New("failed to implement")
  645. }
  646. func (o *OpenI) GetComputeCards(ctx context.Context) ([]string, error) {
  647. return nil, errors.New("failed to implement")
  648. }
  649. func (o *OpenI) GetUserBalance(ctx context.Context) (float64, error) {
  650. return 0, errors.New("failed to implement")
  651. }
  652. func (o *OpenI) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
  653. var jobType string
  654. if resrcType == "Inference" {
  655. jobType = ONLINEINFERENCE
  656. } else if resrcType == "Train" {
  657. jobType = TRAIN
  658. }
  659. var resources []interface{}
  660. res := &collector.ResourceSpec{
  661. ClusterId: strconv.FormatInt(o.participantId, 10),
  662. Tag: resrcType,
  663. }
  664. //clres := &collector.ClusterResource{}
  665. creationRequirelUrl := o.host + CreationRequirelUrl
  666. reposUrl := o.host + ReposUrl
  667. taskListUrl := o.host + TaskListUrl
  668. var wg sync.WaitGroup
  669. var ch = make(chan *collector.ClusterResource)
  670. var once sync.Once
  671. wg.Add(2)
  672. go o.genComputeResources(&wg, ch, &once, jobType, creationRequirelUrl)
  673. go o.genRunningTaskNum(&wg, ch, reposUrl, taskListUrl)
  674. go func() {
  675. wg.Wait()
  676. close(ch)
  677. }()
  678. for v := range ch {
  679. resources = append(resources, v)
  680. }
  681. res.Resources = resources
  682. return res, nil
  683. }
  684. func (o *OpenI) genComputeResources(wg *sync.WaitGroup, ch chan *collector.ClusterResource, once *sync.Once, jobType string, creationRequirelUrl string) {
  685. defer wg.Done()
  686. for c := range ComputeSource {
  687. wg.Add(1)
  688. i := c
  689. go func() {
  690. defer wg.Done()
  691. param := model.TaskCreationRequiredParam{
  692. UserName: o.userName,
  693. RepoName: TESTREPO,
  694. JobType: jobType,
  695. ComputeSource: ComputeSource[i],
  696. ClusterType: C2NET,
  697. }
  698. b, _ := json.Marshal(param)
  699. byt := bytes.NewBuffer(b)
  700. resp := struct {
  701. Code int `json:"code"`
  702. Msg string `json:"msg"`
  703. Data model.TaskCreationRequired `json:"data"`
  704. }{}
  705. req := common.GetRestyRequest(common.TIMEOUT)
  706. r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
  707. req.RawRequest = r
  708. req.URL = creationRequirelUrl
  709. _, err := req.
  710. SetHeader("Content-Type", "application/json").
  711. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  712. SetBody(byt).
  713. SetResult(&resp).
  714. Send()
  715. if err != nil {
  716. return
  717. }
  718. if len(resp.Data.Data.Specs.All) == 0 {
  719. return
  720. }
  721. // balance
  722. var balanceCheck = func() {
  723. balance := resp.Data.Data.PointAccount.Balance
  724. bal := &collector.Usage{}
  725. bal.Type = strings.ToUpper(BALANCE)
  726. bal.Total = &collector.UnitValue{
  727. Unit: POINT,
  728. Value: balance,
  729. }
  730. ch <- &collector.ClusterResource{Resource: bal}
  731. //rate
  732. var v float64
  733. v = 1
  734. rate := &collector.Usage{
  735. Type: strings.ToUpper(RATE),
  736. Total: &collector.UnitValue{Unit: PERHOUR, Value: v},
  737. }
  738. ch <- &collector.ClusterResource{Resource: rate}
  739. }
  740. once.Do(balanceCheck)
  741. m := make(map[string]struct {
  742. Id int `json:"id"`
  743. AccCardsNum int `json:"acc_cards_num"`
  744. AccCardType string `json:"acc_card_type"`
  745. CpuCores int `json:"cpu_cores"`
  746. MemGiB int `json:"mem_gi_b"`
  747. GpuMemGiB int `json:"gpu_mem_gi_b"`
  748. ShareMemGiB int `json:"share_mem_gi_b"`
  749. ComputeResource string `json:"compute_resource"`
  750. UnitPrice int `json:"unit_price"`
  751. SourceSpecId string `json:"source_spec_id"`
  752. HasInternet int `json:"has_internet"`
  753. EnableVisualization bool `json:"enable_visualization"`
  754. })
  755. for _, s := range resp.Data.Data.Specs.All {
  756. e, ok := m[s.AccCardType]
  757. if ok {
  758. if s.AccCardsNum > e.AccCardsNum {
  759. m[s.AccCardType] = s
  760. }
  761. } else {
  762. m[s.AccCardType] = s
  763. }
  764. }
  765. for k, v := range m {
  766. bres := make([]*collector.Usage, 0)
  767. cres := &collector.ClusterResource{}
  768. card := &collector.Usage{
  769. Type: ComputeSource[i],
  770. Name: strings.ToUpper(k),
  771. Total: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
  772. Available: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
  773. }
  774. cpu := &collector.Usage{
  775. Type: strings.ToUpper(CPU),
  776. Name: strings.ToUpper(CPU),
  777. Total: &collector.UnitValue{Unit: CPUCORE, Value: v.CpuCores},
  778. Available: &collector.UnitValue{Unit: CPUCORE, Value: v.CpuCores},
  779. }
  780. mem := &collector.Usage{
  781. Type: strings.ToUpper(MEMORY),
  782. Name: strings.ToUpper(RAM),
  783. Total: &collector.UnitValue{Unit: GIGABYTE, Value: v.MemGiB},
  784. Available: &collector.UnitValue{Unit: GIGABYTE, Value: v.MemGiB},
  785. }
  786. vmem := &collector.Usage{
  787. Type: strings.ToUpper(MEMORY),
  788. Name: strings.ToUpper(VRAM),
  789. Total: &collector.UnitValue{Unit: GIGABYTE, Value: v.GpuMemGiB},
  790. Available: &collector.UnitValue{Unit: GIGABYTE, Value: v.GpuMemGiB},
  791. }
  792. //storage
  793. var s float64
  794. s = 1024
  795. storage := &collector.Usage{}
  796. storage.Type = STORAGE
  797. storage.Name = DISK
  798. storage.Total = &collector.UnitValue{
  799. Unit: GIGABYTE,
  800. Value: s,
  801. }
  802. storage.Available = &collector.UnitValue{
  803. Unit: GIGABYTE,
  804. Value: s,
  805. }
  806. bres = append(bres, storage)
  807. bres = append(bres, cpu)
  808. bres = append(bres, mem)
  809. bres = append(bres, vmem)
  810. cres.Resource = card
  811. cres.BaseResources = bres
  812. ch <- cres
  813. }
  814. }()
  815. }
  816. }
  817. func (o *OpenI) genRunningTaskNum(wg *sync.WaitGroup, ch chan *collector.ClusterResource, reposUrl string, taskListUrl string) {
  818. defer wg.Done()
  819. reporesp := struct {
  820. Code int `json:"code"`
  821. Msg string `json:"msg"`
  822. Data []model.Repo `json:"data"`
  823. }{}
  824. reporeq := common.GetRestyRequest(common.TIMEOUT)
  825. repor, _ := http.NewRequest("GET", reposUrl, nil)
  826. reporeq.RawRequest = repor
  827. reporeq.URL = reposUrl
  828. _, err := reporeq.
  829. SetHeader("Content-Type", "application/json").
  830. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  831. SetResult(&reporesp).
  832. Send()
  833. if err != nil {
  834. return
  835. }
  836. if len(reporesp.Data) == 0 {
  837. return
  838. }
  839. // tasklist
  840. var runningJobs atomic.Int64
  841. var jwg sync.WaitGroup
  842. var errs []error
  843. var ech = make(chan error)
  844. jwg.Add(1)
  845. go func() {
  846. defer jwg.Done()
  847. for _, datum := range reporesp.Data {
  848. jwg.Add(1)
  849. dat := datum
  850. go func() {
  851. defer jwg.Done()
  852. param := model.TaskListParam{
  853. UserName: o.userName,
  854. RepoName: dat.Name,
  855. }
  856. b, _ := json.Marshal(param)
  857. byt := bytes.NewBuffer(b)
  858. resp := struct {
  859. Code int `json:"code"`
  860. Msg string `json:"msg"`
  861. Data model.TaskList `json:"data"`
  862. }{}
  863. req := common.GetRestyRequest(common.TIMEOUT)
  864. r, _ := http.NewRequest("GET", taskListUrl, byt)
  865. req.RawRequest = r
  866. req.URL = taskListUrl
  867. _, err := req.
  868. SetHeader("Content-Type", "application/json").
  869. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  870. SetBody(byt).
  871. SetResult(&resp).
  872. Send()
  873. if err != nil {
  874. // assume occupied running tasks
  875. ech <- err
  876. return
  877. }
  878. if len(resp.Data.Data.Tasks) == 0 {
  879. return
  880. }
  881. for _, task := range resp.Data.Data.Tasks {
  882. if task.Task.Status == RUNNING {
  883. runningJobs.Add(1)
  884. }
  885. }
  886. }()
  887. }
  888. }()
  889. go func() {
  890. jwg.Wait()
  891. close(ech)
  892. }()
  893. for v := range ech {
  894. errs = append(errs, v)
  895. }
  896. // running tasks num
  897. var runningNum int64
  898. runningNum = runningJobs.Load()
  899. run := &collector.Usage{}
  900. run.Type = strings.ToUpper(RUNNINGTASK)
  901. if len(errs) == 0 {
  902. run.Total = &collector.UnitValue{
  903. Unit: NUMBER,
  904. Value: runningNum,
  905. }
  906. ch <- &collector.ClusterResource{Resource: run}
  907. } else {
  908. runningNum = int64(len(errs)) * 4
  909. run.Total = &collector.UnitValue{
  910. Unit: NUMBER,
  911. Value: runningNum,
  912. }
  913. ch <- &collector.ClusterResource{Resource: run}
  914. }
  915. }
  916. func (o *OpenI) getOnlineInferUrl(ctx context.Context, taskId string, repoName string) (string, error) {
  917. taskDetailsUrl := o.host + TaskOnlineInferUrl
  918. param := model.TaskDetailParam{
  919. UserName: o.userName,
  920. RepoName: repoName,
  921. Id: taskId,
  922. }
  923. b, _ := json.Marshal(param)
  924. byt := bytes.NewBuffer(b)
  925. resp := model.SelfEndpointUrlResp{}
  926. req := common.GetRestyRequest(common.TIMEOUT)
  927. r, _ := http.NewRequest("GET", taskDetailsUrl, byt)
  928. req.RawRequest = r
  929. req.URL = taskDetailsUrl
  930. _, err := req.
  931. SetHeader("Content-Type", "application/json").
  932. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  933. SetBody(byt).
  934. SetResult(&resp).
  935. Send()
  936. if err != nil {
  937. return "", err
  938. }
  939. if resp.Code != http.StatusOK {
  940. return "", errors.New(resp.Msg)
  941. }
  942. return resp.Data.Url, nil
  943. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive and autonomous peer-to-peer manner.