Browse Source

多实例任务代码测试后调整

pull/45/head
JeshuaRen 1 year ago
parent
commit
33d9168bce
5 changed files with 3 additions and 993 deletions
  1. +0
    -337
      pkgs/prescheduler/calc_score.go
  2. +0
    -517
      pkgs/prescheduler/default_prescheduler.go
  3. +0
    -117
      pkgs/prescheduler/default_prescheduler_test.go
  4. +0
    -11
      pkgs/prescheduler/prescheduler.go
  5. +3
    -11
      sdks/scheduler/models.go

+ 0
- 337
pkgs/prescheduler/calc_score.go View File

@@ -1,337 +0,0 @@
package prescheduler

import (
"fmt"
"github.com/inhies/go-bytesize"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
"gitlink.org.cn/cloudream/common/utils/math2"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)

// calcResourceScore computes the resource score detail for every candidate
// computing center in allCCs and stores it on the candidate in place.
// It stops at the first scoring error.
func (s *DefaultPreScheduler) calcResourceScore(jobResource schsdk.JobResourcesInfo, allCCs map[schsdk.CCID]*candidate) error {
	for _, cand := range allCCs {
		detail, err := s.calcOneResourceScore(jobResource, &cand.CC)
		if err != nil {
			return err
		}
		cand.Resource = *detail
	}
	return nil
}

// calcOneResourceScore queries the collector for the computing center's
// current resource data, grades every resource kind the job requires into a
// level (Level1 = ample, Level3 = insufficient; missing data counts as
// Level3), and accumulates a weighted score per kind. Only kinds with a
// positive requirement contribute to the aggregates.
func (s *DefaultPreScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, cc *schmod.ComputingCenter) (*resourcesDetail, error) {
	colCli, err := schglb.CollectorMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new collector client: %w", err)
	}
	defer schglb.CollectorMQPool.Release(colCli)

	getResDataResp, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(cc.UOPSlwNodeID))
	if err != nil {
		return nil, err
	}

	var resDetail resourcesDetail

	// Aggregates over all required resource kinds.
	totalScore := 0.0
	maxLevel := 0
	resKinds := 0

	if requires.CPU > 0 {
		res := findResuorce[*uopsdk.CPUResourceData](getResDataResp.Datas)
		if res == nil {
			// No data reported for this kind: treat as insufficient.
			resDetail.CPU.Level = ResourceLevel3
			resDetail.CPU.Score = 0
		} else {
			resDetail.CPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.CPU)
			resDetail.CPU.Score = (float64(res.Available.Value) / requires.CPU) * CpuResourceWeight
		}

		maxLevel = math2.Max(maxLevel, resDetail.CPU.Level)
		totalScore += resDetail.CPU.Score
		resKinds++
	}

	if requires.GPU > 0 {
		res := findResuorce[*uopsdk.GPUResourceData](getResDataResp.Datas)
		if res == nil {
			resDetail.GPU.Level = ResourceLevel3
			resDetail.GPU.Score = 0
		} else {
			resDetail.GPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.GPU)
			// All compute devices (CPU/GPU/NPU/MLU) share CpuResourceWeight.
			resDetail.GPU.Score = (float64(res.Available.Value) / requires.GPU) * CpuResourceWeight
		}

		maxLevel = math2.Max(maxLevel, resDetail.GPU.Level)
		totalScore += resDetail.GPU.Score
		resKinds++
	}

	if requires.NPU > 0 {
		res := findResuorce[*uopsdk.NPUResourceData](getResDataResp.Datas)
		if res == nil {
			resDetail.NPU.Level = ResourceLevel3
			resDetail.NPU.Score = 0
		} else {
			resDetail.NPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.NPU)
			resDetail.NPU.Score = (float64(res.Available.Value) / requires.NPU) * CpuResourceWeight
		}

		maxLevel = math2.Max(maxLevel, resDetail.NPU.Level)
		totalScore += resDetail.NPU.Score
		resKinds++
	}

	if requires.MLU > 0 {
		res := findResuorce[*uopsdk.MLUResourceData](getResDataResp.Datas)
		if res == nil {
			resDetail.MLU.Level = ResourceLevel3
			resDetail.MLU.Score = 0
		} else {
			resDetail.MLU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.MLU)
			resDetail.MLU.Score = (float64(res.Available.Value) / requires.MLU) * CpuResourceWeight
		}

		maxLevel = math2.Max(maxLevel, resDetail.MLU.Level)
		totalScore += resDetail.MLU.Score
		resKinds++
	}

	if requires.Storage > 0 {
		res := findResuorce[*uopsdk.StorageResourceData](getResDataResp.Datas)
		if res == nil {
			resDetail.Storage.Level = ResourceLevel3
			resDetail.Storage.Score = 0
		} else {
			// Storage is reported as value+unit; normalize to bytes first.
			bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit))
			if err != nil {
				return nil, err
			}

			resDetail.Storage.Level = s.calcResourceLevel(float64(bytes), float64(requires.Storage))
			resDetail.Storage.Score = (float64(bytes) / float64(requires.Storage)) * StgResourceWeight
		}

		maxLevel = math2.Max(maxLevel, resDetail.Storage.Level)
		totalScore += resDetail.Storage.Score
		resKinds++
	}

	if requires.Memory > 0 {
		res := findResuorce[*uopsdk.MemoryResourceData](getResDataResp.Datas)
		if res == nil {
			resDetail.Memory.Level = ResourceLevel3
			resDetail.Memory.Score = 0
		} else {
			bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit))
			if err != nil {
				return nil, err
			}

			resDetail.Memory.Level = s.calcResourceLevel(float64(bytes), float64(requires.Memory))
			resDetail.Memory.Score = (float64(bytes) / float64(requires.Memory)) * StgResourceWeight
		}

		maxLevel = math2.Max(maxLevel, resDetail.Memory.Level)
		totalScore += resDetail.Memory.Score
		resKinds++
	}

	// Nothing was required: return the zero detail (all aggregates zero).
	if resKinds == 0 {
		return &resDetail, nil
	}

	resDetail.TotalScore = totalScore
	// BUGFIX: the average must be derived from the accumulated total;
	// the previous code divided the zero-valued AvgScore field by resKinds,
	// so AvgScore was always 0 and score-based tie-breaking never worked.
	resDetail.AvgScore = totalScore / float64(resKinds)
	resDetail.MaxLevel = maxLevel

	return &resDetail, nil
}

