Skip to content

Commit 3430a73

Browse files
author
Michal Tichák
committed
others
1 parent cdcd86a commit 3430a73

File tree

7 files changed

+80
-57
lines changed

7 files changed

+80
-57
lines changed

core/integration/ccdb/plugin.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
481481
return
482482
}
483483
p.existingRuns[grp.runNumber] = types.Nil{}
484-
err := p.uploadCurrentGRP(grp, envId, true)
484+
err := p.uploadCurrentGRP(grp, envId, true, varStack, "RunStart")
485485
if err != nil {
486486
log.WithField("call", "RunStop").
487487
WithField("run", grp.runNumber).
@@ -506,7 +506,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
506506
_, runExists := p.existingRuns[grp.runNumber]
507507
if runExists {
508508
delete(p.existingRuns, grp.runNumber)
509-
err := p.uploadCurrentGRP(grp, envId, false)
509+
err := p.uploadCurrentGRP(grp, envId, false, varStack, "RunStop")
510510
if err != nil {
511511
log.WithField("call", "RunStop").
512512
WithField("run", grp.runNumber).
@@ -525,7 +525,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
525525
return
526526
}
527527

528-
func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refresh bool) error {
528+
func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refresh bool, varStack map[string]string, callName string) error {
529529
if grp == nil {
530530
return errors.New(fmt.Sprintf("Failed to create a GRP object"))
531531
}
@@ -550,6 +550,8 @@ func (p *Plugin) uploadCurrentGRP(grp *GeneralRunParameters, envId string, refre
550550

551551
metric := monitoring.NewMetric("ccdb")
552552
metric.AddTag("envId", envId)
553+
metric.AddTag("runtype", integration.ExtractRunTypeOrUndefined(varStack))
554+
metric.AddTag("call", callName)
553555
defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)()
554556

555557
cmd := exec.CommandContext(ctx, "bash", "-c", cmdStr)

core/integration/dcs/plugin.go

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -699,7 +699,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
699699

700700
var stream dcspb.Configurator_StartOfRunClient
701701
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "PFR", envId)
702-
ctx, cancel := context.WithTimeout(monitoring.AddEnvAndRunType(context.Background(), envId, extractRunType(varStack)), timeout)
702+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
703703
defer cancel()
704704

705705
detectorStatusMap := make(map[dcspb.Detector]dcspb.DetectorState)
@@ -746,7 +746,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
746746
return
747747
}
748748

749-
err, payloadJson = PFRgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload, extractRunType(varStack))
749+
err, payloadJson = PFRgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream,
750+
detectorStatusMap, callFailedStr, payload, integration.ExtractRunTypeOrUndefined(varStack))
750751

751752
dcsFailedEcsDetectors := make([]string, 0)
752753
dcsopOk := true
@@ -1064,7 +1065,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
10641065

10651066
var stream dcspb.Configurator_StartOfRunClient
10661067
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "SOR", envId)
1067-
ctx, cancel := context.WithTimeout(monitoring.AddEnvAndRunType(context.Background(), envId, extractRunType(varStack)), timeout)
1068+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
10681069
defer cancel()
10691070

10701071
detectorStatusMap := make(map[dcspb.Detector]dcspb.DetectorState)
@@ -1112,7 +1113,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
11121113
}
11131114
p.pendingEORs[envId] = runNumber64 // make sure the corresponding EOR runs sooner or later
11141115

1115-
err, payloadJson = SORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload, extractRunType(varStack))
1116+
err, payloadJson = SORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream,
1117+
detectorStatusMap, callFailedStr, payload, integration.ExtractRunTypeOrUndefined(varStack))
11161118

11171119
dcsFailedEcsDetectors := make([]string, 0)
11181120
dcsopOk := true
@@ -1298,7 +1300,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
12981300

12991301
var stream dcspb.Configurator_EndOfRunClient
13001302
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "EOR", envId)
1301-
ctx, cancel := context.WithTimeout(monitoring.AddEnvAndRunType(context.Background(), envId, extractRunType(varStack)), timeout)
1303+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
13021304
defer cancel()
13031305

13041306
payload := map[string]interface{}{
@@ -1356,7 +1358,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
13561358
detectorStatusMap[v] = dcspb.DetectorState_NULL_STATE
13571359
}
13581360

1359-
err, payloadJson = EORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream, detectorStatusMap, callFailedStr, payload, extractRunType(varStack))
1361+
err, payloadJson = EORgRPCCommunicationLoop(ctx, timeout, call, envId, payloadJson, stream,
1362+
detectorStatusMap, callFailedStr, payload, integration.ExtractRunTypeOrUndefined(varStack))
13601363

