Browse Source

调度结构修改2

Former-commit-id: 01bea42264
pull/9/head
tzwang 2 years ago
parent
commit
afb036b7d2
5 changed files with 45 additions and 72 deletions
  1. +8
    -19
      api/internal/mqs/kq/ScheduleAi.go
  2. +7
    -20
      api/internal/mqs/kq/ScheduleCloud.go
  3. +11
    -25
      api/internal/mqs/kq/ScheduleHpc.go
  4. +5
    -1
      api/internal/pkg/scheduler/hpcScheduler.go
  5. +14
    -7
      api/internal/pkg/scheduler/scheduler.go

+ 8
- 19
api/internal/mqs/kq/ScheduleAi.go View File

@@ -2,12 +2,8 @@ package kq

import (
"context"
"encoding/json"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/pkg/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/model"
"gitlink.org.cn/jcce-pcm/utils/tool"
)

/*
@@ -27,25 +23,18 @@ func NewScheduleAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleA
}

func (l *ScheduleAiMq) Consume(_, val string) error {
// 接受消息
var task *types.TaskInfo
json.Unmarshal([]byte(val), &task)

participantId, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task)
// 接受消息, 根据标签筛选过滤
aiSchdl := scheduler.NewAiScheduler(val)
schdl, err := scheduler.NewScheduler(aiSchdl, val)
if err != nil {
return err
}
ai := model.Ai{
ParticipantId: participantId[0],
TaskId: task.TaskId,
Status: "Saved",
YamlString: val,
}
tool.Convert(task.Metadata, &ai)
schdl.MatchLabels(l.svcCtx.DbEngin)

// 存储数据
tx := l.svcCtx.DbEngin.Create(&ai)
if tx.Error != nil {
return tx.Error
err = schdl.SaveToDb(l.svcCtx.DbEngin)
if err != nil {
return err
}
return nil
}

+ 7
- 20
api/internal/mqs/kq/ScheduleCloud.go View File

@@ -3,11 +3,8 @@ package kq
import (
"bytes"
"context"
"encoding/json"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/pkg/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/model"
"io"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -67,28 +64,18 @@ func UnMarshalK8sStruct(yamlString string, taskId int64) model.Cloud {
}

func (l *ScheduleCloudMq) Consume(_, val string) error {
var task *types.TaskInfo
json.Unmarshal([]byte(val), &task)
// 根据标签筛选过滤
participantId, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task)
// 接受消息, 根据标签筛选过滤
cloudSchdl := scheduler.NewCloudScheduler()
schdl, err := scheduler.NewScheduler(cloudSchdl, val)
if err != nil {
return err
}
// 构建提交作业到云算的结构体
bytes, err := json.Marshal(task.Metadata)
schdl.MatchLabels(l.svcCtx.DbEngin)

// 存储数据
err = schdl.SaveToDb(l.svcCtx.DbEngin)
if err != nil {
return err
}
cloud := UnMarshalK8sStruct(string(bytes), task.TaskId)
cloud.YamlString = string(bytes)
if len(participantId) != 0 {
cloud.ParticipantId = participantId[0]
}
// 存储数据
tx := l.svcCtx.DbEngin.Create(&cloud)
if tx.Error != nil {
logx.Error(tx.Error)
return tx.Error
}
return nil
}

+ 11
- 25
api/internal/mqs/kq/ScheduleHpc.go View File

@@ -2,12 +2,8 @@ package kq

import (
"context"
"encoding/json"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/pkg/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/model"
"gitlink.org.cn/jcce-pcm/utils/tool"
)

/*
@@ -27,28 +23,18 @@ func NewScheduleHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *Schedule
}

func (l *ScheduleHpcMq) Consume(_, val string) error {
// 接受消息
var task *types.TaskInfo
json.Unmarshal([]byte(val), &task)
//if len(task.MatchLabels) != 0 {
// participantId, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task)
// if err != nil {
// return err
// }
//}

hpc := model.Hpc{
TaskId: task.TaskId,
Status: "Saved",
//ParticipantId: participantId[0],
YamlString: val,
// 接受消息, 根据标签筛选过滤
hpcSchdl := scheduler.NewHpcScheduler(val)
schdl, err := scheduler.NewScheduler(hpcSchdl, val)
if err != nil {
return err
}
tool.Convert(task.Metadata, &hpc)
schdl.MatchLabels(l.svcCtx.DbEngin)

// 存储数据
tx := l.svcCtx.DbEngin.Create(&hpc)
if tx.Error != nil {
logx.Error(tx.Error)
return tx.Error
err = schdl.SaveToDb(l.svcCtx.DbEngin)
if err != nil {
return err
}
return nil
}

+ 5
- 1
api/internal/pkg/scheduler/hpcScheduler.go View File

@@ -10,7 +10,11 @@ type hpcScheduler struct {
yamlString string
}

func (h hpcScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int) (interface{}, error) {
func NewHpcScheduler(val string) *hpcScheduler {
return &hpcScheduler{yamlString: val}
}

func (h hpcScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) {
hpc := model.Hpc{
TaskId: task.TaskId,
Status: "Saved",


+ 14
- 7
api/internal/pkg/scheduler/scheduler.go View File

@@ -14,20 +14,27 @@ type scheduler struct {
scheduleService scheduleService
}

func NewScheduler(task *types.TaskInfo, val string) (*scheduler, error) {
func NewScheduler(scheduleService scheduleService, val string) (*scheduler, error) {
var task *types.TaskInfo
err := json.Unmarshal([]byte(val), &task)
if err != nil {
return nil, errors.New("create scheduler failed : " + err.Error())
}
return &scheduler{task: task}, nil
return &scheduler{task: task, scheduleService: scheduleService}, nil
}

func (s scheduler) matchLabels(dbEngin *gorm.DB, task *types.TaskInfo) {
func (s scheduler) MatchLabels(dbEngin *gorm.DB) {
//if len(task.MatchLabels) != 0 {
// participantId, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task)
// if err != nil {
// return err
// }
//}
var ids []int64
count := 0
for key := range task.MatchLabels {
for key := range s.task.MatchLabels {
var participantId []int64
dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, task.MatchLabels[key]).Scan(&participantId)
dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantId)
if count == 0 {
ids = participantId
}
@@ -40,7 +47,7 @@ func (s scheduler) matchLabels(dbEngin *gorm.DB, task *types.TaskInfo) {
s.participantIds = micsSlice(ids, 1)
}

func (s scheduler) saveToDb(dbEngin *gorm.DB) error {
func (s scheduler) SaveToDb(dbEngin *gorm.DB) error {
if len(s.participantIds) == 0 {
return errors.New("participantIds 为空")
}
@@ -48,7 +55,7 @@ func (s scheduler) saveToDb(dbEngin *gorm.DB) error {
if err != nil {
return err
}
tx := dbEngin.Create(&structForDb)
tx := dbEngin.Create(structForDb)
if tx.Error != nil {
logx.Error(tx.Error)
return tx.Error


Loading…
Cancel
Save