|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- package status
-
- import (
- "context"
- "fmt"
- jsoniter "github.com/json-iterator/go"
- "github.com/rs/zerolog/log"
- "github.com/zeromicro/go-zero/core/logx"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
- "gorm.io/gorm"
- "strconv"
- "time"
- )
-
- func reportCloudStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, cloudTask *models.TaskCloud, status bool, message string) error {
- report := &jcs.JobStatusReportReq{}
- reportMsg := &jcs.TrainReportMessage{
- Type: "Train",
- TaskName: task.Name,
- TaskID: strconv.FormatInt(task.Id, 10),
- Status: status,
- Message: message,
- ClusterID: strconv.FormatInt(cloudTask.ClusterId, 10),
- }
- report.Report = reportMsg
-
- marshal, _ := jsoniter.MarshalToString(report)
- log.Debug().Msgf("通知中间件任务状态参数: [%v]", marshal)
- err := jcs.StatusReport(svc.Scheduler.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report)
- if err != nil {
- logx.Errorf("############ Report Status Message Error %s", err.Error())
- return err
- }
- return nil
- }
-
- // UpdateCloudTaskStatus CLOUD 任务状态同步函数
- func UpdateCloudTaskStatus(svc *svc.ServiceContext) {
- // 1. 查询需要同步的通算任务
- var cloudTaskList []*models.TaskCloud
- sqlStr := `SELECT * FROM task_cloud WHERE status NOT IN ('Failed', 'Completed', 'Cancelled') ORDER BY create_time DESC LIMIT 10`
- if err := svc.DbEngin.Raw(sqlStr).Scan(&cloudTaskList).Error; err != nil {
- logx.Errorf("Failed to query CLOUD tasks for sync: %v", err)
- return
- }
-
- if len(cloudTaskList) == 0 {
- return
- }
-
- // 2. 批量获取关联的 Task 模型
- taskIDs := make([]int64, len(cloudTaskList))
- for i, cloud := range cloudTaskList {
- taskIDs[i] = cloud.TaskId
- }
-
- taskMap := make(map[int64]*types.TaskModel)
- var tasks []*types.TaskModel
- if err := svc.DbEngin.Model(&models.Task{}).Where("id IN ?", taskIDs).Find(&tasks).Error; err != nil {
- logx.Errorf("Failed to batch query tasks: %v", err)
- return
- }
- for _, task := range tasks {
- taskMap[task.Id] = task
- }
-
- // 3. 遍历 CLOUD 任务并更新状态
- for _, cloud := range cloudTaskList {
- task, ok := taskMap[cloud.TaskId]
- if !ok {
- logx.Errorf("Task with ID %d not found for CLOUD task %d, skipping", cloud.TaskId, cloud.Id)
- continue
- }
-
- // 使用带超时的 Context,防止 API 调用阻塞
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
- defer cancel()
-
- adapterIDStr := strconv.FormatInt(cloud.AdapterId, 10)
- adapter, adapterExists := svc.Scheduler.CloudService.CloudExecutorAdapterMap[adapterIDStr]
- if !adapterExists {
- logx.Errorf("CLOUD adapter with ID %s not found, skipping task %s", adapterIDStr, cloud.Name)
- continue
- }
-
- // 4. 从 CLOUD 集群获取最新状态
- cloudTaskInfo, err := adapter.GetContainer(ctx, cloud.Name, cloud.BusinessCode, utils.Int64ToString(cloud.ClusterId))
- if err != nil {
- logx.Errorf("Failed to get task status from CLOUD executor for job %s: %v", cloud.Name, err)
- continue // 继续处理下一个任务
- }
- if cloudTaskInfo.Status == "" {
- continue
- }
- // 如果状态没有变化,则跳过
- if cloud.Status == cloudTaskInfo.Status {
- continue
- }
-
- // 5. 准备更新
- startTime := convertUTCTimeToCST(cloudTaskInfo.Start)
- previousStatus := cloud.Status
- cloud.Status = cloudTaskInfo.Status
- cloud.StartTime = startTime
-
- task.Status = cloudTaskInfo.Status
- task.StartTime = startTime
- task.EndTime = cloudTaskInfo.End
-
- logx.Infof("CLOUD task status change detected for job %s: %s -> %s", cloud.Name, previousStatus, cloud.Status)
-
- // 6. 在事务中更新数据库
- err = svc.DbEngin.Transaction(func(tx *gorm.DB) error {
- task.UpdatedTime = time.Now().Format(constants.Layout)
- if err := tx.Table("task").Updates(task).Error; err != nil {
- return fmt.Errorf("failed to update task table: %w", err)
- }
- if err := tx.Table("task_cloud").Updates(cloud).Error; err != nil {
- return fmt.Errorf("failed to update cloud_task table: %w", err)
- }
- return nil
- })
-
- if err != nil {
- logx.Errorf("Failed to update database in transaction for job %s: %v", cloud.Name, err)
- // 事务失败,回滚状态,继续处理下一个任务
- cloud.Status = previousStatus
- task.Status = previousStatus
- continue
- }
-
- // 7. 根据新状态执行后续操作 (通知、报告等)
- handleNoticeChange(svc, task, cloud, cloudTaskInfo.Status)
- }
- }
-
- // handleStatusChange 根据新状态执行后续操作
- func handleNoticeChange(svc *svc.ServiceContext, task *types.TaskModel, cloud *models.TaskCloud, newStatus string) {
- adapterIDStr := strconv.FormatInt(cloud.AdapterId, 10)
- clusterIDStr := strconv.FormatInt(cloud.ClusterId, 10)
- var noticeType, noticeMessage string
- var reportSuccess bool
- var shouldReport bool
-
- switch newStatus {
- case constants.Running:
- noticeType = "running"
- noticeMessage = "任务运行中"
- case constants.Failed:
- noticeType = "failed"
- noticeMessage = "任务失败"
- reportSuccess = false
- shouldReport = true
- case constants.Completed:
- noticeType = "completed"
- noticeMessage = "任务完成"
- reportSuccess = true
- shouldReport = true
- case constants.Pending:
- noticeType = "pending"
- noticeMessage = "任务pending"
- default:
- // 对于其他未知状态,可以选择记录日志并返回
- logx.Errorf("Unhandled CLOUD task status '%s' for job %s", newStatus, cloud.Name)
- return
- }
-
- // 发送通知
- svc.Scheduler.CloudStorages.AddNoticeInfo(adapterIDStr, cloud.AdapterName, clusterIDStr, cloud.ClusterName, cloud.Name, noticeType, noticeMessage)
- logx.Infof("[%s]: 任务状态变更为 [%s],发送通知。", cloud.Name, newStatus)
-
- // 上报状态
- if shouldReport {
- if err := reportCloudStatusMessages(svc, task, cloud, reportSuccess, noticeMessage); err != nil {
- logx.Errorf("Failed to report Cloud status for job %s: %v", cloud.Name, err)
- }
- }
- }
- func convertUTCTimeToCST(utcTimeStr string) string {
- if utcTimeStr == "" {
- return ""
- }
-
- // 定义多种可能的时间格式
- timeFormats := []string{
- "2006-01-02T15:04:05Z", // ISO 8601 格式
- "2006-01-02 15:04:05", // 常见格式
- "2006-01-02T15:04:05", // ISO 无时区
- "2006-01-02 15:04:05Z", // 带Z的常见格式
- time.RFC3339, // RFC3339 标准格式
- "2006-01-02T15:04:05.000Z", // 带毫秒的ISO格式
- }
-
- var utcTime time.Time
- var err error
-
- // 尝试解析多种格式
- for _, format := range timeFormats {
- utcTime, err = time.Parse(format, utcTimeStr)
- if err == nil {
- break
- }
- }
-
- // 如果所有格式都失败,记录警告并返回原字符串
- if err != nil {
- logx.Errorf("Failed to parse time string '%s' with all known formats, returning original string", utcTimeStr)
- return utcTimeStr
- }
-
- // 创建东八区时区
- cstZone := time.FixedZone("CST", 8*3600)
-
- // 将UTC时间转换为东八区时间
- cstTime := utcTime.In(cstZone)
-
- // 格式化东八区时间
- return cstTime.Format("2006-01-02T15:04:05-07:00")
- }
|