Skip to content

Commit 92045b5

Browse files
author
Michal Tichák
committed
[core] add docs for metrics
1 parent c9449da commit 92045b5

File tree

7 files changed

+300
-32
lines changed

7 files changed

+300
-32
lines changed

common/monitoring/common.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Michal Tichak <michal.tichak@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package monitoring
26+
27+
import (
28+
"hash/maphash"
29+
"time"
30+
)
31+
32+
type key struct {
33+
nameTagsHash uint64
34+
timestamp time.Time
35+
}
36+
37+
func metricNameTagsToHash(hash *maphash.Hash, metric *Metric) {
38+
hash.WriteString(metric.name)
39+
40+
for _, tag := range metric.tags {
41+
hash.WriteString(tag.name)
42+
hash.WriteString(tag.value)
43+
}
44+
}
45+
46+
func hashValueAndReset(hash *maphash.Hash) uint64 {
47+
hashValue := hash.Sum64()
48+
hash.Reset()
49+
return hashValue
50+
}

common/monitoring/metricsaggregate.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,6 @@ import (
2929
"time"
3030
)
3131

32-
type key struct {
33-
nameTagsHash uint64
34-
timestamp time.Time
35-
}
36-
3732
type bucketsType map[key]*Metric
3833

3934
type MetricsAggregate struct {
@@ -59,21 +54,6 @@ func (this *MetricsAggregate) AddMetric(metric *Metric) {
5954
}
6055
}
6156

62-
func metricNameTagsToHash(hash *maphash.Hash, metric *Metric) {
63-
hash.WriteString(metric.name)
64-
65-
for _, tag := range metric.tags {
66-
hash.WriteString(tag.name)
67-
hash.WriteString(tag.value)
68-
}
69-
}
70-
71-
func hashValueAndReset(hash *maphash.Hash) uint64 {
72-
hashValue := hash.Sum64()
73-
hash.Reset()
74-
return hashValue
75-
}
76-
7757
func (this *MetricsAggregate) Clear() {
7858
this.hash.Reset()
7959
clear(this.metricsBuckets)

common/monitoring/metricsreservoirsampling.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type MetricsReservoirSampling struct {
4646
func NewMetricsReservoirSampling() *MetricsReservoirSampling {
4747
metrics := &MetricsReservoirSampling{}
4848
metrics.metricsBuckets = make(bucketsReservoirSampleType)
49+
metrics.hash.SetSeed(maphash.MakeSeed())
4950
return metrics
5051
}
5152

common/monitoring/monitoring.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,11 @@ import (
3636
)
3737

3838
var (
39-
server *http.Server
39+
// scraping endpoint implementation
40+
server *http.Server
41+
// objects to store incoming metrics
4042
metricsInternal *MetricsAggregate
4143
metricsHistogramInternal *MetricsReservoirSampling
42-
// metrics []Metric
4344
// channel that is used to request end of metrics server, it sends notification when server ended.
4445
// It needs to be read!!!
4546
endChannel chan struct{}
@@ -59,7 +60,7 @@ var (
5960
log = logger.New(logrus.StandardLogger(), "metrics").WithField("level", infologger.IL_Devel)
6061
)
6162

62-
func initChannels(messageBufferSize int) {
63+
func initChannels() {
6364
endChannel = make(chan struct{})
6465
metricsRequestedChannel = make(chan struct{})
6566
// 100 was chosen arbitrarily as a number that seemed sensible to be high enough to provide nice buffer if
@@ -79,11 +80,12 @@ func closeChannels() {
7980
}
8081

8182
// this eventLoop is the main part that processes all metrics send to the package
82-
// 3 events can happen:
83+
// 4 events can happen:
8384
// 1. metricsChannel receives message from Send() method. We just add the new metric to metrics slice
84-
// 2. metricsRequestChannel receives request to dump and request existing metrics. We send shallow copy of existing
85+
// 2. metricsHistosChannel receives message from Send() method. We just add the new metric to metrics slice
86+
// 3. metricsRequestChannel receives request to dump and request existing metrics. We send shallow copy of existing
8587
// metrics to requestor (via metricsExportedToRequest channel) while resetting current metrics slice
86-
// 3. receive request to stop monitoring via endChannel. We send confirmation through endChannel to notify caller
88+
// 4. receive request to stop monitoring via endChannel. We send confirmation through endChannel to notify caller
8789
// that eventLoop stopped
8890
func eventLoop() {
8991
for {
@@ -144,15 +146,14 @@ func handleFunc(endpointName string) {
144146

145147
// \param port port where the scraping endpoint will be created
146148
// \param endpointName name of the endpoint, which must start with a slash eg. "/internalmetrics"
147-
// \param messageBufferSize size of buffer for messages where messages are kept between scraping request.
148149
//
149150
// If we attempt send more messages than the size of the buffer, these overflowing messages will be ignored and warning will be logged.
150-
func Run(port uint16, endpointName string, messageBufferSize int) error {
151+
func Run(port uint16, endpointName string) error {
151152
if IsRunning() {
152153
return nil
153154
}
154155

155-
initChannels(messageBufferSize)
156+
initChannels()
156157

157158
go eventLoop()
158159

core/config.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ func setDefaults() error {
129129
viper.SetDefault("enableKafka", true)
130130
viper.SetDefault("logAllIL", false)
131131
viper.SetDefault("metricsEndpoint", "8088/ecsmetrics")
132-
viper.SetDefault("metricsBufferSize", 1000000)
133132
return nil
134133
}
135134

@@ -201,7 +200,6 @@ func setFlags() error {
201200
pflag.Bool("enableKafka", viper.GetBool("enableKafka"), "Turn on the kafka messaging")
202201
pflag.Bool("logAllIL", viper.GetBool("logAllIL"), "Send all the logs into IL, including Debug and Trace messages")
203202
pflag.String("metricsEndpoint", viper.GetString("metricsEndpoint"), "Http endpoint from which metrics can be scraped: [port/endpoint]")
204-
pflag.Int("metricsBufferSize", viper.GetInt("metricsBufferSize"), "Limit for how many metrics can be stored in buffer in between scraping requests")
205203

206204
pflag.Parse()
207205
return viper.BindPFlags(pflag.CommandLine)

core/core.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func runMetrics() {
8282

8383
go func() {
8484
log.Infof("Starting to listen on endpoint %s:%d for metrics", endpoint, port)
85-
if err := monitoring.Run(port, fmt.Sprintf("/%s", endpoint), viper.GetInt("metricsBufferSize")); err != nil && err != http.ErrServerClosed {
85+
if err := monitoring.Run(port, fmt.Sprintf("/%s", endpoint)); err != nil && err != http.ErrServerClosed {
8686
ecsmetrics.StopGolangMetrics()
8787
log.Errorf("failed to run metrics on port %d and endpoint: %s")
8888
}

0 commit comments

Comments
 (0)