- /*
-
- Copyright (c) [2023] [pcm]
- [pcm-coordinator] is licensed under Mulan PSL v2.
- You can use this software according to the terms and conditions of the Mulan PSL v2.
- You may obtain a copy of Mulan PSL v2 at:
- http://license.coscl.org.cn/MulanPSL2
- THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
- EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
- MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
- See the Mulan PSL v2 for more details.
-
- */
-
- package schedulers
-
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "github.com/zeromicro/go-zero/core/logx"
- "gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy/param"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/response"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
- "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
- "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
- "gitlink.org.cn/JointCloud/pcm-openi/model"
- "strconv"
- "sync"
- )
-
- type AiScheduler struct {
- yamlString string
- task *response.TaskInfo
- *scheduler.Scheduler
- option *option.AiOption
- ctx context.Context
- }
-
// AiResult describes the outcome of submitting one replica of an AI task to a
// cluster. convertType fills JobId or Msg from the executor response, and
// AssignTask fills the remaining fields.
type AiResult struct {
	// AdapterId is copied from the option used for the submission.
	AdapterId string
	// TaskName is copied from the option used for the submission.
	TaskName string
	// JobId is the job identifier reported by the cluster on success.
	JobId string
	// ClusterId identifies the cluster the task was submitted to.
	ClusterId string
	// Strategy is the name of the scheduling strategy in effect.
	Strategy string
	// Replica is the replica count assigned to the cluster.
	Replica int32
	// Card is the compute card recorded on the option used for the submission.
	Card string
	// Msg is the error message reported by the cluster; it is empty when the
	// response indicated success.
	Msg string
}
-
- func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) {
- return &AiScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option}, nil
- }
-
- func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
- ai := models.Ai{
- AdapterId: participantId,
- TaskId: task.TaskId,
- Status: "Saved",
- YamlString: as.yamlString,
- }
- utils.Convert(task.Metadata, &ai)
- return ai, nil
- }
-
- func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
- if as.option.ComputeCard != "" {
- m, ok := as.AiService.AiCollectorAdapterMap[as.option.AdapterId]
- if ok {
- for _, id := range as.option.ClusterIds {
- cm, ok := m[id]
- if ok {
- cards, err := cm.GetComputeCards(as.ctx)
- if err != nil {
- return nil, err
- }
- if common.Contains(cards, as.option.ComputeCard) {
- return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: id, Replicas: 1}}, nil
- }
- }
- }
- }
- }
-
- if len(as.option.ClusterIds) == 1 {
- return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
- }
-
- resources, err := as.findClustersWithResources()
-
- if err != nil {
- return nil, err
- }
- if len(resources) == 0 {
- return nil, errors.New("no cluster has resources")
- }
-
- if len(resources) == 1 {
- var cluster strategy.AssignedCluster
- cluster.ClusterId = resources[0].ClusterId
- cluster.Replicas = 1
- return &strategy.SingleAssignment{Cluster: &cluster}, nil
- }
-
- params := ¶m.Params{Resources: resources}
-
- switch as.option.StrategyName {
- case strategy.REPLICATION:
- var clusterIds []string
- for _, resource := range resources {
- if resource == nil {
- continue
- }
- clusterIds = append(clusterIds, resource.ClusterId)
- }
- strategy := strategy.NewReplicationStrategy(clusterIds, 1)
- return strategy, nil
- case strategy.RESOURCES_PRICING:
- strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1})
- return strategy, nil
- case strategy.DYNAMIC_RESOURCES:
- strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
- return strategy, nil
- case strategy.STATIC_WEIGHT:
- //todo resources should match cluster StaticWeightMap
- strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica)
- return strategy, nil
- case strategy.RANDOM:
- strategy := strategy.NewRandomStrategy(as.option.ClusterIds, as.option.Replica)
- return strategy, nil
- }
-
- return nil, errors.New("no strategy has been chosen")
- }
-
- func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster, mode int) (interface{}, error) {
- if clusters == nil {
- return nil, errors.New("clusters is nil")
- }
-
- for i := len(clusters) - 1; i >= 0; i-- {
- if clusters[i].Replicas == 0 {
- clusters = append(clusters[:i], clusters[i+1:]...)
- }
- }
-
- if len(clusters) == 0 {
- return nil, errors.New("clusters is nil")
- }
-
- var wg sync.WaitGroup
- var results []*AiResult
- var mu sync.Mutex
- var errs []interface{}
- var taskNum int32
- for _, cluster := range clusters {
- taskNum += cluster.Replicas
- }
- var ch = make(chan *AiResult, taskNum)
- var errCh = make(chan interface{}, taskNum)
-
- executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId]
- for _, cluster := range clusters {
- c := cluster
- for i := 0; i < int(c.Replicas); i++ {
- wg.Add(1)
- go func() {
- opt, _ := cloneAiOption(as.option)
-
- // decide opt params by mode
- updateAiOptionByMode(c, opt, mode)
-
- resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt, mode)
- if err != nil {
- e := struct {
- err error
- clusterId string
- }{
- err: err,
- clusterId: c.ClusterId,
- }
- errCh <- e
- wg.Done()
- return
- }
-
- result := &AiResult{}
- mu.Lock()
- result, _ = convertType(resp)
- mu.Unlock()
-
- result.AdapterId = opt.AdapterId
- result.TaskName = opt.TaskName
- result.Replica = c.Replicas
- result.ClusterId = c.ClusterId
- result.Strategy = as.option.StrategyName
- result.Card = opt.ComputeCard
-
- ch <- result
- wg.Done()
- }()
- }
- }
- wg.Wait()
- close(ch)
- close(errCh)
-
- for e := range errCh {
- errs = append(errs, e)
- }
-
- for s := range ch {
- results = append(results, s)
- }
-
- err := as.handleErrors(errs, clusters, results, mode)
- if err != nil {
- return nil, err
- }
-
- return results, nil
- }
-
- func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.AssignedCluster, results []*AiResult, mode int) error {
- if len(errs) != 0 {
- var synergystatus int64
- if len(clusters) > 1 {
- synergystatus = 1
- }
-
- var taskId int64
- switch mode {
- case executor.SUBMIT_MODE_JOINT_CLOUD:
- tid, err := as.CreateTask(as.option.TaskName, "", synergystatus, as.option.StrategyName, "", "", "", nil)
- if err != nil {
- return err
- }
- taskId = tid
- case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
- taskId = as.option.TaskId
- }
-
- // aiTasks
- adapterName, err := as.AiStorages.GetAdapterNameById(as.option.AdapterId)
- if err != nil {
- return err
- }
-
- //report msg
- report := &jcs.JobStatusReportReq{
- TaskName: "",
- TaskID: strconv.FormatInt(taskId, 10),
- Messages: make([]*jcs.ReportMessage, 0),
- }
-
- var errmsg string
- for _, err := range errs {
- e := (err).(struct {
- err error
- clusterId string
- })
- msg := fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
- errmsg += msg
-
- clusterName, _ := as.AiStorages.GetClusterNameById(e.clusterId)
-
- err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, e.clusterId, clusterName, "", constants.Failed, msg)
- if err != nil {
- return errors.New("database add failed: " + err.Error())
- }
-
- //add report msg
- jobMsg := &jcs.ReportMessage{
- Status: false,
- Message: msg,
- ClusterID: e.clusterId,
- Output: "",
- }
- report.Messages = append(report.Messages, jobMsg)
- }
- for _, s := range results {
- as.option.ComputeCard = s.Card //execute card
- clusterName, _ := as.AiStorages.GetClusterNameById(s.ClusterId)
-
- if s.Msg != "" {
- msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
- errmsg += msg
- err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, "", constants.Failed, msg)
- if err != nil {
- return errors.New("database add failed: " + err.Error())
- }
- } else {
- msg := fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId)
- errmsg += msg
- err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, s.JobId, constants.Saved, msg)
- if err != nil {
- return errors.New("database add failed: " + err.Error())
- }
- }
- //add report msg
- jobMsg := &jcs.ReportMessage{
- Status: false,
- Message: s.Msg,
- ClusterID: s.ClusterId,
- Output: "",
- }
- report.Messages = append(report.Messages, jobMsg)
- }
-
- //report status
- if mode == executor.SUBMIT_MODE_STORAGE_SCHEDULE {
- _ = jcs.StatusReport(as.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report)
- }
-
- logx.Errorf(errors.New(errmsg).Error())
- return errors.New(errmsg)
- }
-
- return nil
- }
-
- func updateAiOptionByMode(cluster *strategy.AssignedCluster, opt *option.AiOption, mode int) {
- switch mode {
- case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
- opt.Cmd = cluster.Cmd
- opt.Envs = cluster.Envs
- opt.Params = cluster.Params
-
- opt.ImageId = cluster.ImageId
- opt.AlgorithmId = cluster.CodeId
- opt.DatasetsId = cluster.DatasetId
-
- opt.ResourcesRequired = cluster.ResourcesRequired
-
- default:
-
- }
- }
-
- func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
- var wg sync.WaitGroup
- var clustersNum = len(as.AiService.AiCollectorAdapterMap[as.option.AdapterId])
- var ch = make(chan *collector.ResourceStats, clustersNum)
- var errCh = make(chan interface{}, clustersNum)
-
- var resourceSpecs []*collector.ResourceStats
- var errs []interface{}
-
- for s, resourceCollector := range as.AiService.AiCollectorAdapterMap[as.option.AdapterId] {
- wg.Add(1)
- rc := resourceCollector
- id := s
- go func() {
- spec, err := rc.GetResourceStats(as.ctx)
- if err != nil {
- e := struct {
- err error
- clusterId string
- }{
- err: err,
- clusterId: id,
- }
- errCh <- e
- wg.Done()
- return
- }
- ch <- spec
- wg.Done()
- }()
- }
- wg.Wait()
- close(ch)
- close(errCh)
-
- for s := range ch {
- resourceSpecs = append(resourceSpecs, s)
- }
-
- for e := range errCh {
- errs = append(errs, e)
- }
-
- if len(errs) == clustersNum {
- return nil, errors.New("get resources failed")
- }
-
- if len(errs) != 0 {
- var msg string
- for _, err := range errs {
- e := (err).(struct {
- err error
- clusterId string
- })
- msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
- }
- //return nil, errors.New(msg)
- }
-
- return resourceSpecs, nil
- }
-
- func convertType(in interface{}) (*AiResult, error) {
- var result AiResult
- switch (in).(type) {
- case *hpcAC.SubmitTaskAiResp:
- resp := (in).(*hpcAC.SubmitTaskAiResp)
- if resp.Code == "0" {
- result.JobId = resp.Data
- } else {
- result.Msg = resp.Msg
- }
- return &result, nil
- case *octopus.CreateTrainJobResp:
- resp := (in).(*octopus.CreateTrainJobResp)
-
- if resp.Success {
- result.JobId = resp.Payload.JobId
- } else {
- result.Msg = resp.Error.Message
- }
-
- return &result, nil
- case *modelartsservice.CreateTrainingJobResp:
- resp := (in).(*modelartsservice.CreateTrainingJobResp)
-
- if resp.ErrorMsg != "" {
- result.Msg = resp.ErrorMsg
- } else {
-
- result.JobId = resp.Metadata.Id
- }
-
- return &result, nil
- case model.CreateTask:
- resp := (in).(model.CreateTask)
-
- if resp.Code != 0 {
- result.Msg = resp.Msg
- } else {
- result.JobId = strconv.Itoa(resp.Data.Id)
- }
-
- return &result, nil
- default:
- return nil, errors.New("ai task response failed")
- }
- }
-
- func cloneAiOption(opt *option.AiOption) (*option.AiOption, error) {
- origJSON, err := json.Marshal(opt)
- if err != nil {
- return nil, err
- }
-
- clone := option.AiOption{}
- if err = json.Unmarshal(origJSON, &clone); err != nil {
- return nil, err
- }
-
- return &clone, nil
- }
|