Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
438 changes: 357 additions & 81 deletions cpp/deeplake_pg/dl_catalog.cpp

Large diffs are not rendered by default.

45 changes: 39 additions & 6 deletions cpp/deeplake_pg/dl_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
#include <icm/string_map.hpp>

#include <cstdint>
#include <memory>
#include <string>
#include <vector>

namespace deeplake_api { class catalog_table; }

namespace pg::dl_catalog {

struct table_meta
Expand All @@ -15,6 +18,7 @@ struct table_meta
std::string table_name;
std::string dataset_path;
std::string state;
std::string db_name;
int64_t updated_at = 0;
};

Expand All @@ -36,6 +40,14 @@ struct index_meta
int32_t order_type = 0;
};

// One row of the per-database schema catalog (read/written via
// load_schemas()/upsert_schema()). Mirrors a Postgres schema so CREATE/DROP
// SCHEMA can be replicated across stateless instances sharing the catalog.
struct schema_meta
{
std::string schema_name; // PK: Postgres schema name, unique within the database
std::string owner; // owning role name; left empty when CREATE SCHEMA had no AUTHORIZATION clause
std::string state; // "ready" or "dropping" — lifecycle marker consumed by the sync worker
int64_t updated_at = 0; // last-modified timestamp (presumably epoch-based — TODO confirm units); 0 = never set
};

struct database_meta
{
std::string db_name; // PK
Expand All @@ -48,23 +60,44 @@ struct database_meta
int64_t updated_at = 0;
};

// Shared (cluster-wide) catalog: meta + databases
int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds);

std::vector<table_meta> load_tables(const std::string& root_path, icm::string_map<> creds);
std::vector<column_meta> load_columns(const std::string& root_path, icm::string_map<> creds);
std::vector<index_meta> load_indexes(const std::string& root_path, icm::string_map<> creds);
// Per-database catalog: tables + columns + indexes + meta
int64_t ensure_db_catalog(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);

// Per-database loaders (read from {root}/{db_name}/__deeplake_catalog/)
std::vector<table_meta> load_tables(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
std::vector<column_meta> load_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
std::vector<index_meta> load_indexes(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);

// Load tables and columns in parallel for better performance
std::pair<std::vector<table_meta>, std::vector<column_meta>>
load_tables_and_columns(const std::string& root_path, icm::string_map<> creds);
load_tables_and_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);

// Per-database schema catalog
std::vector<schema_meta> load_schemas(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
void upsert_schema(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const schema_meta& meta);

void upsert_table(const std::string& root_path, icm::string_map<> creds, const table_meta& meta);
void upsert_columns(const std::string& root_path, icm::string_map<> creds, const std::vector<column_meta>& columns);
// Per-database upserts (write to {root}/{db_name}/__deeplake_catalog/)
void upsert_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const table_meta& meta);
void upsert_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector<column_meta>& columns);
void upsert_indexes(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector<index_meta>& indexes);

// Shared (cluster-wide) database catalog
std::vector<database_meta> load_databases(const std::string& root_path, icm::string_map<> creds);
void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta);

// Global (shared) catalog version
int64_t get_catalog_version(const std::string& root_path, icm::string_map<> creds);
void bump_catalog_version(const std::string& root_path, icm::string_map<> creds);

// Per-database catalog version
int64_t get_db_catalog_version(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
void bump_db_catalog_version(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);

// Open the per-database meta table handle (for parallel .version() calls in sync worker)
std::shared_ptr<deeplake_api::catalog_table>
open_db_meta_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);

} // namespace pg::dl_catalog
67 changes: 67 additions & 0 deletions cpp/deeplake_pg/extension_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ extern "C" {
#include <postgres.h>

#include <catalog/namespace.h>
#include <commands/dbcommands.h>
#include <commands/defrem.h>
#include <miscadmin.h>
#include <commands/vacuum.h>
#include <nodes/nodeFuncs.h>
#include <optimizer/planner.h>
Expand Down Expand Up @@ -586,6 +588,35 @@ static void process_utility(PlannedStmt* pstmt,
}
}
}

