package schedule

import (
	"context"
	"fmt"
	"slices"
	"strings"
	"time"

	"github.com/pkg/errors"
	"github.com/zeromicro/go-zero/core/logx"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
	"gopkg.in/yaml.v3"
)

const (
	// TRAINNING_TASK_REPLICA is the replica count handed to the scheduling strategies.
	TRAINNING_TASK_REPLICA = 1
	// TRAINNING_TASK_SUFFIX_LEN is the length of the random suffix appended to generated task names.
	TRAINNING_TASK_SUFFIX_LEN = 10
	// QUERY_RESOURCE_RETRY is the maximum number of attempts made when querying cluster resources.
	QUERY_RESOURCE_RETRY = 3
)

// ScheduleCreateTaskLogic validates a create-task request, selects the
// clusters the task is assigned to and records the task through the scheduler.
type ScheduleCreateTaskLogic struct {
	logx.Logger
	ctx           context.Context
	svcCtx        *svc.ServiceContext
	queryResource *QueryResourcesLogic
}

// NewScheduleCreateTaskLogic builds a ScheduleCreateTaskLogic bound to ctx and svcCtx.
func NewScheduleCreateTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleCreateTaskLogic {
	return &ScheduleCreateTaskLogic{
		Logger:        logx.WithContext(ctx),
		ctx:           ctx,
		svcCtx:        svcCtx,
		queryResource: NewQueryResourcesLogic(ctx, svcCtx),
	}
}

// ScheduleCreateTask validates req.JobResources, determines the assigned
// clusters, computes which packages still have to be distributed to them and
// creates the task.
//
// When the request names exactly one cluster that cluster is used directly;
// otherwise the clusters are chosen by the requested schedule strategy.
// On any failure the returned response is nil.
func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) (resp *types.CreateTaskResp, err error) {
	if err = validateJobResources(req.JobResources); err != nil {
		return nil, err
	}

	var assignedClusters []*strategy.AssignedCluster
	if len(req.JobResources.Clusters) == 1 {
		// A single requested cluster needs no strategy: assign it as-is.
		assignedClusters = copyParams([]*strategy.AssignedCluster{{
			ClusterId: req.JobResources.Clusters[0].ClusterID,
		}}, req.JobResources.Clusters)
	} else {
		assignedClusters, err = l.getAssignedClustersByStrategy(&req.JobResources, &req.DataDistributes)
		if err != nil {
			return nil, err
		}
		if len(assignedClusters) == 0 {
			return nil, fmt.Errorf("failed to create task, no scheduled cluster found")
		}
	}

	clusters := make([]string, 0, len(assignedClusters))
	for _, c := range assignedClusters {
		clusters = append(clusters, c.ClusterId)
	}

	schedatas, err := l.generateScheduleResult(req.DataDistributes, clusters)
	if err != nil {
		return nil, err
	}

	taskName := "SCHEDULE_TASK_" + utils.RandomString(TRAINNING_TASK_SUFFIX_LEN)
	taskId, err := l.createTask(taskName, req.JobResources.ScheduleStrategy, assignedClusters, req.Token)
	if err != nil {
		return nil, err
	}

	return &types.CreateTaskResp{
		ScheduleDatas: schedatas,
		TaskID:        taskId,
	}, nil
}

// validateJobResources checks that a schedule strategy is set, that at least
// one cluster is listed, and that every listed cluster has an ID and at least
// one compute resource.
func validateJobResources(resources types.JobResources) error {
	if resources.ScheduleStrategy == "" {
		return fmt.Errorf("must specify ScheduleStrategy")
	}
	if len(resources.Clusters) == 0 {
		return fmt.Errorf("must specify at least one cluster")
	}

	for _, c := range resources.Clusters {
		if c.ClusterID == "" {
			return fmt.Errorf("must specify clusterID")
		}
		if len(c.Resources) == 0 {
			return fmt.Errorf("cluster: %s must specify at least one compute resource", c.ClusterID)
		}
	}

	return nil
}