// calcResourceLevel maps the ratio of available to needed resource onto a
// discrete grade: >= 1.5x needed -> Level1, >= 1x -> Level2, else Level3.
func (s *DefaultPreScheduler) calcResourceLevel(avai float64, need float64) int {
	switch {
	case avai >= 1.5*need:
		return ResourceLevel1
	case avai >= need:
		return ResourceLevel2
	default:
		return ResourceLevel3
	}
}

// calcFileScore computes file-availability scores (code, dataset, image) for
// every candidate computing center, then sums them into Files.TotalScore.
// Only storage services on centers reported available are weighted.
func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs map[schsdk.CCID]*candidate) error {
	// Index candidates by the CDS node backing each computing center.
	cdsNodeToCC := make(map[cdssdk.NodeID]*candidate, len(allCCs))
	for _, cand := range allCCs {
		cdsNodeToCC[cand.CC.CDSNodeID] = cand
	}

	// Code file scores.
	if pkg, ok := files.Code.(*schsdk.PackageJobFileInfo); ok {
		scores, err := s.calcPackageFileScore(pkg.PackageID, cdsNodeToCC)
		if err != nil {
			return fmt.Errorf("calc code file score: %w", err)
		}
		for ccID, sc := range scores {
			allCCs[ccID].Files.Code = *sc
		}
	}

	// Dataset file scores.
	if pkg, ok := files.Dataset.(*schsdk.PackageJobFileInfo); ok {
		scores, err := s.calcPackageFileScore(pkg.PackageID, cdsNodeToCC)
		if err != nil {
			return fmt.Errorf("calc dataset file score: %w", err)
		}
		for ccID, sc := range scores {
			allCCs[ccID].Files.Dataset = *sc
		}
	}

	// Image file scores.
	if img, ok := files.Image.(*schsdk.ImageJobFileInfo); ok {
		scores, err := s.calcImageFileScore(img.ImageID, allCCs, cdsNodeToCC)
		if err != nil {
			return fmt.Errorf("calc image file score: %w", err)
		}
		for ccID, sc := range scores {
			allCCs[ccID].Files.Image = *sc
		}
	}

	// Total = caching + loading scores across all three file categories.
	for _, cand := range allCCs {
		f := &cand.Files
		f.TotalScore = f.Code.CachingScore + f.Code.LoadingScore +
			f.Dataset.CachingScore + f.Dataset.LoadingScore +
			f.Image.CachingScore + f.Image.LoadingScore
	}

	return nil
}