13611364
dcsFailedEcsDetectors := make([]string, 0)
13621365
dcsopOk := true
@@ -1452,14 +1455,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
14521455
return
14531456
}
14541457

1455-
func extractRunType(varStack map[string]string) string {
1456-
runType, ok := varStack["run_type"]
1457-
if !ok {
1458-
runType = "undefined"
1459-
}
1460-
return runType
1461-
}
1462-
14631458
func newMetric(runType, envId, method string) monitoring.Metric {
14641459
metric := monitoring.NewMetric("dcsecs")
14651460
metric.AddTag("method", method)

core/integration/ddsched/plugin.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040

4141
"github.com/AliceO2Group/Control/common/event/topic"
4242
"github.com/AliceO2Group/Control/common/logger/infologger"
43+
"github.com/AliceO2Group/Control/common/monitoring"
4344
pb "github.com/AliceO2Group/Control/common/protos"
4445
"github.com/AliceO2Group/Control/common/utils/uid"
4546
"github.com/AliceO2Group/Control/core/environment"
@@ -163,6 +164,7 @@ func (p *Plugin) partitionStatesForEnvs(envIds []uid.ID) map[uid.ID]map[string]s
163164
EnvironmentId: envId.String(),
164165
}
165166
ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("ddschedStatusTimeout"))
167+
ctx = monitoring.AddEnvAndRunType(ctx, envId.String(), "none")
166168
state, err := p.ddSchedClient.PartitionStatus(ctx, &in, grpc.EmptyCallOption{})
167169
cancel()
168170
if err != nil {
@@ -321,11 +323,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
321323
return
322324
}
323325

324-
var (
325-
response *ddpb.PartitionResponse
326-
)
326+
var response *ddpb.PartitionResponse
327327
timeout := callable.AcquireTimeout(DDSCHED_INITIALIZE_TIMEOUT, varStack, "Initialize", envId)
328-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
328+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
329329
defer cancel()
330330

331331
payload := map[string]interface{}{
@@ -574,11 +574,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
574574
return
575575
}
576576

577-
var (
578-
response *ddpb.PartitionResponse
579-
)
577+
var response *ddpb.PartitionResponse
580578
timeout := callable.AcquireTimeout(DDSCHED_TERMINATE_TIMEOUT, varStack, "Terminate", envId)
581-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
579+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
582580
defer cancel()
583581

584582
payload := map[string]interface{}{
@@ -821,16 +819,14 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
821819
return
822820
}
823821

824-
var (
825-
response *ddpb.PartitionResponse
826-
)
822+
var response *ddpb.PartitionResponse
827823

828824
infoReq := ddpb.PartitionInfo{
829825
EnvironmentId: envId,
830826
PartitionId: envId,
831827
}
832828
timeout := callable.AcquireTimeout(DDSCHED_TERMINATE_TIMEOUT, varStack, "Terminate", envId)
833-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
829+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
834830
defer cancel()
835831

