Skip to content

Commit e79fdd2

Browse files
committed
DPL: allow matching on DataProcessingHeader::startTime
Refactoring the whole process scheduling logic to use this will allow supporting sub-timeframe and above timeframe granularities for messages.
1 parent fb16d23 commit e79fdd2

File tree

2 files changed

+133
-16
lines changed

2 files changed

+133
-16
lines changed

Framework/Core/include/Framework/DataDescriptorMatcher.h

Lines changed: 100 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
#define o2_framework_DataDescriptorMatcher_H_INCLUDED
1212

1313
#include "Framework/InputSpec.h"
14+
#include "Framework/DataProcessingHeader.h"
1415
#include "Headers/DataHeader.h"
16+
#include "Headers/Stack.h"
1517

1618
#include <cstdint>
1719
#include <string>
@@ -45,7 +47,7 @@ struct ContextRef {
4547
/// it in the O2 DataHeader, however we could add it later on.
4648
struct ContextElement {
4749
std::string label; /// The name of the variable contained in this element.
48-
std::variant<uint64_t, std::string, None> value; /// The actual contents of the element.
50+
std::variant<uint64_t, std::string, None> value = None{}; /// The actual contents of the element.
4951
};
5052

5153
/// Can hold either an actual value of type T or a reference to
@@ -187,7 +189,55 @@ class SubSpecificationTypeValueMatcher : public ValueHolder<uint64_t>
187189
}
188190
};
189191

190-
/// Something which can be matched against a header::SubSpecificationType
192+
/// Matcher on actual time, as reported in the DataProcessingHeader
193+
class StartTimeValueMatcher : public ValueHolder<uint64_t>
194+
{
195+
public:
196+
StartTimeValueMatcher(ContextRef variableId, uint64_t scale = 1)
197+
: ValueHolder{ variableId },
198+
mScale{ scale }
199+
{
200+
}
201+
202+
/// The passed string @a s is the expected numerical value for
203+
/// the SubSpecification type.
204+
StartTimeValueMatcher(std::string const& s, uint64_t scale = 1)
205+
: ValueHolder<uint64_t>{ strtoull(s.c_str(), nullptr, 10) },
206+
mScale{ scale }
207+
{
208+
}
209+
210+
/// This means that the matcher is looking for a constant.
211+
/// We will divide the input by scale so that we can map
212+
/// quantities with different granularities to the same record.
213+
StartTimeValueMatcher(uint64_t v, uint64_t scale = 1)
214+
: ValueHolder<uint64_t>{ v / scale },
215+
mScale{ scale }
216+
{
217+
}
218+
219+
/// This will match the timing information which is currently in
220+
/// the DataProcessingHeader. Notice how we apply the scale to the
221+
/// actual values found.
222+
bool match(DataProcessingHeader const& dph, std::vector<ContextElement>& context) const
223+
{
224+
if (auto ref = std::get_if<ContextRef>(&mValue)) {
225+
auto& variable = context.at(ref->index);
226+
if (auto value = std::get_if<uint64_t>(&variable.value)) {
227+
return (dph.startTime / mScale) == *value;
228+
}
229+
variable.value = dph.startTime / mScale;
230+
return true;
231+
} else if (auto v = std::get_if<uint64_t>(&mValue)) {
232+
return (dph.startTime / mScale) == *v;
233+
}
234+
throw std::runtime_error("Mismatching type for variable");
235+
}
236+
237+
private:
238+
uint64_t mScale;
239+
};
240+
191241
class ConstantValueMatcher
192242
{
193243
public:
@@ -198,7 +248,7 @@ class ConstantValueMatcher
198248
mValue = value;
199249
}
200250

201-
bool match(header::DataHeader const& header) const
251+
bool match() const
202252
{
203253
return mValue;
204254
}
@@ -232,7 +282,7 @@ struct DescriptorMatcherTrait<header::DataHeader::SubSpecificationType> {
232282
};
233283

234284
class DataDescriptorMatcher;
235-
using Node = std::variant<OriginValueMatcher, DescriptionValueMatcher, SubSpecificationTypeValueMatcher, std::unique_ptr<DataDescriptorMatcher>, ConstantValueMatcher>;
285+
using Node = std::variant<OriginValueMatcher, DescriptionValueMatcher, SubSpecificationTypeValueMatcher, std::unique_ptr<DataDescriptorMatcher>, ConstantValueMatcher, StartTimeValueMatcher>;
236286

