@@ -1469,6 +1469,13 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
14691469) (error , []byte ) {
14701470 metric := newMetric (runType , envId , "EOR" )
14711471 defer monitoring .TimerSendSingle (& metric , monitoring .Millisecond )()
1472+ eor := "EOR"
1473+ detectorDurations := map [dcspb.Detector ]time.Duration {}
1474+ start := time .Now ()
1475+
1476+ wholeMetric := newMetric (runType , envId , eor )
1477+ wholeMetric .AddTag ("detector" , "All" )
1478+ defer monitoring .TimerSendSingle (& wholeMetric , monitoring .Millisecond )()
14721479
14731480 var dcsEvent * dcspb.RunEvent
14741481 var err error
@@ -1581,6 +1588,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15811588 }
15821589
15831590 detectorStatusMap [dcsEvent .GetDetector ()] = dcsEvent .GetState ()
1591+ detectorDurations [dcsEvent .GetDetector ()] = time .Since (start )
15841592 ecsDet := dcsToEcsDetector (dcsEvent .GetDetector ())
15851593
15861594 if dcsEvent .GetState () == dcspb .DetectorState_EOR_FAILURE {
@@ -1692,15 +1700,23 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
16921700 })
16931701 }
16941702 }
1703+
1704+ convertAndSendDetectorDurationsAndStates (eor , detectorStatusMap , detectorDurations , envId , runType , & wholeMetric )
1705+
16951706 return err , payloadJsonForKafka
16961707}
16971708
16981709func SORgRPCCommunicationLoop (ctx context.Context , timeout time.Duration , call * callable.Call , envId string ,
16991710 payloadJsonForKafka []byte , stream dcspb.Configurator_StartOfRunClient , detectorStatusMap map [dcspb.Detector ]dcspb.DetectorState ,
17001711 callFailedStr string , payload map [string ]interface {}, runType string ,
17011712) (error , []byte ) {
1702- metric := newMetric (runType , envId , "SOR" )
1703- defer monitoring .TimerSendSingle (& metric , monitoring .Millisecond )()
1713+ sor := "SOR"
1714+ detectorDurations := map [dcspb.Detector ]time.Duration {}
1715+ start := time .Now ()
1716+
1717+ wholeMetric := newMetric (runType , envId , sor )
1718+ wholeMetric .AddTag ("detector" , "All" )
1719+ defer monitoring .TimerSendSingle (& wholeMetric , monitoring .Millisecond )()
17041720
17051721 var dcsEvent * dcspb.RunEvent
17061722 var err error
@@ -1813,6 +1829,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
18131829 }
18141830
18151831 detectorStatusMap [dcsEvent .GetDetector ()] = dcsEvent .GetState ()
1832+ detectorDurations [dcsEvent .GetDetector ()] = time .Since (start )
18161833 ecsDet := dcsToEcsDetector (dcsEvent .GetDetector ())
18171834
18181835 if dcsEvent .GetState () == dcspb .DetectorState_SOR_FAILURE {
@@ -1961,15 +1978,41 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
19611978 })
19621979 }
19631980 }
1981+
1982+ convertAndSendDetectorDurationsAndStates (sor , detectorStatusMap , detectorDurations , envId , runType , & wholeMetric )
1983+
19641984 return err , payloadJsonForKafka
19651985}
19661986
1987+ func convertAndSendDetectorDurationsAndStates (method string , detectorStatusMap map [dcspb.Detector ]dcspb.DetectorState , detectorDurations map [dcspb.Detector ]time.Duration , envId , runType string , wholeMetric * monitoring.Metric ) {
1988+ resultsMap := make (map [dcspb.DetectorState ]int )
1989+ for dcsDet , state := range detectorStatusMap {
1990+ metric := newMetric (runType , envId , method )
1991+ det := dcsToEcsDetector (dcsDet )
1992+ metric .AddTag ("detector" , det )
1993+ metric .AddTag ("state" , dcspb .DetectorState_name [int32 (state )])
1994+ resultsMap [state ] += 1
1995+ if duration , ok := detectorDurations [dcsDet ]; ok {
1996+ metric .SetFieldInt64 ("execution_time_ms" , duration .Milliseconds ())
1997+ monitoring .Send (& metric )
1998+ }
1999+ }
2000+ for detectorState , detectorCount := range resultsMap {
2001+ wholeMetric .SetFieldInt64 (dcspb .DetectorState_name [int32 (detectorState )], int64 (detectorCount ))
2002+ }
2003+ }
2004+
19672005func PFRgRPCCommunicationLoop (ctx context.Context , timeout time.Duration , call * callable.Call , envId string ,
19682006 payloadJsonForKafka []byte , stream dcspb.Configurator_StartOfRunClient , detectorStatusMap map [dcspb.Detector ]dcspb.DetectorState ,
19692007 callFailedStr string , payload map [string ]interface {}, runType string ,
19702008) (error , []byte ) {
1971- metric := newMetric (runType , envId , "PFR" )
1972- defer monitoring .TimerSendSingle (& metric , monitoring .Millisecond )()
2009+ pfr := "PFR"
2010+ detectorDurations := map [dcspb.Detector ]time.Duration {}
2011+ start := time .Now ()
2012+
2013+ wholeMetric := newMetric (runType , envId , pfr )
2014+ wholeMetric .AddTag ("detector" , "All" )
2015+ defer monitoring .TimerSendSingle (& wholeMetric , monitoring .Millisecond )()
19732016
19742017 var err error
19752018 var dcsEvent * dcspb.RunEvent
@@ -2083,6 +2126,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20832126 }
20842127
20852128 detectorStatusMap [dcsEvent .GetDetector ()] = dcsEvent .GetState ()
2129+ detectorDurations [dcsEvent .GetDetector ()] = time .Since (start )
20862130 ecsDet := dcsToEcsDetector (dcsEvent .GetDetector ())
20872131
20882132 if dcsEvent .GetState () == dcspb .DetectorState_SOR_FAILURE {
@@ -2230,6 +2274,9 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
22302274 })
22312275 }
22322276 }
2277+
2278+ convertAndSendDetectorDurationsAndStates (pfr , detectorStatusMap , detectorDurations , envId , runType , & wholeMetric )
2279+
22332280 return err , payloadJsonForKafka
22342281}
22352282
0 commit comments