@@ -57,13 +57,16 @@ std::vector<DataDescriptorMatcher> createInputMatchers(std::vector<InputRoute> c
5757 for (auto & route : routes) {
5858 DataDescriptorMatcher matcher{
5959 DataDescriptorMatcher::Op::And,
60- OriginValueMatcher{ route. matcher . origin . str },
60+ StartTimeValueMatcher{ ContextRef{ 0 } },
6161 std::make_unique<DataDescriptorMatcher>(
6262 DataDescriptorMatcher::Op::And,
63- DescriptionValueMatcher { route.matcher .description .str },
63+ OriginValueMatcher { route.matcher .origin .str },
6464 std::make_unique<DataDescriptorMatcher>(
65- DataDescriptorMatcher::Op::Just,
66- SubSpecificationTypeValueMatcher{ route.matcher .subSpec }))
65+ DataDescriptorMatcher::Op::And,
66+ DescriptionValueMatcher{ route.matcher .description .str },
67+ std::make_unique<DataDescriptorMatcher>(
68+ DataDescriptorMatcher::Op::Just,
69+ SubSpecificationTypeValueMatcher{ route.matcher .subSpec })))
6770 };
6871 result.emplace_back (std::move (matcher));
6972 }
@@ -132,21 +135,14 @@ void DataRelayer::processDanglingInputs(std::vector<ExpirationHandler> const& ex
132135// / This does the mapping between a route and a InputSpec. The
133136// / reason why these might diffent is that when you have timepipelining
134137// / you have one route per timeslice, even if the type is the same.
135- size_t
136- assignInputSpecId (void * data, std::vector<DataDescriptorMatcher> const & matchers)
138+ size_t assignInputSpecId (void * data,
139+ std::vector<DataDescriptorMatcher> const & matchers,
140+ VariableContext& context)
137141{
138- // / FIXME: for the moment we have a global context, since we do not support
139- // / yet generic matchers as InputSpec.
140- VariableContext context;
141-
142142 for (size_t ri = 0 , re = matchers.size (); ri < re; ++ri) {
143143 auto & matcher = matchers[ri];
144- const DataHeader* h = o2::header::get<DataHeader*>(data);
145- if (h == nullptr ) {
146- return re;
147- }
148144
149- if (matcher.match (*h , context)) {
145+ if (matcher.match (reinterpret_cast < char const *>(data) , context)) {
150146 context.commit ();
151147 return ri;
152148 }
@@ -175,31 +171,30 @@ DataRelayer::relay(std::unique_ptr<FairMQMessage> &&header,
175171 // This returns the identifier for the given input. We use a separate
176172 // function because while it's trivial now, the actual matchmaking will
177173 // become more complicated when we will start supporting ranges.
178- auto getInput = [&matchers = mInputMatchers ,&header] () -> int {
179- return assignInputSpecId (header->GetData (), matchers);
180- };
181-
182- // This will check if the input is valid. We hide the details so that
183- // in principle the outer code will work regardless of the actual
184- // implementation.
185- auto isValidInput = [](int inputIdx) {
186- // If this is true, it means the message we got does
187- // not match any of the expected inputs.
188- return inputIdx != INVALID_INPUT;
189- };
174+ auto getInputAndTimeslice = [& matchers = mInputMatchers ,
175+ &header,
176+ &index,
177+ &contexts = mVariableContextes ]()
178+ ->std ::tuple<int , TimesliceId>
179+ {
180+ // / FIXME: for the moment we only use the first context and reset
181+ // / between one invokation and the other.
182+ assert (contexts.empty () == false );
183+ for (auto & context : contexts) {
184+ context.reset ();
185+ auto input = assignInputSpecId (header->GetData (), matchers, context);
186+
187+ if (input == INVALID_INPUT) {
188+ return { INVALID_INPUT, TimesliceId{ TimesliceId::INVALID } };
189+ }
190190
191- // The timeslice is embedded in the DataProcessingHeader header of the O2
192- // header stack. This is an extension to the DataHeader, because apparently
193- // we do have data which comes without a timestamp, although I am personally
194- // not sure what that would be.
195- const auto getTimeslice = [&header, &index]() -> TimesliceId {
196- const DataProcessingHeader* dph = o2::header::get<DataProcessingHeader*>(header->GetData ());
197- if (dph == nullptr ) {
198- return TimesliceId{ TimesliceId::INVALID };
191+ // / The first argument is always matched against the data start time, so
192+ // / we can assert it's the same as the dph->startTime
193+ if (auto pval = std::get_if<uint64_t >(&context.get (0 ))) {
194+ return { input, TimesliceId{ *pval } };
195+ }
199196 }
200- size_t timesliceId = dph->startTime ;
201- assert (index.size ());
202- return TimesliceId{ timesliceId };
197+ return { INVALID_INPUT, TimesliceId{ TimesliceId::INVALID } };
203198 };
204199
205200 // A cache line is obsolete if the incoming one has a greater timestamp or
@@ -253,21 +248,20 @@ DataRelayer::relay(std::unique_ptr<FairMQMessage> &&header,
253248 //
254249 // This is the actual outer loop processing input as part of a given
255250 // timeslice. All the other implementation details are hidden by the lambdas
256- auto input = getInput ();
251+ auto [ input, timeslice] = getInputAndTimeslice ();
257252
258- if (isValidInput ( input) == false ) {
253+ if (input == INVALID_INPUT ) {
259254 LOG (ERROR) << " A malformed message just arrived" ;
260255 return WillNotRelay;
261256 }
262257
263- auto timeslice = getTimeslice ();
264- auto slot = index.getSlotForTimeslice (timeslice);
265- LOG (DEBUG) << " Received timeslice" << timeslice.value ;
266258 if (TimesliceId::isValid (timeslice) == false ) {
267259 LOG (ERROR) << " Could not determine the timeslice for input" ;
268260 return WillNotRelay;
269261 }
262+ LOG (DEBUG) << " Received timeslice " << timeslice.value ;
270263
264+ assert (index.size ());
271265 if (index.isObsolete (timeslice)) {
272266 LOG (ERROR) << " An entry for timeslice " << timeslice.value << " just arrived but too late to be processed" ;
273267 return WillNotRelay;
@@ -415,6 +409,7 @@ DataRelayer::getParallelTimeslices() const {
415409void
416410DataRelayer::setPipelineLength (size_t s) {
417411 mTimesliceIndex .resize (s);
412+ mVariableContextes .resize (s);
418413 auto numInputTypes = mDistinctRoutesIndex .size ();
419414 assert (numInputTypes);
420415 mCache .resize (numInputTypes * mTimesliceIndex .size ());
0 commit comments