Support Filter Pushdown in Spark Structured Streaming #16210
Open
jalpan-randeri wants to merge 1 commit into apache:main
Conversation
jalpan-randeri force-pushed from 99c5b40 to 3bb865b
anoopj reviewed May 6, 2026
anoopj (Contributor) left a comment:
Jalpan, thanks for this PR. A few comments.
for (Expression filter : pushedFilters) {
  if (isPartitionOnly(
      filter, specsById.values().iterator().next().partitionType(), caseSensitive)) {
anoopj (Contributor):
Tables with partition evolution will have multiple specs. The code here is grabbing an arbitrary partition spec. Take a look at ExpressionUtil.selectsPartitions() for an example.
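A minimal sketch of the fix, assuming specsById holds every spec from table.specs() and isPartitionOnly keeps its current signature:

// Sketch only: the filter must be partition-only under every spec the
// table has ever had, not just an arbitrarily chosen one.
boolean partitionOnlyForAllSpecs =
    specsById.values().stream()
        .allMatch(spec -> isPartitionOnly(filter, spec.partitionType(), caseSensitive));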
// If this doesn't throw an error, it means the filter
// only uses columns present in the partition schema.
Binder.bind(partitionType, expr, caseSensitive);
return true;
anoopj (Contributor):
It's not a good idea to do exception-driven control flow for normal operations. The code base already has ExpressionUtil.selectsPartitions() which handles this.
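For reference, a projection-based check avoids the try/catch entirely. This is a sketch of that approach, assuming Iceberg's ExpressionUtil.equivalent() and Projections APIs; it is not the exact upstream code:

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.expressions.Projections;

// A filter is answerable from partition data alone when its inclusive and
// strict projections onto the spec agree; no exception is needed to decide.
private static boolean isPartitionOnly(
    Expression expr, PartitionSpec spec, boolean caseSensitive) {
  return ExpressionUtil.equivalent(
      Projections.inclusive(spec, caseSensitive).project(expr),
      Projections.strict(spec, caseSensitive).project(expr),
      spec.partitionType(),
      caseSensitive);
}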
import org.slf4j.LoggerFactory;

public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerAvailableNow {
  private static final Logger LOGGER = LoggerFactory.getLogger(SparkMicroBatchStream.class);
anoopj (Contributor):
This class already has a logger called LOG a few lines below. This is initializing a duplicate logger.
new InitialOffsetStore(
    table, checkpointLocation, fromTimestamp, sparkContext.hadoopConfiguration());
this.initialOffset = initialOffsetStore.initialOffset();
LOGGER.error("[jalpan] creating micro batch with filter {} ", filters);
anoopj (Contributor):
Remove these? Also below on L137
Overview
This PR introduces the core Iceberg-side support for predicate pushdown within Spark Structured Streaming.
Currently, Spark streaming workloads on Iceberg tables lack the pruning efficiencies available in batch scans. This change bridges that gap by enabling the SparkScan to propagate filter expressions to the MicroBatchStream. By moving filter evaluation from the Spark executor level to the Iceberg metadata level (during partition planning), we significantly reduce the amount of data scanned per micro-batch.
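As a rough sketch of the planning path this enables (names like pushedFilters and the snapshot-range bounds are illustrative assumptions, not the PR's exact code):

// Pushed filters are applied to the incremental scan, so manifest and data
// file pruning happens during Iceberg metadata planning, before any Spark
// tasks are created for the micro-batch.
IncrementalAppendScan scan =
    table
        .newIncrementalAppendScan()
        .fromSnapshotExclusive(startSnapshotId) // assumed micro-batch start
        .toSnapshot(endSnapshotId) // assumed micro-batch end
        .caseSensitive(caseSensitive);
for (Expression filter : pushedFilters) {
  scan = scan.filter(filter);
}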
Spark changes - apache/spark#55679
Fixes #15692
Testing & Verification