You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

openi.go 28 kB

11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
10 months ago
11 months ago
10 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101
  1. package storeLink
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "github.com/json-iterator/go"
  9. "github.com/rs/zerolog/log"
  10. openIcom "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
  13. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
  14. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
  15. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  16. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  17. "gitlink.org.cn/JointCloud/pcm-openi/common"
  18. "gitlink.org.cn/JointCloud/pcm-openi/model"
  19. "mime/multipart"
  20. "net/http"
  21. "net/url"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "sync/atomic"
  26. "time"
  27. )
  // Job-type, cluster and repository identifiers passed to the OpenI task API.
const (
	// Job types. TRAIN and ONLINEINFERENCE are sent as JobType by this file;
	// DEBUG and INFERENCE are presumably further OpenI job types (unused here).
	DEBUG           = "DEBUG"
	TRAIN           = "TRAIN"
	INFERENCE       = "INFERENCE"
	ONLINEINFERENCE = "ONLINEINFERENCE" // online inference

	// C2NET is the cluster type sent with this file's OpenI requests.
	C2NET = "C2Net"

	// TESTREPO is a placeholder repository name, used where the real repo is
	// not known (task-detail lookups and resource-spec queries).
	TESTREPO = "testrepo"
)
  // OpenI REST endpoint paths; each is appended to OpenI.host to form the
// request URL. The misspelled names (…Requirel…, …Createl…) are part of the
// package API and are kept for compatibility.
const (
	CreationRequirelUrl = "/api/v1/task/creationRequired" // available specs (and point balance) for a job type
	TaskCreatelUrl      = "/api/v1/task/create"           // submit a task
	ReposUrl            = "/api/v1/user/repos"            // repositories of the user
	TaskListUrl         = "/api/v1/task/list"
	TaskDetailsUrl      = "/api/v1/task/detail" // details of one task
	TaskLogUrl          = "/api/v1/task/log"    // log of one task
	TaskStopUrl         = "/api/v1/task/stop"   // stop one task
	TaskOnlineInferUrl  = "/api/v1/task/onlineInferUrl"
)
  // ComputeSource lists the compute-source types this adapter recognizes in a
// resource requirement's "type" field and queries OpenI specs for.
var ComputeSource = []string{
	"GPU",
	"NPU",
	"GCU",
	"MLU",
	"DCU",
	"CPU",
	"ILUVATAR-GPGPU",
	"METAX-GPGPU",
}
  // ResourceSpecOpenI is the compute resource requested for an OpenI task, as
// extracted from one entry of an option's ResourcesRequired.
type ResourceSpecOpenI struct {
	ResType, Name string // compute source (one of ComputeSource) and accelerator card type
	Number        int64  // number of accelerator cards
}
  // OpenI is the storeLink adapter for the OpenI platform.
