Skip to content

Commit 6db969a

Browse files
Michal Tichákjustonedev1
authored andcommitted
[monitoring] OCTRL-1042: Add run type to metrics calls
1 parent 01b9d2e commit 6db969a

File tree

9 files changed

+128
-57
lines changed

9 files changed

+128
-57
lines changed

common/monitoring/grpcinterceptor.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,33 @@ import (
3030
"google.golang.org/grpc"
3131
)
3232

33+
type (
34+
EnvIDKey struct{}
35+
RunTypeKey struct{}
36+
)
37+
38+
func AddEnvAndRunType(ctx context.Context, envId, runType string) context.Context {
39+
ctx = context.WithValue(ctx, EnvIDKey{}, envId)
40+
ctx = context.WithValue(ctx, RunTypeKey{}, runType)
41+
return ctx
42+
}
43+
3344
type measuredClientStream struct {
3445
grpc.ClientStream
46+
ctx context.Context
3547
method string
3648
metricName string
3749
}
3850

3951
func (t *measuredClientStream) RecvMsg(m interface{}) error {
4052
metric := NewMetric(t.metricName)
4153
metric.AddTag("method", t.method)
54+
if env, ok := t.ctx.Value(EnvIDKey{}).(string); ok {
55+
metric.AddTag("envId", env)
56+
}
57+
if rt, ok := t.ctx.Value(RunTypeKey{}).(string); ok {
58+
metric.AddTag("runtype", rt)
59+
}
4260
defer TimerSendSingle(&metric, Millisecond)()
4361

4462
err := t.ClientStream.RecvMsg(m)
@@ -63,6 +81,7 @@ func SetupStreamClientInterceptor(metricName string, convert NameConvertType) gr
6381

6482
return &measuredClientStream{
6583
ClientStream: clientStream,
84+
ctx: ctx,
6685
method: convert(method),
6786
metricName: metricName,
6887
}, nil
@@ -73,6 +92,12 @@ func SetupUnaryClientInterceptor(name string, convert NameConvertType) grpc.Unar
7392
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
7493
metric := NewMetric(name)
7594
metric.AddTag("method", convert(method))
95+
if env, ok := ctx.Value(EnvIDKey{}).(string); ok {
96+
metric.AddTag("envId", env)
97+
}
98+
if rt, ok := ctx.Value(RunTypeKey{}).(string); ok {
99+
metric.AddTag("runtype", rt)
100+
}
76101
defer TimerSendSingle(&metric, Millisecond)()
77102
return invoker(ctx, method, req, reply, cc, opts...)
78103
}

core/environment/environment.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,7 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string, weig
669669
metric := monitoring.NewMetric("hooks")
670670
metric.AddTag("trigger", trigger)
671671
metric.AddTag("envId", env.id.String())
672+
metric.AddTag("runtype", env.GetRunType().String())
672673
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
673674

674675
allWeightsSet := make(callable.HooksMap)

