Skip to content

Commit f67681e

Browse files
author
Michal Tichák
committed
[core] Added monitoring to hooks and communication with outside services
1 parent c187adf commit f67681e

File tree

10 files changed

+300
-20
lines changed

10 files changed

+300
-20
lines changed

common/event/writer.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,7 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
110110
metric.SetFieldUInt64("messages_failed", 0)
111111

112112
metricDuration := writer.newMetric(KAFKAWRITER)
113-
defer monitoring.SendHistogrammable(&metricDuration)
114-
defer monitoring.TimerNS(&metricDuration)()
113+
defer monitoring.TimerSend(&metricDuration, monitoring.Nanoseconds)()
115114

116115
if err := writer.WriteMessages(context.Background(), messages...); err != nil {
117116
metric.SetFieldUInt64("messages_failed", uint64(len(messages)))
@@ -250,8 +249,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
250249

251250
metric := w.newMetric(KAFKAPREPARE)
252251

253-
defer monitoring.SendHistogrammable(&metric)
254-
defer monitoring.TimerNS(&metric)()
252+
defer monitoring.TimerSend(&metric, monitoring.Nanoseconds)()
255253

256254
wrappedEvent, key, err := internalEventToKafkaEvent(e, timestamp)
257255
if err != nil {

common/monitoring/monitoring_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -546,8 +546,8 @@ func TestMetricsHistogramObject(t *testing.T) {
546546
}
547547

548548
func measureFunc(metric *Metric) {
549-
defer TimerMS(metric)()
550-
defer TimerNS(metric)()
549+
defer Timer(metric, Milliseconds)()
550+
defer Timer(metric, Nanoseconds)()
551551
time.Sleep(100 * time.Millisecond)
552552
}
553553

common/monitoring/timer.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,42 @@
11
package monitoring
22

3-
import "time"
3+
import (
4+
"time"
5+
)
46

5-
// Timer* functions are meant to be used with defer statement to measure runtime of given function:
6-
// defer TimerNS(&metric)()
7-
func TimerMS(metric *Metric) func() {
8-
start := time.Now()
9-
return func() {
10-
metric.SetFieldInt64("execution_time_ms", time.Since(start).Milliseconds())
11-
}
7+
type TimeUnit int
8+
9+
const (
10+
Milliseconds TimeUnit = iota
11+
Nanoseconds
12+
)
13+
14+
// Timer is meant to be used with a defer statement to measure the runtime of the enclosing function; it only records the duration on metric and does not send it:
15+
// defer Timer(&metric, Milliseconds)()
16+
func Timer(metric *Metric, unit TimeUnit) func() {
17+
return timer(metric, unit, false)
1218
}
1319

14-
func TimerNS(metric *Metric) func() {
20+
// TimerSend is meant to be used with a defer statement to measure the runtime of the enclosing function and then send the metric via SendHistogrammable:
21+
// defer TimerSend(&metric, Milliseconds)()
22+
func TimerSend(metric *Metric, unit TimeUnit) func() {
23+
return timer(metric, unit, true)
24+
}
25+
26+
func timer(metric *Metric, unit TimeUnit, send bool) func() {
1527
start := time.Now()
28+
1629
return func() {
17-
metric.SetFieldInt64("execution_time_ns", time.Since(start).Nanoseconds())
30+
dur := time.Since(start)
31+
// any unit other than Milliseconds is recorded in nanoseconds (the default)
32+
if unit == Milliseconds {
33+
metric.SetFieldInt64("execution_time_ms", dur.Milliseconds())
34+
} else {
35+
metric.SetFieldInt64("execution_time_ns", dur.Nanoseconds())
36+
}
37+
38+
if send {
39+
SendHistogrammable(metric)
40+
}
1841
}
1942
}

core/environment/environment.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/AliceO2Group/Control/common/gera"
4343
"github.com/AliceO2Group/Control/common/logger"
4444
"github.com/AliceO2Group/Control/common/logger/infologger"
45+
"github.com/AliceO2Group/Control/common/monitoring"
4546
pb "github.com/AliceO2Group/Control/common/protos"
4647
"github.com/AliceO2Group/Control/common/runtype"
4748
"github.com/AliceO2Group/Control/common/system"
@@ -647,6 +648,11 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string, weig
647648
hooksMapForTrigger := workflow.GetHooksMapForTrigger(trigger)
648649
callsMapForAwait := env.callsPendingAwait[trigger]
649650

651+
metric := monitoring.NewMetric("hooks")
652+
metric.AddTag("trigger", trigger)
653+
metric.AddTag("envId", env.id.String())
654+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
655+
650656
allWeightsSet := make(callable.HooksMap)
651657
for k := range hooksMapForTrigger {
652658
allWeightsSet[k] = callable.Hooks{}

core/integration/ccdb/plugin.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"time"
3838

3939
"github.com/AliceO2Group/Control/common/logger/infologger"
40+
"github.com/AliceO2Group/Control/common/monitoring"
4041

4142
"github.com/AliceO2Group/Control/core/environment"
4243

@@ -109,7 +110,6 @@ func getFlpIdList(envId string) (flps []string, err error) {
109110
}
110111

111112
func NewGRPObject(varStack map[string]string) *GeneralRunParameters {
112-
113113
envId, ok := varStack["environment_id"]
114114
if !ok {
115115
log.WithField("level", infologger.IL_Support).
@@ -366,8 +366,8 @@ func (p *Plugin) ObjectStack(_ map[string]string, _ map[string]string) (stack ma
366366

367367
func (p *Plugin) NewCcdbGrpWriteCommand(grp *GeneralRunParameters, ccdbUrl string, refresh bool) (cmd string, err error) {
368368
// o2-ecs-grp-create -h
369-
//Create GRP-ECS object and upload to the CCDB
370-
//Usage:
369+
// Create GRP-ECS object and upload to the CCDB
370+
// Usage:
371371
// o2-ecs-grp-create:
372372
// -h [ --help ] Print this help message
373373
// -p [ --period ] arg data taking period
@@ -524,7 +524,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
524524
}
525525

526526
func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refresh bool) error {
527-
528527
if grp == nil {
529528
return errors.New(fmt.Sprintf("Failed to create a GRP object"))
530529
}
@@ -547,6 +546,10 @@ func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refre
547546
ctx, cancel := context.WithTimeout(context.Background(), timeoutSeconds*time.Second)
548547
defer cancel()
549548

549+
metric := monitoring.NewMetric("ccdb")
550+
metric.AddTag("envId", envId)
551+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
552+
550553
cmd := exec.CommandContext(ctx, "bash", "-c", cmdStr)
551554
// execute the DPL command in the repo of the workflow used
552555
cmd.Dir = "/tmp"

core/integration/dcs/client.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"time"
3030

3131
"github.com/AliceO2Group/Control/common/logger"
32+
"github.com/AliceO2Group/Control/common/monitoring"
3233
dcspb "github.com/AliceO2Group/Control/core/integration/dcs/protos"
3334
"github.com/sirupsen/logrus"
3435
"github.com/spf13/viper"
@@ -40,6 +41,10 @@ import (
4041

4142
var log = logger.New(logrus.StandardLogger(), "dcsclient")
4243

44+
func newMetric() monitoring.Metric {
45+
return monitoring.NewMetric("dcs")
46+
}
47+
4348
type RpcClient struct {
4449
dcspb.ConfiguratorClient
4550
conn *grpc.ClientConn
@@ -132,3 +137,38 @@ func (m *RpcClient) Close() error {
132137
m.cancel()
133138
return m.conn.Close()
134139
}
140+
141+
func (m *RpcClient) Subscribe(ctx context.Context, in *dcspb.SubscriptionRequest, opts ...grpc.CallOption) (dcspb.Configurator_SubscribeClient, error) {
142+
metric := newMetric()
143+
metric.AddTag("method", "Subscribe")
144+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
145+
return m.ConfiguratorClient.Subscribe(ctx, in, opts...)
146+
}
147+
148+
func (m *RpcClient) PrepareForRun(ctx context.Context, in *dcspb.PfrRequest, opts ...grpc.CallOption) (dcspb.Configurator_PrepareForRunClient, error) {
149+
metric := newMetric()
150+
metric.AddTag("method", "PFR")
151+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
152+
return m.ConfiguratorClient.PrepareForRun(ctx, in, opts...)
153+
}
154+
155+
func (m *RpcClient) StartOfRun(ctx context.Context, in *dcspb.SorRequest, opts ...grpc.CallOption) (dcspb.Configurator_StartOfRunClient, error) {
156+
metric := newMetric()
157+
metric.AddTag("method", "SOR")
158+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
159+
return m.ConfiguratorClient.StartOfRun(ctx, in, opts...)
160+
}
161+
162+
func (m *RpcClient) EndOfRun(ctx context.Context, in *dcspb.EorRequest, opts ...grpc.CallOption) (dcspb.Configurator_EndOfRunClient, error) {
163+
metric := newMetric()
164+
metric.AddTag("method", "EOR")
165+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
166+
return m.ConfiguratorClient.EndOfRun(ctx, in, opts...)
167+
}
168+
169+
func (m *RpcClient) GetStatus(ctx context.Context, in *dcspb.StatusRequest, opts ...grpc.CallOption) (*dcspb.StatusReply, error) {
170+
metric := newMetric()
171+
metric.AddTag("method", "GetStatus")
172+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
173+
return m.ConfiguratorClient.GetStatus(ctx, in, opts...)
174+
}

core/integration/ddsched/client.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"time"
3030

3131
"github.com/AliceO2Group/Control/common/logger"
32+
"github.com/AliceO2Group/Control/common/monitoring"
3233
ddpb "github.com/AliceO2Group/Control/core/integration/ddsched/protos"
3334
"github.com/sirupsen/logrus"
3435
"github.com/spf13/viper"
@@ -40,6 +41,10 @@ import (
4041

4142
var log = logger.New(logrus.StandardLogger(), "ddschedclient")
4243

44+
func newMetric() monitoring.Metric {
45+
return monitoring.NewMetric("ddsched")
46+
}
47+
4348
type RpcClient struct {
4449
ddpb.DataDistributionControlClient
4550
conn *grpc.ClientConn
@@ -131,3 +136,24 @@ func (m *RpcClient) Close() error {
131136
m.cancel()
132137
return m.conn.Close()
133138
}
139+
140+
func (m *RpcClient) PartitionInitialize(ctx context.Context, in *ddpb.PartitionInitRequest, opts ...grpc.CallOption) (*ddpb.PartitionResponse, error) {
141+
metric := newMetric()
142+
metric.AddTag("method", "PartitionInitialize")
143+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
144+
return m.DataDistributionControlClient.PartitionInitialize(ctx, in, opts...)
145+
}
146+
147+
func (m *RpcClient) PartitionTerminate(ctx context.Context, in *ddpb.PartitionTermRequest, opts ...grpc.CallOption) (*ddpb.PartitionResponse, error) {
148+
metric := newMetric()
149+
metric.AddTag("method", "PartitionTerminate")
150+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
151+
return m.DataDistributionControlClient.PartitionTerminate(ctx, in, opts...)
152+
}
153+
154+
func (m *RpcClient) PartitionStatus(ctx context.Context, in *ddpb.PartitionInfo, opts ...grpc.CallOption) (*ddpb.PartitionResponse, error) {
155+
metric := newMetric()
156+
metric.AddTag("method", "PartitionStatus")
157+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
158+
return m.DataDistributionControlClient.PartitionStatus(ctx, in, opts...)
159+
}

core/integration/odc/client.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"time"
3030

3131
"github.com/AliceO2Group/Control/common/logger"
32+
"github.com/AliceO2Group/Control/common/monitoring"
3233
odcpb "github.com/AliceO2Group/Control/core/integration/odc/protos"
3334
"github.com/sirupsen/logrus"
3435
"github.com/spf13/viper"
@@ -40,6 +41,10 @@ import (
4041

4142
var log = logger.New(logrus.StandardLogger(), "odcclient")
4243

44+
func newMetric() monitoring.Metric {
45+
return monitoring.NewMetric("odc")
46+
}
47+
4348
type RpcClient struct {
4449
odcpb.ODCClient
4550
conn *grpc.ClientConn
@@ -131,3 +136,101 @@ func (m *RpcClient) Close() error {
131136
m.cancel()
132137
return m.conn.Close()
133138
}
139+
140+
func (m *RpcClient) Initialize(ctx context.Context, in *odcpb.InitializeRequest, opts ...grpc.CallOption) (*odcpb.GeneralReply, error) {
141+
metric := newMetric()
142+
metric.AddTag("method", "Initialize")
143+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
144+
return m.ODCClient.Initialize(ctx, in, opts...)
145+
}
146+
147+
func (m *RpcClient) Submit(ctx context.Context, in *odcpb.SubmitRequest, opts ...grpc.CallOption) (*odcpb.GeneralReply, error) {
148+
metric := newMetric()
149+
metric.AddTag("method", "Submit")
150+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
151+
return m.ODCClient.Submit(ctx, in, opts...)
152+
}
153+
154+
func (m *RpcClient) Activate(ctx context.Context, in *odcpb.ActivateRequest, opts ...grpc.CallOption) (*odcpb.GeneralReply, error) {
155+
metric := newMetric()
156+
metric.AddTag("method", "Activate")
157+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
158+
return m.ODCClient.Activate(ctx, in, opts...)
159+
}
160+
161+
func (m *RpcClient) Run(ctx context.Context, in *odcpb.RunRequest, opts ...grpc.CallOption) (*odcpb.GeneralReply, error) {
162+
metric := newMetric()
163+
metric.AddTag("method", "Run")
164+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
165+
return m.ODCClient.Run(ctx, in, opts...)
166+
}
167+
168+
func (m *RpcClient) Update(ctx context.Context, in *odcpb.UpdateRequest, opts ...grpc.CallOption) (*odcpb.GeneralReply, error) {
169+
metric := newMetric()
170+
metric.AddTag("method", "Update")
171+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
172+
return m.ODCClient.Update(ctx, in, opts...)
173+
}
174+
175+
func (m *RpcClient) Configure(ctx context.Context, in *odcpb.ConfigureRequest, opts ...grpc.CallOption) (*odcpb.StateReply, error) {
176+
metric := newMetric()
177+
metric.AddTag("method", "Configure")
178+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
179+
return m.ODCClient.Configure(ctx, in, opts...)
180+
}
181+
182+
func (m *RpcClient) SetProperties(ctx context.Context, in *odcpb.SetPropertiesRequest, opts ...grpc.CallOption) (*odcpb.GeneralReply, error) {
183+
metric := newMetric()
184+
metric.AddTag("method", "SetProperties")
185+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
186+
return m.ODCClient.SetProperties(ctx, in, opts...)
187+
}
188+
189+
func (m *RpcClient) GetState(ctx context.Context, in *odcpb.StateRequest, opts ...grpc.CallOption) (*odcpb.StateReply, error) {
190+
metric := newMetric()
191+
metric.AddTag("method", "GetState")
192+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
193+
return m.ODCClient.GetState(ctx, in, opts...)
194+
}
195+
196+
func (m *RpcClient) Start(ctx context.Context, in *odcpb.StartRequest, opts ...grpc.CallOption) (*odcpb.StateReply, error) {
197+
metric := newMetric()
198+
metric.AddTag("method", "Start")
199+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
200+
return m.ODCClient.Start(ctx, in, opts...)
201+
}
202+
203+
func (m *RpcClient) Stop(ctx context.Context, in *odcpb.StopRequest, opts ...grpc.CallOption) (*odcpb.StateReply, error) {
204+
metric := newMetric()
205+
metric.AddTag("method", "Stop")
206+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
207+
return m.ODCClient.Stop(ctx, in, opts...)
208+
}
209+
210+
func (m *RpcClient) Reset(ctx context.Context, in *odcpb.ResetRequest, opts ...grpc.CallOption) (*odcpb.StateReply, error) {
211+
metric := newMetric()
212+
metric.AddTag("method", "Reset")
213+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
214+
return m.ODCClient.Reset(ctx, in, opts...)
215+
}
216+
217+
func (m *RpcClient) Terminate(ctx context.Context, in *odcpb.TerminateRequest, opts ...grpc.CallOption) (*odcpb.StateReply, error) {
218+
metric := newMetric()
219+
metric.AddTag("method", "Terminate")
220+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
221+
return m.ODCClient.Terminate(ctx, in, opts...)
222+
}
223+
224+
func (m *RpcClient) Shutdown(ctx context.Context, in *odcpb.ShutdownRequest, opts ...grpc.CallOption) (*odcpb.GeneralReply, error) {
225+
metric := newMetric()
226+
metric.AddTag("method", "Shutdown")
227+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
228+
return m.ODCClient.Shutdown(ctx, in, opts...)
229+
}
230+
231+
func (m *RpcClient) Status(ctx context.Context, in *odcpb.StatusRequest, opts ...grpc.CallOption) (*odcpb.StatusReply, error) {
232+
metric := newMetric()
233+
metric.AddTag("method", "Status")
234+
defer monitoring.TimerSend(&metric, monitoring.Milliseconds)()
235+
return m.ODCClient.Status(ctx, in, opts...)
236+
}

0 commit comments

Comments
 (0)