Browse Source

added runTask api

pull/355/head
tzwang 11 months ago
parent
commit
cdf060883a
10 changed files with 301 additions and 22 deletions
  1. +6
    -0
      desc/pcm.api
  2. +38
    -6
      desc/schedule/pcm-schedule.api
  3. +10
    -0
      internal/handler/routes.go
  4. +26
    -0
      internal/handler/schedule/schedulecanceltaskhandler.go
  5. +4
    -6
      internal/handler/schedule/schedulecreatetaskhandler.go
  6. +26
    -0
      internal/handler/schedule/scheduleruntaskhandler.go
  7. +30
    -0
      internal/logic/schedule/schedulecanceltasklogic.go
  8. +96
    -3
      internal/logic/schedule/schedulecreatetasklogic.go
  9. +30
    -0
      internal/logic/schedule/scheduleruntasklogic.go
  10. +35
    -7
      internal/types/types.go

+ 6
- 0
desc/pcm.api View File

@@ -943,6 +943,12 @@ service pcm {

@handler ScheduleCreateTaskHandler
post /schedule/createTask (CreateTaskReq) returns (CreateTaskResp)

@handler ScheduleRunTaskHandler
post /schedule/runTask (RunTaskReq) returns (RunTaskResp)

@handler ScheduleCancelTaskHandler
post /schedule/cancelTask (CancelTaskReq) returns (CancelTaskResp)
}

@server(


+ 38
- 6
desc/schedule/pcm-schedule.api View File

@@ -162,15 +162,15 @@ type (

// 创建任务(下发资源):/createTask
CreateTaskReq{
Resources []*CreateTaskResource `json:"resources"`
Dataset string `json:"dataset,optional"`
Code string `json:"code,optional"`
Image string `json:"image,optional"`
Clusters []string `json:"clusters,optional"`
StaticWeight map[string ]int32 `json:"staticWeight,optional"`
Resources []map[string]interface{} `json:"resources,optional"`
DataDistributes DataDistribute `json:"dataDistributes"`
}

CreateTaskResource {
Type string `json:"type"`
Req interface{} `json:"req"`
}

DataDistribute {
Dataset []*DatasetDistribute `json:"dataset"`
Code []*CodeDistribute `json:"code"`
@@ -213,4 +213,36 @@ type (
StorageType string `json:"storageType"` // 目的地类型:如代码仓、JCS
ClusterIDs []int64 `json:"clusterIDs"`
}

// 启动任务(资源已就绪):/runTask
RunTaskReq {
TaskID int64 `json:"taskID"`
ScheduledDatas []*DataScheduleResults `json:"scheduledDatas"`
}

DataScheduleResults {
DataType string `json:"dataType"`
Results []DataScheduleResult `json:"results"`
}

DataScheduleResult {
ClusterID int64 `json:"clusterID"`
PackageID int64 `json:"packageID"`
PackageFullPath string `json:"packageFullPath"`
Status bool `json:"status"`
Msg string `json:"msg"`
}

RunTaskResp {

}

CancelTaskReq {
TaskId int64 `json:"taskID"`
}

CancelTaskResp {

}

)

+ 10
- 0
internal/handler/routes.go View File

@@ -1187,6 +1187,16 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/schedule/createTask",
Handler: schedule.ScheduleCreateTaskHandler(serverCtx),
},
{
Method: http.MethodPost,
Path: "/schedule/runTask",
Handler: schedule.ScheduleRunTaskHandler(serverCtx),
},
{
Method: http.MethodPost,
Path: "/schedule/cancelTask",
Handler: schedule.ScheduleCancelTaskHandler(serverCtx),
},
},
rest.WithPrefix("/pcm/v1"),
)


+ 26
- 0
internal/handler/schedule/schedulecanceltaskhandler.go View File

@@ -0,0 +1,26 @@
package schedule

import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"net/http"

"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/schedule"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
)

func ScheduleCancelTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.CancelTaskReq
if err := httpx.Parse(r, &req); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}

l := schedule.NewScheduleCancelTaskLogic(r.Context(), svcCtx)
resp, err := l.ScheduleCancelTask(&req)
result.HttpResult(r, w, resp, err)

}
}

+ 4
- 6
internal/handler/schedule/schedulecreatetaskhandler.go View File

@@ -1,6 +1,7 @@
package schedule

import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"net/http"

"github.com/zeromicro/go-zero/rest/httpx"
@@ -13,16 +14,13 @@ func ScheduleCreateTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.CreateTaskReq
if err := httpx.Parse(r, &req); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
result.ParamErrorResult(r, w, err)
return
}

l := schedule.NewScheduleCreateTaskLogic(r.Context(), svcCtx)
resp, err := l.ScheduleCreateTask(&req)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
result.HttpResult(r, w, resp, err)

}
}

+ 26
- 0
internal/handler/schedule/scheduleruntaskhandler.go View File

@@ -0,0 +1,26 @@
package schedule

import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"net/http"

"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/schedule"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
)

func ScheduleRunTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.RunTaskReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}

l := schedule.NewScheduleRunTaskLogic(r.Context(), svcCtx)
resp, err := l.ScheduleRunTask(&req)
result.HttpResult(r, w, resp, err)

}
}

+ 30
- 0
internal/logic/schedule/schedulecanceltasklogic.go View File

@@ -0,0 +1,30 @@
package schedule

import (
"context"

"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"

"github.com/zeromicro/go-zero/core/logx"
)

type ScheduleCancelTaskLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}

func NewScheduleCancelTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleCancelTaskLogic {
return &ScheduleCancelTaskLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}

func (l *ScheduleCancelTaskLogic) ScheduleCancelTask(req *types.CancelTaskReq) (resp *types.CancelTaskResp, err error) {
// todo: add your logic here and delete this line

return
}

+ 96
- 3
internal/logic/schedule/schedulecreatetasklogic.go View File

@@ -3,9 +3,11 @@ package schedule
import (
"context"
"fmt"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"slices"
"strconv"

"github.com/zeromicro/go-zero/core/logx"
)
@@ -25,7 +27,98 @@ func NewScheduleCreateTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext)
}

func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) (resp *types.CreateTaskResp, err error) {
// todo: add your logic here and delete this line
fmt.Println(req)
resp = &types.CreateTaskResp{}
if len(req.Resources) == 0 && len(req.Clusters) == 0 && len(req.StaticWeight) == 0 {
return nil, fmt.Errorf("must specify at least one resource")
}

if len(req.Clusters) != 0 {
schedatas := generateScheduleResult(req.DataDistributes, req.Clusters)
taskId := l.createTask()
resp.ScheduleDatas = schedatas
resp.TaskID = taskId
return resp, nil
}

if len(req.StaticWeight) != 0 {
strgy := strategy.NewStaticWeightStrategy(req.StaticWeight, 1)
sche, err := strgy.Schedule()
if err != nil {
return nil, err
}
var clusters []string
for _, c := range sche {
if c.Replicas == 0 {
continue
}
clusters = append(clusters, c.ClusterId)
}
schedatas := generateScheduleResult(req.DataDistributes, clusters)
taskId := l.createTask()
resp.ScheduleDatas = schedatas
resp.TaskID = taskId
return resp, nil
}

if len(req.Resources) != 0 {

}
return
}

func generateScheduleResult(distribute types.DataDistribute, clusters []string) []*types.ScheduleData {
var schedatas []*types.ScheduleData

for _, cluster := range clusters {
id, _ := strconv.ParseInt(cluster, 10, 64)
for _, dd := range distribute.Dataset {
if slices.Contains(dd.Clusters, id) {
data := &types.ScheduleData{
DataType: "dataset",
PackageID: dd.PackageID,
ClusterIDs: dd.Clusters,
}
schedatas = append(schedatas, data)
}
}

for _, dd := range distribute.Code {
if slices.Contains(dd.Clusters, id) {
data := &types.ScheduleData{
DataType: "code",
PackageID: dd.PackageID,
ClusterIDs: dd.Clusters,
}
schedatas = append(schedatas, data)
}
}

for _, dd := range distribute.Image {
if slices.Contains(dd.Clusters, id) {
data := &types.ScheduleData{
DataType: "image",
PackageID: dd.PackageID,
ClusterIDs: dd.Clusters,
}
schedatas = append(schedatas, data)
}
}

for _, dd := range distribute.Model {
if slices.Contains(dd.Clusters, id) {
data := &types.ScheduleData{
DataType: "model",
PackageID: dd.PackageID,
ClusterIDs: dd.Clusters,
}
schedatas = append(schedatas, data)
}
}
}

return schedatas
}

func (l *ScheduleCreateTaskLogic) createTask() int64 {
return 123456789
}

+ 30
- 0
internal/logic/schedule/scheduleruntasklogic.go View File

@@ -0,0 +1,30 @@
package schedule

import (
"context"

"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"

"github.com/zeromicro/go-zero/core/logx"
)

type ScheduleRunTaskLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}

func NewScheduleRunTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleRunTaskLogic {
return &ScheduleRunTaskLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}

func (l *ScheduleRunTaskLogic) ScheduleRunTask(req *types.RunTaskReq) (resp *types.RunTaskResp, err error) {
// todo: add your logic here and delete this line

return
}

+ 35
- 7
internal/types/types.go View File

@@ -5879,13 +5879,13 @@ type QueryResourcesResp struct {
}

type CreateTaskReq struct {
Resources []*CreateTaskResource `json:"resources"`
DataDistributes DataDistribute `json:"dataDistributes"`
}
type CreateTaskResource struct {
Type string `json:"type"`
Req interface{} `json:"req"`
Dataset string `json:"dataset,optional"`
Code string `json:"code,optional"`
Image string `json:"image,optional"`
Clusters []string `json:"clusters,optional"`
StaticWeight map[string]int32 `json:"staticWeight,optional"`
Resources []map[string]interface{} `json:"resources,optional"`
DataDistributes DataDistribute `json:"dataDistributes"`
}

type DataDistribute struct {
@@ -5931,6 +5931,34 @@ type ScheduleData struct {
ClusterIDs []int64 `json:"clusterIDs"`
}

type RunTaskReq struct {
TaskID int64 `json:"taskID"`
ScheduledDatas []*DataScheduleResults `json:"scheduledDatas"`
}

type DataScheduleResults struct {
DataType string `json:"dataType"`
Results []DataScheduleResult `json:"results"`
}

type DataScheduleResult struct {
ClusterID int64 `json:"clusterID"`
PackageID int64 `json:"packageID"`
PackageFullPath string `json:"packageFullPath"`
Status bool `json:"status"`
Msg string `json:"msg"`
}

type RunTaskResp struct {
}

type CancelTaskReq struct {
TaskId int64 `json:"taskID"`
}

type CancelTaskResp struct {
}

type CreateAlertRuleReq struct {
CLusterId string `json:"clusterId"`
ClusterName string `json:"clusterName"`


Loading…
Cancel
Save