Browse Source

updated scheduleResult

Former-commit-id: 38db46a22a
pull/176/head
tzwang 1 year ago
parent
commit
cda53ae916
6 changed files with 90 additions and 12 deletions
  1. +1
    -0
      api/desc/schedule/pcm-schedule.api
  2. +11
    -2
      api/internal/logic/ai/getcentertasklistlogic.go
  3. +70
    -8
      api/internal/logic/core/pagelisttasklogic.go
  4. +1
    -1
      api/internal/scheduler/database/aiStorage.go
  5. +2
    -0
      api/internal/scheduler/schedulers/aiScheduler.go
  6. +5
    -1
      api/internal/storeLink/octopus.go

+ 1
- 0
api/desc/schedule/pcm-schedule.api View File

@@ -19,6 +19,7 @@ type (
ScheduleResult { ScheduleResult {
ClusterId string `json:"clusterId"` ClusterId string `json:"clusterId"`
TaskId string `json:"taskId"` TaskId string `json:"taskId"`
Card string `json:"card"`
Strategy string `json:"strategy"` Strategy string `json:"strategy"`
Replica int32 `json:"replica"` Replica int32 `json:"replica"`
Msg string `json:"msg"` Msg string `json:"msg"`


+ 11
- 2
api/internal/logic/ai/getcentertasklistlogic.go View File

@@ -2,6 +2,8 @@ package ai


import ( import (
"context" "context"
"errors"
"fmt"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"strconv" "strconv"
"sync" "sync"
@@ -46,6 +48,9 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList
if err != nil { if err != nil {
continue continue
} }
if len(taskList) == 0 {
continue
}
for _, task := range taskList { for _, task := range taskList {
var elapsed time.Duration var elapsed time.Duration
switch task.Status { switch task.Status {
@@ -82,7 +87,6 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
return resp, nil return resp, nil
} }

} }


func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) { func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) {
@@ -92,15 +96,20 @@ func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<-
if err != nil { if err != nil {
continue continue
} }
if len(taskList) == 0 {
continue
}
for _, task := range taskList { for _, task := range taskList {
t := task t := task
if t.Status == constants.Completed || t.JobId == "" {
if t.Status == constants.Completed {
continue continue
} }
wg.Add(1) wg.Add(1)
go func() { go func() {
trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId) trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId)
if err != nil { if err != nil {
msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
logx.Errorf(errors.New(msg).Error())
wg.Done() wg.Done()
return return
} }


+ 70
- 8
api/internal/logic/core/pagelisttasklogic.go View File

@@ -2,12 +2,16 @@ package core


import ( import (
"context" "context"
"errors"
"fmt"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils"
"strconv"
"sync"
"time" "time"


"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
@@ -53,8 +57,9 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa
} }


// 更新智算任务状态 // 更新智算任务状态
var ch = make(chan struct{})
go l.updateAitaskStatus(list, ch)
chs := [2]chan struct{}{make(chan struct{}), make(chan struct{})}
go l.updateTaskStatus(list, chs[0])
go l.updateAiTaskStatus(list, chs[1])


for _, model := range list { for _, model := range list {
if model.StartTime != "" && model.EndTime == "" { if model.StartTime != "" && model.EndTime == "" {
@@ -72,15 +77,18 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa
resp.PageNum = req.PageNum resp.PageNum = req.PageNum
resp.Total = total resp.Total = total


select {
case _ = <-ch:
return resp, nil
case <-time.After(1 * time.Second):
return resp, nil
for _, ch := range chs {
select {
case <-ch:
return
case <-time.After(1 * time.Second):
return
}
} }
return
} }


func (l *PageListTaskLogic) updateAitaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) {
func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) {
for _, task := range tasks { for _, task := range tasks {
if task.AdapterTypeDict != 1 { if task.AdapterTypeDict != 1 {
continue continue
@@ -150,8 +158,62 @@ func (l *PageListTaskLogic) updateAitaskStatus(tasks []*types.TaskModel, ch chan


tx = l.svcCtx.DbEngin.Table("task").Updates(task) tx = l.svcCtx.DbEngin.Table("task").Updates(task)
if tx.Error != nil { if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return return
} }
} }
ch <- struct{}{} ch <- struct{}{}
} }

// updateAiTaskStatus refreshes the stored state of the AI sub-tasks that
// belong to the given tasks and then signals completion on ch.
//
// Tasks whose AdapterTypeDict is not 1, or whose Status is already
// constants.Succeeded, are skipped. For every remaining task the task_ai rows
// are loaded; each row that is not constants.Completed is refreshed in its own
// goroutine: status, start and end time are fetched through the collector
// found in AiCollectorAdapterMap under the row's adapter id and cluster id,
// and the row is persisted with AiStorages.UpdateAiTask. A failure on one row
// is logged and does not affect the others.
//
// If loading task_ai rows fails, no further tasks are scanned, but refreshes
// that were already started are still awaited and the completion signal is
// still attempted.
func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) {
	var wg sync.WaitGroup

	// logFailure reports a per-row failure. The message is built first and
	// logged as a plain value so that a '%' inside the error text is never
	// interpreted as a format verb.
	logFailure := func(t *models.TaskAi, err error) {
		msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
		logx.Error(msg)
	}

	for _, task := range tasks {
		if task.AdapterTypeDict != 1 || task.Status == constants.Succeeded {
			continue
		}

		var aiTaskList []*models.TaskAi
		tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList)
		if tx.Error != nil {
			logx.Error(tx.Error)
			// Stop scanning, but fall through to wg.Wait and the completion
			// signal instead of returning silently.
			break
		}

		for _, aitask := range aiTaskList {
			t := aitask // per-iteration copy for the goroutine below
			if t.Status == constants.Completed {
				continue
			}

			wg.Add(1)
			go func() {
				defer wg.Done()

				adapterID := strconv.FormatInt(t.AdapterId, 10)
				clusterID := strconv.FormatInt(t.ClusterId, 10)

				// A missing entry would otherwise yield the zero value and the
				// method call below would panic inside this goroutine.
				aiCollector, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapterID][clusterID]
				if !ok {
					logFailure(t, errors.New("no collector registered for adapter "+adapterID+" and cluster "+clusterID))
					return
				}

				trainingTask, err := aiCollector.GetTrainingTask(l.ctx, t.JobId)
				if err != nil {
					logFailure(t, err)
					return
				}

				t.Status = trainingTask.Status
				t.StartTime = trainingTask.Start
				t.EndTime = trainingTask.End
				if err := l.svcCtx.Scheduler.AiStorages.UpdateAiTask(t); err != nil {
					logFailure(t, err)
				}
			}()
		}
	}

	wg.Wait()

	// ch is unbuffered: if the caller has already stopped receiving, a plain
	// send would block forever and leak this goroutine. Give up once the
	// logic's context is done.
	select {
	case ch <- struct{}{}:
	case <-l.ctx.Done():
	}
}

+ 1
- 1
api/internal/scheduler/database/aiStorage.go View File

@@ -73,7 +73,7 @@ func (s *AiStorage) GetAdaptersByType(adapterType string) ([]*types.AdapterInfo,


func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*models.TaskAi, error) { func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*models.TaskAi, error) {
var resp []*models.TaskAi var resp []*models.TaskAi
tx := s.DbEngin.Raw("select * from task_ai where `adapter_id` = ? ", adapterId).Scan(&resp)
tx := s.DbEngin.Raw("select * from task_ai where `adapter_id` = ? ", adapterId).Order("commit_time desc").Scan(&resp)
if tx.Error != nil { if tx.Error != nil {
logx.Errorf(tx.Error.Error()) logx.Errorf(tx.Error.Error())
return nil, tx.Error return nil, tx.Error


+ 2
- 0
api/internal/scheduler/schedulers/aiScheduler.go View File

@@ -19,6 +19,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-ac/hpcAC" "gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
@@ -222,6 +223,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
} }
} }
} }
logx.Errorf(errors.New(errmsg).Error())
return nil, errors.New(errmsg) return nil, errors.New(errmsg)
} }




+ 5
- 1
api/internal/storeLink/octopus.go View File

@@ -493,7 +493,11 @@ func (o *OctopusLink) GetTrainingTask(ctx context.Context, taskId string) (*coll
} }
jobresp, ok := (resp).(*octopus.GetTrainJobResp) jobresp, ok := (resp).(*octopus.GetTrainJobResp)
if !jobresp.Success || !ok { if !jobresp.Success || !ok {
return nil, errors.New("get training task failed")
if jobresp.Error != nil {
return nil, errors.New(jobresp.Error.Message)
} else {
return nil, errors.New("get training task failed, empty error returned")
}
} }
var task collector.Task var task collector.Task
task.Id = jobresp.Payload.TrainJob.Id task.Id = jobresp.Payload.TrainJob.Id


Loading…
Cancel
Save