Skip to content

Commit 8688938

Browse files
authored
DataSampling: make sure the original DataHeader is available in the sampled data (#14164)
1 parent 4f4b545 commit 8688938

File tree

5 files changed

+47
-32
lines changed

5 files changed

+47
-32
lines changed

Utilities/DataSampling/include/DataSampling/DataSamplingHeader.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,15 @@ struct DataSamplingHeader : public header::BaseHeader {
4141
uint32_t totalEvaluatedMessages = 0;
4242
DeviceIDType deviceID = "";
4343

44-
DataSamplingHeader();
45-
DataSamplingHeader(uint64_t _sampleTimeUs, uint32_t _totalAcceptedMessages, uint32_t _totalEvaluatedMessages, DeviceIDType _deviceID);
44+
/// Presampled description for the data. Copied from the original DataHeader.
45+
header::DataDescription dataDescription;
46+
/// Presampled origin for the data. Copied from the original DataHeader.
47+
header::DataOrigin dataOrigin;
48+
/// Presampled subSpecification for the data.
49+
header::DataHeader::SubSpecificationType subSpecification;
50+
51+
DataSamplingHeader() = delete;
52+
DataSamplingHeader(uint64_t _sampleTimeUs, uint32_t _totalAcceptedMessages, uint32_t _totalEvaluatedMessages, DeviceIDType _deviceID, header::DataHeader const& original);
4653
DataSamplingHeader(const DataSamplingHeader&) = default;
4754
DataSamplingHeader& operator=(const DataSamplingHeader&) = default;
4855

@@ -51,4 +58,4 @@ struct DataSamplingHeader : public header::BaseHeader {
5158

5259
} // namespace o2::utilities
5360

54-
#endif //ALICEO2_DATASAMPLINGHEADER_H
61+
#endif // ALICEO2_DATASAMPLINGHEADER_H

Utilities/DataSampling/include/DataSampling/Dispatcher.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class Dispatcher : public framework::Task
6464
framework::Options getOptions();
6565

6666
private:
67-
DataSamplingHeader prepareDataSamplingHeader(const DataSamplingPolicy& policy);
67+
DataSamplingHeader prepareDataSamplingHeader(const DataSamplingPolicy& policy, header::DataHeader const& original);
6868
header::Stack extractAdditionalHeaders(const char* inputHeaderStack) const;
6969
void reportStats(monitoring::Monitoring& monitoring) const;
7070
void send(framework::DataAllocator& dataAllocator, const framework::DataRef& inputData, const framework::Output& output) const;
@@ -78,4 +78,4 @@ class Dispatcher : public framework::Task
7878

7979
} // namespace o2::utilities
8080

81-
#endif //ALICEO2_DISPATCHER_H
81+
#endif // ALICEO2_DISPATCHER_H

Utilities/DataSampling/src/DataSamplingHeader.cxx

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,15 @@
1919
namespace o2::utilities
2020
{
2121

22-
DataSamplingHeader::DataSamplingHeader() : BaseHeader(sizeof(DataSamplingHeader), sHeaderType, sSerializationMethod, sVersion)
23-
{
24-
}
25-
26-
DataSamplingHeader::DataSamplingHeader(uint64_t _sampleTimeUs, uint32_t _totalAcceptedMessages, uint32_t _totalEvaluatedMessages, DeviceIDType _deviceID)
22+
DataSamplingHeader::DataSamplingHeader(uint64_t _sampleTimeUs, uint32_t _totalAcceptedMessages, uint32_t _totalEvaluatedMessages, DeviceIDType _deviceID, header::DataHeader const& _original)
2723
: BaseHeader(sizeof(DataSamplingHeader), sHeaderType, sSerializationMethod, sVersion),
2824
sampleTimeUs(_sampleTimeUs),
2925
totalAcceptedMessages(_totalAcceptedMessages),
3026
totalEvaluatedMessages(_totalEvaluatedMessages),
31-
deviceID(_deviceID)
27+
deviceID(_deviceID),
28+
dataDescription(_original.dataDescription),
29+
dataOrigin(_original.dataOrigin),
30+
subSpecification(_original.subSpecification)
3231
{
3332
}
3433

@@ -42,4 +41,4 @@ const uint32_t o2::utilities::DataSamplingHeader::sVersion = 1;
4241
const o2::header::HeaderType o2::utilities::DataSamplingHeader::sHeaderType = header::String2<uint64_t>("DataSamp");
4342
const o2::header::SerializationMethod o2::utilities::DataSamplingHeader::sSerializationMethod = o2::header::gSerializationMethodNone;
4443

45-
} // namespace o2::utilities
44+
} // namespace o2::utilities

