Skip to content

Commit 40d49d5

Browse files
committed
DPL: allow discarding context updates after a partial match
1 parent e79fdd2 commit 40d49d5

File tree

5 files changed

+203
-43
lines changed

5 files changed

+203
-43
lines changed

Framework/Core/include/Framework/DataDescriptorMatcher.h

Lines changed: 86 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "Headers/DataHeader.h"
1616
#include "Headers/Stack.h"
1717

18+
#include <array>
1819
#include <cstdint>
1920
#include <string>
2021
#include <variant>
@@ -46,8 +47,72 @@ struct ContextRef {
4647
/// We do not have any float in the value, because AFAICT there is no need for
4748
/// it in the O2 DataHeader, however we could add it later on.
4849
struct ContextElement {
50+
using Value = std::variant<uint64_t, std::string, None>;
4951
std::string label; /// The name of the variable contained in this element.
50-
std::variant<uint64_t, std::string, None> value = None{}; /// The actual contents of the element.
52+
Value value = None{}; /// The actual contents of the element.
53+
};
54+
55+
struct ContextUpdate {
56+
size_t position;
57+
ContextElement::Value newValue;
58+
};
59+
60+
constexpr int MAX_MATCHING_VARIABLE = 16;
61+
constexpr int MAX_UPDATES_PER_QUERY = 16;
62+
63+
class VariableContext
64+
{
65+
public:
66+
VariableContext()
67+
: mPerformedUpdates{ 0 }
68+
{
69+
}
70+
71+
ContextElement::Value const& get(size_t pos) const
72+
{
73+
// First we check if there is any pending update
74+
for (size_t i = 0; i < mPerformedUpdates; ++i) {
75+
if (mUpdates[i].position == pos) {
76+
return mUpdates[i].newValue;
77+
}
78+
}
79+
// Otherwise we return the element.
80+
return mElements.at(pos).value;
81+
}
82+
83+
void put(ContextUpdate&& update)
84+
{
85+
mUpdates[mPerformedUpdates++] = std::move(update);
86+
}
87+
88+
/// Use this after a query to actually commit the matched fields. Notice the
89+
/// old matches remain there, but we do not need to clean them up as we have
90+
/// reset the counter. Use this after a successful query to persist matches
91+
/// variables and speedup subsequent lookups.
92+
void commit()
93+
{
94+
for (size_t i = 0; i < mPerformedUpdates; ++i) {
95+
mElements[mUpdates[i].position].value = mUpdates[i].newValue;
96+
}
97+
mPerformedUpdates = 0;
98+
}
99+
100+
/// Discard the updates. Use this after a failed query if you do not want to
101+
/// retain partial matches.
102+
void discard()
103+
{
104+
mPerformedUpdates = 0;
105+
}
106+
107+
private:
108+
/* We make this class fixed size to avoid memory churning while
109+
matching as much as posible when doing the matching, as that might become
110+
performance critical. Given we will have only a few of these (one per
111+
cacheline of the input messages) it should not be critical memory wise.
112+
*/
113+
std::array<ContextElement, MAX_MATCHING_VARIABLE> mElements;
114+
std::array<ContextUpdate, MAX_UPDATES_PER_QUERY> mUpdates;
115+
int mPerformedUpdates;
51116
};
52117

53118
/// Can hold either an actual value of type T or a reference to
@@ -103,15 +168,15 @@ class OriginValueMatcher : public ValueHolder<std::string>
103168
{
104169
}
105170

106-
bool match(header::DataHeader const& header, std::vector<ContextElement>& context) const
171+
bool match(header::DataHeader const& header, VariableContext& context) const
107172
{
108173
if (auto ref = std::get_if<ContextRef>(&mValue)) {
109-
auto& variable = context.at(ref->index);
110-
if (auto value = std::get_if<std::string>(&variable.value)) {
174+
auto& variable = context.get(ref->index);
175+
if (auto value = std::get_if<std::string>(&variable)) {
111176
return strncmp(header.dataOrigin.str, value->c_str(), 4) == 0;
112177
}
113178
auto maxSize = strnlen(header.dataOrigin.str, 4);
114-
variable.value = std::string(header.dataOrigin.str, maxSize);
179+
context.put({ ref->index, std::string(header.dataOrigin.str, maxSize) });
115180
return true;
116181
} else if (auto s = std::get_if<std::string>(&mValue)) {
117182
return strncmp(header.dataOrigin.str, s->c_str(), 4) == 0;
@@ -134,15 +199,15 @@ class DescriptionValueMatcher : public ValueHolder<std::string>
134199
{
135200
}
136201

137-
bool match(header::DataHeader const& header, std::vector<ContextElement>& context) const
202+
bool match(header::DataHeader const& header, VariableContext& context) const
138203
{
139204
if (auto ref = std::get_if<ContextRef>(&mValue)) {
140-
auto& variable = context.at(ref->index);
141-
if (auto value = std::get_if<std::string>(&variable.value)) {
205+
auto& variable = context.get(ref->index);
206+
if (auto value = std::get_if<std::string>(&variable)) {
142207
return strncmp(header.dataDescription.str, value->c_str(), 16) == 0;
143208
}
144209
auto maxSize = strnlen(header.dataDescription.str, 16);
145-
variable.value = std::string(header.dataDescription.str, maxSize);
210+
context.put({ ref->index, std::string(header.dataDescription.str, maxSize) });
146211
return true;
147212
} else if (auto s = std::get_if<std::string>(&this->mValue)) {
148213
return strncmp(header.dataDescription.str, s->c_str(), 16) == 0;
@@ -173,14 +238,14 @@ class SubSpecificationTypeValueMatcher : public ValueHolder<uint64_t>
173238
{
174239
}
175240

176-
bool match(header::DataHeader const& header, std::vector<ContextElement>& context) const
241+
bool match(header::DataHeader const& header, VariableContext& context) const
177242
{
178243
if (auto ref = std::get_if<ContextRef>(&mValue)) {
179-
auto& variable = context.at(ref->index);
180-
if (auto value = std::get_if<uint64_t>(&variable.value)) {
244+
auto& variable = context.get(ref->index);
245+
if (auto value = std::get_if<uint64_t>(&variable)) {
181246
return header.subSpecification == *value;
182247
}
183-
variable.value = header.subSpecification;
248+
context.put({ ref->index, header.subSpecification });
184249
return true;
185250
} else if (auto v = std::get_if<uint64_t>(&mValue)) {
186251
return header.subSpecification == *v;
@@ -219,14 +284,14 @@ class StartTimeValueMatcher : public ValueHolder<uint64_t>
219284
/// This will match the timing information which is currently in
220285
/// the DataProcessingHeader. Notice how we apply the scale to the
221286
/// actual values found.
222-
bool match(DataProcessingHeader const& dph, std::vector<ContextElement>& context) const
287+
bool match(DataProcessingHeader const& dph, VariableContext& context) const
223288
{
224289
if (auto ref = std::get_if<ContextRef>(&mValue)) {
225-
auto& variable = context.at(ref->index);
226-
if (auto value = std::get_if<uint64_t>(&variable.value)) {
290+
auto& variable = context.get(ref->index);
291+
if (auto value = std::get_if<uint64_t>(&variable)) {
227292
return (dph.startTime / mScale) == *value;
228293
}
229-
variable.value = dph.startTime / mScale;
294+
context.put({ ref->index, dph.startTime / mScale });
230295
return true;
231296
} else if (auto v = std::get_if<uint64_t>(&mValue)) {
232297
return (dph.startTime / mScale) == *v;
@@ -341,7 +406,7 @@ class DataDescriptorMatcher
341406

342407
/// @return true if the (sub-)query associated to this matcher will
343408
/// match the provided @a spec, false otherwise.
344-
bool match(InputSpec const& spec, std::vector<ContextElement>& context) const
409+
bool match(InputSpec const& spec, VariableContext& context) const
345410
{
346411
header::DataHeader dh;
347412
dh.dataOrigin = spec.origin;
@@ -351,19 +416,19 @@ class DataDescriptorMatcher
351416
return this->match(reinterpret_cast<char const*>(&dh), context);
352417
}
353418

354-
bool match(header::DataHeader const& header, std::vector<ContextElement>& context) const
419+
bool match(header::DataHeader const& header, VariableContext& context) const
355420
{
356421
return this->match(reinterpret_cast<char const*>(&header), context);
357422
}
358423

359-
bool match(header::Stack const& stack, std::vector<ContextElement>& context) const
424+
bool match(header::Stack const& stack, VariableContext& context) const
360425
{
361426
return this->match(reinterpret_cast<char const*>(stack.data()), context);
362427
}
363428

364429
// actual polymorphic matcher which is able to cast the pointer to the correct
365430
// kind of header.
366-
bool match(char const* d, std::vector<ContextElement>& context) const
431+
bool match(char const* d, VariableContext& context) const
367432
{
368433
bool leftValue = false, rightValue = false;
369434

Framework/Core/src/CommonDataProcessors.cxx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ DataProcessorSpec CommonDataProcessors::getGlobalFileSink(std::vector<InputSpec>
4646

4747
bool hasOutputsToWrite = false;
4848
auto [variables, outputMatcher] = DataDescriptorQueryBuilder::buildFromKeepConfig(keepString);
49-
std::vector<ContextElement> context(variables.size());
49+
VariableContext context;
5050
for (auto& spec : danglingOutputInputs) {
5151
if (outputMatcher->match(spec, context)) {
5252
hasOutputsToWrite = true;
@@ -65,8 +65,8 @@ DataProcessorSpec CommonDataProcessors::getGlobalFileSink(std::vector<InputSpec>
6565
});
6666
}
6767
auto output = std::make_shared<std::ofstream>(filename.c_str(), std::ios_base::binary);
68-
return std::move([ output, matcher = outputMatcher, contextSize = variables.size() ](ProcessingContext & pc) mutable->void {
69-
std::vector<ContextElement> matchingContext(contextSize);
68+
return std::move([ output, matcher = outputMatcher ](ProcessingContext & pc) mutable->void {
69+
VariableContext matchingContext;
7070
LOG(INFO) << "processing data set with " << pc.inputs().size() << " entries";
7171
for (const auto& entry : pc.inputs()) {
7272
LOG(INFO) << " " << *(entry.spec);

Framework/Core/src/DataRelayer.cxx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ size_t
137137
{
138138
/// FIXME: for the moment we have a global context, since we do not support
139139
/// yet generic matchers as InputSpec.
140-
std::vector<ContextElement> context{};
140+
VariableContext context;
141141

142142
for (size_t ri = 0, re = matchers.size(); ri < re; ++ri) {
143143
auto& matcher = matchers[ri];
@@ -147,8 +147,10 @@ size_t
147147
}
148148

149149
if (matcher.match(*h, context)) {
150+
context.commit();
150151
return ri;
151152
}
153+
context.discard();
152154
}
153155
return matchers.size();
154156
}

Framework/Core/test/benchmark_DataDescriptorMatcher.cxx

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ static void BM_MatchedSingleQuery(benchmark::State& state)
1717
OriginValueMatcher{ "TRD" }
1818
};
1919

20-
std::vector<ContextElement> context;
20+
VariableContext context;
2121

2222
for (auto _ : state) {
2323
matcher.match(header, context);
@@ -45,7 +45,7 @@ static void BM_MatchedFullQuery(benchmark::State& state)
4545
ConstantValueMatcher{ true }))
4646
};
4747

48-
std::vector<ContextElement> context;
48+
VariableContext context;
4949

5050
for (auto _ : state) {
5151
matcher.match(header, context);
@@ -73,7 +73,7 @@ static void BM_UnmatchedSingleQuery(benchmark::State& state)
7373
ConstantValueMatcher{ true }))
7474
};
7575

76-
std::vector<ContextElement> context;
76+
VariableContext context;
7777

7878
for (auto _ : state) {
7979
matcher.match(header, context);
@@ -102,7 +102,7 @@ static void BM_UnmatchedFullQuery(benchmark::State& state)
102102
ConstantValueMatcher{ true }))
103103
};
104104

105-
std::vector<ContextElement> context;
105+
VariableContext context;
106106

107107
for (auto _ : state) {
108108
matcher.match(header, context);
@@ -130,11 +130,11 @@ static void BM_OneVariableFullMatch(benchmark::State& state)
130130
ConstantValueMatcher{ true }))
131131
};
132132

133-
std::vector<ContextElement> context(1);
133+
VariableContext context;
134134

135135
for (auto _ : state) {
136-
context[0].value = None{};
137136
matcher.match(header, context);
137+
context.discard();
138138
}
139139
}
140140
// Register the function as a benchmark
@@ -164,12 +164,12 @@ static void BM_OneVariableMatchUnmatch(benchmark::State& state)
164164
ConstantValueMatcher{ true }))
165165
};
166166

167-
std::vector<ContextElement> context(1);
167+
VariableContext context;
168168

169169
for (auto _ : state) {
170-
context[0].value = None{};
171170
matcher.match(header0, context);
172171
matcher.match(header1, context);
172+
context.discard();
173173
}
174174
}
175175
// Register the function as a benchmark

0 commit comments

Comments
 (0)