Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
select_exprs: mut select_exprs_post_aggr,
having_expr: having_expr_post_aggr,
qualify_expr: qualify_expr_post_aggr,
order_by_exprs: order_by_rex,
order_by_exprs: mut order_by_rex,
} = if !group_by_exprs.is_empty() || !aggr_exprs.is_empty() {
self.aggregate(
&base_plan,
Expand Down Expand Up @@ -297,14 +297,23 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
plan
};

// The outer expressions we will search through for window functions.
// Window functions may be sourced from the SELECT list or from the QUALIFY expression.
let windows_expr_haystack = select_exprs_post_aggr
.iter()
.chain(qualify_expr_post_aggr.iter());
// The window expressions from SELECT and QUALIFY only, used to validate that
// QUALIFY is used with window functions (ORDER BY window functions don't count).
let qualify_window_func_exprs = find_window_exprs(
select_exprs_post_aggr
.iter()
.chain(qualify_expr_post_aggr.iter()),
);

// All of the window expressions (deduplicated and rewritten to reference aggregates as
// columns from input).
let window_func_exprs = find_window_exprs(windows_expr_haystack);
// columns from input). Window functions may be sourced from the SELECT list, QUALIFY
// expression, or ORDER BY.
let window_func_exprs = find_window_exprs(
select_exprs_post_aggr
.iter()
.chain(qualify_expr_post_aggr.iter())
.chain(order_by_rex.iter().map(|s| &s.expr)),
);

// Process window functions after aggregation as they can reference
// aggregate functions in their body
Expand All @@ -319,14 +328,25 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
.map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
.collect::<Result<Vec<Expr>>>()?;

order_by_rex = order_by_rex
.into_iter()
.map(|sort_expr| {
Ok(sort_expr.with_expr(rebase_expr(
&sort_expr.expr,
&window_func_exprs,
&plan,
)?))
})
.collect::<Result<Vec<_>>>()?;

plan
};