core/integration/ccdb/plugin.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
481481
return
482482
}
483483
p.existingRuns[grp.runNumber] = types.Nil{}
484-
err := p.uploadCurrentGRP(grp, envId, true)
484+
err := p.uploadCurrentGRP(grp, envId, true, varStack, "RunStart")
485485
if err != nil {
486486
log.WithField("call", "RunStop").
487487
WithField("run", grp.runNumber).
@@ -506,7 +506,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
506506
_, runExists := p.existingRuns[grp.runNumber]
507507
if runExists {
508508
delete(p.existingRuns, grp.runNumber)
509-
err := p.uploadCurrentGRP(grp, envId, false)
509+
err := p.uploadCurrentGRP(grp, envId, false, varStack, "RunStop")
510510
if err != nil {
511511
log.WithField("call", "RunStop").
512512
WithField("run", grp.runNumber).
@@ -525,7 +525,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
525525
return
526526
}
527527

528-
func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refresh bool) error {
528+
func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refresh bool, varStack map[string]string, callName string) error {
529529
if grp == nil {
530530
return errors.New(fmt.Sprintf("Failed to create a GRP object"))
531531
}
@@ -550,6 +550,8 @@ func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refre
550550

551551
metric := monitoring.NewMetric("ccdb")
552552
metric.AddTag("envId", envId)
553+
metric.AddTag("runtype", integration.ExtractRunTypeOrUndefined(varStack))
554+
metric.AddTag("call", callName)
553555
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
554556

555557
cmd := exec.CommandContext(ctx, "bash", "-c", cmdStr)

core/integration/dcs/plugin.go

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -289,18 +289,18 @@ func (p *Plugin) Init(instanceId string) error {
289289
in := &dcspb.SubscriptionRequest{
290290
InstanceId: instanceId,
291291
}
292-
292+
293293
// Always start the goroutine, even if initial subscription fails
294294
go func() {
295295
var evStream dcspb.Configurator_SubscribeClient
296296
var err error
297-
297+
298298
for {
299299
// Try to establish subscription if we don't have one
300300
if evStream == nil {
301301
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
302302
Debug("attempting to subscribe to DCS service")
303-
303+
304304
evStream, err = p.dcsClient.Subscribe(context.Background(), in, grpc.EmptyCallOption{})
305305
if err != nil {
306306
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
@@ -699,7 +699,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
699699

700700
var stream dcspb.Configurator_StartOfRunClient
701701
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "PFR", envId)
702-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
702+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
703703
defer cancel()
704704

705705
detectorStatusMap := make(map[dcspb.Detector]dcspb.DetectorState)
@@ -746,7 +746,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
746746
return
747747
}
748748

749-
err, payloadJson = PFRgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload)
749+
err, payloadJson = PFRgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream,
750+
detectorStatusMap, callFailedStr, payload, integration.ExtractRunTypeOrUndefined(varStack))
750751

751752
dcsFailedEcsDetectors := make([]string, 0)
752753
dcsopOk := true
@@ -1064,7 +1065,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
10641065

10651066
var stream dcspb.Configurator_StartOfRunClient
10661067
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "SOR", envId)
1067-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1068+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
10681069
defer cancel()
10691070

10701071
detectorStatusMap := make(map[dcspb.Detector]dcspb.DetectorState)
@@ -1112,7 +1113,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
11121113
}
11131114
p.pendingEORs[envId] = runNumber64 // make sure the corresponding EOR runs sooner or later
11141115

1115-
err, payloadJson = SORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload)
1116+
err, payloadJson = SORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream,
1117+
detectorStatusMap, callFailedStr, payload, integration.ExtractRunTypeOrUndefined(varStack))
11161118

11171119
dcsFailedEcsDetectors := make([]string, 0)
11181120
dcsopOk := true
@@ -1298,7 +1300,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
12981300

12991301
var stream dcspb.Configurator_EndOfRunClient
13001302
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "EOR", envId)
1301-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1303+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
13021304
defer cancel()
13031305

13041306
payload := map[string]interface{}{
@@ -1356,7 +1358,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
13561358
detectorStatusMap[v] = dcspb.DetectorState_NULL_STATE
13571359
}
13581360

1359-
err, payloadJson = EORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload)
1361+
err, payloadJson = EORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream,
1362+
detectorStatusMap, callFailedStr, payload, integration.ExtractRunTypeOrUndefined(varStack))
13601363

13611364
dcsFailedEcsDetectors := make([]string, 0)
13621365
dcsopOk := true
@@ -1452,17 +1455,19 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
14521455
return
14531456
}
14541457

