diff --git a/internal/handler/routes.go b/internal/handler/routes.go
index b6acb0b3..b810acb6 100644
--- a/internal/handler/routes.go
+++ b/internal/handler/routes.go
@@ -1757,6 +1757,12 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
 				Path:    "/xjlab/taskStatusStatistics",
 				Handler: xjlab.TaskStatusStatisticsHandler(serverCtx),
 			},
+			{
+				// failure analysis for a single task
+				Method:  http.MethodGet,
+				Path:    "/xjlab/task/failureAnalyze",
+				Handler: xjlab.TaskFailureAnalyzeHandler(serverCtx),
+			},
 		},
 		rest.WithPrefix("/pcm/v1"),
 	)
diff --git a/internal/handler/xjlab/task.go b/internal/handler/xjlab/task.go
index b59dd6a6..e8a3f1e8 100644
--- a/internal/handler/xjlab/task.go
+++ b/internal/handler/xjlab/task.go
@@ -62,3 +62,24 @@ func TaskStatusStatisticsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
 		result.HttpResult(r, w, resp, err)
 	}
 }
+
+func TaskFailureAnalyzeHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		var req types.FId
+		if err := httpx.Parse(r, &req); err != nil {
+			result.ParamErrorResult(r, w, err)
+			return
+		}
+
+		// NewTaskFailureAnalyzeLogic can fail while wiring the HPC service,
+		// so surface that error instead of calling into a nil logic.
+		l, err := xjlab.NewTaskFailureAnalyzeLogic(r.Context(), svcCtx)
+		if err != nil {
+			result.HttpResult(r, w, nil, err)
+			return
+		}
+
+		resp, err := l.TaskFailureAnalyze(&req)
+		result.HttpResult(r, w, resp, err)
+	}
+}
diff --git a/internal/logic/schedule/schedulegetaijoblogloglogic.go b/internal/logic/schedule/schedulegetaijoblogloglogic.go
index a8529b1c..22d10f6d 100644
--- a/internal/logic/schedule/schedulegetaijoblogloglogic.go
+++ b/internal/logic/schedule/schedulegetaijoblogloglogic.go
@@ -2,6 +2,9 @@ package schedule
 
 import (
 	"context"
+	"errors"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
+	"strconv"
 
 	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
@@ -26,11 +29,38 @@ func NewScheduleGetAiJobLogLogLogic(ctx context.Context, svcCtx *svc.ServiceCont
 
 func (l *ScheduleGetAiJobLogLogLogic) ScheduleGetAiJobLogLog(req *types.AiJobLogReq) (resp *types.AiJobLogResp, err error) {
 	resp = &types.AiJobLogResp{}
-	id, err := l.svcCtx.Scheduler.AiStorages.GetAiTaskIdByClusterIdAndTaskId(req.ClusterId, req.TaskId)
+	taskId, err := strconv.ParseInt(req.TaskId, 10, 64)
 	if err != nil {
 		return nil, err
 	}
-	log, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId][req.ClusterId].GetTrainingTaskLog(l.ctx, id, req.InstanceNum)
+	task, err := l.svcCtx.Scheduler.AiStorages.GetTaskById(taskId)
+	if err != nil {
+		return nil, err
+	}
+
+	aiTasks, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByTaskId(req.TaskId)
+	if err != nil {
+		return nil, err
+	}
+
+	// guard the empty case unconditionally: aiTasks[0] below would panic
+	// when no record exists, whatever the task status is
+	if len(aiTasks) == 0 {
+		if task.Status == constants.Failed {
+			return nil, errors.New("submit failed, no log available")
+		}
+		return nil, errors.New("no ai task found")
+	}
+	if len(aiTasks) > 1 {
+		return nil, errors.New("multiple ai tasks found")
+	}
+
+	aiTask := aiTasks[0]
+	adapterId := strconv.FormatInt(aiTask.AdapterId, 10)
+	clusterId := strconv.FormatInt(aiTask.ClusterId, 10)
+	jobId := aiTask.JobId
+
+	log, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapterId][clusterId].GetTrainingTaskLog(l.ctx, jobId, req.InstanceNum)
 	if err != nil {
 		return nil, err
 	}
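Review note: the nested AiCollectorAdapterMap lookup above is still unguarded; if either key is missing, indexing yields a nil interface and the GetTrainingTaskLog call panics. A minimal guard, sketched under the assumption that the map is a map[string]map[string]AiCollector as the usage suggests (it would also need "fmt" added to the import block):

	// Hedged sketch: fail fast instead of panicking on a missing adapter/cluster.
	clusterMap, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapterId]
	if !ok {
		return nil, fmt.Errorf("no collector registered for adapter %s", adapterId)
	}
	collector, ok := clusterMap[clusterId]
	if !ok {
		return nil, fmt.Errorf("no collector registered for cluster %s", clusterId)
	}
	log, err := collector.GetTrainingTaskLog(l.ctx, jobId, req.InstanceNum)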
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gorm.io/gorm" +) + +type TaskFailureAnalyzeLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext + hpcService *service.HpcService +} + +func NewTaskFailureAnalyzeLogic(ctx context.Context, svcCtx *svc.ServiceContext) *TaskFailureAnalyzeLogic { + cache := make(map[string]interface{}, 10) + hpcService, err := service.NewHpcService(&svcCtx.Config, svcCtx.Scheduler.HpcStorages, cache) + if err != nil { + return nil + } + return &TaskFailureAnalyzeLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + hpcService: hpcService, + } +} + +func (l *TaskFailureAnalyzeLogic) TaskFailureAnalyze(req *types.FId) (interface{}, error) { + task := &models.Task{} + var resp interface{} + if errors.Is(l.svcCtx.DbEngin.Where("id", req.Id).First(&task).Error, gorm.ErrRecordNotFound) { + return nil, errors.New("记录不存在") + } + switch task.AdapterTypeDict { + case constants.AdapterTypeCloud: + return nil, nil + case constants.AdapterTypeAI: + return nil, nil + case constants.AdapterTypeHPC: + // 获取HPC任务失败分析 + usage, err := l.GetHpcTaskFailureAnalyze(req) + if err != nil { + return nil, err + } + resp = usage + } + return resp, nil +} + +func (l *TaskFailureAnalyzeLogic) GetHpcTaskFailureAnalyze(req *types.FId) (resp interface{}, err error) { + var hpcR TaskHPCResult + tx := l.svcCtx.DbEngin.Raw( + "SELECT t.id, hpc.job_id ,hpc.adapter_id ,hpc.cluster_id FROM task t "+ + "INNER JOIN task_hpc hpc ON t.id = hpc.task_id "+ + "WHERE adapter_type_dict = 2 AND t.id = ?", + req.Id, + ).Scan(&hpcR).Error + if tx != nil { + return nil, fmt.Errorf("数据库查询失败: %v", tx.Error) + } + if hpcR.ID == 0 { + return nil, fmt.Errorf("任务不存在") + } + + // 获取资源使用情况 + resp, err = l.hpcService.HpcExecutorAdapterMap[hpcR.AdapterId].GetHpcTaskFailureAnalyze(l.ctx, hpcR.JobID, hpcR.ClusterId) + if err != nil { + return nil, err + } + + return resp, nil +} diff --git a/internal/scheduler/database/aiStorage.go b/internal/scheduler/database/aiStorage.go index 3dc99ca6..4937b444 100644 --- a/internal/scheduler/database/aiStorage.go +++ b/internal/scheduler/database/aiStorage.go @@ -99,6 +99,17 @@ func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*models.TaskAi, e return resp, nil } +func (s *AiStorage) GetAiTasksByTaskId(taskId string) ([]*models.TaskAi, error) { + var resp []*models.TaskAi + db := s.DbEngin.Model(&models.TaskAi{}).Table("task_ai") + db = db.Where("task_id = ?", taskId) + err := db.Order("commit_time desc").Find(&resp).Error + if err != nil { + return nil, err + } + return resp, nil +} + func (s *AiStorage) GetAiTaskListById(id int64) ([]*models.TaskAi, error) { var aiTaskList []*models.TaskAi tx := s.DbEngin.Raw("select * from task_ai where `task_id` = ? 
", id).Scan(&aiTaskList) diff --git a/internal/scheduler/service/collector/hpc_collector.go b/internal/scheduler/service/collector/hpc_collector.go index 6e4c70be..a5286daf 100644 --- a/internal/scheduler/service/collector/hpc_collector.go +++ b/internal/scheduler/service/collector/hpc_collector.go @@ -14,7 +14,7 @@ type HPCCollector interface { CancelTask(ctx context.Context, jobId string, clusterId string) error GetTaskLogs(ctx context.Context, jobId string, clusterId string) (interface{}, error) GetTaskResourceUsage(ctx context.Context, jobId string, clusterId string) (interface{}, error) - //RescheduleTask(ctx context.Context, jobId string, clusterId string) error + GetHpcTaskFailureAnalyze(ctx context.Context, jobId string, clusterId string) (interface{}, error) } type JobInfo struct { diff --git a/internal/scheduler/service/hpc/slurm.go b/internal/scheduler/service/hpc/slurm.go index 2ef06e30..31d92334 100644 --- a/internal/scheduler/service/hpc/slurm.go +++ b/internal/scheduler/service/hpc/slurm.go @@ -30,6 +30,7 @@ const ( JobLogUrl = "/api/v1/hpc/jobs/logs/{clusterId}/{jobId}" CancelTaskUrl = "/api/v1/hpc/jobs/cancel/{clusterId}/{jobId}" JobResourceUsageUrl = "/api/v1/hpc/jobs/resource/usage/{clusterId}/{jobId}" + JobFailureAnalyze = "/api/v1/hpc/task/analyze" ) func NewHpc(host string, id int64, platform string) *ParticipantHpc { @@ -205,3 +206,28 @@ func (c *ParticipantHpc) GetTaskResourceUsage(ctx context.Context, jobId string, } return resp, nil } + +func (c *ParticipantHpc) GetHpcTaskFailureAnalyze(ctx context.Context, jobId string, clusterId string) (interface{}, error) { + logx.WithContext(ctx).Infof("获取超算集群任务失败分析, url: %s, jobId: %s", JobFailureAnalyze, jobId) + if jobId == "" { + return nil, fmt.Errorf("jobId is empty") + } + resp := types.CommonResp{} + _, err := c.Request(JobFailureAnalyze, http.MethodPost, func(req *resty.Request) { + req.SetHeaders(map[string]string{ + "Content-Type": "application/json", + "traceId": result.TraceIDFromContext(ctx), + }).SetBody(map[string]string{ + "JobId": jobId, + "clusterId": clusterId, + "clusterType": "hpc", + }).SetResult(&resp) + }) + if err != nil { + return nil, err + } + if resp.Code != http.StatusOK { + return nil, fmt.Errorf(resp.Msg) + } + return resp, nil +} diff --git a/internal/storeLink/openi.go b/internal/storeLink/openi.go index 7ab1a454..57ce1364 100644 --- a/internal/storeLink/openi.go +++ b/internal/storeLink/openi.go @@ -240,7 +240,7 @@ func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs Description: algorithmId, // temporarily set reponame contained in the algorithmId to desc for missing taskdetail's reponame JobType: TRAIN, Cluster: C2NET, - DisplayJobName: TRAIN + UNDERSCORE + utils.RandomString(10), + DisplayJobName: strings.ToLower(TRAIN + UNDERSCORE + utils.RandomString(10)), ComputeSource: computeSource, SpecId: int(specId), BranchName: branchName, diff --git a/pkg/constants/task.go b/pkg/constants/task.go index e9fb5841..ff4ce490 100644 --- a/pkg/constants/task.go +++ b/pkg/constants/task.go @@ -16,19 +16,20 @@ package constants // 任务状态类型 const ( - Saved = "Saved" - Running = "Running" WaitDelete = "WaitDelete" Deleted = "Deleted" - Completed = "Completed" - Succeeded = "Succeeded" - Failed = "Failed" WaitRestart = "WaitRestart" WaitPause = "WaitPause" WaitStart = "WaitStart" - Pending = "Pending" - Stopped = "Stopped" - Deploying = "Deploying" - Cancelled = "Cancelled" - Waiting = "Waiting" + + Completed = "Completed" //已完成 + Succeeded = "Succeeded" //成功 + Failed = "Failed" 
diff --git a/pkg/constants/task.go b/pkg/constants/task.go
index e9fb5841..ff4ce490 100644
--- a/pkg/constants/task.go
+++ b/pkg/constants/task.go
@@ -16,19 +16,20 @@ package constants
 
 // task status types
 const (
-	Saved       = "Saved"
-	Running     = "Running"
 	WaitDelete  = "WaitDelete"
 	Deleted     = "Deleted"
-	Completed   = "Completed"
-	Succeeded   = "Succeeded"
-	Failed      = "Failed"
 	WaitRestart = "WaitRestart"
 	WaitPause   = "WaitPause"
 	WaitStart   = "WaitStart"
-	Pending     = "Pending"
-	Stopped     = "Stopped"
-	Deploying   = "Deploying"
-	Cancelled   = "Cancelled"
-	Waiting     = "Waiting"
+
+	Completed = "Completed" // completed
+	Succeeded = "Succeeded" // succeeded
+	Failed    = "Failed"    // failed
+	Saved     = "Saved"     // saved
+	Running   = "Running"   // running
+	Pending   = "Pending"   // pending
+	Stopped   = "Stopped"   // stopped
+	Deploying = "Deploying" // deploying
+	Cancelled = "Cancelled" // cancelled
+	Waiting   = "Waiting"   // waiting
 )
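Review note: grouping the statuses invites a small helper for the common "can this task still change state" check. A minimal sketch; which statuses count as terminal is an assumption for illustration, not something this diff defines:

	// IsTerminalStatus reports whether a task status can no longer change.
	// The chosen set is an assumption; adjust to the scheduler's semantics.
	func IsTerminalStatus(status string) bool {
		switch status {
		case Completed, Succeeded, Failed, Stopped, Cancelled, Deleted:
			return true
		default:
			return false
		}
	}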