Skip to content

Commit 51fc2bc

Browse files
timsaucer and claude committed
feat: SessionContext.read_batches / read_batch
Wrap upstream `SessionContext::read_batches`, which materializes a DataFrame directly from a sequence of `RecordBatch`es without registering a named table. The single-batch convenience `SessionContext.read_batch` is implemented in pure Python by calling `read_batches([batch])`, so the Rust side only needs the one binding.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 96c3a14 commit 51fc2bc

3 files changed

Lines changed: 61 additions & 0 deletions

File tree

crates/core/src/context.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,13 @@ impl PySessionContext {
847847
Ok(())
848848
}
849849

850+
pub fn read_batches(
851+
&self,
852+
batches: PyArrowType<Vec<RecordBatch>>,
853+
) -> PyDataFusionResult<PyDataFrame> {
854+
Ok(PyDataFrame::new(self.ctx.read_batches(batches.0)?))
855+
}
856+
850857
#[allow(clippy::too_many_arguments)]
851858
#[pyo3(signature = (name, path, table_partition_cols=vec![],
852859
parquet_pruning=true,

python/datafusion/context.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -962,6 +962,45 @@ def register_record_batches(
962962
"""
963963
self.ctx.register_record_batches(name, partitions)
964964

965+
def read_batch(self, batch: pa.RecordBatch) -> DataFrame:
966+
"""Return a :py:class:`~datafusion.DataFrame` reading a single batch.
967+
968+
Convenience wrapper around :py:meth:`read_batches` for the single-batch
969+
case. Unlike :py:meth:`register_batch`, this does not register the
970+
batch as a named table; it returns an anonymous
971+
:py:class:`~datafusion.DataFrame` directly.
972+
973+
Args:
974+
batch: Record batch to wrap as a DataFrame.
975+
976+
Examples:
977+
>>> ctx = dfn.SessionContext()
978+
>>> batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3]})
979+
>>> ctx.read_batch(batch).to_pydict()
980+
{'a': [1, 2, 3]}
981+
"""
982+
return self.read_batches([batch])
983+
984+
def read_batches(self, batches: list[pa.RecordBatch]) -> DataFrame:
985+
"""Return a :py:class:`~datafusion.DataFrame` reading the given batches.
986+
987+
All batches must share the same schema. Unlike
988+
:py:meth:`register_record_batches`, this does not register the batches
989+
as a named table; it returns an anonymous
990+
:py:class:`~datafusion.DataFrame` directly.
991+
992+
Args:
993+
batches: Record batches to wrap as a DataFrame.
994+
995+
Examples:
996+
>>> ctx = dfn.SessionContext()
997+
>>> b1 = pa.RecordBatch.from_pydict({"a": [1, 2]})
998+
>>> b2 = pa.RecordBatch.from_pydict({"a": [3, 4]})
999+
>>> ctx.read_batches([b1, b2]).to_pydict()
1000+
{'a': [1, 2, 3, 4]}
1001+
"""
1002+
return DataFrame(self.ctx.read_batches(batches))
1003+
9651004
def register_parquet(
9661005
self,
9671006
name: str,

python/tests/test_context.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -905,6 +905,21 @@ def test_register_batch_empty(ctx):
905905
assert result[0].num_rows == 0
906906

907907

908+
def test_read_batch_returns_dataframe(ctx):
909+
batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]})
910+
df = ctx.read_batch(batch)
911+
assert df.to_pydict() == {"a": [1, 2, 3], "b": [4, 5, 6]}
912+
# read_batch should not register a named table.
913+
assert ctx.catalog().schema().names() == set()
914+
915+
916+
def test_read_batches_concatenates(ctx):
917+
b1 = pa.RecordBatch.from_pydict({"a": [1, 2]})
918+
b2 = pa.RecordBatch.from_pydict({"a": [3, 4]})
919+
df = ctx.read_batches([b1, b2])
920+
assert df.to_pydict() == {"a": [1, 2, 3, 4]}
921+
922+
908923
def test_create_sql_options():
909924
SQLOptions()
910925

0 commit comments

Comments
 (0)