Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion duckdb/experimental/spark/sql/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

if TYPE_CHECKING:
from ._typing import DateTimeLiteral, DecimalLiteral, LiteralType
from .window import WindowSpec

from duckdb import ColumnExpression, ConstantExpression, Expression, FunctionExpression
from duckdb import ColumnExpression, ConstantExpression, Expression, FunctionExpression, SQLExpression
from duckdb.sqltypes import DuckDBPyType

__all__ = ["Column"]
Expand Down Expand Up @@ -359,3 +360,44 @@ def isNull(self) -> "Column": # noqa: D102

def isNotNull(self) -> "Column":  # noqa: D102
    # Delegate the NOT NULL test to the wrapped expression, then re-wrap the
    # resulting expression so callers keep working with Column objects.
    not_null_expr = self.expr.isnotnull()
    return Column(not_null_expr)

def over(self, window_spec: "WindowSpec") -> "Column":
    """Define a windowing column.

    The result is built textually: this column's expression is rendered to
    its string form and an ``OVER (...)`` clause taken from ``window_spec``
    is appended to it.

    .. versionadded:: 1.4.0

    .. versionchanged:: 3.4.0
        Supports Spark Connect.

    Parameters
    ----------
    window_spec : :class:`WindowSpec`
        Window specification that supplies the contents of the ``OVER`` clause.

    Returns:
    -------
    :class:`Column`
        A new column wrapping the ``<expression> OVER (<window>)`` SQL expression.

    Examples:
    --------
    >>> from pyspark.sql import Window
    >>> window = (
    ...     Window.partitionBy("name")
    ...     .orderBy("age")
    ...     .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    ... )
    >>> from pyspark.sql.functions import rank, min, desc
    >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], ["age", "name"])
    >>> df.withColumn("rank", rank().over(window)).withColumn(
    ...     "min", min("age").over(window)
    ... ).sort(desc("age")).show()
    +---+-----+----+---+
    |age| name|rank|min|
    +---+-----+----+---+
    |  5|  Bob|   1|  5|
    |  2|Alice|   1|  2|
    +---+-----+----+---+
    """
    # Presumably returns the text that belongs between the parentheses of
    # OVER (...); it is interpolated verbatim below -- confirm against WindowSpec.
    window_sql = window_spec._window_expr()
    # The column expression is interpolated through its string form, so the
    # combined text is handed to SQLExpression to be parsed as one expression.
    return Column(SQLExpression(f"{self.expr} OVER ({window_sql})"))
Loading
Loading