Browse Source

Merge pull request 'update aijoblog' (#539) from tzwang/pcm-coordinator:master into master

pull/541/head
tzwang 2 months ago
parent
commit
3ee60519e9
2 changed files with 38 additions and 2 deletions
  1. +27
    -2
      internal/logic/schedule/schedulegetaijoblogloglogic.go
  2. +11
    -0
      internal/scheduler/database/aiStorage.go

+ 27
- 2
internal/logic/schedule/schedulegetaijoblogloglogic.go View File

@@ -2,6 +2,9 @@ package schedule


import ( import (
"context" "context"
"errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"strconv"


"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
@@ -26,11 +29,33 @@ func NewScheduleGetAiJobLogLogLogic(ctx context.Context, svcCtx *svc.ServiceCont
func (l *ScheduleGetAiJobLogLogLogic) ScheduleGetAiJobLogLog(req *types.AiJobLogReq) (resp *types.AiJobLogResp, err error) { func (l *ScheduleGetAiJobLogLogLogic) ScheduleGetAiJobLogLog(req *types.AiJobLogReq) (resp *types.AiJobLogResp, err error) {
resp = &types.AiJobLogResp{} resp = &types.AiJobLogResp{}


id, err := l.svcCtx.Scheduler.AiStorages.GetAiTaskIdByClusterIdAndTaskId(req.ClusterId, req.TaskId)
taskId, err := strconv.ParseInt(req.TaskId, 10, 64)
if err != nil { if err != nil {
return nil, err return nil, err
} }
log, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId][req.ClusterId].GetTrainingTaskLog(l.ctx, id, req.InstanceNum)
task, err := l.svcCtx.Scheduler.AiStorages.GetTaskById(taskId)
if err != nil {
return nil, err
}

aiTasks, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByTaskId(req.TaskId)
if err != nil {
return nil, err
}

if len(aiTasks) == 0 && task.Status == constants.Failed {
return nil, errors.New("submit failed, no log available")
} else if len(aiTasks) > 1 {
return nil, errors.New("multiple ai task found")
}

aiTask := aiTasks[0]
adapterId := strconv.FormatInt(aiTask.AdapterId, 10)
clusterId := strconv.FormatInt(aiTask.ClusterId, 10)

jobId := aiTask.JobId

log, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapterId][clusterId].GetTrainingTaskLog(l.ctx, jobId, req.InstanceNum)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 11
- 0
internal/scheduler/database/aiStorage.go View File

@@ -99,6 +99,17 @@ func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*models.TaskAi, e
return resp, nil return resp, nil
} }


// GetAiTasksByTaskId looks up rows in the task_ai table whose task_id equals
// taskId and returns them ordered by commit_time descending.
//
// If the query reports an error, the result slice is discarded and nil is
// returned together with that error.
func (s *AiStorage) GetAiTasksByTaskId(taskId string) ([]*models.TaskAi, error) {
	var aiTasks []*models.TaskAi

	// Build the whole query as one chain; it is executed only by Find.
	query := s.DbEngin.Model(&models.TaskAi{}).
		Table("task_ai").
		Where("task_id = ?", taskId).
		Order("commit_time desc")

	if err := query.Find(&aiTasks).Error; err != nil {
		return nil, err
	}
	return aiTasks, nil
}

func (s *AiStorage) GetAiTaskListById(id int64) ([]*models.TaskAi, error) { func (s *AiStorage) GetAiTaskListById(id int64) ([]*models.TaskAi, error) {
var aiTaskList []*models.TaskAi var aiTaskList []*models.TaskAi
tx := s.DbEngin.Raw("select * from task_ai where `task_id` = ? ", id).Scan(&aiTaskList) tx := s.DbEngin.Raw("select * from task_ai where `task_id` = ? ", id).Scan(&aiTaskList)


Loading…
Cancel
Save