|
- /*
-
- Copyright (c) [2023] [pcm]
- [pcm-coordinator] is licensed under Mulan PSL v2.
- You can use this software according to the terms and conditions of the Mulan PSL v2.
- You may obtain a copy of Mulan PSL v2 at:
- http://license.coscl.org.cn/MulanPSL2
- THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
- EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
- MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
- See the Mulan PSL v2 for more details.
-
- */
-
- package schedulers
-
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "github.com/zeromicro/go-zero/core/logx"
- "gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
- "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
- "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
- "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
- "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
- "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
- "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
- "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
- "sync"
- )
-
// AiScheduler schedules an AI task onto the clusters of a single adapter.
//
// It embeds *scheduler.Scheduler; the AiService (per-adapter executor and
// resource-collector maps) and AiStorages (persistence) used by its methods
// are reached through that embedding.
type AiScheduler struct {
	// yamlString is the raw task description copied into models.Ai.YamlString.
	yamlString string
	// task is not read by the methods shown in this file.
	// NOTE(review): confirm whether it is used elsewhere or can be removed.
	task *response.TaskInfo
	*scheduler.Scheduler
	// option holds the scheduling request (adapter and cluster IDs, strategy
	// name, replica count, static weights, task name, compute card, ...).
	option *option.AiOption
	// ctx is forwarded to every executor Execute and collector
	// GetResourceStats call made by this scheduler.
	// NOTE(review): Go convention is to pass a context per call, not store it.
	ctx context.Context
}
-
// AiResult is the outcome of submitting the AI task to a single cluster.
type AiResult struct {
	JobId     string // job ID reported by the cluster; set only on a successful submission
	ClusterId string // cluster the task was submitted to
	Strategy  string // scheduling strategy name (AiOption.StrategyName)
	Replica   int32  // replicas assigned to this cluster
	Card      string // compute card used for the submission (AiOption.ComputeCard)
	Msg       string // failure message reported by the cluster; empty on success
}
-
- func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) {
- return &AiScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option}, nil
- }
-
- func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
- ai := models.Ai{
- AdapterId: participantId,
- TaskId: task.TaskId,
- Status: "Saved",
- YamlString: as.yamlString,
- }
- utils.Convert(task.Metadata, &ai)
- return ai, nil
- }
-
- func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
- if len(as.option.ClusterIds) == 1 {
- return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
- }
-
- resources, err := as.findClustersWithResources()
-
- if err != nil {
- return nil, err
- }
- if len(resources) == 0 {
- return nil, errors.New("no cluster has resources")
- }
-
- if len(resources) == 1 {
- var cluster strategy.AssignedCluster
- cluster.ClusterId = resources[0].ClusterId
- cluster.Replicas = 1
- return &strategy.SingleAssignment{Cluster: &cluster}, nil
- }
-
- params := ¶m.Params{Resources: resources}
-
- switch as.option.StrategyName {
- case strategy.REPLICATION:
- var clusterIds []string
- for _, resource := range resources {
- clusterIds = append(clusterIds, resource.ClusterId)
- }
- strategy := strategy.NewReplicationStrategy(clusterIds, 1)
- return strategy, nil
- case strategy.RESOURCES_PRICING:
- strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1})
- return strategy, nil
- case strategy.DYNAMIC_RESOURCES:
- strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
- return strategy, nil
- case strategy.STATIC_WEIGHT:
- //todo resources should match cluster StaticWeightMap
- strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica)
- return strategy, nil
- }
-
- return nil, errors.New("no strategy has been chosen")
- }
-
- func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) {
- if clusters == nil {
- return nil, errors.New("clusters is nil")
- }
-
- for i := len(clusters) - 1; i >= 0; i-- {
- if clusters[i].Replicas == 0 {
- clusters = append(clusters[:i], clusters[i+1:]...)
- }
- }
-
- if len(clusters) == 0 {
- return nil, errors.New("clusters is nil")
- }
-
- var wg sync.WaitGroup
- var results []*AiResult
- var errs []interface{}
- var ch = make(chan *AiResult, len(clusters))
- var errCh = make(chan interface{}, len(clusters))
-
- executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId]
- for _, cluster := range clusters {
- c := cluster
- wg.Add(1)
- go func() {
- opt, _ := cloneAiOption(as.option)
- resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt)
-
- if err != nil {
- e := struct {
- err error
- clusterId string
- }{
- err: err,
- clusterId: c.ClusterId,
- }
- errCh <- e
- wg.Done()
- return
- }
-
- result, _ := convertType(resp)
- result.Replica = c.Replicas
- result.ClusterId = c.ClusterId
- result.Strategy = as.option.StrategyName
- result.Card = opt.ComputeCard
-
- ch <- result
- wg.Done()
- }()
- }
- wg.Wait()
- close(ch)
- close(errCh)
-
- for e := range errCh {
- errs = append(errs, e)
- }
-
- for s := range ch {
- results = append(results, s)
- }
-
- if len(errs) != 0 {
- var synergystatus int64
- if len(clusters) > 1 {
- synergystatus = 1
- }
- strategyCode, err := as.AiStorages.GetStrategyCode(as.option.StrategyName)
- taskId, err := as.AiStorages.SaveTask(as.option.TaskName, strategyCode, synergystatus)
- if err != nil {
- return nil, errors.New("database add failed: " + err.Error())
- }
-
- var errmsg string
- for _, err := range errs {
- e := (err).(struct {
- err error
- clusterId string
- })
- msg := fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
- errmsg += msg
-
- clusterName, _ := as.AiStorages.GetClusterNameById(e.clusterId)
-
- err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, clusterName, "", constants.Failed, msg)
- if err != nil {
- return nil, errors.New("database add failed: " + err.Error())
- }
- }
- for _, s := range results {
- as.option.ComputeCard = s.Card //execute card
- clusterName, _ := as.AiStorages.GetClusterNameById(s.ClusterId)
-
- if s.Msg != "" {
- msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
- errmsg += msg
- err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, "", constants.Failed, msg)
- if err != nil {
- return nil, errors.New("database add failed: " + err.Error())
- }
- } else {
- msg := fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId)
- errmsg += msg
- err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, s.JobId, constants.Saved, msg)
- if err != nil {
- return nil, errors.New("database add failed: " + err.Error())
- }
- }
- }
- logx.Errorf(errors.New(errmsg).Error())
- return nil, errors.New(errmsg)
- }
-
- return results, nil
- }
-
- func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
- var wg sync.WaitGroup
- var clustersNum = len(as.AiService.AiCollectorAdapterMap[as.option.AdapterId])
- var ch = make(chan *collector.ResourceStats, clustersNum)
- var errCh = make(chan interface{}, clustersNum)
-
- var resourceSpecs []*collector.ResourceStats
- var errs []interface{}
-
- for s, resourceCollector := range as.AiService.AiCollectorAdapterMap[as.option.AdapterId] {
- wg.Add(1)
- rc := resourceCollector
- id := s
- go func() {
- spec, err := rc.GetResourceStats(as.ctx)
- if err != nil {
- e := struct {
- err error
- clusterId string
- }{
- err: err,
- clusterId: id,
- }
- errCh <- e
- wg.Done()
- return
- }
- ch <- spec
- wg.Done()
- }()
- }
- wg.Wait()
- close(ch)
- close(errCh)
-
- for s := range ch {
- resourceSpecs = append(resourceSpecs, s)
- }
-
- for e := range errCh {
- errs = append(errs, e)
- }
-
- if len(errs) == clustersNum {
- return nil, errors.New("get resources failed")
- }
-
- if len(errs) != 0 {
- var msg string
- for _, err := range errs {
- e := (err).(struct {
- err error
- clusterId string
- })
- msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
- }
- return nil, errors.New(msg)
- }
-
- return resourceSpecs, nil
- }
-
- func convertType(in interface{}) (*AiResult, error) {
- var result AiResult
- switch (in).(type) {
- case *hpcAC.SubmitTaskAiResp:
- resp := (in).(*hpcAC.SubmitTaskAiResp)
- if resp.Code == "0" {
- result.JobId = resp.Data
- } else {
- result.Msg = resp.Msg
- }
- return &result, nil
- case *octopus.CreateTrainJobResp:
- resp := (in).(*octopus.CreateTrainJobResp)
-
- if resp.Success {
- result.JobId = resp.Payload.JobId
- } else {
- result.Msg = resp.Error.Message
- }
-
- return &result, nil
- default:
- return nil, errors.New("ai task response failed")
- }
- }
-
- func cloneAiOption(opt *option.AiOption) (*option.AiOption, error) {
- origJSON, err := json.Marshal(opt)
- if err != nil {
- return nil, err
- }
-
- clone := option.AiOption{}
- if err = json.Unmarshal(origJSON, &clone); err != nil {
- return nil, err
- }
-
- return &clone, nil
- }
|