You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

getclusterresourcespeclogic.go 11 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. package core
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/mitchellh/mapstructure"
  7. "github.com/rs/zerolog/log"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/schedule"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  13. "time"
  14. "github.com/zeromicro/go-zero/core/logx"
  15. "gorm.io/gorm"
  16. )
  17. type GetClusterResourceSpecLogic struct {
  18. logx.Logger
  19. ctx context.Context
  20. svcCtx *svc.ServiceContext
  21. }
  22. const (
  23. ChangeTypeCreated = 0
  24. ChangeTypeModified = 1
  25. ChangeTypeDeleted = 2
  26. )
  27. type APIResponse struct {
  28. ClusterId string `json:"ClusterId"`
  29. ClusterType string `json:"clusterType"`
  30. Region string `json:"region"`
  31. Resources []Resource `json:"resources"`
  32. Msg string `json:"msg"`
  33. }
  34. type Resource struct {
  35. Resource ResourceDetail `json:"resource"`
  36. BaseResources []ResourceDetail `json:"baseResources"`
  37. }
  38. type ResourceDetail struct {
  39. Type string `json:"type"`
  40. Name string `json:"name"`
  41. Total Metric `json:"total"`
  42. Available Metric `json:"available"`
  43. }
  44. type Metric struct {
  45. Unit string `json:"unit"`
  46. Value float64 `json:"value"`
  47. }
  48. func NewGetClusterResourceSpecLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetClusterResourceSpecLogic {
  49. return &GetClusterResourceSpecLogic{
  50. Logger: logx.WithContext(ctx),
  51. ctx: ctx,
  52. svcCtx: svcCtx,
  53. }
  54. }
  55. func (l *GetClusterResourceSpecLogic) GetClusterResourceSpec(req *types.ResourceSpecReq) (*types.PageResult, error) {
  56. if req.ClusterId == "" {
  57. return nil, errors.New("ClusterId is required")
  58. }
  59. // 获取集群资源数据
  60. startTime := time.Now()
  61. apiResources, err := l.fetchClusterResources(req.ClusterId)
  62. log.Debug().Msgf("调用获取ai训练资源接口耗时: %v", time.Since(startTime))
  63. if err != nil {
  64. return nil, fmt.Errorf("failed to fetch cluster resources: %w", err)
  65. }
  66. // 同步资源到数据库
  67. if err := l.syncResourcesToDB(apiResources); err != nil {
  68. return nil, fmt.Errorf("failed to sync resources: %w", err)
  69. }
  70. // 查询数据库结果
  71. return l.queryDatabaseResults(req)
  72. }
  73. func (l *GetClusterResourceSpecLogic) fetchClusterResources(ClusterId string) ([]APIResponse, error) {
  74. queryLogic := schedule.NewQueryResourcesLogic(l.ctx, l.svcCtx)
  75. resources, err := queryLogic.QueryResources(&types.QueryResourcesReq{
  76. ClusterIDs: []string{ClusterId},
  77. })
  78. if err != nil {
  79. return nil, fmt.Errorf("query resources failed: %w", err)
  80. }
  81. var apiResponses []APIResponse
  82. if err := l.decodeAPIResponse(resources.Data, &apiResponses); err != nil {
  83. return nil, fmt.Errorf("decode response failed: %w", err)
  84. }
  85. return apiResponses, nil
  86. }
  87. func (l *GetClusterResourceSpecLogic) decodeAPIResponse(input interface{}, output *[]APIResponse) error {
  88. config := &mapstructure.DecoderConfig{
  89. Result: output,
  90. TagName: "json",
  91. ErrorUnused: true,
  92. DecodeHook: mapstructure.ComposeDecodeHookFunc(
  93. mapstructure.StringToTimeHookFunc(time.RFC3339),
  94. mapstructure.StringToSliceHookFunc(","),
  95. ),
  96. }
  97. decoder, err := mapstructure.NewDecoder(config)
  98. if err != nil {
  99. return fmt.Errorf("failed to create decoder: %w", err)
  100. }
  101. if err := decoder.Decode(input); err != nil {
  102. return fmt.Errorf("decoding error: %w", err)
  103. }
  104. return nil
  105. }
  106. func (l *GetClusterResourceSpecLogic) syncResourcesToDB(apiResponses []APIResponse) error {
  107. for _, response := range apiResponses {
  108. // 转换API响应到数据库模型
  109. dbSpecs, apiSpecs, err := l.processAPIResponse(response)
  110. if err != nil {
  111. return err
  112. }
  113. // 处理资源变更
  114. if err := l.handleResourceChanges(dbSpecs, apiSpecs); err != nil {
  115. return fmt.Errorf("failed to handle resource changes: %w", err)
  116. }
  117. }
  118. return nil
  119. }
  120. func (l *GetClusterResourceSpecLogic) processAPIResponse(response APIResponse) ([]models.TResourceSpec, []models.TResourceSpec, error) {
  121. ClusterId := utils.StringToInt64(response.ClusterId)
  122. var dbSpecs []models.TResourceSpec
  123. if err := l.svcCtx.DbEngin.Model(models.TResourceSpec{}).Preload("BaseResourceSpecs").
  124. Where("cluster_id = ?", ClusterId).
  125. Find(&dbSpecs).Error; err != nil {
  126. return nil, nil, fmt.Errorf("database query failed: %w", err)
  127. }
  128. var apiSpecs []models.TResourceSpec
  129. for _, res := range response.Resources {
  130. spec := l.convertToResourceSpec(ClusterId, res)
  131. apiSpecs = append(apiSpecs, spec)
  132. }
  133. return dbSpecs, apiSpecs, nil
  134. }
  135. func (l *GetClusterResourceSpecLogic) handleResourceChanges(dbSpecs, apiSpecs []models.TResourceSpec) error {
  136. // 创建资源映射
  137. dbMap := make(map[string]models.TResourceSpec)
  138. for _, spec := range dbSpecs {
  139. key := resourceKey(spec.Type, spec.Name)
  140. dbMap[key] = spec
  141. }
  142. apiMap := make(map[string]models.TResourceSpec)
  143. for _, spec := range apiSpecs {
  144. key := resourceKey(spec.Type, spec.Name)
  145. apiMap[key] = spec
  146. }
  147. // 处理新增或更新的资源
  148. for key, apiSpec := range apiMap {
  149. dbSpec, exists := dbMap[key]
  150. if !exists {
  151. if err := l.createNewResource(&apiSpec); err != nil {
  152. return err
  153. }
  154. continue
  155. }
  156. if l.isSpecChanged(dbSpec, apiSpec) {
  157. if err := l.updateResource(&dbSpec, apiSpec); err != nil {
  158. return err
  159. }
  160. }
  161. }
  162. // 处理删除的资源
  163. for key, dbSpec := range dbMap {
  164. if _, exists := apiMap[key]; !exists {
  165. if err := l.markResourceDeleted(dbSpec.Id); err != nil {
  166. return err
  167. }
  168. }
  169. }
  170. return nil
  171. }
  172. func resourceKey(resType, name string) string {
  173. return fmt.Sprintf("%s::%s", resType, name)
  174. }
  175. func (l *GetClusterResourceSpecLogic) createNewResource(spec *models.TResourceSpec) error {
  176. return l.svcCtx.DbEngin.Transaction(func(tx *gorm.DB) error {
  177. if err := tx.Create(spec).Error; err != nil {
  178. return fmt.Errorf("failed to create resource: %w", err)
  179. }
  180. return nil
  181. })
  182. }
  183. func (l *GetClusterResourceSpecLogic) updateResource(existing *models.TResourceSpec, newSpec models.TResourceSpec) error {
  184. return l.svcCtx.DbEngin.Transaction(func(tx *gorm.DB) error {
  185. updates := map[string]interface{}{
  186. "total_count": newSpec.TotalCount,
  187. "available_count": newSpec.AvailableCount,
  188. "change_type": ChangeTypeModified,
  189. "update_time": time.Now(),
  190. }
  191. if err := tx.Model(existing).Updates(updates).Error; err != nil {
  192. return fmt.Errorf("failed to update resource: %w", err)
  193. }
  194. return l.syncBaseResources(tx, existing.Id, newSpec.BaseResourceSpecs)
  195. })
  196. }
  197. func (l *GetClusterResourceSpecLogic) syncBaseResources(tx *gorm.DB, specID int64, newResources []models.TBaseResourceSpec) error {
  198. // 处理基础资源更新
  199. var existingResources []models.TBaseResourceSpec
  200. if err := tx.Where("resource_spec_id = ?", specID).Find(&existingResources).Error; err != nil {
  201. return fmt.Errorf("failed to query base resources: %w", err)
  202. }
  203. existingMap := make(map[string]models.TBaseResourceSpec)
  204. for _, r := range existingResources {
  205. key := resourceKey(r.Type, r.Name)
  206. existingMap[key] = r
  207. }
  208. // 处理更新和新增
  209. for i, newRes := range newResources {
  210. newRes.ResourceSpecId = specID
  211. key := resourceKey(newRes.Type, newRes.Name)
  212. if existing, exists := existingMap[key]; exists {
  213. newRes.Id = existing.Id
  214. newRes.CreateTime = existing.CreateTime
  215. if err := tx.Save(&newRes).Error; err != nil {
  216. return fmt.Errorf("failed to update base resource: %w", err)
  217. }
  218. } else {
  219. if err := tx.Create(&newRes).Error; err != nil {
  220. return fmt.Errorf("failed to create base resource: %w", err)
  221. }
  222. }
  223. newResources[i] = newRes
  224. }
  225. // 处理删除
  226. currentIDs := make(map[int64]struct{})
  227. for _, r := range newResources {
  228. currentIDs[r.Id] = struct{}{}
  229. }
  230. for _, existing := range existingResources {
  231. if _, exists := currentIDs[existing.Id]; !exists {
  232. if err := tx.Delete(&existing).Error; err != nil {
  233. return fmt.Errorf("failed to delete base resource: %w", err)
  234. }
  235. }
  236. }
  237. return nil
  238. }
  239. func (l *GetClusterResourceSpecLogic) markResourceDeleted(id int64) error {
  240. return l.svcCtx.DbEngin.Model(&models.TResourceSpec{}).
  241. Where("id = ?", id).
  242. Update("change_type", ChangeTypeDeleted).
  243. Error
  244. }
  245. func (l *GetClusterResourceSpecLogic) isSpecChanged(old, new models.TResourceSpec) bool {
  246. if old.TotalCount != new.TotalCount ||
  247. old.AvailableCount != new.AvailableCount ||
  248. old.Region != new.Region {
  249. return true
  250. }
  251. // 比较基础资源
  252. oldBaseMap := make(map[string]models.TBaseResourceSpec)
  253. for _, br := range old.BaseResourceSpecs {
  254. oldBaseMap[resourceKey(br.Type, br.Name)] = br
  255. }
  256. for _, newBr := range new.BaseResourceSpecs {
  257. key := resourceKey(newBr.Type, newBr.Name)
  258. oldBr, exists := oldBaseMap[key]
  259. if !exists ||
  260. oldBr.TotalValue != newBr.TotalValue ||
  261. oldBr.AvailableValue != newBr.AvailableValue {
  262. return true
  263. }
  264. delete(oldBaseMap, key)
  265. }
  266. return len(oldBaseMap) > 0
  267. }
  268. func (l *GetClusterResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Resource) models.TResourceSpec {
  269. spec := models.TResourceSpec{
  270. Type: res.Resource.Type,
  271. Name: res.Resource.Name,
  272. TotalCount: int64(res.Resource.Total.Value),
  273. AvailableCount: int64(res.Resource.Available.Value),
  274. ClusterId: ClusterId,
  275. CreateTime: time.Now(),
  276. UpdateTime: time.Now(),
  277. ChangeType: ChangeTypeCreated,
  278. }
  279. for _, br := range res.BaseResources {
  280. spec.BaseResourceSpecs = append(spec.BaseResourceSpecs, models.TBaseResourceSpec{
  281. Type: br.Type,
  282. Name: br.Name,
  283. TotalValue: br.Total.Value,
  284. TotalUnit: br.Total.Unit,
  285. AvailableValue: br.Available.Value,
  286. AvailableUnit: br.Available.Unit,
  287. CreateTime: time.Now(),
  288. UpdateTime: time.Now(),
  289. })
  290. }
  291. return spec
  292. }
  293. func (l *GetClusterResourceSpecLogic) queryDatabaseResults(req *types.ResourceSpecReq) (*types.PageResult, error) {
  294. result := &types.PageResult{
  295. PageNum: req.PageNum,
  296. PageSize: req.PageSize,
  297. }
  298. query := l.buildBaseQuery(req)
  299. if err := query.Count(&result.Total).Error; err != nil {
  300. return nil, fmt.Errorf("failed to count records: %w", err)
  301. }
  302. var specs []*models.TResourceSpec
  303. if err := query.Model(models.TResourceSpec{}).Preload("BaseResourceSpecs").
  304. Scopes(paginate(req.PageNum, req.PageSize)).
  305. Find(&specs).Error; err != nil {
  306. return nil, fmt.Errorf("failed to query resources: %w", err)
  307. }
  308. result.List = specs
  309. return result, nil
  310. }
  311. func (l *GetClusterResourceSpecLogic) buildBaseQuery(req *types.ResourceSpecReq) *gorm.DB {
  312. query := l.svcCtx.DbEngin.Model(&models.TResourceSpec{}).
  313. Where("cluster_id = ?", utils.StringToInt64(req.ClusterId)).
  314. Where("deleted_at IS NULL")
  315. if req.Status != "" {
  316. query = query.Where("status = ?", req.Status)
  317. }
  318. if req.ChangeType != "" {
  319. query = query.Where("change_type = ?", req.ChangeType)
  320. }
  321. if req.Type != "" {
  322. query = query.Where("type = ?", req.Type)
  323. }
  324. if req.Name != "" {
  325. query = query.Where("name LIKE ?", "%"+req.Name+"%")
  326. }
  327. return query
  328. }
  329. func paginate(pageNum, pageSize int) func(db *gorm.DB) *gorm.DB {
  330. return func(db *gorm.DB) *gorm.DB {
  331. offset := (pageNum - 1) * pageSize
  332. return db.Offset(offset).Limit(pageSize).Order("create_time DESC")
  333. }
  334. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.