You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

infer.go 9.5 kB

5 months ago
4 months ago
5 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. package tasksync
  2. import (
  3. "github.com/zeromicro/go-zero/core/logx"
  4. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/config"
  5. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  11. "net/http"
  12. "strconv"
  13. "sync"
  14. "time"
  15. )
  16. type SyncInfer struct {
  17. mu sync.Mutex
  18. aiStorages *database.AiStorage
  19. inferenceAdapterMap map[string]map[string]inference.ICluster
  20. config *config.Config
  21. }
  22. func NewInferTask(storage *database.AiStorage, inferenceAdapterMap map[string]map[string]inference.ICluster, config *config.Config) *SyncInfer {
  23. return &SyncInfer{
  24. aiStorages: storage,
  25. inferenceAdapterMap: inferenceAdapterMap,
  26. config: config,
  27. }
  28. }
  29. func (s *SyncInfer) UpdateDeployInstanceStatusBatch(insList []*models.AiInferDeployInstance, needfilter bool) {
  30. s.mu.Lock()
  31. defer s.mu.Unlock()
  32. list := make([]*models.AiInferDeployInstance, len(insList))
  33. copy(list, insList)
  34. if needfilter {
  35. for i := len(list) - 1; i >= 0; i-- {
  36. if list[i].Status == constants.Running || list[i].Status == constants.Stopped || list[i].Status == constants.Failed {
  37. list = append(list[:i], list[i+1:]...)
  38. }
  39. }
  40. }
  41. if len(list) == 0 {
  42. return
  43. }
  44. buffer := make(chan bool, 3)
  45. for _, instance := range list {
  46. buffer <- true
  47. go s.UpdateDeployInstanceStatus(instance, false, buffer)
  48. }
  49. }
  50. func (s *SyncInfer) UpdateDeployTaskStatus() {
  51. list, err := s.aiStorages.GetAllDeployTasks()
  52. if err != nil {
  53. return
  54. }
  55. ins := list[0]
  56. for i := range list {
  57. uTime, _ := time.Parse(time.RFC3339, ins.UpdateTime)
  58. latest, _ := time.Parse(time.RFC3339, list[i].UpdateTime)
  59. if latest.After(uTime) {
  60. ins = list[i]
  61. }
  62. }
  63. inslist, err := s.aiStorages.GetInstanceListByDeployTaskId(ins.Id)
  64. if err != nil {
  65. return
  66. }
  67. buffer := make(chan bool, 2)
  68. for _, instance := range inslist {
  69. buffer <- true
  70. go s.UpdateDeployInstanceStatus(instance, false, buffer)
  71. }
  72. }
  73. func (s *SyncInfer) UpdateDeployInstanceStatus(instance *models.AiInferDeployInstance, updatetime bool, ch chan bool) {
  74. amap, found := s.inferenceAdapterMap[strconv.FormatInt(instance.AdapterId, 10)]
  75. if !found {
  76. if ch != nil {
  77. <-ch
  78. return
  79. }
  80. return
  81. }
  82. cmap, found := amap[strconv.FormatInt(instance.ClusterId, 10)]
  83. if !found {
  84. if ch != nil {
  85. <-ch
  86. return
  87. }
  88. return
  89. }
  90. h := http.Request{}
  91. ins, err := cmap.GetInferDeployInstance(h.Context(), instance.InstanceId)
  92. if err != nil {
  93. if ch != nil {
  94. <-ch
  95. return
  96. }
  97. return
  98. }
  99. switch instance.ClusterType {
  100. case storeLink.TYPE_OCTOPUS:
  101. switch ins.Status {
  102. case "running":
  103. if instance.Status == constants.Running {
  104. if ch != nil {
  105. <-ch
  106. return
  107. }
  108. return
  109. }
  110. url := ins.InferUrl
  111. err := s.ReportInferenceStatusMessages(instance, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), url, true, "")
  112. if err != nil {
  113. logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error())
  114. }
  115. instance.Status = constants.Running
  116. case "stopped":
  117. if instance.Status == constants.Stopped {
  118. if ch != nil {
  119. <-ch
  120. return
  121. }
  122. return
  123. }
  124. instance.Status = constants.Stopped
  125. default:
  126. instance.Status = ins.Status
  127. }
  128. case storeLink.TYPE_MODELARTS:
  129. switch ins.Status {
  130. case "running":
  131. if instance.Status == constants.Running {
  132. if ch != nil {
  133. <-ch
  134. return
  135. }
  136. return
  137. }
  138. url := ins.InferUrl
  139. err := s.ReportInferenceStatusMessages(instance, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), url, true, "")
  140. if err != nil {
  141. logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error())
  142. }
  143. instance.Status = constants.Running
  144. case "stopped":
  145. if instance.Status == constants.Stopped {
  146. if ch != nil {
  147. <-ch
  148. return
  149. }
  150. return
  151. }
  152. instance.Status = constants.Stopped
  153. case "failed":
  154. if instance.Status == constants.Failed {
  155. if ch != nil {
  156. <-ch
  157. return
  158. }
  159. return
  160. }
  161. err := s.ReportInferenceStatusMessages(instance, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), "", false, ins.Status)
  162. if err != nil {
  163. logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error())
  164. }
  165. instance.Status = constants.Failed
  166. default:
  167. instance.Status = ins.Status
  168. }
  169. case storeLink.TYPE_SHUGUANGAI:
  170. switch ins.Status {
  171. case "Running":
  172. if instance.Status == constants.Running {
  173. if ch != nil {
  174. <-ch
  175. return
  176. }
  177. return
  178. }
  179. instance.Status = constants.Running
  180. case "Terminated":
  181. if instance.Status == constants.Stopped {
  182. if ch != nil {
  183. <-ch
  184. return
  185. }
  186. return
  187. }
  188. instance.Status = constants.Stopped
  189. default:
  190. instance.Status = ins.Status
  191. }
  192. case storeLink.TYPE_OPENI:
  193. switch ins.Status {
  194. case "RUNNING":
  195. if instance.Status == constants.Running {
  196. if ch != nil {
  197. <-ch
  198. return
  199. }
  200. return
  201. }
  202. url := ins.InferUrl
  203. err := s.ReportInferenceStatusMessages(instance, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), url, true, "")
  204. if err != nil {
  205. logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error())
  206. }
  207. instance.Status = constants.Running
  208. case "STOPPED":
  209. if instance.Status == constants.Stopped {
  210. if ch != nil {
  211. <-ch
  212. return
  213. }
  214. return
  215. }
  216. instance.Status = constants.Stopped
  217. case "CREATED_FAILED":
  218. if instance.Status == constants.Failed {
  219. if ch != nil {
  220. <-ch
  221. return
  222. }
  223. return
  224. }
  225. err := s.ReportInferenceStatusMessages(instance, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), "", false, ins.Status)
  226. if err != nil {
  227. logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error())
  228. }
  229. instance.Status = constants.Failed
  230. case "FAILED":
  231. if instance.Status == constants.Failed {
  232. if ch != nil {
  233. <-ch
  234. return
  235. }
  236. return
  237. }
  238. err := s.ReportInferenceStatusMessages(instance, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), "", false, ins.Status)
  239. if err != nil {
  240. logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error())
  241. }
  242. instance.Status = constants.Failed
  243. default:
  244. instance.Status = ins.Status
  245. }
  246. }
  247. err = s.aiStorages.UpdateInferDeployInstance(instance, updatetime)
  248. if err != nil {
  249. if ch != nil {
  250. <-ch
  251. return
  252. }
  253. return
  254. }
  255. if ch != nil {
  256. <-ch
  257. return
  258. }
  259. }
  260. func (s *SyncInfer) UpdateAutoStoppedInstance() {
  261. list, err := s.aiStorages.GetInferDeployInstanceListLastMonth()
  262. if err != nil {
  263. return
  264. }
  265. if len(list) == 0 {
  266. return
  267. }
  268. s.UpdateDeployInstanceStatusBatch(list, false)
  269. }
  270. func (s *SyncInfer) CheckStopStatus(in *inference.DeployInstance) bool {
  271. switch in.ClusterType {
  272. case storeLink.TYPE_OCTOPUS:
  273. switch in.Status {
  274. case "stopped":
  275. return true
  276. default:
  277. return false
  278. }
  279. case storeLink.TYPE_MODELARTS:
  280. switch in.Status {
  281. case "stopped":
  282. return true
  283. default:
  284. return false
  285. }
  286. case storeLink.TYPE_SHUGUANGAI:
  287. switch in.Status {
  288. case "Terminated":
  289. return true
  290. default:
  291. return false
  292. }
  293. case storeLink.TYPE_OPENI:
  294. switch in.Status {
  295. case "STOPPED":
  296. return true
  297. default:
  298. return false
  299. }
  300. default:
  301. return false
  302. }
  303. }
  304. func (s *SyncInfer) CheckRunningStatus(in *inference.DeployInstance) bool {
  305. switch in.ClusterType {
  306. case storeLink.TYPE_OCTOPUS:
  307. switch in.Status {
  308. case "running":
  309. return true
  310. default:
  311. return false
  312. }
  313. case storeLink.TYPE_MODELARTS:
  314. switch in.Status {
  315. case "running":
  316. return true
  317. default:
  318. return false
  319. }
  320. case storeLink.TYPE_SHUGUANGAI:
  321. switch in.Status {
  322. case "Running":
  323. return true
  324. default:
  325. return false
  326. }
  327. case storeLink.TYPE_OPENI:
  328. switch in.Status {
  329. case "RUNNING":
  330. return true
  331. case "WAITING":
  332. return true
  333. default:
  334. return false
  335. }
  336. default:
  337. return false
  338. }
  339. }
  340. func (s *SyncInfer) ReportInferenceStatusMessages(ins *models.AiInferDeployInstance, taskName string, taskId string, clusterId string, url string, status bool, msg string) error {
  341. var id string
  342. var adapterID string
  343. var clusterID string
  344. var instanceID string
  345. if ins != nil {
  346. id = strconv.FormatInt(ins.Id, 10)
  347. adapterID = strconv.FormatInt(ins.AdapterId, 10)
  348. clusterID = strconv.FormatInt(ins.ClusterId, 10)
  349. instanceID = ins.InstanceId
  350. }
  351. report := &jcs.JobStatusReportReq{}
  352. reportMsg := &jcs.InferReportMessage{
  353. Type: "Inference",
  354. TaskName: taskName,
  355. TaskID: taskId,
  356. Status: status,
  357. Message: msg,
  358. Url: url,
  359. ID: id,
  360. AdapterID: adapterID,
  361. ClusterID: clusterID,
  362. InstanceID: instanceID,
  363. }
  364. report.Report = reportMsg
  365. err := jcs.StatusReport(s.config.JcsMiddleware.JobStatusReportUrl, report)
  366. if err != nil {
  367. return err
  368. }
  369. return nil
  370. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.