package schedule

import (
	"context"
	"errors"
	"fmt"
	"slices"
	"strings"
	"time"

	"github.com/zeromicro/go-zero/core/logx"

	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
)

const (
	TRAINNING_TASK_REPLICA    = 1
	TRAINNING_TASK_SUFFIX_LEN = 10
	QUERY_RESOURCE_RETRY      = 3
)

type ScheduleCreateTaskLogic struct {
	logx.Logger
	ctx           context.Context
	svcCtx        *svc.ServiceContext
	queryResource *QueryResourcesLogic
}

func NewScheduleCreateTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleCreateTaskLogic {
	return &ScheduleCreateTaskLogic{
		Logger:        logx.WithContext(ctx),
		ctx:           ctx,
		svcCtx:        svcCtx,
		queryResource: NewQueryResourcesLogic(ctx, svcCtx),
	}
}

// ScheduleCreateTask validates the request, resolves the target clusters (directly for a
// single-cluster request, via the scheduling strategy otherwise), generates the data
// distribution plan and persists the task.
func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) (resp *types.CreateTaskResp, err error) {
	resp = &types.CreateTaskResp{}
	if req.JobResources.ScheduleStrategy == "" {
		return nil, fmt.Errorf("must specify ScheduleStrategy")
	}
	if len(req.JobResources.Clusters) == 0 {
		return nil, fmt.Errorf("must specify at least one cluster")
	}

	var clusters []string
	if len(req.JobResources.Clusters) == 1 {
		// Single-cluster request: schedule directly onto the requested cluster.
		clusters = append(clusters, req.JobResources.Clusters[0].ClusterID)
		schedatas, err := l.generateScheduleResult(req.DataDistributes, clusters)
		if err != nil {
			return nil, err
		}
		taskId, err := l.createTask("SCHEDULE_TASK_"+utils.RandomString(TRAINNING_TASK_SUFFIX_LEN), req.JobResources.ScheduleStrategy, req.JobResources.Clusters)
		if err != nil {
			return nil, err
		}
		resp.ScheduleDatas = schedatas
		resp.TaskID = taskId
		return resp, nil
	}

	// Multi-cluster request: let the scheduling strategy select the target clusters.
	clusterInfos, err := l.getClusterInfosByStrategy(&req.JobResources)
	if err != nil {
		return nil, err
	}
	if len(clusterInfos) == 0 {
		return nil, fmt.Errorf("failed to create task, no scheduled cluster found")
	}
	for _, info := range clusterInfos {
		clusters = append(clusters, info.ClusterID)
	}
	schedatas, err := l.generateScheduleResult(req.DataDistributes, clusters)
	if err != nil {
		return nil, err
	}
	taskId, err := l.createTask("SCHEDULE_TASK_"+utils.RandomString(TRAINNING_TASK_SUFFIX_LEN), req.JobResources.ScheduleStrategy, clusterInfos)
	if err != nil {
		return nil, err
	}
	resp.ScheduleDatas = schedatas
	resp.TaskID = taskId
	return resp, nil
}

// getClusterInfosByStrategy queries the available resources (retrying up to
// QUERY_RESOURCE_RETRY times) and lets the configured strategy pick target clusters
// from the ones requested for the job.
func (l *ScheduleCreateTaskLogic) getClusterInfosByStrategy(resources *types.JobResources) ([]*types.JobClusterInfo, error) {
	var resSpecs []*collector.ResourceSpec
	var resCount int
	for i := 0; i < QUERY_RESOURCE_RETRY; i++ {
		qResources, err := l.queryResource.queryResources(make([]string, 0))
		if err != nil {
			// Wait a moment before the next attempt.
			time.Sleep(time.Second)
			continue
		}
		for _, resource := range qResources {
			if resource.Resources != nil {
				resCount++
			}
		}
		if resCount >= 1 {
			resSpecs = qResources
			break
		}
		resCount = 0
		time.Sleep(time.Second)
	}
	if resCount == 0 {
		return nil, fmt.Errorf("failed to create task, resource counting failed")
	}

	var clusterInfos []*types.JobClusterInfo
	switch resources.ScheduleStrategy {
	case strategy.LEASTLOADFIRST:
		strtg := strategy.NewLeastLoadFirst(TRAINNING_TASK_REPLICA, resSpecs)
		clusters, err := strtg.Schedule()
		if err != nil {
			return nil, err
		}
		clusterInfos = filterClusterInfos(clusters, resources.Clusters)
	}
	return clusterInfos, nil
}

