Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions Utilities/DataSampling/include/DataSampling/DataSamplingHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,15 @@ struct DataSamplingHeader : public header::BaseHeader {
uint32_t totalEvaluatedMessages = 0;
DeviceIDType deviceID = "";

DataSamplingHeader();
DataSamplingHeader(uint64_t _sampleTimeUs, uint32_t _totalAcceptedMessages, uint32_t _totalEvaluatedMessages, DeviceIDType _deviceID);
/// Presampled description for the data. Copied from the original DataHeader.
header::DataDescription dataDescription;
/// Presampled origin for the data. Copied from the original DataHeader.
header::DataOrigin dataOrigin;
/// Presampled subSpecification for the data.
header::DataHeader::SubSpecificationType subSpecification;

DataSamplingHeader() = delete;
DataSamplingHeader(uint64_t _sampleTimeUs, uint32_t _totalAcceptedMessages, uint32_t _totalEvaluatedMessages, DeviceIDType _deviceID, header::DataHeader const& original);
DataSamplingHeader(const DataSamplingHeader&) = default;
DataSamplingHeader& operator=(const DataSamplingHeader&) = default;

Expand Down
2 changes: 1 addition & 1 deletion Utilities/DataSampling/include/DataSampling/Dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class Dispatcher : public framework::Task
framework::Options getOptions();

private:
DataSamplingHeader prepareDataSamplingHeader(const DataSamplingPolicy& policy);
DataSamplingHeader prepareDataSamplingHeader(const DataSamplingPolicy& policy, header::DataHeader const& original);
header::Stack extractAdditionalHeaders(const char* inputHeaderStack) const;
void reportStats(monitoring::Monitoring& monitoring) const;
void send(framework::DataAllocator& dataAllocator, const framework::DataRef& inputData, const framework::Output& output) const;
Expand Down
13 changes: 6 additions & 7 deletions Utilities/DataSampling/src/DataSamplingHeader.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@
namespace o2::utilities
{

DataSamplingHeader::DataSamplingHeader() : BaseHeader(sizeof(DataSamplingHeader), sHeaderType, sSerializationMethod, sVersion)
{
}

DataSamplingHeader::DataSamplingHeader(uint64_t _sampleTimeUs, uint32_t _totalAcceptedMessages, uint32_t _totalEvaluatedMessages, DeviceIDType _deviceID)
DataSamplingHeader::DataSamplingHeader(uint64_t _sampleTimeUs, uint32_t _totalAcceptedMessages, uint32_t _totalEvaluatedMessages, DeviceIDType _deviceID, header::DataHeader const& _original)
: BaseHeader(sizeof(DataSamplingHeader), sHeaderType, sSerializationMethod, sVersion),
sampleTimeUs(_sampleTimeUs),
totalAcceptedMessages(_totalAcceptedMessages),
totalEvaluatedMessages(_totalEvaluatedMessages),
deviceID(_deviceID)
deviceID(_deviceID),
dataDescription(_original.dataDescription),
dataOrigin(_original.dataOrigin),
subSpecification(_original.subSpecification)
{
}

Expand All @@ -42,4 +41,4 @@ const uint32_t o2::utilities::DataSamplingHeader::sVersion = 1;
const o2::header::HeaderType o2::utilities::DataSamplingHeader::sHeaderType = header::String2<uint64_t>("DataSamp");
const o2::header::SerializationMethod o2::utilities::DataSamplingHeader::sSerializationMethod = o2::header::gSerializationMethodNone;

} // namespace o2::utilities
} // namespace o2::utilities
7 changes: 4 additions & 3 deletions Utilities/DataSampling/src/Dispatcher.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ void Dispatcher::run(ProcessingContext& ctx)
// a "TST/RAWDATA/*" output.
if (auto route = policy->match(inputMatcher); route != nullptr && policy->decide(firstPart)) {
auto routeAsConcreteDataType = DataSpecUtils::asConcreteDataTypeMatcher(*route);
auto dsheader = prepareDataSamplingHeader(*policy);
auto dsheader = prepareDataSamplingHeader(*policy, *firstInputHeader);
for (const auto& part : inputIt) {
if (part.header != nullptr) {
// We copy every header which is not DataHeader or DataProcessingHeader,
Expand Down Expand Up @@ -144,15 +144,16 @@ void Dispatcher::reportStats(Monitoring& monitoring) const
monitoring.send(Metric{dispatcherTotalAcceptedMessages, "Dispatcher_messages_passed", Verbosity::Prod}.addTag(tags::Key::Subsystem, tags::Value::DataSampling));
}

DataSamplingHeader Dispatcher::prepareDataSamplingHeader(const DataSamplingPolicy& policy)
DataSamplingHeader Dispatcher::prepareDataSamplingHeader(const DataSamplingPolicy& policy, header::DataHeader const& original)
{
uint64_t sampleTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count());

return {
sampleTime,
policy.getTotalAcceptedMessages(),
policy.getTotalEvaluatedMessages(),
mDeviceID};
mDeviceID,
original};
}

header::Stack Dispatcher::extractAdditionalHeaders(const char* inputHeaderStack) const
Expand Down
42 changes: 25 additions & 17 deletions Utilities/DataSampling/test/test_DataSamplingHeader.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,57 +21,62 @@
using namespace o2::utilities;
using namespace o2::header;

