Skip to content

Commit e652841

Browse files
committed
DPL: remove direct dependency on fair::mq::Device from the FairMQDeviceProxy
1 parent abe259d commit e652841

File tree

4 files changed

+38
-20
lines changed

4 files changed

+38
-20
lines changed

Framework/Core/include/Framework/FairMQDeviceProxy.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ class FairMQDeviceProxy
3838
FairMQDeviceProxy() = default;
3939
FairMQDeviceProxy(FairMQDeviceProxy const&) = delete;
4040
void bind(std::vector<OutputRoute> const& outputs, std::vector<InputRoute> const& inputs,
41-
std::vector<ForwardRoute> const& forwards, fair::mq::Device& device);
41+
std::vector<ForwardRoute> const& forwards,
42+
std::function<fair::mq::Channel&(std::string const&)> bindChannelByName,
43+
std::function<bool(void)> newStateRequestedCallback);
4244

4345
/// Retrieve the transport associated to a given route.
4446
[[nodiscard]] OutputRoute const& getOutputRoute(RouteIndex routeIndex) const { return mOutputs.at(routeIndex.value); }

Framework/Core/src/CommonMessageBackends.cxx

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,18 @@ o2::framework::ServiceSpec CommonMessageBackends::fairMQDeviceProxy()
5757
/// some of the channels are added only later on to the party,
5858
/// (e.g. by ECS) and Init might not be late enough to
5959
/// account for them.
60-
proxy->bind(outputs, inputs, forwards, *device); },
60+
std::function<fair::mq::Channel&(std::string const&)> bindByName = [device](std::string const& channelName) -> fair::mq::Channel& {
61+
auto channel = device->GetChannels().find(channelName);
62+
if (channel == device->GetChannels().end()) {
63+
LOGP(fatal, "Expected channel {} not configured.", channelName);
64+
}
65+
return channel->second.at(0);
66+
};
67+
68+
std::function<bool()> newStateCallback = [device]() -> bool {
69+
return device->NewStatePending();
70+
};
71+
proxy->bind(outputs, inputs, forwards, bindByName, newStateCallback); },
6172
};
6273
}
6374