// Process QUALIFY clause after window functions
// QUALIFY filters the results of window functions, similar to how HAVING filters aggregates
let plan = if let Some(qualify_expr) = qualify_expr_post_aggr {
// Validate that QUALIFY is used with window functions
if window_func_exprs.is_empty() {
// Validate that QUALIFY is used with window functions in SELECT or QUALIFY
if qualify_window_func_exprs.is_empty() {
return plan_err!(
"QUALIFY clause requires window functions in the SELECT list or QUALIFY clause"
);
Expand Down
142 changes: 142 additions & 0 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2805,6 +2805,138 @@ fn over_order_by_with_window_frame_double_end() {
);
}

#[test]
fn window_function_only_in_order_by() {
let sql = "SELECT order_id FROM orders ORDER BY MAX(qty) OVER (ORDER BY order_id)";
let plan = logical_plan(sql).unwrap();
assert_snapshot!(
plan,
@r"
Projection: orders.order_id
Sort: max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST
Projection: orders.order_id, max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
WindowAggr: windowExpr=[[max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
TableScan: orders
"
);
}

#[test]
fn window_function_in_select_and_order_by() {
let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id) FROM orders ORDER BY MAX(qty) OVER (ORDER BY order_id)";
let plan = logical_plan(sql).unwrap();
assert_snapshot!(
plan,
@r"
Sort: max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST
Projection: orders.order_id, max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
WindowAggr: windowExpr=[[max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
TableScan: orders
"
);
}

#[test]
fn window_function_in_order_by_nested_expr() {
let sql =
"SELECT order_id FROM orders ORDER BY MAX(qty) OVER (ORDER BY order_id) + 1";
let plan = logical_plan(sql).unwrap();
assert_snapshot!(
plan,
@r"
Projection: orders.order_id
Sort: max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + Int64(1) ASC NULLS LAST
Projection: orders.order_id, max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
WindowAggr: windowExpr=[[max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
TableScan: orders
"
);
}

#[test]
fn window_function_in_order_by_desc() {
let sql =
"SELECT order_id FROM orders ORDER BY MAX(qty) OVER (ORDER BY order_id) DESC";
let plan = logical_plan(sql).unwrap();
assert_snapshot!(
plan,
@r"
Projection: orders.order_id
Sort: max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW DESC NULLS FIRST
Projection: orders.order_id, max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
WindowAggr: windowExpr=[[max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
TableScan: orders
"
);
}

#[test]
fn multiple_window_functions_in_order_by() {
let sql = "SELECT order_id FROM orders ORDER BY MAX(qty) OVER (ORDER BY order_id), MIN(qty) OVER (ORDER BY order_id DESC)";
let plan = logical_plan(sql).unwrap();
assert_snapshot!(
plan,
@r"
Projection: orders.order_id
Sort: max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST, min(orders.qty) ORDER BY [orders.order_id DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST
Projection: orders.order_id, max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, min(orders.qty) ORDER BY [orders.order_id DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
WindowAggr: windowExpr=[[max(orders.qty) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
WindowAggr: windowExpr=[[min(orders.qty) ORDER BY [orders.order_id DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
TableScan: orders
"
);
}

#[test]
fn window_function_in_order_by_with_group_by() {
let sql = "SELECT order_id, SUM(qty) FROM orders GROUP BY order_id ORDER BY MAX(SUM(qty)) OVER (ORDER BY order_id)";
let plan = logical_plan(sql).unwrap();
assert_snapshot!(
plan,
@r"
Projection: orders.order_id, sum(orders.qty)
Sort: max(sum(orders.qty)) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST
Projection: orders.order_id, sum(orders.qty), max(sum(orders.qty)) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
WindowAggr: windowExpr=[[max(sum(orders.qty)) ORDER BY [orders.order_id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
Aggregate: groupBy=[[orders.order_id]], aggr=[[sum(orders.qty)]]
TableScan: orders
"
);
}

#[test]
fn window_function_in_order_by_with_qualify() {
let sql = "SELECT person.id, ROW_NUMBER() OVER (PARTITION BY person.age ORDER BY person.id) as rn FROM person QUALIFY rn = 1 ORDER BY ROW_NUMBER() OVER (PARTITION BY person.age ORDER BY person.id)";
let plan = logical_plan(sql).unwrap();
assert_snapshot!(
plan,
@r"
Sort: rn ASC NULLS LAST
Projection: person.id, row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn
Filter: row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW = Int64(1)
WindowAggr: windowExpr=[[row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
TableScan: person
"
);
}

#[test]
fn window_function_in_order_by_not_in_select() {
let sql =
"SELECT order_id FROM orders ORDER BY MIN(qty) OVER (PARTITION BY order_id)";
let plan = logical_plan(sql).unwrap();
assert_snapshot!(
plan,
@r"
Projection: orders.order_id
Sort: min(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ASC NULLS LAST
Projection: orders.order_id, min(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
WindowAggr: windowExpr=[[min(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
TableScan: orders
"
);
}

#[test]
fn over_order_by_with_window_frame_single_end() {
let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id ROWS 3 PRECEDING), MIN(qty) OVER (ORDER BY order_id DESC) from orders";
Expand Down Expand Up @@ -4256,6 +4388,16 @@ fn test_select_qualify_without_window_function() {
);
}

#[test]
fn test_select_qualify_without_window_function_but_window_in_order_by() {
let sql = "SELECT person.id FROM person QUALIFY person.id > 1 ORDER BY ROW_NUMBER() OVER (ORDER BY person.id)";
let err = logical_plan(sql).unwrap_err();
assert_eq!(
err.strip_backtrace(),
"Error during planning: QUALIFY clause requires window functions in the SELECT list or QUALIFY clause"
);
}

#[test]
fn test_select_qualify_complex_condition() {
let sql = "SELECT person.id, person.age, ROW_NUMBER() OVER (PARTITION BY person.age ORDER BY person.id) as rn, RANK() OVER (ORDER BY person.salary) as rank FROM person QUALIFY rn <= 2 AND rank <= 5";
Expand Down
91 changes: 91 additions & 0 deletions datafusion/sqllogictest/test_files/window_order_by.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Tests for window functions used in ORDER BY clause

statement ok
set datafusion.execution.target_partitions = 1;

statement ok
CREATE EXTERNAL TABLE aggregate_test_100 (
c1 VARCHAR NOT NULL,
c2 TINYINT NOT NULL,
c3 SMALLINT NOT NULL,
c4 SMALLINT,
c5 INT,
c6 BIGINT NOT NULL,
c7 SMALLINT NOT NULL,
c8 INT NOT NULL,
c9 BIGINT UNSIGNED NOT NULL,
c10 VARCHAR NOT NULL,
c11 FLOAT NOT NULL,
c12 DOUBLE NOT NULL,
c13 VARCHAR NOT NULL
)
STORED AS CSV
LOCATION '../../testing/data/csv/aggregate_test_100.csv'
OPTIONS ('format.has_header' 'true');

# Window function only in ORDER BY
query I
SELECT c2 FROM aggregate_test_100 ORDER BY row_number() OVER (ORDER BY c9) LIMIT 5;
----
4
2
5
2
2

# Window function in both SELECT and ORDER BY (deduplication)
query II
SELECT c2, row_number() OVER (ORDER BY c9) as rn FROM aggregate_test_100 ORDER BY row_number() OVER (ORDER BY c9) LIMIT 5;
----
4 1
2 2
5 3
2 4
2 5

# Nested expression: ORDER BY window_func(...) + 1
query I
SELECT c2 FROM aggregate_test_100 ORDER BY row_number() OVER (ORDER BY c9) + 1 LIMIT 5;
----
4
2
5
2
2

# Multiple window functions in ORDER BY
query I
SELECT c2 FROM aggregate_test_100 ORDER BY row_number() OVER (ORDER BY c9), max(c3) OVER (ORDER BY c9) LIMIT 5;
----
4
2
5
2
2

# DESC ordering with window function
query I
SELECT c2 FROM aggregate_test_100 ORDER BY row_number() OVER (ORDER BY c9) DESC LIMIT 5;
----
5
1
1
2
1
Loading