//
// host prefixes every endpoint path, userName is sent as the OpenI user,
// accessToken is sent as a query parameter, and platform is reported as the
// cluster name in deploy-instance results.
type OpenI struct {
	participantId                         int64
	platform, host, userName, accessToken string
}
  62. func NewOpenI(host string, id int64, name string, token string, platform string) *OpenI {
  63. return &OpenI{
  64. host: host,
  65. participantId: id,
  66. userName: name,
  67. accessToken: token,
  68. platform: platform,
  69. }
  70. }
  71. func (o OpenI) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) {
  72. switch mode {
  73. case executor.SUBMIT_MODE_JOINT_CLOUD:
  74. case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
  75. var repoName string
  76. codePaths := strings.Split(option.AlgorithmId, FORWARD_SLASH)
  77. if len(codePaths) != 3 {
  78. return nil, errors.New("algorithmId format is incorrect")
  79. }
  80. repoName = codePaths[0]
  81. spec := &ResourceSpecOpenI{}
  82. for _, res := range option.ResourcesRequired {
  83. typeName, ok := res["type"]
  84. if !ok {
  85. continue
  86. }
  87. name, ok := res["name"]
  88. if !ok {
  89. continue
  90. }
  91. for _, s := range ComputeSource {
  92. switch typeName {
  93. case s:
  94. num, ok := res["number"]
  95. if !ok {
  96. continue
  97. }
  98. n := openIcom.ConvertTypeToString(num)
  99. val, err := strconv.ParseInt(n, 10, 64)
  100. if err != nil {
  101. return nil, err
  102. }
  103. spec.ResType = s
  104. spec.Name = openIcom.ConvertTypeToString(name)
  105. spec.Number = val
  106. break
  107. }
  108. }
  109. }
  110. if spec.ResType == "" || spec.Name == "" {
  111. return nil, errors.New("resource spec not found")
  112. }
  113. creationRequirelUrl := o.host + CreationRequirelUrl
  114. param := model.TaskCreationRequiredParam{
  115. UserName: o.userName,
  116. RepoName: repoName,
  117. JobType: TRAIN,
  118. ComputeSource: spec.ResType,
  119. ClusterType: C2NET,
  120. }
  121. b, _ := json.Marshal(param)
  122. byt := bytes.NewBuffer(b)
  123. resp := struct {
  124. Code int `json:"code"`
  125. Msg string `json:"msg"`
  126. Data model.TaskCreationRequired `json:"data"`
  127. }{}
  128. req := common.GetRestyRequest(common.TIMEOUT)
  129. r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
  130. req.RawRequest = r
  131. req.URL = creationRequirelUrl
  132. _, err := req.
  133. SetHeader("Content-Type", "application/json").
  134. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  135. SetBody(byt).
  136. SetResult(&resp).
  137. Send()
  138. if err != nil {
  139. return nil, errors.New("failed to invoke TaskCreationRequired")
  140. }
  141. if len(resp.Data.Data.Specs.All) == 0 {
  142. return nil, errors.New("TaskCreationRequired specs are empty")
  143. }
  144. for _, s := range resp.Data.Data.Specs.All {
  145. if spec.ResType == s.ComputeResource && spec.Name == s.AccCardType {
  146. if int(spec.Number) == s.AccCardsNum {
  147. option.ResourceId = strconv.Itoa(s.Id) + FORWARD_SLASH + spec.ResType
  148. break
  149. }
  150. }
  151. }
  152. if option.ResourceId == "" {
  153. return nil, errors.New("can not find spec Id")
  154. }
  155. option.ComputeCard = spec.Name
  156. }
  157. task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
  158. if err != nil {
  159. return nil, err
  160. }
  161. return task, nil
  162. }
  163. func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
  164. taskCreatelUrl := o.host + TaskCreatelUrl
  165. var repoName string
  166. var branchName string
  167. var bootFile string
  168. codePaths := strings.Split(algorithmId, FORWARD_SLASH)
  169. if len(codePaths) != 3 {
  170. return nil, fmt.Errorf("algorithmId %s format is incorrect", algorithmId)
  171. }
  172. specs := strings.Split(resourceId, FORWARD_SLASH)
  173. specId, err := strconv.ParseInt(specs[0], 10, 0)
  174. if err != nil {
  175. return nil, err
  176. }
  177. computeSource := specs[1]
  178. repoName = codePaths[0]
  179. branchName = codePaths[1]
  180. bootFile = codePaths[2]
  181. // params
  182. var parameters struct {
  183. Parameter []struct {
  184. Label string `json:"label"`
  185. Value string `json:"value"`
  186. } `json:"parameter"`
  187. }
  188. for _, param := range params {
  189. s := strings.Split(param, COMMA)
  190. st := struct {
  191. Label string `json:"label"`
  192. Value string `json:"value"`
  193. }{
  194. Label: s[0],
  195. Value: s[1],
  196. }
  197. parameters.Parameter = append(parameters.Parameter, st)
  198. }
  199. paramStr, _ := json.Marshal(parameters)
  200. // choose imageId and imageUrl
  201. imgId, imgUrl, err := swapImageIdAndImageUrl(imageId)
  202. if err != nil {
  203. return nil, err
  204. }
  205. taskParam := &model.CreateTaskParam{
  206. Description: algorithmId, // temporarily set reponame contained in the algorithmId to desc for missing taskdetail's reponame
  207. JobType: TRAIN,
  208. Cluster: C2NET,
  209. DisplayJobName: TRAIN + UNDERSCORE + utils.RandomString(10),
  210. ComputeSource: computeSource,
  211. SpecId: int(specId),
  212. BranchName: branchName,
  213. ImageId: imgId,
  214. ImageUrl: imgUrl,
  215. DatasetUuidStr: datasetsId,
  216. Params: string(paramStr),
  217. BootFile: bootFile,
  218. HasInternet: 2, // 0 不限制;1 不需要互联网;2 需要互联网
  219. WorkServerNumber: 1, // 运行节点数
  220. }
  221. param := model.CreateTaskReq{
  222. UserName: o.userName,
  223. RepoName: repoName,
  224. CreateTaskParam: taskParam,
  225. }
  226. resp := struct {
  227. Code int `json:"code"`
  228. Msg string `json:"msg"`
  229. Data model.CreateTask `json:"data"`
  230. }{}
  231. req := common.GetRestyRequest(common.TIMEOUT)
  232. _, err = req.
  233. SetHeader("Content-Type", "application/json").
  234. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  235. SetBody(&param).
  236. SetResult(&resp).
  237. Post(taskCreatelUrl)
  238. if err != nil {
  239. return nil, err
  240. }
  241. if resp.Code != 200 {
  242. return nil, errors.New(resp.Msg)
  243. }
  244. if resp.Data.Code != 0 {
  245. return nil, errors.New(resp.Msg)
  246. }
  247. if (resp.Data == model.CreateTask{}) {
  248. return nil, errors.New("failed to submit task")
  249. }
  250. return resp.Data, nil
  251. }
  252. func swapImageIdAndImageUrl(imageId string) (string, string, error) {
  253. if imageId == "" {
  254. return "", "", errors.New("imageId is empty")
  255. }
  256. var imgId string
  257. var imgUrl string
  258. parsedURL, err := url.Parse("http://" + imageId)
  259. if err != nil {
  260. return "", "", err
  261. }
  262. if utils.IsValidHostAddress(parsedURL.Host) {
  263. imgId = ""
  264. imgUrl = imageId
  265. } else {
  266. imgId = imageId
  267. imgUrl = ""
  268. }
  269. return imgId, imgUrl, nil
  270. }
  271. func (o OpenI) Stop(ctx context.Context, id string) error {
  272. task, err := o.getTrainingTask(ctx, id)
  273. if err != nil {
  274. return err
  275. }
  276. codePaths := strings.Split(task.Data.Task.Description, FORWARD_SLASH)
  277. if len(codePaths) != 3 {
  278. return errors.New("failed to stop, openI desc not set")
  279. }
  280. repoName := codePaths[0]
  281. taskStopUrl := o.host + TaskStopUrl
  282. param := model.StopTaskParam{
  283. UserName: o.userName,
  284. RepoName: repoName,
  285. Id: id,
  286. }
  287. resp := struct {
  288. Code int `json:"code"`
  289. Msg string `json:"msg"`
  290. Data model.StopTask `json:"data"`
  291. }{}
  292. req := common.GetRestyRequest(common.TIMEOUT)
  293. _, err = req.
  294. SetHeader("Content-Type", "application/json").
  295. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  296. SetBody(&param).
  297. SetResult(&resp).
  298. Post(taskStopUrl)
  299. if err != nil {
  300. return err
  301. }
  302. if resp.Code != http.StatusOK {
  303. return errors.New("failed to stop")
  304. }
  305. return nil
  306. }
  307. func (o OpenI) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
  308. return nil, errors.New("failed to implement")
  309. }
  310. func (o OpenI) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
  311. return nil, errors.New("failed to implement")
  312. }
  313. func (o OpenI) StartInferDeployInstance(ctx context.Context, id string) bool {
  314. return false
  315. }
  316. func (o OpenI) StopInferDeployInstance(ctx context.Context, id string) bool {
  317. return false
  318. }
  319. func (o OpenI) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) {
  320. task, err := o.getTrainingTask(ctx, id)
  321. if err != nil {
  322. return nil, err
  323. }
  324. description := task.Data.Task.Description
  325. codePaths := strings.Split(description, FORWARD_SLASH)
  326. //从描述中解析出repoName
  327. if len(codePaths) < 3 {
  328. return nil, errors.New("description format is incorrect")
  329. }
  330. repoName := codePaths[0]
  331. var resp inference.DeployInstance
  332. resp.InstanceId = id
  333. resp.InstanceName = task.Data.Task.DisplayJobName
  334. resp.ModelName = task.Data.Task.PretrainModelName
  335. resp.ModelType = ""
  336. resp.InferCard = task.Data.Task.Spec.ComputeResource + "_" + task.Data.Task.Spec.AccCardType
  337. resp.ClusterName = o.platform
  338. resp.ClusterType = TYPE_OPENI
  339. //获取在线推理url
  340. var inferUrl string
  341. if task.Data.Task.Status == "RUNNING" {
  342. inferUrl, err = o.getOnlineInferUrl(ctx, id, repoName)
  343. if err != nil {
  344. return nil, err
  345. }
  346. }
  347. resp.InferUrl = inferUrl
  348. resp.Status = task.Data.Task.Status
  349. resp.CreatedTime = time.Unix(int64(task.Data.Task.CreatedUnix), 0).Format(constants.Layout)
  350. log.Debug().Msgf("func GetInferDeployInstance, resp: %v", resp)
  351. return &resp, nil
  352. }
  353. func (o OpenI) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) {
  354. var repoName string
  355. codePaths := strings.Split(option.AlgorithmId, FORWARD_SLASH)
  356. if len(codePaths) < 3 {
  357. return "", errors.New("algorithmId format is incorrect")
  358. }
  359. repoName = codePaths[0]
  360. spec := &ResourceSpecOpenI{}
  361. for _, res := range option.ResourcesRequired {
  362. typeName, ok := res["type"]
  363. if !ok {
  364. continue
  365. }
  366. name, ok := res["name"]
  367. if !ok {
  368. continue
  369. }
  370. for _, s := range ComputeSource {
  371. switch typeName {
  372. case s:
  373. num, ok := res["number"]
  374. if !ok {
  375. continue
  376. }
  377. n := openIcom.ConvertTypeToString(num)
  378. val, err := strconv.ParseInt(n, 10, 64)
  379. if err != nil {
  380. return "", err
  381. }
  382. spec.ResType = s
  383. spec.Name = openIcom.ConvertTypeToString(name)
  384. spec.Number = val
  385. break
  386. }
  387. }
  388. }
  389. if spec.ResType == "" || spec.Name == "" {
  390. return "", errors.New("resource spec not found")
  391. }
  392. creationRequirelUrl := o.host + CreationRequirelUrl
  393. param := model.TaskCreationRequiredParam{
  394. UserName: o.userName,
  395. RepoName: repoName,
  396. JobType: ONLINEINFERENCE,
  397. ComputeSource: spec.ResType,
  398. ClusterType: C2NET,
  399. }
  400. b, _ := json.Marshal(param)
  401. byt := bytes.NewBuffer(b)
  402. resp := struct {
  403. Code int `json:"code"`
  404. Msg string `json:"msg"`
  405. Data model.TaskCreationRequired `json:"data"`
  406. }{}
  407. req := common.GetRestyRequest(common.TIMEOUT)
  408. r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
  409. req.RawRequest = r
  410. req.URL = creationRequirelUrl
  411. _, err := req.
  412. SetHeader("Content-Type", "application/json").
  413. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  414. SetBody(byt).
  415. SetResult(&resp).
  416. Send()
  417. if err != nil {
  418. return "", errors.New("failed to invoke TaskCreationRequired")
  419. }
  420. if len(resp.Data.Data.Specs.All) == 0 {
  421. return "", errors.New("TaskCreationRequired specs are empty")
  422. }
  423. for _, s := range resp.Data.Data.Specs.All {
  424. if spec.ResType == s.ComputeResource && spec.Name == s.AccCardType {
  425. if int(spec.Number) == s.AccCardsNum {
  426. option.ResourceId = strconv.Itoa(s.Id) + FORWARD_SLASH + spec.ResType
  427. break
  428. }
  429. }
  430. }
  431. if option.ResourceId == "" {
  432. return "", errors.New("can not find spec Id")
  433. }
  434. option.ComputeCard = spec.Name
  435. task, err := o.SubmitInferTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AlgorithmId, option.ModelID)
  436. if err != nil {
  437. return "", err
  438. }
  439. ma, err := jsoniter.Marshal(task)
  440. if err != nil {
  441. return "", err
  442. }
  443. taskId := jsoniter.Get(ma, "data").Get("id").ToString()
  444. return taskId, nil
  445. }
  446. func (o *OpenI) SubmitInferTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, algorithmId string, modelId string) (interface{}, error) {
  447. taskCreatelUrl := o.host + TaskCreatelUrl
  448. var repoName string
  449. var branchName string
  450. var bootFile string
  451. codePaths := strings.Split(algorithmId, FORWARD_SLASH)
  452. if len(codePaths) < 3 {
  453. return nil, fmt.Errorf("algorithmId %s format is incorrect", algorithmId)
  454. }
  455. specs := strings.Split(resourceId, FORWARD_SLASH)
  456. specId, err := strconv.ParseInt(specs[0], 10, 0)
  457. if err != nil {
  458. return nil, err
  459. }
  460. computeSource := specs[1]
  461. repoName = codePaths[0]
  462. branchName = codePaths[1]
  463. bootFile = strings.Join(codePaths[2:], "/")
  464. log.Printf("repoName: %s, branchName: %s, bootFile: %s", repoName, branchName, bootFile)
  465. //params := "{\"parameter\":[{\"label\":\"a\",\"value\":\"1\"},{\"label\":\"b\",\"value\":\"2\"}]}"
  466. // choose imageId and imageUrl
  467. imgId, imgUrl, err := swapImageIdAndImageUrl(imageId)
  468. if err != nil {
  469. return nil, err
  470. }
  471. taskParam := &model.CreateTaskParam{
  472. Description: algorithmId, // temporarily set reponame contained in the algorithmId to desc for missing taskdetail's reponame
  473. JobType: ONLINEINFERENCE,
  474. Cluster: C2NET,
  475. DisplayJobName: ONLINEINFERENCE + UNDERSCORE + utils.RandomString(10),
  476. ComputeSource: computeSource,
  477. SpecId: int(specId),
  478. BranchName: branchName,
  479. ImageId: imgId,
  480. ImageUrl: imgUrl,
  481. PretrainModelIdStr: modelId,
  482. BootFile: bootFile,
  483. HasInternet: 2, // 0 不限制;1 不需要互联网;2 需要互联网
  484. WorkServerNumber: 1, // 运行节点数
  485. }
  486. param := model.CreateTaskReq{
  487. UserName: o.userName,
  488. RepoName: repoName,
  489. CreateTaskParam: taskParam,
  490. }
  491. resp := struct {
  492. Code int `json:"code"`
  493. Msg string `json:"msg"`
  494. Data model.CreateTask `json:"data"`
  495. }{}
  496. req := common.GetRestyRequest(common.TIMEOUT)
  497. _, err = req.
  498. SetHeader("Content-Type", "application/json").
  499. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  500. SetBody(&param).
  501. SetResult(&resp).
  502. Post(taskCreatelUrl)
  503. if err != nil {
  504. return nil, err
  505. }
  506. if resp.Code != http.StatusOK {
  507. return nil, errors.New(resp.Msg)
  508. }
  509. if resp.Data.Data.Id == 0 {
  510. return nil, fmt.Errorf("failed to submit task, msg: [%s]", resp.Data.Msg)
  511. }
  512. return resp.Data, nil
  513. }
  514. func (o OpenI) CheckModelExistence(ctx context.Context, modelName string, modelType string) bool {
  515. return false
  516. }
  517. func (o OpenI) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
  518. return "", errors.New("failed to implement")
  519. }
  520. func (o OpenI) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
  521. return nil, errors.New("failed to implement")
  522. }
  523. func (o OpenI) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
  524. return nil, errors.New("failed to implement")
  525. }
  526. func (o OpenI) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
  527. return nil, errors.New("failed to implement")
  528. }
  529. func (o OpenI) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
  530. task, err := o.getTrainingTask(ctx, taskId)
  531. if err != nil {
  532. return "", err
  533. }
  534. codePaths := strings.Split(task.Data.Task.Description, FORWARD_SLASH)
  535. if len(codePaths) != 3 {
  536. return "", errors.New("failed to get log, openI desc not set")
  537. }
  538. repoName := codePaths[0]
  539. tasklogurl := o.host + TaskLogUrl
  540. param := model.GetLogParam{
  541. UserName: o.userName,
  542. RepoName: repoName,
  543. Id: taskId,
  544. }
  545. b, _ := json.Marshal(param)
  546. byt := bytes.NewBuffer(b)
  547. resp := struct {
  548. Code int `json:"code"`
  549. Msg string `json:"msg"`
  550. Data string `json:"data"`
  551. }{}
  552. req := common.GetRestyRequest(common.TIMEOUT)
  553. r, _ := http.NewRequest("GET", tasklogurl, byt)
  554. req.RawRequest = r
  555. req.URL = tasklogurl
  556. _, err = req.
  557. SetHeader("Content-Type", "application/json").
  558. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  559. SetBody(byt).
  560. SetResult(&resp).
  561. Send()
  562. if err != nil {
  563. return "", errors.New("failed to invoke taskDetails")
  564. }
  565. if resp.Data == "" {
  566. return "waiting for logs", nil
  567. }
  568. return resp.Data, nil
  569. }
  570. func (o OpenI) getTrainingTask(ctx context.Context, taskId string) (*model.TaskDetail, error) {
  571. taskDetailsUrl := o.host + TaskDetailsUrl
  572. param := model.TaskDetailParam{
  573. UserName: o.userName,
  574. RepoName: TESTREPO,
  575. Id: taskId,
  576. }
  577. b, _ := json.Marshal(param)
  578. byt := bytes.NewBuffer(b)
  579. resp := struct {
  580. Code int `json:"code"`
  581. Msg string `json:"msg"`
  582. Data model.TaskDetail `json:"data"`
  583. }{}
  584. req := common.GetRestyRequest(common.TIMEOUT)
  585. r, _ := http.NewRequest("GET", taskDetailsUrl, byt)
  586. req.RawRequest = r
  587. req.URL = taskDetailsUrl
  588. _, err := req.
  589. SetHeader("Content-Type", "application/json").
  590. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  591. SetBody(byt).
  592. SetResult(&resp).
  593. Send()
  594. if err != nil {
  595. return nil, errors.New("failed to invoke taskDetails")
  596. }
  597. if resp.Data.Code != 0 && resp.Data.Msg != "" {
  598. return nil, errors.New(resp.Data.Msg)
  599. }
  600. return &resp.Data, nil
  601. }
  602. func (o OpenI) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
  603. task, err := o.getTrainingTask(ctx, taskId)
  604. if err != nil {
  605. return nil, err
  606. }
  607. var resp collector.Task
  608. resp.Id = strconv.Itoa(task.Data.Task.Id)
  609. if task.Data.Task.StartTime != 0 {
  610. resp.Start = time.Unix(int64(task.Data.Task.StartTime), 0).Format(constants.Layout)
  611. }
  612. if task.Data.Task.EndTime != 0 {
  613. resp.End = time.Unix(int64(task.Data.Task.EndTime), 0).Format(constants.Layout)
  614. }
  615. switch task.Data.Task.Status {
  616. case "SUCCEEDED":
  617. resp.Status = constants.Completed
  618. case "FAILED":
  619. resp.Status = constants.Failed
  620. case "CREATED_FAILED":
  621. resp.Status = constants.Failed
  622. case "RUNNING":
  623. resp.Status = constants.Running
  624. case "STOPPED":
  625. resp.Status = constants.Stopped
  626. case "PENDING":
  627. resp.Status = constants.Pending
  628. case "WAITING":
  629. resp.Status = constants.Waiting
  630. default:
  631. resp.Status = "undefined"
  632. }
  633. return &resp, nil
  634. }
  635. func (o OpenI) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
  636. return "", errors.New("failed to implement")
  637. }
  638. func (o OpenI) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
  639. return errors.New("failed to implement")
  640. }
  641. func (o OpenI) GetComputeCards(ctx context.Context) ([]string, error) {
  642. return nil, errors.New("failed to implement")
  643. }
  644. func (o OpenI) GetUserBalance(ctx context.Context) (float64, error) {
  645. return 0, errors.New("failed to implement")
  646. }
  647. func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
  648. var resources []interface{}
  649. res := &collector.ResourceSpec{
  650. ClusterId: strconv.FormatInt(o.participantId, 10),
  651. }
  652. //clres := &collector.ClusterResource{}
  653. creationRequirelUrl := o.host + CreationRequirelUrl
  654. reposUrl := o.host + ReposUrl
  655. taskListUrl := o.host + TaskListUrl
  656. var wg sync.WaitGroup
  657. var ch = make(chan *collector.ClusterResource)
  658. var once sync.Once
  659. wg.Add(2)
  660. go func() {
  661. defer wg.Done()
  662. for c := range ComputeSource {
  663. wg.Add(1)
  664. i := c
  665. go func() {
  666. defer wg.Done()
  667. param := model.TaskCreationRequiredParam{
  668. UserName: o.userName,
  669. RepoName: TESTREPO,
  670. JobType: TRAIN,
  671. ComputeSource: ComputeSource[i],
  672. ClusterType: C2NET,
  673. }
  674. b, _ := json.Marshal(param)
  675. byt := bytes.NewBuffer(b)
  676. resp := struct {
  677. Code int `json:"code"`
  678. Msg string `json:"msg"`
  679. Data model.TaskCreationRequired `json:"data"`
  680. }{}
  681. req := common.GetRestyRequest(common.TIMEOUT)
  682. r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
  683. req.RawRequest = r
  684. req.URL = creationRequirelUrl
  685. _, err := req.
  686. SetHeader("Content-Type", "application/json").
  687. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  688. SetBody(byt).
  689. SetResult(&resp).
  690. Send()
  691. if err != nil {
  692. return
  693. }
  694. if len(resp.Data.Data.Specs.All) == 0 {
  695. return
  696. }
  697. // balance
  698. var balanceCheck = func() {
  699. balance := resp.Data.Data.PointAccount.Balance
  700. bal := &collector.Usage{}
  701. bal.Type = strings.ToUpper(BALANCE)
  702. bal.Total = &collector.UnitValue{
  703. Unit: POINT,
  704. Value: balance,
  705. }
  706. ch <- &collector.ClusterResource{Resource: bal}
  707. //rate
  708. var v float64
  709. v = 1
  710. rate := &collector.Usage{
  711. Type: strings.ToUpper(RATE),
  712. Total: &collector.UnitValue{Unit: PERHOUR, Value: v},
  713. }
  714. ch <- &collector.ClusterResource{Resource: rate}
  715. }
  716. once.Do(balanceCheck)
  717. m := make(map[string]struct {
  718. Id int `json:"id"`
  719. AccCardsNum int `json:"acc_cards_num"`
  720. AccCardType string `json:"acc_card_type"`
  721. CpuCores int `json:"cpu_cores"`
  722. MemGiB int `json:"mem_gi_b"`
  723. GpuMemGiB int `json:"gpu_mem_gi_b"`
  724. ShareMemGiB int `json:"share_mem_gi_b"`
  725. ComputeResource string `json:"compute_resource"`
  726. UnitPrice int `json:"unit_price"`
  727. SourceSpecId string `json:"source_spec_id"`
  728. HasInternet int `json:"has_internet"`
  729. EnableVisualization bool `json:"enable_visualization"`
  730. })
  731. for _, s := range resp.Data.Data.Specs.All {
  732. e, ok := m[s.AccCardType]
  733. if ok {
  734. if s.AccCardsNum > e.AccCardsNum {
  735. m[s.AccCardType] = s
  736. }
  737. } else {
  738. m[s.AccCardType] = s
  739. }
  740. }
  741. for k, v := range m {
  742. bres := make([]*collector.Usage, 0)
  743. cres := &collector.ClusterResource{}
  744. card := &collector.Usage{
  745. Type: ComputeSource[i],
  746. Name: strings.ToUpper(k),
  747. Total: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
  748. Available: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
  749. }
  750. cpu := &collector.Usage{
  751. Type: strings.ToUpper(CPU),
  752. Name: strings.ToUpper(CPU),
  753. Total: &collector.UnitValue{Unit: CPUCORE, Value: v.CpuCores},
  754. Available: &collector.UnitValue{Unit: CPUCORE, Value: v.CpuCores},
  755. }
  756. mem := &collector.Usage{
  757. Type: strings.ToUpper(MEMORY),
  758. Name: strings.ToUpper(RAM),
  759. Total: &collector.UnitValue{Unit: GIGABYTE, Value: v.MemGiB},
  760. Available: &collector.UnitValue{Unit: GIGABYTE, Value: v.MemGiB},
  761. }
  762. vmem := &collector.Usage{
  763. Type: strings.ToUpper(MEMORY),
  764. Name: strings.ToUpper(VRAM),
  765. Total: &collector.UnitValue{Unit: GIGABYTE, Value: v.GpuMemGiB},
  766. Available: &collector.UnitValue{Unit: GIGABYTE, Value: v.GpuMemGiB},
  767. }
  768. //storage
  769. var s float64
  770. s = 1024
  771. storage := &collector.Usage{}
  772. storage.Type = STORAGE
  773. storage.Name = DISK
  774. storage.Total = &collector.UnitValue{
  775. Unit: GIGABYTE,
  776. Value: s,
  777. }
  778. storage.Available = &collector.UnitValue{
  779. Unit: GIGABYTE,
  780. Value: s,
  781. }
  782. bres = append(bres, storage)
  783. bres = append(bres, cpu)
  784. bres = append(bres, mem)
  785. bres = append(bres, vmem)
  786. cres.Resource = card
  787. cres.BaseResources = bres
  788. ch <- cres
  789. }
  790. }()
  791. }
  792. }()
  793. // repos
  794. go func() {
  795. defer wg.Done()
  796. reporesp := struct {
  797. Code int `json:"code"`
  798. Msg string `json:"msg"`
  799. Data []model.Repo `json:"data"`
  800. }{}
  801. reporeq := common.GetRestyRequest(common.TIMEOUT)
  802. repor, _ := http.NewRequest("GET", reposUrl, nil)
  803. reporeq.RawRequest = repor
  804. reporeq.URL = reposUrl
  805. _, err := reporeq.
  806. SetHeader("Content-Type", "application/json").
  807. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  808. SetResult(&reporesp).
  809. Send()
  810. if err != nil {
  811. return
  812. }
  813. if len(reporesp.Data) == 0 {
  814. return
  815. }
  816. // tasklist
  817. var runningJobs atomic.Int64
  818. var jwg sync.WaitGroup
  819. var errs []error
  820. var ech = make(chan error)
  821. jwg.Add(1)
  822. go func() {
  823. defer jwg.Done()
  824. for _, datum := range reporesp.Data {
  825. jwg.Add(1)
  826. dat := datum
  827. go func() {
  828. defer jwg.Done()
  829. param := model.TaskListParam{
  830. UserName: o.userName,
  831. RepoName: dat.Name,
  832. }
  833. b, _ := json.Marshal(param)
  834. byt := bytes.NewBuffer(b)
  835. resp := struct {
  836. Code int `json:"code"`
  837. Msg string `json:"msg"`
  838. Data model.TaskList `json:"data"`
  839. }{}
  840. req := common.GetRestyRequest(common.TIMEOUT)
  841. r, _ := http.NewRequest("GET", taskListUrl, byt)
  842. req.RawRequest = r
  843. req.URL = taskListUrl
  844. _, err := req.
  845. SetHeader("Content-Type", "application/json").
  846. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  847. SetBody(byt).
  848. SetResult(&resp).
  849. Send()
  850. if err != nil {
  851. // assume occupied running tasks
  852. ech <- err
  853. return
  854. }
  855. if len(resp.Data.Data.Tasks) == 0 {
  856. return
  857. }
  858. for _, task := range resp.Data.Data.Tasks {
  859. if task.Task.Status == RUNNING {
  860. runningJobs.Add(1)
  861. }
  862. }
  863. }()
  864. }
  865. }()
  866. go func() {
  867. jwg.Wait()
  868. close(ech)
  869. }()
  870. for v := range ech {
  871. errs = append(errs, v)
  872. }
  873. // running tasks num
  874. var runningNum int64
  875. runningNum = runningJobs.Load()
  876. run := &collector.Usage{}
  877. run.Type = strings.ToUpper(RUNNINGTASK)
  878. if len(errs) == 0 {
  879. run.Total = &collector.UnitValue{
  880. Unit: NUMBER,
  881. Value: runningNum,
  882. }
  883. ch <- &collector.ClusterResource{Resource: run}
  884. } else {
  885. runningNum = int64(len(errs)) * 4
  886. run.Total = &collector.UnitValue{
  887. Unit: NUMBER,
  888. Value: runningNum,
  889. }
  890. ch <- &collector.ClusterResource{Resource: run}
  891. }
  892. }()
  893. go func() {
  894. wg.Wait()
  895. close(ch)
  896. }()
  897. for v := range ch {
  898. resources = append(resources, v)
  899. }
  900. res.Resources = resources
  901. return res, nil
  902. }
  903. func (o OpenI) getOnlineInferUrl(ctx context.Context, taskId string, repoName string) (string, error) {
  904. taskDetailsUrl := o.host + TaskOnlineInferUrl
  905. param := model.TaskDetailParam{
  906. UserName: o.userName,
  907. RepoName: repoName,
  908. Id: taskId,
  909. }
  910. b, _ := json.Marshal(param)
  911. byt := bytes.NewBuffer(b)
  912. resp := model.SelfEndpointUrlResp{}
  913. req := common.GetRestyRequest(common.TIMEOUT)
  914. r, _ := http.NewRequest("GET", taskDetailsUrl, byt)
  915. req.RawRequest = r
  916. req.URL = taskDetailsUrl
  917. _, err := req.
  918. SetHeader("Content-Type", "application/json").
  919. SetQueryParam(common.ACCESSTOKEN, o.accessToken).
  920. SetBody(byt).
  921. SetResult(&resp).
  922. Send()
  923. if err != nil {
  924. return "", err
  925. }
  926. if resp.Code != http.StatusOK {
  927. return "", errors.New(resp.Msg)
  928. }
  929. return resp.Data.Url, nil
  930. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.