You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

aiTask.go 9.2 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. package cron
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/zeromicro/go-zero/core/logx"
  6. "github.com/zeromicro/go-zero/zrpc"
  7. "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
  13. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
  14. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  15. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  16. "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
  17. "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
  18. "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
  19. "google.golang.org/grpc/codes"
  20. "google.golang.org/grpc/status"
  21. "net/http"
  22. "strconv"
  23. "sync"
  24. "time"
  25. )
  26. const (
  27. OCTOPUS = "octopus"
  28. MODELARTS = "modelarts"
  29. SHUGUANGAI = "shuguangAi"
  30. )
  31. func GetTaskList(svc *svc.ServiceContext) ([]*types.TaskModel, error) {
  32. limit := 10
  33. offset := 0
  34. var list []*types.TaskModel
  35. db := svc.DbEngin.Model(&types.TaskModel{}).Table("task")
  36. db = db.Where("deleted_at is null")
  37. //count total
  38. var total int64
  39. err := db.Count(&total).Error
  40. db.Limit(limit).Offset(offset)
  41. if err != nil {
  42. return nil, err
  43. }
  44. err = db.Order("created_time desc").Find(&list).Error
  45. if err != nil {
  46. return nil, err
  47. }
  48. return list, nil
  49. }
  50. func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
  51. list := make([]*types.TaskModel, len(tasklist))
  52. copy(list, tasklist)
  53. for i := len(list) - 1; i >= 0; i-- {
  54. if list[i].AdapterTypeDict != 1 || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
  55. list = append(list[:i], list[i+1:]...)
  56. }
  57. }
  58. if len(list) == 0 {
  59. return
  60. }
  61. task := list[0]
  62. for i := range list {
  63. earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
  64. latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime)
  65. if latest.Before(earliest) {
  66. task = list[i]
  67. }
  68. }
  69. var aiTaskList []*models.TaskAi
  70. tx := svc.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList)
  71. if tx.Error != nil {
  72. logx.Errorf(tx.Error.Error())
  73. return
  74. }
  75. if len(aiTaskList) == 0 {
  76. return
  77. }
  78. var wg sync.WaitGroup
  79. for _, aitask := range aiTaskList {
  80. t := aitask
  81. if t.Status == constants.Completed || t.Status == constants.Failed {
  82. continue
  83. }
  84. wg.Add(1)
  85. go func() {
  86. h := http.Request{}
  87. trainingTask, err := svc.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(h.Context(), t.JobId)
  88. if err != nil {
  89. if status.Code(err) == codes.DeadlineExceeded {
  90. msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  91. logx.Errorf(errors.New(msg).Error())
  92. wg.Done()
  93. return
  94. }
  95. msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  96. logx.Errorf(errors.New(msg).Error())
  97. wg.Done()
  98. return
  99. }
  100. if trainingTask == nil {
  101. wg.Done()
  102. return
  103. }
  104. t.Status = trainingTask.Status
  105. t.StartTime = trainingTask.Start
  106. t.EndTime = trainingTask.End
  107. err = svc.Scheduler.AiStorages.UpdateAiTask(t)
  108. if err != nil {
  109. msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
  110. logx.Errorf(errors.New(msg).Error())
  111. wg.Done()
  112. return
  113. }
  114. wg.Done()
  115. }()
  116. }
  117. wg.Wait()
  118. }
  119. func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
  120. list := make([]*types.TaskModel, len(tasklist))
  121. copy(list, tasklist)
  122. for i := len(list) - 1; i >= 0; i-- {
  123. if list[i].AdapterTypeDict != 1 || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
  124. list = append(list[:i], list[i+1:]...)
  125. }
  126. }
  127. if len(list) == 0 {
  128. return
  129. }
  130. task := list[0]
  131. for i := range list {
  132. earliest, _ := time.Parse(time.RFC3339, task.UpdatedTime)
  133. latest, _ := time.Parse(time.RFC3339, list[i].UpdatedTime)
  134. if latest.Before(earliest) {
  135. task = list[i]
  136. }
  137. }
  138. var aiTask []*models.TaskAi
  139. tx := svc.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask)
  140. if tx.Error != nil {
  141. logx.Errorf(tx.Error.Error())
  142. return
  143. }
  144. if len(aiTask) == 0 {
  145. tx = svc.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task)
  146. if tx.Error != nil {
  147. logx.Errorf(tx.Error.Error())
  148. return
  149. }
  150. return
  151. }
  152. if len(aiTask) == 1 {
  153. if aiTask[0].Status == constants.Completed {
  154. task.Status = constants.Succeeded
  155. } else {
  156. task.Status = aiTask[0].Status
  157. }
  158. task.StartTime = aiTask[0].StartTime
  159. task.EndTime = aiTask[0].EndTime
  160. task.UpdatedTime = time.Now().Format(constants.Layout)
  161. tx = svc.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task)
  162. if tx.Error != nil {
  163. logx.Errorf(tx.Error.Error())
  164. return
  165. }
  166. return
  167. }
  168. for i := len(aiTask) - 1; i >= 0; i-- {
  169. if aiTask[i].StartTime == "" {
  170. task.Status = aiTask[i].Status
  171. aiTask = append(aiTask[:i], aiTask[i+1:]...)
  172. }
  173. }
  174. if len(aiTask) == 0 {
  175. task.UpdatedTime = time.Now().Format(constants.Layout)
  176. tx = svc.DbEngin.Table("task").Model(task).Updates(task)
  177. if tx.Error != nil {
  178. logx.Errorf(tx.Error.Error())
  179. return
  180. }
  181. return
  182. }
  183. start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local)
  184. end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local)
  185. var status string
  186. var count int
  187. for _, a := range aiTask {
  188. s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local)
  189. e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local)
  190. if s.Before(start) {
  191. start = s
  192. }
  193. if e.After(end) {
  194. end = e
  195. }
  196. if a.Status == constants.Failed {
  197. status = a.Status
  198. break
  199. }
  200. if a.Status == constants.Pending {
  201. status = a.Status
  202. continue
  203. }
  204. if a.Status == constants.Running {
  205. status = a.Status
  206. continue
  207. }
  208. if a.Status == constants.Completed {
  209. count++
  210. continue
  211. }
  212. }
  213. if count == len(aiTask) {
  214. status = constants.Succeeded
  215. }
  216. if status != "" {
  217. task.Status = status
  218. task.StartTime = start.Format(constants.Layout)
  219. task.EndTime = end.Format(constants.Layout)
  220. }
  221. task.UpdatedTime = time.Now().Format(constants.Layout)
  222. tx = svc.DbEngin.Table("task").Model(task).Updates(task)
  223. if tx.Error != nil {
  224. logx.Errorf(tx.Error.Error())
  225. return
  226. }
  227. }
  228. func UpdateAiAdapterMaps(svc *svc.ServiceContext) {
  229. var aiType = "1"
  230. adapterIds, err := svc.Scheduler.AiStorages.GetAdapterIdsByType(aiType)
  231. if err != nil {
  232. msg := fmt.Sprintf("###UpdateAiAdapterMaps###, error: %v \n", err.Error())
  233. logx.Errorf(errors.New(msg).Error())
  234. return
  235. }
  236. if len(adapterIds) == 0 {
  237. return
  238. }
  239. for _, id := range adapterIds {
  240. if isAdapterExist(svc, id) {
  241. continue
  242. }
  243. clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(id)
  244. if err != nil {
  245. msg := fmt.Sprintf("###UpdateAiAdapterMaps###, error: %v \n", err.Error())
  246. logx.Errorf(errors.New(msg).Error())
  247. return
  248. }
  249. if len(clusters.List) == 0 {
  250. continue
  251. }
  252. exeClusterMap, colClusterMap := InitAiClusterMap(&svc.Config, clusters.List)
  253. svc.Scheduler.AiService.AiExecutorAdapterMap[id] = exeClusterMap
  254. svc.Scheduler.AiService.AiCollectorAdapterMap[id] = colClusterMap
  255. }
  256. }
  257. func isAdapterExist(svc *svc.ServiceContext, id string) bool {
  258. _, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
  259. _, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
  260. if ok && ok2 {
  261. return true
  262. }
  263. return false
  264. }
  265. func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector) {
  266. executorMap := make(map[string]executor.AiExecutor)
  267. collectorMap := make(map[string]collector.AiCollector)
  268. for _, c := range clusters {
  269. switch c.Name {
  270. case OCTOPUS:
  271. id, _ := strconv.ParseInt(c.Id, 10, 64)
  272. octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(conf.OctopusRpcConf))
  273. octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
  274. collectorMap[c.Id] = octopus
  275. executorMap[c.Id] = octopus
  276. case MODELARTS:
  277. id, _ := strconv.ParseInt(c.Id, 10, 64)
  278. modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf))
  279. modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf))
  280. modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
  281. collectorMap[c.Id] = modelarts
  282. executorMap[c.Id] = modelarts
  283. case SHUGUANGAI:
  284. id, _ := strconv.ParseInt(c.Id, 10, 64)
  285. aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf))
  286. sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
  287. collectorMap[c.Id] = sgai
  288. executorMap[c.Id] = sgai
  289. }
  290. }
  291. return executorMap, collectorMap
  292. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive and autonomous peer-to-peer manner.