// filterClusterInfos keeps only the requested cluster infos whose IDs were assigned by the strategy.
func filterClusterInfos(clusters []*strategy.AssignedCluster, clusterInfos []*types.JobClusterInfo) []*types.JobClusterInfo {
	var result []*types.JobClusterInfo
	for _, cinfo := range clusterInfos {
		for _, c := range clusters {
			if cinfo.ClusterID == c.ClusterId {
				result = append(result, cinfo)
			}
		}
	}
	return result
}

// createTask persists the schedule task together with one AI sub-task per target cluster.
func (l *ScheduleCreateTaskLogic) createTask(taskName string, strategyName string, jobClusterInfo []*types.JobClusterInfo) (int64, error) {
	// A task spanning more than one cluster is marked as synergetic.
	var synergyStatus int64
	if len(jobClusterInfo) > 1 {
		synergyStatus = 1
	}
	strategyCode, err := l.svcCtx.Scheduler.AiStorages.GetStrategyCode(strategyName)
	if err != nil {
		return 0, err
	}
	taskId, err := l.svcCtx.Scheduler.AiStorages.SaveTask(taskName, strategyCode, synergyStatus, "10")
	if err != nil {
		return 0, err
	}
	adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(ADAPTERID)
	if err != nil {
		return 0, err
	}
	for _, i := range jobClusterInfo {
		clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(i.ClusterID)
		opt := &option.AiOption{}
		err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(taskId, opt, adapterName, i.ClusterID, clusterName, "", constants.Saved, "")
		if err != nil {
			return 0, errors.New("database add failed: " + err.Error())
		}
	}
	return taskId, nil
}

// generateScheduleResult builds, for every data package (dataset, code, image, model),
// the list of target clusters that do not yet hold the package and therefore need it distributed.
func (l *ScheduleCreateTaskLogic) generateScheduleResult(distribute types.DataDistribute, clusters []string) ([]*types.ScheduleData, error) {
	var schedatas []*types.ScheduleData
	for _, d := range distribute.Dataset {
		data := &types.ScheduleData{
			DataType:   "dataset",
			PackageID:  d.PackageID,
			ClusterIDs: make([]string, 0),
		}
		for _, cluster := range clusters {
			if !slices.Contains(d.Clusters, cluster) {
				data.ClusterIDs = append(data.ClusterIDs, cluster)
			}
		}
		if len(data.ClusterIDs) != 0 {
			schedatas = append(schedatas, data)
		}
	}
	for _, d := range distribute.Code {
		data := &types.ScheduleData{
			DataType:   "code",
			PackageID:  d.PackageID,
			ClusterIDs: make([]string, 0),
		}
		for _, cluster := range clusters {
			if !slices.Contains(d.Clusters, cluster) {
				data.ClusterIDs = append(data.ClusterIDs, cluster)
			}
		}
		if len(data.ClusterIDs) != 0 {
			schedatas = append(schedatas, data)
		}
	}
	for _, d := range distribute.Image {
		data := &types.ScheduleData{
			DataType:   "image",
			PackageID:  d.PackageID,
			ClusterIDs: make([]string, 0),
		}
		for _, cluster := range clusters {
			if !slices.Contains(d.Clusters, cluster) {
				data.ClusterIDs = append(data.ClusterIDs, cluster)
			}
		}
		if len(data.ClusterIDs) != 0 {
			schedatas = append(schedatas, data)
		}
	}
	for _, d := range distribute.Model {
		data := &types.ScheduleData{
			DataType:   "model",
			PackageID:  d.PackageID,
			ClusterIDs: make([]string, 0),
		}
		for _, cluster := range clusters {
			if !slices.Contains(d.Clusters, cluster) {
				data.ClusterIDs = append(data.ClusterIDs, cluster)
			}
		}
		if len(data.ClusterIDs) != 0 {
			schedatas = append(schedatas, data)
		}
	}
	if len(schedatas) != 0 {
		err := l.updateStorageType(&schedatas)
		if err != nil {
			return nil, err
		}
	}
	return schedatas, nil
}

// updateStorageType resolves the storage type of each target cluster and stores the
// de-duplicated, comma-separated list on every schedule entry.
func (l *ScheduleCreateTaskLogic) updateStorageType(schedatas *[]*types.ScheduleData) error {
	for _, s := range *schedatas {
		var sTypes []string
		for _, id := range s.ClusterIDs {
			cluster, err := l.svcCtx.Scheduler.AiStorages.GetClustersById(id)
			if err != nil {
				return err
			}
			stype, ok := storeLink.StorageTypeMap[strings.Title(cluster.Name)]
			if ok {
				sTypes = append(sTypes, stype)
			}
		}
		sTypes = common.Unique(sTypes)
		s.StorageType = strings.Join(sTypes, storeLink.COMMA)
	}
	return nil
}