83 changes: 79 additions & 4 deletions core/integration/dcs/plugin.go
@@ -67,6 +67,12 @@ const (
DCS_GENERAL_OP_TIMEOUT = 45 * time.Second
DCS_TIME_FORMAT = "2006-01-02 15:04:05.000"
TOPIC = topic.IntegratedService + topic.Separator + "dcs"

DCS_RESULT_OK = "ok"
DCS_RESULT_TIMEOUT = "timeout"
DCS_RESULT_GRPC_TIMEOUT = "gRPC_timeout"
DCS_RESULT_GRPC_UNKNOWN = "gRPC_unknown"
DCS_RESULT_GRPC_ERROR = "gRPC_error"
)

type Plugin struct {
@@ -1469,6 +1475,13 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
) (error, []byte) {
metric := newMetric(runType, envId, "EOR")
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
eor := "EOR"
durationsPerDetector := map[dcspb.Detector]time.Duration{}
start := time.Now()

totalCallDurationMetric := newMetric(runType, envId, eor)
totalCallDurationMetric.AddTag("detector", "All")
defer monitoring.TimerSendSingle(&totalCallDurationMetric, monitoring.Millisecond)()

var dcsEvent *dcspb.RunEvent
var err error
@@ -1488,13 +1501,15 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
Error: err.Error(),
})

addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_TIMEOUT)
break
}
dcsEvent, err = stream.Recv()
if errors.Is(err, io.EOF) { // correct stream termination
log.Debug("DCS EOR event stream was closed from the DCS side (EOF)")
err = nil

addFunctionResult(&totalCallDurationMetric, DCS_RESULT_OK)
break // no more data
}
if errors.Is(err, context.DeadlineExceeded) {
@@ -1514,6 +1529,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
Error: err.Error(),
})

addFunctionResult(&totalCallDurationMetric, DCS_RESULT_TIMEOUT)
break
}
if err != nil { // stream termination in case of unknown or gRPC error
@@ -1535,6 +1551,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
Payload: string(payloadJsonForKafka[:]),
Error: err.Error(),
})
addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_TIMEOUT)

} else if got == codes.Unknown { // unknown error, likely not a gRPC code
logMsg := "bad DCS EOR event received, any future DCS events are ignored"
@@ -1551,6 +1568,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
Payload: string(payloadJsonForKafka[:]),
Error: logMsg,
})
addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_UNKNOWN)
} else { // some other gRPC error code
log.WithError(err).
Debug("DCS EOR call error")
@@ -1566,6 +1584,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
Payload: string(payloadJsonForKafka[:]),
Error: err.Error(),
})
addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_ERROR)
}

break
@@ -1581,6 +1600,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
}

detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState()
durationsPerDetector[dcsEvent.GetDetector()] = time.Since(start)
ecsDet := dcsToEcsDetector(dcsEvent.GetDetector())

if dcsEvent.GetState() == dcspb.DetectorState_EOR_FAILURE {
@@ -1692,15 +1712,23 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
})
}
}

convertAndSendDetectorDurationsAndStates(eor, detectorStatusMap, durationsPerDetector, envId, runType, &totalCallDurationMetric)

return err, payloadJsonForKafka
}

func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string,
payloadJsonForKafka []byte, stream dcspb.Configurator_StartOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState,
callFailedStr string, payload map[string]interface{}, runType string,
) (error, []byte) {
metric := newMetric(runType, envId, "SOR")
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
sor := "SOR"
Collaborator:

Suggested change:
-	sor := "SOR"
+	callName := "SOR"

It's nitpicking, but perhaps this would be better?

After all, if I wanted to declare e.g. a constant defining a number of FLPs, we would not call it twoHundred but numberOfFLPs.

The same comment applies to the other calls.

Collaborator Author:

I would generally agree with you, but I prefer calling it sor, eor, pfr in this case. It is more readable when passing it around, knowing its value and inherently its meaning. The reason is that while coding you can see hints inside the editor, so I see something like method: sor, which is better than method: callName.

I also don't think that your example is quite applicable, as twoHundred is really generic, but sor is quite a specific thing in our tech stack.

Collaborator:

It looks like it's a matter of personal preference and I would not judge one as better than the other in our case. It's OK.

detectorDurations := map[dcspb.Detector]time.Duration{}
start := time.Now()

totalCallDurationMetric := newMetric(runType, envId, sor)
totalCallDurationMetric.AddTag("detector", "All")
defer monitoring.TimerSendSingle(&totalCallDurationMetric, monitoring.Millisecond)()

var dcsEvent *dcspb.RunEvent
var err error
@@ -1720,13 +1748,15 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
Error: err.Error(),
})

addFunctionResult(&totalCallDurationMetric, DCS_RESULT_TIMEOUT)
break
}
dcsEvent, err = stream.Recv()
if errors.Is(err, io.EOF) { // correct stream termination
log.Debug("DCS SOR event stream was closed from the DCS side (EOF)")
err = nil

addFunctionResult(&totalCallDurationMetric, DCS_RESULT_OK)
break // no more data
}
if errors.Is(err, context.DeadlineExceeded) {
@@ -1746,6 +1776,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
Error: err.Error(),
})

addFunctionResult(&totalCallDurationMetric, DCS_RESULT_TIMEOUT)
break
}
if err != nil { // stream termination in case of unknown or gRPC error
@@ -1768,6 +1799,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
Error: err.Error(),
})

addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_TIMEOUT)
} else if got == codes.Unknown { // unknown error, likely not a gRPC code
logMsg := "bad DCS SOR event received, any future DCS events are ignored"
log.WithError(err).
@@ -1783,6 +1815,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
Payload: string(payloadJsonForKafka[:]),
Error: logMsg,
})
addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_UNKNOWN)
} else { // some other gRPC error code
log.WithError(err).
Debug("DCS SOR call error")
@@ -1798,6 +1831,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
Payload: string(payloadJsonForKafka[:]),
Error: err.Error(),
})
addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_ERROR)
}

break
@@ -1813,6 +1847,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
}

detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState()
detectorDurations[dcsEvent.GetDetector()] = time.Since(start)
Collaborator:

Hm, the duration will be incorrect if the detector times out or there is some weirder error, am I wrong? In such a case, the loop will break before reaching this line.

Not sure what to propose for the timeout case... after all, some detectors might complete an operation and some not, so we would want to set the timeout value for those which did not complete, but we would have to do some guesswork instead of relying on dcsEvent.GetDetector().

Collaborator Author:

You are right, I overlooked the break.

Collaborator Author:

As we both agree that we are not sure how to fix this, I tried to resolve the problem by adding a result tag to the totalCallDurationMetric. It won't fix the problem of incorrect durations for some detectors, but we will at least know that we cannot trust the results in cases where we break out of the loop early.

Collaborator (@knopers8, Sep 8, 2025):

Sounds good. We can then observe what we see in prod and adapt it if needed.
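
For context on the idea floated above, here is a minimal hypothetical sketch (not part of this PR) of how the full timeout could be assigned to detectors that never reported before the loop broke. The helper name fillMissingDurations and its placement are assumptions; dcspb.Detector, the status/duration maps and the timeout value correspond to the variables already used in these loops.

// Hypothetical helper, not part of this PR: after the receive loop ends,
// assign the full timeout as the duration for detectors that never reported
// a state, so they are not silently missing from the per-detector metrics.
func fillMissingDurations(statusMap map[dcspb.Detector]dcspb.DetectorState,
	durations map[dcspb.Detector]time.Duration, timeout time.Duration) {
	for det := range statusMap {
		if _, reported := durations[det]; !reported {
			// Assumption: a missing entry means the detector did not finish
			// before the loop broke, so the full timeout is the best guess.
			durations[det] = timeout
		}
	}
}
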

ecsDet := dcsToEcsDetector(dcsEvent.GetDetector())

if dcsEvent.GetState() == dcspb.DetectorState_SOR_FAILURE {
@@ -1961,15 +1996,45 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
})
}
}

convertAndSendDetectorDurationsAndStates(sor, detectorStatusMap, detectorDurations, envId, runType, &totalCallDurationMetric)

return err, payloadJsonForKafka
}

func convertAndSendDetectorDurationsAndStates(method string, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState, detectorDurations map[dcspb.Detector]time.Duration, envId, runType string, totalCallMetric *monitoring.Metric) {
resultsMap := make(map[dcspb.DetectorState]int)
for dcsDet, state := range detectorStatusMap {
metric := newMetric(runType, envId, method)
det := dcsToEcsDetector(dcsDet)
metric.AddTag("detector", det)
metric.AddTag("state", dcspb.DetectorState_name[int32(state)])
resultsMap[state] += 1
if duration, ok := detectorDurations[dcsDet]; ok {
metric.SetFieldInt64("execution_time_ms", duration.Milliseconds())
monitoring.Send(&metric)
}
}
for detectorState, detectorCount := range resultsMap {
totalCallMetric.SetFieldInt64(dcspb.DetectorState_name[int32(detectorState)], int64(detectorCount))
}
}

func addFunctionResult(metric *monitoring.Metric, result string) {
metric.AddTag("result", result)
}

func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string,
payloadJsonForKafka []byte, stream dcspb.Configurator_StartOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState,
callFailedStr string, payload map[string]interface{}, runType string,
) (error, []byte) {
metric := newMetric(runType, envId, "PFR")
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
pfr := "PFR"
detectorDurations := map[dcspb.Detector]time.Duration{}
start := time.Now()

totalCallDurationMetric := newMetric(runType, envId, pfr)
totalCallDurationMetric.AddTag("detector", "All")
defer monitoring.TimerSendSingle(&totalCallDurationMetric, monitoring.Millisecond)()

var err error
var dcsEvent *dcspb.RunEvent
@@ -1989,13 +2054,15 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
Error: err.Error(),
})

addFunctionResult(&totalCallDurationMetric, DCS_RESULT_TIMEOUT)
break
}
dcsEvent, err = stream.Recv()
if errors.Is(err, io.EOF) { // correct stream termination
log.Debug("DCS PFR event stream was closed from the DCS side (EOF)")
err = nil

addFunctionResult(&totalCallDurationMetric, DCS_RESULT_OK)
break // no more data
}
if errors.Is(err, context.DeadlineExceeded) {
@@ -2015,6 +2082,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
Error: err.Error(),
})

addFunctionResult(&totalCallDurationMetric, DCS_RESULT_TIMEOUT)
break
}
if err != nil { // stream termination in case of unknown or gRPC error
@@ -2036,6 +2104,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
Payload: string(payloadJsonForKafka[:]),
Error: err.Error(),
})
addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_TIMEOUT)
} else if got == codes.Unknown { // unknown error, likely not a gRPC code
logMsg := "bad DCS PFR event received, any future DCS events are ignored"
log.WithError(err).
@@ -2052,6 +2121,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
Payload: string(payloadJsonForKafka[:]),
Error: logMsg,
})
addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_UNKNOWN)
} else { // some other gRPC error code
log.WithError(err).
Error("DCS PFR call error")
@@ -2067,6 +2137,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
Payload: string(payloadJsonForKafka[:]),
Error: err.Error(),
})
addFunctionResult(&totalCallDurationMetric, DCS_RESULT_GRPC_ERROR)
}

break
@@ -2083,6 +2154,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
}

detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState()
detectorDurations[dcsEvent.GetDetector()] = time.Since(start)
ecsDet := dcsToEcsDetector(dcsEvent.GetDetector())

if dcsEvent.GetState() == dcspb.DetectorState_SOR_FAILURE {
@@ -2230,6 +2302,9 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
})
}
}

convertAndSendDetectorDurationsAndStates(pfr, detectorStatusMap, detectorDurations, envId, runType, &totalCallDurationMetric)

return err, payloadJsonForKafka
}