// calcPackageFileScore scores how available one package is at each computing
// center: a caching score proportional to the fraction of the package's bytes
// cached on the center's CDS node, plus a fixed loading score (and IsLoaded)
// when the package is already loaded onto that node.
func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
	colCli, err := schglb.CollectorMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new collector client: %w", err)
	}
	defer schglb.CollectorMQPool.Release(colCli)

	ccFileScores := make(map[schsdk.CCID]*fileDetail)

	// TODO UserID — hard-coded to 1 for now.
	cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(1, packageID))
	if err != nil {
		return nil, err
	}

	for _, cdsNodeCacheInfo := range cachedResp.NodeInfos {
		// Skip nodes that do not belong to any available computing center.
		cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID]
		if !ok {
			continue
		}

		ccFileScores[cc.CC.CCID] = &fileDetail{
			// TODO the formula may differ depending on the caching method.
			CachingScore: float64(cdsNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight,
		}
	}

	// TODO UserID — hard-coded to 1 for now.
	loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(1, packageID))
	if err != nil {
		return nil, err
	}

	for _, cdsNodeID := range loadedResp.StgNodeIDs {
		cc, ok := cdsNodeToCC[cdsNodeID]
		if !ok {
			continue
		}

		// A center may have a loading score without a caching score.
		sfc, ok := ccFileScores[cc.CC.CCID]
		if !ok {
			sfc = &fileDetail{}
			ccFileScores[cc.CC.CCID] = sfc
		}

		sfc.LoadingScore = 1 * LoadedWeight
		sfc.IsLoaded = true
	}

	return ccFileScores, nil
}

// calcImageFileScore scores how available an image is at each computing
// center: a caching score from the image's backing CDS package (when one
// exists), plus a loading score when the image has already been imported
// into the computing center as a PCM image.
func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
	colCli, err := schglb.CollectorMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new collector client: %w", err)
	}
	defer schglb.CollectorMQPool.Release(colCli)

	magCli, err := schglb.ManagerMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new manager client: %w", err)
	}
	defer schglb.ManagerMQPool.Release(magCli)

	imageInfoResp, err := magCli.GetImageInfo(mgrmq.NewGetImageInfo(imageID))
	if err != nil {
		return nil, fmt.Errorf("getting image info: %w", err)
	}

	ccFileScores := make(map[schsdk.CCID]*fileDetail)

	// Caching score only applies when the image is backed by a CDS package.
	if imageInfoResp.Image.CDSPackageID != nil {
		// NOTE(review): UserID is hard-coded to 1 here as well — confirm.
		cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(1, *imageInfoResp.Image.CDSPackageID))
		if err != nil {
			return nil, err
		}

		for _, cdsNodeCacheInfo := range cachedResp.NodeInfos {
			// Skip nodes not belonging to an available computing center.
			cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID]
			if !ok {
				continue
			}

			ccFileScores[cc.CC.CCID] = &fileDetail{
				// TODO the formula may differ depending on the caching method.
				CachingScore: float64(cdsNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight,
			}
		}
	}

	// An image's LoadingScore reflects whether it has already been imported
	// into the computing center.
	for _, pcmImg := range imageInfoResp.PCMImages {
		_, ok := allCCs[pcmImg.CCID]
		if !ok {
			continue
		}

		fsc, ok := ccFileScores[pcmImg.CCID]
		if !ok {
			fsc = &fileDetail{}
			ccFileScores[pcmImg.CCID] = fsc
		}

		fsc.LoadingScore = 1 * LoadedWeight
		fsc.IsLoaded = true
	}

	return ccFileScores, nil
}

+ 0
- 517
pkgs/prescheduler/default_prescheduler.go View File

@@ -1,517 +0,0 @@
package prescheduler

import (
"fmt"
"sort"

"github.com/samber/lo"

schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)

const (
	// Each computing center is graded into one of three resource levels:
	// ResourceLevel1: every required resource kind has >= 1.5x the requirement available.
	ResourceLevel1 = 1
	// ResourceLevel2: not Level1, but every required resource kind has >= 1x available.
	ResourceLevel2 = 2
	// ResourceLevel3: at least one required resource kind has < 1x available.
	ResourceLevel3 = 3

	// CpuResourceWeight weights compute-device scores (used for CPU, GPU, NPU and MLU).
	CpuResourceWeight float64 = 1
	// StgResourceWeight weights byte-sized resource scores (storage and memory).
	StgResourceWeight float64 = 1.2

	// CachingWeight weights the fraction of a package cached on a CDS node.
	CachingWeight float64 = 1
	// LoadedWeight weights a package/image already loaded at the center.
	LoadedWeight float64 = 2
)

// ErrNoAvailableScheme is returned when no computing center can currently
// satisfy the job's requirements.
var ErrNoAvailableScheme = fmt.Errorf("no appropriate scheduling node found, please wait")

// candidate is one computing center under consideration, together with the
// scores computed for it during pre-scheduling.
type candidate struct {
	CC                    schmod.ComputingCenter
	IsReferencedJobTarget bool // whether this center was chosen by a job that this job depends on
	Resource              resourcesDetail
	Files                 filesDetail
}

// resourcesDetail holds the per-resource-kind level/score breakdown plus the
// aggregates derived from them.
type resourcesDetail struct {
	CPU     resourceDetail
	GPU     resourceDetail
	NPU     resourceDetail
	MLU     resourceDetail
	Storage resourceDetail
	Memory  resourceDetail

	TotalScore float64 // sum of the scores of all required kinds
	AvgScore   float64 // TotalScore divided by the number of required kinds
	MaxLevel   int     // worst (highest) level among the required kinds
}