Utilities/DataSampling/src/Dispatcher.cxx

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ void Dispatcher::run(ProcessingContext& ctx)
9999
// a "TST/RAWDATA/*" output.
100100
if (auto route = policy->match(inputMatcher); route != nullptr && policy->decide(firstPart)) {
101101
auto routeAsConcreteDataType = DataSpecUtils::asConcreteDataTypeMatcher(*route);
102-
auto dsheader = prepareDataSamplingHeader(*policy);
102+
auto dsheader = prepareDataSamplingHeader(*policy, *firstInputHeader);
103103
for (const auto& part : inputIt) {
104104
if (part.header != nullptr) {
105105
// We copy every header which is not DataHeader or DataProcessingHeader,
@@ -144,15 +144,16 @@ void Dispatcher::reportStats(Monitoring& monitoring) const
144144
monitoring.send(Metric{dispatcherTotalAcceptedMessages, "Dispatcher_messages_passed", Verbosity::Prod}.addTag(tags::Key::Subsystem, tags::Value::DataSampling));
145145
}
146146

147-
DataSamplingHeader Dispatcher::prepareDataSamplingHeader(const DataSamplingPolicy& policy)
147+
DataSamplingHeader Dispatcher::prepareDataSamplingHeader(const DataSamplingPolicy& policy, header::DataHeader const& original)
148148
{
149149
uint64_t sampleTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count());
150150

151151
return {
152152
sampleTime,
153153
policy.getTotalAcceptedMessages(),
154154
policy.getTotalEvaluatedMessages(),
155-
mDeviceID};
155+
mDeviceID,
156+
original};
156157
}
157158

158159
header::Stack Dispatcher::extractAdditionalHeaders(const char* inputHeaderStack) const

Utilities/DataSampling/test/test_DataSamplingHeader.cxx

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,57 +21,62 @@
2121
using namespace o2::utilities;
2222
using namespace o2::header;
2323

24-
BOOST_AUTO_TEST_CASE(DataSamplingHeaderDefault)
25-
{
26-
DataSamplingHeader header;
27-
28-
BOOST_CHECK_EQUAL(header.sampleTimeUs, 0);
29-
BOOST_CHECK_EQUAL(header.totalAcceptedMessages, 0);
30-
BOOST_CHECK_EQUAL(header.totalEvaluatedMessages, 0);
31-
BOOST_CHECK_EQUAL(strcmp(header.deviceID.str, ""), 0);
32-
}
33-
3424
BOOST_AUTO_TEST_CASE(DataSamplingHeaderInit)
3525
{
36-
DataSamplingHeader header{123, 456, 789, "abc"};
26+
o2::header::DataHeader original("A", "TST", 1);
27+
DataSamplingHeader header{123, 456, 789, "abc", original};
3728

3829
BOOST_CHECK_EQUAL(header.sampleTimeUs, 123);
3930
BOOST_CHECK_EQUAL(header.totalAcceptedMessages, 456);
4031
BOOST_CHECK_EQUAL(header.totalEvaluatedMessages, 789);
4132
BOOST_CHECK_EQUAL(strcmp(header.deviceID.str, "abc"), 0);
33+
BOOST_CHECK_EQUAL(strcmp(header.dataOrigin.str, "TST"), 0);
34+
BOOST_CHECK_EQUAL(strcmp(header.dataDescription.str, "A"), 0);
35+
BOOST_CHECK_EQUAL(header.subSpecification, 1);
4236
}
4337

