From 723dbd5446ecb265b450420cd4396f459d55974e Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Tue, 30 Apr 2024 16:31:52 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E8=B0=83=E5=BA=A6=E7=B3=BB=E7=BB=9F?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=A4=9A=E5=AE=9E=E4=BE=8B=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/prescheduler/calc_score.go | 337 ++++++++++++ pkgs/prescheduler/default_prescheduler.go | 481 ++++++++++++++++++ .../prescheduler/default_prescheduler_test.go | 117 +++++ pkgs/prescheduler/prescheduler.go | 11 + sdks/scheduler/models.go | 34 +- 5 files changed, 975 insertions(+), 5 deletions(-) create mode 100644 pkgs/prescheduler/calc_score.go create mode 100644 pkgs/prescheduler/default_prescheduler.go create mode 100644 pkgs/prescheduler/default_prescheduler_test.go create mode 100644 pkgs/prescheduler/prescheduler.go diff --git a/pkgs/prescheduler/calc_score.go b/pkgs/prescheduler/calc_score.go new file mode 100644 index 0000000..50a4d0a --- /dev/null +++ b/pkgs/prescheduler/calc_score.go @@ -0,0 +1,337 @@ +package prescheduler + +import ( + "fmt" + "github.com/inhies/go-bytesize" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" + "gitlink.org.cn/cloudream/common/utils/math2" + schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" + "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector" + mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" +) + +func (s *DefaultPreScheduler) calcResourceScore(job *schsdk.NormalJobInfo, allCCs map[schsdk.CCID]*candidate) error { + for _, cc := range allCCs { + res, err := s.calcOneResourceScore(job.Resources, &cc.CC) + if err != nil { + return err + } + + cc.Resource = *res + } + + return nil +} + +// 划分节点资源等级,并计算资源得分 +func (s *DefaultPreScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, cc *schmod.ComputingCenter) (*resourcesDetail, error) { + colCli, err := schglb.CollectorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new collector client: %w", err) + } + defer schglb.CollectorMQPool.Release(colCli) + + getResDataResp, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(cc.UOPSlwNodeID)) + if err != nil { + return nil, err + } + + var resDetail resourcesDetail + + //计算资源得分 + totalScore := 0.0 + maxLevel := 0 + resKinds := 0 + + if requires.CPU > 0 { + res := findResuorce[*uopsdk.CPUResourceData](getResDataResp.Datas) + if res == nil { + resDetail.CPU.Level = ResourceLevel3 + resDetail.CPU.Score = 0 + } else { + resDetail.CPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.CPU) + resDetail.CPU.Score = (float64(res.Available.Value) / requires.CPU) * CpuResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.CPU.Level) + totalScore += resDetail.CPU.Score + resKinds++ + } + + if requires.GPU > 0 { + res := findResuorce[*uopsdk.GPUResourceData](getResDataResp.Datas) + if res == nil { + resDetail.GPU.Level = ResourceLevel3 + resDetail.GPU.Score = 0 + } else { + resDetail.GPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.GPU) + resDetail.GPU.Score = (float64(res.Available.Value) / requires.GPU) * CpuResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.GPU.Level) + totalScore += resDetail.GPU.Score + resKinds++ + } + + if requires.NPU > 0 { + res := findResuorce[*uopsdk.NPUResourceData](getResDataResp.Datas) + if res == nil { + resDetail.NPU.Level = ResourceLevel3 + resDetail.NPU.Score = 0 + } else { + resDetail.NPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.NPU) + resDetail.NPU.Score = (float64(res.Available.Value) / requires.NPU) * CpuResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.NPU.Level) + totalScore += resDetail.NPU.Score + resKinds++ + } + + if requires.MLU > 0 { + res := findResuorce[*uopsdk.MLUResourceData](getResDataResp.Datas) + if res == nil { + resDetail.MLU.Level = ResourceLevel3 + resDetail.MLU.Score = 0 + } else { + resDetail.MLU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.MLU) + resDetail.MLU.Score = (float64(res.Available.Value) / requires.MLU) * CpuResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.MLU.Level) + totalScore += resDetail.MLU.Score + resKinds++ + } + + if requires.Storage > 0 { + res := findResuorce[*uopsdk.StorageResourceData](getResDataResp.Datas) + if res == nil { + resDetail.Storage.Level = ResourceLevel3 + resDetail.Storage.Score = 0 + } else { + bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit)) + if err != nil { + return nil, err + } + + resDetail.Storage.Level = s.calcResourceLevel(float64(bytes), float64(requires.Storage)) + resDetail.Storage.Score = (float64(bytes) / float64(requires.Storage)) * StgResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.Storage.Level) + totalScore += resDetail.Storage.Score + resKinds++ + } + + if requires.Memory > 0 { + res := findResuorce[*uopsdk.MemoryResourceData](getResDataResp.Datas) + if res == nil { + resDetail.Memory.Level = ResourceLevel3 + resDetail.Memory.Score = 0 + } else { + bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit)) + if err != nil { + return nil, err + } + + resDetail.Memory.Level = s.calcResourceLevel(float64(bytes), float64(requires.Memory)) + resDetail.Memory.Score = (float64(bytes) / float64(requires.Memory)) * StgResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.Memory.Level) + totalScore += resDetail.Memory.Score + resKinds++ + } + + if resKinds == 0 { + return &resDetail, nil + } + + resDetail.TotalScore = totalScore + resDetail.AvgScore = resDetail.AvgScore / float64(resKinds) + resDetail.MaxLevel = maxLevel + + return &resDetail, nil +} + +func (s *DefaultPreScheduler) calcResourceLevel(avai float64, need float64) int { + if avai >= 1.5*need { + return ResourceLevel1 + } + + if avai >= need { + return ResourceLevel2 + } + + return ResourceLevel3 +} + +// 计算节点得分情况 +func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs map[schsdk.CCID]*candidate) error { + // 只计算运控返回的可用计算中心上的存储服务的数据权重 + cdsNodeToCC := make(map[cdssdk.NodeID]*candidate) + for _, cc := range allCCs { + cdsNodeToCC[cc.CC.CDSNodeID] = cc + } + + //计算code相关得分 + if pkgFile, ok := files.Code.(*schsdk.PackageJobFileInfo); ok { + codeFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC) + if err != nil { + return fmt.Errorf("calc code file score: %w", err) + } + for id, score := range codeFileScores { + allCCs[id].Files.Code = *score + } + } + + //计算dataset相关得分 + if pkgFile, ok := files.Dataset.(*schsdk.PackageJobFileInfo); ok { + datasetFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC) + if err != nil { + return fmt.Errorf("calc dataset file score: %w", err) + } + for id, score := range datasetFileScores { + allCCs[id].Files.Dataset = *score + } + } + + //计算image相关得分 + if imgFile, ok := files.Image.(*schsdk.ImageJobFileInfo); ok { + //计算image相关得分 + imageFileScores, err := s.calcImageFileScore(imgFile.ImageID, allCCs, cdsNodeToCC) + if err != nil { + return fmt.Errorf("calc image file score: %w", err) + } + for id, score := range imageFileScores { + allCCs[id].Files.Image = *score + } + } + + for _, cc := range allCCs { + cc.Files.TotalScore = cc.Files.Code.CachingScore + + cc.Files.Code.LoadingScore + + cc.Files.Dataset.CachingScore + + cc.Files.Dataset.LoadingScore + + cc.Files.Image.CachingScore + + cc.Files.Image.LoadingScore + } + + return nil +} + +// 计算package在各节点的得分情况 +func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { + colCli, err := schglb.CollectorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new collector client: %w", err) + } + defer schglb.CollectorMQPool.Release(colCli) + + ccFileScores := make(map[schsdk.CCID]*fileDetail) + + // TODO UserID + cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(1, packageID)) + if err != nil { + return nil, err + } + + for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { + cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] + if !ok { + continue + } + + ccFileScores[cc.CC.CCID] = &fileDetail{ + //TODO 根据缓存方式不同,可能会有不同的计算方式 + CachingScore: float64(cdsNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, + } + } + + // TODO UserID + loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(1, packageID)) + if err != nil { + return nil, err + } + + for _, cdsNodeID := range loadedResp.StgNodeIDs { + cc, ok := cdsNodeToCC[cdsNodeID] + if !ok { + continue + } + + sfc, ok := ccFileScores[cc.CC.CCID] + if !ok { + sfc = &fileDetail{} + ccFileScores[cc.CC.CCID] = sfc + } + + sfc.LoadingScore = 1 * LoadedWeight + sfc.IsLoaded = true + } + + return ccFileScores, nil +} + +// 计算package在各节点的得分情况 +func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { + colCli, err := schglb.CollectorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new collector client: %w", err) + } + defer schglb.CollectorMQPool.Release(colCli) + + magCli, err := schglb.ManagerMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new manager client: %w", err) + } + defer schglb.ManagerMQPool.Release(magCli) + + imageInfoResp, err := magCli.GetImageInfo(mgrmq.NewGetImageInfo(imageID)) + if err != nil { + return nil, fmt.Errorf("getting image info: %w", err) + } + + ccFileScores := make(map[schsdk.CCID]*fileDetail) + + if imageInfoResp.Image.CDSPackageID != nil { + cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(1, *imageInfoResp.Image.CDSPackageID)) + if err != nil { + return nil, err + } + + for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { + cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] + if !ok { + continue + } + + ccFileScores[cc.CC.CCID] = &fileDetail{ + //TODO 根据缓存方式不同,可能会有不同的计算方式 + CachingScore: float64(cdsNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, + } + } + } + + // 镜像的LoadingScore是判断是否导入到算力中心 + for _, pcmImg := range imageInfoResp.PCMImages { + _, ok := allCCs[pcmImg.CCID] + if !ok { + continue + } + + fsc, ok := ccFileScores[pcmImg.CCID] + if !ok { + fsc = &fileDetail{} + ccFileScores[pcmImg.CCID] = fsc + } + + fsc.LoadingScore = 1 * LoadedWeight + fsc.IsLoaded = true + } + + return ccFileScores, nil +} diff --git a/pkgs/prescheduler/default_prescheduler.go b/pkgs/prescheduler/default_prescheduler.go new file mode 100644 index 0000000..b30d461 --- /dev/null +++ b/pkgs/prescheduler/default_prescheduler.go @@ -0,0 +1,481 @@ +package prescheduler + +import ( + "fmt" + "sort" + + "github.com/samber/lo" + + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" + schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" + jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" + mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" +) + +const ( + //每个节点划分的资源等级: + // ResourceLevel1:表示所有资源类型均满足 大于等于1.5倍 + ResourceLevel1 = 1 + // ResourceLevel2:表示不满足Level1,但所有资源类型均满足 大于等于1倍 + ResourceLevel2 = 2 + // ResourceLevel3: 表示某些资源类型 小于一倍 + ResourceLevel3 = 3 + + CpuResourceWeight float64 = 1 + StgResourceWeight float64 = 1.2 + + CachingWeight float64 = 1 + LoadedWeight float64 = 2 +) + +var ErrNoAvailableScheme = fmt.Errorf("no appropriate scheduling node found, please wait") + +type candidate struct { + CC schmod.ComputingCenter + IsReferencedJobTarget bool // 这个节点是否是所依赖的任务所选择的节点 + Resource resourcesDetail + Files filesDetail +} + +type resourcesDetail struct { + CPU resourceDetail + GPU resourceDetail + NPU resourceDetail + MLU resourceDetail + Storage resourceDetail + Memory resourceDetail + + TotalScore float64 + AvgScore float64 + MaxLevel int +} +type resourceDetail struct { + Level int + Score float64 +} + +type filesDetail struct { + Dataset fileDetail + Code fileDetail + Image fileDetail + + TotalScore float64 +} +type fileDetail struct { + CachingScore float64 + LoadingScore float64 + IsLoaded bool //表示storage是否已经调度到该节点, image表示镜像是否已经加载到该算力中心 +} + +type schedulingJob struct { + Job schsdk.JobInfo + Afters []string +} + +type CandidateArr []*candidate + +func (a CandidateArr) Len() int { return len(a) } +func (a CandidateArr) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a CandidateArr) Less(i, j int) bool { + n1 := a[i] + n2 := a[j] + + // 优先与所依赖的任务放到一起,但要求那个节点的资源足够 + if n1.IsReferencedJobTarget && n1.Resource.MaxLevel < ResourceLevel3 { + return true + } + if n2.IsReferencedJobTarget && n2.Resource.MaxLevel < ResourceLevel3 { + return true + } + + // 优先判断资源等级,资源等级越低,代表越满足需求 + if n1.Resource.MaxLevel < n2.Resource.MaxLevel { + return true + } + if n1.Resource.MaxLevel > n2.Resource.MaxLevel { + return false + } + + // 等级相同时,根据单项分值比较 + switch n1.Resource.MaxLevel { + case ResourceLevel1: + // 数据文件总分越高,代表此节点上拥有的数据文件越完整,则越优先考虑 + return n1.Files.TotalScore > n2.Files.TotalScore + + case ResourceLevel2: + // 资源分的平均值越高,代表资源越空余,则越优先考虑 + return n1.Resource.AvgScore > n2.Resource.AvgScore + + case ResourceLevel3: + // 资源分的平均值越高,代表资源越空余,则越优先考虑 + return n1.Resource.AvgScore > n2.Resource.AvgScore + } + + return false +} + +type DefaultPreScheduler struct { +} + +func NewDefaultPreScheduler() *DefaultPreScheduler { + return &DefaultPreScheduler{} +} + +// ScheduleJobSet 任务集预调度 +func (s *DefaultPreScheduler) ScheduleJobSet(info *schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error) { + jobSetScheme := &jobmod.JobSetPreScheduleScheme{ + JobSchemes: make(map[string]jobmod.JobScheduleScheme), + } + filesUploadSchemes := make(map[string]schsdk.LocalFileUploadScheme) + + mgrCli, err := schglb.ManagerMQPool.Acquire() + if err != nil { + return nil, nil, fmt.Errorf("new collector client: %w", err) + } + defer schglb.ManagerMQPool.Release(mgrCli) + + // 查询有哪些算力中心可用 + + allCC, err := mgrCli.GetAllComputingCenter(mgrmq.NewGetAllComputingCenter()) + if err != nil { + return nil, nil, fmt.Errorf("getting all computing center info: %w", err) + } + + ccs := make(map[schsdk.CCID]schmod.ComputingCenter) + for _, node := range allCC.ComputingCenters { + ccs[node.CCID] = node + } + + if len(ccs) == 0 { + return nil, nil, ErrNoAvailableScheme + } + + // 先根据任务配置,收集它们依赖的任务的LocalID + var schJobs []*schedulingJob + for _, job := range info.Jobs { + j := &schedulingJob{ + Job: job, + } + + if norJob, ok := job.(*schsdk.NormalJobInfo); ok { + if resFile, ok := norJob.Files.Dataset.(*schsdk.DataReturnJobFileInfo); ok { + j.Afters = append(j.Afters, resFile.DataReturnLocalJobID) + } + + if resFile, ok := norJob.Files.Code.(*schsdk.DataReturnJobFileInfo); ok { + j.Afters = append(j.Afters, resFile.DataReturnLocalJobID) + } + } else if resJob, ok := job.(*schsdk.DataReturnJobInfo); ok { + j.Afters = append(j.Afters, resJob.TargetLocalJobID) + } + + schJobs = append(schJobs, j) + } + + // 然后根据依赖进行排序 + schJobs, ok := s.orderByAfters(schJobs) + if !ok { + return nil, nil, fmt.Errorf("circular reference detected between jobs in the job set") + } + + // 经过排序后,按顺序生成调度方案 + for _, job := range schJobs { + if norJob, ok := job.Job.(*schsdk.NormalJobInfo); ok { + scheme, err := s.scheduleForNormalJob(info, job, ccs, jobSetScheme.JobSchemes) + if err != nil { + return nil, nil, err + } + + jobSetScheme.JobSchemes[job.Job.GetLocalJobID()] = *scheme + + // 检查数据文件的配置项,生成上传文件方案 + s.fillNormarlJobLocalUploadScheme(norJob, scheme.TargetCCID, filesUploadSchemes, ccs) + } + + // 回源任务目前不需要生成调度方案 + } + + return jobSetScheme, &schsdk.JobSetFilesUploadScheme{ + LocalFileSchemes: lo.Values(filesUploadSchemes), + }, nil +} + +// ScheduleJob 单个任务预调度 +func (s *DefaultPreScheduler) ScheduleJob(instJobInfo *schsdk.InstanceJobInfo) (*jobmod.JobScheduleScheme, *schsdk.JobFilesUploadScheme, error) { + filesUploadSchemes := make(map[string]schsdk.LocalFileUploadScheme) + + mgrCli, err := schglb.ManagerMQPool.Acquire() + if err != nil { + return nil, nil, fmt.Errorf("new collector client: %w", err) + } + defer schglb.ManagerMQPool.Release(mgrCli) + + // 查询有哪些算力中心可用 + + allCC, err := mgrCli.GetAllComputingCenter(mgrmq.NewGetAllComputingCenter()) + if err != nil { + return nil, nil, fmt.Errorf("getting all computing center info: %w", err) + } + + ccs := make(map[schsdk.CCID]schmod.ComputingCenter) + for _, node := range allCC.ComputingCenters { + ccs[node.CCID] = node + } + + if len(ccs) == 0 { + return nil, nil, ErrNoAvailableScheme + } + + info := &schsdk.NormalJobInfo{ + Files: instJobInfo.Files, + Runtime: instJobInfo.Runtime, + Resources: instJobInfo.Resources, + } + + job := &schedulingJob{ + Job: info, + } + scheme, err := s.scheduleForNormalJob2(job, ccs) + if err != nil { + return nil, nil, err + } + + // 检查数据文件的配置项,生成上传文件方案 + s.fillNormarlJobLocalUploadScheme(info, scheme.TargetCCID, filesUploadSchemes, ccs) + + return scheme, &schsdk.JobFilesUploadScheme{ + LocalFileSchemes: lo.Values(filesUploadSchemes), + }, nil +} + +func (s *DefaultPreScheduler) orderByAfters(jobs []*schedulingJob) ([]*schedulingJob, bool) { + type jobOrder struct { + Job *schedulingJob + Afters []string + } + + var jobOrders []*jobOrder + for _, job := range jobs { + od := &jobOrder{ + Job: job, + Afters: make([]string, len(job.Afters)), + } + + copy(od.Afters, job.Afters) + + jobOrders = append(jobOrders, od) + } + + // 然后排序 + var orderedJob []*schedulingJob + for { + rm := 0 + for i, jo := range jobOrders { + // 找到没有依赖的任务,然后将其取出 + if len(jo.Afters) == 0 { + orderedJob = append(orderedJob, jo.Job) + + // 删除其他任务对它的引用 + for _, job2 := range jobOrders { + job2.Afters = lo.Reject(job2.Afters, func(item string, idx int) bool { return item == jo.Job.Job.GetLocalJobID() }) + } + + rm++ + continue + } + + jobOrders[i-rm] = jobOrders[i] + } + + jobOrders = jobOrders[:len(jobOrders)-rm] + if len(jobOrders) == 0 { + break + } + + // 遍历一轮后没有找到无依赖的任务,那么就是存在循环引用,排序失败 + if rm == 0 { + return nil, false + } + } + + return orderedJob, true +} + +func (s *DefaultPreScheduler) scheduleForNormalJob(jobSet *schsdk.JobSetInfo, job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter, jobSchemes map[string]jobmod.JobScheduleScheme) (*jobmod.JobScheduleScheme, error) { + allCCs := make(map[schsdk.CCID]*candidate) + + // 初始化备选节点信息 + for _, cc := range ccs { + caNode := &candidate{ + CC: cc, + } + + // 检查此节点是否是它所引用的任务所选的节点 + for _, af := range job.Afters { + resJob := findJobInfo[*schsdk.DataReturnJobInfo](jobSet.Jobs, af) + if resJob == nil { + return nil, fmt.Errorf("resource job %s not found in the job set", af) + } + + // 由于jobs已经按照引用排序,所以正常情况下这里肯定能取到值 + scheme, ok := jobSchemes[resJob.TargetLocalJobID] + if !ok { + continue + } + + if scheme.TargetCCID == cc.CCID { + caNode.IsReferencedJobTarget = true + break + } + } + + allCCs[cc.CCID] = caNode + } + + norJob := job.Job.(*schsdk.NormalJobInfo) + + // 计算文件占有量得分 + err := s.calcFileScore(norJob.Files, allCCs) + if err != nil { + return nil, err + } + + // 计算资源余量得分 + err = s.calcResourceScore(norJob, allCCs) + if err != nil { + return nil, err + } + + allCCsArr := lo.Values(allCCs) + sort.Sort(CandidateArr(allCCsArr)) + + targetNode := allCCsArr[0] + if targetNode.Resource.MaxLevel == ResourceLevel3 { + return nil, ErrNoAvailableScheme + } + + scheme := s.makeSchemeForNode(norJob, targetNode) + return &scheme, nil +} + +func (s *DefaultPreScheduler) scheduleForNormalJob2(job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter) (*jobmod.JobScheduleScheme, error) { + allCCs := make(map[schsdk.CCID]*candidate) + + // 初始化备选节点信息 + for _, cc := range ccs { + caNode := &candidate{ + CC: cc, + } + + allCCs[cc.CCID] = caNode + } + + norJob := job.Job.(*schsdk.NormalJobInfo) + + // 计算文件占有量得分 + err := s.calcFileScore(norJob.Files, allCCs) + if err != nil { + return nil, err + } + + // 计算资源余量得分 + err = s.calcResourceScore(norJob, allCCs) + if err != nil { + return nil, err + } + + allCCsArr := lo.Values(allCCs) + sort.Sort(CandidateArr(allCCsArr)) + + targetNode := allCCsArr[0] + if targetNode.Resource.MaxLevel == ResourceLevel3 { + return nil, ErrNoAvailableScheme + } + + scheme := s.makeSchemeForNode(norJob, targetNode) + return &scheme, nil +} + +func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(norJob *schsdk.NormalJobInfo, targetCCID schsdk.CCID, schemes map[string]schsdk.LocalFileUploadScheme, ccs map[schsdk.CCID]schmod.ComputingCenter) { + if localFile, ok := norJob.Files.Dataset.(*schsdk.LocalJobFileInfo); ok { + if _, ok := schemes[localFile.LocalPath]; !ok { + cdsNodeID := ccs[targetCCID].CDSNodeID + schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ + LocalPath: localFile.LocalPath, + UploadToCDSNodeID: &cdsNodeID, + } + } + } + + if localFile, ok := norJob.Files.Code.(*schsdk.LocalJobFileInfo); ok { + if _, ok := schemes[localFile.LocalPath]; !ok { + cdsNodeID := ccs[targetCCID].CDSNodeID + schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ + LocalPath: localFile.LocalPath, + UploadToCDSNodeID: &cdsNodeID, + } + } + } + + if localFile, ok := norJob.Files.Image.(*schsdk.LocalJobFileInfo); ok { + if _, ok := schemes[localFile.LocalPath]; !ok { + cdsNodeID := ccs[targetCCID].CDSNodeID + schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ + LocalPath: localFile.LocalPath, + UploadToCDSNodeID: &cdsNodeID, + } + } + } +} + +func (s *DefaultPreScheduler) makeSchemeForNode(job *schsdk.NormalJobInfo, targetCC *candidate) jobmod.JobScheduleScheme { + scheme := jobmod.JobScheduleScheme{ + TargetCCID: targetCC.CC.CCID, + } + + // TODO 根据实际情况选择Move或者Load + + if _, ok := job.Files.Dataset.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Dataset.IsLoaded { + scheme.Dataset.Action = jobmod.ActionLoad + } else { + scheme.Dataset.Action = jobmod.ActionNo + } + + if _, ok := job.Files.Code.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Code.IsLoaded { + scheme.Code.Action = jobmod.ActionLoad + } else { + scheme.Code.Action = jobmod.ActionNo + } + + if _, ok := job.Files.Image.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Image.IsLoaded { + scheme.Image.Action = jobmod.ActionImportImage + } else { + scheme.Image.Action = jobmod.ActionNo + } + + return scheme +} + +func findResuorce[T uopsdk.ResourceData](all []uopsdk.ResourceData) T { + for _, data := range all { + if ret, ok := data.(T); ok { + return ret + } + } + + var def T + return def +} + +func findJobInfo[T schsdk.JobInfo](jobs []schsdk.JobInfo, localJobID string) T { + for _, job := range jobs { + if ret, ok := job.(T); ok && job.GetLocalJobID() == localJobID { + return ret + } + } + + var def T + return def +} diff --git a/pkgs/prescheduler/default_prescheduler_test.go b/pkgs/prescheduler/default_prescheduler_test.go new file mode 100644 index 0000000..e10ae82 --- /dev/null +++ b/pkgs/prescheduler/default_prescheduler_test.go @@ -0,0 +1,117 @@ +package prescheduler + +import ( + "testing" + + "github.com/samber/lo" + . "github.com/smartystreets/goconvey/convey" + + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" +) + +func TestOrderByAfters(t *testing.T) { + cases := []struct { + title string + jobs []*schedulingJob + wants []string + }{ + { + title: "所有Job都有直接或间接的依赖关系", + jobs: []*schedulingJob{ + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}}, + Afters: []string{"2"}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}}, + Afters: []string{}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "3"}}, + Afters: []string{"1"}, + }, + }, + wants: []string{"2", "1", "3"}, + }, + + { + title: "部分Job之间无依赖关系", + jobs: []*schedulingJob{ + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}}, + Afters: []string{"2"}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}}, + Afters: []string{}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "3"}}, + Afters: []string{"1"}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "4"}}, + Afters: []string{"5"}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "5"}}, + Afters: []string{}, + }, + }, + wants: []string{"2", "5", "1", "3", "4"}, + }, + + { + title: "存在循环依赖", + jobs: []*schedulingJob{ + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}}, + Afters: []string{"2"}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}}, + Afters: []string{"1"}, + }, + }, + wants: nil, + }, + + { + title: "完全不依赖", + jobs: []*schedulingJob{ + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}}, + Afters: []string{}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}}, + Afters: []string{}, + }, + }, + wants: []string{"1", "2"}, + }, + } + + sch := NewDefaultPreScheduler() + for _, c := range cases { + Convey(c.title, t, func() { + ordered, ok := sch.orderByAfters(c.jobs) + if c.wants == nil { + So(ok, ShouldBeFalse) + } else { + So(ok, ShouldBeTrue) + + ids := lo.Map(ordered, func(item *schedulingJob, idx int) string { return item.Job.GetLocalJobID() }) + So(ids, ShouldResemble, c.wants) + } + }) + } +} diff --git a/pkgs/prescheduler/prescheduler.go b/pkgs/prescheduler/prescheduler.go new file mode 100644 index 0000000..7aa3f18 --- /dev/null +++ b/pkgs/prescheduler/prescheduler.go @@ -0,0 +1,11 @@ +package prescheduler + +import ( + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" +) + +type PreScheduler interface { + ScheduleJobSet(info *schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error) + ScheduleJob(info *schsdk.InstanceJobInfo) (*jobmod.JobScheduleScheme, *schsdk.JobFilesUploadScheme, error) +} diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index 5ad60b4..8f8e487 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -36,6 +36,7 @@ type JobInfo interface { var JobInfoTypeUnion = types.NewTypeUnion[JobInfo]( (*NormalJobInfo)(nil), (*DataReturnJobInfo)(nil), + (*MultiInstanceJobInfo)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type") @@ -65,6 +66,25 @@ type DataReturnJobInfo struct { TargetLocalJobID string `json:"targetLocalJobID"` } +type MultiInstanceJobInfo struct { + serder.Metadata `union:"MultiInstance"` + JobInfoBase + Type string `json:"type"` + Files JobFilesInfo `json:"files"` + Runtime JobRuntimeInfo `json:"runtime"` + Resources JobResourcesInfo `json:"resources"` +} + +type InstanceJobInfo struct { + serder.Metadata `union:"SubJob"` + JobInfoBase + Type string `json:"type"` + LocalJobID string `json:"multiInstJobID"` + Files JobFilesInfo `json:"files"` + Runtime JobRuntimeInfo `json:"runtime"` + Resources JobResourcesInfo `json:"resources"` +} + type JobFilesInfo struct { Dataset JobFileInfo `json:"dataset"` Code JobFileInfo `json:"code"` @@ -78,7 +98,7 @@ type JobFileInfo interface { var FileInfoTypeUnion = types.NewTypeUnion[JobFileInfo]( (*PackageJobFileInfo)(nil), (*LocalJobFileInfo)(nil), - (*ResourceJobFileInfo)(nil), + (*DataReturnJobFileInfo)(nil), (*ImageJobFileInfo)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&FileInfoTypeUnion, "type") @@ -101,11 +121,11 @@ type LocalJobFileInfo struct { LocalPath string `json:"localPath"` } -type ResourceJobFileInfo struct { - serder.Metadata `union:"Resource"` +type DataReturnJobFileInfo struct { + serder.Metadata `union:"DataReturn"` JobFileInfoBase - Type string `json:"type"` - ResourceLocalJobID string `json:"resourceLocalJobID"` + Type string `json:"type"` + DataReturnLocalJobID string `json:"dataReturnLocalJobID"` } type ImageJobFileInfo struct { @@ -140,6 +160,10 @@ type JobSetFilesUploadScheme struct { LocalFileSchemes []LocalFileUploadScheme `json:"localFileUploadSchemes"` } +type JobFilesUploadScheme struct { + LocalFileSchemes []LocalFileUploadScheme `json:"localFileUploadSchemes"` +} + type LocalFileUploadScheme struct { LocalPath string `json:"localPath"` UploadToCDSNodeID *cdssdk.NodeID `json:"uploadToCDSNodeID"` From 1f48ef178ca829c57dcdd19083823d2a407020b1 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Tue, 7 May 2024 15:45:14 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E5=A4=9A=E5=AE=9E=E4=BE=8B=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/prescheduler/calc_score.go | 4 +- pkgs/prescheduler/default_prescheduler.go | 80 ++++++++---- sdks/scheduler/models.go | 10 ++ sdks/unifyops/unifyops.go | 152 +++++++++++++++++----- utils/config/config.go | 13 ++ 5 files changed, 202 insertions(+), 57 deletions(-) diff --git a/pkgs/prescheduler/calc_score.go b/pkgs/prescheduler/calc_score.go index 50a4d0a..68cc801 100644 --- a/pkgs/prescheduler/calc_score.go +++ b/pkgs/prescheduler/calc_score.go @@ -13,9 +13,9 @@ import ( mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" ) -func (s *DefaultPreScheduler) calcResourceScore(job *schsdk.NormalJobInfo, allCCs map[schsdk.CCID]*candidate) error { +func (s *DefaultPreScheduler) calcResourceScore(jobResource schsdk.JobResourcesInfo, allCCs map[schsdk.CCID]*candidate) error { for _, cc := range allCCs { - res, err := s.calcOneResourceScore(job.Resources, &cc.CC) + res, err := s.calcOneResourceScore(jobResource, &cc.CC) if err != nil { return err } diff --git a/pkgs/prescheduler/default_prescheduler.go b/pkgs/prescheduler/default_prescheduler.go index b30d461..98f6e03 100644 --- a/pkgs/prescheduler/default_prescheduler.go +++ b/pkgs/prescheduler/default_prescheduler.go @@ -183,7 +183,7 @@ func (s *DefaultPreScheduler) ScheduleJobSet(info *schsdk.JobSetInfo) (*jobmod.J // 经过排序后,按顺序生成调度方案 for _, job := range schJobs { if norJob, ok := job.Job.(*schsdk.NormalJobInfo); ok { - scheme, err := s.scheduleForNormalJob(info, job, ccs, jobSetScheme.JobSchemes) + scheme, err := s.scheduleForNormalOrMultiJob(info, job, ccs, jobSetScheme.JobSchemes) if err != nil { return nil, nil, err } @@ -191,7 +191,19 @@ func (s *DefaultPreScheduler) ScheduleJobSet(info *schsdk.JobSetInfo) (*jobmod.J jobSetScheme.JobSchemes[job.Job.GetLocalJobID()] = *scheme // 检查数据文件的配置项,生成上传文件方案 - s.fillNormarlJobLocalUploadScheme(norJob, scheme.TargetCCID, filesUploadSchemes, ccs) + s.fillNormarlJobLocalUploadScheme(norJob.Files, scheme.TargetCCID, filesUploadSchemes, ccs) + } + + if mulJob, ok := job.Job.(*schsdk.MultiInstanceJobInfo); ok { + scheme, err := s.scheduleForNormalOrMultiJob(info, job, ccs, jobSetScheme.JobSchemes) + if err != nil { + return nil, nil, err + } + + jobSetScheme.JobSchemes[job.Job.GetLocalJobID()] = *scheme + + // 检查数据文件的配置项,生成上传文件方案 + s.fillNormarlJobLocalUploadScheme(mulJob.Files, scheme.TargetCCID, filesUploadSchemes, ccs) } // 回源任务目前不需要生成调度方案 @@ -237,13 +249,13 @@ func (s *DefaultPreScheduler) ScheduleJob(instJobInfo *schsdk.InstanceJobInfo) ( job := &schedulingJob{ Job: info, } - scheme, err := s.scheduleForNormalJob2(job, ccs) + scheme, err := s.scheduleForSingleJob(job, ccs) if err != nil { return nil, nil, err } // 检查数据文件的配置项,生成上传文件方案 - s.fillNormarlJobLocalUploadScheme(info, scheme.TargetCCID, filesUploadSchemes, ccs) + s.fillNormarlJobLocalUploadScheme(info.Files, scheme.TargetCCID, filesUploadSchemes, ccs) return scheme, &schsdk.JobFilesUploadScheme{ LocalFileSchemes: lo.Values(filesUploadSchemes), @@ -303,7 +315,7 @@ func (s *DefaultPreScheduler) orderByAfters(jobs []*schedulingJob) ([]*schedulin return orderedJob, true } -func (s *DefaultPreScheduler) scheduleForNormalJob(jobSet *schsdk.JobSetInfo, job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter, jobSchemes map[string]jobmod.JobScheduleScheme) (*jobmod.JobScheduleScheme, error) { +func (s *DefaultPreScheduler) scheduleForNormalOrMultiJob(jobSet *schsdk.JobSetInfo, job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter, jobSchemes map[string]jobmod.JobScheduleScheme) (*jobmod.JobScheduleScheme, error) { allCCs := make(map[schsdk.CCID]*candidate) // 初始化备选节点信息 @@ -334,16 +346,28 @@ func (s *DefaultPreScheduler) scheduleForNormalJob(jobSet *schsdk.JobSetInfo, jo allCCs[cc.CCID] = caNode } - norJob := job.Job.(*schsdk.NormalJobInfo) + //norJob := job.Job.(*schsdk.NormalJobInfo) + + var jobFiles *schsdk.JobFilesInfo + var jobResource *schsdk.JobResourcesInfo + + switch runningJob := job.Job.(type) { + case *schsdk.NormalJobInfo: + jobFiles = &runningJob.Files + jobResource = &runningJob.Resources + case *schsdk.MultiInstanceJobInfo: + jobFiles = &runningJob.Files + jobResource = &runningJob.Resources + } // 计算文件占有量得分 - err := s.calcFileScore(norJob.Files, allCCs) + err := s.calcFileScore(*jobFiles, allCCs) if err != nil { return nil, err } // 计算资源余量得分 - err = s.calcResourceScore(norJob, allCCs) + err = s.calcResourceScore(*jobResource, allCCs) if err != nil { return nil, err } @@ -356,11 +380,11 @@ func (s *DefaultPreScheduler) scheduleForNormalJob(jobSet *schsdk.JobSetInfo, jo return nil, ErrNoAvailableScheme } - scheme := s.makeSchemeForNode(norJob, targetNode) + scheme := s.makeSchemeForNode(jobFiles, targetNode) return &scheme, nil } -func (s *DefaultPreScheduler) scheduleForNormalJob2(job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter) (*jobmod.JobScheduleScheme, error) { +func (s *DefaultPreScheduler) scheduleForSingleJob(job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter) (*jobmod.JobScheduleScheme, error) { allCCs := make(map[schsdk.CCID]*candidate) // 初始化备选节点信息 @@ -372,16 +396,28 @@ func (s *DefaultPreScheduler) scheduleForNormalJob2(job *schedulingJob, ccs map[ allCCs[cc.CCID] = caNode } - norJob := job.Job.(*schsdk.NormalJobInfo) + //norJob := job.Job.(*schsdk.NormalJobInfo) + + var jobFiles *schsdk.JobFilesInfo + var jobResource *schsdk.JobResourcesInfo + + switch runningJob := job.Job.(type) { + case *schsdk.NormalJobInfo: + jobFiles = &runningJob.Files + jobResource = &runningJob.Resources + case *schsdk.MultiInstanceJobInfo: + jobFiles = &runningJob.Files + jobResource = &runningJob.Resources + } // 计算文件占有量得分 - err := s.calcFileScore(norJob.Files, allCCs) + err := s.calcFileScore(*jobFiles, allCCs) if err != nil { return nil, err } // 计算资源余量得分 - err = s.calcResourceScore(norJob, allCCs) + err = s.calcResourceScore(*jobResource, allCCs) if err != nil { return nil, err } @@ -394,12 +430,12 @@ func (s *DefaultPreScheduler) scheduleForNormalJob2(job *schedulingJob, ccs map[ return nil, ErrNoAvailableScheme } - scheme := s.makeSchemeForNode(norJob, targetNode) + scheme := s.makeSchemeForNode(jobFiles, targetNode) return &scheme, nil } -func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(norJob *schsdk.NormalJobInfo, targetCCID schsdk.CCID, schemes map[string]schsdk.LocalFileUploadScheme, ccs map[schsdk.CCID]schmod.ComputingCenter) { - if localFile, ok := norJob.Files.Dataset.(*schsdk.LocalJobFileInfo); ok { +func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(files schsdk.JobFilesInfo, targetCCID schsdk.CCID, schemes map[string]schsdk.LocalFileUploadScheme, ccs map[schsdk.CCID]schmod.ComputingCenter) { + if localFile, ok := files.Dataset.(*schsdk.LocalJobFileInfo); ok { if _, ok := schemes[localFile.LocalPath]; !ok { cdsNodeID := ccs[targetCCID].CDSNodeID schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ @@ -409,7 +445,7 @@ func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(norJob *schsdk.Nor } } - if localFile, ok := norJob.Files.Code.(*schsdk.LocalJobFileInfo); ok { + if localFile, ok := files.Code.(*schsdk.LocalJobFileInfo); ok { if _, ok := schemes[localFile.LocalPath]; !ok { cdsNodeID := ccs[targetCCID].CDSNodeID schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ @@ -419,7 +455,7 @@ func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(norJob *schsdk.Nor } } - if localFile, ok := norJob.Files.Image.(*schsdk.LocalJobFileInfo); ok { + if localFile, ok := files.Image.(*schsdk.LocalJobFileInfo); ok { if _, ok := schemes[localFile.LocalPath]; !ok { cdsNodeID := ccs[targetCCID].CDSNodeID schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ @@ -430,26 +466,26 @@ func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(norJob *schsdk.Nor } } -func (s *DefaultPreScheduler) makeSchemeForNode(job *schsdk.NormalJobInfo, targetCC *candidate) jobmod.JobScheduleScheme { +func (s *DefaultPreScheduler) makeSchemeForNode(jobFiles *schsdk.JobFilesInfo, targetCC *candidate) jobmod.JobScheduleScheme { scheme := jobmod.JobScheduleScheme{ TargetCCID: targetCC.CC.CCID, } // TODO 根据实际情况选择Move或者Load - if _, ok := job.Files.Dataset.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Dataset.IsLoaded { + if _, ok := jobFiles.Dataset.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Dataset.IsLoaded { scheme.Dataset.Action = jobmod.ActionLoad } else { scheme.Dataset.Action = jobmod.ActionNo } - if _, ok := job.Files.Code.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Code.IsLoaded { + if _, ok := jobFiles.Code.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Code.IsLoaded { scheme.Code.Action = jobmod.ActionLoad } else { scheme.Code.Action = jobmod.ActionNo } - if _, ok := job.Files.Image.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Image.IsLoaded { + if _, ok := jobFiles.Image.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Image.IsLoaded { scheme.Image.Action = jobmod.ActionImportImage } else { scheme.Image.Action = jobmod.ActionNo diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index 8f8e487..45282bc 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -58,6 +58,16 @@ type NormalJobInfo struct { Services JobServicesInfo `json:"services"` } +type CommonJobInfo struct { + serder.Metadata `union:"Normal"` + JobInfoBase + Type string `json:"type"` + Files JobFilesInfo `json:"files"` + Runtime JobRuntimeInfo `json:"runtime"` + Resources JobResourcesInfo `json:"resources"` + Services JobServicesInfo `json:"services"` +} + type DataReturnJobInfo struct { serder.Metadata `union:"DataReturn"` JobInfoBase diff --git a/sdks/unifyops/unifyops.go b/sdks/unifyops/unifyops.go index 97e64a7..64bb7ad 100644 --- a/sdks/unifyops/unifyops.go +++ b/sdks/unifyops/unifyops.go @@ -223,41 +223,127 @@ func (c *Client) GetMemoryData(node GetOneResourceDataReq) (*MemoryResourceData, } func (c *Client) GetIndicatorData(node GetOneResourceDataReq) (*[]ResourceData, error) { - url, err := url.JoinPath(c.baseURL, "/cmdb/resApi/getIndicatorData") - if err != nil { - return nil, err + //url, err := url.JoinPath(c.baseURL, "/cmdb/resApi/getIndicatorData") + //if err != nil { + // return nil, err + //} + //resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ + // Body: node, + //}) + //if err != nil { + // return nil, err + //} + // + //contType := resp.Header.Get("Content-Type") + //if strings.Contains(contType, myhttp.ContentTypeJSON) { + // + // var codeResp response[[]map[string]any] + // if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + // return nil, fmt.Errorf("parsing response: %w", err) + // } + // + // if codeResp.Code != CORRECT_CODE { + // return nil, codeResp.ToError() + // } + // + // var ret []ResourceData + // for _, mp := range codeResp.Data { + // var data ResourceData + // err := serder.MapToObject(mp, &data) + // if err != nil { + // return nil, err + // } + // ret = append(ret, data) + // } + // + // return &ret, nil + //} + // + //return nil, fmt.Errorf("unknow response content type: %s", contType) + + return mockData() +} + +func mockData() (*[]ResourceData, error) { + var ret []ResourceData + + cpuResourceData := CPUResourceData{ + Name: ResourceTypeCPU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, } - resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ - Body: node, - }) - if err != nil { - return nil, err + ret = append(ret, &cpuResourceData) + + npuResourceData := NPUResourceData{ + Name: ResourceTypeNPU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, } - - contType := resp.Header.Get("Content-Type") - if strings.Contains(contType, myhttp.ContentTypeJSON) { - - var codeResp response[[]map[string]any] - if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { - return nil, fmt.Errorf("parsing response: %w", err) - } - - if codeResp.Code != CORRECT_CODE { - return nil, codeResp.ToError() - } - - var ret []ResourceData - for _, mp := range codeResp.Data { - var data ResourceData - err := serder.MapToObject(mp, &data) - if err != nil { - return nil, err - } - ret = append(ret, data) - } - - return &ret, nil + ret = append(ret, &npuResourceData) + + gpuResourceData := GPUResourceData{ + Name: ResourceTypeGPU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + } + ret = append(ret, &gpuResourceData) + + mluResourceData := MLUResourceData{ + Name: ResourceTypeMLU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, } + ret = append(ret, &mluResourceData) + + storageResourceData := StorageResourceData{ + Name: ResourceTypeStorage, + Total: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + Available: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + } + ret = append(ret, &storageResourceData) + + memoryResourceData := MemoryResourceData{ + Name: ResourceTypeMemory, + Total: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + Available: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + } + ret = append(ret, &memoryResourceData) - return nil, fmt.Errorf("unknow response content type: %s", contType) + return &ret, nil } diff --git a/utils/config/config.go b/utils/config/config.go index 7a9890b..4f0d7ae 100644 --- a/utils/config/config.go +++ b/utils/config/config.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "github.com/imdario/mergo" ) @@ -27,6 +28,18 @@ func DefaultLoad(modeulName string, defCfg interface{}) error { return err } + if strings.Contains(execPath, "scheduler") { + execPath = "D:\\Work\\Codes\\new\\workspace\\workspace\\scheduler\\common\\assets\\confs\\" + } + + if strings.Contains(execPath, "storage") { + execPath = "D:\\Work\\Codes\\new\\workspace\\workspace\\storage\\common\\assets\\confs\\" + } + + if strings.Contains(execPath, "gateway") { + execPath = "D:\\Work\\Codes\\new\\workspace\\workspace\\gateway\\assets\\confs\\" + } + // TODO 可以考虑根据环境变量读取不同的配置 configFilePath := filepath.Join(filepath.Dir(execPath), "..", "confs", fmt.Sprintf("%s.config.json", modeulName)) return Load(configFilePath, defCfg) From 33d9168bce9a00597a98a66bdd9576ea63a5a49f Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Thu, 9 May 2024 09:04:31 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E5=A4=9A=E5=AE=9E=E4=BE=8B=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E4=BB=A3=E7=A0=81=E6=B5=8B=E8=AF=95=E5=90=8E=E8=B0=83?= =?UTF-8?q?=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/prescheduler/calc_score.go | 337 ------------ pkgs/prescheduler/default_prescheduler.go | 517 ------------------ .../prescheduler/default_prescheduler_test.go | 117 ---- pkgs/prescheduler/prescheduler.go | 11 - sdks/scheduler/models.go | 14 +- 5 files changed, 3 insertions(+), 993 deletions(-) delete mode 100644 pkgs/prescheduler/calc_score.go delete mode 100644 pkgs/prescheduler/default_prescheduler.go delete mode 100644 pkgs/prescheduler/default_prescheduler_test.go delete mode 100644 pkgs/prescheduler/prescheduler.go diff --git a/pkgs/prescheduler/calc_score.go b/pkgs/prescheduler/calc_score.go deleted file mode 100644 index 68cc801..0000000 --- a/pkgs/prescheduler/calc_score.go +++ /dev/null @@ -1,337 +0,0 @@ -package prescheduler - -import ( - "fmt" - "github.com/inhies/go-bytesize" - schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" - "gitlink.org.cn/cloudream/common/utils/math2" - schglb "gitlink.org.cn/cloudream/scheduler/common/globals" - schmod "gitlink.org.cn/cloudream/scheduler/common/models" - "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector" - mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" -) - -func (s *DefaultPreScheduler) calcResourceScore(jobResource schsdk.JobResourcesInfo, allCCs map[schsdk.CCID]*candidate) error { - for _, cc := range allCCs { - res, err := s.calcOneResourceScore(jobResource, &cc.CC) - if err != nil { - return err - } - - cc.Resource = *res - } - - return nil -} - -// 划分节点资源等级,并计算资源得分 -func (s *DefaultPreScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, cc *schmod.ComputingCenter) (*resourcesDetail, error) { - colCli, err := schglb.CollectorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new collector client: %w", err) - } - defer schglb.CollectorMQPool.Release(colCli) - - getResDataResp, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(cc.UOPSlwNodeID)) - if err != nil { - return nil, err - } - - var resDetail resourcesDetail - - //计算资源得分 - totalScore := 0.0 - maxLevel := 0 - resKinds := 0 - - if requires.CPU > 0 { - res := findResuorce[*uopsdk.CPUResourceData](getResDataResp.Datas) - if res == nil { - resDetail.CPU.Level = ResourceLevel3 - resDetail.CPU.Score = 0 - } else { - resDetail.CPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.CPU) - resDetail.CPU.Score = (float64(res.Available.Value) / requires.CPU) * CpuResourceWeight - } - - maxLevel = math2.Max(maxLevel, resDetail.CPU.Level) - totalScore += resDetail.CPU.Score - resKinds++ - } - - if requires.GPU > 0 { - res := findResuorce[*uopsdk.GPUResourceData](getResDataResp.Datas) - if res == nil { - resDetail.GPU.Level = ResourceLevel3 - resDetail.GPU.Score = 0 - } else { - resDetail.GPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.GPU) - resDetail.GPU.Score = (float64(res.Available.Value) / requires.GPU) * CpuResourceWeight - } - - maxLevel = math2.Max(maxLevel, resDetail.GPU.Level) - totalScore += resDetail.GPU.Score - resKinds++ - } - - if requires.NPU > 0 { - res := findResuorce[*uopsdk.NPUResourceData](getResDataResp.Datas) - if res == nil { - resDetail.NPU.Level = ResourceLevel3 - resDetail.NPU.Score = 0 - } else { - resDetail.NPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.NPU) - resDetail.NPU.Score = (float64(res.Available.Value) / requires.NPU) * CpuResourceWeight - } - - maxLevel = math2.Max(maxLevel, resDetail.NPU.Level) - totalScore += resDetail.NPU.Score - resKinds++ - } - - if requires.MLU > 0 { - res := findResuorce[*uopsdk.MLUResourceData](getResDataResp.Datas) - if res == nil { - resDetail.MLU.Level = ResourceLevel3 - resDetail.MLU.Score = 0 - } else { - resDetail.MLU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.MLU) - resDetail.MLU.Score = (float64(res.Available.Value) / requires.MLU) * CpuResourceWeight - } - - maxLevel = math2.Max(maxLevel, resDetail.MLU.Level) - totalScore += resDetail.MLU.Score - resKinds++ - } - - if requires.Storage > 0 { - res := findResuorce[*uopsdk.StorageResourceData](getResDataResp.Datas) - if res == nil { - resDetail.Storage.Level = ResourceLevel3 - resDetail.Storage.Score = 0 - } else { - bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit)) - if err != nil { - return nil, err - } - - resDetail.Storage.Level = s.calcResourceLevel(float64(bytes), float64(requires.Storage)) - resDetail.Storage.Score = (float64(bytes) / float64(requires.Storage)) * StgResourceWeight - } - - maxLevel = math2.Max(maxLevel, resDetail.Storage.Level) - totalScore += resDetail.Storage.Score - resKinds++ - } - - if requires.Memory > 0 { - res := findResuorce[*uopsdk.MemoryResourceData](getResDataResp.Datas) - if res == nil { - resDetail.Memory.Level = ResourceLevel3 - resDetail.Memory.Score = 0 - } else { - bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit)) - if err != nil { - return nil, err - } - - resDetail.Memory.Level = s.calcResourceLevel(float64(bytes), float64(requires.Memory)) - resDetail.Memory.Score = (float64(bytes) / float64(requires.Memory)) * StgResourceWeight - } - - maxLevel = math2.Max(maxLevel, resDetail.Memory.Level) - totalScore += resDetail.Memory.Score - resKinds++ - } - - if resKinds == 0 { - return &resDetail, nil - } - - resDetail.TotalScore = totalScore - resDetail.AvgScore = resDetail.AvgScore / float64(resKinds) - resDetail.MaxLevel = maxLevel - - return &resDetail, nil -} - -func (s *DefaultPreScheduler) calcResourceLevel(avai float64, need float64) int { - if avai >= 1.5*need { - return ResourceLevel1 - } - - if avai >= need { - return ResourceLevel2 - } - - return ResourceLevel3 -} - -// 计算节点得分情况 -func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs map[schsdk.CCID]*candidate) error { - // 只计算运控返回的可用计算中心上的存储服务的数据权重 - cdsNodeToCC := make(map[cdssdk.NodeID]*candidate) - for _, cc := range allCCs { - cdsNodeToCC[cc.CC.CDSNodeID] = cc - } - - //计算code相关得分 - if pkgFile, ok := files.Code.(*schsdk.PackageJobFileInfo); ok { - codeFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC) - if err != nil { - return fmt.Errorf("calc code file score: %w", err) - } - for id, score := range codeFileScores { - allCCs[id].Files.Code = *score - } - } - - //计算dataset相关得分 - if pkgFile, ok := files.Dataset.(*schsdk.PackageJobFileInfo); ok { - datasetFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC) - if err != nil { - return fmt.Errorf("calc dataset file score: %w", err) - } - for id, score := range datasetFileScores { - allCCs[id].Files.Dataset = *score - } - } - - //计算image相关得分 - if imgFile, ok := files.Image.(*schsdk.ImageJobFileInfo); ok { - //计算image相关得分 - imageFileScores, err := s.calcImageFileScore(imgFile.ImageID, allCCs, cdsNodeToCC) - if err != nil { - return fmt.Errorf("calc image file score: %w", err) - } - for id, score := range imageFileScores { - allCCs[id].Files.Image = *score - } - } - - for _, cc := range allCCs { - cc.Files.TotalScore = cc.Files.Code.CachingScore + - cc.Files.Code.LoadingScore + - cc.Files.Dataset.CachingScore + - cc.Files.Dataset.LoadingScore + - cc.Files.Image.CachingScore + - cc.Files.Image.LoadingScore - } - - return nil -} - -// 计算package在各节点的得分情况 -func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { - colCli, err := schglb.CollectorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new collector client: %w", err) - } - defer schglb.CollectorMQPool.Release(colCli) - - ccFileScores := make(map[schsdk.CCID]*fileDetail) - - // TODO UserID - cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(1, packageID)) - if err != nil { - return nil, err - } - - for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { - cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] - if !ok { - continue - } - - ccFileScores[cc.CC.CCID] = &fileDetail{ - //TODO 根据缓存方式不同,可能会有不同的计算方式 - CachingScore: float64(cdsNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, - } - } - - // TODO UserID - loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(1, packageID)) - if err != nil { - return nil, err - } - - for _, cdsNodeID := range loadedResp.StgNodeIDs { - cc, ok := cdsNodeToCC[cdsNodeID] - if !ok { - continue - } - - sfc, ok := ccFileScores[cc.CC.CCID] - if !ok { - sfc = &fileDetail{} - ccFileScores[cc.CC.CCID] = sfc - } - - sfc.LoadingScore = 1 * LoadedWeight - sfc.IsLoaded = true - } - - return ccFileScores, nil -} - -// 计算package在各节点的得分情况 -func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { - colCli, err := schglb.CollectorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new collector client: %w", err) - } - defer schglb.CollectorMQPool.Release(colCli) - - magCli, err := schglb.ManagerMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new manager client: %w", err) - } - defer schglb.ManagerMQPool.Release(magCli) - - imageInfoResp, err := magCli.GetImageInfo(mgrmq.NewGetImageInfo(imageID)) - if err != nil { - return nil, fmt.Errorf("getting image info: %w", err) - } - - ccFileScores := make(map[schsdk.CCID]*fileDetail) - - if imageInfoResp.Image.CDSPackageID != nil { - cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(1, *imageInfoResp.Image.CDSPackageID)) - if err != nil { - return nil, err - } - - for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { - cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] - if !ok { - continue - } - - ccFileScores[cc.CC.CCID] = &fileDetail{ - //TODO 根据缓存方式不同,可能会有不同的计算方式 - CachingScore: float64(cdsNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, - } - } - } - - // 镜像的LoadingScore是判断是否导入到算力中心 - for _, pcmImg := range imageInfoResp.PCMImages { - _, ok := allCCs[pcmImg.CCID] - if !ok { - continue - } - - fsc, ok := ccFileScores[pcmImg.CCID] - if !ok { - fsc = &fileDetail{} - ccFileScores[pcmImg.CCID] = fsc - } - - fsc.LoadingScore = 1 * LoadedWeight - fsc.IsLoaded = true - } - - return ccFileScores, nil -} diff --git a/pkgs/prescheduler/default_prescheduler.go b/pkgs/prescheduler/default_prescheduler.go deleted file mode 100644 index 98f6e03..0000000 --- a/pkgs/prescheduler/default_prescheduler.go +++ /dev/null @@ -1,517 +0,0 @@ -package prescheduler - -import ( - "fmt" - "sort" - - "github.com/samber/lo" - - schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" - uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" - schglb "gitlink.org.cn/cloudream/scheduler/common/globals" - schmod "gitlink.org.cn/cloudream/scheduler/common/models" - jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" - mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" -) - -const ( - //每个节点划分的资源等级: - // ResourceLevel1:表示所有资源类型均满足 大于等于1.5倍 - ResourceLevel1 = 1 - // ResourceLevel2:表示不满足Level1,但所有资源类型均满足 大于等于1倍 - ResourceLevel2 = 2 - // ResourceLevel3: 表示某些资源类型 小于一倍 - ResourceLevel3 = 3 - - CpuResourceWeight float64 = 1 - StgResourceWeight float64 = 1.2 - - CachingWeight float64 = 1 - LoadedWeight float64 = 2 -) - -var ErrNoAvailableScheme = fmt.Errorf("no appropriate scheduling node found, please wait") - -type candidate struct { - CC schmod.ComputingCenter - IsReferencedJobTarget bool // 这个节点是否是所依赖的任务所选择的节点 - Resource resourcesDetail - Files filesDetail -} - -type resourcesDetail struct { - CPU resourceDetail - GPU resourceDetail - NPU resourceDetail - MLU resourceDetail - Storage resourceDetail - Memory resourceDetail - - TotalScore float64 - AvgScore float64 - MaxLevel int -} -type resourceDetail struct { - Level int - Score float64 -} - -type filesDetail struct { - Dataset fileDetail - Code fileDetail - Image fileDetail - - TotalScore float64 -} -type fileDetail struct { - CachingScore float64 - LoadingScore float64 - IsLoaded bool //表示storage是否已经调度到该节点, image表示镜像是否已经加载到该算力中心 -} - -type schedulingJob struct { - Job schsdk.JobInfo - Afters []string -} - -type CandidateArr []*candidate - -func (a CandidateArr) Len() int { return len(a) } -func (a CandidateArr) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a CandidateArr) Less(i, j int) bool { - n1 := a[i] - n2 := a[j] - - // 优先与所依赖的任务放到一起,但要求那个节点的资源足够 - if n1.IsReferencedJobTarget && n1.Resource.MaxLevel < ResourceLevel3 { - return true - } - if n2.IsReferencedJobTarget && n2.Resource.MaxLevel < ResourceLevel3 { - return true - } - - // 优先判断资源等级,资源等级越低,代表越满足需求 - if n1.Resource.MaxLevel < n2.Resource.MaxLevel { - return true - } - if n1.Resource.MaxLevel > n2.Resource.MaxLevel { - return false - } - - // 等级相同时,根据单项分值比较 - switch n1.Resource.MaxLevel { - case ResourceLevel1: - // 数据文件总分越高,代表此节点上拥有的数据文件越完整,则越优先考虑 - return n1.Files.TotalScore > n2.Files.TotalScore - - case ResourceLevel2: - // 资源分的平均值越高,代表资源越空余,则越优先考虑 - return n1.Resource.AvgScore > n2.Resource.AvgScore - - case ResourceLevel3: - // 资源分的平均值越高,代表资源越空余,则越优先考虑 - return n1.Resource.AvgScore > n2.Resource.AvgScore - } - - return false -} - -type DefaultPreScheduler struct { -} - -func NewDefaultPreScheduler() *DefaultPreScheduler { - return &DefaultPreScheduler{} -} - -// ScheduleJobSet 任务集预调度 -func (s *DefaultPreScheduler) ScheduleJobSet(info *schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error) { - jobSetScheme := &jobmod.JobSetPreScheduleScheme{ - JobSchemes: make(map[string]jobmod.JobScheduleScheme), - } - filesUploadSchemes := make(map[string]schsdk.LocalFileUploadScheme) - - mgrCli, err := schglb.ManagerMQPool.Acquire() - if err != nil { - return nil, nil, fmt.Errorf("new collector client: %w", err) - } - defer schglb.ManagerMQPool.Release(mgrCli) - - // 查询有哪些算力中心可用 - - allCC, err := mgrCli.GetAllComputingCenter(mgrmq.NewGetAllComputingCenter()) - if err != nil { - return nil, nil, fmt.Errorf("getting all computing center info: %w", err) - } - - ccs := make(map[schsdk.CCID]schmod.ComputingCenter) - for _, node := range allCC.ComputingCenters { - ccs[node.CCID] = node - } - - if len(ccs) == 0 { - return nil, nil, ErrNoAvailableScheme - } - - // 先根据任务配置,收集它们依赖的任务的LocalID - var schJobs []*schedulingJob - for _, job := range info.Jobs { - j := &schedulingJob{ - Job: job, - } - - if norJob, ok := job.(*schsdk.NormalJobInfo); ok { - if resFile, ok := norJob.Files.Dataset.(*schsdk.DataReturnJobFileInfo); ok { - j.Afters = append(j.Afters, resFile.DataReturnLocalJobID) - } - - if resFile, ok := norJob.Files.Code.(*schsdk.DataReturnJobFileInfo); ok { - j.Afters = append(j.Afters, resFile.DataReturnLocalJobID) - } - } else if resJob, ok := job.(*schsdk.DataReturnJobInfo); ok { - j.Afters = append(j.Afters, resJob.TargetLocalJobID) - } - - schJobs = append(schJobs, j) - } - - // 然后根据依赖进行排序 - schJobs, ok := s.orderByAfters(schJobs) - if !ok { - return nil, nil, fmt.Errorf("circular reference detected between jobs in the job set") - } - - // 经过排序后,按顺序生成调度方案 - for _, job := range schJobs { - if norJob, ok := job.Job.(*schsdk.NormalJobInfo); ok { - scheme, err := s.scheduleForNormalOrMultiJob(info, job, ccs, jobSetScheme.JobSchemes) - if err != nil { - return nil, nil, err - } - - jobSetScheme.JobSchemes[job.Job.GetLocalJobID()] = *scheme - - // 检查数据文件的配置项,生成上传文件方案 - s.fillNormarlJobLocalUploadScheme(norJob.Files, scheme.TargetCCID, filesUploadSchemes, ccs) - } - - if mulJob, ok := job.Job.(*schsdk.MultiInstanceJobInfo); ok { - scheme, err := s.scheduleForNormalOrMultiJob(info, job, ccs, jobSetScheme.JobSchemes) - if err != nil { - return nil, nil, err - } - - jobSetScheme.JobSchemes[job.Job.GetLocalJobID()] = *scheme - - // 检查数据文件的配置项,生成上传文件方案 - s.fillNormarlJobLocalUploadScheme(mulJob.Files, scheme.TargetCCID, filesUploadSchemes, ccs) - } - - // 回源任务目前不需要生成调度方案 - } - - return jobSetScheme, &schsdk.JobSetFilesUploadScheme{ - LocalFileSchemes: lo.Values(filesUploadSchemes), - }, nil -} - -// ScheduleJob 单个任务预调度 -func (s *DefaultPreScheduler) ScheduleJob(instJobInfo *schsdk.InstanceJobInfo) (*jobmod.JobScheduleScheme, *schsdk.JobFilesUploadScheme, error) { - filesUploadSchemes := make(map[string]schsdk.LocalFileUploadScheme) - - mgrCli, err := schglb.ManagerMQPool.Acquire() - if err != nil { - return nil, nil, fmt.Errorf("new collector client: %w", err) - } - defer schglb.ManagerMQPool.Release(mgrCli) - - // 查询有哪些算力中心可用 - - allCC, err := mgrCli.GetAllComputingCenter(mgrmq.NewGetAllComputingCenter()) - if err != nil { - return nil, nil, fmt.Errorf("getting all computing center info: %w", err) - } - - ccs := make(map[schsdk.CCID]schmod.ComputingCenter) - for _, node := range allCC.ComputingCenters { - ccs[node.CCID] = node - } - - if len(ccs) == 0 { - return nil, nil, ErrNoAvailableScheme - } - - info := &schsdk.NormalJobInfo{ - Files: instJobInfo.Files, - Runtime: instJobInfo.Runtime, - Resources: instJobInfo.Resources, - } - - job := &schedulingJob{ - Job: info, - } - scheme, err := s.scheduleForSingleJob(job, ccs) - if err != nil { - return nil, nil, err - } - - // 检查数据文件的配置项,生成上传文件方案 - s.fillNormarlJobLocalUploadScheme(info.Files, scheme.TargetCCID, filesUploadSchemes, ccs) - - return scheme, &schsdk.JobFilesUploadScheme{ - LocalFileSchemes: lo.Values(filesUploadSchemes), - }, nil -} - -func (s *DefaultPreScheduler) orderByAfters(jobs []*schedulingJob) ([]*schedulingJob, bool) { - type jobOrder struct { - Job *schedulingJob - Afters []string - } - - var jobOrders []*jobOrder - for _, job := range jobs { - od := &jobOrder{ - Job: job, - Afters: make([]string, len(job.Afters)), - } - - copy(od.Afters, job.Afters) - - jobOrders = append(jobOrders, od) - } - - // 然后排序 - var orderedJob []*schedulingJob - for { - rm := 0 - for i, jo := range jobOrders { - // 找到没有依赖的任务,然后将其取出 - if len(jo.Afters) == 0 { - orderedJob = append(orderedJob, jo.Job) - - // 删除其他任务对它的引用 - for _, job2 := range jobOrders { - job2.Afters = lo.Reject(job2.Afters, func(item string, idx int) bool { return item == jo.Job.Job.GetLocalJobID() }) - } - - rm++ - continue - } - - jobOrders[i-rm] = jobOrders[i] - } - - jobOrders = jobOrders[:len(jobOrders)-rm] - if len(jobOrders) == 0 { - break - } - - // 遍历一轮后没有找到无依赖的任务,那么就是存在循环引用,排序失败 - if rm == 0 { - return nil, false - } - } - - return orderedJob, true -} - -func (s *DefaultPreScheduler) scheduleForNormalOrMultiJob(jobSet *schsdk.JobSetInfo, job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter, jobSchemes map[string]jobmod.JobScheduleScheme) (*jobmod.JobScheduleScheme, error) { - allCCs := make(map[schsdk.CCID]*candidate) - - // 初始化备选节点信息 - for _, cc := range ccs { - caNode := &candidate{ - CC: cc, - } - - // 检查此节点是否是它所引用的任务所选的节点 - for _, af := range job.Afters { - resJob := findJobInfo[*schsdk.DataReturnJobInfo](jobSet.Jobs, af) - if resJob == nil { - return nil, fmt.Errorf("resource job %s not found in the job set", af) - } - - // 由于jobs已经按照引用排序,所以正常情况下这里肯定能取到值 - scheme, ok := jobSchemes[resJob.TargetLocalJobID] - if !ok { - continue - } - - if scheme.TargetCCID == cc.CCID { - caNode.IsReferencedJobTarget = true - break - } - } - - allCCs[cc.CCID] = caNode - } - - //norJob := job.Job.(*schsdk.NormalJobInfo) - - var jobFiles *schsdk.JobFilesInfo - var jobResource *schsdk.JobResourcesInfo - - switch runningJob := job.Job.(type) { - case *schsdk.NormalJobInfo: - jobFiles = &runningJob.Files - jobResource = &runningJob.Resources - case *schsdk.MultiInstanceJobInfo: - jobFiles = &runningJob.Files - jobResource = &runningJob.Resources - } - - // 计算文件占有量得分 - err := s.calcFileScore(*jobFiles, allCCs) - if err != nil { - return nil, err - } - - // 计算资源余量得分 - err = s.calcResourceScore(*jobResource, allCCs) - if err != nil { - return nil, err - } - - allCCsArr := lo.Values(allCCs) - sort.Sort(CandidateArr(allCCsArr)) - - targetNode := allCCsArr[0] - if targetNode.Resource.MaxLevel == ResourceLevel3 { - return nil, ErrNoAvailableScheme - } - - scheme := s.makeSchemeForNode(jobFiles, targetNode) - return &scheme, nil -} - -func (s *DefaultPreScheduler) scheduleForSingleJob(job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter) (*jobmod.JobScheduleScheme, error) { - allCCs := make(map[schsdk.CCID]*candidate) - - // 初始化备选节点信息 - for _, cc := range ccs { - caNode := &candidate{ - CC: cc, - } - - allCCs[cc.CCID] = caNode - } - - //norJob := job.Job.(*schsdk.NormalJobInfo) - - var jobFiles *schsdk.JobFilesInfo - var jobResource *schsdk.JobResourcesInfo - - switch runningJob := job.Job.(type) { - case *schsdk.NormalJobInfo: - jobFiles = &runningJob.Files - jobResource = &runningJob.Resources - case *schsdk.MultiInstanceJobInfo: - jobFiles = &runningJob.Files - jobResource = &runningJob.Resources - } - - // 计算文件占有量得分 - err := s.calcFileScore(*jobFiles, allCCs) - if err != nil { - return nil, err - } - - // 计算资源余量得分 - err = s.calcResourceScore(*jobResource, allCCs) - if err != nil { - return nil, err - } - - allCCsArr := lo.Values(allCCs) - sort.Sort(CandidateArr(allCCsArr)) - - targetNode := allCCsArr[0] - if targetNode.Resource.MaxLevel == ResourceLevel3 { - return nil, ErrNoAvailableScheme - } - - scheme := s.makeSchemeForNode(jobFiles, targetNode) - return &scheme, nil -} - -func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(files schsdk.JobFilesInfo, targetCCID schsdk.CCID, schemes map[string]schsdk.LocalFileUploadScheme, ccs map[schsdk.CCID]schmod.ComputingCenter) { - if localFile, ok := files.Dataset.(*schsdk.LocalJobFileInfo); ok { - if _, ok := schemes[localFile.LocalPath]; !ok { - cdsNodeID := ccs[targetCCID].CDSNodeID - schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ - LocalPath: localFile.LocalPath, - UploadToCDSNodeID: &cdsNodeID, - } - } - } - - if localFile, ok := files.Code.(*schsdk.LocalJobFileInfo); ok { - if _, ok := schemes[localFile.LocalPath]; !ok { - cdsNodeID := ccs[targetCCID].CDSNodeID - schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ - LocalPath: localFile.LocalPath, - UploadToCDSNodeID: &cdsNodeID, - } - } - } - - if localFile, ok := files.Image.(*schsdk.LocalJobFileInfo); ok { - if _, ok := schemes[localFile.LocalPath]; !ok { - cdsNodeID := ccs[targetCCID].CDSNodeID - schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ - LocalPath: localFile.LocalPath, - UploadToCDSNodeID: &cdsNodeID, - } - } - } -} - -func (s *DefaultPreScheduler) makeSchemeForNode(jobFiles *schsdk.JobFilesInfo, targetCC *candidate) jobmod.JobScheduleScheme { - scheme := jobmod.JobScheduleScheme{ - TargetCCID: targetCC.CC.CCID, - } - - // TODO 根据实际情况选择Move或者Load - - if _, ok := jobFiles.Dataset.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Dataset.IsLoaded { - scheme.Dataset.Action = jobmod.ActionLoad - } else { - scheme.Dataset.Action = jobmod.ActionNo - } - - if _, ok := jobFiles.Code.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Code.IsLoaded { - scheme.Code.Action = jobmod.ActionLoad - } else { - scheme.Code.Action = jobmod.ActionNo - } - - if _, ok := jobFiles.Image.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Image.IsLoaded { - scheme.Image.Action = jobmod.ActionImportImage - } else { - scheme.Image.Action = jobmod.ActionNo - } - - return scheme -} - -func findResuorce[T uopsdk.ResourceData](all []uopsdk.ResourceData) T { - for _, data := range all { - if ret, ok := data.(T); ok { - return ret - } - } - - var def T - return def -} - -func findJobInfo[T schsdk.JobInfo](jobs []schsdk.JobInfo, localJobID string) T { - for _, job := range jobs { - if ret, ok := job.(T); ok && job.GetLocalJobID() == localJobID { - return ret - } - } - - var def T - return def -} diff --git a/pkgs/prescheduler/default_prescheduler_test.go b/pkgs/prescheduler/default_prescheduler_test.go deleted file mode 100644 index e10ae82..0000000 --- a/pkgs/prescheduler/default_prescheduler_test.go +++ /dev/null @@ -1,117 +0,0 @@ -package prescheduler - -import ( - "testing" - - "github.com/samber/lo" - . "github.com/smartystreets/goconvey/convey" - - schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" -) - -func TestOrderByAfters(t *testing.T) { - cases := []struct { - title string - jobs []*schedulingJob - wants []string - }{ - { - title: "所有Job都有直接或间接的依赖关系", - jobs: []*schedulingJob{ - { - Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}}, - Afters: []string{"2"}, - }, - - { - Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}}, - Afters: []string{}, - }, - - { - Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "3"}}, - Afters: []string{"1"}, - }, - }, - wants: []string{"2", "1", "3"}, - }, - - { - title: "部分Job之间无依赖关系", - jobs: []*schedulingJob{ - { - Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}}, - Afters: []string{"2"}, - }, - - { - Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}}, - Afters: []string{}, - }, - - { - Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "3"}}, - Afters: []string{"1"}, - }, - - { - Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "4"}}, - Afters: []string{"5"}, - }, - - { - Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "5"}}, - Afters: []string{}, - }, - }, - wants: []string{"2", "5", "1", "3", "4"}, - }, - - { - title: "存在循环依赖", - jobs: []*schedulingJob{ - { - Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}}, - Afters: []string{"2"}, - }, - - { - Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}}, - Afters: []string{"1"}, - }, - }, - wants: nil, - }, - - { - title: "完全不依赖", - jobs: []*schedulingJob{ - { - Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}}, - Afters: []string{}, - }, - - { - Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}}, - Afters: []string{}, - }, - }, - wants: []string{"1", "2"}, - }, - } - - sch := NewDefaultPreScheduler() - for _, c := range cases { - Convey(c.title, t, func() { - ordered, ok := sch.orderByAfters(c.jobs) - if c.wants == nil { - So(ok, ShouldBeFalse) - } else { - So(ok, ShouldBeTrue) - - ids := lo.Map(ordered, func(item *schedulingJob, idx int) string { return item.Job.GetLocalJobID() }) - So(ids, ShouldResemble, c.wants) - } - }) - } -} diff --git a/pkgs/prescheduler/prescheduler.go b/pkgs/prescheduler/prescheduler.go deleted file mode 100644 index 7aa3f18..0000000 --- a/pkgs/prescheduler/prescheduler.go +++ /dev/null @@ -1,11 +0,0 @@ -package prescheduler - -import ( - schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" - jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" -) - -type PreScheduler interface { - ScheduleJobSet(info *schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error) - ScheduleJob(info *schsdk.InstanceJobInfo) (*jobmod.JobScheduleScheme, *schsdk.JobFilesUploadScheme, error) -} diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index 45282bc..0b059b1 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -9,6 +9,7 @@ import ( const ( JobTypeNormal = "Normal" JobTypeResource = "Resource" + JobTypeInstance = "Instance" FileInfoTypePackage = "Package" FileInfoTypeLocalFile = "LocalFile" @@ -37,6 +38,7 @@ var JobInfoTypeUnion = types.NewTypeUnion[JobInfo]( (*NormalJobInfo)(nil), (*DataReturnJobInfo)(nil), (*MultiInstanceJobInfo)(nil), + (*InstanceJobInfo)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type") @@ -58,16 +60,6 @@ type NormalJobInfo struct { Services JobServicesInfo `json:"services"` } -type CommonJobInfo struct { - serder.Metadata `union:"Normal"` - JobInfoBase - Type string `json:"type"` - Files JobFilesInfo `json:"files"` - Runtime JobRuntimeInfo `json:"runtime"` - Resources JobResourcesInfo `json:"resources"` - Services JobServicesInfo `json:"services"` -} - type DataReturnJobInfo struct { serder.Metadata `union:"DataReturn"` JobInfoBase @@ -86,7 +78,7 @@ type MultiInstanceJobInfo struct { } type InstanceJobInfo struct { - serder.Metadata `union:"SubJob"` + serder.Metadata `union:"Instance"` JobInfoBase Type string `json:"type"` LocalJobID string `json:"multiInstJobID"` From 3a67f7804d3941045b876d19ae52d91a251c390c Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Wed, 15 May 2024 16:10:06 +0800 Subject: [PATCH 4/5] =?UTF-8?q?1=E3=80=81=E5=A4=9A=E5=AE=9E=E4=BE=8B?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1LocalJobID=E6=94=B9=E6=88=90=E6=8B=BC?= =?UTF-8?q?=E6=8E=A5=E4=B8=80=E4=B8=AA=E9=9A=8F=E6=9C=BA=E5=80=BC=202?= =?UTF-8?q?=E3=80=81=E6=99=AE=E9=80=9A=E4=BB=BB=E5=8A=A1OutputFullPath?= =?UTF-8?q?=E8=B5=8B=E5=80=BC=203=E3=80=81=E5=85=B6=E4=BB=96=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/config/config.go | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/utils/config/config.go b/utils/config/config.go index 4f0d7ae..3d797a9 100644 --- a/utils/config/config.go +++ b/utils/config/config.go @@ -3,11 +3,9 @@ package config import ( "encoding/json" "fmt" + "github.com/imdario/mergo" "os" "path/filepath" - "strings" - - "github.com/imdario/mergo" ) // Load 加载配置文件 @@ -28,17 +26,17 @@ func DefaultLoad(modeulName string, defCfg interface{}) error { return err } - if strings.Contains(execPath, "scheduler") { - execPath = "D:\\Work\\Codes\\new\\workspace\\workspace\\scheduler\\common\\assets\\confs\\" - } - - if strings.Contains(execPath, "storage") { - execPath = "D:\\Work\\Codes\\new\\workspace\\workspace\\storage\\common\\assets\\confs\\" - } - - if strings.Contains(execPath, "gateway") { - execPath = "D:\\Work\\Codes\\new\\workspace\\workspace\\gateway\\assets\\confs\\" - } + //if strings.Contains(execPath, "scheduler") { + // execPath = "D:\\Work\\Codes\\new\\workspace\\workspace\\scheduler\\common\\assets\\confs\\" + //} + // + //if strings.Contains(execPath, "storage") { + // execPath = "D:\\Work\\Codes\\new\\workspace\\workspace\\storage\\common\\assets\\confs\\" + //} + // + //if strings.Contains(execPath, "gateway") { + // execPath = "D:\\Work\\Codes\\new\\workspace\\workspace\\gateway\\assets\\confs\\" + //} // TODO 可以考虑根据环境变量读取不同的配置 configFilePath := filepath.Join(filepath.Dir(execPath), "..", "confs", fmt.Sprintf("%s.config.json", modeulName)) From 90c06cdcf50f6414e95e248408524fe0e135bcce Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Fri, 17 May 2024 11:03:04 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=B3=A8=E9=87=8A?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/config/config.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/utils/config/config.go b/utils/config/config.go index 3d797a9..0f4cb04 100644 --- a/utils/config/config.go +++ b/utils/config/config.go @@ -26,18 +26,6 @@ func DefaultLoad(modeulName string, defCfg interface{}) error { return err } - //if strings.Contains(execPath, "scheduler") { - // execPath = "D:\\Work\\Codes\\new\\workspace\\workspace\\scheduler\\common\\assets\\confs\\" - //} - // - //if strings.Contains(execPath, "storage") { - // execPath = "D:\\Work\\Codes\\new\\workspace\\workspace\\storage\\common\\assets\\confs\\" - //} - // - //if strings.Contains(execPath, "gateway") { - // execPath = "D:\\Work\\Codes\\new\\workspace\\workspace\\gateway\\assets\\confs\\" - //} - // TODO 可以考虑根据环境变量读取不同的配置 configFilePath := filepath.Join(filepath.Dir(execPath), "..", "confs", fmt.Sprintf("%s.config.json", modeulName)) return Load(configFilePath, defCfg)