Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 72 additions & 1 deletion crates/integrations/datafusion/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@ use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::RwLock;

use async_trait::async_trait;
use datafusion::catalog::{CatalogProvider, SchemaProvider};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::{CatalogProvider, MemorySchemaProvider, SchemaProvider};
use datafusion::common::plan_datafusion_err;
use datafusion::datasource::MemTable;
use datafusion::datasource::TableProvider;
use datafusion::error::Result as DFResult;
use paimon::catalog::{Catalog, Identifier};
Expand All @@ -44,6 +49,8 @@ pub struct PaimonCatalogProvider {
catalog: Arc<dyn Catalog>,
/// Session-scoped dynamic options shared with the SQL context.
dynamic_options: DynamicOptions,
/// In-memory databases for temporary tables, keyed by database name.
temp_databases: RwLock<HashMap<String, Arc<MemorySchemaProvider>>>,
}

impl Debug for PaimonCatalogProvider {
Expand All @@ -62,6 +69,7 @@ impl PaimonCatalogProvider {
PaimonCatalogProvider {
catalog,
dynamic_options: Default::default(),
temp_databases: RwLock::new(HashMap::new()),
}
}

Expand All @@ -72,6 +80,7 @@ impl PaimonCatalogProvider {
PaimonCatalogProvider {
catalog,
dynamic_options,
temp_databases: RwLock::new(HashMap::new()),
}
}
}
Expand All @@ -98,6 +107,15 @@ impl CatalogProvider for PaimonCatalogProvider {
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
// First check temp_databases
let databases = self
.temp_databases
.read()
.unwrap_or_else(|e| e.into_inner());
if let Some(schema) = databases.get(name) {
return Some(Arc::clone(schema) as Arc<dyn SchemaProvider>);
}

let catalog = Arc::clone(&self.catalog);
let dynamic_options = Arc::clone(&self.dynamic_options);
let name = name.to_string();
Expand Down Expand Up @@ -169,6 +187,59 @@ impl CatalogProvider for PaimonCatalogProvider {
}
}

impl PaimonCatalogProvider {
    /// Returns the in-memory schema provider registered under `name`,
    /// inserting a fresh empty [`MemorySchemaProvider`] first when no
    /// temp database with that name exists yet.
    fn get_or_create_temp_database(&self, name: &str) -> Arc<MemorySchemaProvider> {
        // Recover the map even if a previous writer panicked while
        // holding the lock (poisoned lock).
        let mut map = match self.temp_databases.write() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        Arc::clone(
            map.entry(name.to_string())
                .or_insert_with(|| Arc::new(MemorySchemaProvider::new())),
        )
    }

    /// Registers an in-memory temporary table holding `batches` under
    /// `database.table_name`, creating the temp database on demand.
    ///
    /// # Errors
    /// Propagates failures from [`MemTable::try_new`] (e.g. batches not
    /// matching `schema_ref`) or from table registration.
    pub fn register_temp_table(
        &self,
        database: &str,
        table_name: &str,
        schema_ref: SchemaRef,
        batches: Vec<RecordBatch>,
    ) -> DFResult<()> {
        // Resolve (or create) the database first, then build the table,
        // matching the original side-effect ordering.
        let target = self.get_or_create_temp_database(database);
        let table = MemTable::try_new(schema_ref, vec![batches])?;
        target.register_table(table_name.to_string(), Arc::new(table))?;
        Ok(())
    }

    /// Removes a temporary table, returning the provider that was
    /// registered (if any).
    ///
    /// # Errors
    /// Returns a plan error when no temp database named `database` exists.
    pub fn deregister_temp_table(
        &self,
        database: &str,
        table_name: &str,
    ) -> DFResult<Option<Arc<dyn TableProvider>>> {
        let map = match self.temp_databases.read() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        match map.get(database) {
            Some(schema) => schema.deregister_table(table_name),
            None => Err(plan_datafusion_err!("Unknown temp database '{database}'")),
        }
    }

    /// Reports whether a temp database named `name` has been created.
    pub fn has_temp_database(&self, name: &str) -> bool {
        match self.temp_databases.read() {
            Ok(guard) => guard.contains_key(name),
            Err(poisoned) => poisoned.into_inner().contains_key(name),
        }
    }
}

/// Represents a [`SchemaProvider`] for the Paimon [`Catalog`], managing
/// access to table providers within a specific database.
///
Expand Down
20 changes: 12 additions & 8 deletions crates/integrations/datafusion/src/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@ use paimon::spec::CoreOptions;
use paimon::table::{CopyOnWriteMergeWriter, Table};

use crate::error::to_datafusion_error;
use crate::merge_into::TempTableTracker;
use crate::merge_into::{
build_partition_set_from_where, extract_tracking_columns, is_delete_conflict, ok_result,
register_cow_target_table, retry_on_conflict,
};
use crate::sql_context::SQLContext;

/// Execute a DELETE statement on a Paimon table.
///
/// `table_ref` is the SQL table reference string (e.g. `"paimon.test_db.t"`),
/// already extracted by the caller for catalog resolution.
pub(crate) async fn execute_delete(
ctx: &SessionContext,
ctx: &SQLContext,
delete: &Delete,
table: Table,
table_ref: &str,
Expand Down Expand Up @@ -73,7 +75,7 @@ pub(crate) async fn execute_delete(

/// Execute DELETE on an append-only table with retry on delete conflict.
async fn execute_cow_delete(
ctx: &SessionContext,
ctx: &SQLContext,
delete: &Delete,
table: &Table,
table_ref: &str,
Expand All @@ -86,7 +88,7 @@ async fn execute_cow_delete(

/// Single attempt of CoW DELETE execution.
async fn execute_cow_delete_once(
ctx: &SessionContext,
ctx: &SQLContext,
delete: &Delete,
table: &Table,
table_ref: &str,
Expand All @@ -99,14 +101,16 @@ async fn execute_cow_delete_once(
.await
.map_err(to_datafusion_error)?;

let (has_data, cow_table_guard) = register_cow_target_table(ctx, table, &writer).await?;
let mut temp_tracker = TempTableTracker::new(ctx);
let (has_data, cow_table_name) =
register_cow_target_table(ctx, table, &writer, &mut temp_tracker).await?;
if !has_data {
return ok_result(ctx, 0);
return ok_result(ctx.ctx(), 0);
}

let cow_target_qualified = cow_table_name;
let result =
execute_cow_delete_inner(ctx, &cow_table_guard.qualified_name(), delete, &mut writer).await;
drop(cow_table_guard);
execute_cow_delete_inner(ctx.ctx(), &cow_target_qualified, delete, &mut writer).await;
let total_count = result?;

let messages = writer.prepare_commit().await.map_err(to_datafusion_error)?;
Expand All @@ -115,7 +119,7 @@ async fn execute_cow_delete_once(
commit.commit(messages).await.map_err(to_datafusion_error)?;
}

ok_result(ctx, total_count)
ok_result(ctx.ctx(), total_count)
}

async fn execute_cow_delete_inner(
Expand Down
Loading
Loading