4438
BOOST_AUTO_TEST_CASE(DataSamplingHeaderCopy)
4539
{
46-
DataSamplingHeader header{123, 456, 789, "abc"};
40+
o2::header::DataHeader original("A", "TST", 1);
41+
DataSamplingHeader header{123, 456, 789, "abc", original};
4742
DataSamplingHeader copy(header);
4843

4944
BOOST_CHECK_EQUAL(copy.sampleTimeUs, 123);
5045
BOOST_CHECK_EQUAL(copy.totalAcceptedMessages, 456);
5146
BOOST_CHECK_EQUAL(copy.totalEvaluatedMessages, 789);
5247
BOOST_CHECK_EQUAL(strcmp(copy.deviceID.str, "abc"), 0);
48+
BOOST_CHECK_EQUAL(strcmp(copy.dataOrigin.str, "TST"), 0);
49+
BOOST_CHECK_EQUAL(strcmp(copy.dataDescription.str, "A"), 0);
50+
BOOST_CHECK_EQUAL(copy.subSpecification, 1);
5351
}
5452

5553
BOOST_AUTO_TEST_CASE(DataSamplingHeaderAssignement)
5654
{
57-
DataSamplingHeader first{123, 456, 789, "abc"};
58-
DataSamplingHeader second;
59-
second = first;
55+
o2::header::DataHeader original("A", "TST", 1);
56+
DataSamplingHeader first{123, 456, 789, "abc", original};
57+
DataSamplingHeader second = first;
6058

6159
BOOST_CHECK_EQUAL(first.sampleTimeUs, 123);
6260
BOOST_CHECK_EQUAL(first.totalAcceptedMessages, 456);
6361
BOOST_CHECK_EQUAL(first.totalEvaluatedMessages, 789);
6462
BOOST_CHECK_EQUAL(strcmp(first.deviceID.str, "abc"), 0);
63+
BOOST_CHECK_EQUAL(strcmp(first.dataOrigin.str, "TST"), 0);
64+
BOOST_CHECK_EQUAL(strcmp(first.dataDescription.str, "A"), 0);
65+
BOOST_CHECK_EQUAL(first.subSpecification, 1);
6566

6667
BOOST_CHECK_EQUAL(second.sampleTimeUs, 123);
6768
BOOST_CHECK_EQUAL(second.totalAcceptedMessages, 456);
6869
BOOST_CHECK_EQUAL(second.totalEvaluatedMessages, 789);
6970
BOOST_CHECK_EQUAL(strcmp(second.deviceID.str, "abc"), 0);
71+
BOOST_CHECK_EQUAL(strcmp(second.dataOrigin.str, "TST"), 0);
72+
BOOST_CHECK_EQUAL(strcmp(second.dataDescription.str, "A"), 0);
73+
BOOST_CHECK_EQUAL(second.subSpecification, 1);
7074
}
7175

7276
BOOST_AUTO_TEST_CASE(DataSamplingHeaderOnStack)
7377
{
74-
DataSamplingHeader header{123, 456, 789, "abc"};
78+
o2::header::DataHeader original("A", "TST", 1);
79+
DataSamplingHeader header{123, 456, 789, "abc", original};
7580
Stack headerStack{header};
7681

7782
const auto* dsHeaderFromStack = get<DataSamplingHeader*>(headerStack.data());
@@ -81,4 +86,7 @@ BOOST_AUTO_TEST_CASE(DataSamplingHeaderOnStack)
8186
BOOST_CHECK_EQUAL(dsHeaderFromStack->totalAcceptedMessages, 456);
8287
BOOST_CHECK_EQUAL(dsHeaderFromStack->totalEvaluatedMessages, 789);
8388
BOOST_CHECK_EQUAL(strcmp(dsHeaderFromStack->deviceID.str, "abc"), 0);
84-
}
89+
BOOST_CHECK_EQUAL(strcmp(dsHeaderFromStack->dataOrigin.str, "TST"), 0);
90+
BOOST_CHECK_EQUAL(strcmp(dsHeaderFromStack->dataDescription.str, "A"), 0);
91+
BOOST_CHECK_EQUAL(dsHeaderFromStack->subSpecification, 1);
92+
}

0 commit comments

Comments
 (0)