@@ -156,6 +156,7 @@ int defaultConditionQueryRateMultiplier()
156156
157157void WorkflowHelpers::injectServiceDevices (WorkflowSpec& workflow, ConfigContext& ctx)
158158{
159+ int rateLimitingIPCID = std::stoi (ctx.options ().get <std::string>(" timeframes-rate-limit-ipcid" ));
159160 DataProcessorSpec ccdbBackend{
160161 .name = " internal-dpl-ccdb-backend" ,
161162 .outputs = {},
@@ -230,23 +231,6 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
230231 ConfigParamSpec{" step-value-enumeration" , VariantType::Int64, 1ll , {" step between one value and the other" }}},
231232 .requiredServices = CommonServices::defaultServices (" O2FrameworkAnalysisSupport:RunSummary" )};
232233
233- // AOD reader can be rate limited
234- int rateLimitingIPCID = std::stoi (ctx.options ().get <std::string>(" timeframes-rate-limit-ipcid" ));
235- std::string rateLimitingChannelConfigInput;
236- std::string rateLimitingChannelConfigOutput;
237- bool internalRateLimiting = false ;
238-
239- // In case we have rate-limiting requested, any device without an input will get one on the special
240- // "DPL/RATE" message.
241- if (rateLimitingIPCID >= 0 ) {
242- rateLimitingChannelConfigInput = fmt::format (" name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0" ,
243- ChannelSpecHelpers::defaultIPCFolder (), rateLimitingIPCID);
244- rateLimitingChannelConfigOutput = fmt::format (" name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0" ,
245- ChannelSpecHelpers::defaultIPCFolder (), rateLimitingIPCID);
246- internalRateLimiting = true ;
247- aodReader.options .emplace_back (ConfigParamSpec{" channel-config" , VariantType::String, rateLimitingChannelConfigInput, {" how many timeframes can be in flight at the same time" }});
248- }
249-
250234 ctx.services ().registerService (ServiceRegistryHelpers::handleForService<DanglingEdgesContext>(new DanglingEdgesContext));
251235 auto & dec = ctx.services ().get <DanglingEdgesContext>();
252236
@@ -274,7 +258,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
274258 // A timeframeSink consumes timeframes without creating new
275259 // timeframe data.
276260 bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs;
277- if (std::stoi (ctx. options (). get <std::string>( " timeframes-rate-limit-ipcid " )) != -1 ) {
261+ if (rateLimitingIPCID != -1 ) {
278262 if (timeframeSink && processor.name .find (" internal-dpl-injected-dummy-sink" ) == std::string::npos) {
279263 O2_SIGNPOST_ID_GENERATE (sid, workflow_helpers);
280264 uint32_t hash = runtime_hash (processor.name .c_str ());
@@ -631,6 +615,10 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
631615 extraSpecs.push_back (CommonDataProcessors::getScheduledDummySink (ignored));
632616 } else {
633617 O2_SIGNPOST_EVENT_EMIT (workflow_helpers, sid, " injectServiceDevices" , " Injecting rate limited dummy sink" );
618+ std::string rateLimitingChannelConfigOutput;
619+ if (rateLimitingIPCID != -1 ) {
620+ rateLimitingChannelConfigOutput = fmt::format (" name=metric-feedback,type=push,method=bind,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0" , ChannelSpecHelpers::defaultIPCFolder (), rateLimitingIPCID);
621+ }
634622 extraSpecs.push_back (CommonDataProcessors::getDummySink (ignored, rateLimitingChannelConfigOutput));
635623 }
636624 }
0 commit comments