Skip to content

Added Filter and Projection Pushdown support to ParquetIO and Beam SQL's ParquetTable#35589

Closed
talatuyarer wants to merge 499 commits into
apache:masterfrom
talatuyarer:parquet-filter-pushdown-support
Closed

Added Filter and Projection Pushdown support to ParquetIO and Beam SQL's ParquetTable#35589
talatuyarer wants to merge 499 commits into
apache:masterfrom
talatuyarer:parquet-filter-pushdown-support

Conversation

@talatuyarer
Copy link
Copy Markdown
Contributor

This PR fixes #19748

This pull request introduces support for filter and projection pushdown to the ParquetIO connector when used as a Beam SQL source, significantly improving query performance by reducing the amount of data read from disk and transferred over the network.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@github-actions
Copy link
Copy Markdown
Contributor

Assigning reviewers:

R: @m-trieu for label java.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Copy link
Copy Markdown
Contributor

@ahmedabu98 ahmedabu98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @talatuyarer, left some comments

Comment on lines +102 to +103
private static FilterPredicate convert(RexNode e, Schema beamSchema) {
SqlKind kind = e.getKind();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to validate that RexNode e is indeed a RexCall?

List<RexNode> valueNodes = call.getOperands().subList(1, call.getOperands().size());
SqlKind comparison = isNotIn ? SqlKind.NOT_EQUALS : SqlKind.EQUALS;

// CHANGE: Use an explicit loop
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Elaborate what this means? Should this be a TODO?

return beamSchema.getField(columnRef.getIndex()).getName();
}

private static FilterPredicate createIntPredicate(SqlKind kind, String name, Integer value) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the create___Predicate methods can be merged into one method. A lot of it seems to be duplicated logic

Comment on lines +269 to +276
case DATE:
case TIME:
case TIMESTAMP:
case ARRAY:
case MAP:
case ROW:
default:
return null;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will suppress bad filters and may confuse users who expect it to work. Can we throw an UnsupportedOperationException instead?

Comment on lines +207 to +208
default:
return null;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw an UnsupportedOperationException?

}

/** Specifies a filter predicate to use for filtering records. */
public Read withFilter(ParquetFilter predicate) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would a Java user make use of this? This sink relies on ParquetFilter being an instance of ParquetFilterImpl, which is not available to the user.

import org.apache.parquet.filter2.predicate.Operators.LongColumn;
import org.apache.parquet.io.api.Binary;

public class ParquetFilterFactory {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need thorough testing for this class

Schema projectionSchema = projectSchema(schema, fieldNames);
LOG.info("Projecting fields schema: {}", projectionSchema);
read = read.withProjection(projectionSchema, projectionSchema);
read = read.withProjection(readSchema, readSchema);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be making the unwanted columns nullable for the second readSchema. Check ParquetIO javadoc:

* <p>Reading with projection can be enabled with the projection schema as following. Splittable
* reading is enabled when reading with projection. The projection_schema contains only the column
* that we would like to read and encoder_schema contains the schema to encode the output with the
* unwanted columns changed to nullable. Partial reading provide decrease of reading time due to
* partial processing of the data and partial encoding. The decrease in the reading time depends on

Comment on lines +94 to +101
@Parameter(3)
public long expectedReadCount;

@Parameter(4)
public List<Row> expectedRows;

@Parameter(5)
public Schema expectedSchema;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need expectedReadCount or expectedSchema, right? That information is already captured in expectedRows

Comment on lines +144 to +145
"SELECT * FROM ProductInfo WHERE price > ? AND is_stocked = ?",
Arrays.asList(100.0, true),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the values inside the array supposed to be substituted for "?" ? If so, the output seems to be incorrect

@github-actions
Copy link
Copy Markdown
Contributor

Reminder, please take a look at this pr: @m-trieu

@damccorm
Copy link
Copy Markdown
Contributor

R: @ahmedabu98

Keeping this sticky/silencing the bot

@github-actions
Copy link
Copy Markdown
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@derrickaw
Copy link
Copy Markdown
Collaborator

waiting on author

dependabot Bot and others added 16 commits August 27, 2025 10:21
Bumps [github.com/aws/aws-sdk-go-v2/credentials](https://github.com/aws/aws-sdk-go-v2) from 1.18.6 to 1.18.7.
- [Release notes](https://github.com/aws/aws-sdk-go-v2/releases)
- [Changelog](https://github.com/aws/aws-sdk-go-v2/blob/config/v1.18.7/CHANGELOG.md)
- [Commits](aws/aws-sdk-go-v2@config/v1.18.6...config/v1.18.7)

---
updated-dependencies:
- dependency-name: github.com/aws/aws-sdk-go-v2/credentials
  dependency-version: 1.18.7
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
)

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.74.2 to 1.75.0.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](grpc/grpc-go@v1.74.2...v1.75.0)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-version: 1.75.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…che#35948)

* fix(parquetio): handle missing nullable fields in row conversion

Add null value handling when converting rows to Arrow tables for nullable fields that are missing from input data. This fixes KeyError when writing to Parquet with missing nullable fields, addressing issue apache#35791.

* fix lint
…he#35980)

Bumps [cloud.google.com/go/storage](https://github.com/googleapis/google-cloud-go) from 1.56.0 to 1.56.1.
- [Release notes](https://github.com/googleapis/google-cloud-go/releases)
- [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md)
- [Commits](googleapis/google-cloud-go@spanner/v1.56.0...storage/v1.56.1)

---
updated-dependencies:
- dependency-name: cloud.google.com/go/storage
  dependency-version: 1.56.1
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Fix segv when docker container is self-terminated

* Add some debug logging for docker and process env.
* add a jinja include pipeline example

* update yaml doc with import example

* address gemini and other comments

* fix table of contents for readme

* add link to jinja pipeline examples
* Add the base log_analyzer

* Add github action for security logging

* Enhance LogAnalyzer to filter logs by time range and include file names in event summary

* Add dry-run option for weekly email report generation in LogAnalyzer

* Better error handling for timezones and missing details

* Refactor LogAnalyzer to use SinkCls for type consistency and enhance bucket permission management for log sinks
* add import jinja pipeline example

* revert name change

* update overall examples readme

* fix lint issue

* fix gemini small issue

* Update sdks/python/apache_beam/yaml/examples/transforms/jinja/import/README.md

---------

Co-authored-by: tvalentyn <tvalentyn@users.noreply.github.com>
Fix PostCommit Python Arm job
…#35920)

* sdks/python: properly make milvus as extra dependency

* sdks/python: update image requirements

* .github: trigger postcommit python

* sdks/python: fix linting issues

* sdks/python: fix formatting issues

* .github: trigger beam postcommit python

* sdks/python: revert milvus version in itests

* sdks/python: update image requirements

* trigger_files: trigger postcommit python

* workflows: capture DinD tests in PreCommit Py Coverage workflow

* workflows: temporarily removing `ubuntu-latest` till resolving deps

* workflows: add `matrix.os` label to `beam_PreCommit_Python_Coverage`
@anowardear062-svg
Copy link
Copy Markdown

Thanks

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Dec 2, 2025

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@github-actions github-actions Bot added the stale label Dec 2, 2025
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Dec 9, 2025

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions Bot closed this Dec 9, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment