You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

aiStorage.go 11 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. package database
import (
	"fmt"
	"strconv"
	"time"

	"github.com/zeromicro/go-zero/core/logx"
	clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker"
	"gorm.io/gorm"
)
  14. type AiStorage struct {
  15. DbEngin *gorm.DB
  16. }
  17. func (s *AiStorage) GetParticipants() (*types.ClusterListResp, error) {
  18. var resp types.ClusterListResp
  19. tx := s.DbEngin.Raw("select * from t_cluster where `deleted_at` IS NULL ORDER BY create_time Desc").Scan(&resp.List)
  20. if tx.Error != nil {
  21. logx.Errorf(tx.Error.Error())
  22. return nil, tx.Error
  23. }
  24. return &resp, nil
  25. }
  26. func (s *AiStorage) GetClustersByAdapterId(id string) (*types.ClusterListResp, error) {
  27. var resp types.ClusterListResp
  28. tx := s.DbEngin.Raw("select * from t_cluster where `deleted_at` IS NULL and `adapter_id` = ? ORDER BY create_time Desc", id).Scan(&resp.List)
  29. if tx.Error != nil {
  30. logx.Errorf(tx.Error.Error())
  31. return nil, tx.Error
  32. }
  33. return &resp, nil
  34. }
  35. func (s *AiStorage) GetClusterNameById(id string) (string, error) {
  36. var name string
  37. tx := s.DbEngin.Raw("select `description` from t_cluster where `id` = ?", id).Scan(&name)
  38. if tx.Error != nil {
  39. logx.Errorf(tx.Error.Error())
  40. return "", tx.Error
  41. }
  42. return name, nil
  43. }
  44. func (s *AiStorage) GetAdapterNameById(id string) (string, error) {
  45. var name string
  46. tx := s.DbEngin.Raw("select `name` from t_adapter where `id` = ?", id).Scan(&name)
  47. if tx.Error != nil {
  48. logx.Errorf(tx.Error.Error())
  49. return "", tx.Error
  50. }
  51. return name, nil
  52. }
  53. func (s *AiStorage) GetAdapterIdsByType(adapterType string) ([]string, error) {
  54. var list []types.AdapterInfo
  55. var ids []string
  56. db := s.DbEngin.Model(&types.AdapterInfo{}).Table("t_adapter")
  57. db = db.Where("type = ?", adapterType)
  58. err := db.Order("create_time desc").Find(&list).Error
  59. if err != nil {
  60. return nil, err
  61. }
  62. for _, info := range list {
  63. ids = append(ids, info.Id)
  64. }
  65. return ids, nil
  66. }
  67. func (s *AiStorage) GetAdaptersByType(adapterType string) ([]*types.AdapterInfo, error) {
  68. var list []*types.AdapterInfo
  69. db := s.DbEngin.Model(&types.AdapterInfo{}).Table("t_adapter")
  70. db = db.Where("type = ?", adapterType)
  71. err := db.Order("create_time desc").Find(&list).Error
  72. if err != nil {
  73. return nil, err
  74. }
  75. return list, nil
  76. }
  77. func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*models.TaskAi, error) {
  78. var resp []*models.TaskAi
  79. db := s.DbEngin.Model(&models.TaskAi{}).Table("task_ai")
  80. db = db.Where("adapter_id = ?", adapterId)
  81. err := db.Order("commit_time desc").Find(&resp).Error
  82. if err != nil {
  83. return nil, err
  84. }
  85. return resp, nil
  86. }
  87. func (s *AiStorage) GetAiTaskListById(id int64) ([]*models.TaskAi, error) {
  88. var aiTaskList []*models.TaskAi
  89. tx := s.DbEngin.Raw("select * from task_ai where `task_id` = ? ", id).Scan(&aiTaskList)
  90. if tx.Error != nil {
  91. return nil, tx.Error
  92. }
  93. return aiTaskList, nil
  94. }
  95. func (s *AiStorage) SaveTask(name string, strategyCode int64, synergyStatus int64, aiType string) (int64, error) {
  96. startTime := time.Now()
  97. // 构建主任务结构体
  98. taskModel := models.Task{
  99. Status: constants.Saved,
  100. Description: "ai task",
  101. Name: name,
  102. SynergyStatus: synergyStatus,
  103. Strategy: strategyCode,
  104. AdapterTypeDict: "1",
  105. TaskTypeDict: aiType,
  106. StartTime: &startTime,
  107. CommitTime: time.Now(),
  108. }
  109. // 保存任务数据到数据库
  110. tx := s.DbEngin.Create(&taskModel)
  111. if tx.Error != nil {
  112. return 0, tx.Error
  113. }
  114. return taskModel.Id, nil
  115. }
  116. func (s *AiStorage) UpdateTask(task *types.TaskModel) error {
  117. task.UpdatedTime = time.Now().Format(constants.Layout)
  118. tx := s.DbEngin.Table("task").Model(task).Updates(task)
  119. if tx.Error != nil {
  120. logx.Errorf(tx.Error.Error())
  121. return tx.Error
  122. }
  123. return nil
  124. }
  125. func (s *AiStorage) SaveAiTask(taskId int64, opt option.Option, adapterName string, clusterId string, clusterName string, jobId string, status string, msg string) error {
  126. var aiOpt *option.AiOption
  127. switch (opt).(type) {
  128. case *option.AiOption:
  129. aiOpt = (opt).(*option.AiOption)
  130. case *option.InferOption:
  131. inferOpt := (opt).(*option.InferOption)
  132. aiOpt = &option.AiOption{}
  133. aiOpt.TaskName = inferOpt.TaskName
  134. aiOpt.Replica = inferOpt.Replica
  135. aiOpt.AdapterId = inferOpt.AdapterId
  136. aiOpt.TaskType = inferOpt.ModelType
  137. aiOpt.ModelName = inferOpt.ModelName
  138. aiOpt.StrategyName = inferOpt.Strategy
  139. }
  140. // 构建主任务结构体
  141. aId, err := strconv.ParseInt(aiOpt.AdapterId, 10, 64)
  142. if err != nil {
  143. return err
  144. }
  145. cId, err := strconv.ParseInt(clusterId, 10, 64)
  146. if err != nil {
  147. return err
  148. }
  149. aiTaskModel := models.TaskAi{
  150. TaskId: taskId,
  151. AdapterId: aId,
  152. AdapterName: adapterName,
  153. ClusterId: cId,
  154. ClusterName: clusterName,
  155. Name: aiOpt.TaskName,
  156. Replica: int64(aiOpt.Replica),
  157. JobId: jobId,
  158. TaskType: aiOpt.TaskType,
  159. ModelName: aiOpt.ModelName,
  160. Strategy: aiOpt.StrategyName,
  161. Status: status,
  162. Msg: msg,
  163. Card: aiOpt.ComputeCard,
  164. StartTime: time.Now().Format(time.RFC3339),
  165. CommitTime: time.Now(),
  166. }
  167. // 保存任务数据到数据库
  168. tx := s.DbEngin.Create(&aiTaskModel)
  169. if tx.Error != nil {
  170. return tx.Error
  171. }
  172. return nil
  173. }
  174. func (s *AiStorage) SaveAiTaskImageSubTask(ta *models.TaskAiSub) error {
  175. tx := s.DbEngin.Table("task_ai_sub").Create(ta)
  176. if tx.Error != nil {
  177. return tx.Error
  178. }
  179. return nil
  180. }
  181. func (s *AiStorage) SaveClusterTaskQueue(adapterId string, clusterId string, queueNum int64) error {
  182. aId, err := strconv.ParseInt(adapterId, 10, 64)
  183. if err != nil {
  184. return err
  185. }
  186. cId, err := strconv.ParseInt(clusterId, 10, 64)
  187. if err != nil {
  188. return err
  189. }
  190. taskQueue := models.TClusterTaskQueue{
  191. AdapterId: aId,
  192. ClusterId: cId,
  193. QueueNum: queueNum,
  194. }
  195. tx := s.DbEngin.Create(&taskQueue)
  196. if tx.Error != nil {
  197. return tx.Error
  198. }
  199. return nil
  200. }
  201. func (s *AiStorage) GetClusterTaskQueues(adapterId string, clusterId string) ([]*models.TClusterTaskQueue, error) {
  202. var taskQueues []*models.TClusterTaskQueue
  203. tx := s.DbEngin.Raw("select * from t_cluster_task_queue where `adapter_id` = ? and `cluster_id` = ?", adapterId, clusterId).Scan(&taskQueues)
  204. if tx.Error != nil {
  205. logx.Errorf(tx.Error.Error())
  206. return nil, tx.Error
  207. }
  208. return taskQueues, nil
  209. }
  210. func (s *AiStorage) GetAiTaskIdByClusterIdAndTaskId(clusterId string, taskId string) (string, error) {
  211. var aiTask models.TaskAi
  212. tx := s.DbEngin.Raw("select * from task_ai where `cluster_id` = ? and `task_id` = ?", clusterId, taskId).Scan(&aiTask)
  213. if tx.Error != nil {
  214. logx.Errorf(tx.Error.Error())
  215. return "", tx.Error
  216. }
  217. return aiTask.JobId, nil
  218. }
  219. func (s *AiStorage) GetClusterResourcesById(clusterId string) (*models.TClusterResource, error) {
  220. var clusterResource models.TClusterResource
  221. tx := s.DbEngin.Raw("select * from t_cluster_resource where `cluster_id` = ?", clusterId).Scan(&clusterResource)
  222. if tx.Error != nil {
  223. logx.Errorf(tx.Error.Error())
  224. return nil, tx.Error
  225. }
  226. return &clusterResource, nil
  227. }
  228. func (s *AiStorage) SaveClusterResources(adapterId string, clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64,
  229. memAvail float64, memTotal float64, diskAvail float64, diskTotal float64, gpuAvail float64, gpuTotal float64, cardTotal int64, topsTotal float64, cardHours float64,
  230. balance float64, taskCompleted int64) error {
  231. cId, err := strconv.ParseInt(clusterId, 10, 64)
  232. if err != nil {
  233. return err
  234. }
  235. aId, err := strconv.ParseInt(adapterId, 10, 64)
  236. if err != nil {
  237. return err
  238. }
  239. clusterResource := models.TClusterResource{
  240. AdapterId: aId,
  241. ClusterId: cId,
  242. ClusterName: clusterName,
  243. ClusterType: clusterType,
  244. CpuAvail: cpuAvail,
  245. CpuTotal: cpuTotal,
  246. MemAvail: memAvail,
  247. MemTotal: memTotal,
  248. DiskAvail: diskAvail,
  249. DiskTotal: diskTotal,
  250. GpuAvail: gpuAvail,
  251. GpuTotal: gpuTotal,
  252. CardTotal: cardTotal,
  253. CardTopsTotal: topsTotal,
  254. CardHours: cardHours,
  255. Balance: balance,
  256. TaskCompleted: taskCompleted,
  257. }
  258. tx := s.DbEngin.Create(&clusterResource)
  259. if tx.Error != nil {
  260. return tx.Error
  261. }
  262. // prometheus
  263. param := tracker.ClusterLoadRecord{
  264. AdapterId: aId,
  265. ClusterName: clusterName,
  266. CpuAvail: cpuAvail,
  267. CpuTotal: cpuTotal,
  268. CpuUtilisation: clusterResource.CpuAvail / clusterResource.CpuTotal,
  269. MemoryAvail: memAvail,
  270. MemoryTotal: memTotal,
  271. MemoryUtilisation: clusterResource.MemAvail / clusterResource.MemTotal,
  272. DiskAvail: diskAvail,
  273. DiskTotal: diskTotal,
  274. DiskUtilisation: clusterResource.DiskAvail / clusterResource.DiskTotal,
  275. }
  276. tracker.SyncClusterLoad(param)
  277. return nil
  278. }
  279. func (s *AiStorage) UpdateClusterResources(clusterResource *models.TClusterResource) error {
  280. tx := s.DbEngin.Where("cluster_id = ?", clusterResource.ClusterId).Updates(clusterResource)
  281. if tx.Error != nil {
  282. return tx.Error
  283. }
  284. // prometheus
  285. param := tracker.ClusterLoadRecord{
  286. AdapterId: clusterResource.AdapterId,
  287. ClusterName: clusterResource.ClusterName,
  288. CpuAvail: clusterResource.CpuAvail,
  289. CpuTotal: clusterResource.CpuTotal,
  290. CpuUtilisation: clusterResource.CpuAvail / clusterResource.CpuTotal,
  291. MemoryAvail: clusterResource.MemAvail,
  292. MemoryTotal: clusterResource.MemTotal,
  293. MemoryUtilisation: clusterResource.MemAvail / clusterResource.MemTotal,
  294. DiskAvail: clusterResource.DiskAvail,
  295. DiskTotal: clusterResource.DiskTotal,
  296. DiskUtilisation: clusterResource.DiskAvail / clusterResource.DiskTotal,
  297. }
  298. tracker.SyncClusterLoad(param)
  299. return nil
  300. }
  301. func (s *AiStorage) UpdateAiTask(task *models.TaskAi) error {
  302. tx := s.DbEngin.Updates(task)
  303. if tx.Error != nil {
  304. return tx.Error
  305. }
  306. return nil
  307. }
  308. func (s *AiStorage) GetStrategyCode(name string) (int64, error) {
  309. var strategy int64
  310. sqlStr := `select t_dict_item.item_value
  311. from t_dict
  312. left join t_dict_item on t_dict.id = t_dict_item.dict_id
  313. where item_text = ?
  314. and t_dict.dict_code = 'schedule_Strategy'`
  315. //查询调度策略
  316. err := s.DbEngin.Raw(sqlStr, name).Scan(&strategy).Error
  317. if err != nil {
  318. return strategy, nil
  319. }
  320. return strategy, nil
  321. }
  322. func (s *AiStorage) AddNoticeInfo(adapterId string, adapterName string, clusterId string, clusterName string, taskName string, noticeType string, incident string) {
  323. aId, err := strconv.ParseInt(adapterId, 10, 64)
  324. if err != nil {
  325. logx.Errorf("adapterId convert failure, err: %v", err)
  326. }
  327. var cId int64
  328. if clusterId != "" {
  329. cId, err = strconv.ParseInt(clusterId, 10, 64)
  330. if err != nil {
  331. logx.Errorf("clusterId convert failure, err: %v", err)
  332. }
  333. }
  334. noticeInfo := clientCore.NoticeInfo{
  335. AdapterId: aId,
  336. AdapterName: adapterName,
  337. ClusterId: cId,
  338. ClusterName: clusterName,
  339. NoticeType: noticeType,
  340. TaskName: taskName,
  341. Incident: incident,
  342. CreatedTime: time.Now(),
  343. }
  344. result := s.DbEngin.Table("t_notice").Create(&noticeInfo)
  345. if result.Error != nil {
  346. logx.Errorf("Task creation failure, err: %v", result.Error)
  347. }
  348. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.