You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

slurm.go 3.9 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package hpcservice
  import (
  	"context"
  	"fmt"

  	"github.com/go-resty/resty/v2"
  	"github.com/zeromicro/go-zero/core/logx"
  	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
  	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
  )
  // ParticipantHpc is a client for one HPC participant's adapter service.
  // Its request methods build every URL by prefixing an endpoint path with host.
  type ParticipantHpc struct {
  	// participantId is the id passed to NewHpc.
  	participantId int64

  	// platform is stored by NewHpc; host is the base address prepended to
  	// every endpoint path. userName and accessToken are not set or read by
  	// any code in this file.
  	platform, host, userName, accessToken string
  }
  // Backend identifier and REST endpoint templates of the HPC adapter.
  // The {backend} and {jobId} placeholders are filled in by resty path params
  // in the request methods of ParticipantHpc.
  const (
  	// BackendSlurm is the backend name substituted for {backend}.
  	BackendSlurm = "slurm"

  	// SubmitTaskUrl is the job submission endpoint (used with POST).
  	SubmitTaskUrl = "/api/v1/jobs"
  	// JobDetailUrl returns the detail of one job (used with GET).
  	JobDetailUrl = "/api/v1/jobs/detail/{backend}/{jobId}"
  	// JobLogUrl returns the logs of one job (used with GET).
  	JobLogUrl = "/api/v1/jobs/logs/{backend}/{jobId}"
  	// CancelTaskUrl cancels one job (used with DELETE).
  	CancelTaskUrl = "/api/v1/jobs/cancel/{backend}/{jobId}"
  )
  25. func NewHpc(host string, id int64, platform string) *ParticipantHpc {
  26. return &ParticipantHpc{
  27. host: host,
  28. participantId: id,
  29. platform: platform,
  30. }
  31. }
  32. func (c *ParticipantHpc) GetTask(ctx context.Context, taskId string) (*collector.Task, error) {
  33. reqUrl := c.host + JobDetailUrl
  34. hpcResp := &collector.HpcJobDetailResp{}
  35. httpClient := resty.New().R()
  36. _, err := httpClient.SetHeader("Content-Type", "application/json").
  37. SetPathParam("jobId", taskId).
  38. SetPathParam("backend", "slurm").
  39. SetResult(&hpcResp).
  40. Get(reqUrl)
  41. if err != nil {
  42. return nil, err
  43. }
  44. var resp collector.Task
  45. resp.Id = hpcResp.Data.ID
  46. if !hpcResp.Data.StartTime.IsZero() {
  47. resp.Start = hpcResp.Data.StartTime.Format(constants.Layout)
  48. }
  49. if !hpcResp.Data.EndTime.IsZero() {
  50. resp.End = hpcResp.Data.EndTime.Format(constants.Layout)
  51. }
  52. switch hpcResp.Data.StatusText {
  53. case "COMPLETED":
  54. resp.Status = constants.Completed
  55. case "FAILED":
  56. resp.Status = constants.Failed
  57. case "CREATED_FAILED":
  58. resp.Status = constants.Failed
  59. case "RUNNING":
  60. resp.Status = constants.Running
  61. case "STOPPED":
  62. resp.Status = constants.Stopped
  63. case "PENDING":
  64. resp.Status = constants.Pending
  65. case "WAITING":
  66. resp.Status = constants.Waiting
  67. case "CANCELLED":
  68. resp.Status = constants.Cancelled
  69. default:
  70. resp.Status = "undefined"
  71. }
  72. return &resp, nil
  73. }
  74. func (c *ParticipantHpc) SubmitTask(ctx context.Context, req types.CommitHpcTaskReq) (*types.CommitHpcTaskResp, error) {
  75. reqUrl := c.host + SubmitTaskUrl
  76. resp := types.CommitHpcTaskResp{}
  77. logx.WithContext(ctx).Infof("提交任务到超算集群, url: %s, req: %+v", reqUrl, req)
  78. httpClient := resty.New().R()
  79. _, err := httpClient.SetHeaders(
  80. map[string]string{
  81. "Content-Type": "application/json",
  82. "traceId": result.TraceIDFromContext(ctx),
  83. }).SetBody(req).
  84. SetResult(&resp).
  85. Post(reqUrl)
  86. if err != nil {
  87. return nil, err
  88. }
  89. return &resp, nil
  90. }
  91. func (c *ParticipantHpc) CancelTask(ctx context.Context, jobId string) error {
  92. reqUrl := c.host + CancelTaskUrl
  93. resp := types.CommonResp{}
  94. logx.WithContext(ctx).Infof("取消超算集群任务, url: %s, jobId: %s", reqUrl, jobId)
  95. httpClient := resty.New().R()
  96. _, err := httpClient.SetHeaders(
  97. map[string]string{
  98. "Content-Type": "application/json",
  99. "traceId": result.TraceIDFromContext(ctx),
  100. }).SetPathParams(map[string]string{
  101. "backend": BackendSlurm,
  102. "jobId": jobId,
  103. }).SetResult(&resp).Delete(reqUrl)
  104. if err != nil {
  105. return err
  106. }
  107. return nil
  108. }
  109. func (c *ParticipantHpc) GetTaskLogs(ctx context.Context, jobId string) (interface{}, error) {
  110. reqUrl := c.host + JobLogUrl
  111. resp := types.CommonResp{}
  112. logx.WithContext(ctx).Infof("获取超算集群任务日志, url: %s, jobId: %s", reqUrl, jobId)
  113. httpClient := resty.New().R()
  114. _, err := httpClient.SetHeaders(
  115. map[string]string{
  116. "Content-Type": "application/json",
  117. "traceId": result.TraceIDFromContext(ctx),
  118. }).SetPathParams(map[string]string{
  119. "backend": BackendSlurm,
  120. "jobId": jobId,
  121. }).SetResult(&resp).Get(reqUrl)
  122. if err != nil {
  123. return nil, err
  124. }
  125. return resp, nil
  126. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.