237287
// A matcher for a given O2 Data Model descriptor. We use a variant to hold
238288
// the different kind of matchers so that we can have a hierarchy or
@@ -298,10 +348,22 @@ class DataDescriptorMatcher
298348
dh.dataDescription = spec.description;
299349
dh.subSpecification = spec.subSpec;
300350

301-
return this->match(dh, context);
351+
return this->match(reinterpret_cast<char const*>(&dh), context);
352+
}
353+
354+
bool match(header::DataHeader const& header, std::vector<ContextElement>& context) const
355+
{
356+
return this->match(reinterpret_cast<char const*>(&header), context);
357+
}
358+
359+
bool match(header::Stack const& stack, std::vector<ContextElement>& context) const
360+
{
361+
return this->match(reinterpret_cast<char const*>(stack.data()), context);
302362
}
303363

304-
bool match(header::DataHeader const& d, std::vector<ContextElement>& context) const
364+
// actual polymorphic matcher which is able to cast the pointer to the correct
365+
// kind of header.
366+
bool match(char const* d, std::vector<ContextElement>& context) const
305367
{
306368
bool leftValue = false, rightValue = false;
307369

@@ -331,15 +393,33 @@ class DataDescriptorMatcher
331393
// }
332394
// When we drop support for macOS 10.13
333395
if (auto pval0 = std::get_if<OriginValueMatcher>(&mLeft)) {
334-
leftValue = pval0->match(d, context);
396+
auto dh = o2::header::get<header::DataHeader*>(d);
397+
if (dh == nullptr) {
398+
throw std::runtime_error("Cannot find DataHeader");
399+
}
400+
leftValue = pval0->match(*dh, context);
335401
} else if (auto pval1 = std::get_if<DescriptionValueMatcher>(&mLeft)) {
336-
leftValue = pval1->match(d, context);
402+
auto dh = o2::header::get<header::DataHeader*>(d);
403+
if (dh == nullptr) {
404+
throw std::runtime_error("Cannot find DataHeader");
405+
}
406+
leftValue = pval1->match(*dh, context);
337407
} else if (auto pval2 = std::get_if<SubSpecificationTypeValueMatcher>(&mLeft)) {
338-
leftValue = pval2->match(d, context);
408+
auto dh = o2::header::get<header::DataHeader*>(d);
409+
if (dh == nullptr) {
410+
throw std::runtime_error("Cannot find DataHeader");
411+
}
412+
leftValue = pval2->match(*dh, context);
339413
} else if (auto pval3 = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&mLeft)) {
340414
leftValue = (*pval3)->match(d, context);
341415
} else if (auto pval4 = std::get_if<ConstantValueMatcher>(&mLeft)) {
342-
leftValue = pval4->match(d);
416+
leftValue = pval4->match();
417+
} else if (auto pval5 = std::get_if<StartTimeValueMatcher>(&mLeft)) {
418+
auto dph = o2::header::get<DataProcessingHeader*>(d);
419+
if (dph == nullptr) {
420+
throw std::runtime_error("Cannot find DataProcessingHeader");
421+
}
422+
leftValue = pval5->match(*dph, context);
343423
} else {
344424
throw std::runtime_error("Bad parsing tree");
345425
}
@@ -355,15 +435,21 @@ class DataDescriptorMatcher
355435
}
356436