1455-
func newMetric(method string) monitoring.Metric {
1458+
func newMetric(runType, envId, method string) monitoring.Metric {
14561459
metric := monitoring.NewMetric("dcsecs")
14571460
metric.AddTag("method", method)
1461+
metric.AddTag("envId", envId)
1462+
metric.AddTag("runtype", runType)
14581463
return metric
14591464
}
14601465

14611466
func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string,
14621467
payloadJsonForKafka []byte, stream dcspb.Configurator_EndOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState,
1463-
callFailedStr string, payload map[string]interface{},
1468+
callFailedStr string, payload map[string]interface{}, runType string,
14641469
) (error, []byte) {
1465-
metric := newMetric("EOR")
1470+
metric := newMetric(runType, envId, "EOR")
14661471
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
14671472

14681473
var dcsEvent *dcspb.RunEvent
@@ -1692,9 +1697,9 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
16921697

16931698
func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string,
16941699
payloadJsonForKafka []byte, stream dcspb.Configurator_StartOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState,
1695-
callFailedStr string, payload map[string]interface{},
1700+
callFailedStr string, payload map[string]interface{}, runType string,
16961701
) (error, []byte) {
1697-
metric := newMetric("SOR")
1702+
metric := newMetric(runType, envId, "SOR")
16981703
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
16991704

17001705
var dcsEvent *dcspb.RunEvent
@@ -1961,9 +1966,9 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
19611966

19621967
func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string,
19631968
payloadJsonForKafka []byte, stream dcspb.Configurator_StartOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState,
1964-
callFailedStr string, payload map[string]interface{},
1969+
callFailedStr string, payload map[string]interface{}, runType string,
19651970
) (error, []byte) {
1966-
metric := newMetric("PFR")
1971+
metric := newMetric(runType, envId, "PFR")
19671972
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
19681973

19691974
var err error

core/integration/ddsched/plugin.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040

4141
"github.com/AliceO2Group/Control/common/event/topic"
4242
"github.com/AliceO2Group/Control/common/logger/infologger"
43+
"github.com/AliceO2Group/Control/common/monitoring"
4344
pb "github.com/AliceO2Group/Control/common/protos"
4445
"github.com/AliceO2Group/Control/common/utils/uid"
4546
"github.com/AliceO2Group/Control/core/environment"
@@ -163,6 +164,7 @@ func (p *Plugin) partitionStatesForEnvs(envIds []uid.ID) map[uid.ID]map[string]s
163164
EnvironmentId: envId.String(),
164165
}
165166
ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("ddschedStatusTimeout"))
167+
ctx = monitoring.AddEnvAndRunType(ctx, envId.String(), "none")
166168
state, err := p.ddSchedClient.PartitionStatus(ctx, &in, grpc.EmptyCallOption{})
167169
cancel()
168170
if err != nil {
@@ -321,11 +323,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
321323
return
322324
}
323325

324-
var (
325-
response *ddpb.PartitionResponse
326-
)
326+
var response *ddpb.PartitionResponse
327327
timeout := callable.AcquireTimeout(DDSCHED_INITIALIZE_TIMEOUT, varStack, "Initialize", envId)
328-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
328+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
329329
defer cancel()
330330

331331
payload := map[string]interface{}{
@@ -574,11 +574,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
574574
return
575575
}
576576

577-
var (
578-
response *ddpb.PartitionResponse
579-
)
577+
var response *ddpb.PartitionResponse
580578
timeout := callable.AcquireTimeout(DDSCHED_TERMINATE_TIMEOUT, varStack, "Terminate", envId)
581-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
579+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
582580
defer cancel()
583581

584582
payload := map[string]interface{}{
@@ -821,16 +819,14 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
821819
return
822820
}
823821

824-
var (
825-
response *ddpb.PartitionResponse
826-
)
822+
var response *ddpb.PartitionResponse
827823

828824
infoReq := ddpb.PartitionInfo{
829825
EnvironmentId: envId,
830826
PartitionId: envId,
831827
}
832828
timeout := callable.AcquireTimeout(DDSCHED_TERMINATE_TIMEOUT, varStack, "Terminate", envId)
833-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
829+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
834830
defer cancel()
835831

