@@ -230,7 +230,8 @@ std::unique_ptr<fair::mq::Message> FairMQDeviceProxy::createForwardMessage(Route
230230
231231void FairMQDeviceProxy::bind (std::vector<OutputRoute> const & outputs, std::vector<InputRoute> const & inputs,
232232 std::vector<ForwardRoute> const & forwards,
233- fair::mq::Device& device)
233+ std::function<fair::mq::Channel&(std::string const &)> bindChannelByName,
234+ std::function<bool(void )> newStatePending)
234235{
235236 mOutputs .clear ();
236237 mOutputRoutes .clear ();
@@ -258,14 +259,11 @@ void FairMQDeviceProxy::bind(std::vector<OutputRoute> const& outputs, std::vecto
258259 if (channelPos == channelNameToChannel.end ()) {
259260 channelIndex = ChannelIndex{(int )mOutputChannelInfos .size ()};
260261 ChannelAccountingType dplChannel = (route.channel .rfind (" from_" , 0 ) == 0 ) ? ChannelAccountingType::DPL : ChannelAccountingType::RAWFMQ;
261- auto channel = device.GetChannels ().find (route.channel );
262- if (channel == device.GetChannels ().end ()) {
263- LOGP (fatal, " Expected channel {} not configured." , route.channel );
264- }
262+ auto & channel = bindChannelByName (route.channel );
265263 OutputChannelInfo info{
266264 .name = route.channel ,
267265 .channelType = dplChannel,
268- .channel = channel-> second . at ( 0 ) ,
266+ .channel = channel,
269267 .policy = route.policy ,
270268 .index = channelIndex,
271269 };
@@ -305,11 +303,9 @@ void FairMQDeviceProxy::bind(std::vector<OutputRoute> const& outputs, std::vecto
305303
306304 if (channelPos == channelNameToChannel.end ()) {
307305 channelIndex = ChannelIndex{(int )mInputChannels .size ()};
308- auto channel = device.GetChannels ().find (route.sourceChannel );
309- if (channel == device.GetChannels ().end ()) {
310- LOGP (fatal, " Expected channel {} not configured." , route.sourceChannel );
311- }
312- mInputChannels .push_back (&channel->second .at (0 ));
306+ fair::mq::Channel& channel = bindChannelByName (route.sourceChannel );
307+
308+ mInputChannels .push_back (&channel);
313309 mInputChannelNames .push_back (route.sourceChannel );
314310 channelNameToChannel[route.sourceChannel ] = channelIndex;
315311 LOGP (detail, " Binding channel {} to channel index {}" , route.sourceChannel , channelIndex.value );
@@ -341,12 +337,10 @@ void FairMQDeviceProxy::bind(std::vector<OutputRoute> const& outputs, std::vecto
341337
342338 if (channelPos == channelNameToChannel.end ()) {
343339 channelIndex = ChannelIndex{(int )mForwardChannelInfos .size ()};
344- auto channel = device.GetChannels ().find (route.channel );
345- if (channel == device.GetChannels ().end ()) {
346- LOGP (fatal, " Expected channel {} not configured." , route.channel );
347- }
340+ auto & channel = bindChannelByName (route.channel );
341+
348342 ChannelAccountingType dplChannel = (route.channel .rfind (" from_" , 0 ) == 0 ) ? ChannelAccountingType::DPL : ChannelAccountingType::RAWFMQ;
349- mForwardChannelInfos .push_back (ForwardChannelInfo{.name = route.channel , .channelType = dplChannel, .channel = channel-> second . at ( 0 ) , .policy = route.policy , .index = channelIndex});
343+ mForwardChannelInfos .push_back (ForwardChannelInfo{.name = route.channel , .channelType = dplChannel, .channel = channel, .policy = route.policy , .index = channelIndex});
350344 mForwardChannelStates .push_back (ForwardChannelState{0 });
351345 channelNameToChannel[route.channel ] = channelIndex;
352346 LOGP (detail, " Binding forward channel {} to channel index {}" , route.channel , channelIndex.value );
@@ -368,6 +362,6 @@ void FairMQDeviceProxy::bind(std::vector<OutputRoute> const& outputs, std::vecto
368362 LOGP (detail, " Forward route {}@{}%{} to index {} and channelIndex {}" , DataSpecUtils::describe (route.matcher ), route.timeslice , route.maxTimeslices , fi, state.channel .value );
369363 }
370364 }
371- mStateChangeCallback = [&device]() -> bool { return device. NewStatePending (); } ;
365+ mStateChangeCallback = newStatePending ;
372366}
373367} // namespace o2::framework
0 commit comments