Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 76 additions & 2 deletions core/integration/trg/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return runUnloadFunc(ctx, runNumber64)
}
stack["Cleanup"] = func() (out string) {
// obsolete, EnsureRunStop and EnsureRunUnload should be used instead one after another.
envId, ok := varStack["environment_id"]
if !ok {
log.WithField("partition", envId).
Expand All @@ -1303,7 +1304,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithField("run", runNumberStop).
WithField("partition", envId).
WithField("level", infologger.IL_Devel).
Debug("pending TRG Stop found, performing cleanup")
Info("pending TRG Stop found, performing cleanup")

delete(p.pendingRunStops, envId)
_ = runStopFunc(ctx, runNumberStop)
Expand All @@ -1330,7 +1331,80 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithField("run", runNumberUnload).
WithField("partition", envId).
WithField("level", infologger.IL_Devel).
Debug("pending TRG Unload found, performing cleanup")
Info("pending TRG Unload found, performing cleanup")

delete(p.pendingRunUnloads, envId)
_ = runUnloadFunc(ctx, runNumberUnload)
} else {
log.WithField("partition", envId).
WithField("level", infologger.IL_Devel).
Debug("TRG cleanup: Unload not needed")
}
return
}
stack["EnsureRunStop"] = func() (out string) {
// if there is a run to stop, it is stopped, otherwise we do nothing (no errors, no events)
envId, ok := varStack["environment_id"]
if !ok {
log.WithField("partition", envId).
WithField("level", infologger.IL_Devel).
Warn("no environment_id found for TRG EnsureRunStop")
return
}

timeout := callable.AcquireTimeout(TRG_STOP_TIMEOUT, varStack, "EnsureRunStop", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// runStop if found pending
runNumberStop, ok := p.pendingRunStops[envId]
if ok {
log.WithField("run", runNumberStop).
WithField("partition", envId).
WithField("level", infologger.IL_Devel).
Info("pending TRG Stop found, performing cleanup")

delete(p.pendingRunStops, envId)
_ = runStopFunc(ctx, runNumberStop)

trgEndTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
parentRole, ok := call.GetParentRole().(callable.ParentRole)
if ok {
parentRole.SetGlobalRuntimeVar("trg_end_time_ms", trgEndTime)
} else {
log.WithField("partition", envId).
WithField("run", runNumberStop).
WithField("trgEndTime", trgEndTime).
Debug("could not get parentRole and set TRG end time")
}
} else {
log.WithField("partition", envId).
WithField("level", infologger.IL_Devel).
Debug("TRG cleanup: Stop not needed")
}
return
}
stack["EnsureRunUnload"] = func() (out string) {
// if there is a run to unload, it is unloaded, otherwise we do nothing (no errors, no events)
envId, ok := varStack["environment_id"]
if !ok {
log.WithField("partition", envId).
WithField("level", infologger.IL_Devel).
Warn("no environment_id found for TRG EnsureRunUnload")
return
}

timeout := callable.AcquireTimeout(TRG_STOP_TIMEOUT, varStack, "EnsureRunUnload", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// runUnload if found pending
runNumberUnload, ok := p.pendingRunUnloads[envId]
if ok {
log.WithField("run", runNumberUnload).
WithField("partition", envId).
WithField("level", infologger.IL_Devel).
Info("pending TRG Unload found, performing cleanup")

delete(p.pendingRunUnloads, envId)
_ = runUnloadFunc(ctx, runNumberUnload)
Expand Down