Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ clickbench_partitioned: ClickBench queries against partitioned (100 files) parqu
clickbench_pushdown: ClickBench queries against partitioned (100 files) parquet w/ filter_pushdown enabled
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)

# Sort Pushdown Benchmarks
sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1)
sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files

# Sorted Data Benchmarks (ORDER BY Optimization)
clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization)

Expand Down Expand Up @@ -309,6 +313,10 @@ main() {
# same data as for tpch
data_tpch "1" "parquet"
;;
sort_pushdown|sort_pushdown_sorted)
# same data as for tpch
data_tpch "1" "parquet"
;;
sort_tpch)
# same data as for tpch
data_tpch "1" "parquet"
Expand Down Expand Up @@ -509,6 +517,12 @@ main() {
external_aggr)
run_external_aggr
;;
sort_pushdown)
run_sort_pushdown
;;
sort_pushdown_sorted)
run_sort_pushdown_sorted
;;
sort_tpch)
run_sort_tpch "1"
;;
Expand Down Expand Up @@ -1070,6 +1084,22 @@ run_external_aggr() {
debug_run $CARGO_COMMAND --bin external_aggr -- benchmark --partitions 4 --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG}
}

# Runs the sort pushdown benchmark (without WITH ORDER)
#
# Baseline case: queries run with a full SortExec since the data files are
# not declared sorted.
#
# Relies on variables assumed to be set by the enclosing script — TODO confirm:
#   DATA_DIR, RESULTS_DIR, CARGO_COMMAND, QUERY_ARG, LATENCY_ARG, debug_run().
# QUERY_ARG / LATENCY_ARG are intentionally left unquoted so that empty values
# expand to nothing and non-empty values word-split into separate flags.
run_sort_pushdown() {
    TPCH_DIR="${DATA_DIR}/tpch_sf1"
    RESULTS_FILE="${RESULTS_DIR}/sort_pushdown.json"
    echo "Running sort pushdown benchmark (no WITH ORDER)..."
    debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the sort pushdown benchmark with WITH ORDER (enables sort elimination)
#
# Same as run_sort_pushdown but passes --sorted, which declares the parquet
# files pre-sorted so the optimizer can drop the SortExec.
#
# Relies on variables assumed to be set by the enclosing script — TODO confirm:
#   DATA_DIR, RESULTS_DIR, CARGO_COMMAND, QUERY_ARG, LATENCY_ARG, debug_run().
# QUERY_ARG / LATENCY_ARG are intentionally left unquoted so that empty values
# expand to nothing and non-empty values word-split into separate flags.
run_sort_pushdown_sorted() {
    TPCH_DIR="${DATA_DIR}/tpch_sf1"
    RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_sorted.json"
    echo "Running sort pushdown benchmark (with WITH ORDER)..."
    debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the sort integration benchmark
run_sort_tpch() {
SCALE_FACTOR=$1
Expand Down
5 changes: 4 additions & 1 deletion benchmarks/src/bin/dfbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

use datafusion_benchmarks::{
cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_tpch, tpcds, tpch,
cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_pushdown, sort_tpch, tpcds,
tpch,
};

#[derive(Debug, Parser)]
Expand All @@ -53,6 +54,7 @@ enum Options {
Imdb(imdb::RunOpt),
Nlj(nlj::RunOpt),
Smj(smj::RunOpt),
SortPushdown(sort_pushdown::RunOpt),
SortTpch(sort_tpch::RunOpt),
Tpch(tpch::RunOpt),
Tpcds(tpcds::RunOpt),
Expand All @@ -72,6 +74,7 @@ pub async fn main() -> Result<()> {
Options::Imdb(opt) => Box::pin(opt.run()).await,
Options::Nlj(opt) => opt.run().await,
Options::Smj(opt) => opt.run().await,
Options::SortPushdown(opt) => opt.run().await,
Options::SortTpch(opt) => opt.run().await,
Options::Tpch(opt) => Box::pin(opt.run()).await,
Options::Tpcds(opt) => Box::pin(opt.run()).await,
Expand Down
1 change: 1 addition & 0 deletions benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod hj;
pub mod imdb;
pub mod nlj;
pub mod smj;
pub mod sort_pushdown;
pub mod sort_tpch;
pub mod tpcds;
pub mod tpch;
Expand Down
306 changes: 306 additions & 0 deletions benchmarks/src/sort_pushdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Benchmark for sort pushdown optimization.
//!
//! Tests performance of sort elimination when files are non-overlapping and
//! internally sorted (declared via `--sorted` / `WITH ORDER`).
//!
//! # Usage
//!
//! ```text
//! # Prepare sorted TPCH lineitem data (SF=1)
//! ./bench.sh data sort_pushdown
//!
//! # Baseline (no WITH ORDER, full SortExec)
//! ./bench.sh run sort_pushdown
//!
//! # With sort elimination (WITH ORDER, SortExec removed)
//! ./bench.sh run sort_pushdown_sorted
//! ```
//!
//! # Reference Results
//!
//! Measured on 300k rows, 8 non-overlapping sorted parquet files, single partition,
//! debug build (results vary by hardware; relative speedup is the key metric):
//!
//! ```text
//! Query | Description | baseline (ms) | sort eliminated (ms) | speedup
//! ------|----------------------|---------------|---------------------|--------
//! Q1 | ASC full scan | 159 | 91 | 43%
//! Q2 | ASC LIMIT 100 | 36 | 12 | 67%
//! Q3 | ASC full (wide, *) | 487 | 333 | 31%
//! Q4 | ASC LIMIT 100 (wide) | 119 | 30 | 74%
//! ```
//!
//! Key observations:
//! - **LIMIT queries benefit most** (67-74%): sort elimination + limit pushdown
//! means only the first few rows are read before stopping.
//! - **Full scans** (31-43%): saving comes from eliminating the O(n log n) sort
//! step entirely.
//! - **Wide projections** amplify the benefit: larger rows make sorting more
//! expensive, so eliminating it saves more.

use clap::Args;
use futures::StreamExt;
use std::path::PathBuf;
use std::sync::Arc;

use datafusion::datasource::TableProvider;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{displayable, execute_stream};
use datafusion::prelude::*;
use datafusion_common::DEFAULT_PARQUET_EXTENSION;
use datafusion_common::instant::Instant;

use crate::util::{BenchmarkRun, CommonOpt, QueryResult, print_memory_stats};

// Command-line options for the sort-pushdown benchmark (dfbench subcommand
// `sort-pushdown`). Runs the fixed query set in `RunOpt::QUERIES` against
// TPC-H lineitem parquet data; `--sorted` declares the files pre-sorted so
// the optimizer can eliminate the SortExec.
//
// NOTE: the `///` field docs below double as clap help text, so they are
// left unchanged here.
#[derive(Debug, Args)]
pub struct RunOpt {
    /// Common options
    #[command(flatten)]
    common: CommonOpt,

    /// Sort pushdown query number. If not specified, runs all queries
    #[arg(short, long)]
    pub query: Option<usize>,

    /// Path to data files (lineitem). Only parquet format is supported.
    /// Data should be pre-sorted by l_orderkey ASC for meaningful results.
    #[arg(required = true, short = 'p', long = "path")]
    path: PathBuf,

    /// Path to JSON benchmark result to be compared using `compare.py`
    #[arg(short = 'o', long = "output")]
    output_path: Option<PathBuf>,

    /// Mark the first column (l_orderkey) as sorted via WITH ORDER.
    /// When set, enables sort elimination for matching queries.
    #[arg(short = 't', long = "sorted")]
    sorted: bool,
}

/// Inclusive ID of the first benchmark query (query IDs are 1-based).
pub const SORT_PUSHDOWN_QUERY_START_ID: usize = 1;
/// Inclusive ID of the last benchmark query; must match the length of
/// `RunOpt::QUERIES`.
pub const SORT_PUSHDOWN_QUERY_END_ID: usize = 4;

impl RunOpt {
const TABLES: [&'static str; 1] = ["lineitem"];

/// Queries benchmarking sort elimination when files are non-overlapping
/// and internally sorted (WITH ORDER declared via `--sorted`).
///
/// With `--sorted`: ParquetSource returns Exact, files are verified
/// non-overlapping by statistics → SortExec eliminated, no SPM needed
/// for single partition.
///
/// Without `--sorted`: baseline with full SortExec.
const QUERIES: [&'static str; 4] = [
// Q1: Sort elimination — full scan
// With --sorted: SortExec removed, sequential scan in file order
// Without --sorted: full SortExec required
r#"
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey
"#,
// Q2: Sort elimination + limit pushdown
// With --sorted: SortExec removed + limit pushed to DataSourceExec
// → reads only first ~100 rows then stops
// Without --sorted: TopK sort over all data
r#"
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey
LIMIT 100
"#,
// Q3: Sort elimination — wide projection (all columns)
// Tests sort elimination benefit with larger row payload
r#"
SELECT *
FROM lineitem
ORDER BY l_orderkey
"#,
// Q4: Sort elimination + limit — wide projection
r#"
SELECT *
FROM lineitem
ORDER BY l_orderkey
LIMIT 100
"#,
];

pub async fn run(&self) -> Result<()> {
let mut benchmark_run = BenchmarkRun::new();

let query_range = match self.query {
Some(query_id) => query_id..=query_id,
None => SORT_PUSHDOWN_QUERY_START_ID..=SORT_PUSHDOWN_QUERY_END_ID,
};

for query_id in query_range {
benchmark_run.start_new_case(&format!("{query_id}"));

let query_results = self.benchmark_query(query_id).await;
match query_results {
Ok(query_results) => {
for iter in query_results {
benchmark_run.write_iter(iter.elapsed, iter.row_count);
}
}
Err(e) => {
benchmark_run.mark_failed();
eprintln!("Query {query_id} failed: {e}");
}
}
}

benchmark_run.maybe_write_json(self.output_path.as_ref())?;
benchmark_run.maybe_print_failures();
Ok(())
}

async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
let config = self.common.config()?;
let rt = self.common.build_runtime()?;
let state = SessionStateBuilder::new()
.with_config(config)
.with_runtime_env(rt)
.with_default_features()
.build();
let ctx = SessionContext::from(state);

self.register_tables(&ctx).await?;

let mut millis = vec![];
let mut query_results = vec![];
for i in 0..self.iterations() {
let start = Instant::now();

let query_idx = query_id - 1;
let sql = Self::QUERIES[query_idx].to_string();
let row_count = self.execute_query(&ctx, sql.as_str()).await?;

let elapsed = start.elapsed();
let ms = elapsed.as_secs_f64() * 1000.0;
millis.push(ms);

println!(
"Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
);
query_results.push(QueryResult { elapsed, row_count });
}

let avg = millis.iter().sum::<f64>() / millis.len() as f64;
println!("Query {query_id} avg time: {avg:.2} ms");

print_memory_stats();

Ok(query_results)
}

async fn register_tables(&self, ctx: &SessionContext) -> Result<()> {
for table in Self::TABLES {
let table_provider = self.get_table(ctx, table).await?;
ctx.register_table(table, table_provider)?;
}
Ok(())
}

async fn execute_query(&self, ctx: &SessionContext, sql: &str) -> Result<usize> {
let debug = self.common.debug;
let plan = ctx.sql(sql).await?;
let (state, plan) = plan.into_parts();

if debug {
println!("=== Logical plan ===\n{plan}\n");
}

let plan = state.optimize(&plan)?;
if debug {
println!("=== Optimized logical plan ===\n{plan}\n");
}
let physical_plan = state.create_physical_plan(&plan).await?;
if debug {
println!(
"=== Physical plan ===\n{}\n",
displayable(physical_plan.as_ref()).indent(true)
);
}

let mut row_count = 0;
let mut stream = execute_stream(physical_plan.clone(), state.task_ctx())?;
while let Some(batch) = stream.next().await {
row_count += batch?.num_rows();
}

if debug {
println!(
"=== Physical plan with metrics ===\n{}\n",
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
.indent(true)
);
}

Ok(row_count)
}

async fn get_table(
&self,
ctx: &SessionContext,
table: &str,
) -> Result<Arc<dyn TableProvider>> {
let path = self.path.to_str().unwrap();
let state = ctx.state();
let path = format!("{path}/{table}");
let format = Arc::new(
ParquetFormat::default()
.with_options(ctx.state().table_options().parquet.clone()),
);
let extension = DEFAULT_PARQUET_EXTENSION;

let options = ListingOptions::new(format)
.with_file_extension(extension)
.with_collect_stat(true); // Always collect statistics for sort pushdown

let table_path = ListingTableUrl::parse(path)?;
let schema = options.infer_schema(&state, &table_path).await?;
let options = if self.sorted {
// Declare the first column (l_orderkey) as sorted
let key_column_name = schema.fields()[0].name();
options
.with_file_sort_order(vec![vec![col(key_column_name).sort(true, false)]])
} else {
options
};

let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_schema(schema);

Ok(Arc::new(ListingTable::try_new(config)?))
}

fn iterations(&self) -> usize {
self.common.iterations
}
}
Loading
Loading