Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion::catalog::{Session, TableFunctionImpl};
use datafusion::catalog::{Session, TableFunctionArgs, TableFunctionImpl};
use datafusion::common::{Column, plan_err};
use datafusion::datasource::TableProvider;
use datafusion::datasource::memory::MemorySourceConfig;
Expand Down Expand Up @@ -326,7 +326,8 @@ fn fixed_len_byte_array_to_string(val: Option<&FixedLenByteArray>) -> Option<Str
pub struct ParquetMetadataFunc {}

impl TableFunctionImpl for ParquetMetadataFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
let exprs = args.args;
let filename = match exprs.first() {
Some(Expr::Literal(ScalarValue::Utf8(Some(s)), _)) => s, // single quote: parquet_metadata('x.parquet')
Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
Expand Down Expand Up @@ -517,7 +518,8 @@ impl MetadataCacheFunc {
}

impl TableFunctionImpl for MetadataCacheFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
let exprs = args.args;
if !exprs.is_empty() {
return plan_err!("metadata_cache should have no arguments");
}
Expand Down Expand Up @@ -635,7 +637,8 @@ impl StatisticsCacheFunc {
}

impl TableFunctionImpl for StatisticsCacheFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
let exprs = args.args;
if !exprs.is_empty() {
return plan_err!("statistics_cache should have no arguments");
}
Expand Down Expand Up @@ -770,7 +773,8 @@ impl ListFilesCacheFunc {
}

impl TableFunctionImpl for ListFilesCacheFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
let exprs = args.args;
if !exprs.is_empty() {
return plan_err!("list_files_cache should have no arguments");
}
Expand Down
21 changes: 11 additions & 10 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,14 @@ cargo run --example dataframe -- dataframe

#### Category: Single Process

| Subcommand | File Path | Description |
| ---------- | ------------------------------------------------------- | ----------------------------------------------- |
| adv_udaf | [`udf/advanced_udaf.rs`](examples/udf/advanced_udaf.rs) | Advanced User Defined Aggregate Function (UDAF) |
| adv_udf | [`udf/advanced_udf.rs`](examples/udf/advanced_udf.rs) | Advanced User Defined Scalar Function (UDF) |
| adv_udwf | [`udf/advanced_udwf.rs`](examples/udf/advanced_udwf.rs) | Advanced User Defined Window Function (UDWF) |
| async_udf | [`udf/async_udf.rs`](examples/udf/async_udf.rs) | Asynchronous User Defined Scalar Function |
| udaf | [`udf/simple_udaf.rs`](examples/udf/simple_udaf.rs) | Simple UDAF example |
| udf | [`udf/simple_udf.rs`](examples/udf/simple_udf.rs) | Simple UDF example |
| udtf | [`udf/simple_udtf.rs`](examples/udf/simple_udtf.rs) | Simple UDTF example |
| udwf | [`udf/simple_udwf.rs`](examples/udf/simple_udwf.rs) | Simple UDWF example |
| Subcommand | File Path | Description |
| --------------- | ----------------------------------------------------------- | ----------------------------------------------- |
| adv_udaf | [`udf/advanced_udaf.rs`](examples/udf/advanced_udaf.rs) | Advanced User Defined Aggregate Function (UDAF) |
| adv_udf | [`udf/advanced_udf.rs`](examples/udf/advanced_udf.rs) | Advanced User Defined Scalar Function (UDF) |
| adv_udwf | [`udf/advanced_udwf.rs`](examples/udf/advanced_udwf.rs) | Advanced User Defined Window Function (UDWF) |
| async_udf | [`udf/async_udf.rs`](examples/udf/async_udf.rs) | Asynchronous User Defined Scalar Function |
| udaf | [`udf/simple_udaf.rs`](examples/udf/simple_udaf.rs) | Simple UDAF example |
| udf | [`udf/simple_udf.rs`](examples/udf/simple_udf.rs) | Simple UDF example |
| udtf | [`udf/simple_udtf.rs`](examples/udf/simple_udtf.rs) | Simple UDTF example |
| udwf | [`udf/simple_udwf.rs`](examples/udf/simple_udwf.rs) | Simple UDWF example |
| table_list_udtf | [`udf/table_list_udtf.rs`](examples/udf/table_list_udtf.rs) | Session-aware UDTF table list example |
8 changes: 7 additions & 1 deletion datafusion-examples/examples/udf/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//!
//! ## Usage
//! ```bash
//! cargo run --example udf -- [all|adv_udaf|adv_udf|adv_udwf|async_udf|udaf|udf|udtf|udwf]
//! cargo run --example udf -- [all|adv_udaf|adv_udf|adv_udwf|async_udf|udaf|udf|udtf|udwf|table_list_udtf]
//! ```
//!
//! Each subcommand runs a corresponding example:
Expand Down Expand Up @@ -50,6 +50,9 @@
//!
//! - `udwf`
//! (file: simple_udwf.rs, desc: Simple UDWF example)
//!
//! - `table_list_udtf`
//! (file: table_list_udtf.rs, desc: Session-aware UDTF table list example)

mod advanced_udaf;
mod advanced_udf;
Expand All @@ -59,6 +62,7 @@ mod simple_udaf;
mod simple_udf;
mod simple_udtf;
mod simple_udwf;
mod table_list_udtf;

use datafusion::error::{DataFusionError, Result};
use strum::{IntoEnumIterator, VariantNames};
Expand All @@ -76,6 +80,7 @@ enum ExampleKind {
Udaf,
Udwf,
Udtf,
TableListUdtf,
}

impl ExampleKind {
Expand All @@ -101,6 +106,7 @@ impl ExampleKind {
ExampleKind::Udf => simple_udf::simple_udf().await?,
ExampleKind::Udtf => simple_udtf::simple_udtf().await?,
ExampleKind::Udwf => simple_udwf::simple_udwf().await?,
ExampleKind::TableListUdtf => table_list_udtf::table_list_udtf().await?,
}

Ok(())
Expand Down
5 changes: 3 additions & 2 deletions datafusion-examples/examples/udf/simple_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::csv::reader::Format;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::{Session, TableFunctionImpl};
use datafusion::catalog::{Session, TableFunctionArgs, TableFunctionImpl};
use datafusion::common::{ScalarValue, plan_err};
use datafusion::datasource::TableProvider;
use datafusion::datasource::memory::MemorySourceConfig;
Expand Down Expand Up @@ -135,7 +135,8 @@ impl TableProvider for LocalCsvTable {
struct LocalCsvTableFunc {}

impl TableFunctionImpl for LocalCsvTableFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
let exprs = args.args;
let Some(Expr::Literal(ScalarValue::Utf8(Some(path)), _)) = exprs.first() else {
return plan_err!("read_csv requires at least one string argument");
};
Expand Down
128 changes: 128 additions & 0 deletions datafusion-examples/examples/udf/table_list_udtf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! See `main.rs` for how to run it.
use std::sync::{Arc, LazyLock};

use arrow::array::{RecordBatch, StringBuilder};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::{
catalog::{MemTable, TableFunctionArgs, TableFunctionImpl, TableProvider},
common::Result,
execution::SessionState,
prelude::SessionContext,
};
use datafusion_common::{DataFusionError, plan_err};
use tokio::{runtime::Handle, task::block_in_place};

const FUNCTION_NAME: &str = "table_list";

// This example shows how to create a UDTF that depends on the session state.
// It defines a `table_list` UDTF that returns a list of the tables within the provided session.

/// Runs the `table_list` example end to end.
///
/// A fresh [`SessionContext`] gets the UDTF registered under
/// [`FUNCTION_NAME`], one view and one table are created through SQL, and the
/// rows returned by `select * from table_list()` are printed.
pub async fn table_list_udtf() -> Result<()> {
    let session_ctx = SessionContext::new();
    session_ctx.register_udtf(FUNCTION_NAME, Arc::new(TableListUdtf));

    // Register different kinds of tables: the view first, then the base table.
    for ddl in ["create view v as select 1", "create table t(a int)"] {
        session_ctx.sql(ddl).await?.collect().await?;
    }

    // Print results.
    let listing = session_ctx.sql("select * from table_list()").await?;
    listing.show().await?;

    Ok(())
}

/// Table function registered under [`FUNCTION_NAME`]; a unit struct that
/// carries no state of its own.
#[derive(Debug, Default)]
struct TableListUdtf;

static SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
SchemaRef::new(Schema::new(vec![
Field::new("catalog", DataType::Utf8, false),
Field::new("schema", DataType::Utf8, false),
Field::new("table", DataType::Utf8, false),
Field::new("type", DataType::Utf8, false),
]))
});

impl TableFunctionImpl for TableListUdtf {
    /// Builds an in-memory table with one row per table found in the calling
    /// session, with the columns described by [`SCHEMA`].
    ///
    /// # Errors
    ///
    /// * a plan error when any argument is supplied;
    /// * an internal error when the session is not a [`SessionState`];
    /// * any error returned while resolving a table provider or while
    ///   assembling the resulting batch / `MemTable`.
    ///
    /// # Panics
    ///
    /// Table providers are resolved by blocking on an async lookup via
    /// `tokio::task::block_in_place` and `Handle::current()`. Per the tokio
    /// documentation this needs a running multi-threaded runtime; NOTE(review):
    /// confirm every caller of this example satisfies that.
    fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
        if !args.args.is_empty() {
            return plan_err!(
                "{}: unexpected number of arguments: {}, expected: 0",
                FUNCTION_NAME,
                args.args.len()
            );
        }
        // The catalog list is read from the concrete `SessionState`, so the
        // `dyn Session` handed to the function has to be downcast first.
        let state = args
            .session
            .as_any()
            .downcast_ref::<SessionState>()
            .ok_or_else(|| {
                DataFusionError::Internal("failed to downcast state".into())
            })?;

        let mut catalogs = StringBuilder::new();
        let mut schemas = StringBuilder::new();
        let mut tables = StringBuilder::new();
        let mut types = StringBuilder::new();

        let catalog_list = state.catalog_list();
        for catalog_name in catalog_list.catalog_names() {
            // A name with no matching entry is skipped rather than reported.
            let Some(catalog) = catalog_list.catalog(&catalog_name) else {
                continue;
            };
            for schema_name in catalog.schema_names() {
                let Some(schema) = catalog.schema(&schema_name) else {
                    continue;
                };
                for table_name in schema.table_names() {
                    // The lookup returns a future while this method is
                    // synchronous, so the future is driven to completion here.
                    let Some(provider) = block_in_place(|| {
                        Handle::current().block_on(schema.table(&table_name))
                    })?
                    else {
                        continue;
                    };
                    // The builders copy the string data themselves, so
                    // borrowing avoids three extra allocations per row.
                    catalogs.append_value(&catalog_name);
                    schemas.append_value(&schema_name);
                    tables.append_value(&table_name);
                    types.append_value(provider.table_type().to_string());
                }
            }
        }

        let batch = RecordBatch::try_new(
            Arc::clone(&SCHEMA),
            vec![
                Arc::new(catalogs.finish()),
                Arc::new(schemas.finish()),
                Arc::new(tables.finish()),
                Arc::new(types.finish()),
            ],
        )?;

        Ok(Arc::new(MemTable::try_new(
            batch.schema(),
            vec![vec![batch]],
        )?))
    }
}
37 changes: 35 additions & 2 deletions datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use std::sync::Arc;
use crate::session::Session;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common::{Constraints, Statistics, not_impl_err};
use datafusion_common::{Result, internal_err};
use datafusion_expr::Expr;

