78 changes: 49 additions & 29 deletions benchmark/benchmark_disk_converter.cpp
@@ -423,14 +423,18 @@ void BM_ConvertGpuToDisk(benchmark::State& state)
rmm::cuda_stream stream;

// Create GPU representation
auto table = create_benchmark_table_from_bytes(total_bytes, num_columns);
auto gpu_rep = std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(table)), *const_cast<memory_space*>(gpu_space));
auto table = create_benchmark_table_from_bytes(total_bytes, num_columns);
auto gpu_rep =
std::make_unique<gpu_table_representation>(std::make_unique<cudf::table>(std::move(table)),
*const_cast<memory_space*>(gpu_space),
stream.view());

// Warmup
auto warmup_table = create_benchmark_table_from_bytes(1 * KiB, 2);
auto warmup_repr = std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(warmup_table)), *const_cast<memory_space*>(gpu_space));
std::make_unique<cudf::table>(std::move(warmup_table)),
*const_cast<memory_space*>(gpu_space),
stream.view());
auto warmup_result =
registry->convert<disk_data_representation>(*warmup_repr, disk_space, stream.view());
stream.synchronize();
@@ -474,9 +478,11 @@ void BM_ConvertDiskToGpu(benchmark::State& state)
rmm::cuda_stream stream;

// Create GPU representation then convert to disk once
auto table = create_benchmark_table_from_bytes(total_bytes, num_columns);
auto gpu_rep = std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(table)), *const_cast<memory_space*>(gpu_space));
auto table = create_benchmark_table_from_bytes(total_bytes, num_columns);
auto gpu_rep =
std::make_unique<gpu_table_representation>(std::make_unique<cudf::table>(std::move(table)),
*const_cast<memory_space*>(gpu_space),
stream.view());

auto disk_rep = registry->convert<disk_data_representation>(*gpu_rep, disk_space, stream.view());
stream.synchronize();
@@ -521,9 +527,11 @@ void BM_ConvertHostToDisk(benchmark::State& state)
rmm::cuda_stream stream;

// Create GPU table, convert to host_data first
auto table = create_benchmark_table_from_bytes(total_bytes, num_columns);
auto gpu_rep = std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(table)), *const_cast<memory_space*>(gpu_space));
auto table = create_benchmark_table_from_bytes(total_bytes, num_columns);
auto gpu_rep =
std::make_unique<gpu_table_representation>(std::make_unique<cudf::table>(std::move(table)),
*const_cast<memory_space*>(gpu_space),
stream.view());

auto host_rep = registry->convert<host_data_representation>(*gpu_rep, host_space, stream.view());
stream.synchronize();
@@ -569,9 +577,11 @@ void BM_ConvertDiskToHost(benchmark::State& state)
rmm::cuda_stream stream;

// Create GPU table, convert to host, then to disk
auto table = create_benchmark_table_from_bytes(total_bytes, num_columns);
auto gpu_rep = std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(table)), *const_cast<memory_space*>(gpu_space));
auto table = create_benchmark_table_from_bytes(total_bytes, num_columns);
auto gpu_rep =
std::make_unique<gpu_table_representation>(std::make_unique<cudf::table>(std::move(table)),
*const_cast<memory_space*>(gpu_space),
stream.view());

auto host_rep = registry->convert<host_data_representation>(*gpu_rep, host_space, stream.view());
stream.synchronize();
@@ -620,9 +630,11 @@ void BM_ConvertGpuToDiskStringColumns(benchmark::State& state)

rmm::cuda_stream stream;

auto table = create_string_benchmark_table(total_bytes, num_columns);
auto gpu_rep = std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(table)), *const_cast<memory_space*>(gpu_space));
auto table = create_string_benchmark_table(total_bytes, num_columns);
auto gpu_rep =
std::make_unique<gpu_table_representation>(std::make_unique<cudf::table>(std::move(table)),
*const_cast<memory_space*>(gpu_space),
stream.view());

size_t bytes_transferred = gpu_rep->get_size_in_bytes();

@@ -662,9 +674,11 @@ void BM_ConvertGpuToDiskListColumns(benchmark::State& state)

rmm::cuda_stream stream;

auto table = create_list_benchmark_table(total_bytes, num_columns);
auto gpu_rep = std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(table)), *const_cast<memory_space*>(gpu_space));
auto table = create_list_benchmark_table(total_bytes, num_columns);
auto gpu_rep =
std::make_unique<gpu_table_representation>(std::make_unique<cudf::table>(std::move(table)),
*const_cast<memory_space*>(gpu_space),
stream.view());

size_t bytes_transferred = gpu_rep->get_size_in_bytes();

@@ -704,9 +718,11 @@ void BM_ConvertGpuToDiskStructColumns(benchmark::State& state)

rmm::cuda_stream stream;

auto table = create_struct_benchmark_table(total_bytes, num_columns);
auto gpu_rep = std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(table)), *const_cast<memory_space*>(gpu_space));
auto table = create_struct_benchmark_table(total_bytes, num_columns);
auto gpu_rep =
std::make_unique<gpu_table_representation>(std::make_unique<cudf::table>(std::move(table)),
*const_cast<memory_space*>(gpu_space),
stream.view());

size_t bytes_transferred = gpu_rep->get_size_in_bytes();

@@ -817,9 +833,11 @@ void BM_ConvertGpuToDiskPipeline(benchmark::State& state)

rmm::cuda_stream stream;

auto table = create_benchmark_table_from_bytes(total_bytes, num_columns);
auto gpu_rep = std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(table)), *const_cast<memory_space*>(gpu_space));
auto table = create_benchmark_table_from_bytes(total_bytes, num_columns);
auto gpu_rep =
std::make_unique<gpu_table_representation>(std::make_unique<cudf::table>(std::move(table)),
*const_cast<memory_space*>(gpu_space),
stream.view());

size_t bytes_transferred = gpu_rep->get_size_in_bytes();

@@ -866,7 +884,7 @@ std::unique_ptr<disk_data_representation> write_table_to_disk(cudf::table&& tabl
auto registry = make_benchmark_registry();

auto gpu_rep = std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(table)), *const_cast<memory_space*>(gpu_space));
std::make_unique<cudf::table>(std::move(table)), *const_cast<memory_space*>(gpu_space), stream);
auto disk_rep = registry->convert<disk_data_representation>(*gpu_rep, disk_space, stream);
stream.synchronize();
return disk_rep;
@@ -943,9 +961,11 @@ void pipeline_write_benchmark(benchmark::State& state, TableFactory table_factor

rmm::cuda_stream stream;

auto table = table_factory(total_bytes, num_columns);
auto gpu_rep = std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(table)), *const_cast<memory_space*>(gpu_space));
auto table = table_factory(total_bytes, num_columns);
auto gpu_rep =
std::make_unique<gpu_table_representation>(std::make_unique<cudf::table>(std::move(table)),
*const_cast<memory_space*>(gpu_space),
stream.view());

size_t bytes_transferred = gpu_rep->get_size_in_bytes();

36 changes: 24 additions & 12 deletions benchmark/benchmark_representation_converter.cpp
@@ -188,15 +188,19 @@ void BM_ConvertGpuToHost(benchmark::State& state)

for (uint64_t t = 0; t < thread_count; ++t) {
auto table = create_benchmark_table_from_bytes(total_bytes, num_columns);
thread_gpu_reprs.push_back(std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(table)), *const_cast<memory_space*>(gpu_space)));
thread_gpu_reprs.push_back(
std::make_unique<gpu_table_representation>(std::make_unique<cudf::table>(std::move(table)),
*const_cast<memory_space*>(gpu_space),
rmm::cuda_stream_view{}));
}

// Warm-up
rmm::cuda_stream warmup_stream;
auto warmup_table = create_benchmark_table_from_bytes(1 * KiB, 2);
auto warmup_repr = std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(warmup_table)), *const_cast<memory_space*>(gpu_space));
std::make_unique<cudf::table>(std::move(warmup_table)),
*const_cast<memory_space*>(gpu_space),
warmup_stream.view());
auto warmup_result =
registry->convert<host_data_packed_representation>(*warmup_repr, host_space, warmup_stream);
warmup_stream.synchronize();
@@ -259,9 +263,11 @@ void BM_ConvertHostToGpu(benchmark::State& state)

rmm::cuda_stream setup_stream;
for (uint64_t t = 0; t < thread_count; ++t) {
auto table = create_benchmark_table_from_bytes(total_bytes, num_columns);
auto gpu_repr_temp = std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(table)), *const_cast<memory_space*>(gpu_space));
auto table = create_benchmark_table_from_bytes(total_bytes, num_columns);
auto gpu_repr_temp =
std::make_unique<gpu_table_representation>(std::make_unique<cudf::table>(std::move(table)),
*const_cast<memory_space*>(gpu_space),
setup_stream.view());
auto host_repr =
registry->convert<host_data_packed_representation>(*gpu_repr_temp, host_space, setup_stream);
setup_stream.synchronize();
@@ -334,15 +340,19 @@ void BM_ConvertGpuToHostFast(benchmark::State& state)

for (uint64_t t = 0; t < thread_count; ++t) {
auto table = create_benchmark_table_from_bytes(total_bytes, num_columns);
thread_gpu_reprs.push_back(std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(table)), *const_cast<memory_space*>(gpu_space)));
thread_gpu_reprs.push_back(
std::make_unique<gpu_table_representation>(std::make_unique<cudf::table>(std::move(table)),
*const_cast<memory_space*>(gpu_space),
rmm::cuda_stream_view{}));
}

// Warm-up: small transfer to prime CUDA driver
rmm::cuda_stream warmup_stream;
auto warmup_table = create_benchmark_table_from_bytes(1 * KiB, 2);
auto warmup_repr = std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(warmup_table)), *const_cast<memory_space*>(gpu_space));
std::make_unique<cudf::table>(std::move(warmup_table)),
*const_cast<memory_space*>(gpu_space),
warmup_stream.view());
auto warmup_result =
registry->convert<host_data_representation>(*warmup_repr, host_space, warmup_stream);
warmup_stream.synchronize();
@@ -399,9 +409,11 @@ void BM_ConvertHostFastToGpu(benchmark::State& state)

rmm::cuda_stream setup_stream;
for (uint64_t t = 0; t < thread_count; ++t) {
auto table = create_benchmark_table_from_bytes(total_bytes, num_columns);
auto gpu_repr_temp = std::make_unique<gpu_table_representation>(
std::make_unique<cudf::table>(std::move(table)), *const_cast<memory_space*>(gpu_space));
auto table = create_benchmark_table_from_bytes(total_bytes, num_columns);
auto gpu_repr_temp =
std::make_unique<gpu_table_representation>(std::make_unique<cudf::table>(std::move(table)),
*const_cast<memory_space*>(gpu_space),
setup_stream.view());
auto host_repr =
registry->convert<host_data_representation>(*gpu_repr_temp, host_space, setup_stream);
setup_stream.synchronize();
2 changes: 1 addition & 1 deletion docs/ARCHITECTURE.md
@@ -326,7 +326,7 @@ cuCascade uses a strict lock hierarchy to prevent deadlocks:
```
Level 1: atomic<uint64_t> (batch ID generation -- lock-free)
|
Level 2: data_batch, read_only_data_batch, mutable_data_batch (3-class sytem provides read-only and mutable access classes)
Level 2: data_batch, read_only_data_batch, mutable_data_batch (3-class system provides read-only and mutable access classes)
|
Level 3: idata_repository._mutex (protects batch storage)
|
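For illustration, a minimal sketch of the acquisition discipline such a hierarchy implies, assuming locks are always taken in increasing level order and using hypothetical stand-in names for the real members:

```cpp
#include <mutex>
#include <shared_mutex>

// Hypothetical stand-ins for the Level 2 / Level 3 locks named above.
std::shared_mutex batch_mutex;       // Level 2: data_batch accessor lock
std::mutex        repository_mutex;  // Level 3: idata_repository._mutex

void well_ordered_access()
{
  // Every code path takes Level 2 before Level 3 and never the reverse,
  // so no cycle of lock-wait edges (and hence no deadlock) can form.
  std::shared_lock<std::shared_mutex> batch_lock(batch_mutex);
  std::lock_guard<std::mutex> repo_lock(repository_mutex);
  // ... read the batch, update repository bookkeeping ...
}
```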
42 changes: 33 additions & 9 deletions include/cucascade/data/data_batch.hpp
@@ -18,6 +18,7 @@
#pragma once

#include <cucascade/data/common.hpp>
#include <cucascade/data/gpu_data_representation.hpp>
#include <cucascade/data/representation_converter.hpp>
#include <cucascade/memory/common.hpp>

@@ -98,7 +99,7 @@ class data_batch : public std::enable_shared_from_this<data_batch> {
uint64_t get_batch_id() const;

/**
* @brief Increment the subscriber interest count.
* @brief Increment the subscriber interest count.
*/
void subscribe();

@@ -205,7 +206,7 @@ class data_batch : public std::enable_shared_from_this<data_batch> {
*
* Releases the shared lock, then acquires an exclusive lock (may block).
* The source accessor is consumed via move.
* NOTE: The transition is not atomic.
* NOTE: The transition is not atomic.
*
* @param accessor Rvalue reference to the read-only accessor (consumed).
* @return A mutable_data_batch holding the exclusive lock.
@@ -217,7 +218,7 @@
*
* Releases the exclusive lock, then acquires a shared lock (may block).
* The source accessor is consumed via move.
* NOTE: The transition is not atomic.
* NOTE: The transition is not atomic.
*
* @param accessor Rvalue reference to the mutable accessor (consumed).
* @return A read_only_data_batch holding the shared lock.
@@ -293,6 +294,30 @@ class read_only_data_batch {
/** @brief Get a raw pointer to the memory space. */
memory::memory_space* get_memory_space() const { return _batch->get_memory_space(); }

/**
* @brief Get the writer event from the underlying GPU representation, or nullptr.
*
* D-B3 proxy: delegates to gpu_table_representation::get_writer_event() via
* dynamic_cast. Returns nullptr when the underlying representation is not a
* gpu_table_representation (e.g., host or disk tier) or when no writer event has
* been recorded yet.
*
* STREAM-LINEAGE: callers that cross stream / device boundaries should call
* cudaStreamWaitEvent on the returned event (when non-null) before reading the
* underlying memory of this batch.
*
* @return cudaEvent_t The writer event, or nullptr if not a GPU representation or
* no event recorded.
*/
[[nodiscard]] cudaEvent_t get_writer_event() const
{
auto* repr = get_data();
if (!repr) { return nullptr; }
auto* gpu_repr = dynamic_cast<gpu_table_representation*>(repr);
if (!gpu_repr) { return nullptr; }
return gpu_repr->get_writer_event();
}
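For illustration, a minimal consumer-side sketch of the STREAM-LINEAGE rule described above, assuming a populated read_only_data_batch and a consumer stream obtained elsewhere; the free function read_after_writer is hypothetical and not part of this header:

```cpp
// Sketch: order the consumer stream after the producer's recorded write
// before enqueueing any work that reads the batch's GPU memory.
void read_after_writer(read_only_data_batch const& batch,
                       rmm::cuda_stream_view consumer_stream)
{
  if (cudaEvent_t writer = batch.get_writer_event(); writer != nullptr) {
    // Host-side call is non-blocking; it makes consumer_stream wait on the event.
    cudaStreamWaitEvent(consumer_stream.value(), writer, 0);
  }
  // ... launch kernels / copies on consumer_stream that read the batch ...
}
```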

// -- Clone operations (D-18/D-19/D-20/CLONE-01/CLONE-02) --

/**
@@ -353,8 +378,8 @@ class read_only_data_batch {
// INVARIANT: _batch must be declared before _lock -- destruction order is load-bearing.
// When destroyed, _lock releases the shared lock first, then _batch drops the parent
// reference. This prevents accessing a destroyed mutex.
std::shared_ptr<data_batch> _batch; ///< Parent lifetime (destroyed second)
std::shared_lock<std::shared_mutex> _lock; ///< Shared lock (destroyed first)
std::shared_ptr<data_batch> _batch; ///< Parent lifetime (destroyed second)
std::shared_lock<std::shared_mutex> _lock; ///< Shared lock (destroyed first)
};
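As an aside on the INVARIANT comment above, a tiny standalone sketch (the Noisy type is hypothetical, not part of this header) showing that C++ destroys non-static data members in reverse declaration order, which is why declaring _batch before _lock releases the lock before the parent reference is dropped:

```cpp
#include <iostream>

struct Noisy {
  const char* name;
  ~Noisy() { std::cout << name << " destroyed\n"; }
};

struct accessor_like {
  Noisy parent{"parent reference"};  // declared first  -> destroyed second
  Noisy lock{"lock"};                // declared second -> destroyed first
};

int main()
{
  accessor_like a;
  return 0;
}  // prints "lock destroyed", then "parent reference destroyed"
```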

/**
@@ -459,14 +484,13 @@ class mutable_data_batch {
* @param parent Shared pointer to the parent data_batch (moved in).
* @param lock Exclusive lock already acquired on the parent's mutex.
*/
mutable_data_batch(std::shared_ptr<data_batch> parent,
std::unique_lock<std::shared_mutex> lock);
mutable_data_batch(std::shared_ptr<data_batch> parent, std::unique_lock<std::shared_mutex> lock);

// INVARIANT: _batch must be declared before _lock -- destruction order is load-bearing.
// When destroyed, _lock releases the exclusive lock first, then _batch drops the parent
// reference. This prevents accessing a destroyed mutex.
std::shared_ptr<data_batch> _batch; ///< Parent lifetime (destroyed second)
std::unique_lock<std::shared_mutex> _lock; ///< Exclusive lock (destroyed first)
std::shared_ptr<data_batch> _batch; ///< Parent lifetime (destroyed second)
std::unique_lock<std::shared_mutex> _lock; ///< Exclusive lock (destroyed first)
};

// =============================================================================
5 changes: 2 additions & 3 deletions include/cucascade/data/data_repository.hpp
@@ -84,10 +84,9 @@ class idata_repository {
_data_batches.resize(partition_idx + 1);
}
_data_batches[partition_idx].push_back(std::move(batch));
}
}
}


/**
* @brief Remove and return the next data batch from the repository.
*
@@ -268,7 +267,7 @@ class idata_repository {
}

protected:
mutable std::mutex _mutex; ///< Mutex for thread-safe access to repository operations
mutable std::mutex _mutex; ///< Mutex for thread-safe access to repository operations
std::vector<std::vector<PtrType>>
_data_batches; ///< Container for data batch pointers (partitioned)
};