update (#2701)
chenyifan01 committed 3 years ago · commit dc66fa4b3f · tags/v1.22.9.1^2^2
2 changed files with 181 additions and 127 deletions
  1. models/resource_specification.go: +21 / -0
  2. services/cloudbrain/resource/resource_specification.go: +160 / -127

models/resource_specification.go (+21, -0)

@@ -525,3 +525,24 @@ func InitCloudbrainTwoSpecs() (map[string]*Specification, error) {
	}
	return r, nil
}

var grampusSpecsInitFlag = false
var grampusSpecs map[string]*Specification

func GetGrampusSpecs() (map[string]*Specification, error) {
	if !grampusSpecsInitFlag {
		r, err := FindSpecs(FindSpecsOptions{
			Cluster: C2NetCluster,
		})
		if err != nil {
			return nil, err
		}
		// allocate the cache before writing to it; assigning into a nil map would panic
		grampusSpecs = make(map[string]*Specification, len(r)*2)
		for _, spec := range r {
			// index each spec by its source spec id and by "<source spec id>_<ai center code>"
			grampusSpecs[spec.SourceSpecId] = spec
			grampusSpecs[spec.SourceSpecId+"_"+spec.AiCenterCode] = spec
		}
		grampusSpecsInitFlag = true
	}
	return grampusSpecs, nil
}
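For context, a minimal caller-side sketch of how this cache could be consumed from inside the models package. The helper resolveGrampusSpec and its parameters are hypothetical; GetGrampusSpecs and the map keys follow the code above, and the center-specific key is preferred when an AI center code is known (the same convention the service-layer getGrampusSpec below relies on).

// resolveGrampusSpec is a hypothetical helper: it first tries the
// "<source spec id>_<ai center code>" key, then falls back to the plain source spec id.
func resolveGrampusSpec(sourceSpecId, aiCenterCode string) (*Specification, error) {
	specMap, err := GetGrampusSpecs()
	if err != nil {
		return nil, err
	}
	if aiCenterCode != "" {
		if spec, ok := specMap[sourceSpecId+"_"+aiCenterCode]; ok {
			return spec, nil
		}
	}
	// may be nil if no spec matches the id; callers should handle that case
	return specMap[sourceSpecId], nil
}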

services/cloudbrain/resource/resource_specification.go (+160, -127)

@@ -338,6 +338,7 @@ func RefreshHistorySpec(scopeAll bool, ids []int64) (int64, int64, error) {
	total = n
	for i := 0; i < 1000; i++ {
		list, err := models.FindNoSpecHistoricTask(page, pageSize)
		page++
		if err != nil {
			log.Error("FindNoSpecHistoricTask error.page=%d pageSize=%d e=%v", page, pageSize, err)
			return total, success, err
@@ -373,6 +374,8 @@ func RefreshOneHistorySpec(task *models.Cloudbrain) error {
		spec, err = getCloudbrainOneSpec(task)
	case models.TypeCloudBrainTwo:
		spec, err = getCloudbrainTwoSpec(task)
	case models.TypeC2Net:
		spec, err = getGrampusSpec(task)
	}
	if err != nil {
		log.Error("find spec error,task.ID=%d err=%v", task.ID, err)
@@ -386,155 +389,179 @@ func RefreshOneHistorySpec(task *models.Cloudbrain) error {
}

func getCloudbrainOneSpec(task *models.Cloudbrain) (*models.Specification, error) {
	if task.GpuQueue == "" {
		log.Info("gpu queue is empty.task.ID = %d", task.ID)
		return nil, nil
	}
	//find from config
	spec, err := findCloudbrainOneSpecFromConfig(task)
	if err != nil {
		log.Error("getCloudbrainOneSpec findCloudbrainOneSpecFromConfig error.%v", err)
		return nil, err
	}
	if spec != nil {
		return spec, nil
	}
	//find from remote
	return findCloudbrainOneSpecFromRemote(task)
}

func findCloudbrainOneSpecFromRemote(task *models.Cloudbrain) (*models.Specification, error) {
	time.Sleep(200 * time.Millisecond)
	log.Info("start findCloudbrainOneSpecFromRemote")
	result, err := cloudbrain.GetJob(task.JobID)
	if err != nil {
		log.Error("getCloudbrainOneSpec error. %v", err)
		return nil, err
	}
	if result == nil {
		log.Info("findCloudbrainOneSpecFromRemote failed,result is empty.task.ID=%d", task.ID)
		return nil, nil
	}
	jobRes, _ := models.ConvertToJobResultPayload(result.Payload)
	memSize, _ := models.ParseMemSizeFromGrampus(jobRes.Resource.Memory)
	if task.ComputeResource == "CPU/GPU" {
		task.ComputeResource = models.GPU
	}
	var shmMB float32
	if jobRes.Config.TaskRoles != nil && len(jobRes.Config.TaskRoles) > 0 {
		shmMB = float32(jobRes.Config.TaskRoles[0].ShmMB) / 1024
	}

	opt := models.FindSpecsOptions{
		ComputeResource: task.ComputeResource,
		Cluster:         models.OpenICluster,
		AiCenterCode:    models.AICenterOfCloudBrainOne,
		QueueCode:       task.GpuQueue,
		AccCardsNum:     jobRes.Resource.NvidiaComGpu,
		UseAccCardsNum:  true,
		CpuCores:        jobRes.Resource.CPU,
		UseCpuCores:     true,
		MemGiB:          memSize,
		UseMemGiB:       memSize > 0,
		ShareMemGiB:     shmMB,
		UseShareMemGiB:  shmMB > 0,
		RequestAll:      true,
	}
	specs, err := models.FindSpecs(opt)
	if err != nil {
		log.Error("getCloudbrainOneSpec from remote error,%v", err)
		return nil, err
	}
	if len(specs) == 1 {
		return specs[0], nil
	}
	if len(specs) == 0 {
		s, err := InitQueueAndSpec(opt, "云脑一", "处理历史云脑任务时自动添加")
		if err != nil {
			log.Error("getCloudbrainOneSpec InitQueueAndSpec error.err=%v", err)
			return nil, nil
		}
		return s, nil
	}
	log.Error("Too many results matched.size=%d opt=%+v", len(specs), opt)
	return nil, nil
}

func findCloudbrainOneSpecFromConfig(task *models.Cloudbrain) (*models.Specification, error) {
	//find from config
	var specConfig *models.ResourceSpec
	hasSpec := false
	if task.JobType == string(models.JobTypeTrain) {
		if cloudbrain.TrainResourceSpecs == nil {
			json.Unmarshal([]byte(setting.TrainResourceSpecs), &cloudbrain.TrainResourceSpecs)
		}
		for _, tmp := range cloudbrain.TrainResourceSpecs.ResourceSpec {
			if tmp.Id == task.ResourceSpecId {
				hasSpec = true
				specConfig = tmp
				break
			}
		}
	} else if task.JobType == string(models.JobTypeInference) {
		if cloudbrain.InferenceResourceSpecs == nil {
			json.Unmarshal([]byte(setting.InferenceResourceSpecs), &cloudbrain.InferenceResourceSpecs)
		}
		for _, tmp := range cloudbrain.InferenceResourceSpecs.ResourceSpec {
			if tmp.Id == task.ResourceSpecId {
				hasSpec = true
				specConfig = tmp
				break
			}
		}
	} else {
		if cloudbrain.ResourceSpecs == nil {
			json.Unmarshal([]byte(setting.ResourceSpecs), &cloudbrain.ResourceSpecs)
		}
		for _, tmp := range cloudbrain.ResourceSpecs.ResourceSpec {
			if tmp.Id == task.ResourceSpecId {
				hasSpec = true
				specConfig = tmp
				break
			}
		}
	}
	if !hasSpec && cloudbrain.SpecialPools != nil {
		for _, specialPool := range cloudbrain.SpecialPools.Pools {
			if specialPool.ResourceSpec != nil {
				for _, spec := range specialPool.ResourceSpec {
					if task.ResourceSpecId == spec.Id {
						hasSpec = true
						specConfig = spec
						break
					}
				}
			}
		}
	}
	if specConfig == nil {
		log.Error("getCloudbrainOneSpec from config failed,task.ResourceSpecId=%d", task.ResourceSpecId)
		return nil, nil
	}
	if task.ComputeResource == "CPU/GPU" {
		task.ComputeResource = models.GPU
	}

	opt := models.FindSpecsOptions{
		JobType:         models.JobType(task.JobType),
		ComputeResource: task.ComputeResource,
		Cluster:         models.OpenICluster,
		AiCenterCode:    models.AICenterOfCloudBrainOne,
		QueueCode:       task.GpuQueue,
		AccCardsNum:     specConfig.GpuNum,
		UseAccCardsNum:  true,
		CpuCores:        specConfig.CpuNum,
		UseCpuCores:     true,
		MemGiB:          float32(specConfig.MemMiB) / 1024,
		UseMemGiB:       true,
		ShareMemGiB:     float32(specConfig.ShareMemMiB) / 1024,
		UseShareMemGiB:  true,
		RequestAll:      true,
	}
	specs, err := models.FindSpecs(opt)
	if err != nil {
		log.Error("getCloudbrainOneSpec from config error,%v", err)
		return nil, err
	}
	if len(specs) > 1 {
		log.Error("Too many results matched.size=%d opt=%+v", len(specs), opt)
		return nil, nil
	}
	if len(specs) == 0 {
		s, err := InitQueueAndSpec(opt, "云脑一", "处理历史云脑任务时自动添加")
		if err != nil {
			log.Error("getCloudbrainOneSpec InitQueueAndSpec error.err=%v", err)
			return nil, nil
		}
		return s, nil
	}
	return specs[0], nil
}

func getCloudbrainTwoSpec(task *models.Cloudbrain) (*models.Specification, error) {
@@ -546,6 +573,8 @@ func getCloudbrainTwoSpec(task *models.Cloudbrain) (*models.Specification, error
if task.FlavorCode != "" {
return specMap[task.FlavorCode], nil
}
time.Sleep(200 * time.Millisecond)
log.Info("start getCloudbrainTwoSpec FromRemote")
if task.JobType == string(models.JobTypeDebug) {
result, err := modelarts.GetNotebook2(task.JobID)
if err != nil {
@@ -568,12 +597,16 @@ func getCloudbrainTwoSpec(task *models.Cloudbrain) (*models.Specification, error
	return nil, nil
}

func getGrampusSpec(task *models.Cloudbrain) (*models.Specification, error) {
	specMap, err := models.GetGrampusSpecs()
	if err != nil {
		log.Error("GetGrampusSpecs err.%v", err)
		return nil, err
	}
	if task.AiCenter != "" {
		return specMap[task.FlavorCode+"_"+task.AiCenter], nil
	}
	return specMap[task.FlavorCode], nil
}

func InitQueueAndSpec(opt models.FindSpecsOptions, aiCenterName string, remark string) (*models.Specification, error) {
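A hedged usage sketch of the refresh entry point touched above. It assumes the service package is named resource, that passing nil for ids together with scopeAll=true means "refresh everything", and that the two returned counters are the total and successfully refreshed task counts, as the RefreshHistorySpec hunk suggests; refreshAllHistorySpecs itself is hypothetical.

// refreshAllHistorySpecs back-fills specifications for historic tasks that have none attached.
func refreshAllHistorySpecs() {
	// scopeAll=true refreshes every historic task; a non-nil id list would presumably restrict the scope.
	total, success, err := resource.RefreshHistorySpec(true, nil)
	if err != nil {
		log.Error("RefreshHistorySpec failed: %v", err)
		return
	}
	log.Info("refreshed specs for %d of %d historic tasks", success, total)
}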

