package core import ( "context" "github.com/pkg/errors" "github.com/zeromicro/go-zero/core/logx" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gorm.io/gorm" "strings" "time" ) type PushTaskInfoLogic struct { logx.Logger ctx context.Context svcCtx *svc.ServiceContext } func NewPushTaskInfoLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PushTaskInfoLogic { return &PushTaskInfoLogic{ Logger: logx.WithContext(ctx), ctx: ctx, svcCtx: svcCtx, } } func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clientCore.PushTaskInfoResp, error) { resp := clientCore.PushTaskInfoResp{} var kind int32 l.svcCtx.DbEngin.Raw("select type as kind from t_adapter where id = ?", req.AdapterId).Scan(&kind) switch kind { case 0: var resourceType int32 l.svcCtx.DbEngin.Raw("select resource_type as resourceType from `t_adapter` where id = ?", req.AdapterId).Scan(&resourceType) switch resourceType { case 01: for _, cloudInfo := range req.CloudInfoList { var taskId uint result := l.svcCtx.DbEngin.Table("task_cloud").Select("task_id").Where("task_id = ?", cloudInfo.TaskId).Find(&taskId) if errors.Is(result.Error, gorm.ErrRecordNotFound) { return nil, errors.New("Record does not exist") } l.svcCtx.DbEngin.Exec("update task_cloud set status = ?,start_time = ?,result = ? where task_id = ?", cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, cloudInfo.TaskId) var taskName string l.svcCtx.DbEngin.Raw("select name as kind from task where id = ?", taskId).Scan(&taskName) noticeInfo := clientCore.NoticeInfo{ TaskId: cloudInfo.TaskId, AdapterId: cloudInfo.AdapterId, AdapterName: cloudInfo.AdapterName, ClusterId: cloudInfo.ClusterId, ClusterName: cloudInfo.ClusterName, TaskName: taskName, } syncTask(l.svcCtx.DbEngin, noticeInfo) } case 02: for _, vmInfo := range req.VmInfoList { l.svcCtx.DbEngin.Exec("update task_vm set status = ?,start_time = ?,server_id = ? where cluster_id = ? and task_id = ? and name = ?", vmInfo.Status, vmInfo.StartTime, vmInfo.ServerId, vmInfo.ClusterId, vmInfo.TaskId, vmInfo.Name) noticeInfo := clientCore.NoticeInfo{ TaskId: vmInfo.TaskId, AdapterId: vmInfo.AdapterId, AdapterName: vmInfo.AdapterName, ClusterId: vmInfo.ClusterId, ClusterName: vmInfo.ClusterName, TaskName: vmInfo.Name, } syncTask(l.svcCtx.DbEngin, noticeInfo) } } case 2: for _, hpcInfo := range req.HpcInfoList { l.svcCtx.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?", hpcInfo.Status, hpcInfo.StartTime, hpcInfo.JobId, hpcInfo.ClusterId, hpcInfo.TaskId, hpcInfo.Name) noticeInfo := clientCore.NoticeInfo{ TaskId: hpcInfo.TaskId, AdapterId: hpcInfo.AdapterId, AdapterName: hpcInfo.AdapterName, ClusterId: hpcInfo.ClusterId, ClusterName: hpcInfo.ClusterName, TaskName: hpcInfo.Name, } syncTask(l.svcCtx.DbEngin, noticeInfo) } case 1: for _, aiInfo := range req.AiInfoList { l.svcCtx.DbEngin.Exec("update task_ai_asynchronous set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?", aiInfo.Status, aiInfo.StartTime, aiInfo.JobId, aiInfo.ClusterId, aiInfo.TaskId, aiInfo.Name) noticeInfo := clientCore.NoticeInfo{ TaskId: aiInfo.TaskId, AdapterId: aiInfo.AdapterId, AdapterName: aiInfo.AdapterName, ClusterId: aiInfo.ClusterId, ClusterName: aiInfo.ClusterName, TaskName: aiInfo.Name, } syncTask(l.svcCtx.DbEngin, noticeInfo) } } return &resp, nil } func syncTask(gorm *gorm.DB, noticeInfo clientCore.NoticeInfo) { var allStatus string tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status),GROUP_CONCAT(DISTINCT v.status))as status from task t left join task_hpc h on t.id = h.task_id left join task_cloud c on t.id = c.task_id left join task_vm v on t.id = v.task_id left join task_ai a on t.id = a.task_id where t.id = ?", noticeInfo.TaskId).Scan(&allStatus) if tx.Error != nil { logx.Error(tx.Error) } allStatus = strings.ToUpper(allStatus) for pcmStatus, ProviderStatus := range clientCore.StatusMapping { for _, originalStatus := range ProviderStatus { // if Failed type status appears in subTask then update mainTask to Failed if pcmStatus == "Failed" && strings.Contains(allStatus, originalStatus) { updateTask(gorm, noticeInfo.TaskId, constants.Failed) noticeInfo := clientCore.NoticeInfo{ AdapterId: noticeInfo.AdapterId, AdapterName: noticeInfo.AdapterName, ClusterId: noticeInfo.ClusterId, ClusterName: noticeInfo.ClusterName, NoticeType: "failed", TaskName: noticeInfo.TaskName, Incident: "任务执行失败,请查看日志!", CreatedTime: time.Now(), } gorm.Table("t_notice").Create(¬iceInfo) return // no Failed type status in subTask,if Saved type status appears in subTask then update mainTask to Saved } else if pcmStatus == "Saved" && strings.Contains(allStatus, originalStatus) { if getTaskStatus(gorm, noticeInfo.TaskId) != "Saved" { updateTask(gorm, noticeInfo.TaskId, constants.Saved) noticeInfo := clientCore.NoticeInfo{ AdapterId: noticeInfo.AdapterId, AdapterName: noticeInfo.AdapterName, ClusterId: noticeInfo.ClusterId, ClusterName: noticeInfo.ClusterName, NoticeType: "saved", TaskName: noticeInfo.TaskName, Incident: "任务已处于队列中!", CreatedTime: time.Now(), } gorm.Table("t_notice").Create(¬iceInfo) return } else { return } // no Failed and Saved type status in subTask,if Running type status appears in subTask then update mainTask to Running } else if pcmStatus == "Running" && strings.Contains(allStatus, originalStatus) { if getTaskStatus(gorm, noticeInfo.TaskId) != "Running" { updateTask(gorm, noticeInfo.TaskId, constants.Running) noticeInfo := clientCore.NoticeInfo{ AdapterId: noticeInfo.AdapterId, AdapterName: noticeInfo.AdapterName, ClusterId: noticeInfo.ClusterId, ClusterName: noticeInfo.ClusterName, NoticeType: "running", TaskName: noticeInfo.TaskName, Incident: "任务状态切换为运行中!", CreatedTime: time.Now(), } gorm.Table("t_notice").Create(¬iceInfo) return } else { return } // at last, mainTask should be succeeded } else { if strings.Contains(allStatus, originalStatus) { updateTask(gorm, noticeInfo.TaskId, constants.Succeeded) noticeInfo := clientCore.NoticeInfo{ AdapterId: noticeInfo.AdapterId, AdapterName: noticeInfo.AdapterName, ClusterId: noticeInfo.ClusterId, ClusterName: noticeInfo.ClusterName, NoticeType: "succeeded", TaskName: noticeInfo.TaskName, Incident: "任务执行完成!", CreatedTime: time.Now(), } gorm.Table("t_notice").Create(¬iceInfo) return } } } } } func updateTask(gorm *gorm.DB, taskId int64, status string) { now := time.Now() var task models.Task gorm.Where("id = ? ", taskId).Find(&task) if task.Status != status { task.Status = status if status == constants.Running { task.StartTime = &now } if task.Status == constants.Failed || task.Status == constants.Succeeded { task.EndTime = &now } gorm.Updates(&task) } } func getTaskStatus(gorm *gorm.DB, taskId int64) (status string) { var task models.Task gorm.Where("id = ? ", taskId).Find(&task) return task.Status }