diff --git a/internal/logic/cloud/commitgeneraltasklogic.go b/internal/logic/cloud/commitgeneraltasklogic.go index 9d95b66f..ae774b8b 100644 --- a/internal/logic/cloud/commitgeneraltasklogic.go +++ b/internal/logic/cloud/commitgeneraltasklogic.go @@ -5,9 +5,9 @@ import ( "context" "github.com/pkg/errors" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud" @@ -70,7 +70,7 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er utils.Convert(&req, &opt) sc, _ := schedulers.NewCloudScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, tx, l.svcCtx.PromClient) - results, err := l.svcCtx.Scheduler.AssignAndSchedule(sc, scheduler.SUBMIT_MODE_JOINT_CLOUD, nil) + results, err := l.svcCtx.Scheduler.AssignAndSchedule(sc, executor.SUBMIT_MODE_JOINT_CLOUD, nil) if err != nil { logx.Errorf("AssignAndSchedule() => execution error: %v", err) return err diff --git a/internal/logic/core/commitvmtasklogic.go b/internal/logic/core/commitvmtasklogic.go index 6fc10814..4ca4b11b 100644 --- a/internal/logic/core/commitvmtasklogic.go +++ b/internal/logic/core/commitvmtasklogic.go @@ -4,9 +4,9 @@ import ( "context" "fmt" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" @@ -63,7 +63,7 @@ func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *type return nil, err } // 3、Return scheduling results - results, err := l.svcCtx.Scheduler.AssignAndSchedule(vmSchdl, scheduler.SUBMIT_MODE_JOINT_CLOUD, nil) + results, err := l.svcCtx.Scheduler.AssignAndSchedule(vmSchdl, executor.SUBMIT_MODE_JOINT_CLOUD, nil) if err != nil { logx.Errorf("AssignAndSchedule() => execution error: %v", err) return nil, err diff --git a/internal/logic/schedule/scheduleruntasklogic.go b/internal/logic/schedule/scheduleruntasklogic.go index 984f203f..3bed907a 100644 --- a/internal/logic/schedule/scheduleruntasklogic.go +++ b/internal/logic/schedule/scheduleruntasklogic.go @@ -6,9 +6,9 @@ import ( "errors" "fmt" "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" @@ -54,8 +54,9 @@ func (l *ScheduleRunTaskLogic) ScheduleRunTask(req *types.RunTaskReq) (resp *typ } opt := &option.AiOption{ - AdapterId: ADAPTERID, - TaskName: task.Name, + AdapterId: ADAPTERID, + TaskName: task.Name, + StrategyName: "", } // update assignedClusters err = updateClustersByScheduledDatas(task.Id, &clusters, req.ScheduledDatas) @@ -68,7 +69,7 @@ func (l *ScheduleRunTaskLogic) ScheduleRunTask(req *types.RunTaskReq) (resp *typ return nil, err } - results, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl, scheduler.SUBMIT_MODE_STORAGE_SCHEDULE, clusters) + results, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl, executor.SUBMIT_MODE_STORAGE_SCHEDULE, clusters) if err != nil { return nil, err } diff --git a/internal/logic/schedule/schedulesubmitlogic.go b/internal/logic/schedule/schedulesubmitlogic.go index 284c843e..522781a6 100644 --- a/internal/logic/schedule/schedulesubmitlogic.go +++ b/internal/logic/schedule/schedulesubmitlogic.go @@ -2,9 +2,9 @@ package schedule import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" @@ -52,7 +52,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type return nil, err } - results, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl, scheduler.SUBMIT_MODE_JOINT_CLOUD, nil) + results, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl, executor.SUBMIT_MODE_JOINT_CLOUD, nil) if err != nil { return nil, err } diff --git a/internal/mqs/ScheduleAi.go b/internal/mqs/ScheduleAi.go index 2e33cc77..8720cdf5 100644 --- a/internal/mqs/ScheduleAi.go +++ b/internal/mqs/ScheduleAi.go @@ -16,8 +16,8 @@ package mqs import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" ) @@ -41,7 +41,7 @@ func (l *AiQueue) Consume(val string) error { aiSchdl, _ := schedulers.NewAiScheduler(l.ctx, val, l.svcCtx.Scheduler, nil) // 调度算法 - _, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl, scheduler.SUBMIT_MODE_JOINT_CLOUD, nil) + _, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl, executor.SUBMIT_MODE_JOINT_CLOUD, nil) if err != nil { return err } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 07714570..5399db0d 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -21,6 +21,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/response" "gorm.io/gorm" @@ -29,11 +30,6 @@ import ( "sync" ) -const ( - SUBMIT_MODE_JOINT_CLOUD = iota + 1 - SUBMIT_MODE_STORAGE_SCHEDULE -) - type Scheduler struct { task *response.TaskInfo participantIds []int64 @@ -134,7 +130,7 @@ func (s *Scheduler) TempAssign() error { func (s *Scheduler) AssignAndSchedule(ss SubSchedule, mode int, assignedClusters []*strategy.AssignedCluster) (interface{}, error) { var result interface{} switch mode { - case SUBMIT_MODE_JOINT_CLOUD: + case executor.SUBMIT_MODE_JOINT_CLOUD: //choose strategy strategy, err := ss.PickOptimalStrategy() if err != nil { @@ -155,7 +151,7 @@ func (s *Scheduler) AssignAndSchedule(ss SubSchedule, mode int, assignedClusters result = resp - case SUBMIT_MODE_STORAGE_SCHEDULE: + case executor.SUBMIT_MODE_STORAGE_SCHEDULE: //assign tasks to clusters resp, err := ss.AssignTask(assignedClusters, mode) diff --git a/internal/scheduler/schedulers/aiScheduler.go b/internal/scheduler/schedulers/aiScheduler.go index 662d596d..7276e820 100644 --- a/internal/scheduler/schedulers/aiScheduler.go +++ b/internal/scheduler/schedulers/aiScheduler.go @@ -25,6 +25,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy/param" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" @@ -282,7 +283,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster, mode int func updateAiOptionByMode(cluster *strategy.AssignedCluster, opt *option.AiOption, mode int) { switch mode { - case scheduler.SUBMIT_MODE_STORAGE_SCHEDULE: + case executor.SUBMIT_MODE_STORAGE_SCHEDULE: opt.Cmd = cluster.Cmd opt.Envs = cluster.Envs opt.Params = cluster.Params diff --git a/internal/scheduler/service/executor/aiExecutor.go b/internal/scheduler/service/executor/aiExecutor.go index e252d16d..5f9ca651 100644 --- a/internal/scheduler/service/executor/aiExecutor.go +++ b/internal/scheduler/service/executor/aiExecutor.go @@ -5,6 +5,11 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" ) +const ( + SUBMIT_MODE_JOINT_CLOUD = iota + 1 + SUBMIT_MODE_STORAGE_SCHEDULE +) + type AiExecutor interface { Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) } diff --git a/internal/storeLink/shuguangai.go b/internal/storeLink/shuguangai.go index 3af11c3a..0ac4ce7d 100644 --- a/internal/storeLink/shuguangai.go +++ b/internal/storeLink/shuguangai.go @@ -22,6 +22,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" @@ -604,12 +605,12 @@ func (s *ShuguangAi) GetTrainingTask(ctx context.Context, taskId string) (*colle func (s *ShuguangAi) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) { switch mode { - case 1: + case executor.SUBMIT_MODE_JOINT_CLOUD: err := s.GenerateSubmitParams(ctx, option) if err != nil { return nil, err } - case 2: + case executor.SUBMIT_MODE_STORAGE_SCHEDULE: var dcuNum int64 for _, res := range option.ResourcesRequired { typeName, ok := res["type"]