Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 97 additions & 2 deletions cpp/CMakeLists.pg.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,106 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/modules/FindDuckDB.cmake)
set(CMAKE_PREFIX_PATH ${CMAKE_CURRENT_SOURCE_DIR}/.ext/deeplake_api ${CMAKE_PREFIX_PATH})
find_package(DeepLakeAPI REQUIRED PATHS ${CMAKE_CURRENT_SOURCE_DIR}/.ext/deeplake_api/lib/cmake/deeplake_api NO_DEFAULT_PATH)

# AWS SDK - required by deeplake_api (symbols not bundled in the prebuilt library).
# Prefer the official CMake package config; fall back to scanning vcpkg lib dirs.
find_package(AWSSDK COMPONENTS core s3 identity-management)
if(AWSSDK_FOUND)
  message(STATUS "Found AWS SDK: ${AWSSDK_LIBRARIES}")
  list(APPEND DEEPLAKE_STATIC_LINK_LIBS ${AWSSDK_LIBRARIES})
else()
  message(STATUS "AWS SDK not found via find_package, trying manual discovery...")
  # Candidate AWS SDK library locations in common vcpkg layouts.
  # NOTE: $ENV{VCPKG_ROOT} is read at configure time only.
  set(_AWS_SEARCH_PATHS
    "$ENV{VCPKG_ROOT}/installed/arm64-linux/lib"
    "$ENV{VCPKG_ROOT}/packages/aws-sdk-cpp_arm64-linux/lib"
  )
  # Also check for AWS libs in the build's vcpkg_installed tree.
  file(GLOB _VCPKG_INSTALLED_DIRS "${CMAKE_BINARY_DIR}/vcpkg_installed/*/lib")
  list(APPEND _AWS_SEARCH_PATHS ${_VCPKG_INSTALLED_DIRS})

  # Static-archive link order matters: high-level SDK libs first, then the
  # aws-c-* / s2n foundation libraries they depend on.
  set(_AWS_LIBS
    aws-cpp-sdk-s3
    aws-cpp-sdk-core
    aws-cpp-sdk-identity-management
    aws-cpp-sdk-cognito-identity
    aws-cpp-sdk-sts
    aws-crt-cpp
    aws-c-s3
    aws-c-auth
    aws-c-http
    aws-c-mqtt
    aws-c-event-stream
    aws-c-io
    aws-c-cal
    aws-c-compression
    aws-c-sdkutils
    aws-c-common
    aws-checksums
    s2n
  )
  set(_FOUND_AWS_LIBS)
  foreach(_lib IN LISTS _AWS_LIBS)
    # NO_DEFAULT_PATH: only accept libs from the explicit vcpkg locations above,
    # so a stray system-wide (and possibly ABI-incompatible) copy is never picked up.
    find_library(_LIB_${_lib} NAMES ${_lib} PATHS ${_AWS_SEARCH_PATHS} NO_DEFAULT_PATH)
    if(_LIB_${_lib})
      list(APPEND _FOUND_AWS_LIBS "${_LIB_${_lib}}")
      message(STATUS "  Found: ${_LIB_${_lib}}")
    endif()
  endforeach()
  if(_FOUND_AWS_LIBS)
    list(APPEND DEEPLAKE_STATIC_LINK_LIBS ${_FOUND_AWS_LIBS})
    # Fix: "${CMAKE_LIST_LENGTH(_FOUND_AWS_LIBS)}" is not valid CMake syntax
    # (it would expand an undefined variable to an empty string). Use the
    # list(LENGTH) command to get the element count.
    list(LENGTH _FOUND_AWS_LIBS _AWS_LIB_COUNT)
    message(STATUS "Linked ${_AWS_LIB_COUNT} AWS SDK libraries manually")
  else()
    message(WARNING "AWS SDK libraries not found. pg_deeplake may fail to load at runtime.")
  endif()
endif()

# }

include_directories(${DEFAULT_PARENT_DIR}/.ext/duckdb/src/include)

set(POSTGRES_DIR "${DEFAULT_PARENT_DIR}/../postgres")

# Build fingerprint: git hash + dirty state for hot-reload verification.
# Always recomputed at configure time so the .so reflects the current source.
# -c safe.directory=* is needed because the Docker container runs as root
# but the bind-mounted /deeplake is owned by the host user.
set(PG_DEEPLAKE_BUILD_HASH "unknown")
execute_process(
  COMMAND git -c safe.directory=* rev-parse --short=12 HEAD
  WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
  OUTPUT_VARIABLE _git_short_hash
  OUTPUT_STRIP_TRAILING_WHITESPACE
  ERROR_QUIET
  RESULT_VARIABLE _git_rev_parse_rc)
