Skip to content
Merged
76 changes: 76 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,82 @@ impl LogicalPlan {
}
}

/// Returns the skip (offset) of this plan node, if it has one.
///
/// Only [`LogicalPlan::Limit`] carries a skip value; all other variants
/// return `Ok(None)`. Also returns `Ok(None)` for a zero skip or when the
/// skip expression is not a literal value.
pub fn skip(&self) -> Result<Option<usize>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not fold this into fn limit()? It seems like (almost) the same thing to me

Copy link
Contributor Author

@shivbhatia10 shivbhatia10 Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be confusing if limit returns an Option<usize>, but I could make it a boolean function like fn has_internal_limit(&self) -> Result<bool> instead if that's clearer than having two methods. Although I think these methods might be reusable in other contexts as they are now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I was thinking that if you know you have a LogicalPlan::Limit you can call get_skip_type directly

Given the size of the DataFusion API already, I would prefer to avoid adding new APIs when possible

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify, would that mean not adding the fetch and skip methods to LogicalPlan at all, and instead handling the logic for each node type individually in the branches of the match statement in rewrite, similar to what we had before?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see -- I forgot that the LogicalPlan has both skip/fetch -- this API makes sense. Thank you

match self {
    // Only `Limit` can carry a skip; a literal zero is normalized to `None`.
    LogicalPlan::Limit(limit) => match limit.get_skip_type()? {
        SkipType::Literal(0) => Ok(None),
        SkipType::Literal(n) => Ok(Some(n)),
        // NOTE(review): a non-literal skip expression also maps to `None`;
        // callers that treat `None` as "no skip at all" should confirm that
        // is safe for their use case.
        SkipType::UnsupportedExpr => Ok(None),
    },
    // Every other variant never has a skip. Listed exhaustively (no `_`
    // arm) so that adding a new `LogicalPlan` variant forces this match to
    // be revisited.
    LogicalPlan::Sort(_)
    | LogicalPlan::TableScan(_)
    | LogicalPlan::Projection(_)
    | LogicalPlan::Filter(_)
    | LogicalPlan::Window(_)
    | LogicalPlan::Aggregate(_)
    | LogicalPlan::Join(_)
    | LogicalPlan::Repartition(_)
    | LogicalPlan::Union(_)
    | LogicalPlan::EmptyRelation(_)
    | LogicalPlan::Subquery(_)
    | LogicalPlan::SubqueryAlias(_)
    | LogicalPlan::Statement(_)
    | LogicalPlan::Values(_)
    | LogicalPlan::Explain(_)
    | LogicalPlan::Analyze(_)
    | LogicalPlan::Extension(_)
    | LogicalPlan::Distinct(_)
    | LogicalPlan::Dml(_)
    | LogicalPlan::Ddl(_)
    | LogicalPlan::Copy(_)
    | LogicalPlan::DescribeTable(_)
    | LogicalPlan::Unnest(_)
    | LogicalPlan::RecursiveQuery(_) => Ok(None),
}
}

/// Returns the fetch (limit) of this plan node, if it has one.
///
/// [`LogicalPlan::Sort`], [`LogicalPlan::TableScan`], and
/// [`LogicalPlan::Limit`] may carry a fetch value; all other variants
/// return `Ok(None)`. A `Limit` whose fetch expression is not a literal
/// also yields `Ok(None)`.
pub fn fetch(&self) -> Result<Option<usize>> {
    match self {
        // `Sort` and `TableScan` store the fetch directly as `Option<usize>`.
        LogicalPlan::Sort(Sort { fetch, .. })
        | LogicalPlan::TableScan(TableScan { fetch, .. }) => Ok(*fetch),
        // `Limit` stores an expression; only literal fetch values are reported.
        LogicalPlan::Limit(limit) => match limit.get_fetch_type()? {
            FetchType::Literal(fetch) => Ok(fetch),
            FetchType::UnsupportedExpr => Ok(None),
        },
        // All remaining variants never carry a fetch. Listed exhaustively
        // (no `_` arm) so new variants force this match to be revisited.
        LogicalPlan::Projection(_)
        | LogicalPlan::Filter(_)
        | LogicalPlan::Window(_)
        | LogicalPlan::Aggregate(_)
        | LogicalPlan::Join(_)
        | LogicalPlan::Repartition(_)
        | LogicalPlan::Union(_)
        | LogicalPlan::EmptyRelation(_)
        | LogicalPlan::Subquery(_)
        | LogicalPlan::SubqueryAlias(_)
        | LogicalPlan::Statement(_)
        | LogicalPlan::Values(_)
        | LogicalPlan::Explain(_)
        | LogicalPlan::Analyze(_)
        | LogicalPlan::Extension(_)
        | LogicalPlan::Distinct(_)
        | LogicalPlan::Dml(_)
        | LogicalPlan::Ddl(_)
        | LogicalPlan::Copy(_)
        | LogicalPlan::DescribeTable(_)
        | LogicalPlan::Unnest(_)
        | LogicalPlan::RecursiveQuery(_) => Ok(None),
    }
}

