Skip to content

Commit 1485ea0

Browse files
committed
DPL: test for ConsumeExisting when forwarding
1 parent 7b3397d commit 1485ea0

File tree

1 file changed

+54
-0
lines changed

1 file changed

+54
-0
lines changed

Framework/Core/test/test_ForwardInputs.cxx

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,60 @@ TEST_CASE("ForwardInputsSingleMessageSingleRoute")
107107
REQUIRE(result[0].Size() == 2); // Two messages for that route
108108
}
109109

110+
TEST_CASE("ForwardInputsSingleMessageSingleRouteNoConsume")
111+
{
112+
o2::header::DataHeader dh;
113+
dh.dataOrigin = "TST";
114+
dh.dataDescription = "A";
115+
dh.subSpecification = 0;
116+
dh.splitPayloadIndex = 0;
117+
dh.splitPayloadParts = 1;
118+
119+
o2::framework::DataProcessingHeader dph{0, 1};
120+
std::vector<fair::mq::Channel> channels{
121+
fair::mq::Channel("from_A_to_B")};
122+
123+
bool copyByDefault = false;
124+
FairMQDeviceProxy proxy;
125+
std::vector<ForwardRoute> routes{ForwardRoute{
126+
.timeslice = 0,
127+
.maxTimeslices = 1,
128+
.matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
129+
.channel = "from_A_to_B",
130+
.policy = nullptr,
131+
}};
132+
133+
auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
134+
for (auto& channel : channels) {
135+
if (channel.GetName() == channelName) {
136+
return channel;
137+
}
138+
}
139+
throw std::runtime_error("Channel not found");
140+
};
141+
142+
proxy.bind({}, {}, routes, findChannelByName, nullptr);
143+
144+
TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}};
145+
std::vector<MessageSet> currentSetOfInputs;
146+
MessageSet messageSet;
147+
148+
auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
149+
fair::mq::MessagePtr payload(nullptr);
150+
REQUIRE(payload.get() == nullptr);
151+
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
152+
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
153+
messageSet.add(PartRef{std::move(header), std::move(payload)});
154+
REQUIRE(messageSet.size() == 1);
155+
currentSetOfInputs.emplace_back(std::move(messageSet));
156+
157+
TimesliceSlot slot{0};
158+
159+
auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, true);
160+
REQUIRE(result.size() == 1);
161+
REQUIRE(result[0].Size() == 0); // Because there is a nullptr, we do not forward this as it was already consumed.
162+
}
163+
110164
TEST_CASE("ForwardInputsSingleMessageSingleRouteAtEOS")
111165
{
112166
o2::header::DataHeader dh;

0 commit comments

Comments
 (0)