// Mark schema as "dropping" in the S3 catalog
if (pg::stateless_enabled) {
try {
auto root_path = pg::session_credentials::get_root_path();
if (root_path.empty()) {
root_path = pg::utils::get_deeplake_root_directory();
}
if (!root_path.empty()) {
auto creds = pg::session_credentials::get_credentials();
const char* dbname = get_database_name(MyDatabaseId);
std::string db_name = dbname ? dbname : "postgres";
if (dbname) pfree(const_cast<char*>(dbname));

pg::dl_catalog::ensure_catalog(root_path, creds);
pg::dl_catalog::ensure_db_catalog(root_path, db_name, creds);

pg::dl_catalog::schema_meta s_meta;
s_meta.schema_name = schema_name;
s_meta.state = "dropping";
pg::dl_catalog::upsert_schema(root_path, db_name, creds, s_meta);

pg::dl_catalog::bump_db_catalog_version(root_path, db_name, pg::session_credentials::get_credentials());
pg::dl_catalog::bump_catalog_version(root_path, pg::session_credentials::get_credentials());
}
} catch (const std::exception& e) {
elog(WARNING, "pg_deeplake: failed to mark schema '%s' as dropping in catalog: %s", schema_name, e.what());
}
}
}
} else if (stmt->removeType == OBJECT_DATABASE) {
const char* query = "SELECT nspname, relname "
Expand Down Expand Up @@ -691,6 +722,7 @@ static void process_utility(PlannedStmt* pstmt,
}
if (!root_path.empty()) {
auto creds = pg::session_credentials::get_credentials();
pg::dl_catalog::ensure_catalog(root_path, creds);
pg::dl_catalog::database_meta db_meta;
db_meta.db_name = dbstmt->dbname;
db_meta.state = "dropping";
Expand Down Expand Up @@ -727,6 +759,7 @@ static void process_utility(PlannedStmt* pstmt,
}
if (!root_path.empty()) {
auto creds = pg::session_credentials::get_credentials();
pg::dl_catalog::ensure_catalog(root_path, creds);
pg::dl_catalog::database_meta db_meta;
db_meta.db_name = dbstmt->dbname;
db_meta.state = "ready";
Expand Down Expand Up @@ -758,6 +791,40 @@ static void process_utility(PlannedStmt* pstmt,
}
}

// Post-hook: record CREATE SCHEMA in S3 catalog for multi-instance sync
if (IsA(pstmt->utilityStmt, CreateSchemaStmt) && pg::stateless_enabled) {
CreateSchemaStmt* schemastmt = (CreateSchemaStmt*)pstmt->utilityStmt;
try {
auto root_path = pg::session_credentials::get_root_path();
if (root_path.empty()) {
root_path = pg::utils::get_deeplake_root_directory();
}
if (!root_path.empty() && schemastmt->schemaname != nullptr) {
auto creds = pg::session_credentials::get_credentials();
const char* dbname = get_database_name(MyDatabaseId);
std::string db_name = dbname ? dbname : "postgres";
if (dbname) pfree(const_cast<char*>(dbname));

pg::dl_catalog::ensure_catalog(root_path, creds);
pg::dl_catalog::ensure_db_catalog(root_path, db_name, creds);

pg::dl_catalog::schema_meta s_meta;
s_meta.schema_name = schemastmt->schemaname;
s_meta.state = "ready";
if (schemastmt->authrole != nullptr) {
s_meta.owner = schemastmt->authrole->rolename;
}
pg::dl_catalog::upsert_schema(root_path, db_name, creds, s_meta);

pg::dl_catalog::bump_db_catalog_version(root_path, db_name, pg::session_credentials::get_credentials());
pg::dl_catalog::bump_catalog_version(root_path, pg::session_credentials::get_credentials());
elog(DEBUG1, "pg_deeplake: recorded CREATE SCHEMA '%s' in catalog", schemastmt->schemaname);
}
} catch (const std::exception& e) {
elog(DEBUG1, "pg_deeplake: failed to record CREATE SCHEMA in catalog: %s", e.what());
}
}

// Post-process ALTER TABLE ADD COLUMN to add column to deeplake dataset
if (IsA(pstmt->utilityStmt, AlterTableStmt)) {
AlterTableStmt* stmt = (AlterTableStmt*)pstmt->utilityStmt;
Expand Down
2 changes: 1 addition & 1 deletion cpp/deeplake_pg/logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class logger_adapter : public base::logger_adapter
elog(DEBUG1, "%s", message.c_str());
break;
case base::log_level::info:
elog(INFO, "%s", message.c_str());
elog(LOG, "%s", message.c_str());
break;
case base::log_level::warning:
elog(WARNING, "%s", message.c_str());
Expand Down
39 changes: 39 additions & 0 deletions cpp/deeplake_pg/pg_deeplake.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "pg_deeplake.hpp"
#include "dl_catalog.hpp"
#include "logger.hpp"
#include "table_storage.hpp"
#include "utils.hpp"

#include <deeplake_api/deeplake_api.hpp>
#include <deeplake_core/deeplake_index_type.hpp>
Expand All @@ -9,6 +11,8 @@
extern "C" {
#endif

#include <commands/dbcommands.h>
#include <miscadmin.h>
#include <storage/ipc.h>

#ifdef __cplusplus
Expand Down Expand Up @@ -260,6 +264,41 @@ void save_index_metadata(Oid oid)
if (SPI_execute(buf.data, false, 0) != SPI_OK_INSERT) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to save metadata")));
}

// Persist index to shared catalog for stateless multi-instance sync.
// Skip when in catalog-only mode — the table was synced FROM the catalog,
// so writing back would be redundant and cause version bump loops.
if (pg::stateless_enabled && !pg::table_storage::is_catalog_only_create()) {
try {
auto root_dir = pg::session_credentials::get_root_path();
if (root_dir.empty()) {
root_dir = pg::utils::get_deeplake_root_directory();
}
if (!root_dir.empty()) {
auto creds = pg::session_credentials::get_credentials();
const char* dbname = get_database_name(MyDatabaseId);
std::string db_name = dbname ? dbname : "postgres";
if (dbname) pfree(const_cast<char*>(dbname));

const std::string& table_id = idx_info.table_name(); // already schema-qualified

pg::dl_catalog::index_meta idx_meta;
idx_meta.table_id = table_id;
idx_meta.column_names = idx_info.get_column_names_string();
idx_meta.index_type = std::string(deeplake_core::deeplake_index_type::to_string(idx_info.index_type()));
idx_meta.order_type = static_cast<int32_t>(idx_info.order_type());

std::vector<pg::dl_catalog::index_meta> indexes = {idx_meta};
pg::dl_catalog::upsert_indexes(root_dir, db_name, creds, indexes);
pg::dl_catalog::bump_db_catalog_version(root_dir, db_name, creds);
pg::dl_catalog::bump_catalog_version(root_dir, creds);
}
} catch (const std::exception& e) {
elog(DEBUG1, "pg_deeplake: failed to persist index to shared catalog: %s", e.what());
} catch (...) {
elog(DEBUG1, "pg_deeplake: failed to persist index to shared catalog: unknown error");
}
}
}

void load_index_metadata()
Expand Down
Loading