You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

deployInstance.go 7.9 kB

6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. package status
  2. import (
  3. "github.com/zeromicro/go-zero/core/logx"
  4. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
  5. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  9. "net/http"
  10. "strconv"
  11. "time"
  12. )
  13. func UpdateDeployInstanceStatusBatch(svc *svc.ServiceContext, insList []*models.AiInferDeployInstance, needfilter bool) {
  14. list := make([]*models.AiInferDeployInstance, len(insList))
  15. copy(list, insList)
  16. if needfilter {
  17. for i := len(list) - 1; i >= 0; i-- {
  18. if list[i].Status == constants.Running || list[i].Status == constants.Stopped || list[i].Status == constants.Failed {
  19. list = append(list[:i], list[i+1:]...)
  20. }
  21. }
  22. }
  23. if len(list) == 0 {
  24. return
  25. }
  26. buffer := make(chan bool, 3)
  27. for _, instance := range list {
  28. buffer <- true
  29. go UpdateDeployInstanceStatus(svc, instance, false, buffer)
  30. }
  31. }
  32. func UpdateDeployTaskStatus(svc *svc.ServiceContext) {
  33. list, err := svc.Scheduler.AiStorages.GetAllDeployTasks()
  34. if err != nil {
  35. return
  36. }
  37. ins := list[0]
  38. for i := range list {
  39. uTime, _ := time.Parse(time.RFC3339, ins.UpdateTime)
  40. latest, _ := time.Parse(time.RFC3339, list[i].UpdateTime)
  41. if latest.After(uTime) {
  42. ins = list[i]
  43. }
  44. }
  45. inslist, err := svc.Scheduler.AiStorages.GetInstanceListByDeployTaskId(ins.Id)
  46. if err != nil {
  47. return
  48. }
  49. buffer := make(chan bool, 2)
  50. for _, instance := range inslist {
  51. buffer <- true
  52. go UpdateDeployInstanceStatus(svc, instance, false, buffer)
  53. }
  54. }
  55. func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInferDeployInstance, updatetime bool, ch chan bool) {
  56. amap, found := svc.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(instance.AdapterId, 10)]
  57. if !found {
  58. if ch != nil {
  59. <-ch
  60. return
  61. }
  62. return
  63. }
  64. cmap, found := amap[strconv.FormatInt(instance.ClusterId, 10)]
  65. if !found {
  66. if ch != nil {
  67. <-ch
  68. return
  69. }
  70. return
  71. }
  72. h := http.Request{}
  73. ins, err := cmap.GetInferDeployInstance(h.Context(), instance.InstanceId)
  74. if err != nil {
  75. if ch != nil {
  76. <-ch
  77. return
  78. }
  79. return
  80. }
  81. switch instance.ClusterType {
  82. case storeLink.TYPE_OCTOPUS:
  83. switch ins.Status {
  84. case "running":
  85. if instance.Status == constants.Running {
  86. if ch != nil {
  87. <-ch
  88. return
  89. }
  90. return
  91. }
  92. url := ins.InferUrl
  93. err := ReportStatus(svc, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), url, true, "")
  94. if err != nil {
  95. logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error())
  96. }
  97. instance.Status = constants.Running
  98. case "stopped":
  99. if instance.Status == constants.Stopped {
  100. if ch != nil {
  101. <-ch
  102. return
  103. }
  104. return
  105. }
  106. instance.Status = constants.Stopped
  107. default:
  108. instance.Status = ins.Status
  109. }
  110. case storeLink.TYPE_MODELARTS:
  111. switch ins.Status {
  112. case "running":
  113. if instance.Status == constants.Running {
  114. if ch != nil {
  115. <-ch
  116. return
  117. }
  118. return
  119. }
  120. url := ins.InferUrl
  121. err := ReportStatus(svc, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), url, true, "")
  122. if err != nil {
  123. logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error())
  124. }
  125. instance.Status = constants.Running
  126. case "stopped":
  127. if instance.Status == constants.Stopped {
  128. if ch != nil {
  129. <-ch
  130. return
  131. }
  132. return
  133. }
  134. instance.Status = constants.Stopped
  135. case "failed":
  136. if instance.Status == constants.Failed {
  137. if ch != nil {
  138. <-ch
  139. return
  140. }
  141. return
  142. }
  143. err := ReportStatus(svc, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), "", false, ins.Status)
  144. if err != nil {
  145. logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error())
  146. }
  147. instance.Status = constants.Failed
  148. default:
  149. instance.Status = ins.Status
  150. }
  151. case storeLink.TYPE_SHUGUANGAI:
  152. switch ins.Status {
  153. case "Running":
  154. if instance.Status == constants.Running {
  155. if ch != nil {
  156. <-ch
  157. return
  158. }
  159. return
  160. }
  161. instance.Status = constants.Running
  162. case "Terminated":
  163. if instance.Status == constants.Stopped {
  164. if ch != nil {
  165. <-ch
  166. return
  167. }
  168. return
  169. }
  170. instance.Status = constants.Stopped
  171. default:
  172. instance.Status = ins.Status
  173. }
  174. case storeLink.TYPE_OPENI:
  175. switch ins.Status {
  176. case "RUNNING":
  177. if instance.Status == constants.Running {
  178. if ch != nil {
  179. <-ch
  180. return
  181. }
  182. return
  183. }
  184. url := ins.InferUrl
  185. err := ReportStatus(svc, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), url, true, "")
  186. if err != nil {
  187. logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error())
  188. }
  189. instance.Status = constants.Running
  190. case "STOPPED":
  191. if instance.Status == constants.Stopped {
  192. if ch != nil {
  193. <-ch
  194. return
  195. }
  196. return
  197. }
  198. instance.Status = constants.Stopped
  199. case "CREATED_FAILED":
  200. if instance.Status == constants.Failed {
  201. if ch != nil {
  202. <-ch
  203. return
  204. }
  205. return
  206. }
  207. err := ReportStatus(svc, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), "", false, ins.Status)
  208. if err != nil {
  209. logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error())
  210. }
  211. instance.Status = constants.Failed
  212. case "FAILED":
  213. if instance.Status == constants.Failed {
  214. if ch != nil {
  215. <-ch
  216. return
  217. }
  218. return
  219. }
  220. err := ReportStatus(svc, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), "", false, ins.Status)
  221. if err != nil {
  222. logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error())
  223. }
  224. instance.Status = constants.Failed
  225. default:
  226. instance.Status = ins.Status
  227. }
  228. }
  229. err = svc.Scheduler.AiStorages.UpdateInferDeployInstance(instance, updatetime)
  230. if err != nil {
  231. if ch != nil {
  232. <-ch
  233. return
  234. }
  235. return
  236. }
  237. if ch != nil {
  238. <-ch
  239. return
  240. }
  241. }
  242. func UpdateAutoStoppedInstance(svc *svc.ServiceContext) {
  243. list, err := svc.Scheduler.AiStorages.GetInferDeployInstanceList()
  244. if err != nil {
  245. return
  246. }
  247. if len(list) == 0 {
  248. return
  249. }
  250. UpdateDeployInstanceStatusBatch(svc, list, false)
  251. }
  252. func CheckStopStatus(in *inference.DeployInstance) bool {
  253. switch in.ClusterType {
  254. case storeLink.TYPE_OCTOPUS:
  255. switch in.Status {
  256. case "stopped":
  257. return true
  258. default:
  259. return false
  260. }
  261. case storeLink.TYPE_MODELARTS:
  262. switch in.Status {
  263. case "stopped":
  264. return true
  265. default:
  266. return false
  267. }
  268. case storeLink.TYPE_SHUGUANGAI:
  269. switch in.Status {
  270. case "Terminated":
  271. return true
  272. default:
  273. return false
  274. }
  275. case storeLink.TYPE_OPENI:
  276. switch in.Status {
  277. case "STOPPED":
  278. return true
  279. default:
  280. return false
  281. }
  282. default:
  283. return false
  284. }
  285. }
  286. func CheckRunningStatus(in *inference.DeployInstance) bool {
  287. switch in.ClusterType {
  288. case storeLink.TYPE_OCTOPUS:
  289. switch in.Status {
  290. case "running":
  291. return true
  292. default:
  293. return false
  294. }
  295. case storeLink.TYPE_MODELARTS:
  296. switch in.Status {
  297. case "running":
  298. return true
  299. default:
  300. return false
  301. }
  302. case storeLink.TYPE_SHUGUANGAI:
  303. switch in.Status {
  304. case "Running":
  305. return true
  306. default:
  307. return false
  308. }
  309. case storeLink.TYPE_OPENI:
  310. switch in.Status {
  311. case "RUNNING":
  312. return true
  313. case "WAITING":
  314. return true
  315. default:
  316. return false
  317. }
  318. default:
  319. return false
  320. }
  321. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.