/// If this node's expressions contains any references to an outer subquery
pub fn contains_outer_reference(&self) -> bool {
let mut contains = false;
Expand Down
66 changes: 66 additions & 0 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,13 @@ impl OptimizerRule for PushDownFilter {
filter.predicate = new_predicate;
}

// If the child has a fetch (limit) or skip (offset), pushing a filter
// below it would change semantics: the limit/offset should apply before
// the filter, not after.
if filter.input.fetch()?.is_some() || filter.input.skip()?.is_some() {
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}

match Arc::unwrap_or_clone(filter.input) {
LogicalPlan::Filter(child_filter) => {
let parents_predicates = split_conjunction_owned(filter.predicate);
Expand Down Expand Up @@ -4315,4 +4322,63 @@ mod tests {
"
)
}

#[test]
fn filter_not_pushed_down_through_table_scan_with_fetch() -> Result<()> {
    // Build a scan identical to the test table, but carrying fetch = Some(10).
    let LogicalPlan::TableScan(base) = test_table_scan()? else {
        unreachable!()
    };
    let scan_with_fetch = LogicalPlan::TableScan(TableScan {
        fetch: Some(10),
        ..base
    });
    let plan = LogicalPlanBuilder::from(scan_with_fetch)
        .filter(col("a").gt(lit(10i64)))?
        .build()?;
    // Filter must NOT be pushed into the table scan when it has a fetch (limit)
    assert_optimized_plan_equal!(
        plan,
        @r"
Filter: test.a > Int64(10)
TableScan: test, fetch=10
"
    )
}

#[test]
fn filter_push_down_through_sort_without_fetch() -> Result<()> {
    // A plain Sort (no fetch) commutes with Filter, so the predicate is
    // expected to be pushed all the way into the scan.
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .sort(vec![col("a").sort(true, true)])?
        .filter(col("a").gt(lit(10i64)))?
        .build()?;
    assert_optimized_plan_equal!(
        plan,
        @r"
Sort: test.a ASC NULLS FIRST
TableScan: test, full_filters=[test.a > Int64(10)]
"
    )
}

#[test]
fn filter_not_pushed_down_through_sort_with_fetch() -> Result<()> {
    // Sort with a fetch is a TopK: the limit applies before the filter, so
    // the filter must remain above the sort node.
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .sort_with_limit(vec![col("a").sort(true, true)], Some(5))?
        .filter(col("a").gt(lit(10i64)))?
        .build()?;
    assert_optimized_plan_equal!(
        plan,
        @r"
Filter: test.a > Int64(10)
Sort: test.a ASC NULLS FIRST, fetch=5
TableScan: test
"
    )
}
}
92 changes: 85 additions & 7 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1405,11 +1405,22 @@ impl ExecutionPlan for SortExec {
config: &datafusion_common::config::ConfigOptions,
) -> Result<FilterDescription> {
if phase != FilterPushdownPhase::Post {
if self.fetch.is_some() {
return Ok(FilterDescription::all_unsupported(
&parent_filters,
&self.children(),
));
}
return FilterDescription::from_children(parent_filters, &self.children());
}

let mut child =
ChildFilterDescription::from_child(&parent_filters, self.input())?;
// In Post phase: block parent filters when fetch is set,
// but still push the TopK dynamic filter (self-filter).
let mut child = if self.fetch.is_some() {
ChildFilterDescription::all_unsupported(&parent_filters)
} else {
ChildFilterDescription::from_child(&parent_filters, self.input())?
};

if let Some(filter) = &self.filter
&& config.optimizer.enable_topk_dynamic_filter_pushdown
Expand All @@ -1430,7 +1441,10 @@ mod tests {
use super::*;
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::collect;
use crate::empty::EmptyExec;
use crate::execution_plan::Boundedness;
use crate::expressions::col;
use crate::filter_pushdown::{FilterPushdownPhase, PushedDown};
use crate::test;
use crate::test::TestMemoryExec;
use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
Expand All @@ -1441,14 +1455,18 @@ mod tests {
use arrow::datatypes::*;
use datafusion_common::ScalarValue;
use datafusion_common::cast::as_primitive_array;
use datafusion_common::config::ConfigOptions;
use datafusion_common::test_util::batches_to_string;
use datafusion_execution::RecordBatchStream;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::memory_pool::{
GreedyMemoryPool, MemoryConsumer, MemoryPool,
};
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::expressions::{Column, Literal};

use futures::{FutureExt, Stream};
use futures::{FutureExt, Stream, TryStreamExt};
use insta::assert_snapshot;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -2747,10 +2765,6 @@ mod tests {
/// those bytes become unaccounted-for reserved memory that nobody uses.
#[tokio::test]
async fn test_sort_merge_reservation_transferred_not_freed() -> Result<()> {
use datafusion_execution::memory_pool::{
GreedyMemoryPool, MemoryConsumer, MemoryPool,
};

let sort_spill_reservation_bytes: usize = 10 * 1024; // 10 KB

// Pool: merge reservation (10KB) + enough room for sort to work.
Expand Down Expand Up @@ -2861,4 +2875,68 @@ mod tests {
drop(contender);
Ok(())
}

// Builds a SortExec over an empty single-column (Int32 "a") input;
// `fetch` controls whether it operates as a TopK.
fn make_sort_exec_with_fetch(fetch: Option<usize>) -> SortExec {
    let field = Field::new("a", DataType::Int32, false);
    let input = Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![field]))));
    let ordering = [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))];
    SortExec::new(ordering.into(), input).with_fetch(fetch)
}

