62 changes: 27 additions & 35 deletions docs/ARCHITECTURE.md
@@ -203,43 +203,37 @@ sequenceDiagram

**File**: `include/cucascade/data/data_batch.hpp`

A `data_batch` is the fundamental unit of data in cuCascade. It wraps a tier-specific data representation (GPU table or host table) and manages concurrent access through a RAII read-only and mutable accessor classes.
A `data_batch` is the fundamental unit of data in cuCascade. It wraps a tier-specific data representation (GPU table or host table) and manages concurrent access through RAII read-only and mutable accessor classes.

```mermaid
stateDiagram-v2
direction LR

idle --> task_created : try_to_create_task()
idle --> in_transit : try_to_lock_for_in_transit()
task_created --> processing : try_to_lock_for_processing()
task_created --> in_transit : try_to_lock_for_in_transit()
task_created --> idle : try_to_cancel_task()
processing --> idle : handle destruction
processing --> task_created : try_to_create_task()
in_transit --> idle : try_to_release_in_transit()
in_transit --> task_created : try_to_release_in_transit(task_created)
idle --> read_only : get_read_only() / try_get_read_only()
idle --> mutable_locked : get_mutable() / try_get_mutable()
read_only --> idle : read_only_data_batch::to_idle()
mutable_locked --> idle : mutable_data_batch::to_idle()
```

**States**:
- **idle** -- no pending work, available for scheduling or tier movement
- **task_created** -- a task has been registered but processing hasn't started
- **processing** -- one or more RAII `data_batch_processing_handle`s are active
- **in_transit** -- locked for movement between memory tiers (no concurrent access)
- **idle** -- no active readers or writers; the synchronized handle grants no direct data access
- **read_only** -- one or more `read_only_data_batch` shared locks are active
- **mutable_locked** -- one `mutable_data_batch` exclusive lock is active

The `data_batch_processing_handle` uses RAII to ensure the processing count is always correctly decremented, even on exceptions.
The accessor objects use RAII to release locks automatically, even on exceptions.
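The RAII claim above can be illustrated with a minimal sketch. It assumes the accessors behave like wrappers around a `std::shared_mutex`; the names `toy_batch`, `toy_read_access`, and `toy_write_access` are hypothetical stand-ins and are not cuCascade's classes.

```cpp
#include <cassert>
#include <mutex>
#include <shared_mutex>
#include <stdexcept>

// Hypothetical stand-in for a synchronized batch (not the cuCascade type).
struct toy_batch {
  std::shared_mutex mtx;
  int payload = 0;
};

// Shared (read) accessor: holds a shared lock for its whole lifetime.
class toy_read_access {
 public:
  explicit toy_read_access(toy_batch& b) : _lock(b.mtx), _batch(&b) {}
  int value() const { return _batch->payload; }

 private:
  std::shared_lock<std::shared_mutex> _lock;
  toy_batch* _batch;
};

// Exclusive (write) accessor: holds a unique lock for its whole lifetime.
class toy_write_access {
 public:
  explicit toy_write_access(toy_batch& b) : _lock(b.mtx), _batch(&b) {}
  void set(int v) { _batch->payload = v; }

 private:
  std::unique_lock<std::shared_mutex> _lock;
  toy_batch* _batch;
};

// Returns true if the exclusive lock is free again after a scope that
// held a write accessor exited through an exception.
bool lock_released_after_exception() {
  toy_batch b;
  try {
    toy_write_access w(b);
    w.set(7);
    throw std::runtime_error("simulated failure");
  } catch (const std::runtime_error&) {
    // Stack unwinding already destroyed `w`, which released the lock.
  }
  bool acquired = b.mtx.try_lock();  // succeeds only if nothing holds the mutex
  if (acquired) { b.mtx.unlock(); }
  assert(b.payload == 7);  // the write made before the throw is still visible
  return acquired;
}
```

Because the lock is a member of the accessor, no explicit cleanup path is needed: leaving the scope normally or by exception runs the same destructor.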

### Data Repositories

**File**: `include/cucascade/data/data_repository.hpp`

A `data_repository` is a partitioned storage for data batches. It provides blocking `pop` operations that wait until a batch reaches the requested state.
A `data_repository` is partitioned storage for synchronized data batch handles. It provides non-blocking `pop` operations that return `nullptr` when a partition is empty.

```cpp
// Pop a batch that can transition to task_created (blocks if none ready)
auto batch = repository.pop_data_batch(batch_state::task_created);
// Pop the next batch from the default partition
auto batch = repository.pop_next_data_batch();

// Pop a specific batch by ID
auto batch = repository.pop_data_batch_by_id(42, batch_state::in_transit);
auto batch = repository.pop_data_batch_by_id(42);
```
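The non-blocking pop semantics described above can be sketched as follows. This is an illustration only: `toy_item` and `toy_repository` are hypothetical names, and the FIFO order within a partition and the single mutex are assumptions rather than documented cuCascade behavior.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical element type standing in for a synchronized batch handle.
struct toy_item {
  std::uint64_t id;
};

// Hypothetical partitioned store with non-blocking pops.
class toy_repository {
 public:
  explicit toy_repository(std::size_t num_partitions) : _partitions(num_partitions) {}

  void add(std::shared_ptr<toy_item> item, std::size_t partition = 0) {
    std::lock_guard<std::mutex> guard(_mutex);
    _partitions.at(partition).push_back(std::move(item));
  }

  // Returns nullptr immediately when the partition is empty (never waits).
  std::shared_ptr<toy_item> pop_next(std::size_t partition = 0) {
    std::lock_guard<std::mutex> guard(_mutex);
    auto& queue = _partitions.at(partition);
    if (queue.empty()) { return nullptr; }
    auto item = std::move(queue.front());
    queue.pop_front();
    return item;
  }

  // Returns nullptr when no item with the given ID is stored in the partition.
  std::shared_ptr<toy_item> pop_by_id(std::uint64_t id, std::size_t partition = 0) {
    std::lock_guard<std::mutex> guard(_mutex);
    auto& queue = _partitions.at(partition);
    for (auto it = queue.begin(); it != queue.end(); ++it) {
      if ((*it)->id == id) {
        auto item = std::move(*it);
        queue.erase(it);
        return item;
      }
    }
    return nullptr;
  }

 private:
  std::mutex _mutex;  // guards all partitions
  std::vector<std::deque<std::shared_ptr<toy_item>>> _partitions;
};

// Usage example: callers must handle the nullptr case themselves.
inline bool demo_empty_pop_returns_null() {
  toy_repository repo(1);
  return repo.pop_next() == nullptr;
}
```

With this shape, a consumer that wants to wait for data has to poll or use its own signalling, since the pop itself never blocks.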

The `data_repository_manager` coordinates multiple repositories across operators/pipelines and provides atomic batch ID generation.
@@ -290,25 +284,23 @@ A typical lifecycle of data through cuCascade:
```
1. INGESTION
Create data representation (e.g., gpu_table_representation wrapping a cuDF table)
-> Wrap in data_batch with unique ID from data_repository_manager
-> Wrap in data_batch::make(unique_id, representation)
-> Add to data_repository

2. TASK SCHEDULING
batch.try_to_create_task() [idle -> task_created]
repository.pop_data_batch(task_created) [retrieve batch for processing]
2. REPOSITORY DISTRIBUTION
manager.add_data_batch(batch, destinations)
repository.pop_next_data_batch() [retrieve synchronized batch handle]

3. PROCESSING
batch.try_to_lock_for_processing() [task_created -> processing]
-> Returns data_batch_processing_handle (RAII)
-> Access data via batch.get_data()
-> Handle destructs on scope exit [processing -> idle]
auto ro = batch->get_read_only() [shared lock]
-> Access data via ro->get_data()
-> Accessor destructs on scope exit [read_only -> idle when last reader exits]

4. MEMORY PRESSURE (downgrade)
memory_space.should_downgrade_memory() [threshold exceeded]
batch.try_to_lock_for_in_transit() [idle -> in_transit]
converter_registry.convert<host_data_representation>(...)
batch.set_data(new_representation) [data now on HOST]
batch.try_to_release_in_transit() [in_transit -> idle]
auto mut = batch->get_mutable() [exclusive lock]
mut->convert_to<host_data_representation>(...)
mutable_data_batch::to_idle(std::move(mut))

5. DATA NEEDED (upgrade)
Same flow as downgrade but in reverse tier direction
@@ -326,7 +318,7 @@ cuCascade uses a strict lock hierarchy to prevent deadlocks:
```
Level 1: atomic<uint64_t> (batch ID generation -- lock-free)
|
Level 2: data_batch, read_only_data_batch, mutable_data_batch (3-class sytem provides read-only and mutable access classes)
Level 2: data_batch, read_only_data_batch, mutable_data_batch (3-class system provides read-only and mutable access classes)
|
Level 3: idata_repository._mutex (protects batch storage)
|
@@ -339,7 +331,7 @@ Level 6: memory_reservation_manager._wait_mutex (protects reservation waiting)

Key synchronization primitives:
- **`std::mutex`** -- guards state transitions, storage, and configuration
- **`std::condition_variable`** -- blocks repository pops and reservation requests until satisfied
- **`std::condition_variable`** -- blocks reservation requests until satisfied
- **`std::atomic`** -- lock-free counters for batch IDs, allocated bytes, and peak tracking
- **`atomic_bounded_counter`** -- CAS-based bounded arithmetic for reservation enforcement
- **`notification_channel`** -- signals waiting threads when reservations are released
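To make "CAS-based bounded arithmetic" concrete, here is a minimal sketch of a counter that refuses additions past a limit. The class name `bounded_counter_sketch` and its interface are hypothetical; the diff does not show how `atomic_bounded_counter` is actually implemented.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical sketch of a bounded counter built on compare-and-swap.
class bounded_counter_sketch {
 public:
  explicit bounded_counter_sketch(std::uint64_t limit) : _limit(limit) {
    assert(limit > 0);  // a zero limit would reject every request
  }

  // Adds `n` only if the result stays <= limit; returns false otherwise.
  bool try_add(std::uint64_t n) {
    std::uint64_t current = _value.load(std::memory_order_relaxed);
    do {
      // Written to avoid overflow in `current + n`.
      if (n > _limit || current > _limit - n) { return false; }
      // On failure, compare_exchange_weak reloads `current` and we re-check.
    } while (!_value.compare_exchange_weak(current, current + n,
                                           std::memory_order_acq_rel,
                                           std::memory_order_relaxed));
    return true;
  }

  // Subtracts `n`, clamping at zero instead of wrapping around.
  void sub(std::uint64_t n) {
    std::uint64_t current = _value.load(std::memory_order_relaxed);
    std::uint64_t next = 0;
    do {
      next = current > n ? current - n : 0;
    } while (!_value.compare_exchange_weak(current, next,
                                           std::memory_order_acq_rel,
                                           std::memory_order_relaxed));
  }

  std::uint64_t value() const { return _value.load(std::memory_order_acquire); }

 private:
  const std::uint64_t _limit;
  std::atomic<std::uint64_t> _value{0};
};
```

The bound check and the update happen in one atomic step, so two threads racing to reserve the last bytes cannot both succeed, and no mutex is needed on this path.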
@@ -352,7 +344,7 @@ Key synchronization primitives:
|---------|-----------|
| **Strategy** | `reservation_request_strategy` subclasses for memory selection |
| **Builder** | `reservation_manager_configurator` for fluent system configuration |
| **RAII** | `data_batch_processing_handle`, `borrowed_stream`, `multiple_blocks_allocation`, `notify_on_exit` |
| **RAII** | `read_only_data_batch`, `mutable_data_batch`, `borrowed_stream`, `multiple_blocks_allocation`, `notify_on_exit` |
| **Factory** | `DeviceMemoryResourceFactoryFn` for tier-specific allocator creation |
| **Adapter** | `reservation_aware_resource_adaptor` wraps RMM resources with tracking |
| **3-Class System** | `data_batch` with read-only and mutable class variants which provide locking and accessors |
@@ -393,7 +385,7 @@ Key synchronization primitives:
|------|---------|
| `include/cucascade/data/common.hpp` | `idata_representation` interface |
| `include/cucascade/data/data_batch.hpp` | Batch lifecycle, read-only and mutable locking accessor classes|
| `include/cucascade/data/data_repository.hpp` | Partitioned batch storage with blocking pops |
| `include/cucascade/data/data_repository.hpp` | Partitioned batch storage with non-blocking pops |
| `include/cucascade/data/data_repository_manager.hpp` | Multi-pipeline repository coordination |
| `include/cucascade/data/representation_converter.hpp` | Type-indexed converter registry |
| `include/cucascade/data/gpu_data_representation.hpp` | GPU-resident cuDF table wrapper |
98 changes: 43 additions & 55 deletions docs/data-management.md
@@ -13,7 +13,7 @@ A deep dive into cuCascade's data lifecycle, batch read-only and mutable locking
- [Data Batch Lifecycle](#data-batch-lifecycle)
- [States](#states)
- [State Transitions](#state-transitions)
- [Processing Handles](#processing-handles)
- [RAII Accessor Classes](#raii-accessor-classes)
- [Thread Safety](#thread-safety)
- [Cloning](#cloning)
- [Representation Conversion](#representation-conversion)
@@ -23,7 +23,7 @@ A deep dive into cuCascade's data lifecycle, batch read-only and mutable locking
- [Data Repositories](#data-repositories)
- [Add and Pop Semantics](#add-and-pop-semantics)
- [Partitioning](#partitioning)
- [shared_ptr vs unique_ptr Repositories](#shared_ptr-vs-unique_ptr-repositories)
- [Repository Pointer Type](#repository-pointer-type)
- [Data Repository Manager](#data-repository-manager)
- [Operator Port Keys](#operator-port-keys)
- [Batch ID Generation](#batch-id-generation)
@@ -40,7 +40,7 @@ The data module manages the lifecycle of data as it flows through processing pip
- **Tier-agnostic data representations** -- abstract interface with GPU and host implementations
- **Locking read-only and mutable accessor classes for batch lifecycle** -- prevents concurrent access conflicts during processing and tier movement
- **Type-indexed conversion** -- extensible registry for converting data between representations
- **Partitioned repositories** -- thread-safe storage with blocking retrieval
- **Partitioned repositories** -- thread-safe storage with non-blocking retrieval
- **Multi-pipeline coordination** -- manages data across operators with atomic ID generation

The data module depends on the memory module for allocators, memory spaces, and CUDA streams.
@@ -176,24 +176,22 @@ appropriate lock — the idle `data_batch` pointer grants no data access.
stateDiagram-v2
direction LR

idle --> read_only : to_read_only() / try_to_read_only()
idle --> mutable_locked : to_mutable() / try_to_mutable()
idle --> read_only : get_read_only() / try_get_read_only()
idle --> mutable_locked : get_mutable() / try_get_mutable()

read_only --> idle : to_idle(read_only_data_batch&&) [last reader]
read_only --> mutable_locked : readonly_to_mutable(read_only_data_batch&&)
read_only --> idle : read_only_data_batch::to_idle(read_only_data_batch&&) [last reader]

mutable_locked --> idle : to_idle(mutable_data_batch&&)
mutable_locked --> read_only : mutable_to_readonly(mutable_data_batch&&)
mutable_locked --> idle : mutable_data_batch::to_idle(mutable_data_batch&&)
```

**Key rules**:
- Non-static transitions (`to_read_only`, `to_mutable`, `try_to_*`) use `shared_from_this()` and do not consume the caller's `shared_ptr`.
- Static transitions (`to_idle`, `readonly_to_mutable`, `mutable_to_readonly`) consume the accessor via `&&`, making the source null at the call site — the compiler enforces that you cannot use an accessor after releasing it.
- `to_read_only()` / `to_mutable()` **block** until the lock is available.
- `try_to_read_only()` / `try_to_mutable()` are **non-blocking** and return `std::nullopt` on failure.
- Access acquisition (`get_read_only`, `get_mutable`, `try_get_*`) uses the caller's `shared_ptr<data_batch>` and does not consume it.
- `read_only_data_batch::to_idle()` and `mutable_data_batch::to_idle()` consume the accessor via `&&`, making the source null at the call site.
- `get_read_only()` and `get_mutable()` **block** until the lock is available.
- `try_get_read_only()` and `try_get_mutable()` are **non-blocking** and return `std::nullopt` on failure.
- Multiple `read_only_data_batch` handles may coexist on the same batch (concurrent reads).
- `mutable_data_batch` is exclusive: it cannot coexist with any other reader or writer.
- Copying a `read_only_data_batch` acquires a new shared lock on the parent, incrementing `_read_only_count`.
- `read_only_data_batch` is move-only; use `clone_read_only_access()` to acquire another shared lock on the same batch.

See [data_batch_state_transitions.md](data_batch_state_transitions.md) for the complete reference.

@@ -205,47 +203,40 @@ Access to batch data is only possible through one of two RAII accessor classes:

```cpp
// Acquire shared lock (blocks if exclusive lock held)
read_only_data_batch ro = batch->to_read_only();
read_only_data_batch ro = batch->get_read_only();

// Access data
auto* data = ro.get_data()->cast<gpu_table_representation>();
auto* data = ro->get_data()->cast<gpu_table_representation>();
process(data->get_table_view());

// Release: either let ro go out of scope, or explicitly return to idle
auto idle_batch = cucascade::data_batch::to_idle(std::move(ro));
auto idle_batch = cucascade::read_only_data_batch::to_idle(std::move(ro));
```

Properties:
- **Copyable** — each copy acquires a new shared lock; `_read_only_count` increments per copy.
- **Movable** — moves transfer lock ownership without changing the count.
- **Move-only** — moves transfer lock ownership without changing the count.
- `clone_read_only_access()` acquires another shared lock when a second reader handle is needed.
- Destruction or `to_idle()` decrements `_read_only_count`; the batch returns to `idle` when the count reaches zero.

**`mutable_data_batch`** — exclusive (write) lock:

```cpp
// Acquire exclusive lock (blocks until all readers and writers release)
mutable_data_batch mut = batch->to_mutable();
mutable_data_batch mut = batch->get_mutable();

// Read and write data
mut.set_data(std::move(new_representation));
mut->set_data(std::move(new_representation));

// Release back to idle
auto idle_batch = cucascade::data_batch::to_idle(std::move(mut));
auto idle_batch = cucascade::mutable_data_batch::to_idle(std::move(mut));
```

Properties:
- **Move-only** — no copies allowed; only one exclusive lock can exist at a time.
- Destruction or `to_idle()` releases the exclusive lock.

**Upgrade / Downgrade**:

```cpp
// Upgrade: shared → exclusive (releases shared, acquires exclusive — may block)
mutable_data_batch mut = cucascade::data_batch::readonly_to_mutable(std::move(ro));

// Downgrade: exclusive → shared (releases exclusive, acquires shared — may block)
read_only_data_batch ro2 = cucascade::data_batch::mutable_to_readonly(std::move(mut));
```
There is no direct read-to-write upgrade or write-to-read downgrade API. Release the current
accessor first, then acquire the next access mode from the returned `shared_ptr<data_batch>`.
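One consequence of release-then-reacquire is that another writer may change the batch in the gap between the two locks. The sketch below shows one way a caller could detect that, assuming a version counter that is bumped on every write. The counter, `toy_batch`, `read_version`, and `write_if_version` are all hypothetical; nothing in this diff indicates that cuCascade exposes such a counter.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <shared_mutex>

// Hypothetical batch with a version counter bumped on every write.
struct toy_batch {
  std::shared_mutex mtx;
  std::uint64_t version = 0;
  int payload = 0;
};

// Step 1: take a shared lock, observe the version, release on return.
std::uint64_t read_version(toy_batch& b) {
  std::shared_lock<std::shared_mutex> read_lock(b.mtx);
  return b.version;
}

// Step 2: take the exclusive lock separately and write only if nothing
// changed since the version was observed. Returns false on a stale view.
bool write_if_version(toy_batch& b, std::uint64_t seen, int new_payload) {
  std::unique_lock<std::shared_mutex> write_lock(b.mtx);
  if (b.version != seen) { return false; }  // someone wrote in the gap
  b.payload = new_payload;
  ++b.version;
  return true;
}
```

A caller that gets `false` back would re-read under a fresh shared lock and decide whether the write is still wanted.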

### Thread Safety

@@ -266,12 +257,12 @@ This is used by the pipeline and downgrade executor to track which batches are s
### Cloning

```cpp
// Via read-only accessor (caller already holds lock)
read_only_data_batch ro = batch->to_read_only();
auto cloned = ro.clone(new_batch_id, stream);
// Acquire read access while inspecting the source, then clone from the synchronized handle.
read_only_data_batch ro = batch->get_read_only();
auto cloned = batch->clone(new_batch_id, stream);

// With representation conversion
auto cloned = ro.clone_to<host_data_representation>(registry, new_batch_id, host_space, stream);
auto cloned = batch->clone_to<host_data_representation>(registry, new_batch_id, host_space, stream);
```

Cloning produces a new `shared_ptr<data_batch>` in `idle` state with the given ID. The clone
@@ -380,8 +371,8 @@ repository.add_data_batch(batch_ptr, partition_idx);
// Pop the next batch from a partition (non-blocking; returns nullptr if empty)
auto batch = repository.pop_next_data_batch(partition_idx);
if (batch) {
auto ro = batch->to_read_only(); // acquire shared lock
process(ro.get_data());
auto ro = batch->get_read_only(); // acquire shared lock
process(ro->get_data());
}

// Pop a specific batch by ID (returns nullptr if not found)
@@ -406,17 +397,17 @@ repository.add_data_batch(batch_a, 0);
repository.add_data_batch(batch_b, 1);

// Pop from partition 0 only
auto batch = repository.pop_data_batch(batch_state::task_created, 0);
auto batch = repository.pop_next_data_batch(0);
```

### shared_ptr vs unique_ptr Repositories
### Repository Pointer Type

| Type | Alias | Use Case |
|------|-------|----------|
| `idata_repository<shared_ptr<data_batch>>` | `shared_data_repository` | Same batch shared across multiple repositories (fan-out) |
| `idata_repository<unique_ptr<data_batch>>` | `unique_data_repository` | Each batch owned by exactly one repository |

Key difference: `get_data_batch_by_id()` (non-removing access) is only available with `shared_ptr` repositories.
Repositories store synchronized `data_batch` handles as `std::shared_ptr<data_batch>`. The
non-removing `get_data_batch_by_id()` API returns another copy of that synchronized handle.

---

@@ -451,14 +442,12 @@ uint64_t id = manager.get_next_data_batch_id(); // atomic increment
The `add_data_batch()` method distributes a batch to one or more repositories:

```cpp
// shared_ptr: same batch goes to multiple repositories
// Same synchronized batch handle goes to multiple repositories
manager.add_data_batch(shared_batch, {{0, "output"}, {1, "input"}});

// unique_ptr: batch goes to exactly one repository (throws if multiple specified)
manager.add_data_batch(std::move(unique_batch), {{1, "input"}});
```

The manager also provides `get_data_batches_for_downgrade()` to find batches eligible for tier demotion based on their memory space.
Repository managers only distribute and retrieve synchronized batch handles. Tier demotion is
performed by acquiring mutable access to a specific batch and converting its representation.

---

@@ -472,9 +461,7 @@ The data and memory modules are connected at several points:

3. **Reservation tracking** -- when data is converted between tiers, the converter allocates in the target memory space using its allocator and reservation system

4. **Downgrade coordination** -- the application queries `memory_space.should_downgrade_memory()` and uses `data_repository_manager.get_data_batches_for_downgrade()` to find candidates

5. **Processing validation** -- `try_to_lock_for_processing()` checks that the requested `memory_space_id` matches the batch's current location
4. **Tier movement** -- the application queries memory pressure, acquires mutable batch access, and calls `convert_to()` through the accessor

```
Application
@@ -488,17 +475,18 @@ Application
|-- data_repository.pop_next_data_batch(partition_idx)
| |-- returns shared_ptr<data_batch> (idle, non-blocking)
|
|-- batch->to_read_only() [shared lock]
|-- batch->get_read_only() [shared lock]
| |-- returns read_only_data_batch (blocks until available)
| |-- ro.get_data() grants read access to idata_representation
| |-- ro->get_data() grants read access to idata_representation
|
|-- [on memory pressure]
| mutable_data_batch mut = data_batch::readonly_to_mutable(std::move(ro))
| mut.convert_to<TargetRepresentation>(registry, target_space, stream)
| read_only_data_batch::to_idle(std::move(ro)) [release shared lock]
| mutable_data_batch mut = batch->get_mutable() [exclusive lock]
| mut->convert_to<TargetRepresentation>(registry, target_space, stream)
Review comment (Contributor):

Why do you have `mut->convert_to` instead of `mut.convert_to`? It's not a pointer or an optional.

Similarly, there are several places where you also have `ro->`.

| |-- converter_registry.convert(data, target_space, stream)
| |-- allocates in target memory_space
| |-- cudaMemcpy between tiers
| data_batch::to_idle(std::move(mut)) [release exclusive lock]
| mutable_data_batch::to_idle(std::move(mut)) [release exclusive lock]
```
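On the review question about `mut->convert_to` versus `mut.convert_to`: in C++, `->` compiles on a non-pointer object only when its class overloads `operator->`. Whether `read_only_data_batch` and `mutable_data_batch` do so is not visible in this diff, so the sketch below only demonstrates the mechanism, with hypothetical names (`toy_payload`, `toy_accessor`).

```cpp
#include <cassert>
#include <string>

// Hypothetical payload that an accessor forwards to.
struct toy_payload {
  std::string name;
  int rows;
};

// Hypothetical accessor: a plain object (not a pointer or optional) that
// forwards member access through an overloaded operator->.
class toy_accessor {
 public:
  explicit toy_accessor(toy_payload* target) : _target(target) {}

  // Makes `acc->rows` valid even though `acc` is not a pointer.
  toy_payload* operator->() const { return _target; }
  toy_payload& operator*() const { return *_target; }

 private:
  toy_payload* _target;
};

// Usage example: reads a member through the overloaded operator.
inline int rows_via_arrow(const toy_accessor& acc) {
  assert(acc.operator->() != nullptr);
  return acc->rows;
}
```

If the accessor classes expose their members directly instead, `mut.convert_to(...)` and `ro.get_data()` would be the forms that compile, so the docs should match whichever interface the header actually declares.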

---
@@ -509,7 +497,7 @@ Application
|------|---------|
| `include/cucascade/data/common.hpp` | `idata_representation` abstract interface |
| `include/cucascade/data/data_batch.hpp` | `data_batch`, `read_only_data_batch`, `mutable_data_batch`, `batch_state` |
| `include/cucascade/data/data_repository.hpp` | `idata_repository<PtrType>`, `shared_data_repository`, `unique_data_repository` |
| `include/cucascade/data/data_repository.hpp` | `idata_repository<PtrType>`, `shared_data_repository` |
| `include/cucascade/data/data_repository_manager.hpp` | `data_repository_manager`, `operator_port_key` |
| `include/cucascade/data/representation_converter.hpp` | `representation_converter_registry`, `converter_key` |
| `include/cucascade/data/gpu_data_representation.hpp` | `gpu_table_representation` wrapping `cudf::table` |