Skip to content

Commit dbfc94a

Browse files
author
Michal Tichák
committed
[core] Added monitoring to hooks and communication with outside services
1 parent c187adf commit dbfc94a

File tree

11 files changed

+366
-20
lines changed

11 files changed

+366
-20
lines changed

common/event/writer.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,7 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
110110
metric.SetFieldUInt64("messages_failed", 0)
111111

112112
metricDuration := writer.newMetric(KAFKAWRITER)
113-
defer monitoring.SendHistogrammable(&metricDuration)
114-
defer monitoring.TimerNS(&metricDuration)()
113+
defer monitoring.TimerSend(&metricDuration, monitoring.Nanoseconds)()
115114

116115
if err := writer.WriteMessages(context.Background(), messages...); err != nil {
117116
metric.SetFieldUInt64("messages_failed", uint64(len(messages)))
@@ -250,8 +249,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
250249

251250
metric := w.newMetric(KAFKAPREPARE)
252251

253-
defer monitoring.SendHistogrammable(&metric)
254-
defer monitoring.TimerNS(&metric)()
252+
defer monitoring.TimerSend(&metric, monitoring.Nanoseconds)()
255253

256254
wrappedEvent, key, err := internalEventToKafkaEvent(e, timestamp)
257255
if err != nil {

common/monitoring/metric.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package monitoring
2727
import (
2828
"fmt"
2929
"io"
30+
"sort"
3031
"time"
3132

3233
lp "github.com/influxdata/line-protocol/v2/lineprotocol"
@@ -110,6 +111,10 @@ func Format(writer io.Writer, metrics []Metric) error {
110111
var enc lp.Encoder
111112

112113
for _, metric := range metrics {
114+
// AddTag requires tags to be sorted lexicographically by name
115+
sort.Slice(metric.tags, func(i int, j int) bool {
116+
return metric.tags[i].name < metric.tags[j].name
117+
})
113118
enc.StartLine(metric.name)
114119
for _, tag := range metric.tags {
115120
enc.AddTag(tag.name, tag.value)

common/monitoring/monitoring_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -546,8 +546,8 @@ func TestMetricsHistogramObject(t *testing.T) {
546546
}
547547

548548
func measureFunc(metric *Metric) {
549-
defer TimerMS(metric)()
550-
defer TimerNS(metric)()
549+
defer Timer(metric, Milliseconds)()
550+
defer Timer(metric, Nanoseconds)()
551551
time.Sleep(100 * time.Millisecond)
552552
}
553553

common/monitoring/timer.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,42 @@
11
package monitoring
22

3-
import "time"
3+
import (
4+
"time"
5+
)
46

5-
// Timer* functions are meant to be used with defer statement to measure runtime of given function:
6-
// defer TimerNS(&metric)()
7-
func TimerMS(metric *Metric) func() {
8-
start := time.Now()
9-
return func() {
10-
metric.SetFieldInt64("execution_time_ms", time.Since(start).Milliseconds())
11-
}
7+
type TimeUnit int
8+
9+
const (
10+
Milliseconds TimeUnit = iota
11+
Nanoseconds
12+
)
13+
14+
// Timer is meant to be used with a defer statement to measure the runtime of the enclosing function:
15+
// defer Timer(&metric, Milliseconds)()
16+
func Timer(metric *Metric, unit TimeUnit) func() {
17+
return timer(metric, unit, false)
1218
}
1319

14-
func TimerNS(metric *Metric) func() {
20+
// TimerSend is meant to be used with a defer statement to measure the runtime of the enclosing function and then send the metric via SendHistogrammable:
21+
// defer TimerSend(&metric, Milliseconds)()
22+
func TimerSend(metric *Metric, unit TimeUnit) func() {
23+
return timer(metric, unit, true)
24+
}
25+
26+
func timer(metric *Metric, unit TimeUnit, send bool) func() {
1527
start := time.Now()
28+
1629
return func() {
17-
metric.SetFieldInt64("execution_time_ns", time.Since(start).Nanoseconds())
30+
dur := time.Since(start)
31+
// any unit other than Milliseconds falls back to nanoseconds
32+
if unit == Milliseconds {
33+
metric.SetFieldInt64("execution_time_ms", dur.Milliseconds())
34+
} else {
35+
metric.SetFieldInt64("execution_time_ns", dur.Nanoseconds())
36+
}
37+
38+
if send {
39+
SendHistogrammable(metric)
40+
}
1841
}
1942
}

core/environment/environment.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/AliceO2Group/Control/common/gera"
4343
"github.com/AliceO2Group/Control/common/logger"
4444
"github.com/AliceO2Group/Control/common/logger/infologger"
45+
"github.com/AliceO2Group/Control/common/monitoring"
4546
pb "github.com/AliceO2Group/Control/common/protos"
4647
"github.com/AliceO2Group/Control/common/runtype"
4748
"github.com/AliceO2Group/Control/common/system"
@@ -647,6 +648,11 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string, weig
647648
hooksMapForTrigger := workflow.GetHooksMapForTrigger(trigger)
648649
callsMapForAwait := env.callsPendingAwait[trigger]
649650

651+
metric := monitoring.NewMetric("hooks")
652+
metric.AddTag("trigger", trigger)
653+
metric.AddTag("envId", env.id.String())
654+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
655+
650656
allWeightsSet := make(callable.HooksMap)
651657
for k := range hooksMapForTrigger {
652658
allWeightsSet[k] = callable.Hooks{}

core/integration/ccdb/plugin.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"time"
3838

3939
"github.com/AliceO2Group/Control/common/logger/infologger"
40+
"github.com/AliceO2Group/Control/common/monitoring"
4041

4142
"github.com/AliceO2Group/Control/core/environment"
4243

@@ -109,7 +110,6 @@ func getFlpIdList(envId string) (flps []string, err error) {
109110
}
110111

111112
func NewGRPObject(varStack map[string]string) *GeneralRunParameters {
112-
113113
envId, ok := varStack["environment_id"]
114114
if !ok {
115115
log.WithField("level", infologger.IL_Support).
@@ -366,8 +366,8 @@ func (p *Plugin) ObjectStack(_ map[string]string, _ map[string]string) (stack ma
366366

367367
func (p *Plugin) NewCcdbGrpWriteCommand(grp *GeneralRunParameters, ccdbUrl string, refresh bool) (cmd string, err error) {
368368
// o2-ecs-grp-create -h
369-
//Create GRP-ECS object and upload to the CCDB
370-
//Usage:
369+
// Create GRP-ECS object and upload to the CCDB
370+
// Usage:
371371
// o2-ecs-grp-create:
372372
// -h [ --help ] Print this help message
373373
// -p [ --period ] arg data taking period
@@ -524,7 +524,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
524524
}
525525

526526
func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refresh bool) error {
527-
528527
if grp == nil {
529528
return errors.New(fmt.Sprintf("Failed to create a GRP object"))
530529
}
@@ -547,6 +546,10 @@ func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refre
547546
ctx, cancel := context.WithTimeout(context.Background(), timeoutSeconds*time.Second)
548547
defer cancel()
549548

549+
metric := monitoring.NewMetric("ccdb")
550+
metric.AddTag("envId", envId)
551+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
552+
550553
cmd := exec.CommandContext(ctx, "bash", "-c", cmdStr)
551554
// execute the DPL command in the repo of the workflow used
552555
cmd.Dir = "/tmp"

core/integration/dcs/client.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"time"
3030

3131
"github.com/AliceO2Group/Control/common/logger"
32+
"github.com/AliceO2Group/Control/common/monitoring"
3233
dcspb "github.com/AliceO2Group/Control/core/integration/dcs/protos"
3334
"github.com/sirupsen/logrus"
3435
"github.com/spf13/viper"
@@ -40,6 +41,10 @@ import (
4041

4142
var log = logger.New(logrus.StandardLogger(), "dcsclient")
4243

44+
func newMetric() monitoring.Metric {
45+
return monitoring.NewMetric("dcs")
46+
}
47+
4348
type RpcClient struct {
4449
dcspb.ConfiguratorClient
4550
conn *grpc.ClientConn
@@ -67,6 +72,7 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string)
6772
Timeout: time.Second,
6873
PermitWithoutStream: true,
6974
}),
75+
grpc.WithStreamInterceptor(setupStreamClientInterceptor()),
7076
}
7177
if !viper.GetBool("dcsServiceUseSystemProxy") {
7278
dialOptions = append(dialOptions, grpc.WithNoProxy())
@@ -121,6 +127,57 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string)
121127
return client
122128
}
123129

130+
type measuredClientStream struct {
131+
grpc.ClientStream
132+
method string
133+
}
134+
135+
func (t *measuredClientStream) RecvMsg(m interface{}) error {
136+
metric := monitoring.NewMetric("dcs_stream")
137+
metric.AddTag("method", t.method)
138+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
139+
err := t.ClientStream.RecvMsg(m)
140+
141+
return err
142+
}
143+
144+
func convertMethodName(method string) (converted string) {
145+
switch method {
146+
case dcspb.Configurator_Subscribe_FullMethodName:
147+
converted = "Subscribe"
148+
case dcspb.Configurator_PrepareForRun_FullMethodName:
149+
converted = "PFR"
150+
case dcspb.Configurator_StartOfRun_FullMethodName:
151+
converted = "SOR"
152+
case dcspb.Configurator_EndOfRun_FullMethodName:
153+
converted = "EOR"
154+
default:
155+
converted = "Unknown"
156+
}
157+
return
158+
}
159+
160+
func setupStreamClientInterceptor() grpc.StreamClientInterceptor {
161+
return func(
162+
ctx context.Context,
163+
desc *grpc.StreamDesc,
164+
cc *grpc.ClientConn,
165+
method string,
166+
streamer grpc.Streamer,
167+
opts ...grpc.CallOption,
168+
) (grpc.ClientStream, error) {
169+
clientStream, err := streamer(ctx, desc, cc, method, opts...)
170+
if err != nil {
171+
return nil, err
172+
}
173+
174+
return &measuredClientStream{
175+
ClientStream: clientStream,
176+
method: convertMethodName(method),
177+
}, nil
178+
}
179+
}
180+
124181
func (m *RpcClient) GetConnState() connectivity.State {
125182
if m.conn == nil {
126183
return connectivity.Idle
@@ -132,3 +189,38 @@ func (m *RpcClient) Close() error {
132189
m.cancel()
133190
return m.conn.Close()
134191
}
192+
193+
func (m *RpcClient) Subscribe(ctx context.Context, in *dcspb.SubscriptionRequest, opts ...grpc.CallOption) (dcspb.Configurator_SubscribeClient, error) {
194+
metric := newMetric()
195+
metric.AddTag("stream_setup", "Subscribe")
196+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
197+
return m.ConfiguratorClient.Subscribe(ctx, in, opts...)
198+
}
199+
200+
func (m *RpcClient) PrepareForRun(ctx context.Context, in *dcspb.PfrRequest, opts ...grpc.CallOption) (dcspb.Configurator_PrepareForRunClient, error) {
201+
metric := newMetric()
202+
metric.AddTag("stream_setup", "PFR")
203+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
204+
return m.ConfiguratorClient.PrepareForRun(ctx, in, opts...)
205+
}
206+
207+
func (m *RpcClient) StartOfRun(ctx context.Context, in *dcspb.SorRequest, opts ...grpc.CallOption) (dcspb.Configurator_StartOfRunClient, error) {
208+
metric := newMetric()
209+
metric.AddTag("stream_setup", "SOR")
210+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
211+
return m.ConfiguratorClient.StartOfRun(ctx, in, opts...)
212+
}
213+
214+
func (m *RpcClient) EndOfRun(ctx context.Context, in *dcspb.EorRequest, opts ...grpc.CallOption) (dcspb.Configurator_EndOfRunClient, error) {
215+
metric := newMetric()
216+
metric.AddTag("stream_setup", "EOR")
217+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
218+
return m.ConfiguratorClient.EndOfRun(ctx, in, opts...)
219+
}
220+
221+
func (m *RpcClient) GetStatus(ctx context.Context, in *dcspb.StatusRequest, opts ...grpc.CallOption) (*dcspb.StatusReply, error) {
222+
metric := newMetric()
223+
metric.AddTag("method", "GetStatus")
224+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
225+
return m.ConfiguratorClient.GetStatus(ctx, in, opts...)
226+
}

core/integration/ddsched/client.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"time"
3030

3131
"github.com/AliceO2Group/Control/common/logger"
32+
"github.com/AliceO2Group/Control/common/monitoring"
3233
ddpb "github.com/AliceO2Group/Control/core/integration/ddsched/protos"
3334
"github.com/sirupsen/logrus"
3435
"github.com/spf13/viper"
@@ -40,6 +41,10 @@ import (
4041

4142
var log = logger.New(logrus.StandardLogger(), "ddschedclient")
4243

44+
func newMetric() monitoring.Metric {
45+
return monitoring.NewMetric("ddsched")
46+
}
47+
4348
type RpcClient struct {
4449
ddpb.DataDistributionControlClient
4550
conn *grpc.ClientConn
@@ -131,3 +136,24 @@ func (m *RpcClient) Close() error {
131136
m.cancel()
132137
return m.conn.Close()
133138
}
139+
140+
func (m *RpcClient) PartitionInitialize(ctx context.Context, in *ddpb.PartitionInitRequest, opts ...grpc.CallOption) (*ddpb.PartitionResponse, error) {
141+
metric := newMetric()
142+
metric.AddTag("method", "PartitionInitialize")
143+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
144+
return m.DataDistributionControlClient.PartitionInitialize(ctx, in, opts...)
145+
}
146+
147+
func (m *RpcClient) PartitionTerminate(ctx context.Context, in *ddpb.PartitionTermRequest, opts ...grpc.CallOption) (*ddpb.PartitionResponse, error) {
148+
metric := newMetric()
149+
metric.AddTag("method", "PartitionTerminate")
150+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
151+
return m.DataDistributionControlClient.PartitionTerminate(ctx, in, opts...)
152+
}
153+
154+
func (m *RpcClient) PartitionStatus(ctx context.Context, in *ddpb.PartitionInfo, opts ...grpc.CallOption) (*ddpb.PartitionResponse, error) {
155+
metric := newMetric()
156+
metric.AddTag("method", "PartitionStatus")
157+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
158+
return m.DataDistributionControlClient.PartitionStatus(ctx, in, opts...)
159+
}

0 commit comments

Comments
 (0)