Former-commit-id: 6d85d7373c
pull/154/head
| @@ -5,20 +5,12 @@ import ( | |||
| "time" | |||
| ) | |||
| var HpcStatusMapping = map[string][]string{ | |||
| var StatusMapping = map[string][]string{ | |||
| "Running": {"RUNNING", "RUNNING", "CONFIGURING", "COMPLETING"}, | |||
| "Succeeded": {"COMPLETED"}, | |||
| "Failed": {"FAILED", "TIMEOUT", "DEADLINE", "OUT_OF_MEMORY", "BOOT_FAIL", "CANCELLED"}, | |||
| } | |||
| var AiStatusMapping = map[string]string{ | |||
| "PENDING": "Running", | |||
| } | |||
| var CloudStatusMapping = map[string]string{ | |||
| "PENDING": "Running", | |||
| } | |||
| type PullTaskInfoReq struct { | |||
| AdapterId int64 `form:"adapterId"` | |||
| } | |||
| @@ -58,6 +50,7 @@ type NoticeInfo struct { | |||
| ClusterId int64 `json:"clusterId"` | |||
| ClusterName string `json:"clusterName"` | |||
| NoticeType string `json:"noticeType"` | |||
| TaskId int64 `json:"taskId"` | |||
| TaskName string `json:"taskName"` | |||
| Incident string `json:"incident"` | |||
| CreatedTime time.Time `json:"createdTime"` | |||
| @@ -82,11 +75,13 @@ type PushNoticeResp struct { | |||
| } | |||
| type HpcInfo struct { | |||
| Id int64 `json:"id"` // id | |||
| TaskId int64 `json:"task_id"` // 任务id | |||
| JobId string `json:"job_id"` // 作业id(在第三方系统中的作业id) | |||
| AdapterId int64 `json:"adapter_id"` // 执行任务的适配器id | |||
| ClusterId int64 `json:"cluster_id"` // 执行任务的集群id | |||
| Id int64 `json:"id"` // id | |||
| TaskId int64 `json:"task_id"` // 任务id | |||
| JobId string `json:"job_id"` // 作业id(在第三方系统中的作业id) | |||
| AdapterId int64 `json:"adapter_id"` // 执行任务的适配器id | |||
| AdapterName string `json:"adapterName,omitempty,optional"` | |||
| ClusterId int64 `json:"cluster_id"` // 执行任务的集群id | |||
| ClusterName string `json:"clusterName,omitempty,optional"` | |||
| ClusterType string `json:"cluster_type"` // 执行任务的集群类型 | |||
| Name string `json:"name"` // 名称 | |||
| Status string `json:"status"` // 状态 | |||
| @@ -127,8 +122,9 @@ type HpcInfo struct { | |||
| type CloudInfo struct { | |||
| Id uint `json:"id,omitempty,optional"` | |||
| TaskId int64 `json:"taskId,omitempty,optional"` | |||
| AdapterId uint `json:"adapterId,omitempty,optional"` | |||
| ClusterId uint `json:"clusterId,omitempty,optional"` | |||
| AdapterId int64 `json:"adapterId,omitempty,optional"` | |||
| AdapterName string `json:"adapterName,omitempty,optional"` | |||
| ClusterId int64 `json:"clusterId,omitempty,optional"` | |||
| ClusterName string `json:"clusterName,omitempty,optional"` | |||
| Kind string `json:"kind,omitempty,optional"` | |||
| Status string `json:"status,omitempty,optional"` | |||
| @@ -139,9 +135,12 @@ type CloudInfo struct { | |||
| } | |||
| type AiInfo struct { | |||
| ParticipantId int64 `json:"participantId,omitempty"` | |||
| TaskId int64 `json:"taskId,omitempty"` | |||
| ProjectId string `json:"project_id,omitempty"` | |||
| AdapterId int64 `json:"adapterId,omitempty,optional"` | |||
| AdapterName string `json:"adapterName,omitempty,optional"` | |||
| ClusterId int64 `json:"clusterId,omitempty,optional"` | |||
| ClusterName string `json:"clusterName,omitempty,optional"` | |||
| Name string `json:"name,omitempty"` | |||
| Status string `json:"status,omitempty"` | |||
| StartTime string `json:"startTime,omitempty"` | |||
| @@ -157,9 +156,12 @@ type AiInfo struct { | |||
| } | |||
| type VmInfo struct { | |||
| ParticipantId int64 `json:"participantId,omitempty"` | |||
| TaskId int64 `json:"taskId,omitempty"` | |||
| Name string `json:"name,omitempty"` | |||
| AdapterId int64 `json:"adapterId,omitempty,optional"` | |||
| AdapterName string `json:"adapterName,omitempty,optional"` | |||
| ClusterId int64 `json:"clusterId,omitempty,optional"` | |||
| ClusterName string `json:"clusterName,omitempty,optional"` | |||
| FlavorRef string `json:"flavor_ref,omitempty"` | |||
| ImageRef string `json:"image_ref,omitempty"` | |||
| NetworkUuid string `json:"network_uuid,omitempty"` | |||
| @@ -41,61 +41,143 @@ func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clie | |||
| } | |||
| l.svcCtx.DbEngin.Exec("update task_cloud set status = ?,start_time = ?,result = ? where task_id = ?", | |||
| cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, cloudInfo.TaskId) | |||
| syncTask(l.svcCtx.DbEngin, int64(taskId)) | |||
| var taskName string | |||
| l.svcCtx.DbEngin.Raw("select name as kind from task where id = ?", taskId).Scan(&taskName) | |||
| noticeInfo := clientCore.NoticeInfo{ | |||
| TaskId: cloudInfo.TaskId, | |||
| AdapterId: cloudInfo.AdapterId, | |||
| AdapterName: cloudInfo.AdapterName, | |||
| ClusterId: cloudInfo.ClusterId, | |||
| ClusterName: cloudInfo.ClusterName, | |||
| TaskName: taskName, | |||
| } | |||
| syncTask(l.svcCtx.DbEngin, noticeInfo) | |||
| } | |||
| case 2: | |||
| for _, hpcInfo := range req.HpcInfoList { | |||
| l.svcCtx.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?", | |||
| hpcInfo.Status, hpcInfo.StartTime, hpcInfo.JobId, hpcInfo.ClusterId, hpcInfo.TaskId, hpcInfo.Name) | |||
| syncTask(l.svcCtx.DbEngin, hpcInfo.TaskId) | |||
| noticeInfo := clientCore.NoticeInfo{ | |||
| TaskId: hpcInfo.TaskId, | |||
| AdapterId: hpcInfo.AdapterId, | |||
| AdapterName: hpcInfo.AdapterName, | |||
| ClusterId: hpcInfo.ClusterId, | |||
| ClusterName: hpcInfo.ClusterName, | |||
| TaskName: hpcInfo.Name, | |||
| } | |||
| syncTask(l.svcCtx.DbEngin, noticeInfo) | |||
| } | |||
| case 1: | |||
| for _, aiInfo := range req.AiInfoList { | |||
| l.svcCtx.DbEngin.Exec("update task_ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", | |||
| aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, req.AdapterId, aiInfo.TaskId, aiInfo.Name) | |||
| syncTask(l.svcCtx.DbEngin, aiInfo.TaskId) | |||
| noticeInfo := clientCore.NoticeInfo{ | |||
| TaskId: aiInfo.TaskId, | |||
| AdapterId: aiInfo.AdapterId, | |||
| AdapterName: aiInfo.AdapterName, | |||
| ClusterId: aiInfo.ClusterId, | |||
| ClusterName: aiInfo.ClusterName, | |||
| TaskName: aiInfo.Name, | |||
| } | |||
| syncTask(l.svcCtx.DbEngin, noticeInfo) | |||
| } | |||
| case 3: | |||
| for _, vmInfo := range req.VmInfoList { | |||
| l.svcCtx.DbEngin.Exec("update task_vm set status = ?,start_time = ? where participant_id = ? and task_id = ? and name = ?", | |||
| vmInfo.Status, vmInfo.StartTime, req.AdapterId, vmInfo.TaskId, vmInfo.Name) | |||
| syncTask(l.svcCtx.DbEngin, vmInfo.TaskId) | |||
| noticeInfo := clientCore.NoticeInfo{ | |||
| TaskId: vmInfo.TaskId, | |||
| AdapterId: vmInfo.AdapterId, | |||
| AdapterName: vmInfo.AdapterName, | |||
| ClusterId: vmInfo.ClusterId, | |||
| ClusterName: vmInfo.ClusterName, | |||
| TaskName: vmInfo.Name, | |||
| } | |||
| syncTask(l.svcCtx.DbEngin, noticeInfo) | |||
| } | |||
| } | |||
| return &resp, nil | |||
| } | |||
| func syncTask(gorm *gorm.DB, taskId int64) { | |||
| func syncTask(gorm *gorm.DB, noticeInfo clientCore.NoticeInfo) { | |||
| var allStatus string | |||
| tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join task_hpc h on t.id = h.task_id left join task_cloud c on t.id = c.task_id left join task_ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus) | |||
| tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join task_hpc h on t.id = h.task_id left join task_cloud c on t.id = c.task_id left join task_ai a on t.id = a.task_id where t.id = ?", noticeInfo.TaskId).Scan(&allStatus) | |||
| if tx.Error != nil { | |||
| logx.Error(tx.Error) | |||
| } | |||
| for pcmStatus, hpcStatus := range clientCore.HpcStatusMapping { | |||
| for _, status := range hpcStatus { | |||
| for pcmStatus, ProviderStatus := range clientCore.StatusMapping { | |||
| for _, originalStatus := range ProviderStatus { | |||
| // if Failed type status appears in subTask then update mainTask to Failed | |||
| if pcmStatus == "Failed" && strings.Contains(allStatus, status) { | |||
| updateTask(gorm, taskId, constants.Failed) | |||
| if pcmStatus == "Failed" && strings.Contains(allStatus, originalStatus) { | |||
| updateTask(gorm, noticeInfo.TaskId, constants.Failed) | |||
| noticeInfo := clientCore.NoticeInfo{ | |||
| AdapterId: noticeInfo.AdapterId, | |||
| AdapterName: noticeInfo.AdapterName, | |||
| ClusterId: noticeInfo.ClusterId, | |||
| ClusterName: noticeInfo.ClusterName, | |||
| NoticeType: "failed", | |||
| TaskName: noticeInfo.TaskName, | |||
| Incident: "任务执行失败,请查看日志!", | |||
| CreatedTime: time.Now(), | |||
| } | |||
| gorm.Table("t_notice").Create(¬iceInfo) | |||
| return | |||
| // no Failed type status in subTask,if Saved type status appears in subTask then update mainTask to Saved | |||
| } else if pcmStatus == "Saved" { | |||
| if strings.Contains(allStatus, status) { | |||
| updateTask(gorm, taskId, constants.Saved) | |||
| } else if pcmStatus == "Saved" && strings.Contains(allStatus, originalStatus) { | |||
| if getTaskStatus(gorm, noticeInfo.TaskId) != "Saved" { | |||
| updateTask(gorm, noticeInfo.TaskId, constants.Saved) | |||
| noticeInfo := clientCore.NoticeInfo{ | |||
| AdapterId: noticeInfo.AdapterId, | |||
| AdapterName: noticeInfo.AdapterName, | |||
| ClusterId: noticeInfo.ClusterId, | |||
| ClusterName: noticeInfo.ClusterName, | |||
| NoticeType: "saved", | |||
| TaskName: noticeInfo.TaskName, | |||
| Incident: "任务已处于队列中!", | |||
| CreatedTime: time.Now(), | |||
| } | |||
| gorm.Table("t_notice").Create(¬iceInfo) | |||
| return | |||
| } else { | |||
| return | |||
| } | |||
| // no Failed and Saved type status in subTask,if Running type status appears in subTask then update mainTask to Running | |||
| } else if pcmStatus == "Running" { | |||
| if strings.Contains(allStatus, status) { | |||
| updateTask(gorm, taskId, constants.Running) | |||
| } else if pcmStatus == "Running" && strings.Contains(allStatus, originalStatus) { | |||
| if getTaskStatus(gorm, noticeInfo.TaskId) != "Running" { | |||
| updateTask(gorm, noticeInfo.TaskId, constants.Running) | |||
| noticeInfo := clientCore.NoticeInfo{ | |||
| AdapterId: noticeInfo.AdapterId, | |||
| AdapterName: noticeInfo.AdapterName, | |||
| ClusterId: noticeInfo.ClusterId, | |||
| ClusterName: noticeInfo.ClusterName, | |||
| NoticeType: "running", | |||
| TaskName: noticeInfo.TaskName, | |||
| Incident: "任务状态切换为运行中!", | |||
| CreatedTime: time.Now(), | |||
| } | |||
| gorm.Table("t_notice").Create(¬iceInfo) | |||
| return | |||
| } else { | |||
| return | |||
| } | |||
| // at last, mainTask should be succeeded | |||
| } else { | |||
| if strings.Contains(allStatus, status) { | |||
| updateTask(gorm, taskId, constants.Succeeded) | |||
| if strings.Contains(allStatus, originalStatus) { | |||
| updateTask(gorm, noticeInfo.TaskId, constants.Succeeded) | |||
| noticeInfo := clientCore.NoticeInfo{ | |||
| AdapterId: noticeInfo.AdapterId, | |||
| AdapterName: noticeInfo.AdapterName, | |||
| ClusterId: noticeInfo.ClusterId, | |||
| ClusterName: noticeInfo.ClusterName, | |||
| NoticeType: "succeeded", | |||
| TaskName: noticeInfo.TaskName, | |||
| Incident: "任务执行完成!", | |||
| CreatedTime: time.Now(), | |||
| } | |||
| gorm.Table("t_notice").Create(¬iceInfo) | |||
| return | |||
| } | |||
| } | |||
| @@ -117,22 +199,10 @@ func updateTask(gorm *gorm.DB, taskId int64, status string) { | |||
| } | |||
| gorm.Updates(&task) | |||
| } | |||
| } | |||
| func removeRepeatedElement(arr []string) (newArr []string) { | |||
| newArr = make([]string, 0) | |||
| for i := 0; i < len(arr); i++ { | |||
| repeat := false | |||
| for j := i + 1; j < len(arr); j++ { | |||
| if arr[i] == arr[j] { | |||
| repeat = true | |||
| break | |||
| } | |||
| } | |||
| if !repeat { | |||
| newArr = append(newArr, arr[i]) | |||
| } | |||
| } | |||
| return | |||
| func getTaskStatus(gorm *gorm.DB, taskId int64) (status string) { | |||
| var task models.Task | |||
| gorm.Where("id = ? ", taskId).Find(&task) | |||
| return task.Status | |||
| } | |||
| @@ -52,6 +52,13 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t | |||
| var clusterIds []int64 | |||
| l.svcCtx.DbEngin.Raw("SELECT id FROM `t_cluster` where adapter_id = ? and label = ?", req.AdapterId, req.ClusterType).Scan(&clusterIds) | |||
| adapterId, _ := strconv.ParseInt(req.AdapterId, 10, 64) | |||
| var adapterName string | |||
| l.svcCtx.DbEngin.Raw("SELECT name FROM `t_adapter` where id = ?", req.AdapterId).Scan(&adapterName) | |||
| clusterId := clusterIds[rand.Intn(len(clusterIds))] | |||
| var clusterName string | |||
| l.svcCtx.DbEngin.Raw("SELECT nickname FROM `t_cluster` where id = ?", clusterId).Scan(&clusterName) | |||
| env, _ := json.Marshal(req.Environment) | |||
| if len(clusterIds) == 0 || clusterIds == nil { | |||
| @@ -62,7 +69,10 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t | |||
| hpcInfo := models.TaskHpc{ | |||
| TaskId: taskModel.Id, | |||
| ClusterId: clusterIds[rand.Intn(len(clusterIds))], | |||
| AdapterId: uint(adapterId), | |||
| AdapterName: adapterName, | |||
| ClusterId: uint(clusterId), | |||
| ClusterName: clusterName, | |||
| Name: taskModel.Name, | |||
| Status: "Saved", | |||
| CmdScript: req.CmdScript, | |||
| @@ -90,12 +100,11 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t | |||
| if tx.Error != nil { | |||
| return nil, tx.Error | |||
| } | |||
| adapterId, _ := strconv.ParseUint(req.AdapterId, 10, 64) | |||
| adapterName := "" | |||
| tx.Table("t_adapter").Select("name").Where("id=?", adapterId).Find(&adapterName) | |||
| noticeInfo := clientCore.NoticeInfo{ | |||
| AdapterId: int64(adapterId), | |||
| AdapterId: adapterId, | |||
| AdapterName: adapterName, | |||
| ClusterId: clusterId, | |||
| ClusterName: clusterName, | |||
| NoticeType: "create", | |||
| TaskName: req.Name, | |||
| Incident: "任务创建中", | |||
| @@ -55,10 +55,10 @@ func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Schedu | |||
| func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { | |||
| ai := models.Ai{ | |||
| ParticipantId: participantId, | |||
| TaskId: task.TaskId, | |||
| Status: "Saved", | |||
| YamlString: as.yamlString, | |||
| AdapterId: participantId, | |||
| TaskId: task.TaskId, | |||
| Status: "Saved", | |||
| YamlString: as.yamlString, | |||
| } | |||
| utils.Convert(task.Metadata, &ai) | |||
| return ai, nil | |||
| @@ -36,19 +36,22 @@ type ( | |||
| } | |||
| Ai struct { | |||
| Id int64 `db:"id"` // id | |||
| TaskId int64 `db:"task_id"` // 任务id | |||
| ParticipantId int64 `db:"participant_id"` // 集群静态信息id | |||
| ProjectId string `db:"project_id"` // 项目id | |||
| Name string `db:"name"` // 名称 | |||
| Status string `db:"status"` // 状态 | |||
| StartTime string `db:"start_time"` // 开始时间 | |||
| RunningTime int64 `db:"running_time"` // 运行时间 | |||
| CreatedBy int64 `db:"created_by"` // 创建人 | |||
| CreatedTime sql.NullTime `db:"created_time"` // 创建时间 | |||
| UpdatedBy int64 `db:"updated_by"` // 更新人 | |||
| UpdatedTime sql.NullTime `db:"updated_time"` // 更新时间 | |||
| DeletedFlag int64 `db:"deleted_flag"` // 是否删除(0-否,1-是) | |||
| Id int64 `db:"id"` // id | |||
| TaskId int64 `db:"task_id"` // 任务id | |||
| AdapterId int64 `db:"adapter_id"` // 适配器id | |||
| AdapterName string `db:"adapter_name"` //适配器名称 | |||
| ClusterId int64 `db:"cluster_id"` //集群id | |||
| ClusterName string `db:"cluster_name"` //集群名称 | |||
| ProjectId string `db:"project_id"` // 项目id | |||
| Name string `db:"name"` // 名称 | |||
| Status string `db:"status"` // 状态 | |||
| StartTime string `db:"start_time"` // 开始时间 | |||
| RunningTime int64 `db:"running_time"` // 运行时间 | |||
| CreatedBy int64 `db:"created_by"` // 创建人 | |||
| CreatedTime sql.NullTime `db:"created_time"` // 创建时间 | |||
| UpdatedBy int64 `db:"updated_by"` // 更新人 | |||
| UpdatedTime sql.NullTime `db:"updated_time"` // 更新时间 | |||
| DeletedFlag int64 `db:"deleted_flag"` // 是否删除(0-否,1-是) | |||
| Result string `db:"result"` | |||
| YamlString string `db:"yaml_string"` | |||
| JobId string `db:"job_id"` | |||
| @@ -9,6 +9,7 @@ type TaskCloudModel struct { | |||
| Id uint `json:"id" gorm:"primarykey;not null;comment:id"` | |||
| TaskId uint `json:"taskId" gorm:"not null;comment:task表id"` | |||
| AdapterId uint `json:"adapterId" gorm:"not null;comment:适配器id"` | |||
| AdapterName string `json:"adapterName" gorm:"not null;comment:适配器名称"` | |||
| ClusterId uint `json:"clusterId" gorm:"not null;comment:集群id"` | |||
| ClusterName string `json:"clusterName" gorm:"not null;comment:集群名称"` | |||
| Kind string `json:"kind" gorm:"comment:种类"` | |||
| @@ -36,12 +36,15 @@ type ( | |||
| } | |||
| TaskHpc struct { | |||
| Id int64 `db:"id"` // id | |||
| TaskId int64 `db:"task_id"` // 任务id | |||
| JobId string `db:"job_id"` // 作业id(在第三方系统中的作业id) | |||
| ClusterId int64 `db:"cluster_id"` // 执行任务的集群id | |||
| Name string `db:"name"` // 名称 | |||
| Status string `db:"status"` // 状态 | |||
| Id int64 `db:"id"` // id | |||
| TaskId int64 `db:"task_id"` // 任务id | |||
| JobId string `db:"job_id"` // 作业id(在第三方系统中的作业id) | |||
| AdapterId uint `db:"adapter_d"` // 适配器id | |||
| AdapterName string `db:"adapter_name"` //适配器名称 | |||
| ClusterId uint `db:"cluster_id"` //集群id | |||
| ClusterName string `db:"cluster_name"` //集群名称 | |||
| Name string `db:"name"` // 名称 | |||
| Status string `db:"status"` // 状态 | |||
| CmdScript string `db:"cmd_script"` | |||
| StartTime string `db:"start_time"` // 开始时间 | |||
| RunningTime int64 `db:"running_time"` // 运行时间 | |||
| @@ -39,8 +39,10 @@ type ( | |||
| ParticipantId int64 `db:"participant_id"` // p端id | |||
| TaskId int64 `db:"task_id"` // 任务id | |||
| Name string `db:"name"` // 虚拟机名称 | |||
| AdapterId int64 `db:"adapter_id"` // 执行任务的适配器id | |||
| ClusterId int64 `db:"cluster_id"` // 执行任务的集群id | |||
| AdapterId int64 `db:"adapter_id"` // 适配器id | |||
| AdapterName string `db:"adapter_name"` //适配器名称 | |||
| ClusterId int64 `db:"cluster_id"` //集群id | |||
| ClusterName string `db:"cluster_name"` //集群名称 | |||
| FlavorRef string `db:"flavor_ref"` // 规格索引 | |||
| ImageRef string `db:"image_ref"` // 镜像索引 | |||
| Status string `db:"status"` // 状态 | |||