You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

taskStatusSync.go 11 kB

10 months ago
8 months ago
10 months ago
10 months ago
10 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. package status
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/zeromicro/go-zero/core/logx"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  11. "google.golang.org/grpc/codes"
  12. "google.golang.org/grpc/status"
  13. "net/http"
  14. "strconv"
  15. "sync"
  16. "time"
  17. )
  18. func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
  19. svc.Scheduler.AiService.TaskSyncLock.Lock()
  20. defer svc.Scheduler.AiService.TaskSyncLock.Unlock()
  21. list := make([]*types.TaskModel, len(tasklist))
  22. copy(list, tasklist)
  23. for i := len(list) - 1; i >= 0; i-- {
  24. if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed || list[i].Status == constants.Cancelled {
  25. list = append(list[:i], list[i+1:]...)
  26. }
  27. }
  28. if len(list) == 0 {
  29. return
  30. }
  31. task := list[0]
  32. for i := range list {
  33. earliest, _ := time.Parse(time.RFC3339, task.UpdatedTime)
  34. latest, _ := time.Parse(time.RFC3339, list[i].UpdatedTime)
  35. if latest.Before(earliest) {
  36. task = list[i]
  37. }
  38. }
  39. // Update Infer Task Status
  40. if task.TaskTypeDict == "11" || task.TaskTypeDict == "12" {
  41. updateInferTaskStatus(svc, *task)
  42. return
  43. }
  44. aiTask, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id)
  45. if err != nil {
  46. logx.Errorf(err.Error())
  47. return
  48. }
  49. if len(aiTask) == 0 {
  50. err := svc.Scheduler.AiStorages.UpdateTask(task)
  51. if err != nil {
  52. return
  53. }
  54. return
  55. }
  56. logx.Errorf("############ Report Status Message Before switch %s", task.Status)
  57. if len(aiTask) == 1 {
  58. logx.Errorf("############ Report Status Message Switch %s", aiTask[0].Status)
  59. switch aiTask[0].Status {
  60. case constants.Completed:
  61. task.Status = constants.Succeeded
  62. logx.Errorf("############ Report Status Message Before Sending %s", task.Status)
  63. _ = reportStatusMessages(svc, task, aiTask[0])
  64. case constants.Failed:
  65. task.Status = constants.Failed
  66. logx.Errorf("############ Report Status Message Before Sending %s", task.Status)
  67. _ = reportStatusMessages(svc, task, aiTask[0])
  68. default:
  69. task.Status = aiTask[0].Status
  70. }
  71. task.StartTime = aiTask[0].StartTime
  72. task.EndTime = aiTask[0].EndTime
  73. err := svc.Scheduler.AiStorages.UpdateTask(task)
  74. if err != nil {
  75. return
  76. }
  77. return
  78. }
  79. logx.Errorf("############ Report Status Message After switch %s", task.Status)
  80. for i := len(aiTask) - 1; i >= 0; i-- {
  81. if aiTask[i].StartTime == "" {
  82. task.Status = aiTask[i].Status
  83. aiTask = append(aiTask[:i], aiTask[i+1:]...)
  84. }
  85. }
  86. if len(aiTask) == 0 {
  87. err := svc.Scheduler.AiStorages.UpdateTask(task)
  88. if err != nil {
  89. return
  90. }
  91. return
  92. }
  93. start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local)
  94. end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local)
  95. var status string
  96. var count int
  97. for _, a := range aiTask {
  98. s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local)
  99. e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local)
  100. if s.Before(start) {
  101. start = s
  102. }
  103. if e.After(end) {
  104. end = e
  105. }
  106. if a.Status == constants.Failed {
  107. status = a.Status
  108. break
  109. }
  110. if a.Status == constants.Pending {
  111. status = a.Status
  112. continue
  113. }
  114. if a.Status == constants.Running {
  115. status = a.Status
  116. continue
  117. }
  118. if a.Status == constants.Completed {
  119. count++
  120. continue
  121. }
  122. }
  123. if count == len(aiTask) {
  124. status = constants.Succeeded
  125. }
  126. if status != "" {
  127. task.Status = status
  128. task.StartTime = start.Format(constants.Layout)
  129. task.EndTime = end.Format(constants.Layout)
  130. }
  131. err = svc.Scheduler.AiStorages.UpdateTask(task)
  132. if err != nil {
  133. return
  134. }
  135. }
  136. func reportStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, aiTask *models.TaskAi) error {
  137. report := &jcs.JobStatusReportReq{
  138. TaskName: task.Name,
  139. TaskID: strconv.FormatInt(task.Id, 10),
  140. Messages: make([]*jcs.ReportMessage, 0),
  141. }
  142. //add report msg
  143. var output string
  144. switch aiTask.ClusterName {
  145. case "openI":
  146. output = aiTask.JobId
  147. case "鹏城云脑II-modelarts":
  148. output = aiTask.Output
  149. }
  150. jobMsg := &jcs.ReportMessage{
  151. Status: true,
  152. Message: "",
  153. ClusterID: strconv.FormatInt(aiTask.ClusterId, 10),
  154. Output: output,
  155. }
  156. report.Messages = append(report.Messages, jobMsg)
  157. _ = jcs.StatusReport(svc.Scheduler.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report)
  158. return nil
  159. }
  160. func updateInferTaskStatus(svc *svc.ServiceContext, task types.TaskModel) {
  161. aiTask, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id)
  162. if err != nil {
  163. logx.Errorf(err.Error())
  164. return
  165. }
  166. if len(aiTask) == 0 {
  167. //task.Status = constants.Failed
  168. err = svc.Scheduler.AiStorages.UpdateTask(&task)
  169. if err != nil {
  170. return
  171. }
  172. return
  173. }
  174. if len(aiTask) == 1 {
  175. if aiTask[0].Status == constants.Completed {
  176. task.StartTime = aiTask[0].StartTime
  177. task.EndTime = aiTask[0].EndTime
  178. task.Status = constants.Succeeded
  179. } else {
  180. task.StartTime = aiTask[0].StartTime
  181. task.Status = aiTask[0].Status
  182. }
  183. err = svc.Scheduler.AiStorages.UpdateTask(&task)
  184. if err != nil {
  185. return
  186. }
  187. return
  188. }
  189. //for i := len(aiTask) - 1; i >= 0; i-- {
  190. // if aiTask[i].StartTime == "" {
  191. // task.Status = aiTask[i].Status
  192. // aiTask = append(aiTask[:i], aiTask[i+1:]...)
  193. // }
  194. //}
  195. //
  196. //if len(aiTask) == 0 {
  197. // task.UpdatedTime = time.Now().Format(constants.Layout)
  198. // tx = svc.DbEngin.Table("task").Model(task).Updates(task)
  199. // if tx.Error != nil {
  200. // logx.Errorf(tx.Error.Error())
  201. // return
  202. // }
  203. // return
  204. //}
  205. if aiTask[0].StartTime == "" {
  206. return
  207. }
  208. start, _ := time.ParseInLocation(time.RFC3339, aiTask[0].StartTime, time.Local)
  209. end, _ := time.ParseInLocation(time.RFC3339, aiTask[0].EndTime, time.Local)
  210. var status string
  211. var count int
  212. for _, a := range aiTask {
  213. if a.Status == constants.Failed {
  214. status = a.Status
  215. break
  216. }
  217. if a.Status == constants.Pending {
  218. status = a.Status
  219. continue
  220. }
  221. if a.Status == constants.Running {
  222. status = a.Status
  223. continue
  224. }
  225. if a.Status == constants.Completed {
  226. count++
  227. continue
  228. }
  229. }
  230. if count == len(aiTask) {
  231. status = constants.Succeeded
  232. }
  233. if status == constants.Succeeded {
  234. task.Status = status
  235. task.StartTime = start.Format(time.RFC3339)
  236. task.EndTime = end.Format(time.RFC3339)
  237. } else {
  238. task.Status = status
  239. task.StartTime = start.Format(time.RFC3339)
  240. }
  241. err = svc.Scheduler.AiStorages.UpdateTask(&task)
  242. if err != nil {
  243. return
  244. }
  245. }
  246. func UpdateAiTask(svc *svc.ServiceContext, aiTaskList ...*models.TaskAi) {
  247. var wg sync.WaitGroup
  248. for _, aitask := range aiTaskList {
  249. t := aitask
  250. if t.Status == constants.Completed || t.Status == constants.Failed || t.JobId == "" || t.Status == constants.Cancelled {
  251. continue
  252. }
  253. wg.Add(1)
  254. go func() {
  255. h := http.Request{}
  256. trainingTask, err := svc.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(h.Context(), t.JobId)
  257. if err != nil {
  258. if status.Code(err) == codes.DeadlineExceeded {
  259. msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  260. logx.Errorf(errors.New(msg).Error())
  261. wg.Done()
  262. return
  263. }
  264. msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  265. logx.Errorf(errors.New(msg).Error())
  266. wg.Done()
  267. return
  268. }
  269. if trainingTask == nil {
  270. wg.Done()
  271. return
  272. }
  273. switch trainingTask.Status {
  274. case constants.Running:
  275. if t.Status != trainingTask.Status {
  276. svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "running", "任务运行中")
  277. t.Status = trainingTask.Status
  278. }
  279. case constants.Failed:
  280. if t.Status != trainingTask.Status {
  281. svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "failed", "任务失败")
  282. t.Status = trainingTask.Status
  283. }
  284. case constants.Completed:
  285. if t.Status != trainingTask.Status {
  286. svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "completed", "任务完成")
  287. t.Status = trainingTask.Status
  288. }
  289. default:
  290. if t.Status != trainingTask.Status {
  291. svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "pending", "任务pending")
  292. t.Status = trainingTask.Status
  293. }
  294. }
  295. t.StartTime = trainingTask.Start
  296. t.EndTime = trainingTask.End
  297. err = svc.Scheduler.AiStorages.UpdateAiTask(t)
  298. if err != nil {
  299. msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  300. logx.Errorf(errors.New(msg).Error())
  301. wg.Done()
  302. return
  303. }
  304. wg.Done()
  305. }()
  306. }
  307. wg.Wait()
  308. }
  309. func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
  310. list := make([]*types.TaskModel, len(tasklist))
  311. copy(list, tasklist)
  312. for i := len(list) - 1; i >= 0; i-- {
  313. if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
  314. list = append(list[:i], list[i+1:]...)
  315. }
  316. }
  317. if len(list) == 0 {
  318. return
  319. }
  320. task := list[0]
  321. for i := range list {
  322. earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
  323. latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime)
  324. if latest.Before(earliest) {
  325. task = list[i]
  326. }
  327. }
  328. aiTaskList, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id)
  329. if err != nil {
  330. logx.Errorf(err.Error())
  331. return
  332. }
  333. if len(aiTaskList) == 0 {
  334. return
  335. }
  336. UpdateAiTask(svc, aiTaskList...)
  337. }
  338. func UpdateTrainingTaskStatus(svc *svc.ServiceContext, list []*types.AdapterInfo) {
  339. var wg sync.WaitGroup
  340. for _, adapter := range list {
  341. taskList, err := svc.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
  342. if err != nil {
  343. continue
  344. }
  345. if len(taskList) == 0 {
  346. continue
  347. }
  348. for _, task := range taskList {
  349. t := task
  350. if t.Status == constants.Completed || task.Status == constants.Failed || task.Status == constants.Stopped || task.TaskType != "pytorch" {
  351. continue
  352. }
  353. wg.Add(1)
  354. go func() {
  355. h := http.Request{}
  356. trainingTask, err := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(h.Context(), t.JobId)
  357. if err != nil {
  358. msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  359. logx.Errorf(errors.New(msg).Error())
  360. wg.Done()
  361. return
  362. }
  363. if trainingTask == nil {
  364. wg.Done()
  365. return
  366. }
  367. t.Status = trainingTask.Status
  368. t.StartTime = trainingTask.Start
  369. t.EndTime = trainingTask.End
  370. err = svc.Scheduler.AiStorages.UpdateAiTask(t)
  371. if err != nil {
  372. wg.Done()
  373. return
  374. }
  375. wg.Done()
  376. }()
  377. }
  378. }
  379. wg.Wait()
  380. return
  381. }

PCM is positioned as a software stack over the cloud. It aims to build the standards and ecosystem for heterogeneous cloud collaboration in JCC in a non-intrusive, autonomous, peer-to-peer manner.