Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions Framework/Core/include/Framework/DataRefUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#ifndef O2_FRAMEWORK_DATAREFUTILS_H_
#define O2_FRAMEWORK_DATAREFUTILS_H_

#include "Framework/DataDescriptorMatcher.h"
#include "Framework/DataRef.h"
#include "Framework/RootSerializationSupport.h"
#include "Framework/SerializationMethods.h"
Expand All @@ -33,6 +34,9 @@ class ConfigurableParam;
namespace o2::framework
{

template <typename H>
concept DataHeaderLike = requires(H& dh) {dh.dataOrigin; dh.dataDescription; dh.subSpecification; };

// FIXME: Should enforce the fact that DataRefs are read only...
struct DataRefUtils {

Expand All @@ -52,7 +56,7 @@ struct DataRefUtils {
if ((payloadSize % sizeof(T)) != 0) {
throw runtime_error("Cannot extract POD from message as size do not match");
}
//FIXME: provide a const collection
// FIXME: provide a const collection
return gsl::span<T>(reinterpret_cast<T*>(const_cast<char*>(ref.payload)), payloadSize / sizeof(T));
} else if constexpr (has_root_dictionary<T>::value == true &&
is_messageable<T>::value == false) {
Expand Down Expand Up @@ -220,17 +224,24 @@ struct DataRefUtils {
return ref.spec != nullptr && ref.spec->binding == binding;
}

/// check if the O2 message referred by DataRef matches a particular
/// input spec. The DataHeader is retrieved from the header message and matched
/// against @ref spec parameter.
static bool match(DataRef const& ref, InputSpec const& spec)
template <DataHeaderLike H>
static bool matchHeader(DataRef const& ref, InputSpec const& spec)
{
auto dh = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
auto const* dh = o2::header::get<H*>(ref.header);
if (dh == nullptr) {
return false;
}
return DataSpecUtils::match(spec, dh->dataOrigin, dh->dataDescription, dh->subSpecification);
}

/// check if the O2 message referred by DataRef matches a particular
/// input spec. The DataHeader is retrieved from the header message and matched
/// against @ref spec parameter.
template <DataHeaderLike... H>
static bool match(DataRef const& ref, InputSpec const& spec)
{
return (DataRefUtils::matchHeader<H>(ref, spec) || ... || matchHeader<o2::header::DataHeader>(ref, spec));
}
};

} // namespace o2::framework
Expand Down
5 changes: 3 additions & 2 deletions Framework/Core/include/Framework/InputRecordWalker.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
#define FRAMEWORK_INPUTRECORDWALKER_H

/// @file InputRecordWalker.h
/// @author Matthias Richter
/// @since 2020-03-25
/// @brief A helper class to iteratate over all parts of all input routes

#include "Framework/InputRecord.h"
#include "Framework/DataRefUtils.h"

namespace o2::framework
{
Expand Down Expand Up @@ -49,6 +49,7 @@ namespace o2::framework
/// for (auto const& ref : InputRecordWalker(inputs, filter)) {
/// // do something with the data
/// }
template <DataHeaderLike... EXTRA_HEADERS>
class InputRecordWalker
{
public:
Expand Down Expand Up @@ -131,7 +132,7 @@ class InputRecordWalker
if (mFilterSpecs.size() > 0) {
bool isSelected = false;
for (auto const& spec : mFilterSpecs) {
if ((isSelected = DataRefUtils::match(*mCurrent, spec)) == true) {
if ((isSelected = DataRefUtils::match<EXTRA_HEADERS...>(*mCurrent, spec)) == true) {
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion Framework/Utils/include/DPLUtils/DPLRawPageSequencer.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ class DPLRawPageSequencer
}

private:
InputRecordWalker mInput;
InputRecordWalker<> mInput;

template <typename Predicate, typename Inserter>
void forwardInternal(Predicate pred, Inserter inserter, const char* data, size_t size, const o2::header::DataHeader* dh)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,15 @@ struct DataSamplingHeader : public header::BaseHeader {
uint32_t totalEvaluatedMessages = 0;
DeviceIDType deviceID = "";

DataSamplingHeader();
DataSamplingHeader(uint64_t _sampleTimeUs, uint32_t _totalAcceptedMessages, uint32_t _totalEvaluatedMessages, DeviceIDType _deviceID);
/// Presampled description for the data. Copied from the original DataHeader.
header::DataDescription dataDescription;
/// Presampled origin for the data. Copied from the original DataHeader.
header::DataOrigin dataOrigin;
/// Presampled subSpecification for the data.
header::DataHeader::SubSpecificationType subSpecification;

DataSamplingHeader() = delete;
DataSamplingHeader(uint64_t _sampleTimeUs, uint32_t _totalAcceptedMessages, uint32_t _totalEvaluatedMessages, DeviceIDType _deviceID, header::DataHeader const& original);
DataSamplingHeader(const DataSamplingHeader&) = default;
DataSamplingHeader& operator=(const DataSamplingHeader&) = default;

Expand All @@ -51,4 +58,4 @@ struct DataSamplingHeader : public header::BaseHeader {

} // namespace o2::utilities

#endif //ALICEO2_DATASAMPLINGHEADER_H
#endif // ALICEO2_DATASAMPLINGHEADER_H
4 changes: 2 additions & 2 deletions Utilities/DataSampling/include/DataSampling/Dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class Dispatcher : public framework::Task
framework::Options getOptions();

private:
DataSamplingHeader prepareDataSamplingHeader(const DataSamplingPolicy& policy);
DataSamplingHeader prepareDataSamplingHeader(const DataSamplingPolicy& policy, header::DataHeader const& original);
header::Stack extractAdditionalHeaders(const char* inputHeaderStack) const;
void reportStats(monitoring::Monitoring& monitoring) const;
void send(framework::DataAllocator& dataAllocator, const framework::DataRef& inputData, const framework::Output& output) const;
Expand All @@ -78,4 +78,4 @@ class Dispatcher : public framework::Task

} // namespace o2::utilities

#endif //ALICEO2_DISPATCHER_H
#endif // ALICEO2_DISPATCHER_H
13 changes: 6 additions & 7 deletions Utilities/DataSampling/src/DataSamplingHeader.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@
namespace o2::utilities
{

DataSamplingHeader::DataSamplingHeader() : BaseHeader(sizeof(DataSamplingHeader), sHeaderType, sSerializationMethod, sVersion)
{
}

DataSamplingHeader::DataSamplingHeader(uint64_t _sampleTimeUs, uint32_t _totalAcceptedMessages, uint32_t _totalEvaluatedMessages, DeviceIDType _deviceID)
DataSamplingHeader::DataSamplingHeader(uint64_t _sampleTimeUs, uint32_t _totalAcceptedMessages, uint32_t _totalEvaluatedMessages, DeviceIDType _deviceID, header::DataHeader const& _original)
: BaseHeader(sizeof(DataSamplingHeader), sHeaderType, sSerializationMethod, sVersion),
sampleTimeUs(_sampleTimeUs),
totalAcceptedMessages(_totalAcceptedMessages),
totalEvaluatedMessages(_totalEvaluatedMessages),
deviceID(_deviceID)
deviceID(_deviceID),
dataDescription(_original.dataDescription),
dataOrigin(_original.dataOrigin),
subSpecification(_original.subSpecification)
{
}

Expand All @@ -42,4 +41,4 @@ const uint32_t o2::utilities::DataSamplingHeader::sVersion = 1;
const o2::header::HeaderType o2::utilities::DataSamplingHeader::sHeaderType = header::String2<uint64_t>("DataSamp");
const o2::header::SerializationMethod o2::utilities::DataSamplingHeader::sSerializationMethod = o2::header::gSerializationMethodNone;

} // namespace o2::utilities
} // namespace o2::utilities
7 changes: 4 additions & 3 deletions Utilities/DataSampling/src/Dispatcher.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ void Dispatcher::run(ProcessingContext& ctx)
// a "TST/RAWDATA/*" output.
if (auto route = policy->match(inputMatcher); route != nullptr && policy->decide(firstPart)) {
auto routeAsConcreteDataType = DataSpecUtils::asConcreteDataTypeMatcher(*route);
auto dsheader = prepareDataSamplingHeader(*policy);
auto dsheader = prepareDataSamplingHeader(*policy, *firstInputHeader);
for (const auto& part : inputIt) {
if (part.header != nullptr) {
// We copy every header which is not DataHeader or DataProcessingHeader,
Expand Down Expand Up @@ -144,15 +144,16 @@ void Dispatcher::reportStats(Monitoring& monitoring) const
monitoring.send(Metric{dispatcherTotalAcceptedMessages, "Dispatcher_messages_passed", Verbosity::Prod}.addTag(tags::Key::Subsystem, tags::Value::DataSampling));
}

DataSamplingHeader Dispatcher::prepareDataSamplingHeader(const DataSamplingPolicy& policy)
DataSamplingHeader Dispatcher::prepareDataSamplingHeader(const DataSamplingPolicy& policy, header::DataHeader const& original)
{
uint64_t sampleTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count());

return {
sampleTime,
policy.getTotalAcceptedMessages(),
policy.getTotalEvaluatedMessages(),
mDeviceID};
mDeviceID,
original};
}

header::Stack Dispatcher::extractAdditionalHeaders(const char* inputHeaderStack) const
Expand Down
42 changes: 25 additions & 17 deletions Utilities/DataSampling/test/test_DataSamplingHeader.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,57 +21,62 @@
using namespace o2::utilities;
using namespace o2::header;

BOOST_AUTO_TEST_CASE(DataSamplingHeaderDefault)
{
DataSamplingHeader header;

BOOST_CHECK_EQUAL(header.sampleTimeUs, 0);
BOOST_CHECK_EQUAL(header.totalAcceptedMessages, 0);
BOOST_CHECK_EQUAL(header.totalEvaluatedMessages, 0);
BOOST_CHECK_EQUAL(strcmp(header.deviceID.str, ""), 0);
}

BOOST_AUTO_TEST_CASE(DataSamplingHeaderInit)
{
DataSamplingHeader header{123, 456, 789, "abc"};
o2::header::DataHeader original("A", "TST", 1);
DataSamplingHeader header{123, 456, 789, "abc", original};

BOOST_CHECK_EQUAL(header.sampleTimeUs, 123);
BOOST_CHECK_EQUAL(header.totalAcceptedMessages, 456);
BOOST_CHECK_EQUAL(header.totalEvaluatedMessages, 789);
BOOST_CHECK_EQUAL(strcmp(header.deviceID.str, "abc"), 0);
BOOST_CHECK_EQUAL(strcmp(header.dataOrigin.str, "TST"), 0);
BOOST_CHECK_EQUAL(strcmp(header.dataDescription.str, "A"), 0);
BOOST_CHECK_EQUAL(header.subSpecification, 1);
}

BOOST_AUTO_TEST_CASE(DataSamplingHeaderCopy)
{
DataSamplingHeader header{123, 456, 789, "abc"};
o2::header::DataHeader original("A", "TST", 1);
DataSamplingHeader header{123, 456, 789, "abc", original};
DataSamplingHeader copy(header);

BOOST_CHECK_EQUAL(copy.sampleTimeUs, 123);
BOOST_CHECK_EQUAL(copy.totalAcceptedMessages, 456);
BOOST_CHECK_EQUAL(copy.totalEvaluatedMessages, 789);
BOOST_CHECK_EQUAL(strcmp(copy.deviceID.str, "abc"), 0);
BOOST_CHECK_EQUAL(strcmp(copy.dataOrigin.str, "TST"), 0);
BOOST_CHECK_EQUAL(strcmp(copy.dataDescription.str, "A"), 0);
BOOST_CHECK_EQUAL(copy.subSpecification, 1);
}

BOOST_AUTO_TEST_CASE(DataSamplingHeaderAssignement)
{
DataSamplingHeader first{123, 456, 789, "abc"};
DataSamplingHeader second;
second = first;
o2::header::DataHeader original("A", "TST", 1);
DataSamplingHeader first{123, 456, 789, "abc", original};
DataSamplingHeader second = first;

BOOST_CHECK_EQUAL(first.sampleTimeUs, 123);
BOOST_CHECK_EQUAL(first.totalAcceptedMessages, 456);
BOOST_CHECK_EQUAL(first.totalEvaluatedMessages, 789);
BOOST_CHECK_EQUAL(strcmp(first.deviceID.str, "abc"), 0);
BOOST_CHECK_EQUAL(strcmp(first.dataOrigin.str, "TST"), 0);
BOOST_CHECK_EQUAL(strcmp(first.dataDescription.str, "A"), 0);
BOOST_CHECK_EQUAL(first.subSpecification, 1);

BOOST_CHECK_EQUAL(second.sampleTimeUs, 123);
BOOST_CHECK_EQUAL(second.totalAcceptedMessages, 456);
BOOST_CHECK_EQUAL(second.totalEvaluatedMessages, 789);
BOOST_CHECK_EQUAL(strcmp(second.deviceID.str, "abc"), 0);
BOOST_CHECK_EQUAL(strcmp(second.dataOrigin.str, "TST"), 0);
BOOST_CHECK_EQUAL(strcmp(second.dataDescription.str, "A"), 0);
BOOST_CHECK_EQUAL(second.subSpecification, 1);
}

BOOST_AUTO_TEST_CASE(DataSamplingHeaderOnStack)
{
DataSamplingHeader header{123, 456, 789, "abc"};
o2::header::DataHeader original("A", "TST", 1);
DataSamplingHeader header{123, 456, 789, "abc", original};
Stack headerStack{header};

const auto* dsHeaderFromStack = get<DataSamplingHeader*>(headerStack.data());
Expand All @@ -81,4 +86,7 @@ BOOST_AUTO_TEST_CASE(DataSamplingHeaderOnStack)
BOOST_CHECK_EQUAL(dsHeaderFromStack->totalAcceptedMessages, 456);
BOOST_CHECK_EQUAL(dsHeaderFromStack->totalEvaluatedMessages, 789);
BOOST_CHECK_EQUAL(strcmp(dsHeaderFromStack->deviceID.str, "abc"), 0);
}
BOOST_CHECK_EQUAL(strcmp(dsHeaderFromStack->dataOrigin.str, "TST"), 0);
BOOST_CHECK_EQUAL(strcmp(dsHeaderFromStack->dataDescription.str, "A"), 0);
BOOST_CHECK_EQUAL(dsHeaderFromStack->subSpecification, 1);
}