#[test]
fn test_sort_with_fetch_blocks_filter_pushdown() -> Result<()> {
    // A Sort with a fetch (TopK) must refuse parent-filter pushdown in the
    // Pre phase: filtering below the limit would change which rows survive.
    let topk = make_sort_exec_with_fetch(Some(10));
    let description = topk.gather_filters_for_pushdown(
        FilterPushdownPhase::Pre,
        vec![Arc::new(Column::new("a", 0))],
        &ConfigOptions::new(),
    )?;
    assert!(matches!(
        description.parent_filters()[0][0].discriminant,
        PushedDown::No
    ));
    Ok(())
}

#[test]
fn test_sort_without_fetch_allows_filter_pushdown() -> Result<()> {
    // A plain sort (no fetch) commutes with filters, so pushdown is allowed.
    let plain_sort = make_sort_exec_with_fetch(None);
    let description = plain_sort.gather_filters_for_pushdown(
        FilterPushdownPhase::Pre,
        vec![Arc::new(Column::new("a", 0))],
        &ConfigOptions::new(),
    )?;
    assert!(matches!(
        description.parent_filters()[0][0].discriminant,
        PushedDown::Yes
    ));
    Ok(())
}

#[test]
fn test_sort_with_fetch_allows_topk_self_filter_in_post_phase() -> Result<()> {
    let topk = make_sort_exec_with_fetch(Some(10));
    assert!(topk.filter.is_some(), "TopK filter should be created");

    // Enable the dynamic-filter optimization explicitly.
    let mut options = ConfigOptions::new();
    options.optimizer.enable_topk_dynamic_filter_pushdown = true;
    let description = topk.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        vec![Arc::new(Column::new("a", 0))],
        &options,
    )?;
    // Parent filters remain blocked even in the Post phase...
    assert!(matches!(
        description.parent_filters()[0][0].discriminant,
        PushedDown::No
    ));
    // ...but the TopK self-filter is still pushed down.
    assert_eq!(description.self_filters()[0].len(), 1);
    Ok(())
}
}
39 changes: 39 additions & 0 deletions datafusion/sqllogictest/test_files/limit.slt
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,45 @@ limit 1000;
statement ok
DROP TABLE test_limit_with_partitions;

# Tests for filter pushdown behavior with Sort + LIMIT (fetch).

statement ok
CREATE TABLE t(id INT, value INT) AS VALUES
(1, 100),
(2, 200),
(3, 300),
(4, 400),
(5, 500);

# Take the 3 smallest values (100, 200, 300), then filter value > 200.
query II
SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
----
3 300

# Take the 3 largest values (500, 400, 300), then filter value < 400.
query II
SELECT * FROM (SELECT * FROM t ORDER BY value DESC LIMIT 3) sub WHERE sub.value < 400;
----
3 300

# The filter stays above the sort+fetch in the plan.
query TT
EXPLAIN SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
----
logical_plan
01)SubqueryAlias: sub
02)--Filter: t.value > Int32(200)
03)----Sort: t.value ASC NULLS LAST, fetch=3
04)------TableScan: t projection=[id, value]
physical_plan
01)FilterExec: value@1 > 200
02)--SortExec: TopK(fetch=3), expr=[value@1 ASC NULLS LAST], preserve_partitioning=[false]
03)----DataSourceExec: partitions=1, partition_sizes=[1]

statement ok
DROP TABLE t;

# Tear down src_table table:
statement ok
DROP TABLE src_table;
15 changes: 8 additions & 7 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3198,16 +3198,17 @@ EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
----
logical_plan
01)Sort: rn1 ASC NULLS LAST
02)--Sort: rn1 ASC NULLS LAST, fetch=5
03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it definitely had the filter too low 👍

02)--Filter: rn1 < UInt64(50)
03)----Sort: rn1 ASC NULLS LAST, fetch=5
04)------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
05)--------WindowAggr: windowExpr=[[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
06)----------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d]
physical_plan
01)ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
02)--FilterExec: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 < 50, fetch=5
03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
04)------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
01)FilterExec: rn1@5 < 50
02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
03)----GlobalLimitExec: skip=0, fetch=5
04)------BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
05)--------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]

# Top level sort is pushed down through BoundedWindowAggExec as its SUM result does already satisfy the required
# global order. The existing sort is for the second-term lexicographical ordering requirement, which is being
Expand Down
Loading