You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

jobset.go 14 kB

2 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
8 months ago
8 months ago
8 months ago
2 months ago
2 months ago
2 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559
  1. package sch
  2. import (
  3. "fmt"
  4. "io"
  5. "net/url"
  6. "strings"
  7. "gitlink.org.cn/cloudream/common/pkgs/logger"
  8. schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
  9. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  10. "gitlink.org.cn/cloudream/common/utils/http2"
  11. "gitlink.org.cn/cloudream/common/utils/serder"
  12. )
  13. type GetClusterInfoReq struct {
  14. Type string `json:"type"`
  15. IDs []schsdk.ClusterID `json:"clusterIDs"`
  16. }
  17. func (c *Client) GetClusterInfo(req GetClusterInfoReq, token string) ([]ClusterDetail, error) {
  18. targetUrl, err := url.JoinPath(c.baseURL, "schedule/queryResources")
  19. if err != nil {
  20. return nil, err
  21. }
  22. resourceType := Train
  23. if req.Type != "" {
  24. resourceType = req.Type
  25. }
  26. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  27. Body: GetClusterInfoReq{
  28. Type: resourceType,
  29. },
  30. Header: map[string]string{
  31. "Authorization": token,
  32. },
  33. })
  34. if err != nil {
  35. return nil, err
  36. }
  37. //all, err := io.ReadAll(resp.Body)
  38. //str := string(all)
  39. //println(str)
  40. contType := resp.Header.Get("Content-Type")
  41. if strings.Contains(contType, http2.ContentTypeJSON) {
  42. var codeResp response[[]ClusterDetail]
  43. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  44. return nil, fmt.Errorf("parsing response: %w", err)
  45. }
  46. // 该接口传入参数后查询效率很低,所以需要在这里特殊处理
  47. if codeResp.Code == ResponseCodeOK {
  48. var results []ClusterDetail
  49. for _, cluster := range codeResp.Data {
  50. for _, id := range req.IDs {
  51. if cluster.ClusterId == id {
  52. results = append(results, cluster)
  53. }
  54. }
  55. }
  56. return results, nil
  57. }
  58. return nil, codeResp.ToError()
  59. }
  60. return nil, fmt.Errorf("unknow response content type: %s", contType)
  61. }
  62. type CreateInferenceJobResp struct {
  63. TaskId string `json:"taskId"`
  64. }
  65. type CreateAIJobReq struct {
  66. Name string `json:"name"`
  67. Description string `json:"description"`
  68. JobResources schsdk.JobResources `json:"jobResources"`
  69. DataDistributes DataDistribute `json:"dataDistributes"`
  70. }
  71. type CommonJsonData struct {
  72. ID string `json:"id"`
  73. Name string `json:"name"`
  74. }
  75. type DataDistribute struct {
  76. Dataset []DatasetDistribute `json:"dataset"`
  77. Code []CodeDistribute `json:"code"`
  78. Image []ImageDistribute `json:"image"`
  79. Model []ModelDistribute `json:"model"`
  80. }
  81. type DataDetail struct {
  82. ClusterID schsdk.ClusterID `json:"clusterID"`
  83. //StorageID cdssdk.StorageID `json:"storageID"`
  84. StorageID cdssdk.StorageID
  85. JsonData string `json:"jsonData"`
  86. }
  87. type DatasetDistribute struct {
  88. DataName string `json:"dataName"`
  89. PackageID cdssdk.PackageID `json:"packageID"`
  90. Clusters []DataDetail `json:"clusters"`
  91. }
  92. type CodeDistribute struct {
  93. DataName string `json:"dataName"`
  94. PackageID cdssdk.PackageID `json:"packageID"`
  95. Output string `json:"output"`
  96. Clusters []DataDetail `json:"clusters"`
  97. }
  98. type ImageDistribute struct {
  99. DataName string `json:"dataName"`
  100. //PackageID cdssdk.PackageID `json:"packageID"`
  101. ImageID schsdk.ImageID `json:"packageID"`
  102. Clusters []DataDetail `json:"clusters"`
  103. }
  104. type ModelDistribute struct {
  105. DataName string `json:"dataName"`
  106. PackageID cdssdk.PackageID `json:"packageID"`
  107. Clusters []DataDetail `json:"clusters"`
  108. }
  109. type CreateJobResp struct {
  110. TaskID TaskID `json:"taskID"`
  111. TaskName string `json:"taskName"`
  112. ScheduleDatas []ScheduleData `json:"scheduleDatas"`
  113. }
  114. type ScheduleData struct {
  115. DataType string `json:"dataType"`
  116. PackageID cdssdk.PackageID `json:"packageID"`
  117. StorageType string `json:"storageType"`
  118. ClusterIDs []schsdk.ClusterID `json:"clusterIDs"`
  119. }
  120. func (c *Client) CreateInferenceJob(req CreateAIJobReq, token string) (*CreateInferenceJobResp, error) {
  121. targetUrl, err := url.JoinPath(c.baseURL, "inference/createTask")
  122. if err != nil {
  123. return nil, err
  124. }
  125. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  126. Body: req,
  127. Header: map[string]string{
  128. "Authorization": token,
  129. },
  130. })
  131. if err != nil {
  132. return nil, err
  133. }
  134. contType := resp.Header.Get("Content-Type")
  135. if strings.Contains(contType, http2.ContentTypeJSON) {
  136. var codeResp response2[CreateInferenceJobResp]
  137. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  138. return nil, fmt.Errorf("parsing response: %w", err)
  139. }
  140. if codeResp.Code == ResponseCodeOK {
  141. return &codeResp.Data, nil
  142. }
  143. return nil, fmt.Errorf("error: %s", codeResp.Message)
  144. }
  145. return nil, fmt.Errorf("unknow response content type: %s", contType)
  146. }
  147. type StopInferenceJobReq struct {
  148. AdapterID string `json:"adapterId"`
  149. ClusterID schsdk.ClusterID `json:"clusterId"`
  150. ID string `json:"id"`
  151. InstanceID string `json:"instanceId"`
  152. }
  153. func (c *Client) StopInferenceJob(req StopInferenceJobReq, token string) error {
  154. targetUrl, err := url.JoinPath(c.baseURL, "inference/stopDeployInstance")
  155. if err != nil {
  156. return err
  157. }
  158. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  159. Body: req,
  160. Header: map[string]string{
  161. "Authorization": token,
  162. },
  163. })
  164. if err != nil {
  165. return err
  166. }
  167. contType := resp.Header.Get("Content-Type")
  168. if strings.Contains(contType, http2.ContentTypeJSON) {
  169. var codeResp response2[CreateInferenceJobResp]
  170. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  171. return fmt.Errorf("parsing response: %w", err)
  172. }
  173. if codeResp.Code == ResponseCodeOK {
  174. return nil
  175. }
  176. return fmt.Errorf("error: %s", codeResp.Message)
  177. }
  178. return fmt.Errorf("unknow response content type: %s", contType)
  179. }
  180. func (c *Client) CreateJob(req CreateAIJobReq, token string) (*CreateJobResp, error) {
  181. targetUrl, err := url.JoinPath(c.baseURL, "schedule/createTask")
  182. if err != nil {
  183. return nil, err
  184. }
  185. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  186. Body: req,
  187. Header: map[string]string{
  188. "Authorization": token,
  189. },
  190. })
  191. if err != nil {
  192. return nil, err
  193. }
  194. //all, err := io.ReadAll(resp.Body)
  195. //println(string(all))
  196. contType := resp.Header.Get("Content-Type")
  197. if strings.Contains(contType, http2.ContentTypeJSON) {
  198. var codeResp response2[CreateJobResp]
  199. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  200. return nil, fmt.Errorf("parsing response: %w", err)
  201. }
  202. if codeResp.Code == ResponseCodeOK {
  203. return &codeResp.Data, nil
  204. }
  205. return nil, fmt.Errorf("error: %s", codeResp.Message)
  206. }
  207. return nil, fmt.Errorf("unknow response content type: %s", contType)
  208. }
  209. type RunJobReq struct {
  210. TaskID TaskID `json:"taskID"`
  211. ScheduledDatas []DataScheduleResults `json:"scheduledDatas"`
  212. }
  213. type DataScheduleResult struct {
  214. Clusters []DataDetail `json:"clusters"`
  215. PackageID cdssdk.PackageID `json:"packageID"`
  216. PackageFullPath string `json:"packageFullPath"`
  217. Status bool `json:"status"`
  218. Msg string `json:"msg"`
  219. }
  220. type DataScheduleResults struct {
  221. DataType string `json:"dataType"`
  222. Results []DataScheduleResult `json:"results"`
  223. }
  224. func (c *Client) RunJob(req RunJobReq, token string) error {
  225. targetUrl, err := url.JoinPath(c.baseURL, "schedule/runTask")
  226. if err != nil {
  227. return err
  228. }
  229. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  230. Body: req,
  231. Header: map[string]string{
  232. "Authorization": token,
  233. },
  234. })
  235. if err != nil {
  236. return err
  237. }
  238. contType := resp.Header.Get("Content-Type")
  239. if strings.Contains(contType, http2.ContentTypeJSON) {
  240. var codeResp response2[string]
  241. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  242. return fmt.Errorf("parsing response: %w", err)
  243. }
  244. if codeResp.Code == ResponseCodeOK {
  245. return nil
  246. }
  247. return fmt.Errorf("error: %s", codeResp.Message)
  248. }
  249. return fmt.Errorf("unknow response content type: %s", contType)
  250. }
  251. type CancelJobReq struct {
  252. TaskID TaskID `json:"taskID"`
  253. Msg string `json:"msg"`
  254. }
  255. func (c *Client) CancelJob(req CancelJobReq) error {
  256. targetUrl, err := url.JoinPath(c.baseURL, "schedule/queryResources")
  257. if err != nil {
  258. return err
  259. }
  260. resp, err := http2.GetJSON(targetUrl, http2.RequestParam{Body: req})
  261. if err != nil {
  262. return err
  263. }
  264. contType := resp.Header.Get("Content-Type")
  265. if strings.Contains(contType, http2.ContentTypeJSON) {
  266. var codeResp response[string]
  267. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  268. return fmt.Errorf("parsing response: %w", err)
  269. }
  270. if codeResp.Code == ResponseCodeOK {
  271. return nil
  272. }
  273. return codeResp.ToError()
  274. }
  275. return fmt.Errorf("unknow response content type: %s", contType)
  276. }
  277. type GetResourceSpecReq struct {
  278. ClusterID schsdk.ClusterID `form:"clusterId"`
  279. Tag string `form:"tag"`
  280. PageNum int64 `form:"pageNum"`
  281. PageSize int64 `form:"pageSize"`
  282. Status string `form:"status"`
  283. ClusterType string `form:"clusterType"`
  284. }
  285. type GetResourceSpecResp struct {
  286. Total int64 `json:"total"`
  287. PageNum int64 `json:"pageNum"`
  288. PageSize int64 `json:"pageSize"`
  289. List []ResourceDetail `json:"list"`
  290. }
  291. func (c *Client) GetResourceSpec(req GetResourceSpecReq, token string) ([]ResourceDetail, int64, error) {
  292. targetUrl, err := url.JoinPath(c.baseURL, "core/ai/resourceSpec/page")
  293. if err != nil {
  294. return nil, 0, err
  295. }
  296. req.Status = "1"
  297. resp, err := http2.GetJSON(targetUrl, http2.RequestParam{
  298. Query: req,
  299. Header: map[string]string{
  300. "Authorization": token,
  301. },
  302. })
  303. //all, err := io.ReadAll(resp.Body)
  304. //println(string(all))
  305. if err != nil {
  306. return nil, 0, err
  307. }
  308. contType := resp.Header.Get("Content-Type")
  309. if strings.Contains(contType, http2.ContentTypeJSON) {
  310. var codeResp response2[GetResourceSpecResp]
  311. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  312. return nil, 0, fmt.Errorf("parsing response: %w", err)
  313. }
  314. if codeResp.Code == ResponseCodeOK {
  315. return codeResp.Data.List, codeResp.Data.Total, nil
  316. }
  317. return nil, 0, codeResp.ToError()
  318. }
  319. return nil, 0, fmt.Errorf("unknow response content type: %s", contType)
  320. }
  321. type BindReq struct {
  322. Name string `json:"name"`
  323. Desc string `json:"desc"`
  324. ClusterId string `json:"clusterId"`
  325. DataSource DataSource `json:"src"`
  326. BindParam BindParam `json:"param"`
  327. }
  328. type DataSource struct {
  329. JCS JCS `json:"jcs"`
  330. }
  331. type JCS struct {
  332. UserID cdssdk.UserID `json:"userID"`
  333. PackageId cdssdk.PackageID `json:"packageId"`
  334. BucketID cdssdk.BucketID `json:"bucketID"`
  335. }
  336. type BindParam struct {
  337. BootFile string `json:"bootFile"`
  338. DefaultBranch string `json:"defaultBranch"`
  339. RepoName string `json:"repoName"`
  340. }
  341. type BindResp struct {
  342. ID string `json:"id"`
  343. Name string `json:"name"`
  344. }
  345. func (c *Client) BindCode(req BindReq, token string) (*BindResp, error) {
  346. targetUrl, err := url.JoinPath(c.baseURL, "ai/createAlgorithm")
  347. if err != nil {
  348. return nil, err
  349. }
  350. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  351. Body: req,
  352. Header: map[string]string{
  353. "Authorization": token,
  354. },
  355. })
  356. if err != nil {
  357. return nil, err
  358. }
  359. contType := resp.Header.Get("Content-Type")
  360. if strings.Contains(contType, http2.ContentTypeJSON) {
  361. var codeResp response2[BindResp]
  362. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  363. return nil, fmt.Errorf("parsing response: %w", err)
  364. }
  365. if codeResp.Code == ResponseCodeOK {
  366. return &codeResp.Data, nil
  367. }
  368. return nil, fmt.Errorf("error: %s", codeResp.Message)
  369. }
  370. return nil, fmt.Errorf("unknow response content type: %s", contType)
  371. }
  372. func (c *Client) BindModel(req BindReq, token string) (*BindResp, error) {
  373. targetUrl, err := url.JoinPath(c.baseURL, "ai/createModel")
  374. if err != nil {
  375. return nil, err
  376. }
  377. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  378. Body: req,
  379. Header: map[string]string{
  380. "Authorization": token,
  381. },
  382. })
  383. if err != nil {
  384. return nil, err
  385. }
  386. //all, err := io.ReadAll(resp.Body)
  387. //println(string(all))
  388. contType := resp.Header.Get("Content-Type")
  389. if strings.Contains(contType, http2.ContentTypeJSON) {
  390. var codeResp response2[BindResp]
  391. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  392. return nil, fmt.Errorf("parsing response: %w", err)
  393. }
  394. if codeResp.Code == ResponseCodeOK {
  395. return &codeResp.Data, nil
  396. }
  397. return nil, fmt.Errorf("error: %s", codeResp.Message)
  398. }
  399. return nil, fmt.Errorf("unknow response content type: %s", contType)
  400. }
  401. func (c *Client) BindDataSet(req BindReq, token string) (*BindResp, error) {
  402. targetUrl, err := url.JoinPath(c.baseURL, "ai/createDataSet")
  403. if err != nil {
  404. return nil, err
  405. }
  406. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  407. Body: req,
  408. Header: map[string]string{
  409. "Authorization": token,
  410. },
  411. })
  412. if err != nil {
  413. return nil, err
  414. }
  415. //all, err := io.ReadAll(resp.Body)
  416. //println(string(all))
  417. contType := resp.Header.Get("Content-Type")
  418. if strings.Contains(contType, http2.ContentTypeJSON) {
  419. var codeResp response2[BindResp]
  420. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  421. return nil, fmt.Errorf("parsing response: %w", err)
  422. }
  423. if codeResp.Code == ResponseCodeOK {
  424. return &codeResp.Data, nil
  425. }
  426. return nil, fmt.Errorf("error: %s", codeResp.Message)
  427. }
  428. all, err := io.ReadAll(resp.Body)
  429. logger.Errorf("BindDataSet error: %s, url: %s", string(all), targetUrl)
  430. return nil, fmt.Errorf("unknow response content type: %s", contType)
  431. }
  432. func (c *Client) DataReturn(req BindReq, token string) (*BindResp, error) {
  433. targetUrl, err := url.JoinPath(c.baseURL, "ai/task/sync")
  434. if err != nil {
  435. return nil, err
  436. }
  437. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  438. Body: req,
  439. Header: map[string]string{
  440. "Authorization": token,
  441. },
  442. })
  443. if err != nil {
  444. return nil, err
  445. }
  446. contType := resp.Header.Get("Content-Type")
  447. if strings.Contains(contType, http2.ContentTypeJSON) {
  448. var codeResp response2[BindResp]
  449. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  450. return nil, fmt.Errorf("parsing response: %w", err)
  451. }
  452. if codeResp.Code == ResponseCodeOK {
  453. return &codeResp.Data, nil
  454. }
  455. return nil, fmt.Errorf("error: %s", codeResp.Message)
  456. }
  457. return nil, fmt.Errorf("unknow response content type: %s", contType)
  458. }