Browse Source

调度结构修改

Former-commit-id: a45bd574fe
pull/9/head
tzwang 2 years ago
parent
commit
e0b48ecb39
11 changed files with 93 additions and 25 deletions
  1. +5
    -2
      api/etc/pcm.yaml
  2. +1
    -0
      api/internal/config/config.go
  3. +1
    -1
      api/internal/mqs/ScheduleAi.go
  4. +1
    -1
      api/internal/mqs/ScheduleCloud.go
  5. +1
    -1
      api/internal/mqs/ScheduleHpc.go
  6. +3
    -0
      api/internal/svc/servicecontext.go
  7. +1
    -1
      pkg/scheduler/aiScheduler.go
  8. +2
    -2
      pkg/scheduler/cloudScheduler.go
  9. +1
    -1
      pkg/scheduler/common.go
  10. +1
    -1
      pkg/scheduler/hpcScheduler.go
  11. +76
    -15
      pkg/scheduler/scheduler.go

+ 5
- 2
api/etc/pcm.yaml View File

@@ -4,10 +4,13 @@ NacosConfig:
ServerConfigs:
# - IpAddr: 127.0.0.1
# Port: 8848
- IpAddr: nacos.jcce.dev
# - IpAddr: 10.101.15.7
# Port: 8848
- IpAddr: 119.45.100.73
Port: 8848
ClientConfig:
NamespaceId: test
NamespaceId: tzwang
# NamespaceId: test
TimeoutMs: 5000
NotLoadCacheAtStart: true
LogDir:


+ 1
- 0
api/internal/config/config.go View File

