You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

tracker.go 7.3 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package tracker
  13. import (
  14. "context"
  15. "github.com/prometheus/alertmanager/api/v2/client"
  16. "github.com/prometheus/alertmanager/cli"
  17. "github.com/prometheus/client_golang/api"
  18. v1 "github.com/prometheus/client_golang/api/prometheus/v1"
  19. "github.com/prometheus/client_golang/prometheus"
  20. "github.com/prometheus/common/model"
  21. "net/url"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "time"
  26. )
  27. var (
  28. ClusterCpuUtilisationGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  29. Name: "cluster_cpu_utilisation",
  30. Help: "Cluster CPU Utilisation Rate.",
  31. }, []string{"cluster_name", "adapter_id"})
  32. ClusterCpuAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  33. Name: "cluster_cpu_avail",
  34. Help: "Cluster CPU Available.",
  35. }, []string{"cluster_name", "adapter_id"})
  36. ClusterCpuTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  37. Name: "cluster_cpu_total",
  38. Help: "Cluster CPU Total.",
  39. }, []string{"cluster_name", "adapter_id"})
  40. ClusterMemoryUtilisationGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  41. Name: "cluster_memory_utilisation",
  42. Help: "Cluster Memory Utilisation Rate.",
  43. }, []string{"cluster_name", "adapter_id"})
  44. ClusterMemoryAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  45. Name: "cluster_memory_avail",
  46. Help: "Cluster Memory Available.",
  47. }, []string{"cluster_name", "adapter_id"})
  48. ClusterMemoryTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  49. Name: "cluster_memory_total",
  50. Help: "Cluster Memory Total.",
  51. }, []string{"cluster_name", "adapter_id"})
  52. ClusterDiskUtilisationGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  53. Name: "cluster_disk_utilisation",
  54. Help: "Cluster Disk Utilisation Rate.",
  55. }, []string{"cluster_name", "adapter_id"})
  56. ClusterDiskAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  57. Name: "cluster_disk_avail",
  58. Help: "Cluster Disk Available.",
  59. }, []string{"cluster_name", "adapter_id"})
  60. ClusterDiskTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
  61. Name: "cluster_disk_total",
  62. Help: "Cluster Disk Total.",
  63. }, []string{"cluster_name", "adapter_id"})
  64. metrics = []prometheus.Collector{
  65. ClusterCpuUtilisationGauge,
  66. ClusterCpuAvailGauge,
  67. ClusterCpuTotalGauge,
  68. ClusterMemoryUtilisationGauge,
  69. ClusterMemoryAvailGauge,
  70. ClusterMemoryTotalGauge,
  71. ClusterDiskUtilisationGauge,
  72. ClusterDiskAvailGauge,
  73. ClusterDiskTotalGauge,
  74. }
  75. )
  76. func init() {
  77. prometheus.MustRegister(metrics...)
  78. }
  79. type Prometheus struct {
  80. prometheus Interface
  81. client v1.API
  82. }
  83. // NewPrometheus 初始化Prometheus客户端
  84. func NewPrometheus(address string) (Prometheus, error) {
  85. cfg := api.Config{
  86. Address: address,
  87. }
  88. promClient, err := api.NewClient(cfg)
  89. return Prometheus{client: v1.NewAPI(promClient)}, err
  90. }
  91. func NewAlertClient(address string) *client.AlertmanagerAPI {
  92. alertManagerClient := cli.NewAlertmanagerClient(&url.URL{Host: address})
  93. return alertManagerClient
  94. }
  // ParseTime converts timestamp, a base-10 count of whole seconds since the
  // Unix epoch, into a time.Time.
  //
  // If timestamp is not a valid 64-bit integer, the parse error is returned
  // together with the current time.
  func ParseTime(timestamp string) (time.Time, error) {
  	seconds, parseErr := strconv.ParseInt(timestamp, 10, 64)
  	if parseErr != nil {
  		return time.Now(), parseErr
  	}

  	parsed := time.Unix(seconds, 0)
  	return parsed, nil
  }
  103. func (p Prometheus) GetNamedMetricsByTime(metrics []string, start, end string, step time.Duration, o QueryOption) []Metric {
  104. var res []Metric
  105. var mtx sync.Mutex
  106. var wg sync.WaitGroup
  107. opts := NewQueryOptions()
  108. o.Apply(opts)
  109. for _, metric := range metrics {
  110. wg.Add(1)
  111. go func(metric string) {
  112. parsedResp := Metric{MetricName: metric}
  113. startTimestamp, err := ParseTime(start)
  114. if err != nil {
  115. return
  116. }
  117. endTimestamp, err := ParseTime(end)
  118. if err != nil {
  119. return
  120. }
  121. timeRange := v1.Range{
  122. Start: startTimestamp,
  123. End: endTimestamp,
  124. Step: step,
  125. }
  126. value, _, err := p.client.QueryRange(context.Background(), makeExpr(metric, *opts), timeRange)
  127. if err != nil {
  128. parsedResp.Error = err.Error()
  129. } else {
  130. parsedResp.MetricData = parseQueryRangeResp(value, genMetricFilter(o))
  131. }
  132. mtx.Lock()
  133. res = append(res, parsedResp)
  134. mtx.Unlock()
  135. wg.Done()
  136. }(metric)
  137. }
  138. wg.Wait()
  139. return res
  140. }
  141. func parseQueryRangeResp(value model.Value, metricFilter func(metric model.Metric) bool) MetricData {
  142. res := MetricData{MetricType: MetricTypeMatrix}
  143. data, _ := value.(model.Matrix)
  144. for _, v := range data {
  145. if metricFilter != nil && !metricFilter(v.Metric) {
  146. continue
  147. }
  148. mv := MetricValue{
  149. Metadata: make(map[string]string),
  150. }
  151. for k, v := range v.Metric {
  152. mv.Metadata[string(k)] = string(v)
  153. }
  154. for _, k := range v.Values {
  155. mv.Series = append(mv.Series, Point{float64(k.Timestamp) / 1000, float64(k.Value)})
  156. }
  157. res.MetricValues = append(res.MetricValues, mv)
  158. }
  159. return res
  160. }
  161. func (p Prometheus) GetNamedMetrics(metrics []string, ts time.Time, o QueryOption) []Metric {
  162. var res []Metric
  163. var mtx sync.Mutex
  164. var wg sync.WaitGroup
  165. opts := NewQueryOptions()
  166. o.Apply(opts)
  167. for _, metric := range metrics {
  168. wg.Add(1)
  169. go func(metric string) {
  170. parsedResp := Metric{MetricName: metric}
  171. value, _, err := p.client.Query(context.Background(), makeExpr(metric, *opts), ts)
  172. if err != nil {
  173. parsedResp.Error = err.Error()
  174. } else {
  175. parsedResp.MetricData = parseQueryResp(value, genMetricFilter(o))
  176. }
  177. mtx.Lock()
  178. res = append(res, parsedResp)
  179. mtx.Unlock()
  180. wg.Done()
  181. }(metric)
  182. }
  183. wg.Wait()
  184. return res
  185. }
  186. func parseQueryResp(value model.Value, metricFilter func(metric model.Metric) bool) MetricData {
  187. res := MetricData{MetricType: MetricTypeVector}
  188. data, _ := value.(model.Vector)
  189. for _, v := range data {
  190. if metricFilter != nil && !metricFilter(v.Metric) {
  191. continue
  192. }
  193. mv := MetricValue{
  194. Metadata: make(map[string]string),
  195. }
  196. for k, v := range v.Metric {
  197. mv.Metadata[string(k)] = string(v)
  198. }
  199. mv.Sample = &Point{float64(v.Timestamp) / 1000, float64(v.Value)}
  200. res.MetricValues = append(res.MetricValues, mv)
  201. }
  202. return res
  203. }
  204. func genMetricFilter(o QueryOption) func(metric model.Metric) bool {
  205. if o != nil {
  206. if po, ok := o.(PodOption); ok {
  207. if po.NamespacedResourcesFilter != "" {
  208. namespacedPodsMap := make(map[string]struct{})
  209. for _, s := range strings.Split(po.NamespacedResourcesFilter, "|") {
  210. namespacedPodsMap[s] = struct{}{}
  211. }
  212. return func(metric model.Metric) bool {
  213. if len(metric) == 0 {
  214. return false
  215. }
  216. _, ok := namespacedPodsMap[string(metric["namespace"])+"/"+string(metric["pod"])]
  217. return ok
  218. }
  219. }
  220. }
  221. }
  222. return func(metric model.Metric) bool {
  223. return true
  224. }
  225. }
  226. func (p Prometheus) GetRawData(expr string, o QueryOption) (model.Value, error) {
  227. opts := NewQueryOptions()
  228. o.Apply(opts)
  229. value, _, err := p.client.Query(context.Background(), makeExpr(expr, *opts), time.Now())
  230. if err != nil {
  231. return nil, err
  232. }
  233. return value, nil
  234. }
  // AddAlertRule is a placeholder: its body is empty and it currently does
  // nothing.
  func AddAlertRule() {}

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem for heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.