// resourceDetail is the level and weighted score of a single resource kind.
type resourceDetail struct {
	Level int
	Score float64
}

// filesDetail holds the file-availability scores for the three job file
// categories and their sum.
type filesDetail struct {
	Dataset fileDetail
	Code    fileDetail
	Image   fileDetail

	TotalScore float64
}

// fileDetail scores how available one file is at a computing center.
type fileDetail struct {
	CachingScore float64
	LoadingScore float64
	// IsLoaded: for storage files, whether the package has already been
	// loaded to the node; for images, whether the image has already been
	// imported into the computing center.
	IsLoaded bool
}

// schedulingJob pairs a job with the LocalJobIDs of the jobs it must run after.
type schedulingJob struct {
	Job    schsdk.JobInfo
	Afters []string
}

// CandidateArr sorts candidates from most to least preferred.
type CandidateArr []*candidate

func (a CandidateArr) Len() int      { return len(a) }
func (a CandidateArr) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less reports whether candidate i should be preferred over candidate j.
func (a CandidateArr) Less(i, j int) bool {
	n1 := a[i]
	n2 := a[j]

	// Prefer colocating with the referenced job's chosen center, provided
	// that center's resources are at least Level2.
	if n1.IsReferencedJobTarget && n1.Resource.MaxLevel < ResourceLevel3 {
		return true
	}
	// BUGFIX: when n2 (and not n1) is the referenced target, n2 must sort
	// first, so Less(i, j) is false. The old code returned true here, which
	// inverted the preference and violated the strict weak ordering that
	// sort.Interface requires (Less(i,j) and Less(j,i) both true).
	if n2.IsReferencedJobTarget && n2.Resource.MaxLevel < ResourceLevel3 {
		return false
	}

	// A lower resource level means the center satisfies requirements better.
	if n1.Resource.MaxLevel < n2.Resource.MaxLevel {
		return true
	}
	if n1.Resource.MaxLevel > n2.Resource.MaxLevel {
		return false
	}

	// Same level: break ties by the appropriate score for that level.
	switch n1.Resource.MaxLevel {
	case ResourceLevel1:
		// A higher file total score means the center already holds more of
		// the job's data, so it is preferred.
		return n1.Files.TotalScore > n2.Files.TotalScore

	case ResourceLevel2, ResourceLevel3:
		// A higher average resource score means more spare capacity.
		return n1.Resource.AvgScore > n2.Resource.AvgScore
	}

	return false
}

// DefaultPreScheduler is the default pre-scheduling implementation: it scores
// candidate computing centers by resource headroom and data locality, then
// picks the best one for each job.
type DefaultPreScheduler struct {
}

// NewDefaultPreScheduler creates a DefaultPreScheduler.
func NewDefaultPreScheduler() *DefaultPreScheduler {
	return &DefaultPreScheduler{}
}

// ScheduleJobSet pre-schedules every job in the set, honoring inter-job
// dependencies, and returns the per-job schemes plus the plan for uploading
// any local files. It fails with ErrNoAvailableScheme when no computing
// center is available at all.
func (s *DefaultPreScheduler) ScheduleJobSet(info *schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error) {
	jobSetScheme := &jobmod.JobSetPreScheduleScheme{
		JobSchemes: make(map[string]jobmod.JobScheduleScheme),
	}
	filesUploadSchemes := make(map[string]schsdk.LocalFileUploadScheme)

	mgrCli, err := schglb.ManagerMQPool.Acquire()
	if err != nil {
		// BUGFIX: this acquires a manager client, not a collector client,
		// so the wrapped error message must say so.
		return nil, nil, fmt.Errorf("new manager client: %w", err)
	}
	defer schglb.ManagerMQPool.Release(mgrCli)

	// Query which computing centers are available.
	allCC, err := mgrCli.GetAllComputingCenter(mgrmq.NewGetAllComputingCenter())
	if err != nil {
		return nil, nil, fmt.Errorf("getting all computing center info: %w", err)
	}

	ccs := make(map[schsdk.CCID]schmod.ComputingCenter)
	for _, node := range allCC.ComputingCenters {
		ccs[node.CCID] = node
	}

	if len(ccs) == 0 {
		return nil, nil, ErrNoAvailableScheme
	}

	// Collect the LocalJobIDs each job depends on, from its file config.
	var schJobs []*schedulingJob
	for _, job := range info.Jobs {
		j := &schedulingJob{
			Job: job,
		}

		if norJob, ok := job.(*schsdk.NormalJobInfo); ok {
			if resFile, ok := norJob.Files.Dataset.(*schsdk.DataReturnJobFileInfo); ok {
				j.Afters = append(j.Afters, resFile.DataReturnLocalJobID)
			}

			if resFile, ok := norJob.Files.Code.(*schsdk.DataReturnJobFileInfo); ok {
				j.Afters = append(j.Afters, resFile.DataReturnLocalJobID)
			}
		} else if resJob, ok := job.(*schsdk.DataReturnJobInfo); ok {
			j.Afters = append(j.Afters, resJob.TargetLocalJobID)
		}

		schJobs = append(schJobs, j)
	}

	// Sort the jobs by their dependencies.
	schJobs, ok := s.orderByAfters(schJobs)
	if !ok {
		return nil, nil, fmt.Errorf("circular reference detected between jobs in the job set")
	}

	// Generate schemes in dependency order. Normal and MultiInstance jobs
	// share the same scheduling path; the two previously-duplicated branches
	// are merged here.
	for _, job := range schJobs {
		var files *schsdk.JobFilesInfo
		switch j := job.Job.(type) {
		case *schsdk.NormalJobInfo:
			files = &j.Files
		case *schsdk.MultiInstanceJobInfo:
			files = &j.Files
		}

		// Data-return jobs currently need no scheduling scheme.
		if files == nil {
			continue
		}

		scheme, err := s.scheduleForNormalOrMultiJob(info, job, ccs, jobSetScheme.JobSchemes)
		if err != nil {
			return nil, nil, err
		}

		jobSetScheme.JobSchemes[job.Job.GetLocalJobID()] = *scheme

		// Generate upload schemes for any local data files.
		s.fillNormarlJobLocalUploadScheme(*files, scheme.TargetCCID, filesUploadSchemes, ccs)
	}

	return jobSetScheme, &schsdk.JobSetFilesUploadScheme{
		LocalFileSchemes: lo.Values(filesUploadSchemes),
	}, nil
}