BOOST_AUTO_TEST_CASE(DataSamplingHeaderDefault)
{
DataSamplingHeader header;

BOOST_CHECK_EQUAL(header.sampleTimeUs, 0);
BOOST_CHECK_EQUAL(header.totalAcceptedMessages, 0);
BOOST_CHECK_EQUAL(header.totalEvaluatedMessages, 0);
BOOST_CHECK_EQUAL(strcmp(header.deviceID.str, ""), 0);
}

BOOST_AUTO_TEST_CASE(DataSamplingHeaderInit)
{
DataSamplingHeader header{123, 456, 789, "abc"};
o2::header::DataHeader original("A", "TST", 1);
DataSamplingHeader header{123, 456, 789, "abc", original};

BOOST_CHECK_EQUAL(header.sampleTimeUs, 123);
BOOST_CHECK_EQUAL(header.totalAcceptedMessages, 456);
BOOST_CHECK_EQUAL(header.totalEvaluatedMessages, 789);
BOOST_CHECK_EQUAL(strcmp(header.deviceID.str, "abc"), 0);
BOOST_CHECK_EQUAL(strcmp(header.dataOrigin.str, "TST"), 0);
BOOST_CHECK_EQUAL(strcmp(header.dataDescription.str, "A"), 0);
BOOST_CHECK_EQUAL(header.subSpecification, 1);
}

BOOST_AUTO_TEST_CASE(DataSamplingHeaderCopy)
{
DataSamplingHeader header{123, 456, 789, "abc"};
o2::header::DataHeader original("A", "TST", 1);
DataSamplingHeader header{123, 456, 789, "abc", original};
DataSamplingHeader copy(header);

BOOST_CHECK_EQUAL(copy.sampleTimeUs, 123);
BOOST_CHECK_EQUAL(copy.totalAcceptedMessages, 456);
BOOST_CHECK_EQUAL(copy.totalEvaluatedMessages, 789);
BOOST_CHECK_EQUAL(strcmp(copy.deviceID.str, "abc"), 0);
BOOST_CHECK_EQUAL(strcmp(copy.dataOrigin.str, "TST"), 0);
BOOST_CHECK_EQUAL(strcmp(copy.dataDescription.str, "A"), 0);
BOOST_CHECK_EQUAL(copy.subSpecification, 1);
}

BOOST_AUTO_TEST_CASE(DataSamplingHeaderAssignement)
{
DataSamplingHeader first{123, 456, 789, "abc"};
DataSamplingHeader second;
second = first;
o2::header::DataHeader original("A", "TST", 1);
DataSamplingHeader first{123, 456, 789, "abc", original};
DataSamplingHeader second = first;

BOOST_CHECK_EQUAL(first.sampleTimeUs, 123);
BOOST_CHECK_EQUAL(first.totalAcceptedMessages, 456);
BOOST_CHECK_EQUAL(first.totalEvaluatedMessages, 789);
BOOST_CHECK_EQUAL(strcmp(first.deviceID.str, "abc"), 0);
BOOST_CHECK_EQUAL(strcmp(first.dataOrigin.str, "TST"), 0);
BOOST_CHECK_EQUAL(strcmp(first.dataDescription.str, "A"), 0);
BOOST_CHECK_EQUAL(first.subSpecification, 1);

BOOST_CHECK_EQUAL(second.sampleTimeUs, 123);
BOOST_CHECK_EQUAL(second.totalAcceptedMessages, 456);
BOOST_CHECK_EQUAL(second.totalEvaluatedMessages, 789);
BOOST_CHECK_EQUAL(strcmp(second.deviceID.str, "abc"), 0);
BOOST_CHECK_EQUAL(strcmp(second.dataOrigin.str, "TST"), 0);
BOOST_CHECK_EQUAL(strcmp(second.dataDescription.str, "A"), 0);
BOOST_CHECK_EQUAL(second.subSpecification, 1);
}

BOOST_AUTO_TEST_CASE(DataSamplingHeaderOnStack)
{
DataSamplingHeader header{123, 456, 789, "abc"};
o2::header::DataHeader original("A", "TST", 1);
DataSamplingHeader header{123, 456, 789, "abc", original};
Stack headerStack{header};

const auto* dsHeaderFromStack = get<DataSamplingHeader*>(headerStack.data());
Expand All @@ -81,4 +86,7 @@ BOOST_AUTO_TEST_CASE(DataSamplingHeaderOnStack)
BOOST_CHECK_EQUAL(dsHeaderFromStack->totalAcceptedMessages, 456);
BOOST_CHECK_EQUAL(dsHeaderFromStack->totalEvaluatedMessages, 789);
BOOST_CHECK_EQUAL(strcmp(dsHeaderFromStack->deviceID.str, "abc"), 0);
}
BOOST_CHECK_EQUAL(strcmp(dsHeaderFromStack->dataOrigin.str, "TST"), 0);
BOOST_CHECK_EQUAL(strcmp(dsHeaderFromStack->dataDescription.str, "A"), 0);
BOOST_CHECK_EQUAL(dsHeaderFromStack->subSpecification, 1);
}