Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/deeplake_pg/pg_deeplake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ void save_index_metadata(Oid oid)
if (SPI_execute(buf.data, false, 0) != SPI_OK_INSERT) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to save metadata")));
}

// Cross-instance propagation is driven by DDL WAL logging in ProcessUtility.
}

void load_index_metadata()
Expand Down
58 changes: 54 additions & 4 deletions cpp/deeplake_pg/table_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ extern "C" {
#include <icm/json.hpp>
#include <icm/string_map.hpp>
#include <nd/none.hpp>
#include <unordered_set>

#include <algorithm>
#include <vector>
Expand Down Expand Up @@ -287,34 +288,84 @@ void table_storage::load_table_metadata()
continue;
}

// Snapshot tables_ keys so we can roll back C++ state on failure
std::vector<Oid> tables_before;
tables_before.reserve(tables_.size());
for (const auto& [oid, _] : tables_) {
tables_before.push_back(oid);
}

MemoryContext saved_context = CurrentMemoryContext;
ResourceOwner saved_owner = CurrentResourceOwner;
BeginInternalSubTransaction(nullptr);
PG_TRY();
{
set_catalog_only_create(true);
pg::utils::spi_connector connector;
SPI_connect();
bool pushed_snapshot = false;
if (!ActiveSnapshotSet()) {
PushActiveSnapshot(GetTransactionSnapshot());
pushed_snapshot = true;
}
// Restore the original search_path so unqualified names resolve correctly
std::string saved_search_path;
if (!entry.search_path.empty()) {
const char* current_sp = GetConfigOption("search_path", true, false);
if (current_sp != nullptr) {
saved_search_path = current_sp;
}
StringInfoData sp_sql;
initStringInfo(&sp_sql);
appendStringInfo(&sp_sql,
"SELECT pg_catalog.set_config('search_path', %s, true)",
quote_literal_cstr(entry.search_path.c_str()));
SPI_execute(sp_sql.data, true, 0);
pfree(sp_sql.data);
}
SPI_execute(entry.ddl_sql.c_str(), false, 0);
// Restore the session's original search_path
if (!entry.search_path.empty()) {
StringInfoData restore_sql;
initStringInfo(&restore_sql);
appendStringInfo(&restore_sql,
"SELECT pg_catalog.set_config('search_path', %s, true)",
quote_literal_cstr(saved_search_path.c_str()));
SPI_execute(restore_sql.data, true, 0);
pfree(restore_sql.data);
}
if (pushed_snapshot) {
PopActiveSnapshot();
}
SPI_finish();
set_catalog_only_create(false);
ReleaseCurrentSubTransaction();
}
PG_CATCH();
{
set_catalog_only_create(false);
MemoryContextSwitchTo(saved_context);
ErrorData* edata = CopyErrorData();
CurrentResourceOwner = saved_owner;
RollbackAndReleaseCurrentSubTransaction();
FlushErrorState();
elog(WARNING, "pg_deeplake: DDL WAL replay failed (seq=%ld, tag=%s): %.200s",
entry.seq, entry.command_tag.c_str(), entry.ddl_sql.c_str());

// Remove any tables_ entries added during the failed replay,
// since the subtransaction rollback undid the catalog changes
// but the C++ map entries persist.
std::unordered_set<Oid> before_set(tables_before.begin(), tables_before.end());
for (auto it = tables_.begin(); it != tables_.end(); ) {
if (!before_set.contains(it->first)) {
it = tables_.erase(it);
} else {
++it;
}
}

elog(WARNING, "pg_deeplake: DDL WAL replay failed (seq=%ld, tag=%s): %s (SQL: %.200s)",
entry.seq, entry.command_tag.c_str(),
edata->message ? edata->message : "unknown error",
entry.ddl_sql.c_str());
FreeErrorData(edata);
}
PG_END_TRY();
}
Expand Down Expand Up @@ -853,7 +904,6 @@ void table_storage::drop_table(const std::string& table_name)
auto& table_data = get_table_data(table_name);
auto creds = session_credentials::get_credentials();


try {
table_data.commit(); // Ensure all changes are committed before deletion
table_version_tracker::drop_table(table_data.get_table_oid());
Expand Down
15 changes: 0 additions & 15 deletions postgres/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
FROM BASE_IMAGE
ARG VERSION=VERSION
ARG TARGETARCH
ARG STATELESS=false

LABEL name="pg-deeplake" \
version="${VERSION}" \
Expand Down Expand Up @@ -29,18 +28,4 @@ COPY ./debs/ /tmp/debs/
COPY --chmod=444 ./LICENSE /LICENSE
COPY ./postgres/docker-entrypoint.d/ /docker-entrypoint-initdb.d/
RUN apt-get install --no-install-recommends -y /tmp/debs/pg-deeplake-${VERSION}_${TARGETARCH}.deb && rm -rf /tmp/debs/
COPY ./serverless/scripts/init-deeplake-stateless.sh /tmp/init-deeplake-stateless.sh
COPY ./serverless/config/postgresql-overrides.conf /tmp/postgresql-overrides.conf
COPY ./serverless/scripts/health-check.sh /tmp/health-check.sh
RUN if [ "$STATELESS" = "true" ]; then \
mv /tmp/init-deeplake-stateless.sh /docker-entrypoint-initdb.d/3-stateless-init.sh && \
chmod 755 /docker-entrypoint-initdb.d/3-stateless-init.sh && \
mv /tmp/postgresql-overrides.conf /etc/postgresql-overrides.conf && \
chmod 644 /etc/postgresql-overrides.conf && \
mv /tmp/health-check.sh /usr/local/bin/health-check.sh && \
chmod 755 /usr/local/bin/health-check.sh && \
mkdir -p /deeplake-data; \
else \
rm -f /tmp/init-deeplake-stateless.sh /tmp/postgresql-overrides.conf /tmp/health-check.sh; \
fi
USER 999