// ScheduleJob pre-schedules a single instance job: it queries the available
// computing centers, picks the best one, and returns the chosen scheme plus
// the upload plan for any local files.
func (s *DefaultPreScheduler) ScheduleJob(instJobInfo *schsdk.InstanceJobInfo) (*jobmod.JobScheduleScheme, *schsdk.JobFilesUploadScheme, error) {
	filesUploadSchemes := make(map[string]schsdk.LocalFileUploadScheme)

	mgrCli, err := schglb.ManagerMQPool.Acquire()
	if err != nil {
		// BUGFIX: this acquires a manager client, not a collector client,
		// so the wrapped error message must say so.
		return nil, nil, fmt.Errorf("new manager client: %w", err)
	}
	defer schglb.ManagerMQPool.Release(mgrCli)

	// Query which computing centers are available.
	allCC, err := mgrCli.GetAllComputingCenter(mgrmq.NewGetAllComputingCenter())
	if err != nil {
		return nil, nil, fmt.Errorf("getting all computing center info: %w", err)
	}

	ccs := make(map[schsdk.CCID]schmod.ComputingCenter)
	for _, node := range allCC.ComputingCenters {
		ccs[node.CCID] = node
	}

	if len(ccs) == 0 {
		return nil, nil, ErrNoAvailableScheme
	}

	// Wrap the instance job as a normal job so the shared scheduling path
	// can handle it.
	info := &schsdk.NormalJobInfo{
		Files:     instJobInfo.Files,
		Runtime:   instJobInfo.Runtime,
		Resources: instJobInfo.Resources,
	}

	job := &schedulingJob{
		Job: info,
	}
	scheme, err := s.scheduleForSingleJob(job, ccs)
	if err != nil {
		return nil, nil, err
	}

	// Generate upload schemes for any local data files.
	s.fillNormarlJobLocalUploadScheme(info.Files, scheme.TargetCCID, filesUploadSchemes, ccs)

	return scheme, &schsdk.JobFilesUploadScheme{
		LocalFileSchemes: lo.Values(filesUploadSchemes),
	}, nil
}

// orderByAfters topologically sorts jobs so every job appears after all jobs
// named in its Afters. The second return value is false when a dependency
// cycle exists. Within each elimination round the original slice order is
// preserved, and clearing a reference mid-round can release later jobs in the
// same round — the tests depend on this exact ordering, so do not restructure
// the loop casually.
func (s *DefaultPreScheduler) orderByAfters(jobs []*schedulingJob) ([]*schedulingJob, bool) {
	type jobOrder struct {
		Job    *schedulingJob
		Afters []string
	}

	// Work on copies of the Afters lists so the input jobs are not mutated.
	var jobOrders []*jobOrder
	for _, job := range jobs {
		od := &jobOrder{
			Job:    job,
			Afters: make([]string, len(job.Afters)),
		}

		copy(od.Afters, job.Afters)

		jobOrders = append(jobOrders, od)
	}

	// Repeatedly extract jobs that have no remaining dependencies.
	var orderedJob []*schedulingJob
	for {
		rm := 0
		for i, jo := range jobOrders {
			// A job with no pending dependencies is emitted now.
			if len(jo.Afters) == 0 {
				orderedJob = append(orderedJob, jo.Job)

				// Drop every other job's reference to the emitted job.
				for _, job2 := range jobOrders {
					job2.Afters = lo.Reject(job2.Afters, func(item string, idx int) bool { return item == jo.Job.Job.GetLocalJobID() })
				}

				rm++
				continue
			}

			// Compact in place: shift kept entries left past the rm
			// entries removed so far this round.
			jobOrders[i-rm] = jobOrders[i]
		}

		jobOrders = jobOrders[:len(jobOrders)-rm]
		if len(jobOrders) == 0 {
			break
		}

		// A full round with no removals means a circular dependency exists.
		if rm == 0 {
			return nil, false
		}
	}

	return orderedJob, true
}

