
feat: add task failure analysis endpoint and logic

Signed-off-by: jagger <cossjie@foxmail.com>
pull/541/head
jagger 2 months ago
commit c206111e47
5 changed files with 129 additions and 1 deletion
  1. +6  -0  internal/handler/routes.go
  2. +14 -0  internal/handler/xjlab/task.go
  3. +82 -0  internal/logic/xjlab/task_analyze.go
  4. +1  -1  internal/scheduler/service/collector/hpc_collector.go
  5. +26 -0  internal/scheduler/service/hpc/slurm.go

+ 6 - 0   internal/handler/routes.go

@@ -1757,6 +1757,12 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
				Path:    "/xjlab/taskStatusStatistics",
				Handler: xjlab.TaskStatusStatisticsHandler(serverCtx),
			},
			{
				// failure analysis for a single task
				Method:  http.MethodGet,
				Path:    "/xjlab/task/failureAnalyze",
				Handler: xjlab.TaskFailureAnalyzeHandler(serverCtx),
			},
		},
		rest.WithPrefix("/pcm/v1"),
	)
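
For context, a minimal sketch of calling the new route from Go. The host and port are placeholders, and the `id` query parameter assumes types.FId binds a task id from the query string; the diff only establishes the path and the /pcm/v1 prefix.

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Hypothetical host/port; only the route path comes from this diff.
	resp, err := http.Get("http://localhost:8888/pcm/v1/xjlab/task/failureAnalyze?id=42")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}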


+ 14 - 0   internal/handler/xjlab/task.go

@@ -62,3 +62,17 @@ func TaskStatusStatisticsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
		result.HttpResult(r, w, resp, err)
	}
}

func TaskFailureAnalyzeHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var req types.FId
		if err := httpx.Parse(r, &req); err != nil {
			result.ParamErrorResult(r, w, err)
			return
		}

		l := xjlab.NewTaskFailureAnalyzeLogic(r.Context(), svcCtx)
		resp, err := l.TaskFailureAnalyze(&req)
		result.HttpResult(r, w, resp, err)
	}
}
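
types.FId is not shown in this diff; a plausible sketch of its shape, assuming a single task id bound from the query string:

// Assumed shape of types.FId (defined elsewhere, not in this diff):
// one task id bound from the query string, e.g. ?id=42.
type FId struct {
	Id int64 `form:"id"`
}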

+ 82 - 0   internal/logic/xjlab/task_analyze.go

@@ -0,0 +1,82 @@
package xjlab

import (
	"context"
	"fmt"

	"github.com/pkg/errors"
	"github.com/zeromicro/go-zero/core/logx"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
	"gorm.io/gorm"
)

type TaskFailureAnalyzeLogic struct {
	logx.Logger
	ctx        context.Context
	svcCtx     *svc.ServiceContext
	hpcService *service.HpcService
}

func NewTaskFailureAnalyzeLogic(ctx context.Context, svcCtx *svc.ServiceContext) *TaskFailureAnalyzeLogic {
	cache := make(map[string]interface{}, 10)
	hpcService, err := service.NewHpcService(&svcCtx.Config, svcCtx.Scheduler.HpcStorages, cache)
	if err != nil {
		logx.WithContext(ctx).Errorf("failed to create hpc service: %v", err)
		return nil
	}
	return &TaskFailureAnalyzeLogic{
		Logger:     logx.WithContext(ctx),
		ctx:        ctx,
		svcCtx:     svcCtx,
		hpcService: hpcService,
	}
}

func (l *TaskFailureAnalyzeLogic) TaskFailureAnalyze(req *types.FId) (interface{}, error) {
	task := &models.Task{}
	var resp interface{}
	if err := l.svcCtx.DbEngin.Where("id = ?", req.Id).First(task).Error; err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return nil, errors.New("record not found")
		}
		return nil, err
	}
	switch task.AdapterTypeDict {
	case constants.AdapterTypeCloud:
		return nil, nil
	case constants.AdapterTypeAI:
		return nil, nil
	case constants.AdapterTypeHPC:
		// fetch the HPC task failure analysis
		usage, err := l.GetHpcTaskFailureAnalyze(req)
		if err != nil {
			return nil, err
		}
		resp = usage
	}
	return resp, nil
}

func (l *TaskFailureAnalyzeLogic) GetHpcTaskFailureAnalyze(req *types.FId) (resp interface{}, err error) {
	var hpcR TaskHPCResult
	err = l.svcCtx.DbEngin.Raw(
		"SELECT t.id, hpc.job_id, hpc.adapter_id, hpc.cluster_id FROM task t "+
			"INNER JOIN task_hpc hpc ON t.id = hpc.task_id "+
			"WHERE t.adapter_type_dict = 2 AND t.id = ?",
		req.Id,
	).Scan(&hpcR).Error
	if err != nil {
		return nil, fmt.Errorf("database query failed: %v", err)
	}
	if hpcR.ID == 0 {
		return nil, fmt.Errorf("task not found")
	}

	executor, ok := l.hpcService.HpcExecutorAdapterMap[hpcR.AdapterId]
	if !ok {
		return nil, fmt.Errorf("no hpc executor for adapter %v", hpcR.AdapterId)
	}
	// fetch the failure analysis from the participant cluster
	resp, err = executor.GetHpcTaskFailureAnalyze(l.ctx, hpcR.JobID, hpcR.ClusterId)
	if err != nil {
		return nil, err
	}

	return resp, nil
}
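
TaskHPCResult is referenced but not defined in this diff; judging from the columns selected in the raw SQL above, it presumably looks something like this (field types are guesses):

// Assumed shape of TaskHPCResult (defined elsewhere in the package);
// names inferred from the SELECT above, types are guesses.
type TaskHPCResult struct {
	ID        int64  `gorm:"column:id"`
	JobID     string `gorm:"column:job_id"`
	AdapterId string `gorm:"column:adapter_id"`
	ClusterId string `gorm:"column:cluster_id"`
}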

+ 1 - 1   internal/scheduler/service/collector/hpc_collector.go

@@ -14,7 +14,7 @@ type HPCCollector interface {
	CancelTask(ctx context.Context, jobId string, clusterId string) error
	GetTaskLogs(ctx context.Context, jobId string, clusterId string) (interface{}, error)
	GetTaskResourceUsage(ctx context.Context, jobId string, clusterId string) (interface{}, error)
	//RescheduleTask(ctx context.Context, jobId string, clusterId string) error
	GetHpcTaskFailureAnalyze(ctx context.Context, jobId string, clusterId string) (interface{}, error)
}

type JobInfo struct {


+ 26 - 0   internal/scheduler/service/hpc/slurm.go

@@ -30,6 +30,7 @@ const (
	JobLogUrl           = "/api/v1/hpc/jobs/logs/{clusterId}/{jobId}"
	CancelTaskUrl       = "/api/v1/hpc/jobs/cancel/{clusterId}/{jobId}"
	JobResourceUsageUrl = "/api/v1/hpc/jobs/resource/usage/{clusterId}/{jobId}"
	JobFailureAnalyze   = "/api/v1/hpc/task/analyze"
)

func NewHpc(host string, id int64, platform string) *ParticipantHpc {
@@ -205,3 +206,28 @@ func (c *ParticipantHpc) GetTaskResourceUsage(ctx context.Context, jobId string,
	}
	return resp, nil
}

func (c *ParticipantHpc) GetHpcTaskFailureAnalyze(ctx context.Context, jobId string, clusterId string) (interface{}, error) {
	logx.WithContext(ctx).Infof("fetching HPC cluster task failure analysis, url: %s, jobId: %s", JobFailureAnalyze, jobId)
	if jobId == "" {
		return nil, fmt.Errorf("jobId is empty")
	}
	resp := types.CommonResp{}
	_, err := c.Request(JobFailureAnalyze, http.MethodPost, func(req *resty.Request) {
		req.SetHeaders(map[string]string{
			"Content-Type": "application/json",
			"traceId":      result.TraceIDFromContext(ctx),
		}).SetBody(map[string]string{
			"JobId":       jobId,
			"clusterId":   clusterId,
			"clusterType": "hpc",
		}).SetResult(&resp)
	})
	if err != nil {
		return nil, err
	}
	if resp.Code != http.StatusOK {
		return nil, fmt.Errorf("%s", resp.Msg)
	}
	return resp, nil
}
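
A minimal sketch of exercising the new method through the HPCCollector interface. The participant host, adapter id, platform name, and job/cluster ids are placeholders, and since the packages are internal this would have to live inside the pcm-coordinator module; it also assumes *ParticipantHpc satisfies the full interface.

package main

import (
	"context"
	"fmt"
	"log"

	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/hpc"
)

func main() {
	// Placeholder participant host, adapter id, and platform name.
	var c collector.HPCCollector = hpc.NewHpc("http://participant.example:8080", 1, "slurm")
	analysis, err := c.GetHpcTaskFailureAnalyze(context.Background(), "job-123", "cluster-1")
	if err != nil {
		log.Fatalf("failure analysis request failed: %v", err)
	}
	fmt.Printf("analysis: %+v\n", analysis)
}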
