Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions python/sedonadb/python/sedonadb/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ def read_parquet(
options: Optional[Dict[str, Any]] = None,
geometry_columns: Optional[Union[str, Dict[str, Any]]] = None,
validate: bool = False,
partitioning: Optional[List[str]] = None,
) -> DataFrame:
"""Create a [DataFrame][sedonadb.dataframe.DataFrame] from one or more Parquet files

Expand Down Expand Up @@ -252,6 +253,12 @@ def read_parquet(

Currently the only property that is validated is the WKB of input geometry
columns.
partitioning:
Optional list of column names for hive-style partitioning. When reading
from a directory with paths like `/col=value/file.parquet`, partition
column names are auto-discovered by default (`None`). Explicitly specify
column names (e.g., `["col"]`) to override auto-discovery, or pass an
empty list `[]` to disable partitioning entirely.


Examples:
Expand All @@ -273,7 +280,11 @@ def read_parquet(
return DataFrame(
self._impl,
self._impl.read_parquet(
[str(path) for path in table_paths], options, geometry_columns, validate
[str(path) for path in table_paths],
options,
geometry_columns,
validate,
partitioning,
),
self.options,
)
Expand All @@ -283,6 +294,7 @@ def read_pyogrio(
table_paths: Union[str, Path, Iterable[str]],
options: Optional[Dict[str, Any]] = None,
extension: str = "",
partitioning: Optional[List[str]] = None,
) -> DataFrame:
"""Read spatial file formats using GDAL/OGR via pyogrio

