You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

slurm.go 4.3 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package hpcservice
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/go-resty/resty/v2"
  6. "github.com/zeromicro/go-zero/core/logx"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/restyclient"
  12. "net/http"
  13. )
  14. type ParticipantHpc struct {
  15. participantId int64
  16. platform string
  17. host string
  18. userName string
  19. accessToken string
  20. *restyclient.RestyClient
  21. }
  // Backend name and REST path templates used when talking to an HPC participant.
  // The "{backend}" and "{jobId}" placeholders are substituted through resty
  // path parameters at request time.
  const (
      // BackendSlurm is the value substituted for the "{backend}" placeholder.
      BackendSlurm = "slurm"

      // JobDetailUrl is the path template for fetching the detail of one job.
      JobDetailUrl = "/api/v1/jobs/detail/{backend}/{jobId}"
      // SubmitTaskUrl is the path new jobs are posted to.
      SubmitTaskUrl = "/api/v1/jobs"
      // CancelTaskUrl is the path template for cancelling one job.
      CancelTaskUrl = "/api/v1/jobs/cancel/{backend}/{jobId}"
      // JobLogUrl is the path template for fetching the logs of one job.
      JobLogUrl = "/api/v1/jobs/logs/{backend}/{jobId}"
  )
  29. func NewHpc(host string, id int64, platform string) *ParticipantHpc {
  30. return &ParticipantHpc{
  31. host: host,
  32. participantId: id,
  33. platform: platform,
  34. RestyClient: restyclient.InitClient(host, ""),
  35. }
  36. }
  37. func (c *ParticipantHpc) GetTask(ctx context.Context, taskId string) (*collector.Task, error) {
  38. reqUrl := c.host + JobDetailUrl
  39. hpcResp := &collector.HpcJobDetailResp{}
  40. httpClient := resty.New().R()
  41. _, err := httpClient.SetHeader("Content-Type", "application/json").
  42. SetPathParam("jobId", taskId).
  43. SetPathParam("backend", "slurm").
  44. SetResult(&hpcResp).
  45. Get(reqUrl)
  46. if err != nil {
  47. return nil, err
  48. }
  49. var resp collector.Task
  50. resp.Id = hpcResp.Data.ID
  51. if !hpcResp.Data.StartTime.IsZero() {
  52. resp.Start = hpcResp.Data.StartTime.Format(constants.Layout)
  53. }
  54. if !hpcResp.Data.EndTime.IsZero() {
  55. resp.End = hpcResp.Data.EndTime.Format(constants.Layout)
  56. }
  57. switch hpcResp.Data.StatusText {
  58. case "COMPLETED":
  59. resp.Status = constants.Completed
  60. case "FAILED":
  61. resp.Status = constants.Failed
  62. case "CREATED_FAILED":
  63. resp.Status = constants.Failed
  64. case "RUNNING":
  65. resp.Status = constants.Running
  66. case "STOPPED":
  67. resp.Status = constants.Stopped
  68. case "PENDING":
  69. resp.Status = constants.Pending
  70. case "WAITING":
  71. resp.Status = constants.Waiting
  72. case "CANCELLED":
  73. resp.Status = constants.Cancelled
  74. default:
  75. resp.Status = "undefined"
  76. }
  77. return &resp, nil
  78. }
  79. func (c *ParticipantHpc) SubmitTask(ctx context.Context, req types.CommitHpcTaskReq) (*types.CommitHpcTaskResp, error) {
  80. reqUrl := c.host + SubmitTaskUrl
  81. resp := types.CommitHpcTaskResp{}
  82. logx.WithContext(ctx).Infof("提交任务到超算集群, url: %s, req: %+v", reqUrl, req)
  83. httpClient := resty.New().R()
  84. _, err := httpClient.SetHeaders(
  85. map[string]string{
  86. "Content-Type": "application/json",
  87. "traceId": result.TraceIDFromContext(ctx),
  88. }).SetBody(req).
  89. SetResult(&resp).
  90. Post(reqUrl)
  91. if err != nil {
  92. return nil, err
  93. }
  94. if resp.Code != http.StatusOK {
  95. return nil, fmt.Errorf(resp.Msg)
  96. }
  97. return &resp, nil
  98. }
  99. func (c *ParticipantHpc) CancelTask(ctx context.Context, jobId string) error {
  100. reqUrl := c.host + CancelTaskUrl
  101. resp := types.CommonResp{}
  102. logx.WithContext(ctx).Infof("取消超算集群任务, url: %s, jobId: %s", reqUrl, jobId)
  103. httpClient := resty.New().R()
  104. _, err := httpClient.SetHeaders(
  105. map[string]string{
  106. "Content-Type": "application/json",
  107. "traceId": result.TraceIDFromContext(ctx),
  108. }).SetPathParams(map[string]string{
  109. "backend": BackendSlurm,
  110. "jobId": jobId,
  111. }).SetResult(&resp).Delete(reqUrl)
  112. if err != nil {
  113. return err
  114. }
  115. if resp.Code != http.StatusOK {
  116. return fmt.Errorf(resp.Msg)
  117. }
  118. return nil
  119. }
  120. func (c *ParticipantHpc) GetTaskLogs(ctx context.Context, jobId string) (interface{}, error) {
  121. logx.WithContext(ctx).Infof("获取超算集群任务日志, url: %s, jobId: %s", JobLogUrl, jobId)
  122. if jobId == "" {
  123. return nil, fmt.Errorf("jobId is empty")
  124. }
  125. resp := types.CommonResp{}
  126. _, err := c.Request(JobLogUrl, http.MethodGet, func(req *resty.Request) {
  127. req.SetHeaders(map[string]string{
  128. "Content-Type": "application/json",
  129. "traceId": result.TraceIDFromContext(ctx),
  130. }).SetPathParams(map[string]string{
  131. "backend": BackendSlurm,
  132. "jobId": jobId,
  133. }).SetResult(&resp)
  134. })
  135. if err != nil {
  136. return nil, err
  137. }
  138. if resp.Code != http.StatusOK {
  139. return nil, fmt.Errorf(resp.Msg)
  140. }
  141. return resp, nil
  142. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.