82 changes: 79 additions & 3 deletions .github/workflows/ci.yml
@@ -499,10 +499,86 @@ jobs:
name: status-ubsan
path: badge-status/status-ubsan.json

tsan:
name: tsan / ubuntu-24.04 / clang-20
runs-on: ubuntu-24.04
timeout-minutes: 45

concurrency:
group: tsan-${{ github.ref }}
cancel-in-progress: true

steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0

- name: Install dependencies
shell: bash
run: |
set -euxo pipefail
sudo apt-get update
sudo apt-get install -y cmake ninja-build wget gnupg lsb-release software-properties-common libhdf5-dev
wget https://apt.llvm.org/llvm.sh
chmod +x llvm.sh
sudo ./llvm.sh 20 all
echo "CC=$(command -v clang-20)" >> "$GITHUB_ENV"
echo "CXX=$(command -v clang++-20)" >> "$GITHUB_ENV"

- name: Configure
shell: bash
run: |
set -euxo pipefail
cmake -S . -B build -G Ninja \
-DCMAKE_C_COMPILER="$CC" \
-DCMAKE_CXX_COMPILER="$CXX" \
-DCMAKE_CXX_STANDARD=17 \
-DCMAKE_BUILD_TYPE=Debug \
-DCMAKE_C_FLAGS="-fsanitize=thread -fno-omit-frame-pointer -O1 -g" \
-DCMAKE_CXX_FLAGS="-fsanitize=thread -fno-omit-frame-pointer -O1 -g" \
-DCMAKE_EXE_LINKER_FLAGS="-fsanitize=thread" \
-DH5CPP_BUILD_TESTS=ON

- name: Build
shell: bash
run: cmake --build build --parallel

- name: Test
shell: bash
env:
# halt_on_error=1: TSan exits on the first data race so CI fails loudly.
# second_deadlock_stack=1: print full stack of the lock that completed
# the cycle (default prints only one stack for the offending pair).
TSAN_OPTIONS: "halt_on_error=1:second_deadlock_stack=1"
run: ctest --test-dir build --output-on-failure

- name: Record Badge Status
if: always()
shell: bash
run: |
set -euxo pipefail
mkdir -p badge-status
cat <<EOF > badge-status/status-tsan.json
{
"os": "ubuntu-24.04",
"compiler": "clang-20",
"label": "TSan",
"status": "${{ job.status }}"
}
EOF

- name: Upload Status Artifact
if: always()
uses: actions/upload-artifact@v7
with:
name: status-tsan
path: badge-status/status-tsan.json

badge:
name: Generate SVG Badges
if: always()
needs: [build, asan, ubsan]
needs: [build, asan, ubsan, tsan]
runs-on: ubuntu-24.04

steps:
@@ -648,8 +724,8 @@ jobs:
-DCMAKE_CXX_STANDARD=20 \
-DCMAKE_C_COMPILER=gcc-14 \
-DCMAKE_CXX_COMPILER=g++-14 \
-DCMAKE_C_FLAGS="--coverage -O0 -g" \
-DCMAKE_CXX_FLAGS="--coverage -O0 -g" \
-DCMAKE_C_FLAGS="--coverage -fprofile-update=atomic -O0 -g" \
-DCMAKE_CXX_FLAGS="--coverage -fprofile-update=atomic -O0 -g" \
-DH5CPP_BUILD_TESTS=ON \
-DH5CPP_BUILD_EXAMPLES=ON

15 changes: 11 additions & 4 deletions CMakeLists.txt
@@ -109,10 +109,17 @@ find_package(HDF5 REQUIRED COMPONENTS C)

# HDF5 floor is decoupled from h5cpp package version. Previously the check
# compared HDF5_VERSION against PROJECT_VERSION, which coupled package
# versioning to HDF5 minimums by coincidence. After #234 raised the floor
# to 1.12 and #247 made PROJECT_VERSION track the git tag, the comparison
# stopped being meaningful. Pin the floor explicitly.
set(H5CPP_HDF5_FLOOR "1.12.0")
# versioning to HDF5 minimums by coincidence. After #247 made
# PROJECT_VERSION track the git tag, the comparison stopped being meaningful.
# Pin the floor explicitly.
#
# 1.10.4 matches the prior implicit floor (the stale project(VERSION 1.10.4.6)
# line that #247 removed). The CI matrix runs Ubuntu 22.04 with system
# HDF5 1.10.7 (floor coverage restored by #235); raising the floor above
# 1.10.4 would break that matrix entry. When dropping 1.10.x coverage in
# a future cohort, bump this constant deliberately and remove the 22.04
# matrix entry in the same commit.
set(H5CPP_HDF5_FLOOR "1.10.4")
if(HDF5_VERSION VERSION_LESS ${H5CPP_HDF5_FLOOR})
message(FATAL_ERROR
"-- !!! H5CPP requires HDF5 v${H5CPP_HDF5_FLOOR} or greater (found ${HDF5_VERSION}) !!!"
4 changes: 2 additions & 2 deletions README.md
@@ -91,10 +91,10 @@ cmake --install build
- **`std::float16_t`** (C++23 IEEE 754 half-precision)
- **Rank-7** array support
- **Expanded attribute** type coverage
- **Threaded I/O pipeline** for filter chains
- **FAPL-scoped worker pool** — `h5::create(..., h5::threads{N} | h5::backpressure{M})` opts the file into parallel filter compression; all chunked datasets opened on that file (and pt_t built from them) inherit the pool with async-pipelined dispatch
- **HDF5 1.12.2 ceiling** — tested and verified; `H5Dvlen_reclaim` / reference API compatibility
- **Windows MSVC** in the CI matrix
- **ASan + UBSan** clean on Clang 20
- **ASan + UBSan + TSan** clean on Clang 20

## Documentation

21 changes: 20 additions & 1 deletion docs/filtering-pipeline-rework-report.md
@@ -30,7 +30,7 @@ The current implementation is an experimental skeleton rather than a production
| Multi-filter read | Throws for more than one filter | Reverse-order decode through the complete filter plan |
| Buffer sizing | Uses chunk-sized scratch buffers | Encoded buffers must allow compression expansion |
| Filter mask | Partial handling | Preserve HDF5 chunk filter-mask semantics |
| Threading | `threaded_pipeline_t` is a placeholder | Worker-local state and bounded chunk scheduling |
| Threading | `threaded_pipeline_t` is a placeholder | Worker-local state and bounded chunk scheduling (delivered in #250 as FAPL-scoped `pool_pipeline_t`) |
| Portability | Linux path is the only recently verified path | Linux, macOS, and Windows allocation/build behavior |

Focused baseline probes confirmed two important failures:
@@ -175,3 +175,22 @@ Threading should initially use C++17 standard library primitives. Avoid platform
Start with correctness, not SIMD. The highest-value first milestone is a serial `filter_plan` that can round-trip standard HDF5 filters and reject unsupported filters explicitly. Once that foundation is correct, SIMD and multithreading become execution-policy improvements rather than a risky rewrite.

The strategic direction is to make H5CPP's filtering chain a modern CPU execution engine while preserving HDF5-compatible metadata and file interoperability.

## Status — Phase I (#250, FAPL worker pool)

Phase I of the threading workplan is delivered in PR #251. The design and trade-offs are summarised in `tasks/h5cpp-fapl-multithreading-workplan.md`; the user-visible surface is a single FAPL argument to `h5::create`:

```cpp
h5::fd_t fd = h5::create(
"data.h5", H5F_ACC_TRUNC,
h5::default_fcpl,
h5::threads{N} | h5::backpressure{M}); // M default = 8 × N
```

When `h5::threads{N}` is installed, the FAPL allocates a `worker_pool_t` and stores a `shared_ptr<>` to it in a property registered through `H5Pinsert2`. Every dataset created or opened on that file inherits the pool via `H5Fget_access_plist`. When a dataset's DAPL carries `h5::high_throughput`, `h5::write` and `h5::read` construct a local `pool_pipeline_t` that submits per-chunk compression closures to the pool and drains them in submission order; `H5Dwrite_chunk` still runs on the calling thread. `pt_t` resolves the same pool in `init()` and holds `pool_pipeline_t` as one alternative of its pipeline variant.
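
The variant arrangement described above can be sketched without HDF5. This is a minimal illustration, not the code from this PR: `sync_pipeline`, `pooled_pipeline`, `pipeline_variant`, and `table` are hypothetical stand-ins for `basic_pipeline_t`, `pool_pipeline_t`, `pt_pipeline_t`, and `pt_t`, and integer counters take the place of real chunk I/O.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>
#include <variant>

// Hypothetical stand-in for basic_pipeline_t: writes happen inline.
struct sync_pipeline {
    int written = 0;
    void write_chunk(int /*chunk*/) { ++written; }
};

// Hypothetical stand-in for pool_pipeline_t: writes are queued until drained.
struct pooled_pipeline {
    int pending = 0;
    int written = 0;
    void write_chunk(int /*chunk*/) { ++pending; }
    void drain() { written += pending; pending = 0; }
};

// Owning each alternative through unique_ptr keeps the variant
// move-assignable whatever the pipelines' own move semantics are.
using pipeline_variant = std::variant<std::unique_ptr<sync_pipeline>,
                                      std::unique_ptr<pooled_pipeline>>;
static_assert(std::is_move_assignable_v<pipeline_variant>);

// Hypothetical stand-in for pt_t.
struct table {
    pipeline_variant pipeline{std::make_unique<sync_pipeline>()};

    // Swap to the pooled alternative (assumed to happen once, at init time).
    void use_pool() {
        pipeline.emplace<std::unique_ptr<pooled_pipeline>>(
            std::make_unique<pooled_pipeline>());
    }

    // Uniform dispatch: both alternatives expose write_chunk.
    void write(int chunk) {
        std::visit([chunk](auto& p) {
            assert(p != nullptr);
            p->write_chunk(chunk);
        }, pipeline);
    }

    // Only the pooled alternative has queued work to drain.
    void flush() {
        std::visit([](auto& p) {
            using T = std::decay_t<decltype(*p)>;
            if constexpr (std::is_same_v<T, pooled_pipeline>)
                p->drain();
        }, pipeline);
    }

    int written() const {
        return std::visit([](const auto& p) { return p->written; }, pipeline);
    }
};
```

With the synchronous alternative, `write` takes effect immediately; after `use_pool()`, written chunks only count once `flush()` has drained them.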

In-flight work is bounded by `h5::backpressure{M}`: once the in-flight deque holds `M` futures, the producer blocks on the front (oldest) future before submitting more. The default `M` is 8 × the worker count.
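
A minimal sketch of that bounding rule, assuming `std::async` as a stand-in for the worker pool. `bounded_dispatch` and `run_jobs` are hypothetical names, not part of h5cpp, and each task returns an `int` rather than a compressed chunk.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <future>
#include <utility>
#include <vector>

// Hypothetical illustration of a bounded in-flight deque; not pool_pipeline_t.
class bounded_dispatch {
public:
    explicit bounded_dispatch(std::size_t cap) : cap_(cap) { assert(cap > 0); }

    // Submit one task. If cap_ tasks are already in flight, block on the
    // oldest one first, so the deque never holds more than cap_ futures.
    template <class F>
    void submit(F&& work) {
        if (in_flight_.size() >= cap_) retire_front();
        in_flight_.push_back(
            std::async(std::launch::async, std::forward<F>(work)));
        if (in_flight_.size() > peak_) peak_ = in_flight_.size();
    }

    // Retire everything still in flight, oldest first.
    void drain() {
        while (!in_flight_.empty()) retire_front();
    }

    const std::vector<int>& completed() const { return completed_; }
    std::size_t peak() const { return peak_; }

private:
    // Wait for the oldest task and record its result in submission order.
    void retire_front() {
        completed_.push_back(in_flight_.front().get());
        in_flight_.pop_front();
    }

    std::size_t cap_;
    std::size_t peak_ = 0;
    std::deque<std::future<int>> in_flight_;
    std::vector<int> completed_;
};

// Push `jobs` trivial tasks through a dispatcher capped at `cap`.
inline bounded_dispatch run_jobs(std::size_t cap, int jobs) {
    bounded_dispatch d(cap);
    for (int i = 0; i < jobs; ++i) d.submit([i] { return i * i; });
    d.drain();
    return d;
}
```

Because results are always taken from the front of the deque, they come back in submission order even when later tasks finish first.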

The legacy per-`pt_t` `h5::filter::threads{N}` constructor from #241 is removed in this cycle (see [Phase 1.4 commit message]). Keeping two parallel threading paths in the pipeline would invite contention bugs and muddy the API surface; the FAPL pool fully subsumes the legacy constructor.

Phase II (compile-time C-API blocking on `async_fd_t`, full async mode) is tracked separately.
121 changes: 75 additions & 46 deletions h5cpp/H5Dappend.hpp
@@ -7,10 +7,13 @@
#include "H5capi.hpp"
#include "H5Tmeta.hpp"
#include "H5cout.hpp"
#include "H5Zpipeline_threaded.hpp"
#include <memory>
#include <string>
#include <variant>
#include <array>
#include <cstring>
#include <deque>
#include <future>
#include <vector>
#include <stdexcept>
#include <type_traits>
@@ -22,28 +25,32 @@ namespace h5 {
std::ostream& operator<<(std::ostream& os, const h5::pt_t& pt);

namespace h5::impl {
// pt_t::pipeline selects between the synchronous basic pipeline (default,
// bytewise-identical to pre-241 behavior) and the parallel threaded pipeline.
// Both alternatives are indirect-owned through unique_ptr so that the
// variant remains move-assignable regardless of the underlying pipeline's
// move semantics (threaded_pipeline_t deletes moves because it owns
// std::jthread workers and atomics; basic_pipeline_t inherits a manually-
// written move-assign from pipeline_t<Derived> that suppresses the
// implicit move-ctor needed by variant assignment).
// pt_t::pipeline selects between two pipeline implementations:
//
// basic_pipeline_t — synchronous filter chain on the calling
// thread; default when the file's FAPL has
// no h5::threads{N} pool installed.
// pool_pipeline_t — FAPL-scoped shared worker pool, async-
// pipelined dispatch with back-pressure.
// Selected when init() resolves a pool
// from the file's FAPL.
//
// Both are indirect-owned through unique_ptr so the variant
// remains move-assignable regardless of the underlying pipeline's
// move semantics.
using pt_pipeline_t = std::variant<
std::unique_ptr<impl::basic_pipeline_t>,
std::unique_ptr<impl::threaded_pipeline_t>
std::unique_ptr<impl::pool_pipeline_t>
>;
}

// packet table template specialization with inheritance
namespace h5 {
struct pt_t {
pt_t();
pt_t( const h5::ds_t& handle ); // conversion ctor — synchronous pipeline
pt_t( const h5::ds_t& handle, h5::filter::threads workers ); // threaded pipeline
// deep copy with own cache memory — always uses the synchronous pipeline,
// since the threaded pipeline owns workers that cannot be duplicated.
pt_t( const h5::ds_t& handle ); // FAPL-aware ctor: pool when h5::threads{N} is set, basic otherwise
// deep copy with own cache memory — re-runs init(), so the copy
// resolves its own pipeline from the dataset's file FAPL.
pt_t( const h5::pt_t& pt ) : h5::pt_t(pt.ds) {
};
~pt_t();
@@ -115,6 +122,11 @@ namespace h5 {
chunk_dims[H5CPP_MAX_RANK], count[H5CPP_MAX_RANK];
size_t block_size,element_size,N,n,rank;
void *ptr, *fill_value;

// Phase 1.3.3 — chunk dispatch is uniform across all variant
// alternatives via visit_pipeline + write_chunk. pool_pipeline_t
// holds the pool reference, in-flight deque, and back-pressure
// logic internally; pt_t no longer needs per-instance pool fields.
};
}

@@ -128,24 +140,16 @@ inline h5::pt_t::pt_t() :
count[i] = 1, offset[i] = 0;
}

// conversion ctor — synchronous pipeline (default)
// FAPL-aware conversion ctor — init() resolves pool from the dataset's
// FAPL and swaps the variant to pool_pipeline_t when h5::threads{N} is
// installed. Otherwise the default basic_pipeline_t stays active.
inline
h5::pt_t::pt_t( const h5::ds_t& handle ) : pt_t() {
/*default ctor has an invalid state -- skip initialization */
if( !is_valid(handle) ) return;
init(handle);
}

// conversion ctor — threaded pipeline with N compression workers
inline
h5::pt_t::pt_t( const h5::ds_t& handle, h5::filter::threads workers ) : pt_t() {
if( !is_valid(handle) ) return;
auto threaded = std::make_unique<impl::threaded_pipeline_t>();
threaded->set_worker_count(workers.n);
pipeline.emplace<std::unique_ptr<impl::threaded_pipeline_t>>(std::move(threaded));
init(handle);
}

inline
h5::pt_t::~pt_t(){
/*default ctor has an invalid state -- skip flushing cache */
@@ -170,6 +174,21 @@ void h5::pt_t::init( const h5::ds_t& handle ){
H5Pset_chunk_cache(dapl, 0, 0, H5D_CHUNK_CACHE_W0_DEFAULT);
ds = h5::ds_t{H5Dopen2(fid, dname.data(), dapl)};
H5Pclose(dapl);

// Phase 1.3.3 — resolve the file's FAPL pool while we still hold
// a live fid. When the FAPL has h5::threads{N} installed, swap
// the variant from basic_pipeline_t (default) to pool_pipeline_t
// constructed with the pool + back-pressure cap. When no pool
// is present, the default basic_pipeline_t stays — synchronous
// behavior, identical to pre-Phase-I.
hid_t fapl = H5Fget_access_plist(fid);
if (auto pool = impl::resolve_worker_pool(fapl)) {
const unsigned cap = impl::resolve_backpressure(fapl, pool->worker_count());
pipeline.emplace<std::unique_ptr<impl::pool_pipeline_t>>(
std::make_unique<impl::pool_pipeline_t>(std::move(pool), cap));
}
H5Pclose(fapl);

H5Fclose(fid);
dt = h5::dt_t<void>{H5Dget_type(static_cast<hid_t>(ds))};
h5::sp_t file_space = h5::get_space( handle );
@@ -194,6 +213,7 @@ void h5::pt_t::init( const h5::ds_t& handle ){
throw h5::error::io::packet_table::misc( H5CPP_ERROR_MSG("CTOR: unable to create handle from dataset..."));
}
}

template<class T> inline std::enable_if_t< h5::meta::is_scalar<T>::value,
void> h5::pt_t::append( const T* ptr ) try {
//PTR: write directly chunk size from provided buffer/ptr
@@ -308,28 +328,37 @@ void> h5::pt_t::append( const T& ref ) try {

inline
void h5::pt_t::flush(){
if( n == 0 ) return;
*offset = *current_dims;
*current_dims += *chunk_dims;
h5::set_extent(ds, current_dims);

if( H5Tis_variable_str(this->dt)) {
hsize_t block = 1, count = n;
h5::sp_t mem_space{H5Screate_simple(static_cast<int>(rank), &count, nullptr )};
h5::sp_t file_space{H5Dget_space( static_cast<::hid_t>(ds) )};
h5::select_all( mem_space );
H5Sselect_hyperslab( static_cast<hid_t>(file_space), H5S_SELECT_SET, offset, nullptr, &block, &count);

H5Dwrite( static_cast<hid_t>( ds ),
dt, mem_space, file_space, static_cast<hid_t>(dxpl), ptr);
} else {
// the remainder of last chunk must be set to fill_value; arbitrary type size supported
for(hsize_t i=0; i<(N-n); i++)
for(size_t j=0; j < element_size; j++)
static_cast<char*>( ptr )[(n + i) * element_size + j] = static_cast<char*>( fill_value )[ j ];
visit_pipeline([&](auto& p){ p.write_chunk(offset, block_size, ptr); });
if( n != 0 ) {
*offset = *current_dims;
*current_dims += *chunk_dims;
h5::set_extent(ds, current_dims);

if( H5Tis_variable_str(this->dt)) {
hsize_t block = 1, count = n;
h5::sp_t mem_space{H5Screate_simple(static_cast<int>(rank), &count, nullptr )};
h5::sp_t file_space{H5Dget_space( static_cast<::hid_t>(ds) )};
h5::select_all( mem_space );
H5Sselect_hyperslab( static_cast<hid_t>(file_space), H5S_SELECT_SET, offset, nullptr, &block, &count);

H5Dwrite( static_cast<hid_t>( ds ),
dt, mem_space, file_space, static_cast<hid_t>(dxpl), ptr);
} else {
// the remainder of last chunk must be set to fill_value; arbitrary type size supported
for(hsize_t i=0; i<(N-n); i++)
for(size_t j=0; j < element_size; j++)
static_cast<char*>( ptr )[(n + i) * element_size + j] = static_cast<char*>( fill_value )[ j ];
visit_pipeline([&](auto& p){ p.write_chunk(offset, block_size, ptr); });
}
n = 0;
}
n = 0;
// Pool path: drain in-flight chunks so flush() honors the "data
// on disk after this returns" contract. basic_pipeline_t writes
// inline; the visit is a no-op for that alternative.
std::visit([](auto& p) {
using T = std::decay_t<decltype(*p)>;
if constexpr (std::is_same_v<T, impl::pool_pipeline_t>)
p->drain();
}, pipeline);
}

inline void h5::pt_t::reset() {
29 changes: 26 additions & 3 deletions h5cpp/H5Dread.hpp
@@ -74,9 +74,32 @@ namespace h5 {
return layout == H5D_CHUNKED;
}();
if( use_pipeline ){
h5::impl::pipeline_t<impl::basic_pipeline_t>* filters;
H5Pget(dapl, H5CPP_DAPL_HIGH_THROUGHPUT, &filters);
filters->read(ds, offset, stride, block, count, dxpl, ptr);
// Phase 1.3.3 — if the file's FAPL has h5::threads{N}, route
// reads through a local pool_pipeline_t. Currently pool_pipeline_t::
// read_chunk_impl is synchronous (parallel decompress is Phase 1.5+),
// so the FAPL-pool branch is semantically equivalent to the DAPL
// path today; the structure is in place for the read-ahead
// optimization to land later without changing call sites.
hid_t fid = H5Iget_file_id(static_cast<hid_t>(ds));
hid_t fapl = H5Fget_access_plist(fid);
auto pool = h5::impl::resolve_worker_pool(fapl);
if (pool) {
const unsigned cap = h5::impl::resolve_backpressure(
fapl, pool->worker_count());
h5::impl::pool_pipeline_t pipe(std::move(pool), cap);
h5::dcpl_t dcpl{H5Dget_create_plist(static_cast<hid_t>(ds))};
hid_t type_id = H5Dget_type(static_cast<hid_t>(ds));
size_t elem_sz = H5Tget_size(type_id);
H5Tclose(type_id);
pipe.set_cache(dcpl, elem_sz);
pipe.read(ds, offset, stride, block, count, dxpl, ptr);
} else {
h5::impl::pipeline_t<impl::basic_pipeline_t>* filters;
H5Pget(dapl, H5CPP_DAPL_HIGH_THROUGHPUT, &filters);
filters->read(ds, offset, stride, block, count, dxpl, ptr);
}
H5Pclose(fapl);
H5Fclose(fid);
}else{
h5::sp_t mem_space = h5::create_simple( size );
h5::select_all( mem_space );
Expand Down