Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 95 additions & 2 deletions tree/ntuple/inc/ROOT/RNTupleMetrics.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -295,14 +295,107 @@ private:
std::string fName;
bool fIsEnabled = false;

// I/O access-pattern counters. Atomics because they are bumped from the read
// path and queried via the public getters, potentially from another thread.
std::atomic<std::uint64_t> fSumSkip{0}; ///< Sum of absolute seek distances between consecutive reads
std::atomic<std::uint64_t> fTotalFileSize{0}; ///< Total size of the backing file in bytes (0 = unknown)
std::atomic<std::uint64_t> fExplicitBytesRead{0}; ///< Payload bytes read; mirrored counter for API access
std::atomic<std::uint64_t> fTransactions{0}; ///< Number of I/O transactions (a vectored read counts once)

// Presumably reports whether a counter of the given name exists; definition not in view.
bool Contains(const std::string &name) const;

public:
// Accumulators used by the page source during reads. NOTE(review): these do not
// consult fIsEnabled here -- confirm whether they should honor Enable()/disable.
void AddSumSkip(std::uint64_t n) { fSumSkip += n; }
void AddExplicitBytesRead(std::uint64_t n) { fExplicitBytesRead += n; }
void AddTransactions(std::uint64_t n) { fTransactions += n; }
void SetTotalFileSize(std::uint64_t n) { fTotalFileSize = n; }
std::uint64_t GetTotalFileSize() const { return fTotalFileSize; }

/// Constructs a named, initially disabled metrics object.
explicit RNTupleMetrics(const std::string &name) : fName(name) {}

/// \brief Sparseness of file access: fraction of the backing file actually read.
/// Aggregates this object's counters with those of directly observed metrics
/// (one level deep, matching GetRandomness).
/// \return Value in [0.0, 1.0]; 0.0 when the total file size is unknown or zero.
double GetSparseness() const {
   std::uint64_t size = fTotalFileSize.load();
   std::uint64_t bytes = fExplicitBytesRead.load();
   for (const auto *observed : fObservedMetrics) {
      size += observed->fTotalFileSize.load();
      bytes += observed->fExplicitBytesRead.load();
   }
   if (size == 0)
      return 0.0;
   return static_cast<double>(bytes) / static_cast<double>(size);
}

/// \brief Randomness of file access: total seek distance per payload byte read.
/// Larger values indicate scattered, non-sequential access patterns. Aggregates
/// this object's counters with those of directly observed metrics (one level deep).
/// \return Ratio >= 0.0; 0.0 when no bytes have been read yet.
double GetRandomness() const {
   std::uint64_t skip = fSumSkip.load();
   std::uint64_t bytes = fExplicitBytesRead.load();
   for (const auto *observed : fObservedMetrics) {
      skip += observed->fSumSkip.load();
      bytes += observed->fExplicitBytesRead.load();
   }
   if (bytes == 0)
      return 0.0;
   return static_cast<double>(skip) / static_cast<double>(bytes);
}

/// \brief Total number of I/O read transactions.
/// Unlike the ratio metrics, this aggregates recursively through observed
/// sub-metrics (GetTransactions of each observed object is itself recursive).
/// \return Accumulated transaction count.
std::uint64_t GetTransactions() const {
   std::uint64_t count = fTransactions.load();
   for (const auto *observed : fObservedMetrics)
      count += observed->GetTransactions();
   return count;
}

RNTupleMetrics(const RNTupleMetrics &other) = delete;
RNTupleMetrics &operator=(const RNTupleMetrics &other) = delete;

// NOTE: the move operations must be user-defined. std::atomic is neither
// copyable nor movable, so `= default` moves would be implicitly deleted --
// and declaring them defaulted alongside these out-of-line-style definitions
// is a redefinition error. The atomics are transferred via load()/store().
// The move as a whole is NOT atomic: no other thread may touch either object
// while it runs.

/// \brief Move constructor: transfers all counters and zeroes the moved-from object.
RNTupleMetrics(RNTupleMetrics &&other) noexcept
   : fCounters(std::move(other.fCounters)),
     fObservedMetrics(std::move(other.fObservedMetrics)),
     fName(std::move(other.fName)),
     fIsEnabled(other.fIsEnabled),
     fSumSkip(other.fSumSkip.load()),
     fTotalFileSize(other.fTotalFileSize.load()),
     fExplicitBytesRead(other.fExplicitBytesRead.load()),
     fTransactions(other.fTransactions.load())
{
   other.fSumSkip.store(0);
   other.fTotalFileSize.store(0);
   other.fExplicitBytesRead.store(0);
   other.fTransactions.store(0);
}

