Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions python/sedonadb/python/sedonadb/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,144 @@ def group_by(self, *keys: Union[str, Expr]) -> "GroupedDataFrame":

return GroupedDataFrame(self, coerced)

def join(
self,
other: "DataFrame",
on: Union[str, List[str]],
how: str = "inner",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a typing hint here (Literal["inner", "left", "right", etc])? This gives a dropdown when typing this in a notebook.

) -> "DataFrame":
"""Equi-join two DataFrames on one or more common-named columns.

Both sides must share the same column name(s) for the join keys.
Joins whose key names differ between the two sides, and arbitrary
predicate joins (including spatial joins like `st_intersects`),
are not supported by this method.

Args:
other: The right-hand DataFrame to join against.
on: A single column name (`str`) or a list of column names
shared between both DataFrames.
how: Join type. One of `"inner"` (default), `"left"`,
`"right"`, `"outer"`, `"left_semi"`, `"left_anti"`,
`"right_semi"`, `"right_anti"`.

Examples:

>>> sd = sedona.db.connect()
>>> left = sd.sql(
... "SELECT * FROM (VALUES (1, 'a'), (2, 'b')) AS t(k, v)"
... )
>>> right = sd.sql(
... "SELECT * FROM (VALUES (1, 'x'), (2, 'y'), (3, 'z')) AS t(k, w)"
... )
>>> left.join(right, on="k").sort("k").show()
┌───────┬──────┬──────┐
│ k ┆ v ┆ w │
│ int64 ┆ utf8 ┆ utf8 │
╞═══════╪══════╪══════╡
│ 1 ┆ a ┆ x │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2 ┆ b ┆ y │
└───────┴──────┴──────┘
"""
if not isinstance(other, DataFrame):
raise TypeError(
f"join() expects a DataFrame as the first argument, "
f"got {type(other).__name__}"
)

if isinstance(on, str):
keys: List[str] = [on]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LLMs love to add these type hints but I would love to avoid them (like LLM-generated comments, they can very easily become type hints that are lying)

elif isinstance(on, list):
keys = on
else:
raise TypeError(
f"join() `on` expects str or list of str, got {type(on).__name__}"
)

if not keys:
raise ValueError("join() requires at least one column name in `on`")

for k in keys:
if not isinstance(k, str):
raise TypeError(
f"join() `on` list must contain only str, got {type(k).__name__}"
)

valid_how = {
"inner",
"left",
"right",
"outer",
"left_semi",
"left_anti",
"right_semi",
"right_anti",
}
Comment on lines +707 to +716
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind whether we do this or not, but pyspark seems to allow "semi" and "anti" as aliases for "left_semi" and "left_anti", and "full" as an alias for "outer".

if how not in valid_how:
raise ValueError(
f"join() `how` must be one of {sorted(valid_how)}, got {how!r}"
)

# DataFusion's DataFrame join (a) rejects two unaliased inputs that
# share column names because both default to the `?table?` qualifier,
# and (b) keeps both copies of the join key in the result. Pandas /
# Polars / Spark users expect one copy. To get that shape:
# 1. Alias both sides internally with sentinel qualifiers so the
# merged schema has no qualified-name collisions.
# 2. Run the join.
# 3. Project with fully qualified refs: left's full column list
# plus right's non-key columns. The qualified col() projection
# strips the qualifier from the output names, giving the
# pandas-shaped result.
# Semi/anti joins are special: the output is only one side's columns.
LEFT_ALIAS = "_sd_join_left_"
RIGHT_ALIAS = "_sd_join_right_"
left_cols = self._impl.columns()
right_cols = other._impl.columns()
left_aliased = self.alias(LEFT_ALIAS)
right_aliased = other.alias(RIGHT_ALIAS)

joined_impl = left_aliased._impl.join(right_aliased._impl, keys, how)

if how in ("left_semi", "left_anti"):
projection = [_col(c, LEFT_ALIAS)._impl for c in left_cols]
elif how in ("right_semi", "right_anti"):
projection = [_col(c, RIGHT_ALIAS)._impl for c in right_cols]
Comment on lines +745 to +746
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test_join_right_semi and test_join_right_anti in ef61ea6, parallel to the existing left-semi/anti cases.

else:
# For the unified join-key column, pick the side that's always
# populated for that join type. Right joins take the right's
# key (left may be NULL for unmatched rows); outer joins COALESCE
# the two so unmatched rows on either side still get a key
# value.
key_set = set(keys)
projection = []
for c in left_cols:
if c in key_set and how == "right":
projection.append(_col(c, RIGHT_ALIAS)._impl)
Comment on lines +756 to +757
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a getter for getting the qualified column...you can just do right_aliased[c] (below can be simplified too).

