Skip to content

Commit 3a2d1aa

Browse files
author
Michal Tichák
committed
[dcs] sending SOR,PFR,EOR metrics per detector
1 parent 2d0bcb1 commit 3a2d1aa

File tree

1 file changed

+54
-9
lines changed

1 file changed

+54
-9
lines changed

core/integration/dcs/plugin.go

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -289,18 +289,18 @@ func (p *Plugin) Init(instanceId string) error {
289289
in := &dcspb.SubscriptionRequest{
290290
InstanceId: instanceId,
291291
}
292-
292+
293293
// Always start the goroutine, even if initial subscription fails
294294
go func() {
295295
var evStream dcspb.Configurator_SubscribeClient
296296
var err error
297-
297+
298298
for {
299299
// Try to establish subscription if we don't have one
300300
if evStream == nil {
301301
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
302302
Debug("attempting to subscribe to DCS service")
303-
303+
304304
evStream, err = p.dcsClient.Subscribe(context.Background(), in, grpc.EmptyCallOption{})
305305
if err != nil {
306306
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
@@ -1462,8 +1462,13 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
14621462
payloadJsonForKafka []byte, stream dcspb.Configurator_EndOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState,
14631463
callFailedStr string, payload map[string]interface{},
14641464
) (error, []byte) {
1465-
metric := newMetric("EOR")
1466-
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
1465+
eor := "EOR"
1466+
detectorDurations := map[dcspb.Detector]time.Duration{}
1467+
start := time.Now()
1468+
1469+
wholeMetric := newMetric(eor)
1470+
wholeMetric.AddTag("detector", "All")
1471+
defer monitoring.TimerSendSingle(&wholeMetric, monitoring.Millisecond)()
14671472

14681473
var dcsEvent *dcspb.RunEvent
14691474
var err error
@@ -1576,6 +1581,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15761581
}
15771582

15781583
detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState()
1584+
detectorDurations[dcsEvent.GetDetector()] = time.Since(start)
15791585
ecsDet := dcsToEcsDetector(dcsEvent.GetDetector())
15801586

15811587
if dcsEvent.GetState() == dcspb.DetectorState_EOR_FAILURE {
@@ -1687,15 +1693,23 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
16871693
})
16881694
}
16891695
}
1696+
1697+
convertAndSendDetectorDurationsAndStates(eor, detectorStatusMap, detectorDurations, &wholeMetric)
1698+
16901699
return err, payloadJsonForKafka
16911700
}
16921701

16931702
func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string,
16941703
payloadJsonForKafka []byte, stream dcspb.Configurator_StartOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState,
16951704
callFailedStr string, payload map[string]interface{},
16961705
) (error, []byte) {
1697-
metric := newMetric("SOR")
1698-
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
1706+
sor := "SOR"
1707+
detectorDurations := map[dcspb.Detector]time.Duration{}
1708+
start := time.Now()
1709+
1710+
wholeMetric := newMetric(sor)
1711+
wholeMetric.AddTag("detector", "All")
1712+
defer monitoring.TimerSendSingle(&wholeMetric, monitoring.Millisecond)()
16991713

17001714
var dcsEvent *dcspb.RunEvent
17011715
var err error
@@ -1808,6 +1822,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
18081822
}
18091823

18101824
detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState()
1825+
detectorDurations[dcsEvent.GetDetector()] = time.Since(start)
18111826
ecsDet := dcsToEcsDetector(dcsEvent.GetDetector())
18121827

18131828
if dcsEvent.GetState() == dcspb.DetectorState_SOR_FAILURE {
@@ -1956,15 +1971,41 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
19561971
})
19571972
}
19581973
}
1974+
1975+
convertAndSendDetectorDurationsAndStates(sor, detectorStatusMap, detectorDurations, &wholeMetric)
1976+
19591977
return err, payloadJsonForKafka
19601978
}
19611979

1980+
func convertAndSendDetectorDurationsAndStates(method string, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState, detectorDurations map[dcspb.Detector]time.Duration, wholeMetric *monitoring.Metric) {
1981+
resultsMap := make(map[dcspb.DetectorState]int)
1982+
for dcsDet, state := range detectorStatusMap {
1983+
metric := newMetric(method)
1984+
det := dcsToEcsDetector(dcsDet)
1985+
metric.AddTag("detector", det)
1986+
metric.AddTag("state", dcspb.DetectorState_name[int32(state)])
1987+
resultsMap[state] += 1
1988+
if duration, ok := detectorDurations[dcsDet]; ok {
1989+
metric.SetFieldInt64("execution_time_ms", duration.Milliseconds())
1990+
monitoring.Send(&metric)
1991+
}
1992+
}
1993+
for detectorState, detectorCount := range resultsMap {
1994+
wholeMetric.SetFieldInt64(dcspb.DetectorState_name[int32(detectorState)], int64(detectorCount))
1995+
}
1996+
}
1997+
19621998
func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string,
19631999
payloadJsonForKafka []byte, stream dcspb.Configurator_StartOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState,
19642000
callFailedStr string, payload map[string]interface{},
19652001
) (error, []byte) {
1966-
metric := newMetric("PFR")
1967-
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
2002+
pfr := "PFR"
2003+
detectorDurations := map[dcspb.Detector]time.Duration{}
2004+
start := time.Now()
2005+
2006+
wholeMetric := newMetric(pfr)
2007+
wholeMetric.AddTag("detector", "All")
2008+
defer monitoring.TimerSendSingle(&wholeMetric, monitoring.Millisecond)()
19682009

19692010
var err error
19702011
var dcsEvent *dcspb.RunEvent
@@ -2078,6 +2119,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20782119
}
20792120

20802121
detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState()
2122+
detectorDurations[dcsEvent.GetDetector()] = time.Since(start)
20812123
ecsDet := dcsToEcsDetector(dcsEvent.GetDetector())
20822124

20832125
if dcsEvent.GetState() == dcspb.DetectorState_SOR_FAILURE {
@@ -2225,6 +2267,9 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
22252267
})
22262268
}
22272269
}
2270+
2271+
convertAndSendDetectorDurationsAndStates(pfr, detectorStatusMap, detectorDurations, &wholeMetric)
2272+
22282273
return err, payloadJsonForKafka
22292274
}
22302275

0 commit comments

Comments
 (0)