Framework/Core/src/ExternalFairMQDeviceProxy.cxx

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1090,7 +1090,18 @@ DataProcessorSpec specifyFairMQDeviceMultiOutputProxy(char const* name,
10901090

10911091
channelNames->emplace_back(std::move(channel));
10921092
}
1093-
proxy.bind(mutableDeviceSpec.outputs, mutableDeviceSpec.inputs, mutableDeviceSpec.forwards, *device);
1093+
std::function<fair::mq::Channel&(std::string const&)> bindByName = [device](std::string const& channelName) -> fair::mq::Channel& {
1094+
auto channel = device->GetChannels().find(channelName);
1095+
if (channel == device->GetChannels().end()) {
1096+
LOGP(fatal, "Expected channel {} not configured.", channelName);
1097+
}
1098+
return channel->second.at(0);
1099+
};
1100+
1101+
std::function<bool()> newStateCallback = [device]() -> bool {
1102+
return device->NewStatePending();
1103+
};
1104+
proxy.bind(mutableDeviceSpec.outputs, mutableDeviceSpec.inputs, mutableDeviceSpec.forwards, bindByName, newStateCallback);
10941105
};
10951106
// We need to clear the channels on stop, because we will check and add them
10961107
auto channelConfigurationDisposer = [&deviceSpec]() {

Framework/Core/src/FairMQDeviceProxy.cxx

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,8 @@ std::unique_ptr<fair::mq::Message> FairMQDeviceProxy::createForwardMessage(Route
230230

231231
void FairMQDeviceProxy::bind(std::vector<OutputRoute> const& outputs, std::vector<InputRoute> const& inputs,
232232
std::vector<ForwardRoute> const& forwards,
233-
fair::mq::Device& device)
233+
std::function<fair::mq::Channel&(std::string const&)> bindChannelByName,
234+
std::function<bool(void)> newStatePending)
234235
{
235236
mOutputs.clear();
236237
mOutputRoutes.clear();
@@ -258,14 +259,11 @@ void FairMQDeviceProxy::bind(std::vector<OutputRoute> const& outputs, std::vecto
258259
if (channelPos == channelNameToChannel.end()) {
259260
channelIndex = ChannelIndex{(int)mOutputChannelInfos.size()};
260261
ChannelAccountingType dplChannel = (route.channel.rfind("from_", 0) == 0) ? ChannelAccountingType::DPL : ChannelAccountingType::RAWFMQ;
261-
auto channel = device.GetChannels().find(route.channel);
262-
if (channel == device.GetChannels().end()) {
263-
LOGP(fatal, "Expected channel {} not configured.", route.channel);
264-
}
262+
auto& channel = bindChannelByName(route.channel);
265263
OutputChannelInfo info{
266264
.name = route.channel,
267265
.channelType = dplChannel,
268-
.channel = channel->second.at(0),
266+
.channel = channel,
269267
.policy = route.policy,
270268
.index = channelIndex,
271269
};
@@ -305,11 +303,9 @@ void FairMQDeviceProxy::bind(std::vector<OutputRoute> const& outputs, std::vecto
305303

306304
if (channelPos == channelNameToChannel.end()) {
307305
channelIndex = ChannelIndex{(int)mInputChannels.size()};
308-
auto channel = device.GetChannels().find(route.sourceChannel);
309-
if (channel == device.GetChannels().end()) {
310-
LOGP(fatal, "Expected channel {} not configured.", route.sourceChannel);
311-
}
312-
mInputChannels.push_back(&channel->second.at(0));
306+
fair::mq::Channel& channel = bindChannelByName(route.sourceChannel);
307+
308+
mInputChannels.push_back(&channel);
313309
mInputChannelNames.push_back(route.sourceChannel);
314310
channelNameToChannel[route.sourceChannel] = channelIndex;
315311
LOGP(detail, "Binding channel {} to channel index {}", route.sourceChannel, channelIndex.value);
@@ -341,12 +337,10 @@ void FairMQDeviceProxy::bind(std::vector<OutputRoute> const& outputs, std::vecto
341337

342338
if (channelPos == channelNameToChannel.end()) {
343339
channelIndex = ChannelIndex{(int)mForwardChannelInfos.size()};
344-
auto channel = device.GetChannels().find(route.channel);
345-
if (channel == device.GetChannels().end()) {
346-
LOGP(fatal, "Expected channel {} not configured.", route.channel);
347-
}
340+
auto& channel = bindChannelByName(route.channel);
341+
348342
ChannelAccountingType dplChannel = (route.channel.rfind("from_", 0) == 0) ? ChannelAccountingType::DPL : ChannelAccountingType::RAWFMQ;
349-
mForwardChannelInfos.push_back(ForwardChannelInfo{.name = route.channel, .channelType = dplChannel, .channel = channel->second.at(0), .policy = route.policy, .index = channelIndex});
343+
mForwardChannelInfos.push_back(ForwardChannelInfo{.name = route.channel, .channelType = dplChannel, .channel = channel, .policy = route.policy, .index = channelIndex});
350344
mForwardChannelStates.push_back(ForwardChannelState{0});
351345
channelNameToChannel[route.channel] = channelIndex;
352346
LOGP(detail, "Binding forward channel {} to channel index {}", route.channel, channelIndex.value);
@@ -368,6 +362,6 @@ void FairMQDeviceProxy::bind(std::vector<OutputRoute> const& outputs, std::vecto
368362
LOGP(detail, "Forward route {}@{}%{} to index {} and channelIndex {}", DataSpecUtils::describe(route.matcher), route.timeslice, route.maxTimeslices, fi, state.channel.value);
369363
}
370364
}
371-
mStateChangeCallback = [&device]() -> bool { return device.NewStatePending(); };
365+
mStateChangeCallback = newStatePending;
372366
}
373367
} // namespace o2::framework

0 commit comments

Comments
 (0)