
Support Filter Pushdown in Spark Structured Streaming#16210

Open
jalpan-randeri wants to merge 1 commit into apache:main from jalpan-randeri:jalpan/streaming-filter-pushdown

Conversation


@jalpan-randeri jalpan-randeri commented May 5, 2026

Overview

This PR introduces the core Iceberg-side support for predicate pushdown within Spark Structured Streaming.

Currently, Spark streaming workloads on Iceberg tables lack the metadata-based pruning available in batch scans. This change bridges that gap by enabling the SparkScan to propagate filter expressions to the MicroBatchStream. Moving filter evaluation from the Spark executor level to the Iceberg metadata level (during partition planning) significantly reduces:

  1. I/O Overhead: Fewer manifests and data files are scanned.
  2. Driver Memory Pressure: Reduced metadata processing during task planning.
  3. Compute Costs: Lower scheduling overhead for micro-batches on high-cardinality partitioned tables.
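To make the planning-time vs. executor-time distinction concrete, here is a simplified, self-contained model (not the Iceberg implementation; `FileGroup` and `plan` are hypothetical names). Pushing the predicate into planning means non-matching file groups never become Spark tasks, instead of being read and discarded on executors:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Simplified model of micro-batch planning: each "file group" carries a
// partition value, mirroring the partition metadata Iceberg tracks per file.
public class PlanningPushdown {
    record FileGroup(String partition, long rowCount) {}

    // Planning-time filtering: only groups whose partition value matches
    // the pushed predicate are turned into tasks at all.
    static List<FileGroup> plan(List<FileGroup> groups, Predicate<String> partitionFilter) {
        List<FileGroup> tasks = new ArrayList<>();
        for (FileGroup g : groups) {
            if (partitionFilter.test(g.partition())) {
                tasks.add(g);
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<FileGroup> groups = List.of(
            new FileGroup("active", 100L),
            new FileGroup("inactive", 900L));
        // With the filter applied at planning, the "inactive" group is
        // skipped entirely: no I/O, no task scheduling for it.
        System.out.println(plan(groups, p -> p.equals("active")).size()); // prints 1
    }
}
```

Without pushdown, both groups would be scheduled and the 900 "inactive" rows filtered row-by-row on executors; with it, the skip happens before any task exists.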

Note on Dependencies: This commit provides the necessary interface and implementation in the Iceberg connector. To fully leverage it, a corresponding PR has been raised in the Apache Spark repository so that the engine correctly passes these predicates to the V2 streaming source.

Spark changes - apache/spark#55679

Fixes #15692

Testing & Verification

scala> val streamingDF = spark.readStream
     |   .format("iceberg")
     |   .load("local.db.filtered_stream")
     |   .filter("part = 'active'") // Without this change, this filter is NOT pushed down to Iceberg
     |
     | val query = streamingDF.writeStream
     |   .format("console")
     |   .option("checkpointLocation", s"/tmp/iceberg_stream_${System.currentTimeMillis()}")
     |   .start()

val streamingDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, part: string]
val query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.runtime.StreamingQueryWrapper@577c2d74

scala> 26/05/03 20:32:57 ERROR SparkScan: [jalpan] creating micro batch stream
26/05/03 20:32:57 ERROR SparkMicroBatchStream: [jalpan] creating micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:59 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:59 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
-------------------------------------------
Batch: 0
-------------------------------------------
+---+------+
| id|  part|
+---+------+
|  1|active|
+---+------+

Spark streaming workloads on Iceberg tables lack the pruning efficiencies available in batch scans.
This change bridges that gap by enabling the SparkScan to propagate filter expressions to the MicroBatchStream.
@jalpan-randeri jalpan-randeri force-pushed the jalpan/streaming-filter-pushdown branch from 99c5b40 to 3bb865b Compare May 5, 2026 02:16
@singhpk234 singhpk234 self-requested a review May 5, 2026 15:34

@anoopj anoopj left a comment


Jalpan, thanks for this PR. A few comments.


for (Expression filter : pushedFilters) {
  if (isPartitionOnly(
      filter, specsById.values().iterator().next().partitionType(), caseSensitive)) {

Tables with partition evolution will have multiple specs. The code here is grabbing an arbitrary partition spec. Take a look at ExpressionUtil.selectsPartitions() for an example.
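The concern can be sketched with a self-contained model (the representation is hypothetical, not Iceberg's: each spec is reduced to the set of source columns it partitions by). Under partition evolution, a filter is safe to treat as partition-only only if every spec the table has ever had covers the columns it references, not just an arbitrarily chosen one:

```java
import java.util.List;
import java.util.Set;

// Simplified model of the multi-spec check. A table that evolved its
// partitioning carries several specs; old data files still use the old
// spec, so a filter must be evaluable against all of them.
public class PartitionOnlyCheck {
    static boolean isPartitionOnly(Set<String> filterColumns, List<Set<String>> specs) {
        // Partition-only iff every spec covers every referenced column.
        return specs.stream().allMatch(spec -> spec.containsAll(filterColumns));
    }

    public static void main(String[] args) {
        List<Set<String>> specs = List.of(
            Set.of("part"),          // original spec
            Set.of("part", "day"));  // spec after partition evolution
        System.out.println(isPartitionOnly(Set.of("part"), specs)); // prints true
        System.out.println(isPartitionOnly(Set.of("day"), specs));  // prints false: old spec lacks "day"
    }
}
```

Checking only `specsById.values().iterator().next()` is equivalent to testing one arbitrary element of that list, which can wrongly classify a filter as partition-only for files written under an older spec.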

// If this doesn't throw an error, it means the filter
// only uses columns present in the partition schema.
Binder.bind(partitionType, expr, caseSensitive);
return true;

It's not a good idea to do exception-driven control flow for normal operations. The code base already has ExpressionUtil.selectsPartitions() which handles this.
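The pattern being flagged, in miniature (a generic sketch, not Iceberg's actual Binder API; the column set here is invented for illustration):

```java
import java.util.Set;

// Contrast: using an exception to answer a routine yes/no question vs.
// asking the question directly. The try/catch version works, but it pays
// exception-construction cost on every "no" and obscures the intent.
public class ControlFlow {
    static final Set<String> PARTITION_COLUMNS = Set.of("part", "day");

    // Anti-pattern: exception-driven control flow for a normal operation.
    static boolean bindsViaException(String column) {
        try {
            if (!PARTITION_COLUMNS.contains(column)) {
                throw new IllegalArgumentException("cannot bind: " + column);
            }
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    // Preferred: a direct predicate that states what is being asked,
    // which is the shape ExpressionUtil.selectsPartitions() provides
    // for real expressions.
    static boolean bindsDirectly(String column) {
        return PARTITION_COLUMNS.contains(column);
    }

    public static void main(String[] args) {
        System.out.println(bindsViaException("id"));  // prints false
        System.out.println(bindsDirectly("part"));    // prints true
    }
}
```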

import org.slf4j.LoggerFactory;

public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerAvailableNow {
private static final Logger LOGGER = LoggerFactory.getLogger(SparkMicroBatchStream.class);

This class already has a logger called LOG a few lines below. This is initializing a duplicate logger.

new InitialOffsetStore(
    table, checkpointLocation, fromTimestamp, sparkContext.hadoopConfiguration());
this.initialOffset = initialOffsetStore.initialOffset();
LOGGER.error("[jalpan] creating micro batch with filter {} ", filters);

Remove these? Also below on L137


Development

Successfully merging this pull request may close these issues.

Support Filter Pushdown for Spark Structured Streaming Reads
