You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

taskStatusSync.go 11 kB

10 months ago
10 months ago
10 months ago
10 months ago

  1. package status
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/zeromicro/go-zero/core/logx"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  11. "google.golang.org/grpc/codes"
  12. "google.golang.org/grpc/status"
  13. "net/http"
  14. "strconv"
  15. "sync"
  16. "time"
  17. )
  18. func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
  19. list := make([]*types.TaskModel, len(tasklist))
  20. copy(list, tasklist)
  21. for i := len(list) - 1; i >= 0; i-- {
  22. if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed || list[i].Status == constants.Cancelled {
  23. list = append(list[:i], list[i+1:]...)
  24. }
  25. }
  26. if len(list) == 0 {
  27. return
  28. }
  29. task := list[0]
  30. for i := range list {
  31. earliest, _ := time.Parse(time.RFC3339, task.UpdatedTime)
  32. latest, _ := time.Parse(time.RFC3339, list[i].UpdatedTime)
  33. if latest.Before(earliest) {
  34. task = list[i]
  35. }
  36. }
  37. // Update Infer Task Status
  38. if task.TaskTypeDict == "11" || task.TaskTypeDict == "12" {
  39. updateInferTaskStatus(svc, *task)
  40. return
  41. }
  42. aiTask, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id)
  43. if err != nil {
  44. logx.Errorf(err.Error())
  45. return
  46. }
  47. if len(aiTask) == 0 {
  48. err := svc.Scheduler.AiStorages.UpdateTask(task)
  49. if err != nil {
  50. return
  51. }
  52. return
  53. }
  54. if len(aiTask) == 1 {
  55. if aiTask[0].Status == constants.Completed {
  56. task.Status = constants.Succeeded
  57. _ = reportStatusMessages(svc, task, aiTask[0])
  58. } else {
  59. task.Status = aiTask[0].Status
  60. }
  61. task.StartTime = aiTask[0].StartTime
  62. task.EndTime = aiTask[0].EndTime
  63. err := svc.Scheduler.AiStorages.UpdateTask(task)
  64. if err != nil {
  65. return
  66. }
  67. return
  68. }
  69. for i := len(aiTask) - 1; i >= 0; i-- {
  70. if aiTask[i].StartTime == "" {
  71. task.Status = aiTask[i].Status
  72. aiTask = append(aiTask[:i], aiTask[i+1:]...)
  73. }
  74. }
  75. if len(aiTask) == 0 {
  76. err := svc.Scheduler.AiStorages.UpdateTask(task)
  77. if err != nil {
  78. return
  79. }
  80. return
  81. }
  82. start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local)
  83. end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local)
  84. var status string
  85. var count int
  86. for _, a := range aiTask {
  87. s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local)
  88. e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local)
  89. if s.Before(start) {
  90. start = s
  91. }
  92. if e.After(end) {
  93. end = e
  94. }
  95. if a.Status == constants.Failed {
  96. status = a.Status
  97. break
  98. }
  99. if a.Status == constants.Pending {
  100. status = a.Status
  101. continue
  102. }
  103. if a.Status == constants.Running {
  104. status = a.Status
  105. continue
  106. }
  107. if a.Status == constants.Completed {
  108. count++
  109. continue
  110. }
  111. }
  112. if count == len(aiTask) {
  113. status = constants.Succeeded
  114. }
  115. if status != "" {
  116. task.Status = status
  117. task.StartTime = start.Format(constants.Layout)
  118. task.EndTime = end.Format(constants.Layout)
  119. }
  120. err = svc.Scheduler.AiStorages.UpdateTask(task)
  121. if err != nil {
  122. return
  123. }
  124. }
  125. func reportStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, aiTask *models.TaskAi) error {
  126. report := &jcs.JobStatusReportReq{
  127. TaskName: task.Name,
  128. TaskID: strconv.FormatInt(task.Id, 10),
  129. Messages: make([]*jcs.ReportMessage, 0),
  130. }
  131. //add report msg
  132. jobMsg := &jcs.ReportMessage{
  133. Status: true,
  134. Message: "",
  135. ClusterID: strconv.FormatInt(aiTask.ClusterId, 10),
  136. Output: aiTask.JobId,
  137. }
  138. report.Messages = append(report.Messages, jobMsg)
  139. _ = jcs.StatusReport(svc.Scheduler.AiService.Conf.JcsMiddleware.Url, report)
  140. return nil
  141. }
  142. func updateInferTaskStatus(svc *svc.ServiceContext, task types.TaskModel) {
  143. aiTask, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id)
  144. if err != nil {
  145. logx.Errorf(err.Error())
  146. return
  147. }
  148. if len(aiTask) == 0 {
  149. //task.Status = constants.Failed
  150. err = svc.Scheduler.AiStorages.UpdateTask(&task)
  151. if err != nil {
  152. return
  153. }
  154. return
  155. }
  156. if len(aiTask) == 1 {
  157. if aiTask[0].Status == constants.Completed {
  158. task.StartTime = aiTask[0].StartTime
  159. task.EndTime = aiTask[0].EndTime
  160. task.Status = constants.Succeeded
  161. } else {
  162. task.StartTime = aiTask[0].StartTime
  163. task.Status = aiTask[0].Status
  164. }
  165. err = svc.Scheduler.AiStorages.UpdateTask(&task)
  166. if err != nil {
  167. return
  168. }
  169. return
  170. }
  171. //for i := len(aiTask) - 1; i >= 0; i-- {
  172. // if aiTask[i].StartTime == "" {
  173. // task.Status = aiTask[i].Status
  174. // aiTask = append(aiTask[:i], aiTask[i+1:]...)
  175. // }
  176. //}
  177. //
  178. //if len(aiTask) == 0 {
  179. // task.UpdatedTime = time.Now().Format(constants.Layout)
  180. // tx = svc.DbEngin.Table("task").Model(task).Updates(task)
  181. // if tx.Error != nil {
  182. // logx.Errorf(tx.Error.Error())
  183. // return
  184. // }
  185. // return
  186. //}
  187. if aiTask[0].StartTime == "" {
  188. return
  189. }
  190. start, _ := time.ParseInLocation(time.RFC3339, aiTask[0].StartTime, time.Local)
  191. end, _ := time.ParseInLocation(time.RFC3339, aiTask[0].EndTime, time.Local)
  192. var status string
  193. var count int
  194. for _, a := range aiTask {
  195. if a.Status == constants.Failed {
  196. status = a.Status
  197. break
  198. }
  199. if a.Status == constants.Pending {
  200. status = a.Status
  201. continue
  202. }
  203. if a.Status == constants.Running {
  204. status = a.Status
  205. continue
  206. }
  207. if a.Status == constants.Completed {
  208. count++
  209. continue
  210. }
  211. }
  212. if count == len(aiTask) {
  213. status = constants.Succeeded
  214. }
  215. if status == constants.Succeeded {
  216. task.Status = status
  217. task.StartTime = start.Format(time.RFC3339)
  218. task.EndTime = end.Format(time.RFC3339)
  219. } else {
  220. task.Status = status
  221. task.StartTime = start.Format(time.RFC3339)
  222. }
  223. err = svc.Scheduler.AiStorages.UpdateTask(&task)
  224. if err != nil {
  225. return
  226. }
  227. }
  228. func UpdateAiTask(svc *svc.ServiceContext, aiTaskList ...*models.TaskAi) {
  229. var wg sync.WaitGroup
  230. for _, aitask := range aiTaskList {
  231. t := aitask
  232. if t.Status == constants.Completed || t.Status == constants.Failed || t.JobId == "" || t.Status == constants.Cancelled {
  233. continue
  234. }
  235. wg.Add(1)
  236. go func() {
  237. h := http.Request{}
  238. trainingTask, err := svc.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(h.Context(), t.JobId)
  239. if err != nil {
  240. if status.Code(err) == codes.DeadlineExceeded {
  241. msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  242. logx.Errorf(errors.New(msg).Error())
  243. wg.Done()
  244. return
  245. }
  246. msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  247. logx.Errorf(errors.New(msg).Error())
  248. wg.Done()
  249. return
  250. }
  251. if trainingTask == nil {
  252. wg.Done()
  253. return
  254. }
  255. switch trainingTask.Status {
  256. case constants.Running:
  257. if t.Status != trainingTask.Status {
  258. svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "running", "任务运行中")
  259. t.Status = trainingTask.Status
  260. }
  261. case constants.Failed:
  262. if t.Status != trainingTask.Status {
  263. svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "failed", "任务失败")
  264. t.Status = trainingTask.Status
  265. }
  266. case constants.Completed:
  267. if t.Status != trainingTask.Status {
  268. svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "completed", "任务完成")
  269. t.Status = trainingTask.Status
  270. }
  271. default:
  272. if t.Status != trainingTask.Status {
  273. svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "pending", "任务pending")
  274. t.Status = trainingTask.Status
  275. }
  276. }
  277. t.StartTime = trainingTask.Start
  278. t.EndTime = trainingTask.End
  279. err = svc.Scheduler.AiStorages.UpdateAiTask(t)
  280. if err != nil {
  281. msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  282. logx.Errorf(errors.New(msg).Error())
  283. wg.Done()
  284. return
  285. }
  286. wg.Done()
  287. }()
  288. }
  289. wg.Wait()
  290. }
  291. func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
  292. list := make([]*types.TaskModel, len(tasklist))
  293. copy(list, tasklist)
  294. for i := len(list) - 1; i >= 0; i-- {
  295. if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
  296. list = append(list[:i], list[i+1:]...)
  297. }
  298. }
  299. if len(list) == 0 {
  300. return
  301. }
  302. task := list[0]
  303. for i := range list {
  304. earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
  305. latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime)
  306. if latest.Before(earliest) {
  307. task = list[i]
  308. }
  309. }
  310. aiTaskList, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id)
  311. if err != nil {
  312. logx.Errorf(err.Error())
  313. return
  314. }
  315. if len(aiTaskList) == 0 {
  316. return
  317. }
  318. UpdateAiTask(svc, aiTaskList...)
  319. }
  320. func UpdateTrainingTaskStatus(svc *svc.ServiceContext, list []*types.AdapterInfo) {
  321. var wg sync.WaitGroup
  322. for _, adapter := range list {
  323. taskList, err := svc.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
  324. if err != nil {
  325. continue
  326. }
  327. if len(taskList) == 0 {
  328. continue
  329. }
  330. for _, task := range taskList {
  331. t := task
  332. if t.Status == constants.Completed || task.Status == constants.Failed || task.Status == constants.Stopped || task.TaskType != "pytorch" {
  333. continue
  334. }
  335. wg.Add(1)
  336. go func() {
  337. h := http.Request{}
  338. trainingTask, err := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(h.Context(), t.JobId)
  339. if err != nil {
  340. msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  341. logx.Errorf(errors.New(msg).Error())
  342. wg.Done()
  343. return
  344. }
  345. if trainingTask == nil {
  346. wg.Done()
  347. return
  348. }
  349. t.Status = trainingTask.Status
  350. t.StartTime = trainingTask.Start
  351. t.EndTime = trainingTask.End
  352. err = svc.Scheduler.AiStorages.UpdateAiTask(t)
  353. if err != nil {
  354. wg.Done()
  355. return
  356. }
  357. wg.Done()
  358. }()
  359. }
  360. }
  361. wg.Wait()
  362. return
  363. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.