Skip to content

Commit 54ca5f8

Browse files
author
Michal Tichák
committed
[core] Adding http metrics endpoint
1 parent de4d866 commit 54ca5f8

File tree

8 files changed

+485
-3
lines changed

8 files changed

+485
-3
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ INSTALL_WHAT:=$(patsubst %, install_%, $(WHAT))
7171
GENERATE_DIRS := ./apricot ./coconut/cmd ./common ./common/runtype ./common/system ./core ./core/integration/ccdb ./core/integration/dcs ./core/integration/ddsched ./core/integration/kafka ./core/integration/odc ./executor ./walnut ./core/integration/trg ./core/integration/bookkeeping
7272
SRC_DIRS := ./apricot ./cmd/* ./core ./coconut ./executor ./common ./configuration ./occ/peanut ./walnut
7373
TEST_DIRS := ./apricot/local ./common/gera ./common/utils/safeacks ./configuration/cfgbackend ./configuration/componentcfg ./configuration/template ./core/task/sm ./core/workflow ./core/integration/odc/fairmq ./core/integration ./core/environment
74-
GO_TEST_DIRS := ./core/repos ./core/integration/dcs
74+
GO_TEST_DIRS := ./core/repos ./core/integration/dcs ./common/monitoring
7575

7676
coverage:COVERAGE_PREFIX := ./coverage_results
7777
coverage:GOTEST_COVERAGE_FILE := $(COVERAGE_PREFIX)/gotest.out

common/ecsmetrics/metric.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package ecsmetrics
2+
3+
import (
4+
"time"
5+
6+
"github.com/AliceO2Group/Control/common/monitoring"
7+
)
8+
9+
func NewMetric(name string) monitoring.Metric {
10+
timestamp := time.Now()
11+
metric := monitoring.Metric{Name: name, Timestamp: timestamp.UnixMilli()}
12+
metric.AddTag("subsystem", "ECS")
13+
return metric
14+
}

common/ecsmetrics/metrics.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package ecsmetrics
2+
3+
import (
4+
internalmetrics "runtime/metrics"
5+
"time"
6+
7+
"github.com/AliceO2Group/Control/common/logger"
8+
"github.com/AliceO2Group/Control/common/monitoring"
9+
"github.com/sirupsen/logrus"
10+
)
11+
12+
var (
13+
endRequestChannel chan struct{}
14+
log = logger.New(logrus.StandardLogger(), "ecsmetrics")
15+
)
16+
17+
func gather() monitoring.Metric {
18+
samples := []internalmetrics.Sample{
19+
{Name: "/gc/cycles/total:gc-cycles"},
20+
{Name: "/memory/classes/other:bytes"},
21+
{Name: "/memory/classes/total:bytes"},
22+
{Name: "/sched/goroutines:goroutines"},
23+
{Name: "/sync/mutex/wait/total:seconds"},
24+
{Name: "/memory/classes/other:bytes"},
25+
{Name: "/memory/classes/total:bytes"},
26+
{Name: "/memory/classes/heap/free:bytes"},
27+
{Name: "/memory/classes/heap/objects:bytes"},
28+
{Name: "/memory/classes/heap/released:bytes"},
29+
{Name: "/memory/classes/heap/stacks:bytes"},
30+
{Name: "/memory/classes/heap/unused:bytes"},
31+
}
32+
33+
// Collect metrics data
34+
internalmetrics.Read(samples)
35+
36+
metric := NewMetric("golangruntimemetrics")
37+
38+
for _, sample := range samples {
39+
switch sample.Value.Kind() {
40+
case internalmetrics.KindUint64:
41+
metric.AddValue(sample.Name, sample.Value.Uint64())
42+
case internalmetrics.KindFloat64:
43+
metric.AddValue(sample.Name, sample.Value.Float64())
44+
case internalmetrics.KindFloat64Histogram:
45+
log.Warning("Error: Histogram is not supported yet for metric [%s]", sample.Name)
46+
continue
47+
default:
48+
log.Warning("Unsupported kind %v for metric %s\n", sample.Value.Kind(), sample.Name)
49+
continue
50+
}
51+
}
52+
return metric
53+
}
54+
55+
func StartGolangMetrics(period time.Duration) {
56+
go func() {
57+
for {
58+
select {
59+
case <-endRequestChannel:
60+
endRequestChannel <- struct{}{}
61+
return
62+
default:
63+
monitoring.Send(gather())
64+
time.Sleep(period)
65+
}
66+
}
67+
}()
68+
}
69+
70+
func StopGolangMetrics() {
71+
endRequestChannel <- struct{}{}
72+
<-endRequestChannel
73+
}

common/monitoring/metric.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package monitoring
2+
3+
type (
4+
TagsType map[string]any
5+
ValuesType map[string]any
6+
)
7+
8+
type Metric struct {
9+
Name string `json:"name"`
10+
Values ValuesType `json:"values"`
11+
Tags TagsType `json:"tags,omitempty"`
12+
Timestamp int64 `json:"timestamp"`
13+
}
14+
15+
func (metric *Metric) AddTag(tagName string, value any) {
16+
if metric.Tags == nil {
17+
metric.Tags = make(TagsType)
18+
}
19+
metric.Tags[tagName] = value
20+
}
21+
22+
func (metric *Metric) AddValue(valueName string, value any) {
23+
if metric.Values == nil {
24+
metric.Values = make(ValuesType)
25+
}
26+
metric.Values[valueName] = value
27+
}

common/monitoring/monitoring.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package monitoring
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"net/http"
8+
"time"
9+
10+
"github.com/AliceO2Group/Control/common/logger"
11+
"github.com/sirupsen/logrus"
12+
)
13+
14+
var (
15+
server *http.Server
16+
metricsLimit int = 1000000
17+
metrics []Metric
18+
// channel that is used to request end of metrics server, it sends notification when server ended.
19+
// It needs to be read!!!
20+
endChannel chan struct{}
21+
22+
// channel used to send metrics into the event loop
23+
metricsChannel chan Metric
24+
25+
// channel for sending notifications to event loop that new http Request to report metrics arrived
26+
metricsRequestChannel chan struct{}
27+
28+
// channel used to send metrics to be reported by http request from event loop
29+
metricsToRequest chan []Metric
30+
31+
Log = logger.New(logrus.StandardLogger(), "metrics")
32+
)
33+
34+
func initChannels(messageBufferSize int) {
35+
endChannel = make(chan struct{})
36+
metricsRequestChannel = make(chan struct{})
37+
metricsChannel = make(chan Metric, 100)
38+
metricsToRequest = make(chan []Metric)
39+
metricsLimit = messageBufferSize
40+
}
41+
42+
func closeChannels() {
43+
close(endChannel)
44+
close(metricsRequestChannel)
45+
close(metricsChannel)
46+
close(metricsToRequest)
47+
}
48+
49+
func eventLoop() {
50+
for {
51+
select {
52+
case <-metricsRequestChannel:
53+
shallowCopyMetrics := metrics
54+
metrics = make([]Metric, 0)
55+
metricsToRequest <- shallowCopyMetrics
56+
57+
case metric := <-metricsChannel:
58+
if len(metrics) < metricsLimit {
59+
metrics = append(metrics, metric)
60+
} else {
61+
Log.Warn("too many metrics waiting to be scraped. Are you sure that metrics scraping is running?")
62+
}
63+
64+
case <-endChannel:
65+
endChannel <- struct{}{}
66+
return
67+
}
68+
}
69+
}
70+
71+
func exportMetricsAndReset(w http.ResponseWriter, r *http.Request) {
72+
w.Header().Set("Content-Type", "application/json")
73+
metricsRequestChannel <- struct{}{}
74+
metricsToConvert := <-metricsToRequest
75+
if metricsToConvert == nil {
76+
metricsToConvert = make([]Metric, 0)
77+
}
78+
json.NewEncoder(w).Encode(metricsToConvert)
79+
}
80+
81+
func Send(metric Metric) {
82+
metricsChannel <- metric
83+
}
84+
85+
func handleFunc(endpointName string) {
86+
// recover is here to correctly allow multiple Starts and Stops of server
87+
defer func() {
88+
recover()
89+
}()
90+
91+
http.HandleFunc(endpointName, exportMetricsAndReset)
92+
}
93+
94+
// \param port port where the scraping endpoint will be created
95+
// \param endpointName name of the endpoint, which must start with a slash eg. "/internalmetrics"
96+
// \param messageBufferSize size of buffer for messages where messages are kept between scraping request.
97+
//
98+
// If we attempt send more messages than the size of the buffer, these overflowing messages will be ignored and warning will be logged.
99+
func Start(port uint16, endpointName string, messageBufferSize int) error {
100+
if server != nil {
101+
return nil
102+
}
103+
104+
initChannels(messageBufferSize)
105+
106+
go eventLoop()
107+
108+
server := &http.Server{Addr: fmt.Sprintf(":%d", port)}
109+
handleFunc(endpointName)
110+
return server.ListenAndServe()
111+
}
112+
113+
func Stop() {
114+
if server == nil {
115+
return
116+
}
117+
118+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
119+
defer cancel()
120+
server.Shutdown(ctx)
121+
122+
endChannel <- struct{}{}
123+
<-endChannel
124+
server = nil
125+
}

0 commit comments

Comments
 (0)