Expand Down Expand Up @@ -312,6 +324,12 @@ def read_pyogrio(
extension: An optional file extension (e.g., `"fgb"`) used when
`table_paths` specifies one or more directories or a glob
that does not enforce a file extension.
partitioning:
Optional list of column names for hive-style partitioning. When reading
from a directory with paths like `/col=value/file.fgb`, partition
column names are auto-discovered by default (`None`). Explicitly specify
column names (e.g., `["col"]`) to override auto-discovery, or pass an
empty list `[]` to disable partitioning entirely.

Examples:

Expand Down Expand Up @@ -346,7 +364,7 @@ def read_pyogrio(
return DataFrame(
self._impl,
self._impl.read_external_format(
spec, [str(path) for path in table_paths], False
spec, [str(path) for path in table_paths], False, partitioning
),
self.options,
)
Expand All @@ -356,6 +374,7 @@ def read_format(
spec: "ExternalFormatSpec",
table_paths: Union[str, Path, Iterable[str]],
check_extension: bool = False,
partitioning: Optional[List[str]] = None,
) -> DataFrame:
"""Read one or more paths using a Python-defined `ExternalFormatSpec`.

Expand All @@ -375,6 +394,12 @@ def read_format(
table_paths: A str, Path, or iterable of paths/URLs.
check_extension: When `True`, error if a non-collection path
doesn't end in the spec's `extension`. Defaults to `False`.
partitioning:
Optional list of column names for hive-style partitioning. When reading
from a directory with paths like `/col=value/file.ext`, partition
column names are auto-discovered by default (`None`). Explicitly specify
column names (e.g., `["col"]`) to override auto-discovery, or pass an
empty list `[]` to disable partitioning entirely.

Examples:
>>> import sedonadb_zarr # doctest: +SKIP
Expand All @@ -390,7 +415,7 @@ def read_format(
return DataFrame(
self._impl,
self._impl.read_external_format(
spec, [str(path) for path in table_paths], check_extension
spec, [str(path) for path in table_paths], check_extension, partitioning
),
self.options,
)
Expand Down
14 changes: 12 additions & 2 deletions python/sedonadb/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl InternalContext {
options: HashMap<String, PyObject>,
geometry_columns: Option<String>,
validate: bool,
partitioning: Option<Vec<String>>,
) -> Result<InternalDataFrame, PySedonaError> {
// Convert Python options to strings, filtering out None values
let rust_options: HashMap<String, String> = options
Expand Down Expand Up @@ -115,6 +116,9 @@ impl InternalContext {
})?;
}
geo_options = geo_options.with_validate(validate);
if let Some(partitioning) = partitioning {
geo_options = geo_options.with_table_partition_cols(partitioning);
}

let df = wait_for_future(
py,
Expand All @@ -130,15 +134,21 @@ impl InternalContext {
format_spec: Bound<PyAny>,
table_paths: Vec<String>,
check_extension: bool,
partitioning: Option<Vec<String>>,
) -> Result<InternalDataFrame, PySedonaError> {
let spec = format_spec
.call_method0("__sedona_external_format__")?
.extract::<PyExternalFormat>()?;
let df = wait_for_future(
py,
&self.runtime,
self.inner
.read_external_format(Arc::new(spec), table_paths, None, check_extension),
self.inner.read_external_format(
Arc::new(spec),
table_paths,
None,
check_extension,
partitioning,
),
)??;

Ok(InternalDataFrame::new(df, self.runtime.clone()))
Expand Down
38 changes: 38 additions & 0 deletions python/sedonadb/tests/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,3 +755,41 @@ def test_prune_geography_parquet():
assert matched < total, (
f"Expected pruning to reduce row groups: {matched} matched out of {total}"
)


def test_write_partitioned_parquet(con):
t = con.funcs.table.sd_random_geometry(seed=3847)
t = t.select(
id=t.id,
geom=con.funcs.st_setsrid(t.geometry, 3857),
grp=con.funcs.floor(t.id / 100).cast(pa.string()),
)

with tempfile.TemporaryDirectory() as td:
out_dir = Path(td) / "out_dir"

t.to_parquet(out_dir, partition_by="grp")

# Test auto-discovery of partition columns (partitioning=None)
result = con.read_parquet(out_dir)
assert result.columns == t.columns
geopandas.testing.assert_geodataframe_equal(
result.sort("id").to_pandas(),
t.sort("id").to_pandas(),
)

# The default is to discover the partitioning
result_explicit = con.read_parquet(out_dir, partitioning=["grp"])
assert result_explicit.columns == t.columns
geopandas.testing.assert_geodataframe_equal(
result_explicit.sort("id").to_pandas(),
t.sort("id").to_pandas(),
)

# partitioning=[] disables auto-discovery
result_disabled = con.read_parquet(out_dir, partitioning=[])
assert result_disabled.columns == ["id", "geom"]
geopandas.testing.assert_geodataframe_equal(
result_disabled.sort("id").to_pandas(),
t.sort("id").to_pandas()[["id", "geom"]],
)
62 changes: 60 additions & 2 deletions python/sedonadb/tests/io/test_pyogrio.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ def test_read_ogr_multi_file(con):
con.read_parquet(parquet_path).to_pandas().to_file(fgb_path)

# Reading a directory while specifying the extension should work
con.read_pyogrio(f"{td}", extension="fgb").to_view(
# Disable partitioning since this test focuses on multi-file reading
con.read_pyogrio(f"{td}", extension="fgb", partitioning=[]).to_view(
"gdf_from_dir", overwrite=True
)
geopandas.testing.assert_geodataframe_equal(
Expand All @@ -100,7 +101,9 @@ def test_read_ogr_multi_file(con):
)

# Reading using a glob without specifying the extension should work
con.read_pyogrio(f"{td}/**/*.fgb").to_view("gdf_from_glob", overwrite=True)
con.read_pyogrio(f"{td}/**/*.fgb", partitioning=[]).to_view(
"gdf_from_glob", overwrite=True
)
geopandas.testing.assert_geodataframe_equal(
con.sql("SELECT * FROM gdf_from_glob ORDER BY idx").to_pandas(),
gdf.filter(["idx", "wkb_geometry"]),
Expand Down Expand Up @@ -290,3 +293,58 @@ def test_write_ogr_from_view_types(con):
con.create_data_frame(tab).to_pyogrio(f"{td}/foofy.fgb")
tab_roundtrip = con.read_pyogrio(f"{td}/foofy.fgb").to_arrow_table()
assert tab_roundtrip.sort_by("string_col") == tab_simple


def test_read_ogr_partitioned(con):
n = 100
series = geopandas.GeoSeries.from_xy(
list(range(n)), list(range(1, n + 1)), crs="EPSG:3857"
)
gdf = geopandas.GeoDataFrame(
{
"idx": list(range(n)),
"grp": [str(i // 10) for i in range(n)],
"wkb_geometry": series,
}
)
gdf = gdf.set_geometry(gdf["wkb_geometry"])

with tempfile.TemporaryDirectory() as td:
# Write partitioned FGB files using hive-style directories because
# write_pyogrio doesn't support writing partitions yet
for grp_val in gdf["grp"].unique():
grp_dir = Path(td) / f"grp={grp_val}"
grp_dir.mkdir()
subset = gdf[gdf["grp"] == grp_val].drop(columns=["grp"])
subset.to_file(grp_dir / "data.fgb")

# Test auto-discovery of partition columns (partitioning=None)
con.read_pyogrio(td, extension="fgb").to_view(
"partitioned_auto", overwrite=True
)
geopandas.testing.assert_geodataframe_equal(
con.sql(
"SELECT idx, grp, wkb_geometry FROM partitioned_auto ORDER BY idx"
).to_pandas(),
gdf,
)

# Test explicit partitioning specification
con.read_pyogrio(td, extension="fgb", partitioning=["grp"]).to_view(
"partitioned_explicit", overwrite=True
)
geopandas.testing.assert_geodataframe_equal(
con.sql(
"SELECT idx, grp, wkb_geometry FROM partitioned_explicit ORDER BY idx"
).to_pandas(),
gdf,
)

# Test partitioning=[] disables auto-discovery
con.read_pyogrio(td, extension="fgb", partitioning=[]).to_view(
"partitioned_disabled", overwrite=True
)
geopandas.testing.assert_geodataframe_equal(
con.sql("SELECT * FROM partitioned_disabled ORDER BY idx").to_pandas(),
gdf.filter(["idx", "wkb_geometry"]),
)
12 changes: 9 additions & 3 deletions rust/sedona-datasource/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,7 @@ mod test {
.map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap())
.collect(),
true,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -651,6 +652,7 @@ mod test {
.map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap())
.collect(),
true,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -686,7 +688,7 @@ mod test {
let (temp_dir, mut files) = create_echo_spec_temp_dir();

// Listing table with no files should error
let err = external_table(spec.clone(), &ctx, vec![], true)
let err = external_table(spec.clone(), &ctx, vec![], true, None)
.await
.unwrap_err();
assert_eq!(err.message(), "No table paths were provided");
Expand All @@ -708,6 +710,7 @@ mod test {
.map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap())
.collect(),
true,
None,
)
.await
.unwrap_err();
Expand All @@ -725,6 +728,7 @@ mod test {
.map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap())
.collect(),
false,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -810,7 +814,9 @@ mod test {

let ctx = SessionContext::new();
let url = ListingTableUrl::parse(dir_path.to_string_lossy()).unwrap();
let provider = external_table(spec, &ctx, vec![url], false).await.unwrap();
let provider = external_table(spec, &ctx, vec![url], false, None)
.await
.unwrap();

let batches = ctx.read_table(provider).unwrap().collect().await.unwrap();

Expand All @@ -837,7 +843,7 @@ mod test {
// doesn't try to span.
let url_a = ListingTableUrl::parse("file:///tmp/a.dirfmt").unwrap();
let url_b = ListingTableUrl::parse("https://example.com/b.dirfmt").unwrap();
let err = external_table(spec, &ctx, vec![url_a, url_b], false)
let err = external_table(spec, &ctx, vec![url_a, url_b], false, None)
.await
.unwrap_err();
assert!(
Expand Down
Loading
Loading