Skip to content

Commit 3b7ece5

Browse files
author
Cairry
committed
🚀 feat(prometheus): Optimized prometheus client and query logic
1 parent 05fa4af commit 3b7ece5

6 files changed

Lines changed: 156 additions & 113 deletions

File tree

alert/eval/eval.go

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -99,7 +99,7 @@ func (t *AlertRule) Restart(rule models.AlertRule) {
9999
func (t *AlertRule) Eval(ctx context.Context, rule models.AlertRule) {
100100
err := rule.Validate()
101101
if err != nil {
102-
logc.Errorf(t.ctx.Ctx, fmt.Sprintf("Rule validation failed, RuleName: %s, RuleId: %s, Error: %v", rule.RuleName, rule.RuleId, err))
102+
logc.Errorf(t.ctx.Ctx, "Rule validation failed, RuleName: %s, RuleId: %s, Error: %v", rule.RuleName, rule.RuleId, err)
103103
return
104104
}
105105

@@ -110,7 +110,7 @@ func (t *AlertRule) Eval(ctx context.Context, rule models.AlertRule) {
110110
if r := recover(); r != nil {
111111
// 获取调用栈信息
112112
stack := debug.Stack()
113-
logc.Error(t.ctx.Ctx, fmt.Sprintf("Recovered from rule eval goroutine panic: %s, RuleName: %s, RuleId: %s\n%s", r, rule.RuleName, rule.RuleId, stack))
113+
logc.Errorf(t.ctx.Ctx, "Recovered from rule eval goroutine panic: %s, RuleName: %s, RuleId: %s\n%s", r, rule.RuleName, rule.RuleId, stack)
114114
t.Restart(rule)
115115
}
116116
}()
@@ -120,9 +120,10 @@ func (t *AlertRule) Eval(ctx context.Context, rule models.AlertRule) {
120120
case <-timer.C:
121121
// 处理任务信号量
122122
taskChan <- struct{}{}
123+
logc.Infof(t.ctx.Ctx, fmt.Sprintf("Handle eval task, RuleId: %v, RuleName: %s", rule.RuleId, rule.RuleName))
123124
t.executeTask(rule, taskChan)
124125
case <-ctx.Done():
125-
logc.Infof(t.ctx.Ctx, fmt.Sprintf("停止 RuleId: %v, RuleName: %s 的 Watch 协程", rule.RuleId, rule.RuleName))
126+
logc.Infof(t.ctx.Ctx, fmt.Sprintf("Stop eval task, RuleId: %v, RuleName: %s", rule.RuleId, rule.RuleName))
126127
return
127128
}
128129
timer.Reset(t.getEvalTimeDuration(rule.EvalInterval))
@@ -187,7 +188,7 @@ func (t *AlertRule) processDatasources(rule models.AlertRule) []string {
187188
func (t *AlertRule) processSingleDatasource(dsId string, rule models.AlertRule) []string {
188189
instance, err := t.ctx.DB.Datasource().GetInstance(dsId)
189190
if err != nil {
190-
logc.Errorf(t.ctx.Ctx, fmt.Sprintf("Failed to get datasource instance %s: %v", dsId, err))
191+
logc.Errorf(t.ctx.Ctx, "Failed to get datasource instance %s: %v", dsId, err)
191192
return nil
192193
}
193194

alert/eval/query.go

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -43,13 +43,19 @@ func metrics(ctx *ctx.Context, datasourceId, datasourceType string, rule models.
4343
return nil
4444
}
4545

46+
// 检查查询结果数量,避免过多结果导致系统压力
47+
if len(resQuery) > 1000 {
48+
logc.Errorf(ctx.Ctx, "Prometheus查询结果过多,可能影响性能,今提取前 1000 个数据点,规则ID: %s, 规则名称: %s, 结果数量: %d", rule.RuleId, rule.RuleName, len(resQuery))
49+
resQuery = resQuery[:1000]
50+
}
51+
4652
externalLabels = cli.(provider.PrometheusProvider).GetExternalLabels()
4753
default:
4854
logc.Errorf(ctx.Ctx, "不支持的指标类型, 规则ID: %s, 规则名称: %s, 数据源ID: %s, 类型: %s", rule.RuleId, rule.RuleName, datasourceId, datasourceType)
4955
return nil
5056
}
5157

52-
if resQuery == nil {
58+
if len(resQuery) == 0 {
5359
return nil
5460
}
5561

go.mod

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -122,6 +122,7 @@ require (
122122
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
123123
github.com/pierrec/lz4/v4 v4.1.22 // indirect
124124
github.com/pkg/errors v0.9.1 // indirect
125+
github.com/prometheus/client_golang v1.23.2 // indirect
125126
github.com/prometheus/client_model v0.6.2 // indirect
126127
github.com/prometheus/common v0.67.4 // indirect
127128
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -318,6 +318,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
318318
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
319319
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
320320
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
321+
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
322+
github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
321323
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
322324
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
323325
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=

pkg/provider/metrics_prometheus.go

Lines changed: 120 additions & 106 deletions
Original file line number | Diff line number | Diff line change
@@ -3,193 +3,207 @@ package provider
33
import (
44
"context"
55
"fmt"
6+
"math"
67
"net/http"
7-
"net/url"
8-
"strconv"
98
"time"
109
"watchAlert/internal/models"
11-
utilsHttp "watchAlert/pkg/tools"
10+
"watchAlert/pkg/tools"
11+
12+
"github.com/prometheus/client_golang/api"
13+
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
14+
"github.com/prometheus/common/model"
1215

1316
"github.com/zeromicro/go-zero/core/logc"
1417
)
1518

1619
type PrometheusProvider struct {
20+
client v1.API
1721
ExternalLabels map[string]interface{}
1822
Address string
1923
Username string
2024
Password string
2125
Headers map[string]string
26+
Timeout int64
27+
}
28+
29+
// authenticatedTransport 包装 http.RoundTripper 以添加认证头和额外的headers
30+
type authenticatedTransport struct {
31+
Transport http.RoundTripper
32+
Username string
33+
Password string
34+
Headers map[string]string
35+
}
36+
37+
// RoundTrip 实现 http.RoundTripper 接口
38+
func (t *authenticatedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
39+
if t.Username != "" && t.Password != "" {
40+
req.SetBasicAuth(t.Username, t.Password)
41+
}
42+
43+
for key, value := range t.Headers {
44+
req.Header.Set(key, value)
45+
}
46+
47+
return t.Transport.RoundTrip(req)
2248
}
2349

2450
func NewPrometheusClient(ds models.AlertDataSource) (MetricsFactoryProvider, error) {
51+
transport := &http.Transport{
52+
Proxy: http.ProxyFromEnvironment,
53+
MaxIdleConns: 100,
54+
MaxIdleConnsPerHost: 10,
55+
IdleConnTimeout: 90 * time.Second,
56+
}
57+
58+
var roundTripper http.RoundTripper = transport
59+
if ds.Auth.User != "" || ds.Auth.Pass != "" || len(ds.HTTP.Headers) > 0 {
60+
roundTripper = &authenticatedTransport{
61+
Transport: transport,
62+
Username: ds.Auth.User,
63+
Password: ds.Auth.Pass,
64+
Headers: ds.HTTP.Headers,
65+
}
66+
}
67+
68+
clientConfig := api.Config{
69+
Address: ds.HTTP.URL,
70+
RoundTripper: roundTripper,
71+
}
72+
73+
client, err := api.NewClient(clientConfig)
74+
if err != nil {
75+
return nil, err
76+
}
77+
2578
return PrometheusProvider{
79+
client: v1.NewAPI(client),
2680
Address: ds.HTTP.URL,
2781
ExternalLabels: ds.Labels,
2882
Username: ds.Auth.User,
2983
Password: ds.Auth.Pass,
3084
Headers: ds.HTTP.Headers,
85+
Timeout: ds.HTTP.Timeout,
3186
}, nil
3287
}
3388

3489
type QueryResponse struct {
35-
Status string `json:"status"`
36-
VMData VMData `json:"data"`
90+
Status string `json:"status"`
91+
MetricData MetricData `json:"data"`
3792
}
3893

39-
type VMData struct {
40-
VMResult []VMResult `json:"result"`
41-
ResultType string `json:"resultType"`
94+
type MetricData struct {
95+
MetricResult []MetricResult `json:"result"`
96+
ResultType string `json:"resultType"`
4297
}
4398

44-
type VMResult struct {
99+
type MetricResult struct {
45100
Metric map[string]interface{} `json:"metric"`
46101
Value []interface{} `json:"value"`
47-
Values [][]interface{} `json:"values"` // for range query
102+
Values [][]interface{} `json:"values"`
48103
}
49104

50105
func (v PrometheusProvider) Query(promQL string) ([]Metrics, error) {
51-
params := url.Values{}
52-
params.Add("query", promQL)
53-
params.Add("time", strconv.FormatInt(time.Now().Unix(), 10))
54-
fullURL := fmt.Sprintf("%s%s?%s", v.Address, "/api/v1/query", params.Encode())
55-
56-
// 创建带认证的HTTP请求
57-
var headers = make(map[string]string)
58-
for key, value := range v.Headers {
59-
headers[key] = value
60-
}
61-
for key, value := range utilsHttp.CreateBasicAuthHeader(v.Username, v.Password) {
62-
headers[key] = value
63-
}
64-
65-
resp, err := utilsHttp.Get(headers, fullURL, 10)
106+
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(v.Timeout)*time.Second)
107+
defer cancel()
108+
result, _, err := v.client.Query(ctx, promQL, time.Now(), v1.WithTimeout(time.Duration(v.Timeout)*time.Second))
66109
if err != nil {
67-
logc.Error(context.Background(), "Prometheus query failed", "error", err)
68-
return nil, fmt.Errorf("query failed: %w", err)
110+
return nil, err
69111
}
70-
defer resp.Body.Close()
71-
72-
if resp.StatusCode != http.StatusOK {
73-
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
74-
}
75-
76-
var vmRespBody QueryResponse
77-
if err := utilsHttp.ParseReaderBody(resp.Body, &vmRespBody); err != nil {
78-
logc.Error(context.Background(), "Parse response failed", "error", err)
79-
return nil, fmt.Errorf("parse response failed: %w", err)
80-
}
81-
82-
return Vectors(vmRespBody.VMData.VMResult), nil
112+
return Vectors(result), nil
83113
}
84114

85115
func (v PrometheusProvider) QueryRange(promQL string, start, end time.Time, step time.Duration) ([]Metrics, error) {
86-
params := url.Values{}
87-
params.Add("query", promQL)
88-
params.Add("start", strconv.FormatInt(start.Unix(), 10))
89-
params.Add("end", strconv.FormatInt(end.Unix(), 10))
90-
params.Add("step", fmt.Sprintf("%.0fs", step.Seconds()))
91-
fullURL := fmt.Sprintf("%s%s?%s", v.Address, "/api/v1/query_range", params.Encode())
116+
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(v.Timeout)*time.Second)
117+
defer cancel()
92118

93-
var headers = make(map[string]string)
94-
for key, value := range v.Headers {
95-
headers[key] = value
96-
}
97-
for key, value := range utilsHttp.CreateBasicAuthHeader(v.Username, v.Password) {
98-
headers[key] = value
119+
r := v1.Range{
120+
Start: start,
121+
End: end,
122+
Step: step,
99123
}
100124

101-
resp, err := utilsHttp.Get(headers, fullURL, 30)
125+
result, _, err := v.client.QueryRange(ctx, promQL, r, v1.WithTimeout(time.Duration(v.Timeout)*time.Second))
102126
if err != nil {
103-
logc.Error(context.Background(), "Prometheus query_range failed", "error", err)
104-
return nil, fmt.Errorf("query_range failed: %w", err)
127+
return nil, err
105128
}
106-
defer resp.Body.Close()
107129

108-
if resp.StatusCode != http.StatusOK {
109-
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
110-
}
111-
112-
var vmRespBody QueryResponse
113-
if err := utilsHttp.ParseReaderBody(resp.Body, &vmRespBody); err != nil {
114-
logc.Error(context.Background(), "Parse response failed", "error", err)
115-
return nil, fmt.Errorf("parse response failed: %w", err)
116-
}
117-
118-
return Matrix(vmRespBody.VMData.VMResult), nil
130+
return Matrix(result), nil
119131
}
120132

121-
func Vectors(res []VMResult) []Metrics {
133+
func Vectors(value model.Value) []Metrics {
122134
var vectors []Metrics
123-
for _, item := range res {
124-
if len(item.Value) < 2 {
125-
continue
126-
}
135+
items, ok := value.(model.Vector)
136+
if !ok {
137+
return []Metrics{}
138+
}
127139

128-
timestamp, ok1 := item.Value[0].(float64)
129-
valueStr, ok2 := item.Value[1].(string)
130-
if !ok1 || !ok2 {
131-
logc.Error(context.Background(), "Invalid value format")
140+
for _, item := range items {
141+
if math.IsNaN(float64(item.Value)) {
142+
logc.Infof(context.Background(), "Skipping NaN or Inf value: %v", item.Value)
132143
continue
133144
}
134145

135-
valueFloat, err := strconv.ParseFloat(valueStr, 64)
136-
if err != nil {
137-
logc.Error(context.Background(), "Value conversion failed", "error", err)
138-
continue
146+
var metric = make(map[string]interface{})
147+
for k, v := range item.Metric {
148+
metric[string(k)] = string(v)
139149
}
140150

141151
vectors = append(vectors, Metrics{
142-
Metric: item.Metric,
143-
Value: valueFloat,
144-
Timestamp: timestamp,
152+
Metric: metric,
153+
Value: float64(item.Value),
154+
Timestamp: float64(item.Timestamp),
145155
})
146156
}
157+
147158
return vectors
148159
}
149160

150-
// vmMatrix 将 Prometheus QueryRange 结果转换为 Metrics 列表
151-
func Matrix(res []VMResult) []Metrics {
161+
// Matrix 将 Prometheus QueryRange 结果转换为 Metrics 列表
162+
func Matrix(value model.Value) []Metrics {
152163
var metrics []Metrics
153-
for _, item := range res {
154-
// 遍历每个时间序列的所有时间点
155-
for _, value := range item.Values {
156-
if len(value) < 2 {
157-
continue
158-
}
164+
matrix, ok := value.(model.Matrix)
165+
if !ok {
166+
return []Metrics{}
167+
}
159168

160-
timestamp, ok1 := value[0].(float64)
161-
valueStr, ok2 := value[1].(string)
162-
if !ok1 || !ok2 {
163-
logc.Error(context.Background(), "Invalid value format")
164-
continue
165-
}
169+
for _, stream := range matrix {
170+
var metric = make(map[string]interface{})
171+
for k, v := range stream.Metric {
172+
metric[string(k)] = string(v)
173+
}
166174

167-
valueFloat, err := strconv.ParseFloat(valueStr, 64)
168-
if err != nil {
169-
logc.Error(context.Background(), "Value conversion failed", "error", err)
175+
for _, value := range stream.Values {
176+
if math.IsNaN(float64(value.Value)) {
170177
continue
171178
}
172179

173180
metrics = append(metrics, Metrics{
174-
Metric: item.Metric,
175-
Value: valueFloat,
176-
Timestamp: timestamp,
181+
Timestamp: float64(value.Timestamp),
182+
Value: float64(value.Value),
183+
Metric: metric,
177184
})
178185
}
179186
}
187+
180188
return metrics
181189
}
182190

183191
func (v PrometheusProvider) Check() (bool, error) {
184-
res, err := utilsHttp.Get(utilsHttp.CreateBasicAuthHeader(v.Username, v.Password), v.Address+"/api/v1/query?query=1%2B1", 10)
192+
var headers map[string]string
193+
checkURL := v.Address + "/api/v1/query?query=1%2B1"
194+
if v.Username != "" && v.Password != "" {
195+
headers = tools.CreateBasicAuthHeader(v.Username, v.Password)
196+
}
197+
headers = tools.MergeHeaders(headers, v.Headers)
198+
res, err := tools.Get(headers, checkURL, int(v.Timeout))
185199
if err != nil {
186-
logc.Error(context.Background(), fmt.Errorf("health check failed: %w", err))
200+
logc.Errorf(context.Background(), "Health check failed, URL: %s, Error: %v", checkURL, err)
187201
return false, fmt.Errorf("health check failed: %w", err)
188202
}
189203
defer res.Body.Close()
190204

191205
if res.StatusCode != http.StatusOK {
192-
logc.Error(context.Background(), fmt.Errorf("unhealthy status: %d", res.StatusCode))
206+
logc.Errorf(context.Background(), "Health check received unhealthy status: %d, URL: %s", res.StatusCode, checkURL)
193207
return false, fmt.Errorf("unhealthy status: %d", res.StatusCode)
194208
}
195209
return true, nil

0 commit comments

Comments
 (0)