/// \brief Move assignment: see the move constructor for thread-safety caveats.
RNTupleMetrics &operator=(RNTupleMetrics &&other) noexcept
{
   if (this != &other) {
      fCounters = std::move(other.fCounters);
      fObservedMetrics = std::move(other.fObservedMetrics);
      fName = std::move(other.fName);
      fIsEnabled = other.fIsEnabled;
      fSumSkip.store(other.fSumSkip.load());
      fTotalFileSize.store(other.fTotalFileSize.load());
      fExplicitBytesRead.store(other.fExplicitBytesRead.load());
      fTransactions.store(other.fTransactions.load());

      other.fSumSkip.store(0);
      other.fTotalFileSize.store(0);
      other.fExplicitBytesRead.store(0);
      other.fTransactions.store(0);
   }
   return *this;
}

/// \brief Clears the accumulated I/O counters (seek distance, bytes read, transactions).
/// The total file size is a property of the backing file, not of the access
/// pattern, so it is deliberately kept. Also resets every observed sub-metrics
/// object recursively.
void Reset() {
   fSumSkip.store(0);
   fExplicitBytesRead.store(0);
   fTransactions.store(0);
   // fTotalFileSize intentionally not touched (invariant of the file).
   for (auto *observed : fObservedMetrics)
      observed->Reset();
}
~RNTupleMetrics() = default;

// TODO(jblomer): return a reference
Expand Down
1 change: 1 addition & 0 deletions tree/ntuple/inc/ROOT/RNTupleReader.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ public:
/// ~~~
void EnableMetrics() { fMetrics.Enable(); }
/// Read-only access to the reader's I/O metrics counters.
const Experimental::Detail::RNTupleMetrics &GetMetrics() const { return fMetrics; }
/// Mutable access to the metrics, e.g. to call Reset() between measurement phases.
Experimental::Detail::RNTupleMetrics &GetMetrics() { return fMetrics; }
}; // class RNTupleReader

} // namespace ROOT
Expand Down
6 changes: 6 additions & 0 deletions tree/ntuple/inc/ROOT/RPageStorageFile.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ private:
/// Populated by LoadStructureImpl(), reset at the end of Attach()
RStructureBuffer fStructureBuffer;

// fLastOffset tracks the end offset of the last read issued by THIS source
// instance; UpdateReadMetrics() uses it to compute seek distances for the
// randomness metric. std::uint64_t (not std::size_t) because file offsets can
// exceed 4 GiB while std::size_t is 32 bit on some platforms.
// Thread safety is assumed because RPageSourceFile instances are effectively
// thread-local (owned by a specific RNTupleReader/PageSource) -- TODO confirm.
std::uint64_t fLastOffset = 0;
void UpdateReadMetrics(std::uint64_t offset, std::size_t nbytes);

RPageSourceFile(std::string_view ntupleName, const ROOT::RNTupleReadOptions &options);

