Browse Source

UpdateHpcTaskStatus

Signed-off-by: jagger <cossjie@foxmail.com>
pull/464/head
jagger 7 months ago
parent
commit
a988a0aa52
3 changed files with 11 additions and 4 deletions
  1. +9
    -0
      internal/logic/hpc/commithpctasklogic.go
  2. +0
    -4
      internal/scheduler/service/hpc/slurm.go
  3. +2
    -0
      internal/scheduler/service/utils/status/hpc_task_sync.go

+ 9
- 0
internal/logic/hpc/commithpctasklogic.go View File

@@ -39,6 +39,7 @@ func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Com
} }


func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) { func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) {
req.Parameters["jobName"] = generateJobName(req)
reqStr, _ := jsoniter.MarshalToString(req) reqStr, _ := jsoniter.MarshalToString(req)
yaml := utils.StringToYaml(reqStr) yaml := utils.StringToYaml(reqStr)
var clusterInfo types.ClusterInfo var clusterInfo types.ClusterInfo
@@ -156,3 +157,11 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
resp.Data.JobInfo["taskId"] = strconv.FormatInt(taskModel.Id, 10) resp.Data.JobInfo["taskId"] = strconv.FormatInt(taskModel.Id, 10)
return resp, nil return resp, nil
} }

// generateJobName builds the job name for an HPC task request.
//
// The name starts as req.Name; when req.OperateType is non-empty it is
// appended after an underscore, giving "<Name>_<OperateType>". With an empty
// OperateType the plain Name is returned, so no trailing underscore appears.
func generateJobName(req *types.CommitHpcTaskReq) string {
	jobName := req.Name
	if req.OperateType != "" {
		jobName += "_" + req.OperateType
	}
	return jobName
}

+ 0
- 4
internal/scheduler/service/hpc/slurm.go View File

@@ -3,7 +3,6 @@ package hpcservice
import ( import (
"context" "context"
"github.com/go-resty/resty/v2" "github.com/go-resty/resty/v2"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
@@ -39,11 +38,9 @@ func (c *ParticipantHpc) GetTask(ctx context.Context, taskId string) (*collector
SetPathParam("backend", "slurm"). SetPathParam("backend", "slurm").
SetResult(&hpcResp). SetResult(&hpcResp).
Get(reqUrl) Get(reqUrl)
logx.Info("远程调用p端接口开始")
if err != nil { if err != nil {
return nil, err return nil, err
} }
logx.Info("远程调用p端接口完成")
var resp collector.Task var resp collector.Task
resp.Id = hpcResp.Data.ID resp.Id = hpcResp.Data.ID
if !hpcResp.Data.StartTime.IsZero() { if !hpcResp.Data.StartTime.IsZero() {
@@ -76,7 +73,6 @@ func (c *ParticipantHpc) GetTask(ctx context.Context, taskId string) (*collector


func (c *ParticipantHpc) SubmitTask(ctx context.Context, req types.CommitHpcTaskReq) (*types.CommitHpcTaskResp, error) { func (c *ParticipantHpc) SubmitTask(ctx context.Context, req types.CommitHpcTaskReq) (*types.CommitHpcTaskResp, error) {
reqUrl := c.host + SubmitTaskUrl reqUrl := c.host + SubmitTaskUrl
req.Parameters["jobName"] = req.Name + "_" + req.OperateType
resp := types.CommitHpcTaskResp{} resp := types.CommitHpcTaskResp{}
httpClient := resty.New().R() httpClient := resty.New().R()
_, err := httpClient.SetHeader("Content-Type", "application/json"). _, err := httpClient.SetHeader("Content-Type", "application/json").


+ 2
- 0
internal/scheduler/service/utils/status/hpc_task_sync.go View File

@@ -74,6 +74,7 @@ func UpdateHpcTaskStatus(svc *svc.ServiceContext) {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "failed", "任务失败") svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "failed", "任务失败")
hpc.Status = hpcTask.Status hpc.Status = hpcTask.Status
task.Status = hpcTask.Status task.Status = hpcTask.Status
log.Info().Msgf("[%v]:任务执行失败,发送通知, 任务状态: [%v]", hpcTask, hpcTask.Status)
_ = reportHpcStatusMessages(svc, task, hpc, false, "任务失败") _ = reportHpcStatusMessages(svc, task, hpc, false, "任务失败")
} }
case constants.Completed: case constants.Completed:
@@ -81,6 +82,7 @@ func UpdateHpcTaskStatus(svc *svc.ServiceContext) {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "completed", "任务完成") svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "completed", "任务完成")
hpc.Status = hpcTask.Status hpc.Status = hpcTask.Status
task.Status = hpcTask.Status task.Status = hpcTask.Status
log.Info().Msgf("[%v]:任务执行完成,发送通知, 任务状态: [%v]", hpcTask, hpcTask.Status)
_ = reportHpcStatusMessages(svc, task, hpc, true, "任务完成") _ = reportHpcStatusMessages(svc, task, hpc, true, "任务完成")
} }
default: default:


Loading…
Cancel
Save