// getAssignedClustersByStrategy runs the strategy named in
// resources.ScheduleStrategy and returns the clusters it selected, enriched
// with the per-cluster parameters from resources.Clusters.
// An unrecognised strategy yields an error.
func (l *ScheduleCreateTaskLogic) getAssignedClustersByStrategy(resources *types.JobResources, dataDistribute *types.DataDistribute) ([]*strategy.AssignedCluster, error) {
	switch resources.ScheduleStrategy {
	case strategy.LEASTLOADFIRST:
		resSpecs, err := l.queryResourcesWithRetry()
		if err != nil {
			return nil, err
		}
		clusters, err := strategy.NewLeastLoadFirst(TRAINNING_TASK_REPLICA, resSpecs).Schedule()
		if err != nil {
			return nil, err
		}
		return copyParams(clusters, resources.Clusters), nil

	case strategy.DATA_LOCALITY:
		clusters, err := strategy.NewDataLocality(TRAINNING_TASK_REPLICA, dataDistribute).Schedule()
		if err != nil {
			return nil, err
		}
		return copyParams(clusters, resources.Clusters), nil

	default:
		return nil, errors.New("no strategy has been chosen")
	}
}

// queryResourcesWithRetry queries cluster resources up to QUERY_RESOURCE_RETRY
// times and returns the first result in which at least one entry carries
// resource data. It pauses one second between attempts (not after the last
// one), so a successful first attempt returns immediately.
//
// If every attempt fails, the error of the last failed query (if any) is
// wrapped into the returned error.
func (l *ScheduleCreateTaskLogic) queryResourcesWithRetry() ([]*collector.ResourceSpec, error) {
	var lastErr error

	for attempt := 0; attempt < QUERY_RESOURCE_RETRY; attempt++ {
		if attempt > 0 {
			// Sleep here rather than via defer: a deferred sleep would only
			// run when the function returns and would not space the retries.
			time.Sleep(time.Second)
		}

		specs, err := l.queryResource.QueryResourcesByClusterId(nil)
		if err != nil {
			lastErr = err
			continue
		}
		lastErr = nil

		hasResources := slices.ContainsFunc(specs, func(s *collector.ResourceSpec) bool {
			return s.Resources != nil
		})
		if hasResources {
			return specs, nil
		}
	}

	if lastErr != nil {
		return nil, fmt.Errorf("failed to create task, resources counting fails: %w", lastErr)
	}
	return nil, fmt.Errorf("failed to create task, resources counting fails")
}

// copyParams returns, for every scheduled cluster that also appears in
// clusterInfos (matched by cluster ID), a new AssignedCluster carrying the
// scheduled ID, name and replicas together with the requested resources,
// command, environment variables and parameters of the matching info.
// Scheduled clusters without a matching info are dropped.
func copyParams(clusters []*strategy.AssignedCluster, clusterInfos []*types.JobClusterInfo) []*strategy.AssignedCluster {
	var result []*strategy.AssignedCluster

	for _, c := range clusters {
		for _, info := range clusterInfos {
			if c.ClusterId != info.ClusterID {
				continue
			}
			result = append(result, &strategy.AssignedCluster{
				ClusterId:         c.ClusterId,
				ClusterName:       c.ClusterName,
				Replicas:          c.Replicas,
				ResourcesRequired: info.Resources,
				Cmd:               info.Runtime.Command,
				Envs:              formatRuntimeEntries(info.Runtime.Envs),
				Params:            formatRuntimeEntries(info.Runtime.Params),
			})
		}
	}

	return result
}

// formatRuntimeEntries renders each map entry as key + storeLink.COMMA + value,
// skipping entries whose value converts to the empty string. Keys are visited
// in sorted order so the output does not depend on map iteration order.
func formatRuntimeEntries[M ~map[string]V, V any](entries M) []string {
	keys := make([]string, 0, len(entries))
	for k := range entries {
		keys = append(keys, k)
	}
	slices.Sort(keys)

	var out []string
	for _, k := range keys {
		if val := common.ConvertTypeToString(entries[k]); val != "" {
			out = append(out, k+storeLink.COMMA+val)
		}
	}
	return out
}