// scheduleForNormalOrMultiJob picks the best computing center for a Normal or
// MultiInstance job within a job set. Candidates that are the target of a
// referenced data-return job are flagged so the sort can prefer them.
func (s *DefaultPreScheduler) scheduleForNormalOrMultiJob(jobSet *schsdk.JobSetInfo, job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter, jobSchemes map[string]jobmod.JobScheduleScheme) (*jobmod.JobScheduleScheme, error) {
	allCCs := make(map[schsdk.CCID]*candidate)

	// Initialize candidate info for every available computing center.
	for _, cc := range ccs {
		caNode := &candidate{
			CC: cc,
		}

		// Check whether this center was chosen by a job this job depends on.
		for _, af := range job.Afters {
			resJob := findJobInfo[*schsdk.DataReturnJobInfo](jobSet.Jobs, af)
			if resJob == nil {
				return nil, fmt.Errorf("resource job %s not found in the job set", af)
			}

			// Jobs are processed in dependency order, so the referenced
			// job's scheme should normally already exist here.
			scheme, ok := jobSchemes[resJob.TargetLocalJobID]
			if !ok {
				continue
			}

			if scheme.TargetCCID == cc.CCID {
				caNode.IsReferencedJobTarget = true
				break
			}
		}

		allCCs[cc.CCID] = caNode
	}

	var jobFiles *schsdk.JobFilesInfo
	var jobResource *schsdk.JobResourcesInfo

	switch runningJob := job.Job.(type) {
	case *schsdk.NormalJobInfo:
		jobFiles = &runningJob.Files
		jobResource = &runningJob.Resources
	case *schsdk.MultiInstanceJobInfo:
		jobFiles = &runningJob.Files
		jobResource = &runningJob.Resources
	default:
		// BUGFIX: an unexpected job type previously fell through with nil
		// pointers and panicked on the dereferences below.
		return nil, fmt.Errorf("unsupported job type %T for scheduling", job.Job)
	}

	// Score candidates by data file availability.
	err := s.calcFileScore(*jobFiles, allCCs)
	if err != nil {
		return nil, err
	}

	// Score candidates by resource headroom.
	err = s.calcResourceScore(*jobResource, allCCs)
	if err != nil {
		return nil, err
	}

	// Best candidate sorts first.
	allCCsArr := lo.Values(allCCs)
	sort.Sort(CandidateArr(allCCsArr))

	targetNode := allCCsArr[0]
	if targetNode.Resource.MaxLevel == ResourceLevel3 {
		// Even the best candidate lacks some required resource.
		return nil, ErrNoAvailableScheme
	}

	scheme := s.makeSchemeForNode(jobFiles, targetNode)
	return &scheme, nil
}

// scheduleForSingleJob picks the best computing center for a standalone job.
// Unlike scheduleForNormalOrMultiJob it has no job set, so no candidate is
// flagged as a referenced-job target.
func (s *DefaultPreScheduler) scheduleForSingleJob(job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter) (*jobmod.JobScheduleScheme, error) {
	allCCs := make(map[schsdk.CCID]*candidate)

	// Initialize candidate info for every available computing center.
	for _, cc := range ccs {
		caNode := &candidate{
			CC: cc,
		}

		allCCs[cc.CCID] = caNode
	}

	var jobFiles *schsdk.JobFilesInfo
	var jobResource *schsdk.JobResourcesInfo

	switch runningJob := job.Job.(type) {
	case *schsdk.NormalJobInfo:
		jobFiles = &runningJob.Files
		jobResource = &runningJob.Resources
	case *schsdk.MultiInstanceJobInfo:
		jobFiles = &runningJob.Files
		jobResource = &runningJob.Resources
	default:
		// BUGFIX: an unexpected job type previously fell through with nil
		// pointers and panicked on the dereferences below.
		return nil, fmt.Errorf("unsupported job type %T for scheduling", job.Job)
	}

	// Score candidates by data file availability.
	err := s.calcFileScore(*jobFiles, allCCs)
	if err != nil {
		return nil, err
	}

	// Score candidates by resource headroom.
	err = s.calcResourceScore(*jobResource, allCCs)
	if err != nil {
		return nil, err
	}

	// Best candidate sorts first.
	allCCsArr := lo.Values(allCCs)
	sort.Sort(CandidateArr(allCCsArr))

	targetNode := allCCsArr[0]
	if targetNode.Resource.MaxLevel == ResourceLevel3 {
		// Even the best candidate lacks some required resource.
		return nil, ErrNoAvailableScheme
	}

	scheme := s.makeSchemeForNode(jobFiles, targetNode)
	return &scheme, nil
}

