Skip to content

Commit 06feddd

Browse files
Add nth_value window function with tests
1 parent 89e5a77 commit 06feddd

2 files changed

Lines changed: 91 additions & 0 deletions

File tree

duckdb/experimental/spark/sql/functions.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6570,3 +6570,79 @@ def lead(col: "ColumnOrName", offset: int = 1, default: Optional[Any] = None) ->
65706570
+---+---+----------+
65716571
""" # noqa: D205, D212
65726572
return _invoke_function("lead", _to_column_expr(col), ConstantExpression(offset), ConstantExpression(default))
6573+
6574+
6575+
def nth_value(col: "ColumnOrName", offset: int, ignoreNulls: Optional[bool] = False) -> Column:
    """Window function: returns the value that is the `offset`\\th row of the window frame
    (counting from 1), and `null` if the size of window frame is less than `offset` rows.

    It will return the `offset`\\th non-null value it sees when `ignoreNulls` is set to
    true. If all values are null, then null is returned.

    This is equivalent to the nth_value function in SQL.

    .. versionadded:: 3.1.0

    .. versionchanged:: 3.4.0
        Supports Spark Connect.

    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or column name
        name of column or expression
    offset : int
        number of row to use as the value
    ignoreNulls : bool, optional
        indicates the Nth value should skip null in the
        determination of which row to use

    Returns
    -------
    :class:`~pyspark.sql.Column`
        value of nth row.

    Examples
    --------
    >>> from pyspark.sql import functions as sf
    >>> from pyspark.sql import Window
    >>> df = spark.createDataFrame(
    ...     [("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"]
    ... )
    >>> df.show()
    +---+---+
    | c1| c2|
    +---+---+
    |  a|  1|
    |  a|  2|
    |  a|  3|
    |  b|  8|
    |  b|  2|
    +---+---+

    >>> w = Window.partitionBy("c1").orderBy("c2")
    >>> df.withColumn("nth_value", sf.nth_value("c2", 1).over(w)).show()
    +---+---+---------+
    | c1| c2|nth_value|
    +---+---+---------+
    |  a|  1|        1|
    |  a|  2|        1|
    |  a|  3|        1|
    |  b|  2|        2|
    |  b|  8|        2|
    +---+---+---------+

    >>> df.withColumn("nth_value", sf.nth_value("c2", 2).over(w)).show()
    +---+---+---------+
    | c1| c2|nth_value|
    +---+---+---------+
    |  a|  1|     NULL|
    |  a|  2|        2|
    |  a|  3|        2|
    |  b|  2|     NULL|
    |  b|  8|        8|
    +---+---+---------+
    """  # noqa: D205, D301
    # DuckDB's nth_value has no IGNORE NULLS binding here yet, so reject the
    # option explicitly rather than silently returning wrong rows.
    if ignoreNulls:
        raise ContributionsAcceptedError(
            "The ignoreNulls option of nth_value is not supported yet."
        )
    return _invoke_function(
        "nth_value",
        _to_column_expr(col),
        ConstantExpression(offset),
    )

tests/fast/spark/test_spark_functions_window.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,3 +162,18 @@ def test_lead(self, spark):
162162
Row(c1="b", c2=2, next_value=8, next_value_default=8, next_value_offset2=-1),
163163
Row(c1="b", c2=8, next_value=None, next_value_default=0, next_value_offset2=-1),
164164
]
165+
166+
def test_nth_value(self, spark):
    # First and second row per partition, ordered by c2 ascending.
    df = spark.createDataFrame(
        data=[("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)],
        schema=["c1", "c2"],
    )
    window = Window.partitionBy("c1").orderBy("c2")
    result = (
        df.withColumn("nth1", F.nth_value("c2", 1).over(window))
        .withColumn("nth2", F.nth_value("c2", 2).over(window))
        .sort("c1", "c2")
        .collect()
    )

    # nth2 is NULL on the first row of each partition: with the default
    # window frame only rows up to the current one are visible.
    expected = [
        Row(c1="a", c2=1, nth1=1, nth2=None),
        Row(c1="a", c2=2, nth1=1, nth2=2),
        Row(c1="a", c2=3, nth1=1, nth2=2),
        Row(c1="b", c2=2, nth1=2, nth2=None),
        Row(c1="b", c2=8, nth1=2, nth2=8),
    ]
    assert result == expected

0 commit comments

Comments
 (0)