Commit 9258177

Michal Tichák authored and justonedev1 committed
[monitoring] Refactor monitoring so Send doesn't have to wait until library is running
1 parent 365efca commit 9258177

File tree

3 files changed (+51 / -49 lines)

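The gist of the change: the metric channels are now allocated up front as package-level variables, and Send() does a non-blocking select instead of checking IsRunning(), so callers can send metrics before the HTTP server and event loop are up (overflowing metrics are dropped). A minimal self-contained sketch of that pattern, with a simplified Metric type and the same arbitrarily chosen buffer size; this is an illustration, not the package's actual code:

package main

import "fmt"

// Metric is a simplified stand-in for the package's Metric type.
type Metric struct {
    name string
}

// Buffered channel allocated at package init, so senders never see a nil
// channel and never have to wait for an event loop to be running.
var metricsChannel = make(chan Metric, 100000)

// send mirrors the new Send(): try to enqueue, silently drop on overflow
// instead of blocking the caller.
func send(m Metric) {
    select {
    case metricsChannel <- m:
    default: // buffer full: drop the metric rather than slow the caller down
    }
}

func main() {
    send(Metric{name: "test"}) // safe even though no consumer is running yet
    fmt.Println("queued metrics:", len(metricsChannel))
}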

common/monitoring/monitoring.go

Lines changed: 41 additions & 36 deletions
@@ -42,50 +42,38 @@ var (
     // atomic holder for the HTTP server instance
     server atomic.Pointer[http.Server]
     // objects to store incoming metrics
-    metricsInternal *MetricsAggregate
-    metricsHistogramInternal *MetricsReservoirSampling
+    metricsInternal *MetricsAggregate = NewMetricsAggregate()
+    metricsHistogramInternal *MetricsReservoirSampling = NewMetricsReservoirSampling()
     // channel that is used to request end of metrics server, it sends notification when server ended.
     // It needs to be read!!!
-    endChannel chan struct{}
+    endChannel chan struct{} = make(chan struct{})

     // channel used to send metrics into the event loop
-    metricsChannel chan Metric
+    // 100000 was chosen arbitrarily as a number that seemed sensible to be high enough to provide nice buffer if
+    // multiple goroutines want to send metrics without blocking each other
+    metricsChannel chan Metric = make(chan Metric, 100000)

     // channel used to send metrics meant to be proceesed as histogram into the event loop
-    metricsHistosChannel chan Metric
+    // 100000 was chosen arbitrarily as a number that seemed sensible to be high enough to provide nice buffer if
+    // multiple goroutines want to send metrics without blocking each other
+    metricsHistosChannel chan Metric = make(chan Metric, 100000)

     // channel for sending requests to reset actual metrics slice and send it back to caller via metricsExportedToRequest
-    metricsRequestedChannel chan struct{}
+    metricsRequestedChannel chan struct{} = make(chan struct{})

     // channel used to send metrics to be reported by http request from event loop
-    metricsExportedToRequest chan []Metric
+    metricsExportedToRequest chan []Metric = make(chan []Metric)

-    log = logger.New(logrus.StandardLogger(), "metrics").WithField("level", infologger.IL_Devel)
-)
-
-func initChannels() {
-    endChannel = make(chan struct{})
-    metricsRequestedChannel = make(chan struct{})
-    // 100 was chosen arbitrarily as a number that seemed sensible to be high enough to provide nice buffer if
-    // multiple goroutines want to send metrics without blocking each other
-    metricsChannel = make(chan Metric, 100000)
-    metricsHistosChannel = make(chan Metric, 100000)
-    metricsExportedToRequest = make(chan []Metric)
-    metricsInternal = NewMetricsAggregate()
-    metricsHistogramInternal = NewMetricsReservoirSampling()
-}
+    // WaitUntilRunning is waiting until this channel is closed
+    waitUntilRunningChannel chan struct{} = make(chan struct{})

-func closeChannels() {
-    close(endChannel)
-    close(metricsRequestedChannel)
-    close(metricsChannel)
-    close(metricsExportedToRequest)
-}
+    log = logger.New(logrus.StandardLogger(), "metrics").WithField(infologger.Level, infologger.IL_Devel)
+)

 // this eventLoop is the main part that processes all metrics send to the package
 // 4 events can happen:
-// 1. metricsChannel receives message from Send() method. We just add the new metric to metrics slice
-// 2. metricsHistosChannel receives message from Send() method. We just add the new metric to metrics slice
+// 1. metricsChannel receives message from Send() method. We add the new metric to metrics slice
+// 2. metricsHistosChannel receives message from Send() method. We add the new metric to metrics slice
 // 3. metricsRequestChannel receives request to dump and request existing metrics. We send shallow copy of existing
 // metrics to requestor (via metricsExportedToRequest channel) while resetting current metrics slice
 // 4. receive request to stop monitoring via endChannel. We send confirmation through endChannel to notify caller
@@ -130,14 +118,18 @@ func exportMetricsAndReset(w http.ResponseWriter, r *http.Request) {
 }

 func Send(metric *Metric) {
-    if IsRunning() {
-        metricsChannel <- *metric
+    // drop overflowing messages to not slowdown processing, we don't log so we don't flood IL
+    select {
+    case metricsChannel <- *metric:
+    default:
     }
 }

 func SendHistogrammable(metric *Metric) {
-    if IsRunning() {
-        metricsHistosChannel <- *metric
+    // drop overflowing messages to not slowdown processing, we don't log so we don't flood IL
+    select {
+    case metricsHistosChannel <- *metric:
+    default:
     }
 }

@@ -160,10 +152,10 @@ func Run(port uint16, endpointName string) error {
     if !server.CompareAndSwap(nil, localServer) {
         return nil
     }
-    initChannels()
     go eventLoop()
     handleFunc(endpointName)
     // block until Shutdown is called
+    close(waitUntilRunningChannel)
     return localServer.ListenAndServe()
 }

@@ -176,9 +168,22 @@ func Stop() {
     defer cancel()
     localServer.Shutdown(ctx)
     endChannel <- struct{}{}
+    _, ok := <-waitUntilRunningChannel
+    if !ok {
+        waitUntilRunningChannel = make(chan struct{})
+    }
     <-endChannel
 }

-func IsRunning() bool {
-    return server.Load() != nil
+// If monitoring is not running it will wait until monitoring is running or
+// timeout is triggered.
+// \return true if monitoring is running, false if timeout occured
+func WaitUntilRunning(timeout time.Duration) bool {
+    timeoutChan := time.After(timeout)
+    select {
+    case <-waitUntilRunningChannel:
+        return true
+    case <-timeoutChan:
+        return false
+    }
 }

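The new waitUntilRunningChannel relies on the close-a-channel broadcast idiom: Run() closes it once the server instance is registered, which unblocks every WaitUntilRunning() caller at once. A standalone sketch of that idiom with simplified names, separate from the package's code:

package main

import (
    "fmt"
    "time"
)

// ready is closed once the service is up; closing a channel unblocks every
// receiver, so close() acts as a broadcast to all waiters.
var ready = make(chan struct{})

func markRunning() {
    close(ready)
}

// waitUntilRunning mirrors WaitUntilRunning(): true if the service came up
// before the timeout, false otherwise.
func waitUntilRunning(timeout time.Duration) bool {
    select {
    case <-ready:
        return true
    case <-time.After(timeout):
        return false
    }
}

func main() {
    go func() {
        time.Sleep(50 * time.Millisecond) // simulated startup work
        markRunning()
    }()
    fmt.Println(waitUntilRunning(time.Second)) // true: startup beat the timeout
}

In the diff itself, Stop() additionally re-creates waitUntilRunningChannel when it finds it already closed, so a subsequent Run() can signal readiness again.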

common/monitoring/monitoring_test.go

Lines changed: 5 additions & 13 deletions
@@ -40,16 +40,8 @@ import (

 // blocks until either IsRunning() returns true or timeout is triggered
 func isRunningWithTimeout(t *testing.T, timeout time.Duration) {
-    timeoutChan := time.After(timeout)
-    for !IsRunning() {
-        select {
-        case <-timeoutChan:
-            t.Errorf("Monitoring is not running even after %v", timeout)
-            return
-
-        default:
-            time.Sleep(10 * time.Millisecond)
-        }
+    if !WaitUntilRunning(timeout) {
+        t.Errorf("Failed to init monitoring library in %v", timeout)
     }
 }

@@ -126,7 +118,7 @@ func TestHttpRun(t *testing.T) {
     go Run(9876, "/metrics")
     defer Stop()

-    isRunningWithTimeout(t, time.Second)
+    isRunningWithTimeout(t, 5*time.Second)

     metric := Metric{name: "test"}
     metric.timestamp = time.Unix(10, 0)
@@ -140,12 +132,12 @@
     }
     message, err := io.ReadAll(response.Body)
     if err != nil {
-        t.Errorf("Failed to read response Body: %v", err)
+        t.Fatalf("Failed to read response Body: %v", err)
     }

     receivedMetrics, err := parseMultipleLineProtocol(string(message))
     if err != nil {
-        t.Errorf("Failed to parse message: %v", string(message))
+        t.Fatalf("Failed to parse message: %v with err: %v", string(message), err)
     }

     receivedMetric := receivedMetrics[0]

core/core.go

Lines changed: 5 additions & 0 deletions
@@ -90,6 +90,11 @@ func runMetrics() {
        }
    }()

+   monitoringTimeout := 30 * time.Second
+   if !monitoring.WaitUntilRunning(monitoringTimeout) {
+       log.WithField(infologger.Level, infologger.IL_Devel).Warnf("Failed to initialize monitoring framework in %v, it might catch up later. For now we are starting without metrics", monitoringTimeout)
+   }
+
    golangmetrics.Start(10 * time.Second)
 }

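For completeness, a caller-side sketch of the startup pattern core.go adopts above: run the blocking Run() in a goroutine, wait a bounded time for readiness, and continue either way, since Send() no longer blocks. The import path, port, and endpoint name are illustrative assumptions, not taken from the repository:

package main

import (
    "log"
    "time"

    "example.com/monitoring" // hypothetical import path for the monitoring package
)

func main() {
    // Run blocks on ListenAndServe, so it is started in its own goroutine.
    go func() {
        if err := monitoring.Run(9876, "/metrics"); err != nil {
            log.Printf("metrics server stopped: %v", err)
        }
    }()
    defer monitoring.Stop() // shuts the server down and drains the event loop

    // Wait a bounded time for the endpoint to come up; if it does not, keep
    // going — metrics sent in the meantime are buffered or dropped, never blocking.
    if !monitoring.WaitUntilRunning(30 * time.Second) {
        log.Println("monitoring not up yet, continuing without metrics for now")
    }

    // A zero-value Metric, used purely to illustrate that Send no longer
    // blocks regardless of the server's state.
    monitoring.Send(&monitoring.Metric{})
}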
