You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

getcenteroverviewlogic.go 4.2 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package ai
  2. import (
  3. "context"
  4. "github.com/zeromicro/go-zero/core/logx"
  5. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  8. "strconv"
  9. "sync"
  10. "time"
  11. )
  12. type GetCenterOverviewLogic struct {
  13. logx.Logger
  14. ctx context.Context
  15. svcCtx *svc.ServiceContext
  16. }
  17. func NewGetCenterOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterOverviewLogic {
  18. return &GetCenterOverviewLogic{
  19. Logger: logx.WithContext(ctx),
  20. ctx: ctx,
  21. svcCtx: svcCtx,
  22. }
  23. }
  24. func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverviewResp, err error) {
  25. resp = &types.CenterOverviewResp{}
  26. var mu sync.RWMutex
  27. ch := make(chan struct{})
  28. var centerNum int32
  29. var taskNum int32
  30. var cardNum int32
  31. var totalTops float64
  32. adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1")
  33. if err != nil {
  34. return nil, err
  35. }
  36. centerNum = int32(len(adapterList))
  37. resp.CenterNum = centerNum
  38. go l.updateClusterResource(&mu, ch, adapterList)
  39. for _, adapter := range adapterList {
  40. taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
  41. if err != nil {
  42. continue
  43. }
  44. taskNum += int32(len(taskList))
  45. }
  46. resp.TaskNum = taskNum
  47. for _, adapter := range adapterList {
  48. clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id)
  49. if err != nil {
  50. continue
  51. }
  52. for _, cluster := range clusters.List {
  53. mu.RLock()
  54. clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(cluster.Id)
  55. mu.RUnlock()
  56. if err != nil {
  57. continue
  58. }
  59. cardNum += int32(clusterResource.CardTotal)
  60. totalTops += clusterResource.CardTopsTotal
  61. }
  62. }
  63. resp.CardNum = cardNum
  64. resp.PowerInTops = totalTops
  65. select {
  66. case _ = <-ch:
  67. return resp, nil
  68. case <-time.After(1 * time.Second):
  69. return resp, nil
  70. }
  71. }
  72. func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) {
  73. var wg sync.WaitGroup
  74. for _, adapter := range list {
  75. clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id)
  76. if err != nil {
  77. continue
  78. }
  79. for _, cluster := range clusters.List {
  80. c := cluster
  81. mu.RLock()
  82. clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(c.Id)
  83. mu.RUnlock()
  84. if err != nil {
  85. continue
  86. }
  87. wg.Add(1)
  88. go func() {
  89. _, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id]
  90. if !ok {
  91. wg.Done()
  92. return
  93. }
  94. stat, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(l.ctx)
  95. if err != nil {
  96. wg.Done()
  97. return
  98. }
  99. if stat == nil {
  100. wg.Done()
  101. return
  102. }
  103. clusterType, err := strconv.ParseInt(adapter.Type, 10, 64)
  104. if err != nil {
  105. wg.Done()
  106. return
  107. }
  108. var cardTotal int64
  109. var topsTotal float64
  110. for _, card := range stat.CardsAvail {
  111. cardTotal += int64(card.CardNum)
  112. topsTotal += card.TOpsAtFp16 * float64(card.CardNum)
  113. }
  114. mu.Lock()
  115. if (models.TClusterResource{} == *clusterResource) {
  116. err = l.svcCtx.Scheduler.AiStorages.SaveClusterResources(c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
  117. stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal)
  118. if err != nil {
  119. mu.Unlock()
  120. wg.Done()
  121. return
  122. }
  123. } else {
  124. clusterResource.CardTotal = cardTotal
  125. clusterResource.CardTopsTotal = topsTotal
  126. clusterResource.CpuAvail = float64(stat.CpuCoreAvail)
  127. clusterResource.CpuTotal = float64(stat.CpuCoreTotal)
  128. clusterResource.MemAvail = stat.MemAvail
  129. clusterResource.MemTotal = stat.MemTotal
  130. clusterResource.DiskAvail = stat.DiskAvail
  131. clusterResource.DiskTotal = stat.DiskTotal
  132. err := l.svcCtx.Scheduler.AiStorages.UpdateClusterResources(clusterResource)
  133. if err != nil {
  134. mu.Unlock()
  135. wg.Done()
  136. return
  137. }
  138. }
  139. mu.Unlock()
  140. wg.Done()
  141. }()
  142. }
  143. }
  144. wg.Wait()
  145. ch <- struct{}{}
  146. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.