elif c in key_set and how == "outer":
# Outer join: rows unmatched on either side have NULL on
# that side's key; COALESCE picks the populated one.
# Note: `self._ctx` here is the internal context handle
# (`_lib.InternalContext`), not the user-facing
# `SedonaContext` Python class — so `.scalar_udf(...)`
# resolves against the engine's function registry, the
# same path `con.funcs.<name>` uses underneath.
from sedonadb.expr.expression import ScalarUdf

coalesce_udf = ScalarUdf(self._ctx.scalar_udf("coalesce"))
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self._ctx in this DataFrame class is actually the internal context handle (_lib.InternalContext), not the user-facing SedonaContext Python class — and InternalContext exposes scalar_udf(name) (added in #885). The outer-join test test_join_outer covers this code path and passes. Added a clarifying comment at the call site in ef61ea6 since the naming is genuinely confusing here.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For what it's worth I updated this in #901 (it was confusing there, too)

coalesced = coalesce_udf(
_col(c, LEFT_ALIAS), _col(c, RIGHT_ALIAS)
).alias(c)
projection.append(coalesced._impl)
else:
projection.append(_col(c, LEFT_ALIAS)._impl)
for c in right_cols:
if c not in key_set:
projection.append(_col(c, RIGHT_ALIAS)._impl)

return DataFrame(self._ctx, joined_impl.select(projection), self._options)

def limit(self, n: Optional[int], /, *, offset: int = 0) -> "DataFrame":
"""Limit result to n rows starting at offset

Expand Down
45 changes: 44 additions & 1 deletion python/sedonadb/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use datafusion::logical_expr::SortExpr;
use datafusion::prelude::{DataFrame, SessionContext};
use datafusion_common::{Column, DataFusionError, ParamValues};
use datafusion_execution::TaskContextProvider;
use datafusion_expr::{ExplainFormat, ExplainOption, Expr};
use datafusion_expr::{ExplainFormat, ExplainOption, Expr, JoinType};
use datafusion_ffi::table_provider::FFI_TableProvider;
use futures::lock::Mutex;
use futures::TryStreamExt;
Expand Down Expand Up @@ -257,6 +257,49 @@ impl InternalDataFrame {
Ok(InternalDataFrame::new(inner, self.runtime.clone()))
}

/// Equi-join two DataFrames on one or more common-named columns.
///
/// The Python wrapper handles all input normalization (single-column
/// `on=str` → one-element list; rejects empty input; rejects
/// unknown `how` strings). On the Rust side we just:
///
/// 1. Map the `how` string to a `JoinType` enum value. Unrecognized
/// strings should already have been rejected by the Python wrapper;
/// we still return an error for them here so the binding fails
/// loudly if the Python wrapper ever drifts out of sync.
/// 2. Borrow the column-name vectors as `&[&str]` since DataFusion's
/// `DataFrame::join` takes that shape.
/// 3. Pass `filter=None`, so no residual predicate is applied on top
/// of the key equality.
fn join(
&self,
right: &InternalDataFrame,
join_cols: Vec<String>,
how: &str,
) -> Result<InternalDataFrame, PySedonaError> {
Comment on lines +260 to +279
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is probably worth only exposing the expression endpoint here to reduce the binding surface (you can easily generate the expression with [left_aliased[c] == right_aliased[c] for c in keys]).

Can you avoid any references to future PRs? These PRs stand nicely on their own.

let join_type = match how {
"inner" => JoinType::Inner,
"left" => JoinType::Left,
"right" => JoinType::Right,
"outer" => JoinType::Full,
"left_semi" => JoinType::LeftSemi,
"left_anti" => JoinType::LeftAnti,
"right_semi" => JoinType::RightSemi,
"right_anti" => JoinType::RightAnti,
other => {
return Err(PySedonaError::SedonaPython(format!(
"Unsupported join type '{other}'"
)))
}
};
let borrowed: Vec<&str> = join_cols.iter().map(String::as_str).collect();
let inner =
self.inner
.clone()
.join(right.inner.clone(), join_type, &borrowed, &borrowed, None)?;
Ok(InternalDataFrame::new(inner, self.runtime.clone()))
}

fn execute<'py>(&self, py: Python<'py>) -> Result<usize, PySedonaError> {
let df = self.inner.clone();
let count = wait_for_future(py, &self.runtime, async move {
Expand Down
165 changes: 165 additions & 0 deletions python/sedonadb/tests/expr/test_dataframe_join.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pandas as pd
import pandas.testing as pdt
import pytest

from sedonadb.dataframe import DataFrame


def test_join_inner_single_key(con):
    """Inner join on one shared key keeps only keys present on both sides."""
    lhs = con.create_data_frame(pd.DataFrame({"k": [1, 2, 3], "v": ["a", "b", "c"]}))
    rhs = con.create_data_frame(pd.DataFrame({"k": [1, 2, 4], "w": ["x", "y", "z"]}))

    # `how` is left at its default here.
    result = lhs.join(rhs, on="k").sort("k").to_pandas()

    expected = pd.DataFrame({"k": [1, 2], "v": ["a", "b"], "w": ["x", "y"]})
    pdt.assert_frame_equal(result, expected)


def test_join_inner_multiple_keys(con):
    """Inner join on two keys matches rows only when both key columns agree."""
    left_data = {"k1": [1, 1, 2], "k2": ["a", "b", "a"], "v": [10, 20, 30]}
    right_data = {"k1": [1, 2, 2], "k2": ["a", "a", "b"], "w": [100, 200, 300]}
    lhs = con.create_data_frame(pd.DataFrame(left_data))
    rhs = con.create_data_frame(pd.DataFrame(right_data))

    result = lhs.join(rhs, on=["k1", "k2"]).sort("k1", "k2").to_pandas()

    # (1, 'a') and (2, 'a') are the only key pairs found on both sides.
    expected = pd.DataFrame(
        {"k1": [1, 2], "k2": ["a", "a"], "v": [10, 30], "w": [100, 200]}
    )
    pdt.assert_frame_equal(result, expected)


def test_join_left(con):
    """Left join keeps every left row; the unmatched row gets a null `w`."""
    lhs = con.create_data_frame(pd.DataFrame({"k": [1, 2, 3], "v": ["a", "b", "c"]}))
    rhs = con.create_data_frame(pd.DataFrame({"k": [1, 2], "w": ["x", "y"]}))

    result = lhs.join(rhs, on="k", how="left").sort("k").to_pandas()

    # k=3 has no partner on the right, so its `w` is expected to be null.
    expected = pd.DataFrame(
        {"k": [1, 2, 3], "v": ["a", "b", "c"], "w": ["x", "y", None]}
    )
    pdt.assert_frame_equal(result, expected)


def test_join_right(con):
    """Right join keeps every right row; the unmatched row gets a null `v`."""
    lhs = con.create_data_frame(pd.DataFrame({"k": [1, 2], "v": ["a", "b"]}))
    rhs = con.create_data_frame(pd.DataFrame({"k": [1, 2, 3], "w": ["x", "y", "z"]}))

    result = lhs.join(rhs, on="k", how="right").sort("k").to_pandas()

    # Expected layout: the left column names come first (k, v), followed by
    # the right side's non-key column (w). The key value 3 exists only on
    # the right, so it must still appear in `k`, paired with a null `v`.
    expected = pd.DataFrame(
        {"k": [1, 2, 3], "v": ["a", "b", None], "w": ["x", "y", "z"]}
    )
    pdt.assert_frame_equal(result, expected)


def test_join_outer(con):
    """Outer join keeps every row from both sides under a single key column.

    The whole result frame is compared, not just the key values and the row
    count, so that wrong `v`/`w` values, a wrong column order, or a second
    copy of the key column would be caught.
    """
    left = con.create_data_frame(pd.DataFrame({"k": [1, 2], "v": ["a", "b"]}))
    right = con.create_data_frame(pd.DataFrame({"k": [2, 3], "w": ["y", "z"]}))

    out = left.join(right, on="k", how="outer").sort("k").to_pandas()

    # k=1 exists only on the left (null `w`), k=3 only on the right
    # (null `v`), and k=2 on both. The merged `k` column must carry the
    # union of key values with no nulls.
    pdt.assert_frame_equal(
        out,
        pd.DataFrame({"k": [1, 2, 3], "v": ["a", "b", None], "w": [None, "y", "z"]}),
    )
Comment on lines +76 to +80
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this isn't testing the actual dataframe output?



def test_join_left_semi(con):
    """Left-semi join returns matched left rows with left columns only."""
    lhs = con.create_data_frame(pd.DataFrame({"k": [1, 2, 3], "v": ["a", "b", "c"]}))
    rhs = con.create_data_frame(pd.DataFrame({"k": [1, 3], "w": ["x", "z"]}))

    result = lhs.join(rhs, on="k", how="left_semi").sort("k").to_pandas()

    # Keys 1 and 3 have a partner on the right; `w` must not be in the output.
    expected = pd.DataFrame({"k": [1, 3], "v": ["a", "c"]})
    pdt.assert_frame_equal(result, expected)


def test_join_left_anti(con):
    """Left-anti join returns only the left rows that have no right match."""
    lhs = con.create_data_frame(pd.DataFrame({"k": [1, 2, 3], "v": ["a", "b", "c"]}))
    rhs = con.create_data_frame(pd.DataFrame({"k": [1, 3], "w": ["x", "z"]}))

    result = lhs.join(rhs, on="k", how="left_anti").sort("k").to_pandas()

    # Only k=2 is missing from the right side.
    expected = pd.DataFrame({"k": [2], "v": ["b"]})
    pdt.assert_frame_equal(result, expected)


def test_join_right_semi(con):
    """Right-semi join returns matched right rows with right columns only."""
    lhs = con.create_data_frame(pd.DataFrame({"k": [1, 3], "v": ["a", "c"]}))
    rhs = con.create_data_frame(pd.DataFrame({"k": [1, 2, 3], "w": ["x", "y", "z"]}))

    result = lhs.join(rhs, on="k", how="right_semi").sort("k").to_pandas()

    # Keys 1 and 3 have a partner on the left; `v` must not be in the output.
    expected = pd.DataFrame({"k": [1, 3], "w": ["x", "z"]})
    pdt.assert_frame_equal(result, expected)


def test_join_right_anti(con):
    """Right-anti join returns only the right rows that have no left match."""
    lhs = con.create_data_frame(pd.DataFrame({"k": [1, 3], "v": ["a", "c"]}))
    rhs = con.create_data_frame(pd.DataFrame({"k": [1, 2, 3], "w": ["x", "y", "z"]}))

    result = lhs.join(rhs, on="k", how="right_anti").sort("k").to_pandas()

    # Only k=2 is missing from the left side.
    expected = pd.DataFrame({"k": [2], "w": ["y"]})
    pdt.assert_frame_equal(result, expected)


def test_join_returns_lazy_dataframe(con):
    """The object returned by join() is a `DataFrame` instance."""
    lhs = con.create_data_frame(pd.DataFrame({"k": [1], "v": ["a"]}))
    rhs = con.create_data_frame(pd.DataFrame({"k": [1], "w": ["x"]}))

    # Only the return type is checked; the result is never collected.
    assert isinstance(lhs.join(rhs, on="k"), DataFrame)


def test_join_non_dataframe_other_raises(con):
    """Passing something other than a DataFrame as `other` raises TypeError."""
    frame = con.create_data_frame(pd.DataFrame({"k": [1]}))
    not_a_frame = {"k": 1}

    with pytest.raises(TypeError, match="join\\(\\) expects a DataFrame"):
        frame.join(not_a_frame, on="k")


def test_join_on_bad_type_raises(con):
    """An `on` value that is neither str nor list raises TypeError."""
    lhs = con.create_data_frame(pd.DataFrame({"k": [1]}))
    rhs = con.create_data_frame(pd.DataFrame({"k": [1]}))
    bad_on = 123

    with pytest.raises(TypeError, match="`on` expects str or list of str"):
        lhs.join(rhs, on=bad_on)


def test_join_on_empty_list_raises(con):
    """An empty `on` list raises ValueError."""
    lhs = con.create_data_frame(pd.DataFrame({"k": [1]}))
    rhs = con.create_data_frame(pd.DataFrame({"k": [1]}))
    no_keys = []

    with pytest.raises(ValueError, match="at least one column name"):
        lhs.join(rhs, on=no_keys)


def test_join_on_list_with_non_str_raises(con):
    """An `on` list containing a non-str element raises TypeError."""
    lhs = con.create_data_frame(pd.DataFrame({"k": [1]}))
    rhs = con.create_data_frame(pd.DataFrame({"k": [1]}))
    # The first entry is a valid name; the second is the offending element.
    mixed_keys = ["k", 1]

    with pytest.raises(TypeError, match="`on` list must contain only str"):
        lhs.join(rhs, on=mixed_keys)


def test_join_bad_how_raises(con):
    """A `how` value outside the accepted join types raises ValueError."""
    lhs = con.create_data_frame(pd.DataFrame({"k": [1]}))
    rhs = con.create_data_frame(pd.DataFrame({"k": [1]}))
    unsupported_how = "cross"

    with pytest.raises(ValueError, match="`how` must be one of"):
        lhs.join(rhs, on="k", how=unsupported_how)


def test_join_unknown_column_errors(con):
    """Joining on a column that neither side has raises SedonaError."""
    from sedonadb._lib import SedonaError

    lhs = con.create_data_frame(pd.DataFrame({"a": [1]}))
    rhs = con.create_data_frame(pd.DataFrame({"b": [1]}))
    missing_key = "nonexistent"

    # Nothing is collected inside the block, so the error has to surface
    # from the join() call itself.
    with pytest.raises(SedonaError):
        lhs.join(rhs, on=missing_key)