use datafusion_expr::dml::InsertOp;
Expand Down Expand Up @@ -485,10 +485,30 @@ pub trait TableProviderFactory: Debug + Sync + Send {
) -> Result<Arc<dyn TableProvider>>;
}

/// Describes arguments provided to the table function call.
///
/// Both fields are borrowed for the lifetime `'a`; the struct owns nothing.
pub struct TableFunctionArgs<'a> {
    /// Expressions the table function was called with.
    pub args: &'a [Expr],
    /// Session within which the function is called.
    pub session: &'a dyn Session,
}

/// A trait for table function implementations
pub trait TableFunctionImpl: Debug + Sync + Send {
/// Create a table provider
fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
#[deprecated(
since = "53.0.0",
note = "Implement `TableFunctionImpl::call_with_args` instead"
)]
fn call(&self, _args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
internal_err!("unimplemented")
}

/// Create a table provider
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
#[expect(deprecated)]
self.call(args.args)
}
}

/// A table that uses a function to generate data
Expand Down Expand Up @@ -517,7 +537,20 @@ impl TableFunction {
}

/// Get the function implementation and generate a table.
///
/// Forwards `args` unchanged to `self.fun.call`. Only the argument
/// expressions are passed along; this method has no session to hand over.
#[deprecated(
    since = "53.0.0",
    note = "Use `TableFunction::create_table_provider_with_args` instead"
)]
pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
    // The callee is itself deprecated; the lint is expected here.
    #[expect(deprecated)]
    self.fun.call(args)
}

