Skip to content

Commit 1cfa87d

Browse files
committed
[core] split TRG cleanup into two calls for distinguishable kafka events
This commit splits trg.Cleanup() into two calls which should be used subsequently instead of trg.Cleanup. Thanks to this, the GUIs can listen to kafka events for a specific operationName ("trg.EnsureRunStop") for trigger stop timestamp. We preferred to avoid having the GUIs hunt for operationStep "perform TRG call: RunStop", which seems to be a message targeted for humans and could be more prone to change, thus breaking behaviour in GUIs. Fixes OCTRL-1017.
1 parent 8b14cc3 commit 1cfa87d

File tree

1 file changed

+76
-2
lines changed

1 file changed

+76
-2
lines changed

core/integration/trg/plugin.go

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1285,6 +1285,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
12851285
return runUnloadFunc(ctx, runNumber64)
12861286
}
12871287
stack["Cleanup"] = func() (out string) {
1288+
// obsolete, EnsureRunStop and EnsureRunUnload should be used instead one after another.
12881289
envId, ok := varStack["environment_id"]
12891290
if !ok {
12901291
log.WithField("partition", envId).
@@ -1303,7 +1304,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
13031304
log.WithField("run", runNumberStop).
13041305
WithField("partition", envId).
13051306
WithField("level", infologger.IL_Devel).
1306-
Debug("pending TRG Stop found, performing cleanup")
1307+
Info("pending TRG Stop found, performing cleanup")
13071308

13081309
delete(p.pendingRunStops, envId)
13091310
_ = runStopFunc(ctx, runNumberStop)
@@ -1330,7 +1331,80 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
13301331
log.WithField("run", runNumberUnload).
13311332
WithField("partition", envId).
13321333
WithField("level", infologger.IL_Devel).
1333-
Debug("pending TRG Unload found, performing cleanup")
1334+
Info("pending TRG Unload found, performing cleanup")
1335+
1336+
delete(p.pendingRunUnloads, envId)
1337+
_ = runUnloadFunc(ctx, runNumberUnload)
1338+
} else {
1339+
log.WithField("partition", envId).
1340+
WithField("level", infologger.IL_Devel).
1341+
Debug("TRG cleanup: Unload not needed")
1342+
}
1343+
return
1344+
}
1345+
stack["EnsureRunStop"] = func() (out string) {
1346+
// if there is a run to stop, it is stopped, otherwise we do nothing (no errors, no events)
1347+
envId, ok := varStack["environment_id"]
1348+
if !ok {
1349+
log.WithField("partition", envId).
1350+
WithField("level", infologger.IL_Devel).
1351+
Warn("no environment_id found for TRG EnsureRunStop")
1352+
return
1353+
}
1354+
1355+
timeout := callable.AcquireTimeout(TRG_STOP_TIMEOUT, varStack, "EnsureRunStop", envId)
1356+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1357+
defer cancel()
1358+
1359+
// runStop if found pending
1360+
runNumberStop, ok := p.pendingRunStops[envId]
1361+
if ok {
1362+
log.WithField("run", runNumberStop).
1363+
WithField("partition", envId).
1364+
WithField("level", infologger.IL_Devel).
1365+
Info("pending TRG Stop found, performing cleanup")
1366+
1367+
delete(p.pendingRunStops, envId)
1368+
_ = runStopFunc(ctx, runNumberStop)
1369+
1370+
trgEndTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
1371+
parentRole, ok := call.GetParentRole().(callable.ParentRole)
1372+
if ok {
1373+
parentRole.SetGlobalRuntimeVar("trg_end_time_ms", trgEndTime)
1374+
} else {
1375+
log.WithField("partition", envId).
1376+
WithField("run", runNumberStop).
1377+
WithField("trgEndTime", trgEndTime).
1378+
Debug("could not get parentRole and set TRG end time")
1379+
}
1380+
} else {
1381+
log.WithField("partition", envId).
1382+
WithField("level", infologger.IL_Devel).
1383+
Debug("TRG cleanup: Stop not needed")
1384+
}
1385+
return
1386+
}
1387+
stack["EnsureRunUnload"] = func() (out string) {
1388+
// if there is a run to unload, it is unloaded, otherwise we do nothing (no errors, no events)
1389+
envId, ok := varStack["environment_id"]
1390+
if !ok {
1391+
log.WithField("partition", envId).
1392+
WithField("level", infologger.IL_Devel).
1393+
Warn("no environment_id found for TRG EnsureRunUnload")
1394+
return
1395+
}
1396+
1397+
timeout := callable.AcquireTimeout(TRG_STOP_TIMEOUT, varStack, "EnsureRunUnload", envId)
1398+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1399+
defer cancel()
1400+
1401+
// runUnload if found pending
1402+
runNumberUnload, ok := p.pendingRunUnloads[envId]
1403+
if ok {
1404+
log.WithField("run", runNumberUnload).
1405+
WithField("partition", envId).
1406+
WithField("level", infologger.IL_Devel).
1407+
Info("pending TRG Unload found, performing cleanup")
13341408

13351409
delete(p.pendingRunUnloads, envId)
13361410
_ = runUnloadFunc(ctx, runNumberUnload)

0 commit comments

Comments
 (0)