You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

getcenteroverviewlogic.go 3.7 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package ai
  2. import (
  3. "context"
  4. "github.com/zeromicro/go-zero/core/logx"
  5. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  8. "strconv"
  9. "sync"
  10. "time"
  11. )
  12. type GetCenterOverviewLogic struct {
  13. logx.Logger
  14. ctx context.Context
  15. svcCtx *svc.ServiceContext
  16. }
  17. func NewGetCenterOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterOverviewLogic {
  18. return &GetCenterOverviewLogic{
  19. Logger: logx.WithContext(ctx),
  20. ctx: ctx,
  21. svcCtx: svcCtx,
  22. }
  23. }
  24. func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverviewResp, err error) {
  25. resp = &types.CenterOverviewResp{}
  26. var mu sync.RWMutex
  27. ch := make(chan struct{})
  28. var centerNum int32
  29. var taskNum int32
  30. var cardNum int32
  31. var totalTops float64
  32. adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1")
  33. if err != nil {
  34. return nil, err
  35. }
  36. centerNum = int32(len(adapterList))
  37. resp.CenterNum = centerNum
  38. go l.updateClusterResource(&mu, ch, adapterList)
  39. for _, adapter := range adapterList {
  40. taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
  41. if err != nil {
  42. continue
  43. }
  44. taskNum += int32(len(taskList))
  45. }
  46. resp.TaskNum = taskNum
  47. for _, adapter := range adapterList {
  48. clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id)
  49. if err != nil {
  50. continue
  51. }
  52. for _, cluster := range clusters.List {
  53. mu.RLock()
  54. clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(cluster.Id)
  55. mu.RUnlock()
  56. if err != nil {
  57. continue
  58. }
  59. cardNum += int32(clusterResource.CardTotal)
  60. totalTops += clusterResource.CardTopsTotal
  61. }
  62. }
  63. resp.CardNum = cardNum
  64. resp.PowerInTops = totalTops
  65. select {
  66. case _ = <-ch:
  67. return resp, nil
  68. case <-time.After(2 * time.Second):
  69. return resp, nil
  70. }
  71. }
  72. func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) {
  73. var wg sync.WaitGroup
  74. for _, adapter := range list {
  75. clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id)
  76. if err != nil {
  77. continue
  78. }
  79. for _, cluster := range clusters.List {
  80. c := cluster
  81. mu.RLock()
  82. clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(c.Id)
  83. mu.RUnlock()
  84. if err != nil {
  85. continue
  86. }
  87. wg.Add(1)
  88. go func() {
  89. stat, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(l.ctx)
  90. if err != nil {
  91. wg.Done()
  92. return
  93. }
  94. clusterType, err := strconv.ParseInt(adapter.Type, 10, 64)
  95. if err != nil {
  96. wg.Done()
  97. return
  98. }
  99. var cardTotal int64
  100. var topsTotal float64
  101. for _, card := range stat.CardsAvail {
  102. cardTotal += int64(card.CardNum)
  103. topsTotal += card.TOpsAtFp16 * float64(card.CardNum)
  104. }
  105. mu.Lock()
  106. if (models.TClusterResource{} == *clusterResource) {
  107. err = l.svcCtx.Scheduler.AiStorages.SaveClusterResources(c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
  108. stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal)
  109. if err != nil {
  110. mu.Unlock()
  111. wg.Done()
  112. return
  113. }
  114. } else {
  115. clusterResource.CardTotal = cardTotal
  116. clusterResource.CardTopsTotal = topsTotal
  117. err := l.svcCtx.Scheduler.AiStorages.UpdateClusterResources(clusterResource)
  118. if err != nil {
  119. mu.Unlock()
  120. wg.Done()
  121. return
  122. }
  123. }
  124. mu.Unlock()
  125. wg.Done()
  126. }()
  127. }
  128. }
  129. wg.Wait()
  130. ch <- struct{}{}
  131. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive and autonomous peer-to-peer manner.