Added Filter and Projection Pushdown support to ParquetIO and Beam SQL's ParquetTable#35589
Added Filter and Projection Pushdown support to ParquetIO and Beam SQL's ParquetTable#35589talatuyarer wants to merge 499 commits into
Conversation
|
Assigning reviewers: R: @m-trieu for label java. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
ahmedabu98
left a comment
There was a problem hiding this comment.
Thanks @talatuyarer, left some comments
| private static FilterPredicate convert(RexNode e, Schema beamSchema) { | ||
| SqlKind kind = e.getKind(); |
There was a problem hiding this comment.
Do we need to validate that RexNode e is indeed a RexCall?
| List<RexNode> valueNodes = call.getOperands().subList(1, call.getOperands().size()); | ||
| SqlKind comparison = isNotIn ? SqlKind.NOT_EQUALS : SqlKind.EQUALS; | ||
|
|
||
| // CHANGE: Use an explicit loop |
There was a problem hiding this comment.
Elaborate what this means? Should this be a TODO?
| return beamSchema.getField(columnRef.getIndex()).getName(); | ||
| } | ||
|
|
||
| private static FilterPredicate createIntPredicate(SqlKind kind, String name, Integer value) { |
There was a problem hiding this comment.
I think the create___Predicate methods can be merged into one method. A lot of it seems to be duplicated logic
| case DATE: | ||
| case TIME: | ||
| case TIMESTAMP: | ||
| case ARRAY: | ||
| case MAP: | ||
| case ROW: | ||
| default: | ||
| return null; |
There was a problem hiding this comment.
This will suppress bad filters and may confuse users who expect it to work. Can we throw an UnsupportedOperationException instead?
| default: | ||
| return null; |
There was a problem hiding this comment.
throw an UnsupportedOperationException?
| } | ||
|
|
||
| /** Specifies a filter predicate to use for filtering records. */ | ||
| public Read withFilter(ParquetFilter predicate) { |
There was a problem hiding this comment.
How would a Java user make use of this? This sink relies on ParquetFilter being an instance of ParquetFilterImpl, which is not available to the user.
| import org.apache.parquet.filter2.predicate.Operators.LongColumn; | ||
| import org.apache.parquet.io.api.Binary; | ||
|
|
||
| public class ParquetFilterFactory { |
There was a problem hiding this comment.
We need thorough testing for this class
| Schema projectionSchema = projectSchema(schema, fieldNames); | ||
| LOG.info("Projecting fields schema: {}", projectionSchema); | ||
| read = read.withProjection(projectionSchema, projectionSchema); | ||
| read = read.withProjection(readSchema, readSchema); |
There was a problem hiding this comment.
I think we should be making the unwanted columns nullable for the second readSchema. Check ParquetIO javadoc:
| @Parameter(3) | ||
| public long expectedReadCount; | ||
|
|
||
| @Parameter(4) | ||
| public List<Row> expectedRows; | ||
|
|
||
| @Parameter(5) | ||
| public Schema expectedSchema; |
There was a problem hiding this comment.
I think we don't need expectedReadCount or expectedSchema, right? That information is already captured in expectedRows
| "SELECT * FROM ProductInfo WHERE price > ? AND is_stocked = ?", | ||
| Arrays.asList(100.0, true), |
There was a problem hiding this comment.
Are the values inside the array supposed to be substituted for "?" ? If so, the output seems to be incorrect
|
Reminder, please take a look at this pr: @m-trieu |
|
R: @ahmedabu98 Keeping this sticky/silencing the bot |
|
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
|
waiting on author |
Bumps [github.com/aws/aws-sdk-go-v2/credentials](https://github.com/aws/aws-sdk-go-v2) from 1.18.6 to 1.18.7. - [Release notes](https://github.com/aws/aws-sdk-go-v2/releases) - [Changelog](https://github.com/aws/aws-sdk-go-v2/blob/config/v1.18.7/CHANGELOG.md) - [Commits](aws/aws-sdk-go-v2@config/v1.18.6...config/v1.18.7) --- updated-dependencies: - dependency-name: github.com/aws/aws-sdk-go-v2/credentials dependency-version: 1.18.7 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
) Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.74.2 to 1.75.0. - [Release notes](https://github.com/grpc/grpc-go/releases) - [Commits](grpc/grpc-go@v1.74.2...v1.75.0) --- updated-dependencies: - dependency-name: google.golang.org/grpc dependency-version: 1.75.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…che#35948) * fix(parquetio): handle missing nullable fields in row conversion Add null value handling when converting rows to Arrow tables for nullable fields that are missing from input data. This fixes KeyError when writing to Parquet with missing nullable fields, addressing issue apache#35791. * fix lint
…he#35980) Bumps [cloud.google.com/go/storage](https://github.com/googleapis/google-cloud-go) from 1.56.0 to 1.56.1. - [Release notes](https://github.com/googleapis/google-cloud-go/releases) - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md) - [Commits](googleapis/google-cloud-go@spanner/v1.56.0...storage/v1.56.1) --- updated-dependencies: - dependency-name: cloud.google.com/go/storage dependency-version: 1.56.1 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Fix segv when docker container is self-terminated * Add some debug logging for docker and process env.
* add a jinja include pipeline example * update yaml doc with import example * address gemini and other comments * fix table of contents for readme * add link to jinja pipeline examples
* Add the base log_analyzer * Add github action for security logging * Enhance LogAnalyzer to filter logs by time range and include file names in event summary * Add dry-run option for weekly email report generation in LogAnalyzer * Better error handling for timezones and missing details * Refactor LogAnalyzer to use SinkCls for type consistency and enhance bucket permission management for log sinks
* add import jinja pipeline example * revert name change * update overall examples readme * fix lint issue * fix gemini small issue * Update sdks/python/apache_beam/yaml/examples/transforms/jinja/import/README.md --------- Co-authored-by: tvalentyn <tvalentyn@users.noreply.github.com>
Fix PostCommit Python Arm job
…#35920) * sdks/python: properly make milvus as extra dependency * sdks/python: update image requirements * .github: trigger postcommit python * sdks/python: fix linting issues * sdks/python: fix formatting issues * .github: trigger beam postcommit python * sdks/python: revert milvus version in itests * sdks/python: update image requirements * trigger_files: trigger postcommit python * workflows: capture DinD tests in PreCommit Py Coverage workflow * workflows: temporarily removing `ubuntu-latest` till resolving deps * workflows: add `matrix.os` label to `beam_PreCommit_Python_Coverage`
|
Thanks |
|
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions. |
|
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
This PR fixes #19748
This pull request introduces support for filter and projection pushdown to the ParquetIO connector when used as a Beam SQL source, significantly improving query performance by reducing the amount of data read from disk and transferred over the network.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.