You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

tracker.go 8.5 kB


  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package tracker
  13. import (
  14. "context"
  15. "github.com/prometheus/alertmanager/api/v2/client"
  16. "github.com/prometheus/alertmanager/cli"
  17. "github.com/prometheus/client_golang/api"
  18. v1 "github.com/prometheus/client_golang/api/prometheus/v1"
  19. "github.com/prometheus/client_golang/prometheus"
  20. "github.com/prometheus/common/model"
  21. "net/url"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "time"
  26. )
  27. var (
  28. ClusterCpuUtilisationGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  29. Name: "cluster_cpu_utilisation",
  30. Help: "Cluster CPU Utilisation Rate.",
  31. }, []string{"cluster_name", "adapter_id"})
  32. ClusterCpuAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  33. Name: "cluster_cpu_avail",
  34. Help: "Cluster CPU Available.",
  35. }, []string{"cluster_name", "adapter_id"})
  36. ClusterCpuTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  37. Name: "cluster_cpu_total",
  38. Help: "Cluster CPU Total.",
  39. }, []string{"cluster_name", "adapter_id"})
  40. ClusterMemoryUtilisationGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  41. Name: "cluster_memory_utilisation",
  42. Help: "Cluster Memory Utilisation Rate.",
  43. }, []string{"cluster_name", "adapter_id"})
  44. ClusterMemoryAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  45. Name: "cluster_memory_avail",
  46. Help: "Cluster Memory Available.",
  47. }, []string{"cluster_name", "adapter_id"})
  48. ClusterMemoryTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  49. Name: "cluster_memory_total",
  50. Help: "Cluster Memory Total.",
  51. }, []string{"cluster_name", "adapter_id"})
  52. ClusterDiskUtilisationGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  53. Name: "cluster_disk_utilisation",
  54. Help: "Cluster Disk Utilisation Rate.",
  55. }, []string{"cluster_name", "adapter_id"})
  56. ClusterDiskAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  57. Name: "cluster_disk_avail",
  58. Help: "Cluster Disk Available.",
  59. }, []string{"cluster_name", "adapter_id"})
  60. ClusterDiskTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  61. Name: "cluster_disk_total",
  62. Help: "Cluster Disk Total.",
  63. }, []string{"cluster_name", "adapter_id"})
  64. ClusterPodUtilisationGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  65. Name: "cluster_pod_utilisation",
  66. Help: "Cluster Pod Utilisation.",
  67. }, []string{"cluster_name", "adapter_id"})
  68. ClusterPodCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  69. Name: "cluster_pod_count",
  70. Help: "Cluster Pod Count.",
  71. }, []string{"cluster_name", "adapter_id"})
  72. ClusterPodTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  73. Name: "cluster_pod_total",
  74. Help: "Cluster Pod total.",
  75. }, []string{"cluster_name", "adapter_id"})
  76. ClusterCpuCoreHoursGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  77. Name: "cluster_cpu_core_hours",
  78. Help: "Cluster Cpu Core Hours.",
  79. }, []string{"cluster_name", "adapter_id"})
  80. ClusterCardsAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  81. Name: "cluster_cards_avail",
  82. Help: "Cluster Cards Available.",
  83. }, []string{"cluster_name", "adapter_id"})
  84. ClusterGpuAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  85. Name: "cluster_gpu_avail",
  86. Help: "Cluster Gpu Available.",
  87. }, []string{"cluster_name", "adapter_id"})
  88. metrics = []prometheus.Collector{
  89. ClusterCpuUtilisationGauge,
  90. ClusterCpuAvailGauge,
  91. ClusterCpuTotalGauge,
  92. ClusterMemoryUtilisationGauge,
  93. ClusterMemoryAvailGauge,
  94. ClusterMemoryTotalGauge,
  95. ClusterDiskUtilisationGauge,
  96. ClusterDiskAvailGauge,
  97. ClusterDiskTotalGauge,
  98. ClusterPodUtilisationGauge,
  99. ClusterPodCountGauge,
  100. ClusterPodTotalGauge,
  101. }
  102. )
  103. func init() {
  104. prometheus.MustRegister(metrics...)
  105. }
  106. type Prometheus struct {
  107. prometheus Interface
  108. client v1.API
  109. }
  110. // NewPrometheus 初始化Prometheus客户端
  111. func NewPrometheus(address string) (Prometheus, error) {
  112. cfg := api.Config{
  113. Address: address,
  114. }
  115. promClient, err := api.NewClient(cfg)
  116. return Prometheus{client: v1.NewAPI(promClient)}, err
  117. }
  118. func NewAlertClient(address string) *client.AlertmanagerAPI {
  119. alertManagerClient := cli.NewAlertmanagerClient(&url.URL{Host: address})
  120. return alertManagerClient
  121. }
  // ParseTime converts a base-10 Unix timestamp, expressed in whole seconds,
  // into a time.Time. When timestamp is not a valid 64-bit integer it returns
  // the current time together with the parse error.
  func ParseTime(timestamp string) (time.Time, error) {
  	seconds, err := strconv.ParseInt(timestamp, 10, 64)
  	if err == nil {
  		return time.Unix(seconds, 0), nil
  	}
  	return time.Now(), err
  }
  130. func (p Prometheus) GetNamedMetricsByTime(metrics []string, start, end string, step time.Duration, o QueryOption) []Metric {
  131. var res []Metric
  132. var mtx sync.Mutex
  133. var wg sync.WaitGroup
  134. opts := NewQueryOptions()
  135. o.Apply(opts)
  136. for _, metric := range metrics {
  137. wg.Add(1)
  138. go func(metric string) {
  139. parsedResp := Metric{MetricName: metric}
  140. startTimestamp, err := ParseTime(start)
  141. if err != nil {
  142. return
  143. }
  144. endTimestamp, err := ParseTime(end)
  145. if err != nil {
  146. return
  147. }
  148. timeRange := v1.Range{
  149. Start: startTimestamp,
  150. End: endTimestamp,
  151. Step: step,
  152. }
  153. p.client.Rules(context.Background())
  154. value, _, err := p.client.QueryRange(context.Background(), makeExpr(metric, *opts), timeRange)
  155. if err != nil {
  156. parsedResp.Error = err.Error()
  157. } else {
  158. parsedResp.MetricData = parseQueryRangeResp(value, genMetricFilter(o))
  159. }
  160. mtx.Lock()
  161. res = append(res, parsedResp)
  162. mtx.Unlock()
  163. wg.Done()
  164. }(metric)
  165. }
  166. wg.Wait()
  167. return res
  168. }
  169. func parseQueryRangeResp(value model.Value, metricFilter func(metric model.Metric) bool) MetricData {
  170. res := MetricData{MetricType: MetricTypeMatrix}
  171. data, _ := value.(model.Matrix)
  172. for _, v := range data {
  173. if metricFilter != nil && !metricFilter(v.Metric) {
  174. continue
  175. }
  176. mv := MetricValue{
  177. Metadata: make(map[string]string),
  178. }
  179. for k, v := range v.Metric {
  180. mv.Metadata[string(k)] = string(v)
  181. }
  182. for _, k := range v.Values {
  183. mv.Series = append(mv.Series, Point{float64(k.Timestamp) / 1000, float64(k.Value)})
  184. }
  185. res.MetricValues = append(res.MetricValues, mv)
  186. }
  187. return res
  188. }
  189. func (p Prometheus) GetNamedMetrics(metrics []string, ts time.Time, o QueryOption) []Metric {
  190. var res []Metric
  191. var mtx sync.Mutex
  192. var wg sync.WaitGroup
  193. opts := NewQueryOptions()
  194. o.Apply(opts)
  195. for _, metric := range metrics {
  196. wg.Add(1)
  197. go func(metric string) {
  198. parsedResp := Metric{MetricName: metric}
  199. value, _, err := p.client.Query(context.Background(), makeExpr(metric, *opts), ts)
  200. if err != nil {
  201. parsedResp.Error = err.Error()
  202. } else {
  203. parsedResp.MetricData = parseQueryResp(value, genMetricFilter(o))
  204. }
  205. mtx.Lock()
  206. res = append(res, parsedResp)
  207. mtx.Unlock()
  208. wg.Done()
  209. }(metric)
  210. }
  211. wg.Wait()
  212. return res
  213. }
  214. func parseQueryResp(value model.Value, metricFilter func(metric model.Metric) bool) MetricData {
  215. res := MetricData{MetricType: MetricTypeVector}
  216. data, _ := value.(model.Vector)
  217. for _, v := range data {
  218. if metricFilter != nil && !metricFilter(v.Metric) {
  219. continue
  220. }
  221. mv := MetricValue{
  222. Metadata: make(map[string]string),
  223. }
  224. for k, v := range v.Metric {
  225. mv.Metadata[string(k)] = string(v)
  226. }
  227. mv.Sample = &Point{float64(v.Timestamp) / 1000, float64(v.Value)}
  228. res.MetricValues = append(res.MetricValues, mv)
  229. }
  230. return res
  231. }
  232. func genMetricFilter(o QueryOption) func(metric model.Metric) bool {
  233. if o != nil {
  234. if po, ok := o.(PodOption); ok {
  235. if po.NamespacedResourcesFilter != "" {
  236. namespacedPodsMap := make(map[string]struct{})
  237. for _, s := range strings.Split(po.NamespacedResourcesFilter, "|") {
  238. namespacedPodsMap[s] = struct{}{}
  239. }
  240. return func(metric model.Metric) bool {
  241. if len(metric) == 0 {
  242. return false
  243. }
  244. _, ok := namespacedPodsMap[string(metric["namespace"])+"/"+string(metric["pod"])]
  245. return ok
  246. }
  247. }
  248. }
  249. }
  250. return func(metric model.Metric) bool {
  251. return true
  252. }
  253. }
  254. func (p Prometheus) GetRawData(expr string, o QueryOption) (model.Value, error) {
  255. opts := NewQueryOptions()
  256. o.Apply(opts)
  257. value, _, err := p.client.Query(context.Background(), makeExpr(expr, *opts), time.Now())
  258. if err != nil {
  259. return nil, err
  260. }
  261. return value, nil
  262. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem for heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.