/// Get the function implementation and generate a table.
///
/// Forwards `args` (the call expressions together with the session) unchanged
/// to `self.fun.call_with_args` and returns its result.
pub fn create_table_provider_with_args(
    &self,
    args: TableFunctionArgs,
) -> Result<Arc<dyn TableProvider>> {
    self.fun.call_with_args(args)
}
}
7 changes: 6 additions & 1 deletion datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1829,6 +1829,8 @@ impl ContextProvider for SessionContextProvider<'_> {
name: &str,
args: Vec<Expr>,
) -> datafusion_common::Result<Arc<dyn TableSource>> {
use datafusion_catalog::TableFunctionArgs;

let tbl_func = self
.state
.table_functions
Expand All @@ -1850,7 +1852,10 @@ impl ContextProvider for SessionContextProvider<'_> {
.and_then(|e| simplifier.simplify(e))
})
.collect::<datafusion_common::Result<Vec<_>>>()?;
let provider = tbl_func.create_table_provider(&args)?;
let provider = tbl_func.create_table_provider_with_args(TableFunctionArgs {
args: &args,
session: self.state,
})?;

Ok(provider_as_source(provider))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{ExecutionPlan, collect};
use datafusion::prelude::SessionContext;
use datafusion_catalog::Session;
use datafusion_catalog::TableFunctionImpl;
use datafusion_catalog::{Session, TableFunctionArgs};
use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::{EmptyRelation, Expr, LogicalPlan, Projection, TableType};

Expand Down Expand Up @@ -200,7 +200,8 @@ impl SimpleCsvTable {
struct SimpleCsvTableFunc {}

impl TableFunctionImpl for SimpleCsvTableFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
let exprs = args.args;
let mut new_exprs = vec![];
let mut filepath = String::new();
for expr in exprs {
Expand Down Expand Up @@ -231,7 +232,7 @@ async fn test_udtf_type_coercion() -> Result<()> {
struct NoOpTableFunc;

impl TableFunctionImpl for NoOpTableFunc {
fn call(&self, _: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call_with_args(&self, _: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
let schema = Arc::new(arrow::datatypes::Schema::empty());
Ok(Arc::new(MemTable::try_new(schema, vec![vec![]])?))
}
Expand Down
Loading