836832
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{

core/integration/odc/plugin.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func (p *Plugin) GetConnectionState() string {
183183

184184
func (p *Plugin) queryPartitionStatus() {
185185
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient"))
186-
ctx, cancel := context.WithTimeout(context.Background(), ODC_STATUS_TIMEOUT)
186+
ctx, cancel := integration.NewContextEmptyEnvIdRunType(ODC_STATUS_TIMEOUT)
187187
defer cancel()
188188

189189
statusRep := &odc.StatusReply{}
@@ -238,7 +238,7 @@ func (p *Plugin) queryPartitionStatus() {
238238
go func(idx int, partId uid.ID) {
239239
defer wg.Done()
240240

241-
ctx, cancel := context.WithTimeout(context.Background(), ODC_STATUS_TIMEOUT)
241+
ctx, cancel := integration.NewContextEmptyEnvIdRunType(ODC_STATUS_TIMEOUT)
242242
defer cancel()
243243

244244
odcPartStateRep, err := p.odcClient.GetState(ctx, &odc.StateRequest{
@@ -1179,7 +1179,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
11791179

11801180
timeout := callable.AcquireTimeout(ODC_PARTITIONINITIALIZE_TIMEOUT, varStack, "PartitionInitialize", envId)
11811181

1182-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1182+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
11831183
defer cancel()
11841184

11851185
err = handleRun(ctx, p.odcClient, isManualXml, map[string]string{
@@ -1292,7 +1292,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
12921292
}
12931293
}
12941294

1295-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1295+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
12961296
defer cancel()
12971297
err := handleConfigure(ctx, p.odcClient, arguments, paddingTimeout, envId, call)
12981298
if err != nil {
@@ -1314,7 +1314,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
13141314

13151315
callFailedStr := "EPN Reset call failed"
13161316

1317-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1317+
ctx, cancel := integration.NewContext(envId, call.VarStack, timeout)
13181318
defer cancel()
13191319
err := handleReset(ctx, p.odcClient, nil, paddingTimeout, envId, call)
13201320
if err != nil {
@@ -1343,7 +1343,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
13431343

13441344
callFailedStr := "EPN PartitionTerminate call failed"
13451345

1346-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1346+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
13471347
defer cancel()
13481348
err := handlePartitionTerminate(ctx, p.odcClient, nil, paddingTimeout, envId, call)
13491349
if err != nil {
@@ -1414,7 +1414,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
14141414
arguments["original_run_number"] = originalRunNumber
14151415
}
14161416

1417-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1417+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
14181418
defer cancel()
14191419
err = handleStart(ctx, p.odcClient, arguments, paddingTimeout, envId, runNumberu64, call)
14201420
if err != nil {
@@ -1462,7 +1462,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
14621462

14631463
timeout := callable.AcquireTimeout(ODC_STOP_TIMEOUT, varStack, "Stop", envId)
14641464

1465-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1465+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
14661466
defer cancel()
14671467
err = handleStop(ctx, p.odcClient, arguments, paddingTimeout, envId, runNumberu64, call)
14681468
if err != nil {
@@ -1486,7 +1486,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
14861486

14871487
timeout := callable.AcquireTimeout(ODC_STOP_TIMEOUT, varStack, "EnsureStop", envId)
14881488

1489-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1489+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
14901490
defer cancel()
14911491

14921492
state, err := handleGetState(ctx, p.odcClient, envId)
@@ -1551,7 +1551,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
15511551

15521552
callFailedStr := "EPN EnsureCleanup call failed"
15531553

1554-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1554+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
15551555
defer cancel()
15561556
err := handleCleanup(ctx, p.odcClient, nil, paddingTimeout, envId, call)
15571557
if err != nil {
@@ -1572,7 +1572,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
15721572

15731573
callFailedStr := "EPN PreDeploymentCleanup call failed"
15741574

1575-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1575+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
15761576
defer cancel()
15771577
err := handleCleanup(ctx, p.odcClient, nil, paddingTimeout, "", call)
15781578
if err != nil {
@@ -1593,7 +1593,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
15931593

15941594
callFailedStr := "EPN EnsureCleanupLegacy call failed"
15951595

1596-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1596+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
15971597
defer cancel()
15981598
err := handleCleanupLegacy(ctx, p.odcClient, nil, paddingTimeout, envId, call)
15991599
if err != nil {

0 commit comments

Comments
 (0)