Browse Source

UpdateHpcTaskStatus

Signed-off-by: jagger <cossjie@foxmail.com>
pull/464/head
jagger 7 months ago
parent
commit
c7941604f6
4 changed files with 63 additions and 150 deletions
  1. +1
    -7
      internal/cron/cron.go
  2. +1
    -2
      internal/logic/hpc/commithpctasklogic.go
  3. +4
    -4
      internal/middleware/logmiddleware.go
  4. +57
    -137
      internal/scheduler/service/utils/status/hpc_task_sync.go

+ 1
- 7
internal/cron/cron.go View File

@@ -52,13 +52,7 @@ func AddCronGroup(svc *svc.ServiceContext) {


//更新hpc任务状态 //更新hpc任务状态
svc.Cron.AddFunc("*/5 * * * * ?", func() { svc.Cron.AddFunc("*/5 * * * * ?", func() {
list, err := GetHpcTaskList(svc)
if err != nil {
logx.Errorf(err.Error())
return
}
status.UpdateTaskStatusByHpc(svc, list)
status.UpdateTaskHpcStatus(svc, list)
status.UpdateHpcTaskStatus(svc)
}) })


} }

+ 1
- 2
internal/logic/hpc/commithpctasklogic.go View File

@@ -6,7 +6,6 @@ import (
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"strconv" "strconv"
@@ -85,7 +84,6 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
Backend: req.Backend, Backend: req.Backend,
OperateType: req.OperateType, OperateType: req.OperateType,
CmdScript: req.Parameters["cmdScript"], CmdScript: req.Parameters["cmdScript"],
StartTime: time.Now().Format(constants.Layout),
CardCount: cardCount, CardCount: cardCount,
WorkDir: req.Parameters["workDir"], WorkDir: req.Parameters["workDir"],
WallTime: req.Parameters["wallTime"], WallTime: req.Parameters["wallTime"],
@@ -155,5 +153,6 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
if updates.Error != nil { if updates.Error != nil {
return nil, updates.Error return nil, updates.Error
} }
resp.Data.JobInfo["taskId"] = strconv.FormatInt(taskModel.Id, 10)
return resp, nil return resp, nil
} }

+ 4
- 4
internal/middleware/logmiddleware.go View File

@@ -20,7 +20,7 @@ func LogMiddleware(next http.HandlerFunc) http.HandlerFunc {
proxy := &responseProxy{w: w} proxy := &responseProxy{w: w}
requestLog(r) requestLog(r)
next(proxy, r) next(proxy, r)
logx.Infof("LogMiddleware response uri:%s jsonResult :%+v", r.RequestURI, string(proxy.body))
logx.Debug("LogMiddleware response uri:%s jsonResult :%+v", r.RequestURI, string(proxy.body))
} }
} }


@@ -42,15 +42,15 @@ func (p *responseProxy) WriteHeader(statusCode int) {


func requestLog(r *http.Request) { func requestLog(r *http.Request) {
// 打印所有header // 打印所有header
logx.Infof("LogMiddleware request uri:%s header :%+v", r.RequestURI, r.Header)
logx.Debug("LogMiddleware request uri:%s header :%+v", r.RequestURI, r.Header)
// json日志 // json日志
if withJsonBody(r) { if withJsonBody(r) {
requestDump, err := httputil.DumpRequest(r, true) requestDump, err := httputil.DumpRequest(r, true)
logx.Infof("LogMiddleware request uri:%s jsonParams :%+v, err:%+v", r.RequestURI, string(requestDump), err)
logx.Debug("LogMiddleware request uri:%s jsonParams :%+v, err:%+v", r.RequestURI, string(requestDump), err)
} else { } else {
// form表单日志和其他 // form表单日志和其他
formParams, err := httpx.GetFormValues(r) formParams, err := httpx.GetFormValues(r)
logx.Infof("LogMiddleware request uri:%s formParams :%+v, err:%+v", r.RequestURI, formParams, err)
logx.Debug("LogMiddleware request uri:%s formParams :%+v, err:%+v", r.RequestURI, formParams, err)
} }
} }




+ 57
- 137
internal/scheduler/service/utils/status/hpc_task_sync.go View File

@@ -1,9 +1,7 @@
package status package status


import ( import (
"fmt"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs"
@@ -11,23 +9,20 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"net/http" "net/http"
"strconv" "strconv"
"sync"
) )


func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, hpcTask *models.TaskHpc) error {
func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, hpcTask *models.TaskHpc, status bool, message string) error {
report := &jcs.JobStatusReportReq{ report := &jcs.JobStatusReportReq{
TaskName: task.Name, TaskName: task.Name,
TaskID: hpcTask.JobId,
TaskID: strconv.FormatInt(task.Id, 10),
Messages: make([]*jcs.ReportMessage, 0), Messages: make([]*jcs.ReportMessage, 0),
} }


jobMsg := &jcs.ReportMessage{ jobMsg := &jcs.ReportMessage{
Status: true,
Message: "",
Status: status,
Message: message,
ClusterID: strconv.FormatInt(hpcTask.ClusterId, 10), ClusterID: strconv.FormatInt(hpcTask.ClusterId, 10),
Output: hpcTask.WorkDir, Output: hpcTask.WorkDir,
} }
@@ -42,148 +37,73 @@ func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, hpc
return nil return nil
} }


