From ff35ffef90ca76a7a2cec922161b54286afb45b4 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 25 Dec 2024 17:39:52 +0800 Subject: [PATCH] update createTasks --- .../logic/schedule/schedulecreatetasklogic.go | 115 +++++++++++------- .../logic/schedule/scheduleruntasklogic.go | 38 +++--- internal/scheduler/common/common.go | 24 ++++ internal/scheduler/scheduler.go | 15 +-- internal/scheduler/schedulers/aiScheduler.go | 10 ++ internal/scheduler/strategy/dataLocality.go | 4 + internal/scheduler/strategy/strategy.go | 3 + pkg/constants/task.go | 1 + 8 files changed, 139 insertions(+), 71 deletions(-) create mode 100644 internal/scheduler/strategy/dataLocality.go diff --git a/internal/logic/schedule/schedulecreatetasklogic.go b/internal/logic/schedule/schedulecreatetasklogic.go index 28b089f7..c43ac86c 100644 --- a/internal/logic/schedule/schedulecreatetasklogic.go +++ b/internal/logic/schedule/schedulecreatetasklogic.go @@ -57,7 +57,12 @@ func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) ( if err != nil { return nil, err } - taskId, err := l.createTask("SCHEDULE_TASK_"+utils.RandomString(TRAINNING_TASK_SUFFIX_LEN), req.JobResources.ScheduleStrategy, req.JobResources.Clusters) + + assignedClusters := copyParams([]*strategy.AssignedCluster{{ + ClusterId: req.JobResources.Clusters[0].ClusterID, + }}, req.JobResources.Clusters) + + taskId, err := l.createTask("SCHEDULE_TASK_"+utils.RandomString(TRAINNING_TASK_SUFFIX_LEN), req.JobResources.ScheduleStrategy, assignedClusters) if err != nil { return nil, err } @@ -66,24 +71,24 @@ func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) ( return resp, nil } else { - clusterInfos, err := l.getClusterInfosByStrategy(&req.JobResources) + assignedClusters, err := l.getAssignedClustersByStrategy(&req.JobResources) if err != nil { return nil, err } - if len(clusterInfos) == 0 { + if len(assignedClusters) == 0 { return nil, fmt.Errorf("failed to create task, no scheduled cluster found") } - for _, info := range clusterInfos { - clusters = append(clusters, info.ClusterID) + for _, c := range assignedClusters { + clusters = append(clusters, c.ClusterId) } schedatas, err := l.generateScheduleResult(req.DataDistributes, clusters) if err != nil { return nil, err } - taskId, err := l.createTask("SCHEDULE_TASK_"+utils.RandomString(TRAINNING_TASK_SUFFIX_LEN), req.JobResources.ScheduleStrategy, clusterInfos) + taskId, err := l.createTask("SCHEDULE_TASK_"+utils.RandomString(TRAINNING_TASK_SUFFIX_LEN), req.JobResources.ScheduleStrategy, assignedClusters) if err != nil { return nil, err } @@ -93,69 +98,93 @@ func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) ( } } -func (l *ScheduleCreateTaskLogic) getClusterInfosByStrategy(resources *types.JobResources) ([]*types.JobClusterInfo, error) { +func (l *ScheduleCreateTaskLogic) getAssignedClustersByStrategy(resources *types.JobResources) ([]*strategy.AssignedCluster, error) { + var assignedClusters []*strategy.AssignedCluster + switch resources.ScheduleStrategy { + case strategy.LEASTLOADFIRST: + var resSpecs []*collector.ResourceSpec + var resCount int + for i := 0; i < QUERY_RESOURCE_RETRY; i++ { + defer time.Sleep(time.Second) + qResources, err := l.queryResource.queryResources(make([]string, 0)) + if err != nil { + continue + } - var resSpecs []*collector.ResourceSpec - var resCount int - for i := 0; i < QUERY_RESOURCE_RETRY; i++ { - defer time.Sleep(time.Second) - qResources, err := l.queryResource.queryResources(make([]string, 0)) - if err != nil { - continue - } + for _, resource := range qResources { + if resource.Resources != nil { + resCount++ + } + } - for _, resource := range qResources { - if resource.Resources != nil { - resCount++ + if resCount >= 1 { + resSpecs = qResources + break + } else { + resCount = 0 + continue } } - if resCount >= 1 { - resSpecs = qResources - break - } else { - resCount = 0 - continue + if resCount == 0 { + return nil, fmt.Errorf("failed to create task, resources counting fails") } - } - - if resCount == 0 { - return nil, fmt.Errorf("failed to create task, resources counting fails") - } - var clusterInfos []*types.JobClusterInfo - switch resources.ScheduleStrategy { - case strategy.LEASTLOADFIRST: strtg := strategy.NewLeastLoadFirst(TRAINNING_TASK_REPLICA, resSpecs) clusters, err := strtg.Schedule() if err != nil { return nil, err } - clusterInfos = filterClusterInfos(clusters, resources.Clusters) + assignedClusters = copyParams(clusters, resources.Clusters) } - return clusterInfos, nil + return assignedClusters, nil } -func filterClusterInfos(clusters []*strategy.AssignedCluster, clusterInfos []*types.JobClusterInfo) []*types.JobClusterInfo { - var result []*types.JobClusterInfo - for _, cinfo := range clusterInfos { - for _, c := range clusters { - if cinfo.ClusterID == c.ClusterId { - result = append(result, cinfo) +func copyParams(clusters []*strategy.AssignedCluster, clusterInfos []*types.JobClusterInfo) []*strategy.AssignedCluster { + var result []*strategy.AssignedCluster + + for _, c := range clusters { + for _, info := range clusterInfos { + if c.ClusterId == info.ClusterID { + var envs []string + var params []string + for k, v := range info.Runtime.Envs { + val := common.ConvertTypeToString(v) + if val != "" { + env := k + storeLink.COMMA + val + envs = append(envs, env) + } + } + for k, v := range info.Runtime.Params { + val := common.ConvertTypeToString(v) + if val != "" { + p := k + storeLink.COMMA + val + params = append(params, p) + } + } + cluster := &strategy.AssignedCluster{ + ClusterId: c.ClusterId, + ClusterName: c.ClusterName, + Replicas: c.Replicas, + Cmd: info.Runtime.Command, + Envs: envs, + Params: params, + } + result = append(result, cluster) } } } return result } -func (l *ScheduleCreateTaskLogic) createTask(taskName string, strategyName string, jobClusterInfo []*types.JobClusterInfo) (int64, error) { +func (l *ScheduleCreateTaskLogic) createTask(taskName string, strategyName string, clusters []*strategy.AssignedCluster) (int64, error) { var synergyStatus int64 - if len(jobClusterInfo) > 1 { + if len(clusters) > 1 { synergyStatus = 1 } - y, err := yaml.Marshal(jobClusterInfo) + y, err := yaml.Marshal(clusters) if err != nil { fmt.Printf("Error while Marshaling. %v", err) } diff --git a/internal/logic/schedule/scheduleruntasklogic.go b/internal/logic/schedule/scheduleruntasklogic.go index 08dc0598..1ef56f65 100644 --- a/internal/logic/schedule/scheduleruntasklogic.go +++ b/internal/logic/schedule/scheduleruntasklogic.go @@ -6,6 +6,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gopkg.in/yaml.v2" @@ -34,11 +35,16 @@ func (l *ScheduleRunTaskLogic) ScheduleRunTask(req *types.RunTaskReq) (resp *typ if err != nil { return nil, err } + if task == nil { return nil, errors.New("task not found ") } - var clusters []*types.JobClusterInfo + if task.Status == constants.Cancelled { + return nil, errors.New("task has been cancelled ") + } + + var clusters []*strategy.AssignedCluster err = yaml.Unmarshal([]byte(task.YamlString), &clusters) if err != nil { return nil, err @@ -56,21 +62,21 @@ func (l *ScheduleRunTaskLogic) ScheduleRunTask(req *types.RunTaskReq) (resp *typ return nil, err } - adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(ADAPTERID) - if err != nil { - return nil, err - } - - for _, i := range clusters { - clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(i.ClusterID) - - opt := &option.AiOption{} - - err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(task.Id, opt, adapterName, i.ClusterID, clusterName, "", constants.Saved, "") - if err != nil { - return nil, errors.New("database add failed: " + err.Error()) - } - } + //adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(ADAPTERID) + //if err != nil { + // return nil, err + //} + // + //for _, i := range clusters { + // clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(i.ClusterID) + // + // opt := &option.AiOption{} + // + // err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(task.Id, opt, adapterName, i.ClusterID, clusterName, "", constants.Saved, "") + // if err != nil { + // return nil, errors.New("database add failed: " + err.Error()) + // } + //} return } diff --git a/internal/scheduler/common/common.go b/internal/scheduler/common/common.go index 1b486525..424ec21f 100644 --- a/internal/scheduler/common/common.go +++ b/internal/scheduler/common/common.go @@ -15,9 +15,11 @@ package common import ( + "encoding/json" "github.com/go-resty/resty/v2" "math" "math/rand" + "strconv" "time" ) @@ -134,3 +136,25 @@ func Unique(s []string) []string { } return result } + +func ConvertTypeToString(v interface{}) string { + switch v.(type) { + + case int: + s := v.(int) + return strconv.Itoa(s) + case string: + s := v.(string) + return s + case float64: + s := strconv.FormatFloat(v.(float64), 'f', -1, 64) + return s + case int64: + s := v.(int64) + return strconv.FormatInt(s, 64) + case json.Number: + return v.(json.Number).String() + default: + return "" + } +} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 6fe0c3e9..f9f23c12 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -22,7 +22,6 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/response" "gorm.io/gorm" "sigs.k8s.io/yaml" @@ -132,7 +131,7 @@ func (s *Scheduler) TempAssign() error { return nil } -func (s *Scheduler) AssignAndSchedule(ss SubSchedule, mode int, assignedClusters interface{}) (interface{}, error) { +func (s *Scheduler) AssignAndSchedule(ss SubSchedule, mode int, assignedClusters []*strategy.AssignedCluster) (interface{}, error) { var result interface{} switch mode { case JOINT_CLOUD_MODE: @@ -157,17 +156,9 @@ func (s *Scheduler) AssignAndSchedule(ss SubSchedule, mode int, assignedClusters result = resp case STORAGE_SCHEDULE_MODE: - jobClusterInfos, ok := assignedClusters.([]*types.JobClusterInfo) - if !ok { - return nil, errors.New("converting JobClusterInfos fails") - } - var clusters []*strategy.AssignedCluster - for _, info := range jobClusterInfos { - cluster := &strategy.AssignedCluster{ClusterId: info.ClusterID, Replicas: 1} - clusters = append(clusters, cluster) - } + //assign tasks to clusters - resp, err := ss.AssignTask(clusters, mode) + resp, err := ss.AssignTask(assignedClusters, mode) if err != nil { return nil, err } diff --git a/internal/scheduler/schedulers/aiScheduler.go b/internal/scheduler/schedulers/aiScheduler.go index 8fc3b21c..c364c101 100644 --- a/internal/scheduler/schedulers/aiScheduler.go +++ b/internal/scheduler/schedulers/aiScheduler.go @@ -173,6 +173,16 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster, mode int wg.Add(1) go func() { opt, _ := cloneAiOption(as.option) + // decide opt params by mode + switch mode { + case scheduler.STORAGE_SCHEDULE_MODE: + opt.Cmd = c.Cmd + opt.Envs = c.Envs + opt.Params = c.Params + default: + + } + resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt, mode) if err != nil { e := struct { diff --git a/internal/scheduler/strategy/dataLocality.go b/internal/scheduler/strategy/dataLocality.go new file mode 100644 index 00000000..693c58f7 --- /dev/null +++ b/internal/scheduler/strategy/dataLocality.go @@ -0,0 +1,4 @@ +package strategy + +type DataLocality struct { +} diff --git a/internal/scheduler/strategy/strategy.go b/internal/scheduler/strategy/strategy.go index 00d9e02b..08697a6b 100644 --- a/internal/scheduler/strategy/strategy.go +++ b/internal/scheduler/strategy/strategy.go @@ -23,6 +23,9 @@ type AssignedCluster struct { ClusterId string ClusterName string Replicas int32 + Cmd string + Envs []string + Params []string } func GetStrategyNames() []string { diff --git a/pkg/constants/task.go b/pkg/constants/task.go index 6df4cbe7..d21f5b9d 100644 --- a/pkg/constants/task.go +++ b/pkg/constants/task.go @@ -29,4 +29,5 @@ const ( Pending = "Pending" Stopped = "Stopped" Deploying = "Deploying" + Cancelled = "Cancelled" )