// fillNormarlJobLocalUploadScheme registers an upload scheme for each job
// file (dataset, code, image) that comes from the local filesystem. Schemes
// are keyed by local path so the same file is planned for upload only once.
// (The misspelled name is kept for compatibility with existing callers.)
func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(files schsdk.JobFilesInfo, targetCCID schsdk.CCID, schemes map[string]schsdk.LocalFileUploadScheme, ccs map[schsdk.CCID]schmod.ComputingCenter) {
	// addScheme deduplicates the three previously copy-pasted branches:
	// it plans one upload to the target center's CDS node per local file.
	addScheme := func(file any) {
		localFile, ok := file.(*schsdk.LocalJobFileInfo)
		if !ok {
			return
		}

		if _, ok := schemes[localFile.LocalPath]; ok {
			return
		}

		cdsNodeID := ccs[targetCCID].CDSNodeID
		schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{
			LocalPath:         localFile.LocalPath,
			UploadToCDSNodeID: &cdsNodeID,
		}
	}

	addScheme(files.Dataset)
	addScheme(files.Code)
	addScheme(files.Image)
}

// makeSchemeForNode builds the schedule scheme for the chosen computing
// center, deciding for each file category whether a load/import action is
// required before the job can run there.
func (s *DefaultPreScheduler) makeSchemeForNode(jobFiles *schsdk.JobFilesInfo, targetCC *candidate) jobmod.JobScheduleScheme {
	scheme := jobmod.JobScheduleScheme{
		TargetCCID: targetCC.CC.CCID,
	}

	// TODO choose between Move and Load based on the actual situation.

	// Package files not yet loaded onto the target need an explicit Load.
	if _, ok := jobFiles.Dataset.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Dataset.IsLoaded {
		scheme.Dataset.Action = jobmod.ActionLoad
	} else {
		scheme.Dataset.Action = jobmod.ActionNo
	}

	if _, ok := jobFiles.Code.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Code.IsLoaded {
		scheme.Code.Action = jobmod.ActionLoad
	} else {
		scheme.Code.Action = jobmod.ActionNo
	}

	// NOTE(review): elsewhere image files are asserted to *ImageJobFileInfo
	// (see calcFileScore), so asserting *PackageJobFileInfo here looks
	// suspicious and may make the ImportImage branch unreachable — confirm.
	if _, ok := jobFiles.Image.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Image.IsLoaded {
		scheme.Image.Action = jobmod.ActionImportImage
	} else {
		scheme.Image.Action = jobmod.ActionNo
	}

	return scheme
}

// findResuorce returns the first element of all whose concrete type is T, or
// the zero value of T (nil for pointer types) when none matches.
// (The misspelled name is kept for compatibility with existing call sites.)
func findResuorce[T uopsdk.ResourceData](all []uopsdk.ResourceData) T {
	var found T
	for _, item := range all {
		if typed, ok := item.(T); ok {
			found = typed
			break
		}
	}
	return found
}

// findJobInfo returns the job in jobs whose LocalJobID equals localJobID and
// whose concrete type is T, or the zero value of T when no such job exists.
func findJobInfo[T schsdk.JobInfo](jobs []schsdk.JobInfo, localJobID string) T {
	var match T
	for _, j := range jobs {
		typed, ok := j.(T)
		if !ok || j.GetLocalJobID() != localJobID {
			continue
		}
		match = typed
		break
	}
	return match
}

+ 0
- 117
pkgs/prescheduler/default_prescheduler_test.go View File

@@ -1,117 +0,0 @@
package prescheduler

import (
"testing"

"github.com/samber/lo"
. "github.com/smartystreets/goconvey/convey"

schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
)

