Skip to content

Commit bb85ab4

Browse files
author
Michal Tichák
committed
[core] OCTRL-1003
-aggregating metrics so we don't flod influxdb -changed json to influxdb line format to properly handle tags
1 parent 37c81be commit bb85ab4

File tree

9 files changed

+802
-101
lines changed

9 files changed

+802
-101
lines changed

common/ecsmetrics/metric.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ import (
77
)
88

99
func NewMetric(name string) monitoring.Metric {
10-
timestamp := time.Now()
11-
metric := monitoring.Metric{Name: name, Timestamp: timestamp.UnixMilli()}
10+
metric := monitoring.Metric{Name: name, Timestamp: time.Now()}
1211
metric.AddTag("subsystem", "ECS")
1312
return metric
1413
}
@@ -18,13 +17,13 @@ func NewMetric(name string) monitoring.Metric {
1817
func TimerMS(metric *monitoring.Metric) func() {
1918
start := time.Now()
2019
return func() {
21-
metric.AddValue("execution_time_ms", time.Since(start).Milliseconds())
20+
metric.AddValueInt64("execution_time_ms", time.Since(start).Milliseconds())
2221
}
2322
}
2423

2524
func TimerNS(metric *monitoring.Metric) func() {
2625
start := time.Now()
2726
return func() {
28-
metric.AddValue("execution_time_ns", time.Since(start).Nanoseconds())
27+
metric.AddValueInt64("execution_time_ns", time.Since(start).Nanoseconds())
2928
}
3029
}

common/ecsmetrics/metrics.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ func gather() monitoring.Metric {
3838
for _, sample := range samples {
3939
switch sample.Value.Kind() {
4040
case internalmetrics.KindUint64:
41-
metric.AddValue(sample.Name, sample.Value.Uint64())
41+
metric.AddValueUInt64(sample.Name, sample.Value.Uint64())
4242
case internalmetrics.KindFloat64:
43-
metric.AddValue(sample.Name, sample.Value.Float64())
43+
metric.AddValueFloat64(sample.Name, sample.Value.Float64())
4444
case internalmetrics.KindFloat64Histogram:
4545
log.WithField("level", infologger.IL_Devel).Warningf("Error: Histogram is not supported yet for metric [%s]", sample.Name)
4646
continue
@@ -64,7 +64,8 @@ func StartGolangMetrics(period time.Duration) {
6464
return
6565
default:
6666
log.Debug("sending golang metrics")
67-
monitoring.Send(gather())
67+
metric := gather()
68+
monitoring.Send(&metric)
6869
time.Sleep(period)
6970
}
7071
}

common/monitoring/metric.go

Lines changed: 76 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,91 @@
11
package monitoring
22

3+
import (
4+
"fmt"
5+
"io"
6+
"time"
7+
8+
lp "github.com/influxdata/line-protocol/v2/lineprotocol"
9+
)
10+
311
type (
4-
TagsType map[string]any
5-
ValuesType map[string]any
12+
Tag struct {
13+
Name string
14+
Value string
15+
}
16+
17+
TagsType []Tag
18+
ValuesType map[string]interface{}
619
)
720

821
type Metric struct {
9-
Name string `json:"name"`
10-
Values ValuesType `json:"values"`
11-
Tags TagsType `json:"tags,omitempty"`
12-
Timestamp int64 `json:"timestamp"`
22+
Name string
23+
Values ValuesType
24+
Tags TagsType
25+
Timestamp time.Time
1326
}
1427

15-
func (metric *Metric) AddTag(tagName string, value any) {
16-
if metric.Tags == nil {
17-
metric.Tags = make(TagsType)
18-
}
19-
metric.Tags[tagName] = value
28+
func (metric *Metric) AddTag(tagName string, value string) {
29+
metric.Tags = append(metric.Tags, Tag{Name: tagName, Value: value})
2030
}
2131

22-
func (metric *Metric) AddValue(valueName string, value any) {
32+
func (metric *Metric) addValue(valueName string, value interface{}) {
2333
if metric.Values == nil {
2434
metric.Values = make(ValuesType)
2535
}
2636
metric.Values[valueName] = value
2737
}
38+
39+
func (metric *Metric) AddValueInt64(valueName string, value int64) {
40+
metric.addValue(valueName, value)
41+
}
42+
43+
func (metric *Metric) AddValueUInt64(valueName string, value uint64) {
44+
metric.addValue(valueName, value)
45+
}
46+
47+
func (metric *Metric) AddValueFloat64(valueName string, value float64) {
48+
metric.addValue(valueName, value)
49+
}
50+
51+
func (metric *Metric) MergeValues(other *Metric) {
52+
for valueName, value := range other.Values {
53+
if storedValue, ok := metric.Values[valueName]; ok {
54+
switch v := value.(type) {
55+
case int64:
56+
metric.Values[valueName] = v + storedValue.(int64)
57+
case uint64:
58+
metric.Values[valueName] = v + storedValue.(uint64)
59+
case float64:
60+
metric.Values[valueName] = v + storedValue.(float64)
61+
case string:
62+
}
63+
} else {
64+
metric.Values[valueName] = value
65+
}
66+
}
67+
}
68+
69+
func Format(writer io.Writer, metrics []Metric) error {
70+
var enc lp.Encoder
71+
72+
for _, metric := range metrics {
73+
enc.StartLine(metric.Name)
74+
for _, tag := range metric.Tags {
75+
enc.AddTag(tag.Name, tag.Value)
76+
}
77+
78+
for valueName, value := range metric.Values {
79+
// we cannot panic as we provide accessors only for allowed type with AddValue*
80+
enc.AddField(valueName, lp.MustNewValue(value))
81+
}
82+
enc.EndLine(metric.Timestamp)
83+
}
84+
85+
if err := enc.Err(); err != nil {
86+
return err
87+
}
88+
89+
_, err := fmt.Fprintf(writer, "%s", enc.Bytes())
90+
return err
91+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package monitoring
2+
3+
import (
4+
"hash/maphash"
5+
"time"
6+
)
7+
8+
type key struct {
9+
nameTagsHash uint64
10+
timestamp time.Time
11+
}
12+
13+
type bucketsType map[key]*Metric
14+
15+
type MetricsAggregate struct {
16+
hash maphash.Hash
17+
metricsBuckets bucketsType
18+
}
19+
20+
func NewMetricsAggregate() *MetricsAggregate {
21+
metrics := &MetricsAggregate{}
22+
metrics.metricsBuckets = make(bucketsType)
23+
return metrics
24+
}
25+
26+
func (this *MetricsAggregate) AddMetric(metric *Metric) {
27+
metricNameTagsToHash(&this.hash, metric)
28+
hashKey := hashValueAndReset(&this.hash)
29+
30+
k := key{nameTagsHash: hashKey, timestamp: time.Unix(metric.Timestamp.Unix(), 0)}
31+
if storedMetric, ok := this.metricsBuckets[k]; ok {
32+
storedMetric.MergeValues(metric)
33+
} else {
34+
this.metricsBuckets[k] = metric
35+
}
36+
}
37+
38+
func metricNameTagsToHash(hash *maphash.Hash, metric *Metric) {
39+
hash.WriteString(metric.Name)
40+
41+
for _, tag := range metric.Tags {
42+
hash.WriteString(tag.Name)
43+
hash.WriteString(tag.Value)
44+
}
45+
}
46+
47+
func hashValueAndReset(hash *maphash.Hash) uint64 {
48+
hashValue := hash.Sum64()
49+
hash.Reset()
50+
return hashValue
51+
}
52+
53+
func (this *MetricsAggregate) Clear() {
54+
this.hash.Reset()
55+
clear(this.metricsBuckets)
56+
}
57+
58+
func (this *MetricsAggregate) GetMetrics() []Metric {
59+
var result []Metric
60+
for key, metric := range this.metricsBuckets {
61+
metric.Timestamp = key.timestamp
62+
result = append(result, *metric)
63+
}
64+
return result
65+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package monitoring
2+
3+
import (
4+
"hash/maphash"
5+
"math/rand"
6+
"sort"
7+
"time"
8+
)
9+
10+
type bucketsReservoirSampleType map[key]*metricReservoirSample
11+
12+
type metricReservoirSample struct {
13+
metric Metric
14+
reservoir reservoirSampling
15+
}
16+
17+
type MetricsReservoirSampling struct {
18+
hash maphash.Hash
19+
metricsBuckets bucketsReservoirSampleType
20+
}
21+
22+
func NewMetricsReservoirSampling() *MetricsReservoirSampling {
23+
metrics := &MetricsReservoirSampling{}
24+
metrics.metricsBuckets = make(bucketsReservoirSampleType)
25+
return metrics
26+
}
27+
28+
func metricValueToFloat64(value interface{}) float64 {
29+
var asserted float64
30+
switch v := value.(type) {
31+
case int64:
32+
asserted = float64(v)
33+
case uint64:
34+
asserted = float64(v)
35+
case float64:
36+
asserted = v
37+
}
38+
return asserted
39+
}
40+
41+
func (this *MetricsReservoirSampling) AddMetric(metric *Metric) {
42+
for valueKey, value := range metric.Values {
43+
metricNameTagsToHash(&this.hash, metric)
44+
this.hash.WriteString(valueKey)
45+
k := key{nameTagsHash: hashValueAndReset(&this.hash), timestamp: time.Unix(metric.Timestamp.Unix(), 0)}
46+
if storedMetric, ok := this.metricsBuckets[k]; !ok {
47+
newReservoir := newReservoirSampling(valueKey, 1000)
48+
newReservoir.AddPoint(metricValueToFloat64(value))
49+
this.metricsBuckets[k] = &metricReservoirSample{metric: *metric, reservoir: newReservoir}
50+
} else {
51+
storedMetric.reservoir.AddPoint(metricValueToFloat64(value))
52+
}
53+
}
54+
}
55+
56+
func (this *MetricsReservoirSampling) Clear() {
57+
this.hash.Reset()
58+
clear(this.metricsBuckets)
59+
}
60+
61+
func (this *MetricsReservoirSampling) GetMetrics() []Metric {
62+
var result []Metric
63+
for key, reservoirMetric := range this.metricsBuckets {
64+
m := Metric{Name: reservoirMetric.metric.Name, Tags: reservoirMetric.metric.Tags, Timestamp: key.timestamp}
65+
66+
mean, median, min, p10, p30, p70, p90, max, count, poolSize := reservoirMetric.reservoir.GetStats()
67+
68+
m.AddValueFloat64(reservoirMetric.reservoir.name+"_mean", mean)
69+
m.AddValueFloat64(reservoirMetric.reservoir.name+"_median", median)
70+
m.AddValueFloat64(reservoirMetric.reservoir.name+"_min", min)
71+
m.AddValueFloat64(reservoirMetric.reservoir.name+"_p10", p10)
72+
m.AddValueFloat64(reservoirMetric.reservoir.name+"_p30", p30)
73+
m.AddValueFloat64(reservoirMetric.reservoir.name+"_p70", p70)
74+
m.AddValueFloat64(reservoirMetric.reservoir.name+"_p90", p90)
75+
m.AddValueFloat64(reservoirMetric.reservoir.name+"_max", max)
76+
m.AddValueUInt64(reservoirMetric.reservoir.name+"_count", count)
77+
m.AddValueUInt64(reservoirMetric.reservoir.name+"_poolsize", poolSize)
78+
79+
result = append(result, m)
80+
}
81+
return result
82+
}
83+
84+
type reservoirSampling struct {
85+
samples []float64
86+
samplesLimit uint64
87+
name string
88+
countSinceReset uint64
89+
}
90+
91+
func newReservoirSampling(name string, limit uint64) reservoirSampling {
92+
return reservoirSampling{
93+
samples: make([]float64, 0, limit),
94+
samplesLimit: limit,
95+
name: name,
96+
countSinceReset: 0,
97+
}
98+
}
99+
100+
// https://en.wikipedia.org/wiki/Reservoir_sampling
101+
func (this *reservoirSampling) AddPoint(val float64) {
102+
this.countSinceReset += 1
103+
if len(this.samples) < int(this.samplesLimit) {
104+
this.samples = append(this.samples, val)
105+
} else {
106+
if j := rand.Int63n(int64(this.countSinceReset)); j < int64(len(this.samples)) {
107+
this.samples[j] = val
108+
}
109+
}
110+
}
111+
112+
func (this *reservoirSampling) indexForPercentile(percentile int) int {
113+
return int(float64(len(this.samples)) * 0.01 * float64(percentile))
114+
}
115+
116+
func (this *reservoirSampling) Reset() {
117+
this.samples = this.samples[:0]
118+
this.countSinceReset = 0
119+
}
120+
121+
func (this *reservoirSampling) GetStats() (mean float64, median float64, min float64, percentile10 float64, percentile30 float64, percentile70 float64, percentile90 float64, max float64, count uint64, poolSize uint64) {
122+
sort.Slice(this.samples, func(i, j int) bool { return this.samples[i] < this.samples[j] })
123+
124+
samplesCount := len(this.samples)
125+
if samplesCount == 0 {
126+
return 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
127+
}
128+
129+
var sum float64
130+
for _, val := range this.samples {
131+
sum += float64(val)
132+
}
133+
134+
return sum / float64(samplesCount),
135+
this.samples[this.indexForPercentile(50)],
136+
this.samples[0],
137+
this.samples[this.indexForPercentile(10)],
138+
this.samples[this.indexForPercentile(30)],
139+
this.samples[this.indexForPercentile(70)],
140+
this.samples[this.indexForPercentile(90)],
141+
this.samples[len(this.samples)-1],
142+
this.countSinceReset,
143+
uint64(len(this.samples))
144+
}

0 commit comments

Comments
 (0)