// createTask serialises the assigned clusters to YAML and creates the task
// through the scheduler, returning the new task ID. The task is marked with
// synergy status 1 when it spans more than one cluster, 0 otherwise.
func (l *ScheduleCreateTaskLogic) createTask(taskName string, strategyName string, clusters []*strategy.AssignedCluster, token string) (int64, error) {
	var synergyStatus int64
	if len(clusters) > 1 {
		synergyStatus = 1
	}

	y, err := yaml.Marshal(clusters)
	if err != nil {
		// Do not create a task whose cluster assignment could not be encoded.
		return 0, fmt.Errorf("marshaling assigned clusters: %w", err)
	}

	taskId, err := l.svcCtx.Scheduler.CreateTask(taskName, synergyStatus, strategyName, string(y), token, &l.svcCtx.Config)
	if err != nil {
		return 0, err
	}

	return taskId, nil
}

// generateScheduleResult lists, for every dataset, code, image and model
// package in distribute, the target clusters that do not yet appear in the
// package's own cluster list. Packages already present on all target clusters
// produce no entry. When the result is non-empty, each entry's storage type is
// filled in via updateStorageType.
func (l *ScheduleCreateTaskLogic) generateScheduleResult(distribute types.DataDistribute, clusters []string) ([]*types.ScheduleData, error) {
	var schedatas []*types.ScheduleData

	for _, d := range distribute.Dataset {
		var placed []string
		for _, c := range d.Clusters {
			placed = append(placed, c.ClusterID)
		}
		if missing := clustersMissingPackage(clusters, placed); len(missing) != 0 {
			schedatas = append(schedatas, &types.ScheduleData{
				DataType:   "dataset",
				PackageID:  d.PackageID,
				ClusterIDs: missing,
			})
		}
	}

	for _, d := range distribute.Code {
		var placed []string
		for _, c := range d.Clusters {
			placed = append(placed, c.ClusterID)
		}
		if missing := clustersMissingPackage(clusters, placed); len(missing) != 0 {
			schedatas = append(schedatas, &types.ScheduleData{
				DataType:   "code",
				PackageID:  d.PackageID,
				ClusterIDs: missing,
			})
		}
	}

	for _, d := range distribute.Image {
		var placed []string
		for _, c := range d.Clusters {
			placed = append(placed, c.ClusterID)
		}
		if missing := clustersMissingPackage(clusters, placed); len(missing) != 0 {
			schedatas = append(schedatas, &types.ScheduleData{
				DataType:   "image",
				PackageID:  d.PackageID,
				ClusterIDs: missing,
			})
		}
	}

	for _, d := range distribute.Model {
		var placed []string
		for _, c := range d.Clusters {
			placed = append(placed, c.ClusterID)
		}
		if missing := clustersMissingPackage(clusters, placed); len(missing) != 0 {
			schedatas = append(schedatas, &types.ScheduleData{
				DataType:   "model",
				PackageID:  d.PackageID,
				ClusterIDs: missing,
			})
		}
	}

	if len(schedatas) != 0 {
		if err := l.updateStorageType(&schedatas); err != nil {
			return nil, err
		}
	}

	return schedatas, nil
}

// clustersMissingPackage returns the entries of targets that are not contained
// in placed, preserving the order of targets. The result is never nil.
func clustersMissingPackage(targets []string, placed []string) []string {
	missing := make([]string, 0)
	for _, id := range targets {
		if !slices.Contains(placed, id) {
			missing = append(missing, id)
		}
	}
	return missing
}

// updateStorageType sets StorageType on every entry of *schedatas to the
// de-duplicated storage types of the entry's clusters, joined with
// storeLink.COMMA. Clusters whose title-cased name has no entry in
// storeLink.StorageTypeMap contribute nothing. The first cluster lookup error
// aborts the update and is returned.
func (l *ScheduleCreateTaskLogic) updateStorageType(schedatas *[]*types.ScheduleData) error {
	if schedatas == nil {
		return nil
	}

	for _, s := range *schedatas {
		var sTypes []string
		for _, id := range s.ClusterIDs {
			cluster, err := l.svcCtx.Scheduler.AiStorages.GetClustersById(id)
			if err != nil {
				return err
			}
			// NOTE(review): strings.Title is deprecated; kept because its
			// replacement lives outside the standard library.
			if stype, ok := storeLink.StorageTypeMap[strings.Title(cluster.Name)]; ok {
				sTypes = append(sTypes, stype)
			}
		}
		s.StorageType = strings.Join(common.Unique(sTypes), storeLink.COMMA)
	}

	return nil
}