836832
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{

core/integration/odc/plugin.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func (p *Plugin) GetConnectionState() string {
183183

184184
func (p *Plugin) queryPartitionStatus() {
185185
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient"))
186-
ctx, cancel := context.WithTimeout(context.Background(), ODC_STATUS_TIMEOUT)
186+
ctx, cancel := integration.NewContextEmptyEnvIdRunType(ODC_STATUS_TIMEOUT)
187187
defer cancel()
188188

189189
statusRep := &odc.StatusReply{}
@@ -238,7 +238,7 @@ func (p *Plugin) queryPartitionStatus() {
238238
go func(idx int, partId uid.ID) {
239239
defer wg.Done()
240240

241-
ctx, cancel := context.WithTimeout(context.Background(), ODC_STATUS_TIMEOUT)
241+
ctx, cancel := integration.NewContextEmptyEnvIdRunType(ODC_STATUS_TIMEOUT)
242242
defer cancel()
243243

244244
odcPartStateRep, err := p.odcClient.GetState(ctx, &odc.StateRequest{
@@ -1179,7 +1179,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
11791179

11801180
timeout := callable.AcquireTimeout(ODC_PARTITIONINITIALIZE_TIMEOUT, varStack, "PartitionInitialize", envId)
11811181

1182-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1182+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
11831183
defer cancel()
11841184

11851185
err = handleRun(ctx, p.odcClient, isManualXml, map[string]string{
@@ -1292,7 +1292,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
12921292
}
12931293
}
12941294

1295-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1295+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
12961296
defer cancel()
12971297
err := handleConfigure(ctx, p.odcClient, arguments, paddingTimeout, envId, call)
12981298
if err != nil {
@@ -1314,7 +1314,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
13141314

13151315
callFailedStr := "EPN Reset call failed"
13161316

1317-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1317+
ctx, cancel := integration.NewContext(envId, call.VarStack, timeout)
13181318
defer cancel()
13191319
err := handleReset(ctx, p.odcClient, nil, paddingTimeout, envId, call)
13201320
if err != nil {
@@ -1343,7 +1343,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
13431343

13441344
callFailedStr := "EPN PartitionTerminate call failed"
13451345

1346-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1346+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
13471347
defer cancel()
13481348
err := handlePartitionTerminate(ctx, p.odcClient, nil, paddingTimeout, envId, call)
13491349
if err != nil {
@@ -1414,7 +1414,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
14141414
arguments["original_run_number"] = originalRunNumber
14151415
}
14161416

1417-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1417+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
14181418
defer cancel()
14191419
err = handleStart(ctx, p.odcClient, arguments, paddingTimeout, envId, runNumberu64, call)
14201420
if err != nil {
@@ -1462,7 +1462,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
14621462

14631463
timeout := callable.AcquireTimeout(ODC_STOP_TIMEOUT, varStack, "Stop", envId)
14641464

1465-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1465+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
14661466
defer cancel()
14671467
err = handleStop(ctx, p.odcClient, arguments, paddingTimeout, envId, runNumberu64, call)
14681468
if err != nil {
@@ -1486,7 +1486,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
14861486

14871487
timeout := callable.AcquireTimeout(ODC_STOP_TIMEOUT, varStack, "EnsureStop", envId)
14881488

1489-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1489+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
14901490
defer cancel()
14911491

14921492
state, err := handleGetState(ctx, p.odcClient, envId)
@@ -1551,7 +1551,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
15511551

15521552
callFailedStr := "EPN EnsureCleanup call failed"
15531553

1554-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1554+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
15551555
defer cancel()
15561556
err := handleCleanup(ctx, p.odcClient, nil, paddingTimeout, envId, call)
15571557
if err != nil {
@@ -1572,7 +1572,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
15721572

15731573
callFailedStr := "EPN PreDeploymentCleanup call failed"
15741574

1575-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1575+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
15761576
defer cancel()
15771577
err := handleCleanup(ctx, p.odcClient, nil, paddingTimeout, "", call)
15781578
if err != nil {
@@ -1593,7 +1593,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
15931593

15941594
callFailedStr := "EPN EnsureCleanupLegacy call failed"
15951595

1596-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1596+
ctx, cancel := integration.NewContext(envId, varStack, timeout)
15971597
defer cancel()
15981598
err := handleCleanupLegacy(ctx, p.odcClient, nil, paddingTimeout, envId, call)
15991599
if err != nil {

core/integration/plugin.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@
2727
package integration
2828

2929
import (
30+
"context"
3031
"sync"
32+
"time"
3133

3234
"github.com/AliceO2Group/Control/common/logger"
35+
"github.com/AliceO2Group/Control/common/monitoring"
3336
"github.com/AliceO2Group/Control/common/utils/uid"
3437
"github.com/sirupsen/logrus"
3538
"github.com/spf13/viper"
@@ -121,7 +124,7 @@ func (p Plugins) CallStack(data interface{}) (stack map[string]interface{}) {
121124
func (p Plugins) ObjectStack(varStack map[string]string, baseConfigStack map[string]string) (stack map[string]interface{}) {
122125
stack = make(map[string]interface{})
123126

124-
//HACK: this is a dummy object+function to allow odc.GenerateEPNTopologyFullname in the root role
127+
// HACK: this is a dummy object+function to allow odc.GenerateEPNTopologyFullname in the root role
125128
stack["odc"] = map[string]interface{}{
126129
"GenerateEPNTopologyFullname": func() string {
127130
return ""
@@ -221,3 +224,26 @@ func Reset() {
221224
loaderOnce = sync.Once{}
222225
pluginLoaders = make(map[string]func() Plugin)
223226
}
227+
228+
func ExtractRunTypeOrUndefined(varStack map[string]string) string {
229+
runType, ok := varStack["run_type"]
230+
if !ok {
231+
runType = "undefined"
232+
}
233+
return runType
234+
}
235+
236+
func NewContext(envId string, varStack map[string]string, timeout time.Duration) (context.Context, context.CancelFunc) {
237+
return context.WithTimeout(
238+
monitoring.AddEnvAndRunType(context.Background(),
239+
envId,
240+
ExtractRunTypeOrUndefined(varStack),
241+
),
242+
timeout)
243+
}
244+
245+
func NewContextEmptyEnvIdRunType(timeout time.Duration) (context.Context, context.CancelFunc) {
246+
return context.WithTimeout(
247+
monitoring.AddEnvAndRunType(context.Background(), "none", "none"),
248+
timeout)
249+
}

0 commit comments

Comments
 (0)