// 更新主表的超算任务状态
func UpdateTaskStatusByHpc(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
// UpdateHpcTaskStatus 更新超算任务状态,并通知中间件
func UpdateHpcTaskStatus(svc *svc.ServiceContext) {
svc.Scheduler.HpcService.TaskSyncLock.Lock() svc.Scheduler.HpcService.TaskSyncLock.Lock()
defer svc.Scheduler.HpcService.TaskSyncLock.Unlock() defer svc.Scheduler.HpcService.TaskSyncLock.Unlock()

for _, task := range tasklist {
hpcTaskList, err := svc.Scheduler.HpcStorages.GetHpcTaskListById(task.Id)
taskList := make([]*models.TaskHpc, 0)
sqlStr := `select * from task_hpc where job_id!='' and status not in('Failed','Completed','Cancelled') order by created_time desc limit 10`
db := svc.DbEngin.Raw(sqlStr).Scan(&taskList)
if db.Error != nil {
logx.Errorf(db.Error.Error())
return
}
for _, hpc := range taskList {
//更新task表的超算任务状态
task := &types.TaskModel{}
tx := svc.DbEngin.Model(models.Task{}).Where("id", hpc.TaskId).Scan(&task)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
break
}
h := http.Request{}
hpcTask, err := svc.Scheduler.HpcService.HpcExecutorAdapterMap[strconv.FormatInt(hpc.AdapterId, 10)].GetTask(h.Context(), hpc.JobId)
if err != nil { if err != nil {
logx.Errorf(err.Error()) logx.Errorf(err.Error())
return
}
if len(hpcTaskList) == 0 {
break break
} }
logx.Errorf("############ Report Status Message Before switch %s", task.Status)
if len(hpcTaskList) == 1 {
logx.Errorf("############ Report Status Message Switch %s", hpcTaskList[0].Status)
switch hpcTaskList[0].Status {

case constants.Completed:
task.Status = constants.Succeeded
logx.Errorf("############ Report Status Message Before Sending %s", task.Status)

_ = reportHpcStatusMessages(svc, task, hpcTaskList[0])
case constants.Running:
task.Status = constants.Running
logx.Errorf("############ Report Status Message Before Sending %s", task.Status)

_ = reportHpcStatusMessages(svc, task, hpcTaskList[0])
case constants.Failed:
task.Status = constants.Failed
logx.Errorf("############ Report Status Message Before Sending %s", task.Status)

_ = reportHpcStatusMessages(svc, task, hpcTaskList[0])
default:
task.Status = hpcTaskList[0].Status
switch hpcTask.Status {
case constants.Running:
if hpc.Status != hpcTask.Status {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "running", "任务运行中")
hpc.Status = hpcTask.Status
task.Status = hpcTask.Status
} }
task.StartTime = hpcTaskList[0].StartTime
task.EndTime = hpcTaskList[0].EndTime
err := svc.Scheduler.HpcStorages.UpdateTask(task)
if err != nil {
return
case constants.Failed:
if hpc.Status != hpcTask.Status {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "failed", "任务失败")
hpc.Status = hpcTask.Status
task.Status = hpcTask.Status
_ = reportHpcStatusMessages(svc, task, hpc, false, "任务失败")
} }
break
}
logx.Errorf("############ Report Status Message After switch %s", task.Status)
for i := len(hpcTaskList) - 1; i >= 0; i-- {
if hpcTaskList[i].StartTime == "" {
task.Status = hpcTaskList[i].Status
hpcTaskList = append(hpcTaskList[:i], hpcTaskList[i+1:]...)
case constants.Completed:
if hpc.Status != hpcTask.Status {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "completed", "任务完成")
hpc.Status = hpcTask.Status
task.Status = hpcTask.Status
_ = reportHpcStatusMessages(svc, task, hpc, true, "任务完成")
} }
}
if len(hpcTaskList) == 0 {
err := svc.Scheduler.HpcStorages.UpdateTask(task)
if err != nil {
break
default:
if hpc.Status != hpcTask.Status {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "pending", "任务pending")
hpc.Status = hpcTask.Status
task.Status = hpcTask.Status
} }
break
} }
}
}

