Browse Source

调度结构修改

Former-commit-id: 39acb91436
pull/9/head
tzwang 2 years ago
parent
commit
f4e10c489d
6 changed files with 178 additions and 3 deletions
  1. +3
    -3
      api/internal/algo/strategyService.go
  2. +26
    -0
      api/internal/pkg/scheduler/aiScheduler.go
  3. +66
    -0
      api/internal/pkg/scheduler/cloudScheduler.go
  4. +4
    -0
      api/internal/pkg/scheduler/commonScheduler.go
  5. +22
    -0
      api/internal/pkg/scheduler/hpcScheduler.go
  6. +57
    -0
      api/internal/pkg/scheduler/scheduler.go

api/internal/algo/schedule.go → api/internal/algo/strategyService.go View File

@@ -1,11 +1,11 @@
package algo

type scheduleService interface {
type strategyService interface {
computeMaxScore() (*Task, error)
}

func ScheduleWithFullCollaboration(scheduleService scheduleService, ProviderList []*Provider) (*Task, error) {
task, err := scheduleService.computeMaxScore()
func ScheduleWithFullCollaboration(strategyService strategyService, ProviderList []*Provider) (*Task, error) {
task, err := strategyService.computeMaxScore()
if err != nil {
return nil, err
}

+ 26
- 0
api/internal/pkg/scheduler/aiScheduler.go View File

@@ -0,0 +1,26 @@
package scheduler

import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/model"
"gitlink.org.cn/jcce-pcm/utils/tool"
)

type aiScheduler struct {
yamlString string
}

func NewAiScheduler(val string) *aiScheduler {
return &aiScheduler{yamlString: val}
}

func (cs aiScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) {
ai := model.Ai{
ParticipantId: participantIds[0],
TaskId: task.TaskId,
Status: "Saved",
YamlString: cs.yamlString,
}
tool.Convert(task.Metadata, &ai)
return ai, nil
}

+ 66
- 0
api/internal/pkg/scheduler/cloudScheduler.go View File

@@ -1 +1,67 @@
package scheduler

import (
"bytes"
"encoding/json"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/model"
"io"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml"
kyaml "k8s.io/apimachinery/pkg/util/yaml"
)

type cloudScheduler struct {
}

func NewCloudScheduler() *cloudScheduler {
return &cloudScheduler{}
}

func (cs cloudScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) {
bytes, err := json.Marshal(task.Metadata)
if err != nil {
return nil, err
}
cloud := cs.UnMarshalK8sStruct(string(bytes), task.TaskId)
cloud.YamlString = string(bytes)
if len(participantIds) != 0 {
cloud.ParticipantId = participantIds[0]
}
return cloud, nil
}

func (cs cloudScheduler) UnMarshalK8sStruct(yamlString string, taskId int64) model.Cloud {
var cloud model.Cloud
d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096)
var err error
for {
var rawObj runtime.RawExtension
err = d.Decode(&rawObj)
if err == io.EOF {
break
}
if err != nil {
}
obj := &unstructured.Unstructured{}
syaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, obj)
if err != nil {
}

unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
}

unstructureObj := &unstructured.Unstructured{Object: unstructuredMap}
cloud = model.Cloud{
TaskId: taskId,
ApiVersion: unstructureObj.GetAPIVersion(),
Name: unstructureObj.GetName(),
Kind: unstructureObj.GetKind(),
Namespace: unstructureObj.GetNamespace(),
Status: "Saved",
}
}
return cloud
}

+ 4
- 0
api/internal/pkg/scheduler/commonScheduler.go View File

@@ -7,6 +7,10 @@ import (
"time"
)

type scheduleService interface {
getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error)
}

func MatchLabels(dbEngin *gorm.DB, task *types.TaskInfo) ([]int64, error) {
var ids []int64
count := 0


+ 22
- 0
api/internal/pkg/scheduler/hpcScheduler.go View File

@@ -0,0 +1,22 @@
package scheduler

import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/model"
"gitlink.org.cn/jcce-pcm/utils/tool"
)

type hpcScheduler struct {
yamlString string
}

func (h hpcScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int) (interface{}, error) {
hpc := model.Hpc{
TaskId: task.TaskId,
Status: "Saved",
//ParticipantId: participantId[0],
YamlString: h.yamlString,
}
tool.Convert(task.Metadata, &hpc)
return hpc, nil
}

+ 57
- 0
api/internal/pkg/scheduler/scheduler.go View File

@@ -0,0 +1,57 @@
package scheduler

import (
"encoding/json"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gorm.io/gorm"
)

type scheduler struct {
task *types.TaskInfo
participantIds []int64
scheduleService scheduleService
}

func NewScheduler(task *types.TaskInfo, val string) (*scheduler, error) {
err := json.Unmarshal([]byte(val), &task)
if err != nil {
return nil, errors.New("create scheduler failed : " + err.Error())
}
return &scheduler{task: task}, nil
}

func (s scheduler) matchLabels(dbEngin *gorm.DB, task *types.TaskInfo) {
var ids []int64
count := 0
for key := range task.MatchLabels {
var participantId []int64
dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, task.MatchLabels[key]).Scan(&participantId)
if count == 0 {
ids = participantId
}
//if len(participantId) == 0 || len(ids) == 0 {
// return nil, nil
//}
ids = intersect(ids, participantId)
count++
}
s.participantIds = micsSlice(ids, 1)
}

func (s scheduler) saveToDb(dbEngin *gorm.DB) error {
if len(s.participantIds) == 0 {
return errors.New("participantIds 为空")
}
structForDb, err := s.scheduleService().getNewStructForDb(s.task, s.participantIds)
if err != nil {
return err
}
tx := dbEngin.Create(&structForDb)
if tx.Error != nil {
logx.Error(tx.Error)
return tx.Error
}
return nil
}

Loading…
Cancel
Save