Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/deeplake_pg/dl_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ std::vector<int64_t> load_int64_vector(const nd::array& arr)

int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds)
{
if (root_path.empty()) {
return 0;
}
const auto tables_path = join_path(root_path, k_tables_name);
const auto columns_path = join_path(root_path, k_columns_name);
const auto indexes_path = join_path(root_path, k_indexes_name);
Expand Down
8 changes: 7 additions & 1 deletion cpp/deeplake_pg/pg_deeplake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ void deeplake_xact_callback(XactEvent event, void *arg)
void init_deeplake()
{
static bool initialized = false;
if (initialized) {
if (initialized || !IsUnderPostmaster) {
return;
}
initialized = true;
Expand All @@ -368,6 +368,12 @@ void init_deeplake()
constexpr int THREAD_POOL_MULTIPLIER = 8; // Threads per CPU core for async operations
deeplake_api::initialize(std::make_shared<pg::logger_adapter>(), THREAD_POOL_MULTIPLIER * base::system_report::cpu_cores());

const std::string redis_url = base::getenv<std::string>("REDIS_URL", "");
if (!redis_url.empty()) {
deeplake_api::initialize_redis_cache(redis_url, 86400,
deeplake_api::metadata_catalog_cache_pattern);
}

pg::table_storage::instance(); /// initialize table storage

RegisterXactCallback(deeplake_xact_callback, nullptr);
Expand Down
35 changes: 20 additions & 15 deletions cpp/deeplake_pg/table_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ void table_storage::save_table_metadata(const pg::table_data& table_data)
}
return root;
}();
if (root_dir.empty()) {
return;
}
auto creds = session_credentials::get_credentials();
pg::dl_catalog::ensure_catalog(root_dir, creds);

Expand Down Expand Up @@ -292,8 +295,8 @@ void table_storage::load_table_metadata()
}();
auto creds = session_credentials::get_credentials();

// Stateless catalog sync (only when enabled)
if (pg::stateless_enabled) {
// Stateless catalog sync (only when enabled and root_dir is configured)
if (pg::stateless_enabled && !root_dir.empty()) {
// Fast path: if already loaded, just check version without ensure_catalog
if (tables_loaded_) {
const auto current_version = pg::dl_catalog::get_catalog_version(root_dir, creds);
Expand Down Expand Up @@ -534,7 +537,7 @@ void table_storage::load_table_metadata()
}
try {
// Seed the DL catalog with legacy metadata (only when stateless is enabled).
if (pg::stateless_enabled) {
if (pg::stateless_enabled && !root_dir.empty()) {
auto [schema_name, simple_table_name] = split_table_name(table_name);
pg::dl_catalog::table_meta meta;
meta.table_id = schema_name + "." + simple_table_name;
Expand Down Expand Up @@ -580,7 +583,7 @@ void table_storage::load_table_metadata()
base::log_channel::generic, "Failed to delete invalid table metadata for table_oid: {}", invalid_oid);
}
}
if (catalog_seeded && pg::stateless_enabled) {
if (catalog_seeded && pg::stateless_enabled && !root_dir.empty()) {
pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials());
catalog_version_ = pg::dl_catalog::get_catalog_version(root_dir, session_credentials::get_credentials());
}
Expand Down Expand Up @@ -991,17 +994,19 @@ void table_storage::drop_table(const std::string& table_name)
}
return root;
}();
pg::dl_catalog::ensure_catalog(root_dir, creds);
auto [schema_name, simple_table_name] = split_table_name(table_name);
pg::dl_catalog::table_meta meta;
meta.table_id = schema_name + "." + simple_table_name;
meta.schema_name = schema_name;
meta.table_name = simple_table_name;
meta.dataset_path = table_data.get_dataset_path().url();
meta.state = "dropping";
pg::dl_catalog::upsert_table(root_dir, creds, meta);
pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials());
catalog_version_ = pg::dl_catalog::get_catalog_version(root_dir, session_credentials::get_credentials());
if (!root_dir.empty()) {
pg::dl_catalog::ensure_catalog(root_dir, creds);
auto [schema_name, simple_table_name] = split_table_name(table_name);
pg::dl_catalog::table_meta meta;
meta.table_id = schema_name + "." + simple_table_name;
meta.schema_name = schema_name;
meta.table_name = simple_table_name;
meta.dataset_path = table_data.get_dataset_path().url();
meta.state = "dropping";
pg::dl_catalog::upsert_table(root_dir, creds, meta);
pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials());
catalog_version_ = pg::dl_catalog::get_catalog_version(root_dir, session_credentials::get_credentials());
}
}

try {
Expand Down
15 changes: 10 additions & 5 deletions cpp/deeplake_pg/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,22 @@ static std::string get_pg_data_directory()
{
const char* data_dir = GetConfigOption("data_directory", true, false);
if (data_dir == nullptr) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Unable to retrieve data_directory")));
return "";
}
return std::string(data_dir);
}

/// Resolve the root directory used for Deeplake data.
///
/// The DEEPLAKE_ROOT_PATH environment variable takes precedence. When it is
/// unset or empty, the result of get_pg_data_directory() is used instead.
///
/// @return The resolved root directory. May be empty when neither source
///         yields a value; callers are expected to handle an empty root.
static std::string get_deeplake_root_directory()
{
    // Deliberately no function-local statics: the value is re-evaluated on
    // every call so that an early lookup made before data_directory is
    // available is not cached for the rest of the process lifetime.
    // NOTE(review): get_pg_data_directory() may also report errors through
    // PostgreSQL's ereport(ERROR), which does not unwind C++ frames; leaving a
    // static initializer that way would never complete the initialization.
    auto root = base::getenv<std::string>("DEEPLAKE_ROOT_PATH", "");
    if (root.empty()) {
        root = get_pg_data_directory();
    }
    return root;
}

inline std::pair<BlockNumber, OffsetNumber> row_number_to_tid(int64_t row_number)
Expand Down
15 changes: 15 additions & 0 deletions postgres/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
FROM BASE_IMAGE
ARG VERSION=VERSION
ARG TARGETARCH
ARG STATELESS=false

LABEL name="pg-deeplake" \
version="${VERSION}" \
Expand Down Expand Up @@ -28,4 +29,18 @@ COPY ./debs/ /tmp/debs/
COPY --chmod=444 ./LICENSE /LICENSE
COPY ./postgres/docker-entrypoint.d/ /docker-entrypoint-initdb.d/
RUN apt-get install --no-install-recommends -y /tmp/debs/pg-deeplake-${VERSION}_${TARGETARCH}.deb && rm -rf /tmp/debs/
COPY ./serverless/scripts/init-deeplake-stateless.sh /tmp/init-deeplake-stateless.sh
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

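Missing source files: These COPY commands reference files in the ./serverless/ directory that don't exist in this PR or repository. Because the COPY instructions run unconditionally (only the later RUN step checks STATELESS), the image build fails for every value of STATELESS, not just when STATELESS=true.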

Fix: Either add the missing files to this PR or remove the stateless Docker build logic until the serverless infrastructure is ready.

COPY ./serverless/config/postgresql-overrides.conf /tmp/postgresql-overrides.conf
COPY ./serverless/scripts/health-check.sh /tmp/health-check.sh
# Finalize the optional stateless layout using the files staged in /tmp by the
# preceding COPY instructions. STATELESS is the build argument declared near
# the top of this Dockerfile.
#   STATELESS=true : move the init script into /docker-entrypoint-initdb.d/
#                    (as 3-stateless-init.sh), the config override into /etc,
#                    and the health check into /usr/local/bin, set their
#                    modes, and create /deeplake-data.
#   anything else  : delete the staged files from /tmp.
RUN if [ "$STATELESS" = "true" ]; then \
mv /tmp/init-deeplake-stateless.sh /docker-entrypoint-initdb.d/3-stateless-init.sh && \
chmod 755 /docker-entrypoint-initdb.d/3-stateless-init.sh && \
mv /tmp/postgresql-overrides.conf /etc/postgresql-overrides.conf && \
chmod 644 /etc/postgresql-overrides.conf && \
mv /tmp/health-check.sh /usr/local/bin/health-check.sh && \
chmod 755 /usr/local/bin/health-check.sh && \
mkdir -p /deeplake-data; \
else \
rm -f /tmp/init-deeplake-stateless.sh /tmp/postgresql-overrides.conf /tmp/health-check.sh; \
fi
USER 999
Loading