Skip to content

Commit 6161c64

Browse files
author
Michal Tichák
committed
fixup! [core] OCTRL-1003
1 parent 19c9a33 commit 6161c64

File tree

10 files changed

+414
-209
lines changed

10 files changed

+414
-209
lines changed

common/ecsmetrics/metric.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,27 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
125
package ecsmetrics
226

327
import (
@@ -7,7 +31,7 @@ import (
731
)
832

933
func NewMetric(name string) monitoring.Metric {
10-
metric := monitoring.Metric{Name: name, Timestamp: time.Now()}
34+
metric := monitoring.NewMetric(name, time.Now())
1135
metric.AddTag("subsystem", "ECS")
1236
return metric
1337
}
@@ -17,13 +41,13 @@ func NewMetric(name string) monitoring.Metric {
1741
func TimerMS(metric *monitoring.Metric) func() {
1842
start := time.Now()
1943
return func() {
20-
metric.AddValueInt64("execution_time_ms", time.Since(start).Milliseconds())
44+
metric.SetFieldInt64("execution_time_ms", time.Since(start).Milliseconds())
2145
}
2246
}
2347

2448
func TimerNS(metric *monitoring.Metric) func() {
2549
start := time.Now()
2650
return func() {
27-
metric.AddValueInt64("execution_time_ns", time.Since(start).Nanoseconds())
51+
metric.SetFieldInt64("execution_time_ns", time.Since(start).Nanoseconds())
2852
}
2953
}

common/ecsmetrics/metrics.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,27 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
125
package ecsmetrics
226

327
import (
@@ -38,9 +62,9 @@ func gather() monitoring.Metric {
3862
for _, sample := range samples {
3963
switch sample.Value.Kind() {
4064
case internalmetrics.KindUint64:
41-
metric.AddValueUInt64(sample.Name, sample.Value.Uint64())
65+
metric.SetFieldUInt64(sample.Name, sample.Value.Uint64())
4266
case internalmetrics.KindFloat64:
43-
metric.AddValueFloat64(sample.Name, sample.Value.Float64())
67+
metric.SetFieldFloat64(sample.Name, sample.Value.Float64())
4468
case internalmetrics.KindFloat64Histogram:
4569
log.WithField("level", infologger.IL_Devel).Warningf("Error: Histogram is not supported yet for metric [%s]", sample.Name)
4670
continue

common/ecsmetrics/metrics_test.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,30 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
125
package ecsmetrics
226

327
import (
4-
"fmt"
528
"testing"
629
"time"
730

@@ -17,12 +40,11 @@ func measureFunc(metric *monitoring.Metric) {
1740
func TestSimpleStartStop(t *testing.T) {
1841
metric := NewMetric("test")
1942
measureFunc(&metric)
20-
fmt.Println(metric.Values["execution_time_ms"])
21-
fmt.Println(metric.Values["execution_time_ns"])
22-
if metric.Values["execution_time_ms"].(int64) < 100 {
43+
fields := metric.GetFields()
44+
if fields["execution_time_ms"].(int64) < 100 {
2345
t.Error("wrong milliseconds")
2446
}
25-
if metric.Values["execution_time_ns"].(int64) < 100000000 {
47+
if fields["execution_time_ns"].(int64) < 100000000 {
2648
t.Error("wrong nanoseconds")
2749
}
2850
}

common/event/writer.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
105105
writer.writeFunction = func(messages []kafka.Message, metric *monitoring.Metric) {
106106
defer ecsmetrics.TimerNS(metric)()
107107
if err := writer.WriteMessages(context.Background(), messages...); err != nil {
108-
metric.AddValueUInt64("messages_failed", uint64(len(messages)))
108+
metric.SetFieldUInt64("messages_failed", uint64(len(messages)))
109109
log.Errorf("failed to write %d messages to kafka with error: %v", len(messages), err)
110110
}
111111
}
@@ -145,8 +145,8 @@ func (w *KafkaWriter) writingLoop() {
145145
}
146146

147147
metric := w.newMetric(KAFKAWRITER)
148-
metric.AddValueUInt64("messages_sent", uint64(len(messagesToSend)))
149-
metric.AddValueUInt64("messages_failed", 0)
148+
metric.SetFieldUInt64("messages_sent", uint64(len(messagesToSend)))
149+
metric.SetFieldUInt64("messages_failed", 0)
150150

151151
w.writeFunction(messagesToSend, &metric)
152152

common/monitoring/metric.go

Lines changed: 70 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,27 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
125
package monitoring
226

327
import (
@@ -10,58 +34,70 @@ import (
1034

1135
type (
1236
Tag struct {
13-
Name string
14-
Value string
37+
name string
38+
value string
1539
}
1640

1741
TagsType []Tag
18-
ValuesType map[string]interface{}
42+
FieldsType map[string]any
1943
)
2044

2145
type Metric struct {
22-
Name string
23-
Values ValuesType
24-
Tags TagsType
25-
Timestamp time.Time
46+
name string
47+
fields FieldsType
48+
tags TagsType
49+
timestamp time.Time
50+
}
51+
52+
func NewMetric(name string, timestamp time.Time) Metric {
53+
return Metric{
54+
name: name, timestamp: timestamp,
55+
}
56+
}
57+
58+
func (metric *Metric) GetFields() (fields FieldsType) {
59+
for fieldName, field := range metric.fields {
60+
fields[fieldName] = field
61+
}
62+
return
2663
}
2764

2865
func (metric *Metric) AddTag(tagName string, value string) {
29-
metric.Tags = append(metric.Tags, Tag{Name: tagName, Value: value})
66+
metric.tags = append(metric.tags, Tag{name: tagName, value: value})
3067
}
3168

32-
func (metric *Metric) addValue(valueName string, value interface{}) {
33-
if metric.Values == nil {
34-
metric.Values = make(ValuesType)
69+
func (metric *Metric) setField(fieldName string, field any) {
70+
if metric.fields == nil {
71+
metric.fields = make(FieldsType)
3572
}
36-
metric.Values[valueName] = value
73+
metric.fields[fieldName] = field
3774
}
3875

39-
func (metric *Metric) AddValueInt64(valueName string, value int64) {
40-
metric.addValue(valueName, value)
76+
func (metric *Metric) SetFieldInt64(fieldName string, field int64) {
77+
metric.setField(fieldName, field)
4178
}
4279

43-
func (metric *Metric) AddValueUInt64(valueName string, value uint64) {
44-
metric.addValue(valueName, value)
80+
func (metric *Metric) SetFieldUInt64(fieldName string, field uint64) {
81+
metric.setField(fieldName, field)
4582
}
4683

47-
func (metric *Metric) AddValueFloat64(valueName string, value float64) {
48-
metric.addValue(valueName, value)
84+
func (metric *Metric) SetFieldFloat64(fieldName string, field float64) {
85+
metric.setField(fieldName, field)
4986
}
5087

51-
func (metric *Metric) MergeValues(other *Metric) {
52-
for valueName, value := range other.Values {
53-
if storedValue, ok := metric.Values[valueName]; ok {
54-
switch v := value.(type) {
88+
func (metric *Metric) MergeFields(other *Metric) {
89+
for fieldName, field := range other.fields {
90+
if storedField, ok := metric.fields[fieldName]; ok {
91+
switch v := field.(type) {
5592
case int64:
56-
metric.Values[valueName] = v + storedValue.(int64)
93+
metric.fields[fieldName] = v + storedField.(int64)
5794
case uint64:
58-
metric.Values[valueName] = v + storedValue.(uint64)
95+
metric.fields[fieldName] = v + storedField.(uint64)
5996
case float64:
60-
metric.Values[valueName] = v + storedValue.(float64)
61-
case string:
97+
metric.fields[fieldName] = v + storedField.(float64)
6298
}
6399
} else {
64-
metric.Values[valueName] = value
100+
metric.fields[fieldName] = field
65101
}
66102
}
67103
}
@@ -70,16 +106,16 @@ func Format(writer io.Writer, metrics []Metric) error {
70106
var enc lp.Encoder
71107

72108
for _, metric := range metrics {
73-
enc.StartLine(metric.Name)
74-
for _, tag := range metric.Tags {
75-
enc.AddTag(tag.Name, tag.Value)
109+
enc.StartLine(metric.name)
110+
for _, tag := range metric.tags {
111+
enc.AddTag(tag.name, tag.value)
76112
}
77113

78-
for valueName, value := range metric.Values {
79-
// we cannot panic as we provide accessors only for allowed type with AddValue*
80-
enc.AddField(valueName, lp.MustNewValue(value))
114+
for fieldName, field := range metric.fields {
115+
// we cannot panic as we provide accessors only for allowed type with AddField*
116+
enc.AddField(fieldName, lp.MustNewValue(field))
81117
}
82-
enc.EndLine(metric.Timestamp)
118+
enc.EndLine(metric.timestamp)
83119
}
84120

85121
if err := enc.Err(); err != nil {

common/monitoring/metricsaggregate.go

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,27 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
125
package monitoring
226

327
import (
@@ -27,20 +51,20 @@ func (this *MetricsAggregate) AddMetric(metric *Metric) {
2751
metricNameTagsToHash(&this.hash, metric)
2852
hashKey := hashValueAndReset(&this.hash)
2953

30-
k := key{nameTagsHash: hashKey, timestamp: time.Unix(metric.Timestamp.Unix(), 0)}
54+
k := key{nameTagsHash: hashKey, timestamp: time.Unix(metric.timestamp.Unix(), 0)}
3155
if storedMetric, ok := this.metricsBuckets[k]; ok {
32-
storedMetric.MergeValues(metric)
56+
storedMetric.MergeFields(metric)
3357
} else {
3458
this.metricsBuckets[k] = metric
3559
}
3660
}
3761

3862
func metricNameTagsToHash(hash *maphash.Hash, metric *Metric) {
39-
hash.WriteString(metric.Name)
63+
hash.WriteString(metric.name)
4064

41-
for _, tag := range metric.Tags {
42-
hash.WriteString(tag.Name)
43-
hash.WriteString(tag.Value)
65+
for _, tag := range metric.tags {
66+
hash.WriteString(tag.name)
67+
hash.WriteString(tag.value)
4468
}
4569
}
4670

@@ -58,7 +82,7 @@ func (this *MetricsAggregate) Clear() {
5882
func (this *MetricsAggregate) GetMetrics() []Metric {
5983
var result []Metric
6084
for key, metric := range this.metricsBuckets {
61-
metric.Timestamp = key.timestamp
85+
metric.timestamp = key.timestamp
6286
result = append(result, *metric)
6387
}
6488
return result

0 commit comments

Comments
 (0)