diff --git a/desc/pcm.api b/desc/pcm.api index 49002681..c753a583 100644 --- a/desc/pcm.api +++ b/desc/pcm.api @@ -943,6 +943,12 @@ service pcm { @handler ScheduleCreateTaskHandler post /schedule/createTask (CreateTaskReq) returns (CreateTaskResp) + + @handler ScheduleRunTaskHandler + post /schedule/runTask (RunTaskReq) returns (RunTaskResp) + + @handler ScheduleCancelTaskHandler + post /schedule/cancelTask (CancelTaskReq) returns (CancelTaskResp) } @server( diff --git a/desc/schedule/pcm-schedule.api b/desc/schedule/pcm-schedule.api index de18ef60..c6b65de3 100644 --- a/desc/schedule/pcm-schedule.api +++ b/desc/schedule/pcm-schedule.api @@ -162,15 +162,15 @@ type ( // 创建任务(下发资源):/createTask CreateTaskReq{ - Resources []*CreateTaskResource `json:"resources"` + Dataset string `json:"dataset,optional"` + Code string `json:"code,optional"` + Image string `json:"image,optional"` + Clusters []string `json:"clusters,optional"` + StaticWeight map[string ]int32 `json:"staticWeight,optional"` + Resources []map[string]interface{} `json:"resources,optional"` DataDistributes DataDistribute `json:"dataDistributes"` } - CreateTaskResource { - Type string `json:"type"` - Req interface{} `json:"req"` - } - DataDistribute { Dataset []*DatasetDistribute `json:"dataset"` Code []*CodeDistribute `json:"code"` @@ -213,4 +213,36 @@ type ( StorageType string `json:"storageType"` // 目的地类型:如代码仓、JCS ClusterIDs []int64 `json:"clusterIDs"` } + + // 启动任务(资源已就绪):/runTask + RunTaskReq { + TaskID int64 `json:"taskID"` + ScheduledDatas []*DataScheduleResults `json:"scheduledDatas"` + } + + DataScheduleResults { + DataType string `json:"dataType"` + Results []DataScheduleResult `json:"results"` + } + + DataScheduleResult { + ClusterID int64 `json:"clusterID"` + PackageID int64 `json:"packageID"` + PackageFullPath string `json:"packageFullPath"` + Status bool `json:"status"` + Msg string `json:"msg"` +} + + RunTaskResp { + + } + + CancelTaskReq { + TaskId int64 `json:"taskID"` + } + + CancelTaskResp { + + } + ) \ No newline at end of file diff --git a/internal/handler/routes.go b/internal/handler/routes.go index ef3ae436..35c95d09 100644 --- a/internal/handler/routes.go +++ b/internal/handler/routes.go @@ -1187,6 +1187,16 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { Path: "/schedule/createTask", Handler: schedule.ScheduleCreateTaskHandler(serverCtx), }, + { + Method: http.MethodPost, + Path: "/schedule/runTask", + Handler: schedule.ScheduleRunTaskHandler(serverCtx), + }, + { + Method: http.MethodPost, + Path: "/schedule/cancelTask", + Handler: schedule.ScheduleCancelTaskHandler(serverCtx), + }, }, rest.WithPrefix("/pcm/v1"), ) diff --git a/internal/handler/schedule/schedulecanceltaskhandler.go b/internal/handler/schedule/schedulecanceltaskhandler.go new file mode 100644 index 00000000..ea091651 --- /dev/null +++ b/internal/handler/schedule/schedulecanceltaskhandler.go @@ -0,0 +1,26 @@ +package schedule + +import ( + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/schedule" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" +) + +func ScheduleCancelTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.CancelTaskReq + if err := httpx.Parse(r, &req); err != nil { + httpx.ErrorCtx(r.Context(), w, err) + return + } + + l := schedule.NewScheduleCancelTaskLogic(r.Context(), svcCtx) + resp, err := l.ScheduleCancelTask(&req) + result.HttpResult(r, w, resp, err) + + } +} diff --git a/internal/handler/schedule/schedulecreatetaskhandler.go b/internal/handler/schedule/schedulecreatetaskhandler.go index 5f9b5fa9..71415b6a 100644 --- a/internal/handler/schedule/schedulecreatetaskhandler.go +++ b/internal/handler/schedule/schedulecreatetaskhandler.go @@ -1,6 +1,7 @@ package schedule import ( + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" "net/http" "github.com/zeromicro/go-zero/rest/httpx" @@ -13,16 +14,13 @@ func ScheduleCreateTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var req types.CreateTaskReq if err := httpx.Parse(r, &req); err != nil { - httpx.ErrorCtx(r.Context(), w, err) + result.ParamErrorResult(r, w, err) return } l := schedule.NewScheduleCreateTaskLogic(r.Context(), svcCtx) resp, err := l.ScheduleCreateTask(&req) - if err != nil { - httpx.ErrorCtx(r.Context(), w, err) - } else { - httpx.OkJsonCtx(r.Context(), w, resp) - } + result.HttpResult(r, w, resp, err) + } } diff --git a/internal/handler/schedule/scheduleruntaskhandler.go b/internal/handler/schedule/scheduleruntaskhandler.go new file mode 100644 index 00000000..aa5c9b9e --- /dev/null +++ b/internal/handler/schedule/scheduleruntaskhandler.go @@ -0,0 +1,26 @@ +package schedule + +import ( + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/schedule" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" +) + +func ScheduleRunTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.RunTaskReq + if err := httpx.Parse(r, &req); err != nil { + result.ParamErrorResult(r, w, err) + return + } + + l := schedule.NewScheduleRunTaskLogic(r.Context(), svcCtx) + resp, err := l.ScheduleRunTask(&req) + result.HttpResult(r, w, resp, err) + + } +} diff --git a/internal/logic/schedule/schedulecanceltasklogic.go b/internal/logic/schedule/schedulecanceltasklogic.go new file mode 100644 index 00000000..291cfba8 --- /dev/null +++ b/internal/logic/schedule/schedulecanceltasklogic.go @@ -0,0 +1,30 @@ +package schedule + +import ( + "context" + + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type ScheduleCancelTaskLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewScheduleCancelTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleCancelTaskLogic { + return &ScheduleCancelTaskLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *ScheduleCancelTaskLogic) ScheduleCancelTask(req *types.CancelTaskReq) (resp *types.CancelTaskResp, err error) { + // todo: add your logic here and delete this line + + return +} diff --git a/internal/logic/schedule/schedulecreatetasklogic.go b/internal/logic/schedule/schedulecreatetasklogic.go index 80cd13c0..44c04c35 100644 --- a/internal/logic/schedule/schedulecreatetasklogic.go +++ b/internal/logic/schedule/schedulecreatetasklogic.go @@ -3,9 +3,11 @@ package schedule import ( "context" "fmt" - + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + "slices" + "strconv" "github.com/zeromicro/go-zero/core/logx" ) @@ -25,7 +27,98 @@ func NewScheduleCreateTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) } func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) (resp *types.CreateTaskResp, err error) { - // todo: add your logic here and delete this line - fmt.Println(req) + resp = &types.CreateTaskResp{} + if len(req.Resources) == 0 && len(req.Clusters) == 0 && len(req.StaticWeight) == 0 { + return nil, fmt.Errorf("must specify at least one resource") + } + + if len(req.Clusters) != 0 { + schedatas := generateScheduleResult(req.DataDistributes, req.Clusters) + taskId := l.createTask() + resp.ScheduleDatas = schedatas + resp.TaskID = taskId + return resp, nil + } + + if len(req.StaticWeight) != 0 { + strgy := strategy.NewStaticWeightStrategy(req.StaticWeight, 1) + sche, err := strgy.Schedule() + if err != nil { + return nil, err + } + var clusters []string + for _, c := range sche { + if c.Replicas == 0 { + continue + } + clusters = append(clusters, c.ClusterId) + } + schedatas := generateScheduleResult(req.DataDistributes, clusters) + taskId := l.createTask() + resp.ScheduleDatas = schedatas + resp.TaskID = taskId + return resp, nil + } + + if len(req.Resources) != 0 { + + } return } + +func generateScheduleResult(distribute types.DataDistribute, clusters []string) []*types.ScheduleData { + var schedatas []*types.ScheduleData + + for _, cluster := range clusters { + id, _ := strconv.ParseInt(cluster, 10, 64) + for _, dd := range distribute.Dataset { + if slices.Contains(dd.Clusters, id) { + data := &types.ScheduleData{ + DataType: "dataset", + PackageID: dd.PackageID, + ClusterIDs: dd.Clusters, + } + schedatas = append(schedatas, data) + } + } + + for _, dd := range distribute.Code { + if slices.Contains(dd.Clusters, id) { + data := &types.ScheduleData{ + DataType: "code", + PackageID: dd.PackageID, + ClusterIDs: dd.Clusters, + } + schedatas = append(schedatas, data) + } + } + + for _, dd := range distribute.Image { + if slices.Contains(dd.Clusters, id) { + data := &types.ScheduleData{ + DataType: "image", + PackageID: dd.PackageID, + ClusterIDs: dd.Clusters, + } + schedatas = append(schedatas, data) + } + } + + for _, dd := range distribute.Model { + if slices.Contains(dd.Clusters, id) { + data := &types.ScheduleData{ + DataType: "model", + PackageID: dd.PackageID, + ClusterIDs: dd.Clusters, + } + schedatas = append(schedatas, data) + } + } + } + + return schedatas +} + +func (l *ScheduleCreateTaskLogic) createTask() int64 { + return 123456789 +} diff --git a/internal/logic/schedule/scheduleruntasklogic.go b/internal/logic/schedule/scheduleruntasklogic.go new file mode 100644 index 00000000..a922293f --- /dev/null +++ b/internal/logic/schedule/scheduleruntasklogic.go @@ -0,0 +1,30 @@ +package schedule + +import ( + "context" + + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type ScheduleRunTaskLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewScheduleRunTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleRunTaskLogic { + return &ScheduleRunTaskLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *ScheduleRunTaskLogic) ScheduleRunTask(req *types.RunTaskReq) (resp *types.RunTaskResp, err error) { + // todo: add your logic here and delete this line + + return +} diff --git a/internal/types/types.go b/internal/types/types.go index 981f8a0d..30b86df0 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -5879,13 +5879,13 @@ type QueryResourcesResp struct { } type CreateTaskReq struct { - Resources []*CreateTaskResource `json:"resources"` - DataDistributes DataDistribute `json:"dataDistributes"` -} - -type CreateTaskResource struct { - Type string `json:"type"` - Req interface{} `json:"req"` + Dataset string `json:"dataset,optional"` + Code string `json:"code,optional"` + Image string `json:"image,optional"` + Clusters []string `json:"clusters,optional"` + StaticWeight map[string]int32 `json:"staticWeight,optional"` + Resources []map[string]interface{} `json:"resources,optional"` + DataDistributes DataDistribute `json:"dataDistributes"` } type DataDistribute struct { @@ -5931,6 +5931,34 @@ type ScheduleData struct { ClusterIDs []int64 `json:"clusterIDs"` } +type RunTaskReq struct { + TaskID int64 `json:"taskID"` + ScheduledDatas []*DataScheduleResults `json:"scheduledDatas"` +} + +type DataScheduleResults struct { + DataType string `json:"dataType"` + Results []DataScheduleResult `json:"results"` +} + +type DataScheduleResult struct { + ClusterID int64 `json:"clusterID"` + PackageID int64 `json:"packageID"` + PackageFullPath string `json:"packageFullPath"` + Status bool `json:"status"` + Msg string `json:"msg"` +} + +type RunTaskResp struct { +} + +type CancelTaskReq struct { + TaskId int64 `json:"taskID"` +} + +type CancelTaskResp struct { +} + type CreateAlertRuleReq struct { CLusterId string `json:"clusterId"` ClusterName string `json:"clusterName"`