if(_git_rev_parse_rc EQUAL 0)
  # `git diff --quiet` exits non-zero when the working tree has uncommitted
  # changes; tag the hash so a hot-reloaded .so is identifiable as dirty.
  execute_process(
    COMMAND git -c safe.directory=* diff --quiet
    WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
    RESULT_VARIABLE _git_diff_rc)
  if(_git_diff_rc)
    set(PG_DEEPLAKE_BUILD_HASH "${_git_short_hash}-dirty")
  else()
    set(PG_DEEPLAKE_BUILD_HASH "${_git_short_hash}")
  endif()
endif()
message(STATUS "PG_DEEPLAKE_BUILD_HASH: ${PG_DEEPLAKE_BUILD_HASH}")

foreach(PG_VERSION ${PG_VERSIONS})
set(PG_LIB "pg_deeplake_${PG_VERSION}")
message(STATUS "Creating library ${PG_LIB} with sources: ${PG_SOURCES}")
ADD_LIBRARY(${PG_LIB} SHARED ${PG_SOURCES})

# Embed build fingerprint for hot-reload verification.
# Isolated in a generated .cpp so hash changes only recompile one file
# instead of every source file in the target (configure_file only
# rewrites when content actually changes, so same-hash = zero work).
configure_file(
"${CMAKE_CURRENT_SOURCE_DIR}/deeplake_pg/build_info.cpp.in"
"${CMAKE_CURRENT_BINARY_DIR}/build_info_${PG_VERSION}.cpp"
@ONLY)
target_sources(${PG_LIB} PRIVATE
"${CMAKE_CURRENT_BINARY_DIR}/build_info_${PG_VERSION}.cpp")

set(PG_TARGET_NAME "configure_postgres_REL_${PG_VERSION}_0")

if(TARGET ${PG_TARGET_NAME})
Expand Down Expand Up @@ -151,8 +240,14 @@ foreach(PG_VERSION ${PG_VERSIONS})
DESTINATION ${PG_SHAREDIR}/extension
)

# Symlink instead of copy: always points at the build output, zero I/O,
# and survives incremental builds where the target is up-to-date (the
# existing symlink still resolves to the current .so).
add_custom_command(TARGET ${PG_LIB} POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy ${PG_LIB}${CMAKE_SHARED_LIBRARY_SUFFIX} "${POSTGRES_DIR}/"
COMMENT "Copied ${CMAKE_BINARY_DIR}/${PG_LIB}${CMAKE_SHARED_LIBRARY_SUFFIX} to ${POSTGRES_DIR}/"
COMMAND ${CMAKE_COMMAND} -E rm -f "${POSTGRES_DIR}/${PG_LIB}${CMAKE_SHARED_LIBRARY_SUFFIX}"
COMMAND ${CMAKE_COMMAND} -E create_symlink
"$<TARGET_FILE:${PG_LIB}>"
"${POSTGRES_DIR}/${PG_LIB}${CMAKE_SHARED_LIBRARY_SUFFIX}"
COMMENT "Symlinked ${POSTGRES_DIR}/${PG_LIB}${CMAKE_SHARED_LIBRARY_SUFFIX} -> $<TARGET_FILE:${PG_LIB}>"
)
endforeach()
4 changes: 2 additions & 2 deletions cpp/CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
"inherits": "template-unix",
"cacheVariables": {
"AL_PG": "ON",
"AL_ASSERTIONS": "ON"
"AL_ASSERTIONS": "OFF"
}
},
{
"name": "deeplake-pg-dev-windows",
"inherits": "template-windows",
"cacheVariables": {
"AL_PG": "ON",
"AL_ASSERTIONS": "ON"
"AL_ASSERTIONS": "OFF"
}
},
{
Expand Down
10 changes: 10 additions & 0 deletions cpp/bifrost/async_prefetcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ class async_prefetcher
void start();
void stop() noexcept;

/**
* @brief Wait for the first batch to be ready (for cold run optimization).
* @param timeout_ms Maximum time to wait in milliseconds.
*
* This method is used for eager prefetching during cold runs.
* It fetches and caches the first batch so that subsequent next_batch()
* calls return immediately without blocking.
*/
void wait_for_first_batch(int64_t timeout_ms = 30000);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing implementation: wait_for_first_batch() is declared but not implemented in any .cpp file. This will cause linker errors when the code is built.

Fix: Implement this method in async_prefetcher.cpp or mark it as = delete if it's not meant to be used yet.


bool is_started() const noexcept;

heimdall::dataset_view_ptr dataset() const noexcept;
Expand Down
18 changes: 18 additions & 0 deletions cpp/bifrost/column_streamer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,24 @@ class column_streamer
return b.columns()[0].array();
}

