Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 52 additions & 7 deletions datafusion/datasource/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,22 @@ impl DataSource for MemorySourceConfig {
&exprs,
self.projection().as_ref().unwrap_or(&all_projections),
);

MemorySourceConfig::try_new(
self.partitions(),
self.original_schema(),
Some(new_projections),
)
.map(|s| Arc::new(s) as Arc<dyn DataSource>)
let projected_schema =
project_schema(&self.schema, Some(&new_projections));

projected_schema.map(|projected_schema| {
// Clone self to preserve all metadata (fetch, sort_information,
// show_sizes, etc.) then update only the projection-related fields.
let mut new_source = self.clone();
new_source.projection = Some(new_projections);
new_source.projected_schema = projected_schema;
// Project sort information to match the new projection
new_source.sort_information = project_orderings(
&new_source.sort_information,
&new_source.projected_schema,
);
Arc::new(new_source) as Arc<dyn DataSource>
})
})
.transpose()
}
Expand Down Expand Up @@ -897,6 +906,42 @@ mod tests {
Ok(())
}

/// Test that `try_swapping_with_projection` preserves the `fetch` limit.
/// Regression test for <https://github.com/apache/datafusion/issues/21176>
#[test]
fn try_swapping_with_projection_preserves_fetch() {
use datafusion_physical_expr::projection::ProjectionExprs;

let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Int64, false),
]));
let partitions: Vec<Vec<RecordBatch>> = vec![vec![batch(10)]];
let source = MemorySourceConfig::try_new(&partitions, schema.clone(), None)
.unwrap()
.with_limit(Some(5));

assert_eq!(source.fetch, Some(5));

// Create a projection that reorders columns: [c, a] (indices 2, 0)
let projection = ProjectionExprs::from_indices(&[2, 0], &schema);
let swapped = source
.try_swapping_with_projection(&projection)
.unwrap()
.unwrap();
let new_source = swapped
.as_any()
.downcast_ref::<MemorySourceConfig>()
.unwrap();

assert_eq!(
new_source.fetch,
Some(5),
"fetch limit must be preserved after projection pushdown"
);
}

#[tokio::test]
async fn values_empty_case() -> Result<()> {
let schema = aggr_test_schema();
Expand Down
43 changes: 43 additions & 0 deletions datafusion/sqllogictest/test_files/limit.slt
Original file line number Diff line number Diff line change
Expand Up @@ -927,3 +927,46 @@ DROP TABLE t;
# Tear down src_table table:
statement ok
DROP TABLE src_table;

# LIMIT must work when SELECT projects columns in different order than table schema

statement ok
CREATE TABLE t21176 (col_a TEXT, col_b DOUBLE, col_c TEXT) AS VALUES
('a-0', 0, 'c-0'), ('a-1', 1, 'c-1'), ('a-2', 2, 'c-2'), ('a-3', 3, 'c-3'),
('a-4', 4, 'c-4'), ('a-5', 5, 'c-5'), ('a-6', 6, 'c-6'), ('a-7', 7, 'c-7'),
('a-8', 8, 'c-8'), ('a-9', 9, 'c-9'), ('a-10', 10, 'c-10'), ('a-11', 11, 'c-11'),
('a-12', 12, 'c-12'), ('a-13', 13, 'c-13'), ('a-14', 14, 'c-14'), ('a-15', 15, 'c-15'),
('a-16', 16, 'c-16'), ('a-17', 17, 'c-17'), ('a-18', 18, 'c-18'), ('a-19', 19, 'c-19');

# Schema-order SELECT with LIMIT should return 5 rows
query RT rowsort
SELECT col_b, col_c FROM t21176 LIMIT 5;
----
0 c-0
1 c-1
2 c-2
3 c-3
4 c-4

# Reverse-order SELECT with LIMIT should also return 5 rows (not 20)
query TR rowsort
SELECT col_c, col_b FROM t21176 LIMIT 5;
----
c-0 0
c-1 1
c-2 2
c-3 3
c-4 4

# Single column reverse SELECT with LIMIT
query T rowsort
SELECT col_c FROM t21176 LIMIT 5;
----
c-0
c-1
c-2
c-3
c-4

statement ok
DROP TABLE t21176;
Loading