/* Copyright (c) [2023] [pcm] [pcm-coordinator] is licensed under Mulan PSL v2. You can use this software according to the terms and conditions of the Mulan PSL v2. You may obtain a copy of Mulan PSL v2 at: http://license.coscl.org.cn/MulanPSL2 THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. See the Mulan PSL v2 for more details. */ package schedulers import ( "context" "encoding/json" "errors" "fmt" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-ac/hpcAC" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-octopus/octopus" "sync" ) type AiScheduler struct { yamlString string task *response.TaskInfo *scheduler.Scheduler option *option.AiOption ctx context.Context } type AiResult struct { JobId string ClusterId string Strategy string Replica int32 Card string Msg string } func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) { return &AiScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option}, nil } func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { ai := models.Ai{ AdapterId: participantId, TaskId: task.TaskId, Status: "Saved", YamlString: as.yamlString, } utils.Convert(task.Metadata, &ai) return ai, nil } func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { if as.option.ComputeCard != "" { m, ok := as.AiService.AiCollectorAdapterMap[as.option.AdapterId] if ok { for _, id := range as.option.ClusterIds { cm, ok := m[id] if ok { cards, err := cm.GetComputeCards(as.ctx) if err != nil { return nil, err } if common.Contains(cards, as.option.ComputeCard) { return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: id, Replicas: 1}}, nil } } } } } if len(as.option.ClusterIds) == 1 { return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil } resources, err := as.findClustersWithResources() if err != nil { return nil, err } if len(resources) == 0 { return nil, errors.New("no cluster has resources") } if len(resources) == 1 { var cluster strategy.AssignedCluster cluster.ClusterId = resources[0].ClusterId cluster.Replicas = 1 return &strategy.SingleAssignment{Cluster: &cluster}, nil } params := ¶m.Params{Resources: resources} switch as.option.StrategyName { case strategy.REPLICATION: var clusterIds []string for _, resource := range resources { clusterIds = append(clusterIds, resource.ClusterId) } strategy := strategy.NewReplicationStrategy(clusterIds, 1) return strategy, nil case strategy.RESOURCES_PRICING: strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) return strategy, nil case strategy.DYNAMIC_RESOURCES: strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) return strategy, nil case strategy.STATIC_WEIGHT: //todo resources should match cluster StaticWeightMap strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica) return strategy, nil } return nil, errors.New("no strategy has been chosen") } func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) { if clusters == nil { return nil, errors.New("clusters is nil") } for i := len(clusters) - 1; i >= 0; i-- { if clusters[i].Replicas == 0 { clusters = append(clusters[:i], clusters[i+1:]...) } } if len(clusters) == 0 { return nil, errors.New("clusters is nil") } var wg sync.WaitGroup var results []*AiResult var mu sync.Mutex var errs []interface{} var taskNum int32 for _, cluster := range clusters { taskNum += cluster.Replicas } var ch = make(chan *AiResult, taskNum) var errCh = make(chan interface{}, taskNum) executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId] for _, cluster := range clusters { c := cluster for i := 0; i < int(c.Replicas); i++ { wg.Add(1) go func() { opt, _ := cloneAiOption(as.option) resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt) if err != nil { e := struct { err error clusterId string }{ err: err, clusterId: c.ClusterId, } errCh <- e wg.Done() return } mu.Lock() result, _ := convertType(resp) mu.Unlock() result.Replica = c.Replicas result.ClusterId = c.ClusterId result.Strategy = as.option.StrategyName result.Card = opt.ComputeCard ch <- result wg.Done() }() } } wg.Wait() close(ch) close(errCh) for e := range errCh { errs = append(errs, e) } for s := range ch { results = append(results, s) } if len(errs) != 0 { var synergystatus int64 if len(clusters) > 1 { synergystatus = 1 } strategyCode, err := as.AiStorages.GetStrategyCode(as.option.StrategyName) taskId, err := as.AiStorages.SaveTask(as.option.TaskName, strategyCode, synergystatus) if err != nil { return nil, errors.New("database add failed: " + err.Error()) } var errmsg string for _, err := range errs { e := (err).(struct { err error clusterId string }) msg := fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error()) errmsg += msg clusterName, _ := as.AiStorages.GetClusterNameById(e.clusterId) err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, clusterName, "", constants.Failed, msg) if err != nil { return nil, errors.New("database add failed: " + err.Error()) } } for _, s := range results { as.option.ComputeCard = s.Card //execute card clusterName, _ := as.AiStorages.GetClusterNameById(s.ClusterId) if s.Msg != "" { msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg) errmsg += msg err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, "", constants.Failed, msg) if err != nil { return nil, errors.New("database add failed: " + err.Error()) } } else { msg := fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId) errmsg += msg err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, s.JobId, constants.Saved, msg) if err != nil { return nil, errors.New("database add failed: " + err.Error()) } } } logx.Errorf(errors.New(errmsg).Error()) return nil, errors.New(errmsg) } return results, nil } func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) { var wg sync.WaitGroup var clustersNum = len(as.AiService.AiCollectorAdapterMap[as.option.AdapterId]) var ch = make(chan *collector.ResourceStats, clustersNum) var errCh = make(chan interface{}, clustersNum) var resourceSpecs []*collector.ResourceStats var errs []interface{} for s, resourceCollector := range as.AiService.AiCollectorAdapterMap[as.option.AdapterId] { wg.Add(1) rc := resourceCollector id := s go func() { spec, err := rc.GetResourceStats(as.ctx) if err != nil { e := struct { err error clusterId string }{ err: err, clusterId: id, } errCh <- e wg.Done() return } ch <- spec wg.Done() }() } wg.Wait() close(ch) close(errCh) for s := range ch { resourceSpecs = append(resourceSpecs, s) } for e := range errCh { errs = append(errs, e) } if len(errs) == clustersNum { return nil, errors.New("get resources failed") } if len(errs) != 0 { var msg string for _, err := range errs { e := (err).(struct { err error clusterId string }) msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error()) } return nil, errors.New(msg) } return resourceSpecs, nil } func convertType(in interface{}) (*AiResult, error) { var result AiResult switch (in).(type) { case *hpcAC.SubmitTaskAiResp: resp := (in).(*hpcAC.SubmitTaskAiResp) if resp.Code == "0" { result.JobId = resp.Data } else { result.Msg = resp.Msg } return &result, nil case *octopus.CreateTrainJobResp: resp := (in).(*octopus.CreateTrainJobResp) if resp.Success { result.JobId = resp.Payload.JobId } else { result.Msg = resp.Error.Message } return &result, nil case *modelartsservice.CreateTrainingJobResp: resp := (in).(*modelartsservice.CreateTrainingJobResp) if resp.ErrorMsg != "" { result.Msg = resp.ErrorMsg } else { result.JobId = resp.Metadata.Id } return &result, nil default: return nil, errors.New("ai task response failed") } } func cloneAiOption(opt *option.AiOption) (*option.AiOption, error) { origJSON, err := json.Marshal(opt) if err != nil { return nil, err } clone := option.AiOption{} if err = json.Unmarshal(origJSON, &clone); err != nil { return nil, err } return &clone, nil }