/**
 * @brief Asynchronously obtain the next batch.
 * @return Promise resolving to the next deeplake_core::batch.
 *
 * Thin forwarding wrapper: delegates directly to the internal
 * prefetcher's next_batch_async() with no additional processing.
 */
async::promise<deeplake_core::batch> next_batch_async()
{
return prefetcher_.next_batch_async();
}

/**
 * @brief Pre-fetch and cache the first batch for cold run optimization.
 * @param timeout_ms Maximum time to wait in milliseconds.
 *
 * This method waits for the first batch to be downloaded and cached
 * internally. Subsequent calls to next_batch() will return immediately
 * for the first batch.
 *
 * Forwards to the prefetcher's wait_for_first_batch().
 * NOTE(review): wait_for_first_batch() is reportedly declared but not
 * defined in any .cpp yet — confirm an implementation exists before
 * relying on this path, otherwise linking will fail.
 */
void ensure_first_batch_ready(int64_t timeout_ms = 30000)
{
prefetcher_.wait_for_first_batch(timeout_ms);
}

bool empty() const noexcept
{
return prefetcher_.size() == 0;
Expand Down
37 changes: 22 additions & 15 deletions cpp/cmake/modules/FindPostgres.cmake
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
include(FetchContent)
include(ExternalProject)

# Define PostgreSQL versions
set(postgres_versions
"REL_16_0"
"REL_17_0"
"REL_18_0"
)

# Define corresponding SHA256 checksums for each version
set(postgres_SHA256_CHECKSUMS
"37851d1fdae1f2cdd1d23bf9a4598b6c2f3f6792e18bc974d78ed780a28933bf"
"16912fe4aef3c8f297b5da1b591741f132377c8b5e1b8e896e07fdd680d6bf34"
"b155bd4a467b401ebe61b504643492aae2d0836981aa4a5a60f8668b94eadebc"
)

# Loop through each PostgreSQL version
# Map BUILD_PG_* options to version tags and SHA256 checksums.
# Only download/build PostgreSQL versions that are actually enabled,
# avoiding unnecessary network downloads and compilation.
set(postgres_versions)
set(postgres_SHA256_CHECKSUMS)

if(BUILD_PG_16)
list(APPEND postgres_versions "REL_16_0")
list(APPEND postgres_SHA256_CHECKSUMS "37851d1fdae1f2cdd1d23bf9a4598b6c2f3f6792e18bc974d78ed780a28933bf")
endif()

if(BUILD_PG_17)
list(APPEND postgres_versions "REL_17_0")
list(APPEND postgres_SHA256_CHECKSUMS "16912fe4aef3c8f297b5da1b591741f132377c8b5e1b8e896e07fdd680d6bf34")
endif()

if(BUILD_PG_18)
list(APPEND postgres_versions "REL_18_0")
list(APPEND postgres_SHA256_CHECKSUMS "b155bd4a467b401ebe61b504643492aae2d0836981aa4a5a60f8668b94eadebc")
endif()

# Loop through each enabled PostgreSQL version
foreach(postgres_version IN LISTS postgres_versions)
# Find the index of the current version
list(FIND postgres_versions ${postgres_version} postgres_index)
Expand Down
8 changes: 8 additions & 0 deletions cpp/deeplake_pg/build_info.cpp.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Auto-generated at CMake configure time — do not edit manually.
// Isolated so that build-hash changes only recompile this single file
// instead of all pg_deeplake source files (18+).
// configure_file() only rewrites when the content actually changes,
// so an unchanged hash means zero recompilation.

__attribute__((visibility("default"), used))
const char pg_deeplake_build_hash[] = "PG_DEEPLAKE_BUILD:@PG_DEEPLAKE_BUILD_HASH@";
18 changes: 18 additions & 0 deletions cpp/deeplake_pg/deeplake_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,26 @@ void analyze_plan(PlannedStmt* plan)
}
}
}

