Skip to content

Commit bea152b

Browse files
author
Michal Tichák
committed
[core] add docs for metrics
1 parent c9449da commit bea152b

File tree

13 files changed

+330
-136
lines changed

13 files changed

+330
-136
lines changed

common/ecsmetrics/metric.go

Lines changed: 0 additions & 53 deletions
This file was deleted.

common/event/writer.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"sync"
3131
"time"
3232

33-
"github.com/AliceO2Group/Control/common/ecsmetrics"
3433
"github.com/AliceO2Group/Control/common/event/topic"
3534
"github.com/AliceO2Group/Control/common/logger"
3635
"github.com/AliceO2Group/Control/common/logger/infologger"
@@ -83,7 +82,7 @@ type KafkaWriter struct {
8382
}
8483

8584
func (w *KafkaWriter) newMetric(name string) monitoring.Metric {
86-
metric := ecsmetrics.NewMetric(name)
85+
metric := monitoring.NewMetric(name)
8786
metric.AddTag("topic", w.Topic)
8887
return metric
8988
}
@@ -103,7 +102,7 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
103102
}
104103

105104
writer.writeFunction = func(messages []kafka.Message, metric *monitoring.Metric) {
106-
defer ecsmetrics.TimerNS(metric)()
105+
defer monitoring.TimerNS(metric)()
107106
if err := writer.WriteMessages(context.Background(), messages...); err != nil {
108107
metric.SetFieldUInt64("messages_failed", uint64(len(messages)))
109108
log.Errorf("failed to write %d messages to kafka with error: %v", len(messages), err)
@@ -243,7 +242,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
243242
metric := w.newMetric(KAFKAPREPARE)
244243

245244
func() {
246-
defer ecsmetrics.TimerNS(&metric)()
245+
defer monitoring.TimerNS(&metric)()
247246
wrappedEvent, key, err := internalEventToKafkaEvent(e, timestamp)
248247
if err != nil {
249248
log.WithField("event", e).
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
* Intergovernmental Organization or submit itself to any jurisdiction.
2323
*/
2424

25-
package ecsmetrics
25+
package golangmetrics
2626

2727
import (
2828
internalmetrics "runtime/metrics"
@@ -57,7 +57,7 @@ func gather() monitoring.Metric {
5757

5858
internalmetrics.Read(samples)
5959

60-
metric := NewMetric("golangruntimemetrics")
60+
metric := monitoring.NewMetric("golangruntimemetrics")
6161

6262
for _, sample := range samples {
6363
switch sample.Value.Kind() {
@@ -76,7 +76,7 @@ func gather() monitoring.Metric {
7676
return metric
7777
}
7878

79-
func StartGolangMetrics(period time.Duration) {
79+
func Start(period time.Duration) {
8080
log.WithField("level", infologger.IL_Devel).Info("Starting golang metrics reporting")
8181
go func() {
8282
log.Debug("Starting golang metrics goroutine")
@@ -96,7 +96,7 @@ func StartGolangMetrics(period time.Duration) {
9696
}()
9797
}
9898

99-
func StopGolangMetrics() {
99+
func Stop() {
100100
endRequestChannel <- struct{}{}
101101
<-endRequestChannel
102102
}
Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,29 +22,29 @@
2222
* Intergovernmental Organization or submit itself to any jurisdiction.
2323
*/
2424

25-
package ecsmetrics
25+
package monitoring
2626

2727
import (
28-
"testing"
28+
"hash/maphash"
2929
"time"
30-
31-
"github.com/AliceO2Group/Control/common/monitoring"
3230
)
3331

34-
func measureFunc(metric *monitoring.Metric) {
35-
defer TimerMS(metric)()
36-
defer TimerNS(metric)()
37-
time.Sleep(100 * time.Millisecond)
32+
type key struct {
33+
nameTagsHash uint64
34+
timestamp time.Time
3835
}
3936

40-
func TestSimpleStartStop(t *testing.T) {
41-
metric := NewMetric("test")
42-
measureFunc(&metric)
43-
fields := metric.GetFields()
44-
if fields["execution_time_ms"].(int64) < 100 {
45-
t.Error("wrong milliseconds")
46-
}
47-
if fields["execution_time_ns"].(int64) < 100000000 {
48-
t.Error("wrong nanoseconds")
37+
func metricNameTagsToHash(hash *maphash.Hash, metric *Metric) {
38+
hash.WriteString(metric.name)
39+
40+
for _, tag := range metric.tags {
41+
hash.WriteString(tag.name)
42+
hash.WriteString(tag.value)
4943
}
5044
}
45+
46+
func hashValueAndReset(hash *maphash.Hash) uint64 {
47+
hashValue := hash.Sum64()
48+
hash.Reset()
49+
return hashValue
50+
}

common/monitoring/metric.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,21 @@ type Metric struct {
4949
timestamp time.Time
5050
}
5151

52-
func NewMetric(name string, timestamp time.Time) Metric {
52+
// Return empty metric, it is used right now mostly in string
53+
// to report metrics correctly to influxdb use NewECSMetric
54+
func NewDefaultMetric(name string, timestamp time.Time) Metric {
5355
return Metric{
5456
name: name, timestamp: timestamp,
5557
}
5658
}
5759

58-
func (metric *Metric) GetFields() (fields FieldsType) {
59-
for fieldName, field := range metric.fields {
60-
fields[fieldName] = field
61-
}
62-
return
60+
// creates empty metric with tag subsystem=ECS, which
61+
// is used by Telegraf to send metrics from ECS to correct
62+
// bucket
63+
func NewMetric(name string) Metric {
64+
metric := NewDefaultMetric(name, time.Now())
65+
metric.AddTag("subsystem", "ECS")
66+
return metric
6367
}
6468

6569
func (metric *Metric) AddTag(tagName string, value string) {

common/monitoring/metricsaggregate.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,6 @@ import (
2929
"time"
3030
)
3131

32-
type key struct {
33-
nameTagsHash uint64
34-
timestamp time.Time
35-
}
36-
3732
type bucketsType map[key]*Metric
3833

3934
type MetricsAggregate struct {
@@ -59,21 +54,6 @@ func (this *MetricsAggregate) AddMetric(metric *Metric) {
5954
}
6055
}
6156

62-
func metricNameTagsToHash(hash *maphash.Hash, metric *Metric) {
63-
hash.WriteString(metric.name)
64-
65-
for _, tag := range metric.tags {
66-
hash.WriteString(tag.name)
67-
hash.WriteString(tag.value)
68-
}
69-
}
70-
71-
func hashValueAndReset(hash *maphash.Hash) uint64 {
72-
hashValue := hash.Sum64()
73-
hash.Reset()
74-
return hashValue
75-
}
76-
7757
func (this *MetricsAggregate) Clear() {
7858
this.hash.Reset()
7959
clear(this.metricsBuckets)

common/monitoring/metricsreservoirsampling.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type MetricsReservoirSampling struct {
4646
func NewMetricsReservoirSampling() *MetricsReservoirSampling {
4747
metrics := &MetricsReservoirSampling{}
4848
metrics.metricsBuckets = make(bucketsReservoirSampleType)
49+
metrics.hash.SetSeed(maphash.MakeSeed())
4950
return metrics
5051
}
5152

common/monitoring/monitoring.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,11 @@ import (
3636
)
3737

3838
var (
39-
server *http.Server
39+
// scraping endpoint implementation
40+
server *http.Server
41+
// objects to store incoming metrics
4042
metricsInternal *MetricsAggregate
4143
metricsHistogramInternal *MetricsReservoirSampling
42-
// metrics []Metric
4344
// channel that is used to request end of metrics server, it sends notification when server ended.
4445
// It needs to be read!!!
4546
endChannel chan struct{}
@@ -59,7 +60,7 @@ var (
5960
log = logger.New(logrus.StandardLogger(), "metrics").WithField("level", infologger.IL_Devel)
6061
)
6162

62-
func initChannels(messageBufferSize int) {
63+
func initChannels() {
6364
endChannel = make(chan struct{})
6465
metricsRequestedChannel = make(chan struct{})
6566
// 100 was chosen arbitrarily as a number that seemed sensible to be high enough to provide nice buffer if
@@ -79,11 +80,12 @@ func closeChannels() {
7980
}
8081

8182
// this eventLoop is the main part that processes all metrics send to the package
82-
// 3 events can happen:
83+
// 4 events can happen:
8384
// 1. metricsChannel receives message from Send() method. We just add the new metric to metrics slice
84-
// 2. metricsRequestChannel receives request to dump and request existing metrics. We send shallow copy of existing
85+
// 2. metricsHistosChannel receives message from Send() method. We just add the new metric to metrics slice
86+
// 3. metricsRequestChannel receives request to dump and request existing metrics. We send shallow copy of existing
8587
// metrics to requestor (via metricsExportedToRequest channel) while resetting current metrics slice
86-
// 3. receive request to stop monitoring via endChannel. We send confirmation through endChannel to notify caller
88+
// 4. receive request to stop monitoring via endChannel. We send confirmation through endChannel to notify caller
8789
// that eventLoop stopped
8890
func eventLoop() {
8991
for {
@@ -144,15 +146,14 @@ func handleFunc(endpointName string) {
144146

145147
// \param port port where the scraping endpoint will be created
146148
// \param endpointName name of the endpoint, which must start with a slash eg. "/internalmetrics"
147-
// \param messageBufferSize size of buffer for messages where messages are kept between scraping request.
148149
//
149150
// If we attempt send more messages than the size of the buffer, these overflowing messages will be ignored and warning will be logged.
150-
func Run(port uint16, endpointName string, messageBufferSize int) error {
151+
func Run(port uint16, endpointName string) error {
151152
if IsRunning() {
152153
return nil
153154
}
154155

155-
initChannels(messageBufferSize)
156+
initChannels()
156157

157158
go eventLoop()
158159

common/monitoring/monitoring_test.go

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,13 @@ func hasNumberOfMetrics(t *testing.T, timeout time.Duration, requiredMessages in
6969
}
7070

7171
func TestSimpleStartStop(t *testing.T) {
72-
go Run(1234, "/random", 100)
72+
go Run(1234, "/random")
7373
isRunningWithTimeout(t, time.Second)
7474
Stop()
7575
}
7676

7777
func TestStartMultipleStop(t *testing.T) {
78-
go Run(1234, "/random", 100)
78+
go Run(1234, "/random")
7979
isRunningWithTimeout(t, time.Second)
8080
Stop()
8181
Stop()
@@ -86,7 +86,7 @@ func cleaningUpAfterTest() {
8686
}
8787

8888
func initTest() {
89-
go Run(12345, "notimportant", 100)
89+
go Run(12345, "notimportant")
9090
}
9191

9292
// decorator function that properly inits and cleans after higher level test of Monitoring package
@@ -131,7 +131,7 @@ func TestExportingMetrics(t *testing.T) {
131131
}
132132

133133
func TestHttpRun(t *testing.T) {
134-
go Run(9876, "/metrics", 10)
134+
go Run(9876, "/metrics")
135135
defer Stop()
136136

137137
isRunningWithTimeout(t, time.Second)
@@ -545,6 +545,24 @@ func TestMetricsHistogramObject(t *testing.T) {
545545
}
546546
}
547547

548+
func measureFunc(metric *Metric) {
549+
defer TimerMS(metric)()
550+
defer TimerNS(metric)()
551+
time.Sleep(100 * time.Millisecond)
552+
}
553+
554+
func TestTimers(t *testing.T) {
555+
metric := NewMetric("test")
556+
measureFunc(&metric)
557+
fields := metric.fields
558+
if fields["execution_time_ms"].(int64) < 100 {
559+
t.Error("wrong milliseconds")
560+
}
561+
if fields["execution_time_ns"].(int64) < 100000000 {
562+
t.Error("wrong nanoseconds")
563+
}
564+
}
565+
548566
func BenchmarkSimple(b *testing.B) {
549567
cpuProfileFile, err := os.Create("cpu_profile.pprof")
550568
if err != nil {
@@ -575,19 +593,8 @@ func BenchmarkSimple(b *testing.B) {
575593
pprof.WriteHeapProfile(heapProfileFile)
576594
}
577595

578-
// This benchmark cannot be run for too long as it will fill whole RAM even with
579-
// results:
580-
// goos: linux
581-
// goarch: amd64
582-
// pkg: github.com/AliceO2Group/Control/common/monitoring
583-
// cpu: 11th Gen Intel(R) Core(TM) i9-11900H @ 2.50GHz
584-
// BenchmarkSendingMetrics-16
585-
//
586-
// 123365481 192.6 ns/op
587-
// PASS
588-
// ok github.com/AliceO2Group/Control/common/monitoring 44.686s
589596
func BenchmarkSendingMetrics(b *testing.B) {
590-
go Run(12345, "/metrics", 100)
597+
go Run(12345, "/metrics")
591598

592599
defer Stop()
593600

common/monitoring/timer.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package monitoring
2+
3+
import "time"
4+
5+
// Timer* functions are meant to be used with defer statement to measure runtime of given function:
6+
// defer TimerNS(&metric)()
7+
func TimerMS(metric *Metric) func() {
8+
start := time.Now()
9+
return func() {
10+
metric.SetFieldInt64("execution_time_ms", time.Since(start).Milliseconds())
11+
}
12+
}
13+
14+
func TimerNS(metric *Metric) func() {
15+
start := time.Now()
16+
return func() {
17+
metric.SetFieldInt64("execution_time_ns", time.Since(start).Nanoseconds())
18+
}
19+
}

0 commit comments

Comments
 (0)