25 changes: 25 additions & 0 deletions common/monitoring/grpcinterceptor.go
@@ -30,15 +30,33 @@ import (
"google.golang.org/grpc"
)

type (
EnvIDKey struct{}
RunTypeKey struct{}
)

func AddEnvAndRunType(ctx context.Context, envId, runType string) context.Context {
ctx = context.WithValue(ctx, EnvIDKey{}, envId)
ctx = context.WithValue(ctx, RunTypeKey{}, runType)
return ctx
}

type measuredClientStream struct {
grpc.ClientStream
ctx context.Context
method string
metricName string
}

func (t *measuredClientStream) RecvMsg(m interface{}) error {
metric := NewMetric(t.metricName)
metric.AddTag("method", t.method)
if env, ok := t.ctx.Value(EnvIDKey{}).(string); ok {
metric.AddTag("envId", env)
}
if rt, ok := t.ctx.Value(RunTypeKey{}).(string); ok {
metric.AddTag("runtype", rt)
}
defer TimerSendSingle(&metric, Millisecond)()

err := t.ClientStream.RecvMsg(m)
@@ -63,6 +81,7 @@ func SetupStreamClientInterceptor(metricName string, convert NameConvertType) gr

return &measuredClientStream{
ClientStream: clientStream,
ctx: ctx,
method: convert(method),
metricName: metricName,
}, nil
@@ -73,6 +92,12 @@ func SetupUnaryClientInterceptor(name string, convert NameConvertType) grpc.Unar
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
metric := NewMetric(name)
metric.AddTag("method", convert(method))
if env, ok := ctx.Value(EnvIDKey{}).(string); ok {
metric.AddTag("envId", env)
}
if rt, ok := ctx.Value(RunTypeKey{}).(string); ok {
metric.AddTag("runtype", rt)
}
defer TimerSendSingle(&metric, Millisecond)()
return invoker(ctx, method, req, reply, cc, opts...)
}
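The new `EnvIDKey`/`RunTypeKey` context keys and the `AddEnvAndRunType` helper let callers propagate the environment ID and run type into the outgoing gRPC context, where the interceptors above pick them up as extra metric tags. A minimal usage sketch, assuming `NameConvertType` is a plain `func(string) string`; the endpoint, metric name and tag values below are placeholders, not part of this PR:

```go
package main

import (
	"context"
	"time"

	"github.com/AliceO2Group/Control/common/monitoring"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Register the interceptor when dialing: every unary call through this
	// connection is timed and tagged with its (converted) method name.
	conn, err := grpc.Dial("example-service:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithUnaryInterceptor(
			monitoring.SetupUnaryClientInterceptor("exampleclient",
				func(method string) string { return method })),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Attach envId and run type to the outgoing context; the interceptor
	// reads them back via EnvIDKey{}/RunTypeKey{} and adds the extra tags.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	ctx = monitoring.AddEnvAndRunType(ctx, "2hG78ZQvQjW", "PHYSICS")

	// ... issue RPCs with ctx on conn as usual; each one emits a tagged metric.
	_ = ctx
}
```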
1 change: 1 addition & 0 deletions core/environment/environment.go
@@ -669,6 +669,7 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string, weig
metric := monitoring.NewMetric("hooks")
metric.AddTag("trigger", trigger)
metric.AddTag("envId", env.id.String())
metric.AddTag("runtype", env.GetRunType().String())
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()

allWeightsSet := make(callable.HooksMap)
8 changes: 5 additions & 3 deletions core/integration/ccdb/plugin.go
@@ -481,7 +481,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}
p.existingRuns[grp.runNumber] = types.Nil{}
err := p.uploadCurrentGRP(grp, envId, true)
err := p.uploadCurrentGRP(grp, envId, true, varStack, "RunStart")
if err != nil {
log.WithField("call", "RunStop").
WithField("run", grp.runNumber).
@@ -506,7 +506,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
_, runExists := p.existingRuns[grp.runNumber]
if runExists {
delete(p.existingRuns, grp.runNumber)
err := p.uploadCurrentGRP(grp, envId, false)
err := p.uploadCurrentGRP(grp, envId, false, varStack, "RunStop")
if err != nil {
log.WithField("call", "RunStop").
WithField("run", grp.runNumber).
@@ -525,7 +525,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refresh bool) error {
func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refresh bool, varStack map[string]string, callName string) error {
if grp == nil {
return errors.New(fmt.Sprintf("Failed to create a GRP object"))
}
@@ -550,6 +550,8 @@ func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refre

metric := monitoring.NewMetric("ccdb")
metric.AddTag("envId", envId)
metric.AddTag("runtype", integration.ExtractRunTypeOrUndefined(varStack))
metric.AddTag("call", callName)
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()

cmd := exec.CommandContext(ctx, "bash", "-c", cmdStr)
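`integration.ExtractRunTypeOrUndefined(varStack)` is called here but defined elsewhere in the PR. As a rough sketch of the expected behaviour (the key name and fallback value are assumptions, not the actual implementation):

```go
package integration

// ExtractRunTypeOrUndefined returns the run type recorded in the workflow
// var stack, falling back to a fixed value when the key is missing or empty.
// Sketch only: the real key name and fallback string may differ.
func ExtractRunTypeOrUndefined(varStack map[string]string) string {
	if runType, ok := varStack["run_type"]; ok && runType != "" {
		return runType
	}
	return "NONE"
}
```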
37 changes: 21 additions & 16 deletions core/integration/dcs/plugin.go
@@ -289,18 +289,18 @@ func (p *Plugin) Init(instanceId string) error {
in := &dcspb.SubscriptionRequest{
InstanceId: instanceId,
}

// Always start the goroutine, even if initial subscription fails
go func() {
var evStream dcspb.Configurator_SubscribeClient
var err error

for {
// Try to establish subscription if we don't have one
if evStream == nil {
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
Debug("attempting to subscribe to DCS service")

evStream, err = p.dcsClient.Subscribe(context.Background(), in, grpc.EmptyCallOption{})
if err != nil {
log.WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
@@ -699,7 +699,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

var stream dcspb.Configurator_StartOfRunClient
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "PFR", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := integration.NewContext(envId, varStack, timeout)
defer cancel()

detectorStatusMap := make(map[dcspb.Detector]dcspb.DetectorState)
@@ -746,7 +746,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

err, payloadJson = PFRgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload)
err, payloadJson = PFRgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream,
detectorStatusMap, callFailedStr, payload, integration.ExtractRunTypeOrUndefined(varStack))

dcsFailedEcsDetectors := make([]string, 0)
dcsopOk := true
@@ -1064,7 +1065,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

var stream dcspb.Configurator_StartOfRunClient
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "SOR", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := integration.NewContext(envId, varStack, timeout)
defer cancel()

detectorStatusMap := make(map[dcspb.Detector]dcspb.DetectorState)
@@ -1112,7 +1113,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
}
p.pendingEORs[envId] = runNumber64 // make sure the corresponding EOR runs sooner or later

err, payloadJson = SORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload)
err, payloadJson = SORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream,
detectorStatusMap, callFailedStr, payload, integration.ExtractRunTypeOrUndefined(varStack))

dcsFailedEcsDetectors := make([]string, 0)
dcsopOk := true
@@ -1298,7 +1300,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

var stream dcspb.Configurator_EndOfRunClient
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "EOR", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := integration.NewContext(envId, varStack, timeout)
defer cancel()

payload := map[string]interface{}{
@@ -1356,7 +1358,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
detectorStatusMap[v] = dcspb.DetectorState_NULL_STATE
}

err, payloadJson = EORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload)
err, payloadJson = EORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream,
detectorStatusMap, callFailedStr, payload, integration.ExtractRunTypeOrUndefined(varStack))

dcsFailedEcsDetectors := make([]string, 0)
dcsopOk := true
@@ -1452,17 +1455,19 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

func newMetric(method string) monitoring.Metric {
func newMetric(runType, envId, method string) monitoring.Metric {
metric := monitoring.NewMetric("dcsecs")
metric.AddTag("method", method)
metric.AddTag("envId", envId)
metric.AddTag("runtype", runType)
return metric
}

func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string,
payloadJsonForKafka []byte, stream dcspb.Configurator_EndOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState,
callFailedStr string, payload map[string]interface{},
callFailedStr string, payload map[string]interface{}, runType string,
) (error, []byte) {
metric := newMetric("EOR")
metric := newMetric(runType, envId, "EOR")
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()

var dcsEvent *dcspb.RunEvent
@@ -1692,9 +1697,9 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *

func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string,
payloadJsonForKafka []byte, stream dcspb.Configurator_StartOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState,
callFailedStr string, payload map[string]interface{},
callFailedStr string, payload map[string]interface{}, runType string,
) (error, []byte) {
metric := newMetric("SOR")
metric := newMetric(runType, envId, "SOR")
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()

var dcsEvent *dcspb.RunEvent
@@ -1961,9 +1966,9 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *

func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *callable.Call, envId string,
payloadJsonForKafka []byte, stream dcspb.Configurator_StartOfRunClient, detectorStatusMap map[dcspb.Detector]dcspb.DetectorState,
callFailedStr string, payload map[string]interface{},
callFailedStr string, payload map[string]interface{}, runType string,
) (error, []byte) {
metric := newMetric("PFR")
metric := newMetric(runType, envId, "PFR")
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()

var err error
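`integration.NewContext(envId, varStack, timeout)` replaces the bare `context.WithTimeout` calls throughout the DCS (and, below, ddsched/ODC) plugins, but its definition is not part of this diff. A plausible sketch, assuming it simply combines the timeout with the new monitoring context keys and reuses the `ExtractRunTypeOrUndefined` sketched above; the actual helper added by this PR may differ:

```go
package integration

import (
	"context"
	"time"

	"github.com/AliceO2Group/Control/common/monitoring"
)

// NewContext builds a timeout context that also carries the environment ID
// and run type, so the gRPC client interceptors can tag their metrics.
func NewContext(envId string, varStack map[string]string, timeout time.Duration) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	ctx = monitoring.AddEnvAndRunType(ctx, envId, ExtractRunTypeOrUndefined(varStack))
	return ctx, cancel
}
```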
20 changes: 8 additions & 12 deletions core/integration/ddsched/plugin.go
@@ -40,6 +40,7 @@ import (

"github.com/AliceO2Group/Control/common/event/topic"
"github.com/AliceO2Group/Control/common/logger/infologger"
"github.com/AliceO2Group/Control/common/monitoring"
pb "github.com/AliceO2Group/Control/common/protos"
"github.com/AliceO2Group/Control/common/utils/uid"
"github.com/AliceO2Group/Control/core/environment"
@@ -163,6 +164,7 @@ func (p *Plugin) partitionStatesForEnvs(envIds []uid.ID) map[uid.ID]map[string]s
EnvironmentId: envId.String(),
}
ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("ddschedStatusTimeout"))
ctx = monitoring.AddEnvAndRunType(ctx, envId.String(), "none")
state, err := p.ddSchedClient.PartitionStatus(ctx, &in, grpc.EmptyCallOption{})
cancel()
if err != nil {
@@ -321,11 +323,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

var (
response *ddpb.PartitionResponse
)
var response *ddpb.PartitionResponse
timeout := callable.AcquireTimeout(DDSCHED_INITIALIZE_TIMEOUT, varStack, "Initialize", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := integration.NewContext(envId, varStack, timeout)
defer cancel()

payload := map[string]interface{}{
@@ -574,11 +574,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

var (
response *ddpb.PartitionResponse
)
var response *ddpb.PartitionResponse
timeout := callable.AcquireTimeout(DDSCHED_TERMINATE_TIMEOUT, varStack, "Terminate", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := integration.NewContext(envId, varStack, timeout)
defer cancel()

payload := map[string]interface{}{
@@ -821,16 +819,14 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

var (
response *ddpb.PartitionResponse
)
var response *ddpb.PartitionResponse

infoReq := ddpb.PartitionInfo{
EnvironmentId: envId,
PartitionId: envId,
}
timeout := callable.AcquireTimeout(DDSCHED_TERMINATE_TIMEOUT, varStack, "Terminate", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := integration.NewContext(envId, varStack, timeout)
defer cancel()

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
24 changes: 12 additions & 12 deletions core/integration/odc/plugin.go
@@ -183,7 +183,7 @@ func (p *Plugin) GetConnectionState() string {

func (p *Plugin) queryPartitionStatus() {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient"))
ctx, cancel := context.WithTimeout(context.Background(), ODC_STATUS_TIMEOUT)
ctx, cancel := integration.NewContextEmptyEnvIdRunType(ODC_STATUS_TIMEOUT)
defer cancel()

statusRep := &odc.StatusReply{}
@@ -238,7 +238,7 @@ func (p *Plugin) queryPartitionStatus() {
go func(idx int, partId uid.ID) {
defer wg.Done()

ctx, cancel := context.WithTimeout(context.Background(), ODC_STATUS_TIMEOUT)
ctx, cancel := integration.NewContextEmptyEnvIdRunType(ODC_STATUS_TIMEOUT)
defer cancel()

odcPartStateRep, err := p.odcClient.GetState(ctx, &odc.StateRequest{
@@ -1179,7 +1179,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

timeout := callable.AcquireTimeout(ODC_PARTITIONINITIALIZE_TIMEOUT, varStack, "PartitionInitialize", envId)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := integration.NewContext(envId, varStack, timeout)
defer cancel()

err = handleRun(ctx, p.odcClient, isManualXml, map[string]string{
@@ -1292,7 +1292,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
}
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := integration.NewContext(envId, varStack, timeout)
defer cancel()
err := handleConfigure(ctx, p.odcClient, arguments, paddingTimeout, envId, call)
if err != nil {
@@ -1314,7 +1314,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

callFailedStr := "EPN Reset call failed"

ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := integration.NewContext(envId, call.VarStack, timeout)
defer cancel()
err := handleReset(ctx, p.odcClient, nil, paddingTimeout, envId, call)
if err != nil {
@@ -1343,7 +1343,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

callFailedStr := "EPN PartitionTerminate call failed"

ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := integration.NewContext(envId, varStack, timeout)
defer cancel()
err := handlePartitionTerminate(ctx, p.odcClient, nil, paddingTimeout, envId, call)
if err != nil {
@@ -1414,7 +1414,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
arguments["original_run_number"] = originalRunNumber
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := integration.NewContext(envId, varStack, timeout)
defer cancel()
err = handleStart(ctx, p.odcClient, arguments, paddingTimeout, envId, runNumberu64, call)
if err != nil {
@@ -1462,7 +1462,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

timeout := callable.AcquireTimeout(ODC_STOP_TIMEOUT, varStack, "Stop", envId)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := integration.NewContext(envId, varStack, timeout)
defer cancel()
err = handleStop(ctx, p.odcClient, arguments, paddingTimeout, envId, runNumberu64, call)
if err != nil {
@@ -1486,7 +1486,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

timeout := callable.AcquireTimeout(ODC_STOP_TIMEOUT, varStack, "EnsureStop", envId)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := integration.NewContext(envId, varStack, timeout)
defer cancel()

state, err := handleGetState(ctx, p.odcClient, envId)
@@ -1551,7 +1551,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

callFailedStr := "EPN EnsureCleanup call failed"

ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := integration.NewContext(envId, varStack, timeout)
defer cancel()
err := handleCleanup(ctx, p.odcClient, nil, paddingTimeout, envId, call)
if err != nil {
@@ -1572,7 +1572,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

callFailedStr := "EPN PreDeploymentCleanup call failed"

ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := integration.NewContext(envId, varStack, timeout)
defer cancel()
err := handleCleanup(ctx, p.odcClient, nil, paddingTimeout, "", call)
if err != nil {
@@ -1593,7 +1593,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

callFailedStr := "EPN EnsureCleanupLegacy call failed"

ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := integration.NewContext(envId, varStack, timeout)
defer cancel()
err := handleCleanupLegacy(ctx, p.odcClient, nil, paddingTimeout, envId, call)
if err != nil {
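The status-polling paths in this file have no environment or var stack at hand, so they use `integration.NewContextEmptyEnvIdRunType`, also defined outside this diff. Presumably it is the degenerate form of the `NewContext` helper sketched earlier, continuing that sketch package; the exact placeholder values are an assumption (the ddsched hunk above uses "none" for the run type):

```go
// NewContextEmptyEnvIdRunType builds a timeout context with placeholder
// envId/run-type values, for calls that are not tied to any environment.
// Sketch only: the real placeholder strings may differ.
func NewContextEmptyEnvIdRunType(timeout time.Duration) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	ctx = monitoring.AddEnvAndRunType(ctx, "", "none")
	return ctx, cancel
}
```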