// Warm first batches for all streamers in parallel for cold run optimization.
// This blocks until all first batches are downloaded but overlaps I/O across columns.
if (pg::eager_batch_prefetch) {
try {
table_data->get_streamers().warm_all_streamers();
} catch (const std::exception& e) {
elog(WARNING, "Eager batch prefetch failed during analyze_plan: %s", e.what());
} catch (...) {
elog(WARNING, "Eager batch prefetch failed during analyze_plan with unknown exception");
}
}
}
pg::query_info::current().set_all_tables_are_deeplake(all_tables_are_deeplake);

// Pre-initialize DuckDB connection early so it's ready when query execution starts.
// This reduces cold run latency by front-loading DuckDB init.
if (pg::eager_batch_prefetch && pg::query_info::current().is_deeplake_table_referenced()) {
pg::ensure_duckdb_initialized();
}
}

} // namespace pg
Expand Down
57 changes: 55 additions & 2 deletions cpp/deeplake_pg/duckdb_deeplake_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,22 @@ class deeplake_scan_function_helper
}

ASSERT(output_.ColumnCount() == global_state_.column_ids.size());

// Pre-trigger parallel batch initialization for all streaming columns.
// Without this, each column's batch download would block sequentially,
// serializing I/O waits. This overlaps all column batch downloads.
if (!has_index_search() && pg::eager_batch_prefetch) {
std::vector<int32_t> streaming_cols;
for (unsigned i = 0; i < global_state_.column_ids.size(); ++i) {
const auto col_idx = global_state_.column_ids[i];
if (bind_data_.table_data.is_column_requested(col_idx) &&
bind_data_.table_data.column_has_streamer(col_idx)) {
streaming_cols.push_back(col_idx);
}
}
bind_data_.table_data.get_streamers().prefetch_batches_for_row(streaming_cols, current_row);
}

icm::vector<async::promise<void>> column_promises;
// Fill output vectors column by column using table_data streamers
for (unsigned i = 0; i < global_state_.column_ids.size(); ++i) {
Expand Down Expand Up @@ -988,10 +1004,47 @@ void deeplake_scan_function(duckdb::ClientContext& context, duckdb::TableFunctio
deeplake_scan_function_helper helper(context, data, output);
try {
helper.scan();
} catch (const duckdb::OutOfMemoryException& e) {
// Provide helpful error message with configuration hints for OOM
elog(ERROR,
"DuckDB out of memory during Deeplake scan: %s. "
"Consider increasing pg_deeplake.duckdb_memory_limit_mb or "
"setting pg_deeplake.duckdb_temp_directory for disk spilling.",
e.what());
} catch (const duckdb::Exception& e) {
elog(ERROR, "DuckDB exception during Deeplake scan: %s", e.what());
// Check if the error message indicates memory issues
std::string msg = e.what();
std::string msg_lower;
msg_lower.reserve(msg.size());
for (char c : msg) {
msg_lower.push_back(static_cast<char>(std::tolower(static_cast<unsigned char>(c))));
}
if (msg_lower.find("memory") != std::string::npos || msg_lower.find("oom") != std::string::npos) {
elog(ERROR,
"DuckDB memory error during Deeplake scan: %s. "
"Consider increasing pg_deeplake.duckdb_memory_limit_mb or "
"setting pg_deeplake.duckdb_temp_directory for disk spilling.",
e.what());
} else {
elog(ERROR, "DuckDB exception during Deeplake scan: %s", e.what());
}
} catch (const std::exception& e) {
elog(ERROR, "STD exception during Deeplake scan: %s", e.what());
// Check if the error message indicates memory issues
std::string msg = e.what();
std::string msg_lower;
msg_lower.reserve(msg.size());
for (char c : msg) {
msg_lower.push_back(static_cast<char>(std::tolower(static_cast<unsigned char>(c))));
}
if (msg_lower.find("memory") != std::string::npos || msg_lower.find("oom") != std::string::npos) {
elog(ERROR,
"Memory error during Deeplake scan: %s. "
"Consider increasing pg_deeplake.duckdb_memory_limit_mb or "
"setting pg_deeplake.duckdb_temp_directory for disk spilling.",
e.what());
} else {
elog(ERROR, "STD exception during Deeplake scan: %s", e.what());
}
} catch (...) {
elog(ERROR, "Unknown exception during Deeplake scan");
}
Expand Down
Loading
Loading