
Predicate Pushdown in Spark Structured Streaming (DataSource V2) #55679

Draft

jalpan-randeri wants to merge 1 commit into apache:master from
jalpan-randeri:jalpan/structured-streaming-filter-pushdown-4.2

Conversation


@jalpan-randeri jalpan-randeri commented May 5, 2026

What changes were proposed in this pull request?

This PR introduces support for Predicate Pushdown in Spark Structured Streaming (DataSource V2).

This allows DSv2 connectors (like Apache Iceberg) to perform metadata-level file pruning and reduce I/O for streaming micro-batches.
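
For context, a minimal connector-side sketch of how the pushdown hand-off works. SupportsPushDownFilters, Scan, and Scan.toMicroBatchStream are existing DSv2 interfaces; MyScanBuilder, MyMicroBatchStream, and isSupported are hypothetical names standing in for connector-specific pieces, not code from this PR:

import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

class MyScanBuilder(schema: StructType) extends SupportsPushDownFilters {
  private var pushed: Array[Filter] = Array.empty

  // Spark calls this during planning; with this PR it also happens for
  // streaming queries. Return the filters Spark must still evaluate itself.
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition(isSupported)
    pushed = supported
    unsupported
  }

  override def pushedFilters(): Array[Filter] = pushed

  override def build(): Scan = new Scan {
    override def readSchema(): StructType = schema
    // The accepted filters reach the micro-batch stream, where the
    // connector can prune files before planning input partitions.
    override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream =
      new MyMicroBatchStream(pushed, checkpointLocation) // hypothetical connector class
  }

  private def isSupported(f: Filter): Boolean = true // connector-specific capability check
}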

Fixes #55680

Why are the changes needed?

Currently, Spark Structured Streaming via the DSv2 API does not push down predicates. More data is scanned and then filtered out at the engine layer, which causes excessive I/O, driver bottlenecks, and increased latency.

Does this PR introduce any user-facing change?

No. There is no change to the user-facing API.
This change improves performance when a query filters on partition-level columns.

How was this patch tested?

  • Added tests in PushDownPredicateInMicroBatchExecutionSuite
  • Manual testing (REPL session below)
scala> val streamingDF = spark.readStream
     |   .format("iceberg")
     |   .load("local.db.filtered_stream")
     |   .filter("part = 'active'") // Before this PR, this filter was NOT pushed down to Iceberg
     |
     | val query = streamingDF.writeStream
     |   .format("console")
     |   .option("checkpointLocation", s"/tmp/iceberg_stream_${System.currentTimeMillis()}")
     |   .start()

val streamingDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, part: string]
val query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.runtime.StreamingQueryWrapper@577c2d74

scala> 26/05/03 20:32:57 ERROR SparkScan: [jalpan] creating micro batch stream
26/05/03 20:32:57 ERROR SparkMicroBatchStream: [jalpan] creating micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:59 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:59 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
-------------------------------------------
Batch: 0
-------------------------------------------
+---+------+
| id|  part|
+---+------+
|  1|active|
+---+------+
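
The single-row Batch 0 above comes from metadata-level pruning on the connector side. A minimal sketch of what such pruning can look like once the pushed filters arrive (PartitionFile and pruneFiles are hypothetical illustrations, not part of this PR; EqualTo and Filter are Spark's public source-filter classes):

import org.apache.spark.sql.sources.{EqualTo, Filter}

// Hypothetical file descriptor carrying partition metadata.
case class PartitionFile(path: String, partitionValues: Map[String, String])

// Keep only files whose partition values can satisfy every pushed filter.
// Unsupported filter types are kept conservatively; Spark re-applies any
// filters the source did not accept.
def pruneFiles(files: Seq[PartitionFile], pushed: Seq[Filter]): Seq[PartitionFile] =
  files.filter { file =>
    pushed.forall {
      case EqualTo(attribute, value) =>
        file.partitionValues.get(attribute).forall(_ == value.toString)
      case _ => true
    }
  }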

Was this patch authored or co-authored using generative AI tooling?

No

@thesquelched

Deleted some comments I made because I didn't realize there were two PRs (one for Iceberg, one for Spark) addressing this issue. Sorry!

@jalpan-randeri
Author

No problem at all, Scott; I completely understand. It's a bit of a moving target across the two PRs. I'd still love to incorporate the subquery insight you had; could you re-share that test query? I want to make sure I've got full coverage in both the Spark and Iceberg logic before I refresh the PRs.
