You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

taskStatusSync.go 11 kB

10 months ago
8 months ago
10 months ago
10 months ago
10 months ago

  1. package status
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/zeromicro/go-zero/core/logx"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  11. "google.golang.org/grpc/codes"
  12. "google.golang.org/grpc/status"
  13. "net/http"
  14. "strconv"
  15. "sync"
  16. "time"
  17. )
  18. func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
  19. svc.Scheduler.AiService.TaskSyncLock.Lock()
  20. defer svc.Scheduler.AiService.TaskSyncLock.Unlock()
  21. list := make([]*types.TaskModel, len(tasklist))
  22. copy(list, tasklist)
  23. for i := len(list) - 1; i >= 0; i-- {
  24. if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed || list[i].Status == constants.Cancelled {
  25. list = append(list[:i], list[i+1:]...)
  26. }
  27. }
  28. if len(list) == 0 {
  29. return
  30. }
  31. task := list[0]
  32. for i := range list {
  33. earliest, _ := time.Parse(time.RFC3339, task.UpdatedTime)
  34. latest, _ := time.Parse(time.RFC3339, list[i].UpdatedTime)
  35. if latest.Before(earliest) {
  36. task = list[i]
  37. }
  38. }
  39. // Update Infer Task Status
  40. if task.TaskTypeDict == "11" || task.TaskTypeDict == "12" {
  41. updateInferTaskStatus(svc, *task)
  42. return
  43. }
  44. aiTask, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id)
  45. if err != nil {
  46. logx.Errorf(err.Error())
  47. return
  48. }
  49. if len(aiTask) == 0 {
  50. err := svc.Scheduler.AiStorages.UpdateTask(task)
  51. if err != nil {
  52. return
  53. }
  54. return
  55. }
  56. if len(aiTask) == 1 {
  57. switch aiTask[0].Status {
  58. case constants.Completed:
  59. task.Status = constants.Succeeded
  60. logx.Errorf("############ Report Status Message Before Sending %s", task.Status)
  61. _ = reportStatusMessages(svc, task, aiTask[0])
  62. case constants.Failed:
  63. task.Status = constants.Failed
  64. logx.Errorf("############ Report Status Message Before Sending %s", task.Status)
  65. _ = reportStatusMessages(svc, task, aiTask[0])
  66. default:
  67. task.Status = aiTask[0].Status
  68. }
  69. task.StartTime = aiTask[0].StartTime
  70. task.EndTime = aiTask[0].EndTime
  71. err := svc.Scheduler.AiStorages.UpdateTask(task)
  72. if err != nil {
  73. return
  74. }
  75. return
  76. }
  77. for i := len(aiTask) - 1; i >= 0; i-- {
  78. if aiTask[i].StartTime == "" {
  79. task.Status = aiTask[i].Status
  80. aiTask = append(aiTask[:i], aiTask[i+1:]...)
  81. }
  82. }
  83. if len(aiTask) == 0 {
  84. err := svc.Scheduler.AiStorages.UpdateTask(task)
  85. if err != nil {
  86. return
  87. }
  88. return
  89. }
  90. start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local)
  91. end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local)
  92. var status string
  93. var count int
  94. for _, a := range aiTask {
  95. s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local)
  96. e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local)
  97. if s.Before(start) {
  98. start = s
  99. }
  100. if e.After(end) {
  101. end = e
  102. }
  103. if a.Status == constants.Failed {
  104. status = a.Status
  105. break
  106. }
  107. if a.Status == constants.Pending {
  108. status = a.Status
  109. continue
  110. }
  111. if a.Status == constants.Running {
  112. status = a.Status
  113. continue
  114. }
  115. if a.Status == constants.Completed {
  116. count++
  117. continue
  118. }
  119. }
  120. if count == len(aiTask) {
  121. status = constants.Succeeded
  122. }
  123. if status != "" {
  124. task.Status = status
  125. task.StartTime = start.Format(constants.Layout)
  126. task.EndTime = end.Format(constants.Layout)
  127. }
  128. err = svc.Scheduler.AiStorages.UpdateTask(task)
  129. if err != nil {
  130. return
  131. }
  132. }
  133. func reportStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, aiTask *models.TaskAi) error {
  134. report := &jcs.JobStatusReportReq{
  135. TaskName: task.Name,
  136. TaskID: strconv.FormatInt(task.Id, 10),
  137. Messages: make([]*jcs.ReportMessage, 0),
  138. }
  139. //add report msg
  140. var output string
  141. switch aiTask.ClusterName {
  142. case "openI":
  143. output = aiTask.JobId
  144. case "鹏城云脑II-modelarts":
  145. output = aiTask.Output
  146. }
  147. jobMsg := &jcs.ReportMessage{
  148. Status: true,
  149. Message: "",
  150. ClusterID: strconv.FormatInt(aiTask.ClusterId, 10),
  151. Output: output,
  152. }
  153. report.Messages = append(report.Messages, jobMsg)
  154. _ = jcs.StatusReport(svc.Scheduler.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report)
  155. return nil
  156. }
  157. func updateInferTaskStatus(svc *svc.ServiceContext, task types.TaskModel) {
  158. aiTask, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id)
  159. if err != nil {
  160. logx.Errorf(err.Error())
  161. return
  162. }
  163. if len(aiTask) == 0 {
  164. //task.Status = constants.Failed
  165. err = svc.Scheduler.AiStorages.UpdateTask(&task)
  166. if err != nil {
  167. return
  168. }
  169. return
  170. }
  171. if len(aiTask) == 1 {
  172. if aiTask[0].Status == constants.Completed {
  173. task.StartTime = aiTask[0].StartTime
  174. task.EndTime = aiTask[0].EndTime
  175. task.Status = constants.Succeeded
  176. } else {
  177. task.StartTime = aiTask[0].StartTime
  178. task.Status = aiTask[0].Status
  179. }
  180. err = svc.Scheduler.AiStorages.UpdateTask(&task)
  181. if err != nil {
  182. return
  183. }
  184. return
  185. }
  186. //for i := len(aiTask) - 1; i >= 0; i-- {
  187. // if aiTask[i].StartTime == "" {
  188. // task.Status = aiTask[i].Status
  189. // aiTask = append(aiTask[:i], aiTask[i+1:]...)
  190. // }
  191. //}
  192. //
  193. //if len(aiTask) == 0 {
  194. // task.UpdatedTime = time.Now().Format(constants.Layout)
  195. // tx = svc.DbEngin.Table("task").Model(task).Updates(task)
  196. // if tx.Error != nil {
  197. // logx.Errorf(tx.Error.Error())
  198. // return
  199. // }
  200. // return
  201. //}
  202. if aiTask[0].StartTime == "" {
  203. return
  204. }
  205. start, _ := time.ParseInLocation(time.RFC3339, aiTask[0].StartTime, time.Local)
  206. end, _ := time.ParseInLocation(time.RFC3339, aiTask[0].EndTime, time.Local)
  207. var status string
  208. var count int
  209. for _, a := range aiTask {
  210. if a.Status == constants.Failed {
  211. status = a.Status
  212. break
  213. }
  214. if a.Status == constants.Pending {
  215. status = a.Status
  216. continue
  217. }
  218. if a.Status == constants.Running {
  219. status = a.Status
  220. continue
  221. }
  222. if a.Status == constants.Completed {
  223. count++
  224. continue
  225. }
  226. }
  227. if count == len(aiTask) {
  228. status = constants.Succeeded
  229. }
  230. if status == constants.Succeeded {
  231. task.Status = status
  232. task.StartTime = start.Format(time.RFC3339)
  233. task.EndTime = end.Format(time.RFC3339)
  234. } else {
  235. task.Status = status
  236. task.StartTime = start.Format(time.RFC3339)
  237. }
  238. err = svc.Scheduler.AiStorages.UpdateTask(&task)
  239. if err != nil {
  240. return
  241. }
  242. }
  243. func UpdateAiTask(svc *svc.ServiceContext, aiTaskList ...*models.TaskAi) {
  244. var wg sync.WaitGroup
  245. for _, aitask := range aiTaskList {
  246. t := aitask
  247. if t.Status == constants.Completed || t.Status == constants.Failed || t.JobId == "" || t.Status == constants.Cancelled {
  248. continue
  249. }
  250. wg.Add(1)
  251. go func() {
  252. h := http.Request{}
  253. trainingTask, err := svc.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(h.Context(), t.JobId)
  254. if err != nil {
  255. if status.Code(err) == codes.DeadlineExceeded {
  256. msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  257. logx.Errorf(errors.New(msg).Error())
  258. wg.Done()
  259. return
  260. }
  261. msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  262. logx.Errorf(errors.New(msg).Error())
  263. wg.Done()
  264. return
  265. }
  266. if trainingTask == nil {
  267. wg.Done()
  268. return
  269. }
  270. switch trainingTask.Status {
  271. case constants.Running:
  272. if t.Status != trainingTask.Status {
  273. svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "running", "任务运行中")
  274. t.Status = trainingTask.Status
  275. }
  276. case constants.Failed:
  277. if t.Status != trainingTask.Status {
  278. svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "failed", "任务失败")
  279. t.Status = trainingTask.Status
  280. }
  281. case constants.Completed:
  282. if t.Status != trainingTask.Status {
  283. svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "completed", "任务完成")
  284. t.Status = trainingTask.Status
  285. }
  286. default:
  287. if t.Status != trainingTask.Status {
  288. svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "pending", "任务pending")
  289. t.Status = trainingTask.Status
  290. }
  291. }
  292. t.StartTime = trainingTask.Start
  293. t.EndTime = trainingTask.End
  294. err = svc.Scheduler.AiStorages.UpdateAiTask(t)
  295. if err != nil {
  296. msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  297. logx.Errorf(errors.New(msg).Error())
  298. wg.Done()
  299. return
  300. }
  301. wg.Done()
  302. }()
  303. }
  304. wg.Wait()
  305. }
  306. func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
  307. list := make([]*types.TaskModel, len(tasklist))
  308. copy(list, tasklist)
  309. for i := len(list) - 1; i >= 0; i-- {
  310. if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
  311. list = append(list[:i], list[i+1:]...)
  312. }
  313. }
  314. if len(list) == 0 {
  315. return
  316. }
  317. task := list[0]
  318. for i := range list {
  319. earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
  320. latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime)
  321. if latest.Before(earliest) {
  322. task = list[i]
  323. }
  324. }
  325. aiTaskList, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id)
  326. if err != nil {
  327. logx.Errorf(err.Error())
  328. return
  329. }
  330. if len(aiTaskList) == 0 {
  331. return
  332. }
  333. UpdateAiTask(svc, aiTaskList...)
  334. }
  335. func UpdateTrainingTaskStatus(svc *svc.ServiceContext, list []*types.AdapterInfo) {
  336. var wg sync.WaitGroup
  337. for _, adapter := range list {
  338. taskList, err := svc.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
  339. if err != nil {
  340. continue
  341. }
  342. if len(taskList) == 0 {
  343. continue
  344. }
  345. for _, task := range taskList {
  346. t := task
  347. if t.Status == constants.Completed || task.Status == constants.Failed || task.Status == constants.Stopped || task.TaskType != "pytorch" {
  348. continue
  349. }
  350. wg.Add(1)
  351. go func() {
  352. h := http.Request{}
  353. trainingTask, err := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(h.Context(), t.JobId)
  354. if err != nil {
  355. msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  356. logx.Errorf(errors.New(msg).Error())
  357. wg.Done()
  358. return
  359. }
  360. if trainingTask == nil {
  361. wg.Done()
  362. return
  363. }
  364. t.Status = trainingTask.Status
  365. t.StartTime = trainingTask.Start
  366. t.EndTime = trainingTask.End
  367. err = svc.Scheduler.AiStorages.UpdateAiTask(t)
  368. if err != nil {
  369. wg.Done()
  370. return
  371. }
  372. wg.Done()
  373. }()
  374. }
  375. }
  376. wg.Wait()
  377. return
  378. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive and autonomous peer-to-peer manner.