Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 41 additions & 95 deletions core/integration/trg/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,17 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
}
// global runs only
stack["RunLoad"] = func() (out string) { // must formally return string even when we return nothing
log.WithField("partition", envId).
WithField("level", infologger.IL_Ops).
rn := varStack["run_number"]
var runNumber64 int64
runNumber64, err := strconv.ParseInt(rn, 10, 32)
if err != nil {
log.WithField("partition", envId).
WithError(err).
Error("cannot acquire run number for Run Load")
}
log := log.WithField("run", runNumber64).WithField("partition", envId)

log.WithField("level", infologger.IL_Ops).
Info("ALIECS SOR operation : performing TRG Run Load Request")

parentRole, ok := call.GetParentRole().(callable.ParentRole)
Expand All @@ -570,30 +579,17 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

globalConfig, ok := varStack["trg_global_config"]
log.WithField("globalConfig", globalConfig).
WithField("partition", envId).
Debug("not a TRG Global Run, continuing with TRG Run Start")
if !ok {
log.WithField("partition", envId).
Debug("no TRG Global config set")
log.Debug("no TRG Global config set")
globalConfig = ""
}
// TODO (malexis): pass consul key to TRG if avail

rn := varStack["run_number"]
var runNumber64 int64
runNumber64, err := strconv.ParseInt(rn, 10, 32)
if err != nil {
log.WithField("partition", envId).
WithError(err).
Error("cannot acquire run number for Run Load")
}

trgDetectorsParam, ok := varStack["trg_detectors"]
if !ok {
// "" -all required must be ready
log.WithField("partition", envId).
WithField("run", runNumber64).
Debug("empty TRG detectors list provided")
log.Debug("empty TRG detectors list provided")
trgDetectorsParam = ""
}

Expand All @@ -603,7 +599,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
if err != nil {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("partition", envId).
WithField("call", "RunLoad").
Error("TRG error")
call.VarStack["__call_error_reason"] = err.Error()
Expand All @@ -615,9 +610,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
// standalone run
if len(strings.Split(detectors, " ")) < 2 && varStack["trg_global_run_enabled"] == "false" {
// we do not load any run cause it is standalone
log.WithField("partition", envId).
WithField("run", runNumber64).
Debug("not a TRG Global Run, continuing with TRG Run Start")
log.Debug("not a TRG Global Run, continuing with TRG Run Start")

return
}
Expand All @@ -629,7 +622,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
if err != nil {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("partition", envId).
WithField("call", "RunLoad").
Warn("could not parse ctp_readout_enabled value")
}
Expand All @@ -647,8 +639,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunLoad").
Error("TRG error")

Expand All @@ -664,8 +654,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunLoad").
Error("TRG error")

Expand Down Expand Up @@ -700,8 +688,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunLoad").
Error("TRG error")

Expand Down Expand Up @@ -729,8 +715,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunLoad").
Error("TRG error")

Expand Down Expand Up @@ -775,16 +759,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}
stack["RunStart"] = func() (out string) { // must formally return string even when we return nothing
log.WithField("partition", envId).
Info("ALIECS SOR operation : performing TRG Run Start")

runtimeConfig, ok := varStack["trg_runtime_config"]
if !ok {
log.WithField("partition", envId).
Debug("no TRG config set, using default configuration")
runtimeConfig = ""
}

rn := varStack["run_number"]
var runNumber64 int64
runNumber64, err := strconv.ParseInt(rn, 10, 32)
Expand All @@ -793,13 +767,20 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
WithField("partition", envId).
Error("cannot acquire run number for Run Start")
}
log := log.WithField("run", runNumber64).WithField("partition", envId)

log.Info("ALIECS SOR operation : performing TRG Run Start")

runtimeConfig, ok := varStack["trg_runtime_config"]
if !ok {
log.Debug("no TRG config set, using default configuration")
runtimeConfig = ""
}

trgDetectorsParam, ok := varStack["trg_detectors"]
if !ok {
// "" it is a global run
log.WithField("partition", envId).
WithField("run", runNumber64).
Debug("Detector for host is not available, starting global run")
log.Debug("Detector for host is not available, starting global run")
trgDetectorsParam = ""
}

Expand All @@ -810,8 +791,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunStart").
Error("TRG error")

Expand Down Expand Up @@ -839,8 +818,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunStart").
Error("TRG error")

Expand All @@ -855,8 +832,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunStart").
Error("TRG error")

Expand Down Expand Up @@ -891,8 +866,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunStart").
Error("TRG error")

Expand All @@ -919,8 +892,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunStart").
Error("TRG error")

Expand Down Expand Up @@ -960,9 +931,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

// runStart successful, we cache the run number for eventual cleanup
p.pendingRunStops[envId] = runNumber64
log.WithField("partition", envId).
WithField("run", runNumber64).
Info("TRG RunStart success")
log.Info("TRG RunStart success")

trgStartTimeS := strconv.FormatInt(trgStartTime.UnixMilli(), 10)
parentRole, ok := call.GetParentRole().(callable.ParentRole)
Expand All @@ -974,12 +943,12 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}
runStopFunc := func(ctx context.Context, runNumber64 int64) (out string) {
log := log.WithField("run", runNumber64).WithField("partition", envId)

trgDetectorsParam, ok := varStack["trg_detectors"]
if !ok {
// "" it is a global run
log.WithField("partition", envId).
WithField("run", runNumber64).
Debug("Detector for host is not available, stoping global run")
log.Debug("Detector for host is not available, stoping global run")
trgDetectorsParam = ""
}

Expand Down Expand Up @@ -1007,8 +976,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunStop").
Error("TRG error")

Expand All @@ -1023,8 +990,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunStop").
Error("TRG error")

Expand Down Expand Up @@ -1055,8 +1020,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunStop").
Error("TRG error")

Expand All @@ -1083,8 +1046,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunStop").
Error("TRG error")

Expand Down Expand Up @@ -1124,24 +1085,21 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

// RunStop successful, we pop the run number from the cache
delete(p.pendingRunStops, envId)
log.WithField("partition", envId).
WithField("run", runNumber64).
Info("TRG RunStop success")
log.Info("TRG RunStop success")

trgEndTimeS := strconv.FormatInt(trgEndTime.UnixMilli(), 10)
parentRole, ok := call.GetParentRole().(callable.ParentRole)
if ok {
parentRole.SetGlobalRuntimeVar("trg_end_time_ms", trgEndTimeS)
} else {
log.WithField("partition", envId).
WithField("run", runNumber64).
WithField("trgEndTime", trgEndTimeS).
log.WithField("trgEndTime", trgEndTimeS).
Debug("could not get parentRole and set TRG end time")
}

return
}
runUnloadFunc := func(ctx context.Context, runNumber64 int64) (out string) {
log := log.WithField("run", runNumber64).WithField("partition", envId)

trgDetectorsParam, ok := varStack["trg_detectors"]
if !ok {
Expand All @@ -1155,9 +1113,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

// if global run then unload
if len(strings.Split(detectors, " ")) < 2 && varStack["trg_global_run_enabled"] == "false" {
log.WithField("partition", envId).
WithField("run", runNumber64).
Debug("not a TRG Global Run, skipping TRG Run Unload")
log.Debug("not a TRG Global Run, skipping TRG Run Unload")
return
}

Expand All @@ -1175,8 +1131,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunUnload").
Error("TRG error")

Expand All @@ -1191,8 +1145,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunUnload").
Error("TRG error")

Expand Down Expand Up @@ -1223,8 +1175,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunUnload").
Error("TRG error")

Expand All @@ -1251,8 +1201,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("run", runNumber64).
WithField("partition", envId).
WithField("call", "RunUnload").
Error("TRG error")

Expand Down Expand Up @@ -1290,17 +1238,11 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

// RunUnload successful, we pop the run number from the cache
delete(p.pendingRunUnloads, envId)
log.WithField("partition", envId).
WithField("run", runNumber64).
Info("ALICECS EOR operation : TRG RunUnload success")
log.Info("ALICECS EOR operation : TRG RunUnload success")

return
}
stack["RunStop"] = func() (out string) {
log.WithField("partition", envId).
//WithField("run", runNumber64).
Info("ALIECS EOR operation : performing TRG Run Stop ")

rn := varStack["run_number"]
var runNumber64 int64
var err error
Expand All @@ -1311,17 +1253,17 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
Error("cannot acquire run number for TRG Run Stop")
}

log.WithField("partition", envId).
WithField("run", runNumber64).
Info("ALIECS EOR operation : performing TRG Run Stop ")

timeout := callable.AcquireTimeout(TRG_STOP_TIMEOUT, varStack, "RunStop", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

return runStopFunc(ctx, runNumber64)
}
stack["RunUnload"] = func() (out string) {
log.WithField("partition", envId).
//WithField("run", runNumber64).
Info("ALIECS EOR operation : performing TRG Run Unload ")

rn := varStack["run_number"]
var runNumber64 int64
var err error
Expand All @@ -1332,6 +1274,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
Error("cannot acquire run number for TRG Run Unload")
}

log.WithField("partition", envId).
WithField("run", runNumber64).
Info("ALIECS EOR operation : performing TRG Run Unload ")

timeout := callable.AcquireTimeout(TRG_UNLOAD_TIMEOUT, varStack, "RunUnload", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
Expand Down