You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

sync_status.go 3.7 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package cloudbrainTask
  2. import (
  3. "net/http"
  4. "os"
  5. "strings"
  6. "code.gitea.io/gitea/models"
  7. "code.gitea.io/gitea/modules/cloudbrain"
  8. "code.gitea.io/gitea/modules/httplib"
  9. "code.gitea.io/gitea/modules/log"
  10. "code.gitea.io/gitea/modules/notification"
  11. "code.gitea.io/gitea/modules/setting"
  12. "code.gitea.io/gitea/modules/storage"
  13. )
  14. var noteBookOKMap = make(map[int64]int, 20)
  15. //if a task notebook url can get two times, the notebook can browser.
  16. const successfulCount = 3
  17. func SyncCloudBrainOneStatus(task *models.Cloudbrain) (*models.Cloudbrain, error) {
  18. jobResult, err := cloudbrain.GetJob(task.JobID)
  19. if err != nil {
  20. log.Error("GetJob failed:", err)
  21. return task, err
  22. }
  23. result, err := models.ConvertToJobResultPayload(jobResult.Payload)
  24. if err != nil {
  25. log.Error("ConvertToJobResultPayload failed:", err)
  26. return task, err
  27. }
  28. oldStatus := task.Status
  29. if result.JobStatus.State != string(models.JobWaiting) && result.JobStatus.State != string(models.JobFailed) {
  30. taskRoles := result.TaskRoles
  31. taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{}))
  32. task.ContainerIp = taskRes.TaskStatuses[0].ContainerIP
  33. task.ContainerID = taskRes.TaskStatuses[0].ContainerID
  34. }
  35. if (result.JobStatus.State != string(models.JobWaiting) && result.JobStatus.State != string(models.JobRunning)) ||
  36. task.Status == string(models.JobRunning) || (result.JobStatus.State == string(models.JobRunning) && isNoteBookReady(task)) {
  37. models.ParseAndSetDurationFromCloudBrainOne(result, task)
  38. task.Status = result.JobStatus.State
  39. if oldStatus != task.Status {
  40. notification.NotifyChangeCloudbrainStatus(task, oldStatus)
  41. err := updateLogFile(task, result)
  42. if err != nil {
  43. log.Error("updateLogFile failed:", err)
  44. return task, err
  45. }
  46. }
  47. err = models.UpdateJob(task)
  48. if err != nil {
  49. log.Error("UpdateJob failed:", err)
  50. return task, err
  51. }
  52. }
  53. return task, nil
  54. }
  55. func updateLogFile(task *models.Cloudbrain, result models.JobResultPayload) error {
  56. if task.Type == models.TypeCloudBrainOne {
  57. taskRoles := result.TaskRoles
  58. taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{}))
  59. existStr := taskRes.TaskStatuses[0].ExitDiagnostics
  60. logDir := "/model"
  61. files, err := storage.GetOneLevelAllObjectUnderDirMinio(setting.Attachment.Minio.Bucket, setting.CBCodePathPrefix+task.JobName+logDir, "")
  62. if err != nil {
  63. log.Error("query cloudbrain model failed: %v", err)
  64. return err
  65. }
  66. fileName := ""
  67. for _, file := range files {
  68. if strings.HasSuffix(file.FileName, "log.txt") {
  69. fileName = file.FileName
  70. break
  71. }
  72. }
  73. if fileName != "" {
  74. prefix := "/" + setting.CBCodePathPrefix + task.JobName + "/model"
  75. configFile, err := os.OpenFile(setting.Attachment.Minio.RealPath+setting.Attachment.Minio.Bucket+prefix+"/"+fileName, os.O_WRONLY|os.O_APPEND, 0666)
  76. if err != nil {
  77. log.Error("open file(%s) failed:%s", prefix+fileName, err)
  78. return err
  79. }
  80. defer configFile.Close()
  81. _, err = configFile.WriteString(existStr)
  82. if err != nil {
  83. log.Error("WriteString failed:%v", err)
  84. return err
  85. }
  86. }
  87. }
  88. return nil
  89. }
  90. func isNoteBookReady(task *models.Cloudbrain) bool {
  91. if task.JobType != string(models.JobTypeDebug) {
  92. return true
  93. }
  94. noteBookUrl := setting.DebugServerHost + "jpylab_" + task.JobID + "_" + task.SubTaskName
  95. r := httplib.Get(noteBookUrl)
  96. res, err := r.Response()
  97. if err != nil {
  98. return false
  99. }
  100. if res.StatusCode == http.StatusOK {
  101. count := noteBookOKMap[task.ID]
  102. if count < successfulCount-1 {
  103. noteBookOKMap[task.ID] = count + 1
  104. return false
  105. } else {
  106. delete(noteBookOKMap, task.ID)
  107. return true
  108. }
  109. }
  110. return false
  111. }