// UpdateTaskHpcStatus 更新task_hpc表的任务状态
func UpdateTaskHpcStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
svc.Scheduler.HpcService.TaskSyncLock.Lock()
defer svc.Scheduler.HpcService.TaskSyncLock.Unlock()
for _, task := range tasklist {
hpcTaskList, err := svc.Scheduler.HpcStorages.GetHpcTaskListById(task.Id)
task.StartTime = hpcTask.Start
task.EndTime = hpcTask.End
hpc.StartTime = hpcTask.Start
hpc.EndTime = hpcTask.End
logx.Info("# task 开始时间: %v, 结束时间: %v", task.StartTime, task.EndTime)
err = svc.Scheduler.HpcStorages.UpdateTask(task)
if err != nil { if err != nil {
logx.Errorf(err.Error()) logx.Errorf(err.Error())
return
}
if len(hpcTaskList) == 0 {
return
break
} }
updateHpcTask(svc, hpcTaskList...)
}

}

func updateHpcTask(svc *svc.ServiceContext, hpcTaskList ...*models.TaskHpc) {
var wg sync.WaitGroup
for _, hpc := range hpcTaskList {
t := hpc
if t.Status == constants.Completed || t.Status == constants.Failed || t.JobId == "" || t.Status == constants.Cancelled {
continue
err = svc.Scheduler.HpcStorages.UpdateHpcTask(hpc)
if err != nil {
logx.Errorf(err.Error())
break
} }
wg.Add(1)
go func() {
h := http.Request{}
hpcTask, err := svc.Scheduler.HpcService.HpcExecutorAdapterMap[strconv.FormatInt(t.AdapterId, 10)].GetTask(h.Context(), t.JobId)
if err != nil {
if status.Code(err) == codes.DeadlineExceeded {
msg := fmt.Sprintf("###UpdateHpcTaskStatus###, HpcTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
logx.Errorf(errors.New(msg).Error())
wg.Done()
return
}

msg := fmt.Sprintf("###UpdateHpcTaskStatus###, HpcTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
logx.Errorf(errors.New(msg).Error())
wg.Done()
return
}
if hpcTask == nil {
wg.Done()
return
}
switch hpcTask.Status {
case constants.Running:
if t.Status != hpcTask.Status {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "running", "任务运行中")
t.Status = hpcTask.Status
}
case constants.Failed:
if t.Status != hpcTask.Status {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "failed", "任务失败")
t.Status = hpcTask.Status
}
case constants.Completed:
if t.Status != hpcTask.Status {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "completed", "任务完成")
t.Status = hpcTask.Status
}
default:
if t.Status != hpcTask.Status {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "pending", "任务pending")
t.Status = hpcTask.Status
}
}
t.StartTime = hpcTask.Start
t.EndTime = hpcTask.End
err = svc.Scheduler.HpcStorages.UpdateHpcTask(t)
if err != nil {
msg := fmt.Sprintf("###UpdateHpcTaskStatus###, HpcTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
logx.Errorf(errors.New(msg).Error())
wg.Done()
return
}
wg.Done()
}()
} }
wg.Wait()
} }

Loading…
Cancel
Save