@@ -67,6 +67,12 @@ const (
6767 DCS_GENERAL_OP_TIMEOUT = 45 * time .Second
6868 DCS_TIME_FORMAT = "2006-01-02 15:04:05.000"
6969 TOPIC = topic .IntegratedService + topic .Separator + "dcs"
70+
71+ DCS_RESULT_OK = "ok"
72+ DCS_RESULT_TIMEOUT = "timeout"
73+ DCS_RESULT_GRPC_TIMEOUT = "gRPC_timeout"
74+ DCS_RESULT_GRPC_UNKNOWN = "gRPC_unknown"
75+ DCS_RESULT_GRPC_ERROR = "gRPC_error"
7076)
7177
7278type Plugin struct {
@@ -1469,6 +1475,13 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
14691475) (error , []byte ) {
14701476 metric := newMetric (runType , envId , "EOR" )
14711477 defer monitoring .TimerSendSingle (& metric , monitoring .Millisecond )()
1478+ eor := "EOR"
1479+ durationsPerDetector := map [dcspb.Detector ]time.Duration {}
1480+ start := time .Now ()
1481+
1482+ totalCallDurationMetric := newMetric (runType , envId , eor )
1483+ totalCallDurationMetric .AddTag ("detector" , "All" )
1484+ defer monitoring .TimerSendSingle (& totalCallDurationMetric , monitoring .Millisecond )()
14721485
14731486 var dcsEvent * dcspb.RunEvent
14741487 var err error
@@ -1488,13 +1501,15 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
14881501 Error : err .Error (),
14891502 })
14901503
1504+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_TIMEOUT )
14911505 break
14921506 }
14931507 dcsEvent , err = stream .Recv ()
14941508 if errors .Is (err , io .EOF ) { // correct stream termination
14951509 log .Debug ("DCS EOR event stream was closed from the DCS side (EOF)" )
14961510 err = nil
14971511
1512+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_OK )
14981513 break // no more data
14991514 }
15001515 if errors .Is (err , context .DeadlineExceeded ) {
@@ -1514,6 +1529,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15141529 Error : err .Error (),
15151530 })
15161531
1532+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_TIMEOUT )
15171533 break
15181534 }
15191535 if err != nil { // stream termination in case of unknown or gRPC error
@@ -1535,6 +1551,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15351551 Payload : string (payloadJsonForKafka [:]),
15361552 Error : err .Error (),
15371553 })
1554+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_TIMEOUT )
15381555
15391556 } else if got == codes .Unknown { // unknown error, likely not a gRPC code
15401557 logMsg := "bad DCS EOR event received, any future DCS events are ignored"
@@ -1551,6 +1568,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15511568 Payload : string (payloadJsonForKafka [:]),
15521569 Error : logMsg ,
15531570 })
1571+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_UNKNOWN )
15541572 } else { // some other gRPC error code
15551573 log .WithError (err ).
15561574 Debug ("DCS EOR call error" )
@@ -1566,6 +1584,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15661584 Payload : string (payloadJsonForKafka [:]),
15671585 Error : err .Error (),
15681586 })
1587+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_ERROR )
15691588 }
15701589
15711590 break
@@ -1581,6 +1600,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15811600 }
15821601
15831602 detectorStatusMap [dcsEvent .GetDetector ()] = dcsEvent .GetState ()
1603+ durationsPerDetector [dcsEvent .GetDetector ()] = time .Since (start )
15841604 ecsDet := dcsToEcsDetector (dcsEvent .GetDetector ())
15851605
15861606 if dcsEvent .GetState () == dcspb .DetectorState_EOR_FAILURE {
@@ -1692,15 +1712,23 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
16921712 })
16931713 }
16941714 }
1715+
1716+ convertAndSendDetectorDurationsAndStates (eor , detectorStatusMap , durationsPerDetector , envId , runType , & totalCallDurationMetric )
1717+
16951718 return err , payloadJsonForKafka
16961719}
16971720
16981721func SORgRPCCommunicationLoop (ctx context.Context , timeout time.Duration , call * callable.Call , envId string ,
16991722 payloadJsonForKafka []byte , stream dcspb.Configurator_StartOfRunClient , detectorStatusMap map [dcspb.Detector ]dcspb.DetectorState ,
17001723 callFailedStr string , payload map [string ]interface {}, runType string ,
17011724) (error , []byte ) {
1702- metric := newMetric (runType , envId , "SOR" )
1703- defer monitoring .TimerSendSingle (& metric , monitoring .Millisecond )()
1725+ sor := "SOR"
1726+ detectorDurations := map [dcspb.Detector ]time.Duration {}
1727+ start := time .Now ()
1728+
1729+ totalCallDurationMetric := newMetric (runType , envId , sor )
1730+ totalCallDurationMetric .AddTag ("detector" , "All" )
1731+ defer monitoring .TimerSendSingle (& totalCallDurationMetric , monitoring .Millisecond )()
17041732
17051733 var dcsEvent * dcspb.RunEvent
17061734 var err error
@@ -1720,13 +1748,15 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
17201748 Error : err .Error (),
17211749 })
17221750
1751+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_TIMEOUT )
17231752 break
17241753 }
17251754 dcsEvent , err = stream .Recv ()
17261755 if errors .Is (err , io .EOF ) { // correct stream termination
17271756 log .Debug ("DCS SOR event stream was closed from the DCS side (EOF)" )
17281757 err = nil
17291758
1759+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_OK )
17301760 break // no more data
17311761 }
17321762 if errors .Is (err , context .DeadlineExceeded ) {
@@ -1746,6 +1776,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
17461776 Error : err .Error (),
17471777 })
17481778
1779+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_TIMEOUT )
17491780 break
17501781 }
17511782 if err != nil { // stream termination in case of unknown or gRPC error
@@ -1768,6 +1799,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
17681799 Error : err .Error (),
17691800 })
17701801
1802+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_TIMEOUT )
17711803 } else if got == codes .Unknown { // unknown error, likely not a gRPC code
17721804 logMsg := "bad DCS SOR event received, any future DCS events are ignored"
17731805 log .WithError (err ).
@@ -1783,6 +1815,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
17831815 Payload : string (payloadJsonForKafka [:]),
17841816 Error : logMsg ,
17851817 })
1818+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_UNKNOWN )
17861819 } else { // some other gRPC error code
17871820 log .WithError (err ).
17881821 Debug ("DCS SOR call error" )
@@ -1798,6 +1831,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
17981831 Payload : string (payloadJsonForKafka [:]),
17991832 Error : err .Error (),
18001833 })
1834+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_ERROR )
18011835 }
18021836
18031837 break
@@ -1813,6 +1847,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
18131847 }
18141848
18151849 detectorStatusMap [dcsEvent .GetDetector ()] = dcsEvent .GetState ()
1850+ detectorDurations [dcsEvent .GetDetector ()] = time .Since (start )
18161851 ecsDet := dcsToEcsDetector (dcsEvent .GetDetector ())
18171852
18181853 if dcsEvent .GetState () == dcspb .DetectorState_SOR_FAILURE {
@@ -1961,15 +1996,45 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
19611996 })
19621997 }
19631998 }
1999+
2000+ convertAndSendDetectorDurationsAndStates (sor , detectorStatusMap , detectorDurations , envId , runType , & totalCallDurationMetric )
2001+
19642002 return err , payloadJsonForKafka
19652003}
19662004
2005+ func convertAndSendDetectorDurationsAndStates (method string , detectorStatusMap map [dcspb.Detector ]dcspb.DetectorState , detectorDurations map [dcspb.Detector ]time.Duration , envId , runType string , totalCallMetric * monitoring.Metric ) {
2006+ resultsMap := make (map [dcspb.DetectorState ]int )
2007+ for dcsDet , state := range detectorStatusMap {
2008+ metric := newMetric (runType , envId , method )
2009+ det := dcsToEcsDetector (dcsDet )
2010+ metric .AddTag ("detector" , det )
2011+ metric .AddTag ("state" , dcspb .DetectorState_name [int32 (state )])
2012+ resultsMap [state ] += 1
2013+ if duration , ok := detectorDurations [dcsDet ]; ok {
2014+ metric .SetFieldInt64 ("execution_time_ms" , duration .Milliseconds ())
2015+ monitoring .Send (& metric )
2016+ }
2017+ }
2018+ for detectorState , detectorCount := range resultsMap {
2019+ totalCallMetric .SetFieldInt64 (dcspb .DetectorState_name [int32 (detectorState )], int64 (detectorCount ))
2020+ }
2021+ }
2022+
2023+ func addFunctionResult (metric * monitoring.Metric , result string ) {
2024+ metric .AddTag ("result" , result )
2025+ }
2026+
19672027func PFRgRPCCommunicationLoop (ctx context.Context , timeout time.Duration , call * callable.Call , envId string ,
19682028 payloadJsonForKafka []byte , stream dcspb.Configurator_StartOfRunClient , detectorStatusMap map [dcspb.Detector ]dcspb.DetectorState ,
19692029 callFailedStr string , payload map [string ]interface {}, runType string ,
19702030) (error , []byte ) {
1971- metric := newMetric (runType , envId , "PFR" )
1972- defer monitoring .TimerSendSingle (& metric , monitoring .Millisecond )()
2031+ pfr := "PFR"
2032+ detectorDurations := map [dcspb.Detector ]time.Duration {}
2033+ start := time .Now ()
2034+
2035+ totalCallDurationMetric := newMetric (runType , envId , pfr )
2036+ totalCallDurationMetric .AddTag ("detector" , "All" )
2037+ defer monitoring .TimerSendSingle (& totalCallDurationMetric , monitoring .Millisecond )()
19732038
19742039 var err error
19752040 var dcsEvent * dcspb.RunEvent
@@ -1989,13 +2054,15 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
19892054 Error : err .Error (),
19902055 })
19912056
2057+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_TIMEOUT )
19922058 break
19932059 }
19942060 dcsEvent , err = stream .Recv ()
19952061 if errors .Is (err , io .EOF ) { // correct stream termination
19962062 log .Debug ("DCS PFR event stream was closed from the DCS side (EOF)" )
19972063 err = nil
19982064
2065+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_OK )
19992066 break // no more data
20002067 }
20012068 if errors .Is (err , context .DeadlineExceeded ) {
@@ -2015,6 +2082,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20152082 Error : err .Error (),
20162083 })
20172084
2085+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_TIMEOUT )
20182086 break
20192087 }
20202088 if err != nil { // stream termination in case of unknown or gRPC error
@@ -2036,6 +2104,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20362104 Payload : string (payloadJsonForKafka [:]),
20372105 Error : err .Error (),
20382106 })
2107+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_TIMEOUT )
20392108 } else if got == codes .Unknown { // unknown error, likely not a gRPC code
20402109 logMsg := "bad DCS PFR event received, any future DCS events are ignored"
20412110 log .WithError (err ).
@@ -2052,6 +2121,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20522121 Payload : string (payloadJsonForKafka [:]),
20532122 Error : logMsg ,
20542123 })
2124+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_UNKNOWN )
20552125 } else { // some other gRPC error code
20562126 log .WithError (err ).
20572127 Error ("DCS PFR call error" )
@@ -2067,6 +2137,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20672137 Payload : string (payloadJsonForKafka [:]),
20682138 Error : err .Error (),
20692139 })
2140+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_ERROR )
20702141 }
20712142
20722143 break
@@ -2083,6 +2154,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20832154 }
20842155
20852156 detectorStatusMap [dcsEvent .GetDetector ()] = dcsEvent .GetState ()
2157+ detectorDurations [dcsEvent .GetDetector ()] = time .Since (start )
20862158 ecsDet := dcsToEcsDetector (dcsEvent .GetDetector ())
20872159
20882160 if dcsEvent .GetState () == dcspb .DetectorState_SOR_FAILURE {
@@ -2230,6 +2302,9 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
22302302 })
22312303 }
22322304 }
2305+
2306+ convertAndSendDetectorDurationsAndStates (pfr , detectorStatusMap , detectorDurations , envId , runType , & totalCallDurationMetric )
2307+
22332308 return err , payloadJsonForKafka
22342309}
22352310
0 commit comments