/// Helper function for LoadClusters: it prepares the memory buffer (page map) and the
Expand Down
46 changes: 46 additions & 0 deletions tree/ntuple/src/RPageStorageFile.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,10 @@ void ROOT::Internal::RPageSourceFile::LoadStructureImpl()
(fAnchor->GetNBytesHeader() + fAnchor->GetNBytesFooter() > readvLimits.fMaxTotalSize)) {
RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
fReader.ReadBuffer(fStructureBuffer.fPtrHeader, fAnchor->GetNBytesHeader(), fAnchor->GetSeekHeader());
UpdateReadMetrics(fAnchor->GetSeekHeader(), fAnchor->GetNBytesHeader());
fReader.ReadBuffer(fStructureBuffer.fPtrFooter, fAnchor->GetNBytesFooter(), fAnchor->GetSeekFooter());
UpdateReadMetrics(fAnchor->GetSeekFooter(), fAnchor->GetNBytesFooter());
fMetrics.AddTransactions(2);
fCounters->fNRead.Add(2);
} else {
RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
Expand All @@ -391,6 +394,9 @@ void ROOT::Internal::RPageSourceFile::LoadStructureImpl()
{fStructureBuffer.fPtrFooter, fAnchor->GetSeekFooter(),
static_cast<std::size_t>(fAnchor->GetNBytesFooter()), 0}};
fFile->ReadV(readRequests, 2);
UpdateReadMetrics(fAnchor->GetSeekHeader(), fAnchor->GetNBytesHeader());
UpdateReadMetrics(fAnchor->GetSeekFooter(), fAnchor->GetNBytesFooter());
fMetrics.AddTransactions(1);
fCounters->fNReadV.Inc();
}
}
Expand Down Expand Up @@ -421,6 +427,9 @@ ROOT::RNTupleDescriptor ROOT::Internal::RPageSourceFile::AttachImpl(RNTupleSeria
auto *zipBuffer = buffer.data() + cgDesc.GetPageListLength();
fReader.ReadBuffer(zipBuffer, cgDesc.GetPageListLocator().GetNBytesOnStorage(),
cgDesc.GetPageListLocator().GetPosition<std::uint64_t>());
UpdateReadMetrics(cgDesc.GetPageListLocator().GetPosition<std::uint64_t>(),
cgDesc.GetPageListLocator().GetNBytesOnStorage());
fMetrics.AddTransactions(1);
RNTupleDecompressor::Unzip(zipBuffer, cgDesc.GetPageListLocator().GetNBytesOnStorage(),
cgDesc.GetPageListLength(), buffer.data());

Expand All @@ -430,6 +439,9 @@ ROOT::RNTupleDescriptor ROOT::Internal::RPageSourceFile::AttachImpl(RNTupleSeria
// For the page reads, we rely on the I/O scheduler to define the read requests
fFile->SetBuffering(false);

// Initialize total file size for sparseness metric
fMetrics.SetTotalFileSize(fFile->GetSize());

return desc;
}

Expand All @@ -453,6 +465,8 @@ void ROOT::Internal::RPageSourceFile::LoadSealedPage(ROOT::DescriptorId_t physic
if (pageInfo.GetLocator().GetType() != RNTupleLocator::kTypePageZero) {
fReader.ReadBuffer(const_cast<void *>(sealedPage.GetBuffer()), sealedPage.GetBufferSize(),
pageInfo.GetLocator().GetPosition<std::uint64_t>());
UpdateReadMetrics(pageInfo.GetLocator().GetPosition<std::uint64_t>(), sealedPage.GetBufferSize());
fMetrics.AddTransactions(1);
} else {
assert(!pageInfo.HasChecksum());
memcpy(const_cast<void *>(sealedPage.GetBuffer()), ROOT::Internal::RPage::GetPageZeroBuffer(),
Expand Down Expand Up @@ -495,6 +509,8 @@ ROOT::Internal::RPageRef ROOT::Internal::RPageSourceFile::LoadPageImpl(ColumnHan
RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
fReader.ReadBuffer(directReadBuffer.get(), sealedPage.GetBufferSize(),
pageInfo.GetLocator().GetPosition<std::uint64_t>());
UpdateReadMetrics(pageInfo.GetLocator().GetPosition<std::uint64_t>(), sealedPage.GetBufferSize());
fMetrics.AddTransactions(1);
}
fCounters->fNPageRead.Inc();
fCounters->fNRead.Inc();
Expand Down Expand Up @@ -705,9 +721,15 @@ ROOT::Internal::RPageSourceFile::LoadClusters(std::span<RCluster::RKey> clusterK
nBatch = 1;
RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
fReader.ReadBuffer(readRequests[iReq].fBuffer, readRequests[iReq].fSize, readRequests[iReq].fOffset);
UpdateReadMetrics(readRequests[iReq].fOffset, readRequests[iReq].fSize);
fMetrics.AddTransactions(1);
} else {
RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
fFile->ReadV(&readRequests[iReq], nBatch);
for (size_t k = 0; k < nBatch; ++k) {
UpdateReadMetrics(readRequests[iReq + k].fOffset, readRequests[iReq + k].fSize);
}
fMetrics.AddTransactions(1);
}
fCounters->fNReadV.Inc();
fCounters->fNRead.Add(nBatch);
Expand All @@ -723,3 +745,27 @@ void ROOT::Internal::RPageSourceFile::LoadStreamerInfo()
{
fReader.LoadStreamerInfo();
}

/// \brief Accounts one physical read of `nbytes` at file offset `offset` in the
/// sparseness/randomness metrics, and advances the remembered file position.
void ROOT::Internal::RPageSourceFile::UpdateReadMetrics(std::uint64_t offset, std::size_t nbytes)
{
   // Lazily record the file size on the first read we see; the sparseness
   // metric needs it and some reads happen before AttachImpl() sets it.
   if (fMetrics.GetTotalFileSize() == 0 && fFile) {
      fMetrics.SetTotalFileSize(fFile->GetSize());
   }

   // Absolute seek distance between the end of the previous read and the start
   // of this one (forward gap or backward jump alike). fLastOffset == 0 serves
   // as the "no previous read yet" sentinel; this is unambiguous because every
   // accounted read advances fLastOffset past zero.
   // Use 64-bit arithmetic throughout: std::size_t is 32 bit on some platforms,
   // which would truncate distances in files larger than 4 GiB.
   std::uint64_t skip = 0;
   if (fLastOffset != 0) {
      skip = (offset > fLastOffset) ? (offset - fLastOffset) : (fLastOffset - offset);
   }

   fMetrics.AddSumSkip(skip);
   fMetrics.AddExplicitBytesRead(nbytes);
   fLastOffset = offset + nbytes;
}
45 changes: 45 additions & 0 deletions tree/ntuple/test/ntuple_metrics.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,48 @@ TEST(Metrics, RNTupleWriter)
// one page for the int field, one for the float field
EXPECT_EQ(2, page_counter->GetValueAsInt());
}

// Integration test for the I/O access-pattern metrics (sparseness, randomness,
// transactions) exposed via RNTupleReader::GetMetrics().
TEST(Metrics, IOMetrics)
{
   // Setup: Write a small dummy ntuple
   {
   auto model = RNTupleModel::Create();
   auto field = model->MakeField<int>("val");
   auto writer = RNTupleWriter::Recreate(std::move(model), "ntuple", "test_metrics.root");
   for (int i = 0; i < 1000; ++i) {
   *field = i;
   writer->Fill();
   }
   // writer goes out of scope here, flushing and closing the file.
   }

   // Read it back and check metrics.
   // NOTE(review): EnableMetrics() is called after Open(), yet the assertions
   // below also rely on reads performed during Open()/attach. The Add* counter
   // methods in RNTupleMetrics.hxx do not check fIsEnabled, so the counts
   // accumulate regardless -- confirm this is the intended contract.
   auto reader = RNTupleReader::Open("ntuple", "test_metrics.root");
   reader->EnableMetrics();

   // Iterate all entries to trigger cluster/page reads.
   for (auto entry : *reader) {
   (void)entry;
   }

   auto& metrics = reader->GetMetrics();

   // Assertions
   // Sparseness: Should be > 0 (we read data) and <= 1.0
   EXPECT_GT(metrics.GetSparseness(), 0.0);
   EXPECT_LE(metrics.GetSparseness(), 1.0);

   // Randomness: Should be > 0 (we had to seek to header/footer)
   // NOTE(review): this assumes at least one non-contiguous pair of reads; if
   // all reads on this tiny file happened to be back-to-back, the sum of seek
   // distances would be 0 and this expectation would be flaky -- verify.
   EXPECT_GT(metrics.GetRandomness(), 0.0);

   // Transactions: We definitely did I/O
   EXPECT_GT(metrics.GetTransactions(), 0);

   // Test Reset()
   // Note: We need a non-const reference to call Reset.
   // RNTupleReader::GetMetrics() returns a reference, so this works.
   reader->GetMetrics().Reset();

   // After Reset(), bytes-read and seek-distance counters are 0, so both ratio
   // metrics return their 0.0 fallback (file size is kept, but numerator is 0).
   EXPECT_DOUBLE_EQ(metrics.GetRandomness(), 0.0);
   EXPECT_DOUBLE_EQ(metrics.GetSparseness(), 0.0);
   // Transactions should also be reset
   EXPECT_EQ(metrics.GetTransactions(), 0);
}