Skip to content

Commit 0cfcbb4

Browse files
timsaucer and claude committed
Add missing SessionContext utility methods
Expose upstream DataFusion v53 utility methods: session_start_time, enable_ident_normalization, parse_sql_expr, execute_logical_plan, refresh_catalogs, remove_optimizer_rule, and table_provider. The add_optimizer_rule and add_analyzer_rule methods are omitted as the OptimizerRule and AnalyzerRule traits are not yet exposed to Python. Closes #1459. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 2499409 commit 0cfcbb4

File tree

3 files changed

+147
-2
lines changed

3 files changed

+147
-2
lines changed

crates/core/src/context.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
2828
use datafusion::arrow::pyarrow::PyArrowType;
2929
use datafusion::arrow::record_batch::RecordBatch;
3030
use datafusion::catalog::{CatalogProvider, CatalogProviderList, TableProviderFactory};
31-
use datafusion::common::{ScalarValue, TableReference, exec_err};
31+
use datafusion::common::{DFSchema, ScalarValue, TableReference, exec_err};
3232
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
3333
use datafusion::datasource::file_format::parquet::ParquetFormat;
3434
use datafusion::datasource::listing::{
@@ -70,11 +70,13 @@ use crate::catalog::{
7070
PyCatalog, PyCatalogList, RustWrappedPyCatalogProvider, RustWrappedPyCatalogProviderList,
7171
};
7272
use crate::common::data_type::PyScalarValue;
73+
use crate::common::df_schema::PyDFSchema;
7374
use crate::dataframe::PyDataFrame;
7475
use crate::dataset::Dataset;
7576
use crate::errors::{
7677
PyDataFusionError, PyDataFusionResult, from_datafusion_error, py_datafusion_err,
7778
};
79+
use crate::expr::PyExpr;
7880
use crate::expr::sort_expr::PySortExpr;
7981
use crate::options::PyCsvReadOptions;
8082
use crate::physical_plan::PyExecutionPlan;
@@ -1050,6 +1052,45 @@ impl PySessionContext {
10501052
self.ctx.session_id()
10511053
}
10521054

1055+
pub fn session_start_time(&self) -> String {
1056+
self.ctx.session_start_time().to_rfc3339()
1057+
}
1058+
1059+
pub fn enable_ident_normalization(&self) -> bool {
1060+
self.ctx.enable_ident_normalization()
1061+
}
1062+
1063+
pub fn parse_sql_expr(&self, sql: &str, schema: PyDFSchema) -> PyDataFusionResult<PyExpr> {
1064+
let df_schema: DFSchema = schema.into();
1065+
Ok(self.ctx.parse_sql_expr(sql, &df_schema)?.into())
1066+
}
1067+
1068+
pub fn execute_logical_plan(
1069+
&self,
1070+
plan: PyLogicalPlan,
1071+
py: Python,
1072+
) -> PyDataFusionResult<PyDataFrame> {
1073+
let df = wait_for_future(
1074+
py,
1075+
self.ctx.execute_logical_plan(plan.plan.as_ref().clone()),
1076+
)??;
1077+
Ok(PyDataFrame::new(df))
1078+
}
1079+
1080+
/// Calls `refresh_catalogs` on the wrapped context and waits for it to finish.
///
/// # Errors
///
/// Fails if `wait_for_future` reports an error or if the refresh itself
/// returns an error.
pub fn refresh_catalogs(&self, py: Python) -> PyDataFusionResult<()> {
    let pending = self.ctx.refresh_catalogs();
    // First `?` covers the wait itself, the second the refresh result.
    let outcome = wait_for_future(py, pending)?;
    outcome?;
    Ok(())
}
1084+
1085+
pub fn remove_optimizer_rule(&self, name: &str) -> bool {
1086+
self.ctx.remove_optimizer_rule(name)
1087+
}
1088+
1089+
pub fn table_provider(&self, name: &str, py: Python) -> PyDataFusionResult<PyTable> {
1090+
let provider = wait_for_future(py, self.ctx.table_provider(name))??;
1091+
Ok(PyTable { table: provider })
1092+
}
1093+
10531094
#[allow(clippy::too_many_arguments)]
10541095
#[pyo3(signature = (path, schema=None, schema_infer_max_records=1000, file_extension=".json", table_partition_cols=vec![], file_compression_type=None))]
10551096
pub fn read_json(

python/datafusion/context.py

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@
6363
import polars as pl # type: ignore[import]
6464

6565
from datafusion.catalog import CatalogProvider, Table
66-
from datafusion.expr import SortKey
66+
from datafusion.common import DFSchema
67+
from datafusion.expr import Expr, SortKey
6768
from datafusion.plan import ExecutionPlan, LogicalPlan
6869
from datafusion.user_defined import (
6970
AggregateUDF,
@@ -1141,6 +1142,67 @@ def session_id(self) -> str:
11411142
"""Return an id that uniquely identifies this :py:class:`SessionContext`."""
11421143
return self.ctx.session_id()
11431144

1145+
def session_start_time(self) -> str:
    """Report when this session started.

    Returns:
        The session start time as an RFC 3339 formatted string.
    """
    start_time = self.ctx.session_start_time()
    return start_time
1148+
1149+
def enable_ident_normalization(self) -> bool:
    """Report whether identifier normalization (lowercasing) is enabled.

    Returns:
        The flag reported by the underlying context.
    """
    normalization_enabled = self.ctx.enable_ident_normalization()
    return normalization_enabled
1152+
1153+
def parse_sql_expr(self, sql: str, schema: DFSchema) -> Expr:
    """Turn a SQL expression string into a logical expression.

    Args:
        sql: The SQL expression text to parse.
        schema: Schema used to resolve column references in ``sql``.

    Returns:
        The parsed expression, wrapped in :py:class:`~datafusion.expr.Expr`.
    """
    from datafusion.expr import Expr  # noqa: PLC0415

    raw_expr = self.ctx.parse_sql_expr(sql, schema)
    return Expr(raw_expr)
1166+
1167+
def execute_logical_plan(self, plan: LogicalPlan) -> DataFrame:
    """Run a :py:class:`~datafusion.plan.LogicalPlan` and wrap the result.

    Args:
        plan: The logical plan to execute.

    Returns:
        A DataFrame built from the result of executing ``plan``.
    """
    # The underlying context expects the raw plan, not the Python wrapper.
    raw_df = self.ctx.execute_logical_plan(plan._raw_plan)
    return DataFrame(raw_df)
1177+
1178+
def refresh_catalogs(self) -> None:
    """Refresh catalog metadata on the underlying context."""
    ctx = self.ctx
    ctx.refresh_catalogs()
1181+
1182+
def remove_optimizer_rule(self, name: str) -> bool:
    """Drop the optimizer rule registered under ``name``.

    Args:
        name: Name of the optimizer rule to remove.

    Returns:
        True if a rule with the given name was found and removed.
    """
    was_removed = self.ctx.remove_optimizer_rule(name)
    return was_removed
1192+
1193+
def table_provider(self, name: str) -> Table:
    """Look up the :py:class:`~datafusion.catalog.Table` registered as ``name``.

    Args:
        name: Name of the table.

    Returns:
        The table provider, wrapped in :py:class:`~datafusion.catalog.Table`.
    """
    from datafusion.catalog import Table  # noqa: PLC0415

    raw_table = self.ctx.table_provider(name)
    return Table(raw_table)
1205+
11441206
def read_json(
11451207
self,
11461208
path: str | pathlib.Path,

python/tests/test_context.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,48 @@ def test_table_not_found(ctx):
551551
ctx.table(f"not-found-{uuid4()}")
552552

553553

554+
def test_session_start_time(ctx):
    """The session start time is returned as a string containing ``T``."""
    start_time = ctx.session_start_time()
    assert isinstance(start_time, str)
    # RFC 3339 format
    assert "T" in start_time
558+
559+
560+
def test_enable_ident_normalization(ctx):
    """``enable_ident_normalization`` returns a boolean."""
    flag = ctx.enable_ident_normalization()
    assert isinstance(flag, bool)
563+
564+
565+
def test_parse_sql_expr(ctx):
    """Parsing ``1 + 2`` against an empty schema yields an Int64 addition."""
    from datafusion.common import DFSchema

    empty_schema = DFSchema.empty()
    parsed = ctx.parse_sql_expr("1 + 2", empty_schema)
    rendered = str(parsed)
    assert "Int64(1) + Int64(2)" in rendered
571+
572+
573+
def test_execute_logical_plan(ctx):
    """Executing a DataFrame's own logical plan returns the original column data."""
    source_df = ctx.from_pydict({"a": [1, 2, 3]})
    replayed_df = ctx.execute_logical_plan(source_df.logical_plan())
    batches = replayed_df.collect()
    first_column = batches[0].column(0)
    assert first_column == pa.array([1, 2, 3])
579+
580+
581+
def test_refresh_catalogs(ctx):
    """Calling ``refresh_catalogs`` on the fixture context does not raise."""
    ctx.refresh_catalogs()
583+
584+
585+
def test_remove_optimizer_rule(ctx):
    """Removing a rule name that is not registered returns ``False``."""
    removed = ctx.remove_optimizer_rule("nonexistent_rule")
    assert removed is False
587+
588+
589+
def test_table_provider(ctx):
    """A registered record batch is retrievable as a table with the same schema."""
    record_batch = pa.RecordBatch.from_pydict({"x": [10, 20, 30]})
    ctx.register_record_batches("provider_test", [[record_batch]])
    provider = ctx.table_provider("provider_test")
    actual_schema = provider.schema
    assert actual_schema == pa.schema([("x", pa.int64())])
594+
595+
554596
def test_read_json(ctx):
555597
path = pathlib.Path(__file__).parent.resolve()
556598

0 commit comments

Comments
 (0)