@@ -38,6 +38,7 @@ type Config struct {
CephRpcConf zrpc.RpcClientConf
OpenstackRpcConf zrpc.RpcClientConf
OctopusRpcConf zrpc.RpcClientConf
PcmCoreRpcConf zrpc.RpcClientConf
NexusUrl string
JccScheduleUrl string
MinioConf struct {


+ 1
- 1
api/internal/mqs/ScheduleAi.go View File

@@ -39,7 +39,7 @@ func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue {
func (l *AiQueue) Consume(val string) error {
// 接受消息, 根据标签筛选过滤
aiSchdl := scheduler2.NewAiScheduler(val)
schdl, err := scheduler2.NewScheduler(aiSchdl, val, l.svcCtx.DbEngin)
schdl, err := scheduler2.NewScheduler(aiSchdl, val, l.svcCtx.DbEngin, nil)
if err != nil {
return err
}


+ 1
- 1
api/internal/mqs/ScheduleCloud.go View File

@@ -39,7 +39,7 @@ func NewCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *CloudMq {
func (l *CloudMq) Consume(val string) error {
// 接受消息, 根据标签筛选过滤
cloudScheduler := scheduler.NewCloudScheduler()
schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin)
schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc)
if err != nil {
return err
}


+ 1
- 1
api/internal/mqs/ScheduleHpc.go View File

@@ -39,7 +39,7 @@ func NewHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *HpcMq {
func (l *HpcMq) Consume(val string) error {
// 接受消息, 根据标签筛选过滤
hpcSchdl := scheduler2.NewHpcScheduler(val)
schdl, err := scheduler2.NewScheduler(hpcSchdl, val, l.svcCtx.DbEngin)
schdl, err := scheduler2.NewScheduler(hpcSchdl, val, l.svcCtx.DbEngin, nil)
if err != nil {
return err
}


+ 3
- 0
api/internal/svc/servicecontext.go View File

@@ -27,6 +27,7 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-ceph/cephclient"
"gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/kubernetesclient"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
@@ -55,6 +56,7 @@ type ServiceContext struct {
Downloader *s3manager.Downloader
Uploader *s3manager.Uploader
K8sRpc map[int64]kubernetesclient.Kubernetes
ParticipantRpc participantservice.ParticipantService
}

func NewServiceContext(c config.Config) *ServiceContext {
@@ -101,6 +103,7 @@ func NewServiceContext(c config.Config) *ServiceContext {
OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)),
OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)),
K8sRpc: make(map[int64]kubernetesclient.Kubernetes),
ParticipantRpc: participantservice.NewParticipantService(zrpc.MustNewClient(c.PcmCoreRpcConf)),
DockerClient: dockerClient,
Downloader: downloader,
Uploader: uploader,


+ 1
- 1
pkg/scheduler/aiScheduler.go View File

@@ -41,7 +41,7 @@ func (as *aiScheduler) getNewStructForDb(task *response.TaskInfo, participantId
return ai, nil
}

func (as *aiScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) {
func (as *aiScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) {
return nil, nil
}



+ 2
- 2
pkg/scheduler/cloudScheduler.go View File

@@ -36,7 +36,7 @@ func NewCloudScheduler() *cloudScheduler {
return &cloudScheduler{}
}

func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) {
func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) {
//参数为空返回 nil
if len(providers) == 0 || task == nil {
return nil, nil
@@ -48,7 +48,7 @@ func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*alg
if err != nil {
return nil, err
}
return taskResult, nil
return taskResult.MaxscoreStrategy, nil
}

func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64) (interface{}, error) {


pkg/scheduler/commonScheduler.go → pkg/scheduler/common.go View File

@@ -24,7 +24,7 @@ import (

type scheduleService interface {
getNewStructForDb(task *response.TaskInfo, participantId int64) (interface{}, error)
pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error)
pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error)
genTaskAndProviders(task *response.TaskInfo, dbEngin *gorm.DB) (*algo.Task, []*algo.Provider)
}


+ 1
- 1
pkg/scheduler/hpcScheduler.go View File

@@ -42,7 +42,7 @@ func (h *hpcScheduler) getNewStructForDb(task *response.TaskInfo, participantId
return hpc, nil
}

func (h *hpcScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) {
func (h *hpcScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) {
return nil, nil
}



+ 76
- 15
pkg/scheduler/scheduler.go View File

@@ -15,28 +15,36 @@
package scheduler

import (
"context"
"encoding/json"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algo"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice"
"gorm.io/gorm"
)

type Replicas int64

type ParticipantId int64

type scheduler struct {
task *response.TaskInfo
participantIds []int64
scheduleService scheduleService
dbEngin *gorm.DB
result map[ParticipantId]Replicas
participantRpc participantservice.ParticipantService
}

func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB) (*scheduler, error) {
func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*scheduler, error) {
var task *response.TaskInfo
err := json.Unmarshal([]byte(val), &task)
if err != nil {
return nil, errors.New("create scheduler failed : " + err.Error())
}
return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin}, nil
return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil
}

func (s *scheduler) MatchLabels() {
@@ -92,19 +100,10 @@ func (s *scheduler) AssignAndSchedule() error {
return nil
}

//调度结果 ParticipantId
for i, e := range strategy.ResourcePerTask {

if len(e) != 0 {
for _, ej := range e {
if ej == 1 {
s.task.ParticipantId = providerList[i].Pid
break
}
}
} else {
continue
}
//调度结果
err = s.assignReplicasToResult(strategy, providerList)
if err != nil {
return err
}

return nil
@@ -130,3 +129,65 @@ func (s *scheduler) SaveToDb() error {
func (s *scheduler) obtainParamsforStrategy() (*algo.Task, []*algo.Provider) {
return s.scheduleService.genTaskAndProviders(s.task, s.dbEngin)
}

func (s *scheduler) checkAvailableParticipants() error {
resp, err := s.participantRpc.ListParticipant(context.Background(), nil)
if err != nil {
return err
}

if resp.Code != 200 {
return errors.New("集群列表查询失败")
}

var workingIds []int64
for _, e := range resp.Data {
if e.ClientState == "UNKNOWN" {
continue
}
workingIds = append(workingIds, e.ParticipantId)
}

for id, _ := range s.result {
if !contains(workingIds, int64(id)) {
return errors.Errorf("集群 %d 不可用", id)
}
}

if len(s.result) == 0 {
return errors.New("可用集群为空")
}

return nil
}

func (s *scheduler) assignReplicasToResult(strategy *algo.Strategy, providerList []*algo.Provider) error {

if len(strategy.Tasksolution) == 0 {
return errors.New("调度失败, 未能获取调度结果")
}

for i, e := range strategy.Tasksolution {
if e == 0 {
continue
}
s.result[ParticipantId(providerList[i].Pid)] = Replicas(e)
}

// 查询集群是否可用
err := s.checkAvailableParticipants()
if err != nil {
return err
}

return nil
}

func contains(s []int64, e int64) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}

Loading…
Cancel
Save