357437
if (auto pval0 = std::get_if<OriginValueMatcher>(&mRight)) {
358-
rightValue = pval0->match(d, context);
438+
auto dh = o2::header::get<header::DataHeader*>(d);
439+
rightValue = pval0->match(*dh, context);
359440
} else if (auto pval1 = std::get_if<DescriptionValueMatcher>(&mRight)) {
360-
rightValue = pval1->match(d, context);
441+
auto dh = o2::header::get<header::DataHeader*>(d);
442+
rightValue = pval1->match(*dh, context);
361443
} else if (auto pval2 = std::get_if<SubSpecificationTypeValueMatcher>(&mRight)) {
362-
rightValue = pval2->match(d, context);
444+
auto dh = o2::header::get<header::DataHeader*>(d);
445+
rightValue = pval2->match(*dh, context);
363446
} else if (auto pval3 = std::get_if<std::unique_ptr<DataDescriptorMatcher>>(&mRight)) {
364447
rightValue = (*pval3)->match(d, context);
365448
} else if (auto pval4 = std::get_if<ConstantValueMatcher>(&mRight)) {
366-
rightValue = pval4->match(d);
449+
rightValue = pval4->match();
450+
} else if (auto pval5 = std::get_if<StartTimeValueMatcher>(&mRight)) {
451+
auto dph = o2::header::get<DataProcessingHeader*>(d);
452+
rightValue = pval5->match(*dph, context);
367453
}
368454
// There are cases in which not having a rightValue might be legitimate,
369455
// so we do not throw an exception.

Framework/Core/test/test_DataDescriptorMatcher.cxx

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "Framework/DataDescriptorQueryBuilder.h"
1717

1818
#include <boost/test/unit_test.hpp>
19+
#include <variant>
1920

2021
using namespace o2::framework;
2122
using namespace o2::header;
@@ -296,7 +297,7 @@ BOOST_AUTO_TEST_CASE(TestQueryBuilder)
296297
// This checks matching using variables
297298
BOOST_AUTO_TEST_CASE(TestMatchingVariables)
298299
{
299-
std::vector<ContextElement> context(1);
300+
std::vector<ContextElement> context(2);
300301

301302
DataDescriptorMatcher matcher{
302303
DataDescriptorMatcher::Op::And,
@@ -306,7 +307,7 @@ BOOST_AUTO_TEST_CASE(TestMatchingVariables)
306307
DescriptionValueMatcher{ "CLUSTERS" },
307308
std::make_unique<DataDescriptorMatcher>(
308309
DataDescriptorMatcher::Op::And,
309-
SubSpecificationTypeValueMatcher{ 1 },
310+
SubSpecificationTypeValueMatcher{ ContextRef{ 1 } },
310311
ConstantValueMatcher{ true }))
311312
};
312313

@@ -319,6 +320,9 @@ BOOST_AUTO_TEST_CASE(TestMatchingVariables)
319320
auto s = std::get_if<std::string>(&context[0].value);
320321
BOOST_CHECK(s != nullptr);
321322
BOOST_CHECK(*s == "TPC");
323+
auto v = std::get_if<uint64_t>(&context[1].value);
324+
BOOST_CHECK(v != nullptr);
325+
BOOST_CHECK(*v == 1);
322326

323327
// This will not match, because ContextRef{0} is bound
324328
// to TPC already.
@@ -384,3 +388,30 @@ BOOST_AUTO_TEST_CASE(TestInputSpecMatching)
384388
BOOST_CHECK(matcher2.match(spec3, context) == false);
385389
BOOST_CHECK(matcher2.match(spec4, context) == true);
386390
}
391+
392+
BOOST_AUTO_TEST_CASE(TestStartTimeMatching)
393+
{
394+
std::vector<ContextElement> context(1);
395+
396+
DataDescriptorMatcher matcher{
397+
DataDescriptorMatcher::Op::Just,
398+
StartTimeValueMatcher{ ContextRef{ 0 } }
399+
};
400+
401+
DataHeader dh;
402+
dh.dataOrigin = "TPC";
403+
dh.dataDescription = "CLUSTERS";
404+
dh.subSpecification = 1;
405+
406+
DataProcessingHeader dph;
407+
dph.startTime = 123;
408+
409+
Stack s{ dh, dph };
410+
auto s2dph = o2::header::get<DataProcessingHeader*>(s.data());
411+
BOOST_CHECK(s2dph != nullptr);
412+
BOOST_CHECK_EQUAL(s2dph->startTime, 123);
413+
BOOST_CHECK(matcher.match(s, context) == true);
414+
auto vPtr = std::get_if<uint64_t>(&context[0].value);
415+
BOOST_REQUIRE(vPtr != nullptr);
416+
BOOST_CHECK_EQUAL(*vPtr, 123);
417+
}

0 commit comments

Comments
 (0)