Browse Source

update statusReport

pull/433/head
tzwang 8 months ago
parent
commit
1ec680a49a
3 changed files with 33 additions and 13 deletions
  1. +8
    -8
      internal/scheduler/schedulers/aiScheduler.go
  2. +3
    -5
      internal/scheduler/service/utils/jcs/middleware.go
  3. +22
    -0
      internal/scheduler/service/utils/status/taskStatusSync.go

+ 8
- 8
internal/scheduler/schedulers/aiScheduler.go View File

@@ -259,9 +259,9 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass


//report msg //report msg
report := &jcs.JobStatusReportReq{ report := &jcs.JobStatusReportReq{
JobSetID: "",
LocalJobID: "",
Messages: make([]*jcs.ReportMessage, 0),
TaskName: "",
TaskID: strconv.FormatInt(taskId, 10),
Messages: make([]*jcs.ReportMessage, 0),
} }


var errmsg string var errmsg string
@@ -282,11 +282,10 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass


//add report msg //add report msg
jobMsg := &jcs.ReportMessage{ jobMsg := &jcs.ReportMessage{
TaskName: as.option.TaskName,
TaskID: strconv.FormatInt(as.option.TaskId, 10),
Status: false, Status: false,
Message: msg, Message: msg,
ClusterID: e.clusterId, ClusterID: e.clusterId,
Output: "",
} }
report.Messages = append(report.Messages, jobMsg) report.Messages = append(report.Messages, jobMsg)
} }
@@ -311,17 +310,18 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass
} }
//add report msg //add report msg
jobMsg := &jcs.ReportMessage{ jobMsg := &jcs.ReportMessage{
TaskName: as.option.TaskName,
TaskID: strconv.FormatInt(as.option.TaskId, 10),
Status: false, Status: false,
Message: s.Msg, Message: s.Msg,
ClusterID: s.ClusterId, ClusterID: s.ClusterId,
Output: "",
} }
report.Messages = append(report.Messages, jobMsg) report.Messages = append(report.Messages, jobMsg)
} }


//report status //report status
_ = jcs.StatusReport(as.AiService.Conf.JcsMiddleware.Url, report)
if mode == executor.SUBMIT_MODE_STORAGE_SCHEDULE {
_ = jcs.StatusReport(as.AiService.Conf.JcsMiddleware.Url, report)
}


logx.Errorf(errors.New(errmsg).Error()) logx.Errorf(errors.New(errmsg).Error())
return errors.New(errmsg) return errors.New(errmsg)


+ 3
- 5
internal/scheduler/service/utils/jcs/middleware.go View File

@@ -5,13 +5,11 @@ import (
) )


type JobStatusReportReq struct { type JobStatusReportReq struct {
JobSetID string `json:"jobSetID"`
LocalJobID string `json:"localJobID"`
Messages []*ReportMessage `json:"messages"`
TaskName string `json:"taskName"`
TaskID string `json:"taskID"`
Messages []*ReportMessage `json:"messages"`
} }
type ReportMessage struct { type ReportMessage struct {
TaskName string `json:"taskName"`
TaskID string `json:"taskID"`
Status bool `json:"status"` Status bool `json:"status"`
Message string `json:"message"` Message string `json:"message"`
ClusterID string `json:"clusterID"` ClusterID string `json:"clusterID"`


+ 22
- 0
internal/scheduler/service/utils/status/taskStatusSync.go View File

@@ -4,6 +4,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
@@ -61,6 +62,7 @@ func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
if len(aiTask) == 1 { if len(aiTask) == 1 {
if aiTask[0].Status == constants.Completed { if aiTask[0].Status == constants.Completed {
task.Status = constants.Succeeded task.Status = constants.Succeeded
_ = reportStatusMessages(svc, task, aiTask[0])
} else { } else {
task.Status = aiTask[0].Status task.Status = aiTask[0].Status
} }
@@ -142,6 +144,26 @@ func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
} }
} }


func reportStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, aiTask *models.TaskAi) error {
report := &jcs.JobStatusReportReq{
TaskName: task.Name,
TaskID: strconv.FormatInt(task.Id, 10),
Messages: make([]*jcs.ReportMessage, 0),
}
//add report msg
jobMsg := &jcs.ReportMessage{
Status: true,
Message: "",
ClusterID: strconv.FormatInt(aiTask.ClusterId, 10),
Output: aiTask.JobId,
}
report.Messages = append(report.Messages, jobMsg)

_ = jcs.StatusReport(svc.Scheduler.AiService.Conf.JcsMiddleware.Url, report)

return nil
}

func updateInferTaskStatus(svc *svc.ServiceContext, task types.TaskModel) { func updateInferTaskStatus(svc *svc.ServiceContext, task types.TaskModel) {
aiTask, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id) aiTask, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id)
if err != nil { if err != nil {


Loading…
Cancel
Save