zhangwei 5 months ago
parent
commit
869b5068ee
11 changed files with 46 additions and 27 deletions
  1. +1
    -0
      desc/schedule/pcm-schedule.api
  2. +7
    -2
      internal/cron/cron.go
  3. +22
    -17
      internal/logic/schedule/queryresourceslogic.go
  4. +1
    -1
      internal/logic/schedule/schedulecreatetasklogic.go
  5. +1
    -1
      internal/scheduler/service/collector/collector.go
  6. +1
    -1
      internal/storeLink/modelarts.go
  7. +1
    -1
      internal/storeLink/octopus.go
  8. +9
    -2
      internal/storeLink/openi.go
  9. +1
    -1
      internal/storeLink/shuguangai.go
  10. +1
    -1
      internal/storeLink/template.go
  11. +1
    -0
      internal/types/types.go

+ 1
- 0
desc/schedule/pcm-schedule.api View File

@@ -154,6 +154,7 @@ type (


// 调度资源信息:/queryResources // 调度资源信息:/queryResources
QueryResourcesReq{ QueryResourcesReq{
Type string `json:"type"`
ClusterIDs []string `json:"clusterIDs,optional"` ClusterIDs []string `json:"clusterIDs,optional"`
} }




+ 7
- 2
internal/cron/cron.go View File

@@ -42,11 +42,16 @@ func AddCronGroup(svc *svc.ServiceContext) {


svc.Cron.AddFunc("0 5/5 * * * *", func() { svc.Cron.AddFunc("0 5/5 * * * *", func() {
queryResource := schedule.NewQueryResourcesLogic(svc.HttpClient.R().Context(), svc) queryResource := schedule.NewQueryResourcesLogic(svc.HttpClient.R().Context(), svc)
rus, err := queryResource.QueryResourcesByClusterId(nil)
trainResrc, err := queryResource.QueryResourcesByClusterId(nil, "Train")
if err != nil { if err != nil {
logx.Error(err) logx.Error(err)
} }
svc.Scheduler.AiService.LocalCache[schedule.QUERY_RESOURCES] = rus
svc.Scheduler.AiService.LocalCache[schedule.QUERY_TRAIN_RESOURCES] = trainResrc
inferResrc, err := queryResource.QueryResourcesByClusterId(nil, "Inference")
if err != nil {
logx.Error(err)
}
svc.Scheduler.AiService.LocalCache[schedule.QUERY_INFERENCE_RESOURCES] = inferResrc
}) })


//更新hpc任务状态 //更新hpc任务状态


+ 22
- 17
internal/logic/schedule/queryresourceslogic.go View File

@@ -15,8 +15,9 @@ import (
) )


const ( const (
ADAPTERID = "1777144940459986944" // 异构适配器id
QUERY_RESOURCES = "query_resources"
ADAPTERID = "1777144940459986944" // 异构适配器id
QUERY_TRAIN_RESOURCES = "train_resources"
QUERY_INFERENCE_RESOURCES = "inference_resources"
) )


type QueryResourcesLogic struct { type QueryResourcesLogic struct {
@@ -41,25 +42,29 @@ func (l *QueryResourcesLogic) QueryResources(req *types.QueryResourcesReq) (resp
if err != nil { if err != nil {
return nil, err return nil, err
} }
resources, ok := l.svcCtx.Scheduler.AiService.LocalCache[QUERY_RESOURCES]

var resources interface{}
switch req.Type {
case "Train":
resources, _ = l.svcCtx.Scheduler.AiService.LocalCache[QUERY_TRAIN_RESOURCES]
case "Inference":
resources, _ = l.svcCtx.Scheduler.AiService.LocalCache[QUERY_INFERENCE_RESOURCES]
default:
resources, _ = l.svcCtx.Scheduler.AiService.LocalCache[QUERY_TRAIN_RESOURCES]
}

specs, ok := resources.([]*collector.ResourceSpec)
if ok { if ok {
specs, ok := resources.([]*collector.ResourceSpec)
if ok {
results := handleEmptyResourceUsage(cs.List, specs)
resp.Data = results
return resp, nil
}
results := handleEmptyResourceUsage(cs.List, specs)
resp.Data = results
return resp, nil
} }


rus, err := l.QueryResourcesByClusterId(cs.List)
rus, err := l.QueryResourcesByClusterId(cs.List, req.Type)
if err != nil { if err != nil {
return nil, err return nil, err
} }


if checkCachingCondition(cs.List, rus) {
l.svcCtx.Scheduler.AiService.LocalCache[QUERY_RESOURCES] = rus
}

results := handleEmptyResourceUsage(cs.List, rus) results := handleEmptyResourceUsage(cs.List, rus)
resp.Data = results resp.Data = results


@@ -77,7 +82,7 @@ func (l *QueryResourcesLogic) QueryResources(req *types.QueryResourcesReq) (resp
return nil, errors.New("no clusters found ") return nil, errors.New("no clusters found ")
} }


rus, err := l.QueryResourcesByClusterId(clusters)
rus, err := l.QueryResourcesByClusterId(clusters, req.Type)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -89,7 +94,7 @@ func (l *QueryResourcesLogic) QueryResources(req *types.QueryResourcesReq) (resp
return resp, nil return resp, nil
} }


func (l *QueryResourcesLogic) QueryResourcesByClusterId(clusterinfos []types.ClusterInfo) ([]*collector.ResourceSpec, error) {
func (l *QueryResourcesLogic) QueryResourcesByClusterId(clusterinfos []types.ClusterInfo, resrcType string) ([]*collector.ResourceSpec, error) {
var clusters []types.ClusterInfo var clusters []types.ClusterInfo
if len(clusterinfos) == 0 { if len(clusterinfos) == 0 {
cs, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(ADAPTERID) cs, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(ADAPTERID)
@@ -121,7 +126,7 @@ func (l *QueryResourcesLogic) QueryResourcesByClusterId(clusterinfos []types.Clu
return return
} }


u, err = col.GetResourceSpecs(l.ctx)
u, err = col.GetResourceSpecs(l.ctx, resrcType)
if err != nil { if err != nil {
done <- true done <- true
return return


+ 1
- 1
internal/logic/schedule/schedulecreatetasklogic.go View File

@@ -218,7 +218,7 @@ func (l *ScheduleCreateTaskLogic) getAssignedClustersByStrategy(resources *types
var resCount int var resCount int
for i := 0; i < QUERY_RESOURCE_RETRY; i++ { for i := 0; i < QUERY_RESOURCE_RETRY; i++ {
defer time.Sleep(time.Second) defer time.Sleep(time.Second)
qResources, err := l.queryResource.QueryResourcesByClusterId(nil)
qResources, err := l.queryResource.QueryResourcesByClusterId(nil, "Train")
if err != nil { if err != nil {
continue continue
} }


+ 1
- 1
internal/scheduler/service/collector/collector.go View File

@@ -14,7 +14,7 @@ type AiCollector interface {
UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error
GetComputeCards(ctx context.Context) ([]string, error) GetComputeCards(ctx context.Context) ([]string, error)
GetUserBalance(ctx context.Context) (float64, error) GetUserBalance(ctx context.Context) (float64, error)
GetResourceSpecs(ctx context.Context) (*ResourceSpec, error)
GetResourceSpecs(ctx context.Context, resrcType string) (*ResourceSpec, error)
} }


type ResourceSpec struct { type ResourceSpec struct {


+ 1
- 1
internal/storeLink/modelarts.go View File

@@ -996,7 +996,7 @@ func (m *ModelArtsLink) CheckImageExist(ctx context.Context, option *option.Infe
return errors.New("failed to find Image ") return errors.New("failed to find Image ")
} }


func (m *ModelArtsLink) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
func (m *ModelArtsLink) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
var wg sync.WaitGroup var wg sync.WaitGroup
//查询modelarts资源规格 //查询modelarts资源规格
req := &modelarts.GetResourceFlavorsReq{} req := &modelarts.GetResourceFlavorsReq{}


+ 1
- 1
internal/storeLink/octopus.go View File

@@ -1279,7 +1279,7 @@ func (o *OctopusLink) CheckModelExistence(ctx context.Context, name string, mtyp
return true return true
} }


func (o *OctopusLink) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
func (o *OctopusLink) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
res := &collector.ResourceSpec{ res := &collector.ResourceSpec{
ClusterId: strconv.FormatInt(o.participantId, 10), ClusterId: strconv.FormatInt(o.participantId, 10),
Resources: make([]interface{}, 0), Resources: make([]interface{}, 0),


+ 9
- 2
internal/storeLink/openi.go View File

@@ -770,7 +770,14 @@ func (o *OpenI) GetUserBalance(ctx context.Context) (float64, error) {
return 0, errors.New("failed to implement") return 0, errors.New("failed to implement")
} }


func (o *OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
func (o *OpenI) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
var jobType string
if resrcType == "Inference" {
jobType = ONLINEINFERENCE
} else if resrcType == "Train" {
jobType = TRAIN
}

var resources []interface{} var resources []interface{}
res := &collector.ResourceSpec{ res := &collector.ResourceSpec{
ClusterId: strconv.FormatInt(o.participantId, 10), ClusterId: strconv.FormatInt(o.participantId, 10),
@@ -795,7 +802,7 @@ func (o *OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec,
param := model.TaskCreationRequiredParam{ param := model.TaskCreationRequiredParam{
UserName: o.userName, UserName: o.userName,
RepoName: TESTREPO, RepoName: TESTREPO,
JobType: TRAIN,
JobType: jobType,
ComputeSource: ComputeSource[i], ComputeSource: ComputeSource[i],
ClusterType: C2NET, ClusterType: C2NET,
} }


+ 1
- 1
internal/storeLink/shuguangai.go View File

@@ -1103,7 +1103,7 @@ func (s *ShuguangAi) CheckModelExistence(ctx context.Context, name string, mtype
return resp.Data.Exist return resp.Data.Exist
} }


func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
func (s *ShuguangAi) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
return nil, nil return nil, nil
//var timeout = 5 //var timeout = 5
//var wg sync.WaitGroup //var wg sync.WaitGroup


+ 1
- 1
internal/storeLink/template.go View File

@@ -118,6 +118,6 @@ func (o Template) GetUserBalance(ctx context.Context) (float64, error) {
} }


// GetResourceSpecs 查询资源规格 // GetResourceSpecs 查询资源规格
func (o Template) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
func (o Template) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
return nil, errors.New("failed to implement") return nil, errors.New("failed to implement")
} }

+ 1
- 0
internal/types/types.go View File

@@ -6011,6 +6011,7 @@ type GetClusterBalanceByIdResp struct {
} }


type QueryResourcesReq struct { type QueryResourcesReq struct {
Type string `json:"type"`
ClusterIDs []string `json:"clusterIDs,optional"` ClusterIDs []string `json:"clusterIDs,optional"`
} }




Loading…
Cancel
Save