// TestOrderByAfters verifies that orderByAfters emits jobs in dependency
// order (round-based, preserving input order within each round) and that it
// reports failure on circular dependencies (wants == nil).
func TestOrderByAfters(t *testing.T) {
	cases := []struct {
		title string
		jobs  []*schedulingJob
		wants []string // expected LocalJobID order; nil means a cycle is expected
	}{
		{
			// Every job is directly or indirectly dependent on another.
			title: "所有Job都有直接或间接的依赖关系",
			jobs: []*schedulingJob{
				{
					Job:    &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}},
					Afters: []string{"2"},
				},

				{
					Job:    &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}},
					Afters: []string{},
				},

				{
					Job:    &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "3"}},
					Afters: []string{"1"},
				},
			},
			wants: []string{"2", "1", "3"},
		},

		{
			// Two independent dependency chains; rounds interleave them.
			title: "部分Job之间无依赖关系",
			jobs: []*schedulingJob{
				{
					Job:    &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}},
					Afters: []string{"2"},
				},

				{
					Job:    &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}},
					Afters: []string{},
				},

				{
					Job:    &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "3"}},
					Afters: []string{"1"},
				},

				{
					Job:    &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "4"}},
					Afters: []string{"5"},
				},

				{
					Job:    &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "5"}},
					Afters: []string{},
				},
			},
			wants: []string{"2", "5", "1", "3", "4"},
		},

		{
			// A two-job cycle must be detected and rejected.
			title: "存在循环依赖",
			jobs: []*schedulingJob{
				{
					Job:    &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}},
					Afters: []string{"2"},
				},

				{
					Job:    &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}},
					Afters: []string{"1"},
				},
			},
			wants: nil,
		},

		{
			// No dependencies at all: input order is preserved.
			title: "完全不依赖",
			jobs: []*schedulingJob{
				{
					Job:    &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}},
					Afters: []string{},
				},

				{
					Job:    &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}},
					Afters: []string{},
				},
			},
			wants: []string{"1", "2"},
		},
	}

	sch := NewDefaultPreScheduler()
	for _, c := range cases {
		Convey(c.title, t, func() {
			ordered, ok := sch.orderByAfters(c.jobs)
			if c.wants == nil {
				So(ok, ShouldBeFalse)
			} else {
				So(ok, ShouldBeTrue)

				ids := lo.Map(ordered, func(item *schedulingJob, idx int) string { return item.Job.GetLocalJobID() })
				So(ids, ShouldResemble, c.wants)
			}
		})
	}
}

+ 0
- 11
pkgs/prescheduler/prescheduler.go View File

@@ -1,11 +0,0 @@
package prescheduler

import (
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
)

// PreScheduler decides, before a job set or single job is run, which
// computing center each job should execute on and which local files must be
// uploaded.
type PreScheduler interface {
	// ScheduleJobSet pre-schedules every job in the set, honoring inter-job
	// dependencies, and returns per-job schemes plus the upload plan.
	ScheduleJobSet(info *schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error)
	// ScheduleJob pre-schedules a single instance job.
	ScheduleJob(info *schsdk.InstanceJobInfo) (*jobmod.JobScheduleScheme, *schsdk.JobFilesUploadScheme, error)
}

+ 3
- 11
sdks/scheduler/models.go View File

@@ -9,6 +9,7 @@ import (
const ( const (
JobTypeNormal = "Normal" JobTypeNormal = "Normal"
JobTypeResource = "Resource" JobTypeResource = "Resource"
JobTypeInstance = "Instance"


FileInfoTypePackage = "Package" FileInfoTypePackage = "Package"
FileInfoTypeLocalFile = "LocalFile" FileInfoTypeLocalFile = "LocalFile"
@@ -37,6 +38,7 @@ var JobInfoTypeUnion = types.NewTypeUnion[JobInfo](
(*NormalJobInfo)(nil), (*NormalJobInfo)(nil),
(*DataReturnJobInfo)(nil), (*DataReturnJobInfo)(nil),
(*MultiInstanceJobInfo)(nil), (*MultiInstanceJobInfo)(nil),
(*InstanceJobInfo)(nil),
) )
var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type") var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type")


@@ -58,16 +60,6 @@ type NormalJobInfo struct {
Services JobServicesInfo `json:"services"` Services JobServicesInfo `json:"services"`
} }


type CommonJobInfo struct {
serder.Metadata `union:"Normal"`
JobInfoBase
Type string `json:"type"`
Files JobFilesInfo `json:"files"`
Runtime JobRuntimeInfo `json:"runtime"`
Resources JobResourcesInfo `json:"resources"`
Services JobServicesInfo `json:"services"`
}

type DataReturnJobInfo struct { type DataReturnJobInfo struct {
serder.Metadata `union:"DataReturn"` serder.Metadata `union:"DataReturn"`
JobInfoBase JobInfoBase
@@ -86,7 +78,7 @@ type MultiInstanceJobInfo struct {
} }


type InstanceJobInfo struct { type InstanceJobInfo struct {
serder.Metadata `union:"SubJob"`
serder.Metadata `union:"Instance"`
JobInfoBase JobInfoBase
Type string `json:"type"` Type string `json:"type"`
LocalJobID string `json:"multiInstJobID"` LocalJobID string `json:"multiInstJobID"`


Loading…
Cancel
Save