You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

jobset.go 6.5 kB

8 months ago
8 months ago
8 months ago
8 months ago

  1. package sch
  2. import (
  3. "fmt"
  4. schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
  5. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  6. "gitlink.org.cn/cloudream/common/utils/http2"
  7. "gitlink.org.cn/cloudream/common/utils/serder"
  8. "net/url"
  9. "strings"
  10. )
  11. type GetClusterInfoReq struct {
  12. IDs []schsdk.ClusterID `json:"clusterIDs"`
  13. }
  14. //type GetClusterInfoResp struct {
  15. // Data []ClusterDetail `json:"data"`
  16. // TraceId string `json:"traceId"`
  17. //}
  18. func (c *Client) GetClusterInfo(req GetClusterInfoReq) ([]ClusterDetail, error) {
  19. targetUrl, err := url.JoinPath(c.baseURL, "/queryResources")
  20. if err != nil {
  21. return nil, err
  22. }
  23. //resp, err := http2.PostJSON(targetUrl, http2.RequestParam{Body: req})
  24. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{})
  25. if err != nil {
  26. return nil, err
  27. }
  28. contType := resp.Header.Get("Content-Type")
  29. if strings.Contains(contType, http2.ContentTypeJSON) {
  30. var codeResp response[[]ClusterDetail]
  31. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  32. return nil, fmt.Errorf("parsing response: %w", err)
  33. }
  34. // 该接口传入参数后查询效率很低,所以需要在这里特殊处理
  35. if codeResp.Code == ResponseCodeOK {
  36. var results []ClusterDetail
  37. for _, cluster := range codeResp.Data {
  38. for _, id := range req.IDs {
  39. if cluster.ClusterId == id {
  40. results = append(results, cluster)
  41. }
  42. }
  43. }
  44. return results, nil
  45. }
  46. return nil, codeResp.ToError()
  47. }
  48. return nil, fmt.Errorf("unknow response content type: %s", contType)
  49. }
  50. type CreateJobReq struct {
  51. Name string `json:"name"`
  52. Description string `json:"description"`
  53. JobResources schsdk.JobResources `json:"jobResources"`
  54. DataDistribute DataDistribute `json:"dataDistributes"`
  55. }
  56. type CommonJsonData struct {
  57. ID string `json:"id"`
  58. Name string `json:"name"`
  59. }
  60. type DataDistribute struct {
  61. Dataset []DatasetDistribute `json:"dataset"`
  62. Code []CodeDistribute `json:"code"`
  63. Image []ImageDistribute `json:"image"`
  64. Model []ModelDistribute `json:"model"`
  65. }
  66. type DataDetail struct {
  67. ClusterID schsdk.ClusterID `json:"clusterID"`
  68. //StorageID cdssdk.StorageID `json:"storageID"`
  69. StorageID cdssdk.StorageID
  70. JsonData string `json:"jsonData"`
  71. }
  72. type DatasetDistribute struct {
  73. DataName string `json:"dataName"`
  74. PackageID cdssdk.PackageID `json:"packageID"`
  75. Clusters []DataDetail `json:"clusters"`
  76. }
  77. type CodeDistribute struct {
  78. DataName string `json:"dataName"`
  79. PackageID cdssdk.PackageID `json:"packageID"`
  80. Clusters []DataDetail `json:"clusters"`
  81. }
  82. type ImageDistribute struct {
  83. DataName string `json:"dataName"`
  84. //PackageID cdssdk.PackageID `json:"packageID"`
  85. ImageID schsdk.ImageID `json:"packageID"`
  86. Clusters []DataDetail `json:"clusters"`
  87. }
  88. type ModelDistribute struct {
  89. DataName string `json:"dataName"`
  90. PackageID cdssdk.PackageID `json:"packageID"`
  91. Clusters []DataDetail `json:"clusters"`
  92. }
  93. type CreateJobResp struct {
  94. TaskID TaskID `json:"taskID"`
  95. ScheduleDatas []ScheduleData `json:"scheduleDatas"`
  96. }
  97. type ScheduleData struct {
  98. DataType string `json:"dataType"`
  99. PackageID cdssdk.PackageID `json:"packageID"`
  100. StorageType string `json:"storageType"`
  101. ClusterIDs []schsdk.ClusterID `json:"clusterIDs"`
  102. }
  103. func (c *Client) CreateJob(req CreateJobReq, token string) (*CreateJobResp, error) {
  104. targetUrl, err := url.JoinPath(c.baseURL, "/createTask")
  105. if err != nil {
  106. return nil, err
  107. }
  108. // 将req转换成json,并打印
  109. //req2, err := serder.ObjectToJSONEx(req)
  110. //if err != nil {
  111. // return nil, fmt.Errorf("request to json: %w", err)
  112. //}
  113. //fmt.Println(string(req2))
  114. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  115. Body: req,
  116. Header: map[string]string{
  117. "Authorization": token,
  118. },
  119. })
  120. if err != nil {
  121. return nil, err
  122. }
  123. contType := resp.Header.Get("Content-Type")
  124. if strings.Contains(contType, http2.ContentTypeJSON) {
  125. var codeResp respons2[CreateJobResp]
  126. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  127. return nil, fmt.Errorf("parsing response: %w", err)
  128. }
  129. if codeResp.Code == ResponseCodeOK {
  130. return &codeResp.Data, nil
  131. }
  132. return nil, fmt.Errorf("error: %s", codeResp.Message)
  133. }
  134. return nil, fmt.Errorf("unknow response content type: %s", contType)
  135. }
  136. type RunJobReq struct {
  137. TaskID TaskID `json:"taskID"`
  138. ScheduledDatas []DataScheduleResults `json:"scheduledDatas"`
  139. }
  140. type DataScheduleResult struct {
  141. Clusters []DataDetail `json:"clusters"`
  142. PackageID cdssdk.PackageID `json:"packageID"`
  143. PackageFullPath string `json:"packageFullPath"`
  144. Status bool `json:"status"`
  145. Msg string `json:"msg"`
  146. }
  147. type DataScheduleResults struct {
  148. DataType string `json:"dataType"`
  149. Results []DataScheduleResult `json:"results"`
  150. }
  151. func (c *Client) RunJob(req RunJobReq, token string) error {
  152. targetUrl, err := url.JoinPath(c.baseURL, "runTask")
  153. if err != nil {
  154. return err
  155. }
  156. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  157. Body: req,
  158. Header: map[string]string{
  159. "Authorization": token,
  160. },
  161. })
  162. if err != nil {
  163. return err
  164. }
  165. contType := resp.Header.Get("Content-Type")
  166. if strings.Contains(contType, http2.ContentTypeJSON) {
  167. var codeResp respons2[string]
  168. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  169. return fmt.Errorf("parsing response: %w", err)
  170. }
  171. if codeResp.Code == ResponseCodeOK {
  172. return nil
  173. }
  174. return fmt.Errorf("error: %s", codeResp.Message)
  175. }
  176. return fmt.Errorf("unknow response content type: %s", contType)
  177. }
  178. type CancelJobReq struct {
  179. TaskID TaskID `json:"taskID"`
  180. Msg string `json:"msg"`
  181. }
  182. func (c *Client) CancelJob(req CancelJobReq) error {
  183. targetUrl, err := url.JoinPath(c.baseURL, "/queryResources")
  184. if err != nil {
  185. return err
  186. }
  187. resp, err := http2.GetJSON(targetUrl, http2.RequestParam{Body: req})
  188. if err != nil {
  189. return err
  190. }
  191. contType := resp.Header.Get("Content-Type")
  192. if strings.Contains(contType, http2.ContentTypeJSON) {
  193. var codeResp response[string]
  194. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  195. return fmt.Errorf("parsing response: %w", err)
  196. }
  197. if codeResp.Code == ResponseCodeOK {
  198. return nil
  199. }
  200. return codeResp.ToError()
  201. }
  202. return fmt.Errorf("unknow response content type: %s", contType)
  203. }