You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

taskStatusSync.go 12 kB

10 months ago
8 months ago
10 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
10 months ago
10 months ago
10 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. package status
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/zeromicro/go-zero/core/logx"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  11. "google.golang.org/grpc/codes"
  12. "google.golang.org/grpc/status"
  13. "net/http"
  14. "strconv"
  15. "sync"
  16. "time"
  17. )
  18. func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
  19. svc.Scheduler.AiService.TaskSyncLock.Lock()
  20. defer svc.Scheduler.AiService.TaskSyncLock.Unlock()
  21. list := make([]*types.TaskModel, len(tasklist))
  22. copy(list, tasklist)
  23. for i := len(list) - 1; i >= 0; i-- {
  24. if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed || list[i].Status == constants.Cancelled {
  25. list = append(list[:i], list[i+1:]...)
  26. }
  27. }
  28. if len(list) == 0 {
  29. return
  30. }
  31. task := list[0]
  32. for i := range list {
  33. earliest, _ := time.Parse(time.RFC3339, task.UpdatedTime)
  34. latest, _ := time.Parse(time.RFC3339, list[i].UpdatedTime)
  35. if latest.Before(earliest) {
  36. task = list[i]
  37. }
  38. }
  39. // Update Infer Task Status
  40. //if task.TaskTypeDict == "11" || task.TaskTypeDict == "12" {
  41. // updateInferTaskStatus(svc, *task)
  42. // return
  43. //}
  44. aiTask, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id)
  45. if err != nil {
  46. logx.Errorf(err.Error())
  47. return
  48. }
  49. if len(aiTask) == 0 {
  50. err := svc.Scheduler.AiStorages.UpdateTask(task)
  51. if err != nil {
  52. return
  53. }
  54. return
  55. }
  56. logx.Errorf("############ Report Status Message Before switch %s", task.Status)
  57. if len(aiTask) == 1 {
  58. logx.Errorf("############ Report Status Message Switch %s", aiTask[0].Status)
  59. switch aiTask[0].Status {
  60. case constants.Completed:
  61. task.Status = constants.Succeeded
  62. logx.Errorf("############ Report Status Message Before Sending %s", task.Status)
  63. _ = reportStatusMessages(svc, task, aiTask[0])
  64. //case constants.Running:
  65. // task.Status = constants.Succeeded
  66. // logx.Errorf("############ Report Status Message Before Sending %s", task.Status)
  67. //
  68. // _ = reportStatusMessages(svc, task, aiTask[0])
  69. case constants.Failed:
  70. task.Status = constants.Failed
  71. logx.Errorf("############ Report Status Message Before Sending %s", task.Status)
  72. _ = reportStatusMessages(svc, task, aiTask[0])
  73. default:
  74. task.Status = aiTask[0].Status
  75. }
  76. task.StartTime = aiTask[0].StartTime
  77. task.EndTime = aiTask[0].EndTime
  78. err := svc.Scheduler.AiStorages.UpdateTask(task)
  79. if err != nil {
  80. return
  81. }
  82. return
  83. }
  84. logx.Errorf("############ Report Status Message After switch %s", task.Status)
  85. for i := len(aiTask) - 1; i >= 0; i-- {
  86. if aiTask[i].StartTime == "" {
  87. task.Status = aiTask[i].Status
  88. aiTask = append(aiTask[:i], aiTask[i+1:]...)
  89. }
  90. }
  91. if len(aiTask) == 0 {
  92. err := svc.Scheduler.AiStorages.UpdateTask(task)
  93. if err != nil {
  94. return
  95. }
  96. return
  97. }
  98. //start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local)
  99. //end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local)
  100. //
  101. //var status string
  102. //var count int
  103. //for _, a := range aiTask {
  104. // s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local)
  105. // e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local)
  106. //
  107. // if s.Before(start) {
  108. // start = s
  109. // }
  110. //
  111. // if e.After(end) {
  112. // end = e
  113. // }
  114. //
  115. // if a.Status == constants.Failed {
  116. // status = a.Status
  117. // break
  118. // }
  119. //
  120. // if a.Status == constants.Pending {
  121. // status = a.Status
  122. // continue
  123. // }
  124. //
  125. // if a.Status == constants.Running {
  126. // status = a.Status
  127. // continue
  128. // }
  129. //
  130. // if a.Status == constants.Completed {
  131. // count++
  132. // continue
  133. // }
  134. //}
  135. //if count == len(aiTask) {
  136. // status = constants.Succeeded
  137. //}
  138. //if status != "" {
  139. // task.Status = status
  140. // task.StartTime = start.Format(constants.Layout)
  141. // task.EndTime = end.Format(constants.Layout)
  142. //}
  143. //
  144. //err = svc.Scheduler.AiStorages.UpdateTask(task)
  145. //if err != nil {
  146. // return
  147. //}
  148. }
  149. func reportStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, aiTask *models.TaskAi) error {
  150. report := &jcs.JobStatusReportReq{}
  151. reportMsg := &jcs.TrainReportMessage{
  152. Type: "Train",
  153. TaskName: task.Name,
  154. TaskID: strconv.FormatInt(task.Id, 10),
  155. }
  156. var output string
  157. switch aiTask.ClusterName {
  158. case "openI":
  159. output = aiTask.JobId
  160. case "鹏城云脑II-modelarts":
  161. output = aiTask.Output
  162. }
  163. reportMsg.Status = true
  164. reportMsg.Message = ""
  165. reportMsg.ClusterID = strconv.FormatInt(aiTask.ClusterId, 10)
  166. reportMsg.Output = output
  167. report.Report = reportMsg
  168. err := jcs.StatusReport(svc.Scheduler.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report)
  169. if err != nil {
  170. return err
  171. }
  172. err = jcs.TempSaveReportToTask(svc.Scheduler.AiStorages, task, report)
  173. if err != nil {
  174. return err
  175. }
  176. return nil
  177. }
  178. func updateInferTaskStatus(svc *svc.ServiceContext, task types.TaskModel) {
  179. svc.Scheduler.AiService.TaskSyncLock.Lock()
  180. defer svc.Scheduler.AiService.TaskSyncLock.Unlock()
  181. aiTask, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id)
  182. if err != nil {
  183. logx.Errorf(err.Error())
  184. return
  185. }
  186. if len(aiTask) == 0 {
  187. task.Status = constants.Failed
  188. err = svc.Scheduler.AiStorages.UpdateTask(&task)
  189. if err != nil {
  190. return
  191. }
  192. return
  193. }
  194. if len(aiTask) == 1 {
  195. if aiTask[0].Status == constants.Completed {
  196. task.StartTime = aiTask[0].StartTime
  197. task.EndTime = aiTask[0].EndTime
  198. task.Status = constants.Succeeded
  199. } else {
  200. task.StartTime = aiTask[0].StartTime
  201. task.Status = aiTask[0].Status
  202. }
  203. err = svc.Scheduler.AiStorages.UpdateTask(&task)
  204. if err != nil {
  205. return
  206. }
  207. return
  208. }
  209. //for i := len(aiTask) - 1; i >= 0; i-- {
  210. // if aiTask[i].StartTime == "" {
  211. // task.Status = aiTask[i].Status
  212. // aiTask = append(aiTask[:i], aiTask[i+1:]...)
  213. // }
  214. //}
  215. //
  216. //if len(aiTask) == 0 {
  217. // task.UpdatedTime = time.Now().Format(constants.Layout)
  218. // tx = svc.DbEngin.Table("task").Model(task).Updates(task)
  219. // if tx.Error != nil {
  220. // logx.Errorf(tx.Error.Error())
  221. // return
  222. // }
  223. // return
  224. //}
  225. if aiTask[0].StartTime == "" {
  226. return
  227. }
  228. start, _ := time.ParseInLocation(time.RFC3339, aiTask[0].StartTime, time.Local)
  229. end, _ := time.ParseInLocation(time.RFC3339, aiTask[0].EndTime, time.Local)
  230. var status string
  231. var count int
  232. for _, a := range aiTask {
  233. if a.Status == constants.Failed {
  234. status = a.Status
  235. break
  236. }
  237. if a.Status == constants.Pending {
  238. status = a.Status
  239. continue
  240. }
  241. if a.Status == constants.Running {
  242. status = a.Status
  243. continue
  244. }
  245. if a.Status == constants.Completed {
  246. count++
  247. continue
  248. }
  249. }
  250. if count == len(aiTask) {
  251. status = constants.Succeeded
  252. }
  253. if status == constants.Succeeded {
  254. task.Status = status
  255. task.StartTime = start.Format(time.RFC3339)
  256. task.EndTime = end.Format(time.RFC3339)
  257. } else {
  258. task.Status = status
  259. task.StartTime = start.Format(time.RFC3339)
  260. }
  261. err = svc.Scheduler.AiStorages.UpdateTask(&task)
  262. if err != nil {
  263. return
  264. }
  265. }
  266. func UpdateAiTask(svc *svc.ServiceContext, aiTaskList ...*models.TaskAi) {
  267. var wg sync.WaitGroup
  268. for _, aitask := range aiTaskList {
  269. t := aitask
  270. if t.Status == constants.Completed || t.Status == constants.Failed || t.JobId == "" || t.Status == constants.Cancelled {
  271. continue
  272. }
  273. wg.Add(1)
  274. go func() {
  275. h := http.Request{}
  276. trainingTask, err := svc.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(h.Context(), t.JobId)
  277. if err != nil {
  278. if status.Code(err) == codes.DeadlineExceeded {
  279. msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  280. logx.Errorf(errors.New(msg).Error())
  281. wg.Done()
  282. return
  283. }
  284. msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  285. logx.Errorf(errors.New(msg).Error())
  286. wg.Done()
  287. return
  288. }
  289. if trainingTask == nil {
  290. wg.Done()
  291. return
  292. }
  293. switch trainingTask.Status {
  294. case constants.Running:
  295. if t.Status != trainingTask.Status {
  296. svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "running", "任务运行中")
  297. t.Status = trainingTask.Status
  298. }
  299. case constants.Failed:
  300. if t.Status != trainingTask.Status {
  301. svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "failed", "任务失败")
  302. t.Status = trainingTask.Status
  303. }
  304. case constants.Completed:
  305. if t.Status != trainingTask.Status {
  306. svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "completed", "任务完成")
  307. t.Status = trainingTask.Status
  308. }
  309. default:
  310. if t.Status != trainingTask.Status {
  311. svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "pending", "任务pending")
  312. t.Status = trainingTask.Status
  313. }
  314. }
  315. t.StartTime = trainingTask.Start
  316. t.EndTime = trainingTask.End
  317. err = svc.Scheduler.AiStorages.UpdateAiTask(t)
  318. if err != nil {
  319. msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  320. logx.Errorf(errors.New(msg).Error())
  321. wg.Done()
  322. return
  323. }
  324. wg.Done()
  325. }()
  326. }
  327. wg.Wait()
  328. }
  329. func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
  330. svc.Scheduler.AiService.TaskSyncLock.Lock()
  331. defer svc.Scheduler.AiService.TaskSyncLock.Unlock()
  332. list := make([]*types.TaskModel, len(tasklist))
  333. copy(list, tasklist)
  334. for i := len(list) - 1; i >= 0; i-- {
  335. if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
  336. list = append(list[:i], list[i+1:]...)
  337. }
  338. }
  339. if len(list) == 0 {
  340. return
  341. }
  342. task := list[0]
  343. for i := range list {
  344. earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
  345. latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime)
  346. if latest.Before(earliest) {
  347. task = list[i]
  348. }
  349. }
  350. aiTaskList, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id)
  351. if err != nil {
  352. logx.Errorf(err.Error())
  353. return
  354. }
  355. if len(aiTaskList) == 0 {
  356. return
  357. }
  358. UpdateAiTask(svc, aiTaskList...)
  359. }
  360. func UpdateTrainingTaskStatus(svc *svc.ServiceContext, list []*types.AdapterInfo) {
  361. svc.Scheduler.AiService.TaskSyncLock.Lock()
  362. defer svc.Scheduler.AiService.TaskSyncLock.Unlock()
  363. var wg sync.WaitGroup
  364. for _, adapter := range list {
  365. taskList, err := svc.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
  366. if err != nil {
  367. continue
  368. }
  369. if len(taskList) == 0 {
  370. continue
  371. }
  372. for _, task := range taskList {
  373. t := task
  374. if t.Status == constants.Completed || task.Status == constants.Failed || task.Status == constants.Stopped || task.TaskType != "pytorch" {
  375. continue
  376. }
  377. wg.Add(1)
  378. go func() {
  379. h := http.Request{}
  380. trainingTask, err := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(h.Context(), t.JobId)
  381. if err != nil {
  382. msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  383. logx.Errorf(errors.New(msg).Error())
  384. wg.Done()
  385. return
  386. }
  387. if trainingTask == nil {
  388. wg.Done()
  389. return
  390. }
  391. t.Status = trainingTask.Status
  392. t.StartTime = trainingTask.Start
  393. t.EndTime = trainingTask.End
  394. err = svc.Scheduler.AiStorages.UpdateAiTask(t)
  395. if err != nil {
  396. wg.Done()
  397. return
  398. }
  399. wg.Done()
  400. }()
  401. }
  402. }
  403. wg.Wait()
  404. return
  405. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.