You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

commithpctasklogic.go 4.3 kB

1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package hpc
  import (
  	"context"
  	"errors"
  	"path"
  	"strconv"
  	"time"

  	"github.com/zeromicro/go-zero/core/logx"
  	clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
  	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  	"gitlink.org.cn/JointCloud/pcm-hpc/slurm"
  )
  14. type CommitHpcTaskLogic struct {
  15. logx.Logger
  16. ctx context.Context
  17. svcCtx *svc.ServiceContext
  18. }
  19. func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitHpcTaskLogic {
  20. return &CommitHpcTaskLogic{
  21. Logger: logx.WithContext(ctx),
  22. ctx: ctx,
  23. svcCtx: svcCtx,
  24. }
  25. }
  26. func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) {
  27. var clusterInfo types.ClusterInfo
  28. l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where id = ?", req.ClusterId).First(&clusterInfo)
  29. if len(clusterInfo.Id) == 0 {
  30. return resp, errors.New("cluster not found")
  31. }
  32. // 构建主任务结构体
  33. taskModel := models.Task{
  34. Name: req.Name,
  35. Description: req.Description,
  36. CommitTime: time.Now(),
  37. Status: "Running",
  38. AdapterTypeDict: "2",
  39. }
  40. // 保存任务数据到数据库
  41. tx := l.svcCtx.DbEngin.Create(&taskModel)
  42. if tx.Error != nil {
  43. return nil, tx.Error
  44. }
  45. var adapterName string
  46. l.svcCtx.DbEngin.Raw("SELECT name FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&adapterName)
  47. if len(adapterName) == 0 || adapterName == "" {
  48. return nil, errors.New("no corresponding adapter found")
  49. }
  50. hpcInfo := models.TaskHpc{
  51. TaskId: taskModel.Id,
  52. AdapterId: clusterInfo.AdapterId,
  53. AdapterName: adapterName,
  54. ClusterId: req.ClusterId,
  55. ClusterName: clusterInfo.Name,
  56. Name: taskModel.Name,
  57. CmdScript: req.CmdScript,
  58. StartTime: time.Now().String(),
  59. CardCount: req.CardCount,
  60. WorkDir: req.WorkDir,
  61. WallTime: req.WallTime,
  62. AppType: req.AppType,
  63. AppName: req.AppName,
  64. Queue: req.Queue,
  65. SubmitType: req.SubmitType,
  66. NNode: req.NNode,
  67. Account: clusterInfo.Username,
  68. StdInput: req.StdInput,
  69. Partition: req.Partition,
  70. CreatedTime: time.Now(),
  71. UpdatedTime: time.Now(),
  72. Status: "Running",
  73. }
  74. hpcInfo.WorkDir = clusterInfo.WorkDir + req.WorkDir
  75. tx = l.svcCtx.DbEngin.Create(&hpcInfo)
  76. if tx.Error != nil {
  77. return nil, tx.Error
  78. }
  79. // 提交job到指定集群
  80. jobId, err := submitJob(&hpcInfo, &clusterInfo)
  81. if err != nil {
  82. return nil, err
  83. }
  84. // 保存操作记录
  85. noticeInfo := clientCore.NoticeInfo{
  86. AdapterId: clusterInfo.AdapterId,
  87. AdapterName: adapterName,
  88. ClusterId: req.ClusterId,
  89. ClusterName: clusterInfo.Name,
  90. NoticeType: "create",
  91. TaskName: req.Name,
  92. Incident: "任务创建中",
  93. CreatedTime: time.Now(),
  94. }
  95. result := l.svcCtx.DbEngin.Table("t_notice").Create(&noticeInfo)
  96. if result.Error != nil {
  97. logx.Errorf("Task creation failure, err: %v", result.Error)
  98. }
  99. resp = &types.CommitHpcTaskResp{
  100. JobId: string(jobId),
  101. }
  102. return resp, nil
  103. }
  104. func submitJob(hpcInfo *models.TaskHpc, clusterInfo *types.ClusterInfo) (string, error) {
  105. client, err := slurm.NewClient(slurm.ClientOptions{
  106. URL: clusterInfo.Server,
  107. ClientVersion: clusterInfo.Version,
  108. RestUserName: clusterInfo.Username,
  109. Token: clusterInfo.Token})
  110. if err != nil {
  111. return "", err
  112. }
  113. job, err := client.Job(slurm.JobOptions{})
  114. if err != nil {
  115. return "", err
  116. }
  117. // 封装请求参数
  118. submitReq := slurm.JobOptions{
  119. Script: hpcInfo.CmdScript,
  120. Job: &slurm.JobProperties{
  121. Account: hpcInfo.Account,
  122. Name: hpcInfo.Name,
  123. NTasks: 1,
  124. CurrentWorkingDirectory: hpcInfo.WorkDir,
  125. Partition: hpcInfo.Partition,
  126. Environment: map[string]string{"PATH": clusterInfo.EnvPath,
  127. "LD_LIBRARY_PATH": clusterInfo.EnvLdPath},
  128. },
  129. }
  130. submitReq.Job.StandardOutput = submitReq.Job.CurrentWorkingDirectory + "/%j.out"
  131. submitReq.Job.StandardError = submitReq.Job.CurrentWorkingDirectory + "/%j.err"
  132. jobResp, err := job.SubmitJob(submitReq)
  133. if err != nil {
  134. return "", err
  135. }
  136. jobId := strconv.Itoa(jobResp.JobId)
  137. return jobId, nil
  138. }

PCM is positioned as a software stack over the cloud. It aims to build the standards and ecosystem for heterogeneous cloud collaboration in JCC in a non-intrusive, autonomous, peer-to-peer manner.