@@ -67,6 +67,12 @@ const (
6767 DCS_GENERAL_OP_TIMEOUT = 45 * time .Second
6868 DCS_TIME_FORMAT = "2006-01-02 15:04:05.000"
6969 TOPIC = topic .IntegratedService + topic .Separator + "dcs"
70+
71+ DCS_RESULT_OK = "ok"
72+ DCS_RESULT_TIMEOUT = "timeout"
73+ DCS_RESULT_GRPC_TIMEOUT = "gRPC_timeout"
74+ DCS_RESULT_GRPC_UNKNOWN = "gRPC_unknown"
75+ DCS_RESULT_GRPC_ERROR = "gRPC_error"
7076)
7177
7278type Plugin struct {
@@ -1470,12 +1476,12 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
14701476 metric := newMetric (runType , envId , "EOR" )
14711477 defer monitoring .TimerSendSingle (& metric , monitoring .Millisecond )()
14721478 eor := "EOR"
1473- detectorDurations := map [dcspb.Detector ]time.Duration {}
1479+ durationsPerDetector := map [dcspb.Detector ]time.Duration {}
14741480 start := time .Now ()
14751481
1476- wholeMetric := newMetric (runType , envId , eor )
1477- wholeMetric .AddTag ("detector" , "All" )
1478- defer monitoring .TimerSendSingle (& wholeMetric , monitoring .Millisecond )()
1482+ totalCallDurationMetric := newMetric (runType , envId , eor )
1483+ totalCallDurationMetric .AddTag ("detector" , "All" )
1484+ defer monitoring .TimerSendSingle (& totalCallDurationMetric , monitoring .Millisecond )()
14791485
14801486 var dcsEvent * dcspb.RunEvent
14811487 var err error
@@ -1495,13 +1501,15 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
14951501 Error : err .Error (),
14961502 })
14971503
1504+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_TIMEOUT )
14981505 break
14991506 }
15001507 dcsEvent , err = stream .Recv ()
15011508 if errors .Is (err , io .EOF ) { // correct stream termination
15021509 log .Debug ("DCS EOR event stream was closed from the DCS side (EOF)" )
15031510 err = nil
15041511
1512+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_OK )
15051513 break // no more data
15061514 }
15071515 if errors .Is (err , context .DeadlineExceeded ) {
@@ -1521,6 +1529,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15211529 Error : err .Error (),
15221530 })
15231531
1532+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_TIMEOUT )
15241533 break
15251534 }
15261535 if err != nil { // stream termination in case of unknown or gRPC error
@@ -1542,6 +1551,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15421551 Payload : string (payloadJsonForKafka [:]),
15431552 Error : err .Error (),
15441553 })
1554+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_TIMEOUT )
15451555
15461556 } else if got == codes .Unknown { // unknown error, likely not a gRPC code
15471557 logMsg := "bad DCS EOR event received, any future DCS events are ignored"
@@ -1558,6 +1568,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15581568 Payload : string (payloadJsonForKafka [:]),
15591569 Error : logMsg ,
15601570 })
1571+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_UNKNOWN )
15611572 } else { // some other gRPC error code
15621573 log .WithError (err ).
15631574 Debug ("DCS EOR call error" )
@@ -1573,6 +1584,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15731584 Payload : string (payloadJsonForKafka [:]),
15741585 Error : err .Error (),
15751586 })
1587+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_ERROR )
15761588 }
15771589
15781590 break
@@ -1588,7 +1600,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15881600 }
15891601
15901602 detectorStatusMap [dcsEvent .GetDetector ()] = dcsEvent .GetState ()
1591- detectorDurations [dcsEvent .GetDetector ()] = time .Since (start )
1603+ durationsPerDetector [dcsEvent .GetDetector ()] = time .Since (start )
15921604 ecsDet := dcsToEcsDetector (dcsEvent .GetDetector ())
15931605
15941606 if dcsEvent .GetState () == dcspb .DetectorState_EOR_FAILURE {
@@ -1701,7 +1713,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
17011713 }
17021714 }
17031715
1704- convertAndSendDetectorDurationsAndStates (eor , detectorStatusMap , detectorDurations , envId , runType , & wholeMetric )
1716+ convertAndSendDetectorDurationsAndStates (eor , detectorStatusMap , durationsPerDetector , envId , runType , & totalCallDurationMetric )
17051717
17061718 return err , payloadJsonForKafka
17071719}
@@ -1714,9 +1726,9 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
17141726 detectorDurations := map [dcspb.Detector ]time.Duration {}
17151727 start := time .Now ()
17161728
1717- wholeMetric := newMetric (runType , envId , sor )
1718- wholeMetric .AddTag ("detector" , "All" )
1719- defer monitoring .TimerSendSingle (& wholeMetric , monitoring .Millisecond )()
1729+ totalCallDurationMetric := newMetric (runType , envId , sor )
1730+ totalCallDurationMetric .AddTag ("detector" , "All" )
1731+ defer monitoring .TimerSendSingle (& totalCallDurationMetric , monitoring .Millisecond )()
17201732
17211733 var dcsEvent * dcspb.RunEvent
17221734 var err error
@@ -1736,13 +1748,15 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
17361748 Error : err .Error (),
17371749 })
17381750
1751+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_TIMEOUT )
17391752 break
17401753 }
17411754 dcsEvent , err = stream .Recv ()
17421755 if errors .Is (err , io .EOF ) { // correct stream termination
17431756 log .Debug ("DCS SOR event stream was closed from the DCS side (EOF)" )
17441757 err = nil
17451758
1759+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_OK )
17461760 break // no more data
17471761 }
17481762 if errors .Is (err , context .DeadlineExceeded ) {
@@ -1762,6 +1776,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
17621776 Error : err .Error (),
17631777 })
17641778
1779+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_TIMEOUT )
17651780 break
17661781 }
17671782 if err != nil { // stream termination in case of unknown or gRPC error
@@ -1784,6 +1799,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
17841799 Error : err .Error (),
17851800 })
17861801
1802+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_TIMEOUT )
17871803 } else if got == codes .Unknown { // unknown error, likely not a gRPC code
17881804 logMsg := "bad DCS SOR event received, any future DCS events are ignored"
17891805 log .WithError (err ).
@@ -1799,6 +1815,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
17991815 Payload : string (payloadJsonForKafka [:]),
18001816 Error : logMsg ,
18011817 })
1818+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_UNKNOWN )
18021819 } else { // some other gRPC error code
18031820 log .WithError (err ).
18041821 Debug ("DCS SOR call error" )
@@ -1814,6 +1831,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
18141831 Payload : string (payloadJsonForKafka [:]),
18151832 Error : err .Error (),
18161833 })
1834+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_ERROR )
18171835 }
18181836
18191837 break
@@ -1979,12 +1997,12 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
19791997 }
19801998 }
19811999
1982- convertAndSendDetectorDurationsAndStates (sor , detectorStatusMap , detectorDurations , envId , runType , & wholeMetric )
2000+ convertAndSendDetectorDurationsAndStates (sor , detectorStatusMap , detectorDurations , envId , runType , & totalCallDurationMetric )
19832001
19842002 return err , payloadJsonForKafka
19852003}
19862004
1987- func convertAndSendDetectorDurationsAndStates (method string , detectorStatusMap map [dcspb.Detector ]dcspb.DetectorState , detectorDurations map [dcspb.Detector ]time.Duration , envId , runType string , wholeMetric * monitoring.Metric ) {
2005+ func convertAndSendDetectorDurationsAndStates (method string , detectorStatusMap map [dcspb.Detector ]dcspb.DetectorState , detectorDurations map [dcspb.Detector ]time.Duration , envId , runType string , totalCallMetric * monitoring.Metric ) {
19882006 resultsMap := make (map [dcspb.DetectorState ]int )
19892007 for dcsDet , state := range detectorStatusMap {
19902008 metric := newMetric (runType , envId , method )
@@ -1998,10 +2016,14 @@ func convertAndSendDetectorDurationsAndStates(method string, detectorStatusMap m
19982016 }
19992017 }
20002018 for detectorState , detectorCount := range resultsMap {
2001- wholeMetric .SetFieldInt64 (dcspb .DetectorState_name [int32 (detectorState )], int64 (detectorCount ))
2019+ totalCallMetric .SetFieldInt64 (dcspb .DetectorState_name [int32 (detectorState )], int64 (detectorCount ))
20022020 }
20032021}
20042022
2023+ func addFunctionResult (metric * monitoring.Metric , result string ) {
2024+ metric .AddTag ("result" , result )
2025+ }
2026+
20052027func PFRgRPCCommunicationLoop (ctx context.Context , timeout time.Duration , call * callable.Call , envId string ,
20062028 payloadJsonForKafka []byte , stream dcspb.Configurator_StartOfRunClient , detectorStatusMap map [dcspb.Detector ]dcspb.DetectorState ,
20072029 callFailedStr string , payload map [string ]interface {}, runType string ,
@@ -2010,9 +2032,9 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20102032 detectorDurations := map [dcspb.Detector ]time.Duration {}
20112033 start := time .Now ()
20122034
2013- wholeMetric := newMetric (runType , envId , pfr )
2014- wholeMetric .AddTag ("detector" , "All" )
2015- defer monitoring .TimerSendSingle (& wholeMetric , monitoring .Millisecond )()
2035+ totalCallDurationMetric := newMetric (runType , envId , pfr )
2036+ totalCallDurationMetric .AddTag ("detector" , "All" )
2037+ defer monitoring .TimerSendSingle (& totalCallDurationMetric , monitoring .Millisecond )()
20162038
20172039 var err error
20182040 var dcsEvent * dcspb.RunEvent
@@ -2032,13 +2054,15 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20322054 Error : err .Error (),
20332055 })
20342056
2057+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_TIMEOUT )
20352058 break
20362059 }
20372060 dcsEvent , err = stream .Recv ()
20382061 if errors .Is (err , io .EOF ) { // correct stream termination
20392062 log .Debug ("DCS PFR event stream was closed from the DCS side (EOF)" )
20402063 err = nil
20412064
2065+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_OK )
20422066 break // no more data
20432067 }
20442068 if errors .Is (err , context .DeadlineExceeded ) {
@@ -2058,6 +2082,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20582082 Error : err .Error (),
20592083 })
20602084
2085+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_TIMEOUT )
20612086 break
20622087 }
20632088 if err != nil { // stream termination in case of unknown or gRPC error
@@ -2079,6 +2104,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20792104 Payload : string (payloadJsonForKafka [:]),
20802105 Error : err .Error (),
20812106 })
2107+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_TIMEOUT )
20822108 } else if got == codes .Unknown { // unknown error, likely not a gRPC code
20832109 logMsg := "bad DCS PFR event received, any future DCS events are ignored"
20842110 log .WithError (err ).
@@ -2095,6 +2121,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20952121 Payload : string (payloadJsonForKafka [:]),
20962122 Error : logMsg ,
20972123 })
2124+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_UNKNOWN )
20982125 } else { // some other gRPC error code
20992126 log .WithError (err ).
21002127 Error ("DCS PFR call error" )
@@ -2110,6 +2137,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
21102137 Payload : string (payloadJsonForKafka [:]),
21112138 Error : err .Error (),
21122139 })
2140+ addFunctionResult (& totalCallDurationMetric , DCS_RESULT_GRPC_ERROR )
21132141 }
21142142
21152143 break
@@ -2275,7 +2303,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
22752303 }
22762304 }
22772305
2278- convertAndSendDetectorDurationsAndStates (pfr , detectorStatusMap , detectorDurations , envId , runType , & wholeMetric )
2306+ convertAndSendDetectorDurationsAndStates (pfr , detectorStatusMap , detectorDurations , envId , runType , & totalCallDurationMetric )
22792307
22802308 return err , payloadJsonForKafka
22812309}
0 commit comments