You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

commithpctasklogic.go 4.3 kB

1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package hpc
  2. import (
  3. "context"
  4. "errors"
  5. clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  7. "gitlink.org.cn/JointCloud/pcm-hpc/slurm"
  8. "strconv"
  9. "time"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  12. "github.com/zeromicro/go-zero/core/logx"
  13. )
  14. type CommitHpcTaskLogic struct {
  15. logx.Logger
  16. ctx context.Context
  17. svcCtx *svc.ServiceContext
  18. }
  19. func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitHpcTaskLogic {
  20. return &CommitHpcTaskLogic{
  21. Logger: logx.WithContext(ctx),
  22. ctx: ctx,
  23. svcCtx: svcCtx,
  24. }
  25. }
  26. func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) {
  27. var clusterInfo types.ClusterInfo
  28. l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where id = ?", req.ClusterId).First(&clusterInfo)
  29. if len(clusterInfo.Id) == 0 {
  30. return resp, errors.New("cluster not found")
  31. }
  32. // 构建主任务结构体
  33. taskModel := models.Task{
  34. Name: req.Name,
  35. Description: req.Description,
  36. CommitTime: time.Now(),
  37. Status: "Running",
  38. AdapterTypeDict: "2",
  39. }
  40. // 保存任务数据到数据库
  41. tx := l.svcCtx.DbEngin.Create(&taskModel)
  42. if tx.Error != nil {
  43. return nil, tx.Error
  44. }
  45. var adapterName string
  46. l.svcCtx.DbEngin.Raw("SELECT name FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&adapterName)
  47. if len(adapterName) == 0 || adapterName == "" {
  48. return nil, errors.New("no corresponding adapter found")
  49. }
  50. clusterId, err := strconv.ParseInt(req.ClusterId, 10, 64)
  51. hpcInfo := models.TaskHpc{
  52. TaskId: taskModel.Id,
  53. AdapterId: clusterInfo.AdapterId,
  54. AdapterName: adapterName,
  55. ClusterId: clusterId,
  56. ClusterName: clusterInfo.Name,
  57. Name: taskModel.Name,
  58. CmdScript: req.CmdScript,
  59. StartTime: time.Now().String(),
  60. CardCount: req.CardCount,
  61. WorkDir: req.WorkDir,
  62. WallTime: req.WallTime,
  63. AppType: req.AppType,
  64. AppName: req.AppName,
  65. Queue: req.Queue,
  66. SubmitType: req.SubmitType,
  67. NNode: req.NNode,
  68. Account: clusterInfo.Username,
  69. StdInput: req.StdInput,
  70. Partition: req.Partition,
  71. CreatedTime: time.Now(),
  72. UpdatedTime: time.Now(),
  73. Status: "Running",
  74. }
  75. hpcInfo.WorkDir = clusterInfo.WorkDir + req.WorkDir
  76. tx = l.svcCtx.DbEngin.Create(&hpcInfo)
  77. if tx.Error != nil {
  78. return nil, tx.Error
  79. }
  80. // 提交job到指定集群
  81. jobId, err := submitJob(&hpcInfo, &clusterInfo)
  82. if err != nil {
  83. return nil, err
  84. }
  85. // 保存操作记录
  86. noticeInfo := clientCore.NoticeInfo{
  87. AdapterId: clusterInfo.AdapterId,
  88. AdapterName: adapterName,
  89. ClusterId: clusterId,
  90. ClusterName: clusterInfo.Name,
  91. NoticeType: "create",
  92. TaskName: req.Name,
  93. Incident: "任务创建中",
  94. CreatedTime: time.Now(),
  95. }
  96. result := l.svcCtx.DbEngin.Table("t_notice").Create(&noticeInfo)
  97. if result.Error != nil {
  98. logx.Errorf("Task creation failure, err: %v", result.Error)
  99. }
  100. resp = &types.CommitHpcTaskResp{
  101. JobId: string(jobId),
  102. }
  103. return resp, nil
  104. }
  105. func submitJob(hpcInfo *models.TaskHpc, clusterInfo *types.ClusterInfo) (string, error) {
  106. client, err := slurm.NewClient(slurm.ClientOptions{
  107. URL: clusterInfo.Server,
  108. ClientVersion: clusterInfo.Version,
  109. RestUserName: clusterInfo.Username,
  110. Token: clusterInfo.Token})
  111. if err != nil {
  112. return "", err
  113. }
  114. job, err := client.Job(slurm.JobOptions{})
  115. if err != nil {
  116. return "", err
  117. }
  118. // 封装请求参数
  119. submitReq := slurm.JobOptions{
  120. Script: hpcInfo.CmdScript,
  121. Job: &slurm.JobProperties{
  122. Account: hpcInfo.Account,
  123. Name: hpcInfo.Name,
  124. NTasks: 1,
  125. CurrentWorkingDirectory: hpcInfo.WorkDir,
  126. Partition: hpcInfo.Partition,
  127. Environment: map[string]string{"PATH": clusterInfo.EnvPath,
  128. "LD_LIBRARY_PATH": clusterInfo.EnvLdPath},
  129. },
  130. }
  131. submitReq.Job.StandardOutput = submitReq.Job.CurrentWorkingDirectory + "/%j.out"
  132. submitReq.Job.StandardError = submitReq.Job.CurrentWorkingDirectory + "/%j.err"
  133. jobResp, err := job.SubmitJob(clusterInfo.ProxyAddress, submitReq)
  134. if err != nil {
  135. return "", err
  136. }
  137. jobId := strconv.Itoa(jobResp.JobId)
  138. return jobId, nil
  139. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.