diff --git a/internal/logic/cloud/commitgeneraltasklogic.go b/internal/logic/cloud/commitgeneraltasklogic.go index 87f23245..1558f3c6 100644 --- a/internal/logic/cloud/commitgeneraltasklogic.go +++ b/internal/logic/cloud/commitgeneraltasklogic.go @@ -70,7 +70,7 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er utils.Convert(&req, &opt) sc, _ := schedulers.NewCloudScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, tx, l.svcCtx.PromClient) - results, err := l.svcCtx.Scheduler.AssignAndSchedule(sc, scheduler.JOINT_CLOUD_MODE) + results, err := l.svcCtx.Scheduler.AssignAndSchedule(sc, scheduler.JOINT_CLOUD_MODE, nil) if err != nil { logx.Errorf("AssignAndSchedule() => execution error: %v", err) return err diff --git a/internal/logic/core/commitvmtasklogic.go b/internal/logic/core/commitvmtasklogic.go index 3d0c1cb8..f682c9a9 100644 --- a/internal/logic/core/commitvmtasklogic.go +++ b/internal/logic/core/commitvmtasklogic.go @@ -63,7 +63,7 @@ func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *type return nil, err } // 3、Return scheduling results - results, err := l.svcCtx.Scheduler.AssignAndSchedule(vmSchdl, scheduler.JOINT_CLOUD_MODE) + results, err := l.svcCtx.Scheduler.AssignAndSchedule(vmSchdl, scheduler.JOINT_CLOUD_MODE, nil) if err != nil { logx.Errorf("AssignAndSchedule() => execution error: %v", err) return nil, err diff --git a/internal/logic/schedule/scheduleruntasklogic.go b/internal/logic/schedule/scheduleruntasklogic.go index c68adeb6..08dc0598 100644 --- a/internal/logic/schedule/scheduleruntasklogic.go +++ b/internal/logic/schedule/scheduleruntasklogic.go @@ -3,6 +3,8 @@ package schedule import ( "context" "errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gopkg.in/yaml.v2" @@ -42,10 +44,17 @@ func (l *ScheduleRunTaskLogic) ScheduleRunTask(req *types.RunTaskReq) (resp *typ return nil, err } - //schedule, err := l.svcCtx.Scheduler.AssignAndSchedule() - //if err != nil { - // return nil, err - //} + opt := &option.AiOption{} + + aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt) + if err != nil { + return nil, err + } + + _, err = l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl, scheduler.STORAGE_SCHEDULE_MODE, clusters) + if err != nil { + return nil, err + } adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(ADAPTERID) if err != nil { diff --git a/internal/logic/schedule/schedulesubmitlogic.go b/internal/logic/schedule/schedulesubmitlogic.go index c5934a8a..6e715e88 100644 --- a/internal/logic/schedule/schedulesubmitlogic.go +++ b/internal/logic/schedule/schedulesubmitlogic.go @@ -52,7 +52,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type return nil, err } - results, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl, scheduler.JOINT_CLOUD_MODE) + results, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl, scheduler.JOINT_CLOUD_MODE, nil) if err != nil { return nil, err } diff --git a/internal/mqs/ScheduleAi.go b/internal/mqs/ScheduleAi.go index 2406a90f..d3cd9250 100644 --- a/internal/mqs/ScheduleAi.go +++ b/internal/mqs/ScheduleAi.go @@ -41,7 +41,7 @@ func (l *AiQueue) Consume(val string) error { aiSchdl, _ := schedulers.NewAiScheduler(l.ctx, val, l.svcCtx.Scheduler, nil) // 调度算法 - _, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl, scheduler.JOINT_CLOUD_MODE) + _, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl, scheduler.JOINT_CLOUD_MODE, nil) if err != nil { return err } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 37ad5a7e..6fe0c3e9 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -22,6 +22,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/response" "gorm.io/gorm" "sigs.k8s.io/yaml" @@ -131,26 +132,50 @@ func (s *Scheduler) TempAssign() error { return nil } -func (s *Scheduler) AssignAndSchedule(ss SubSchedule, mode int) (interface{}, error) { - //choose strategy - strategy, err := ss.PickOptimalStrategy() - if err != nil { - return nil, err - } +func (s *Scheduler) AssignAndSchedule(ss SubSchedule, mode int, assignedClusters interface{}) (interface{}, error) { + var result interface{} + switch mode { + case JOINT_CLOUD_MODE: + //choose strategy + strategy, err := ss.PickOptimalStrategy() + if err != nil { + return nil, err + } - //schedule - clusters, err := strategy.Schedule() - if err != nil { - return nil, err - } + //schedule + clusters, err := strategy.Schedule() + if err != nil { + return nil, err + } - //assign tasks to clusters - resp, err := ss.AssignTask(clusters, mode) - if err != nil { - return nil, err + //assign tasks to clusters + resp, err := ss.AssignTask(clusters, mode) + if err != nil { + return nil, err + } + + result = resp + + case STORAGE_SCHEDULE_MODE: + jobClusterInfos, ok := assignedClusters.([]*types.JobClusterInfo) + if !ok { + return nil, errors.New("converting JobClusterInfos fails") + } + var clusters []*strategy.AssignedCluster + for _, info := range jobClusterInfos { + cluster := &strategy.AssignedCluster{ClusterId: info.ClusterID, Replicas: 1} + clusters = append(clusters, cluster) + } + //assign tasks to clusters + resp, err := ss.AssignTask(clusters, mode) + if err != nil { + return nil, err + } + + result = resp } - return resp, nil + return result, nil } func (s *Scheduler) SaveToDb() error {