Dedup pushdown (TopHits Agg) should work with Object fields #4991
Conversation
Signed-off-by: Lantao Jin <ltjin@amazon.com>
📝 Walkthrough

This PR adds a new PPL dedup test case with corresponding expected query and plan outputs, while enhancing array detection in ObjectContent, refactoring primitive value extraction in OpenSearchExprValueFactory, and adding fallback logic to DedupPushdownRule for safer column index mapping.

Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
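To make the "fallback logic for safer column index mapping" concrete, here is a minimal sketch of what such a fallback could look like. It is only an illustration: the method name, class shape, and the skip-on-unresolved behavior are assumptions, not the actual DedupPushdownRule code.

```java
import java.util.List;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;

class ColumnIndexFallback {
  // Use the positional index when it still names the expected column;
  // otherwise fall back to a name-based scan, which also covers dotted
  // children of Object fields such as "metrics.size".
  static int resolve(RelDataType rowType, String fieldName, int candidate) {
    List<RelDataTypeField> fields = rowType.getFieldList();
    if (candidate >= 0 && candidate < fields.size()
        && fields.get(candidate).getName().equals(fieldName)) {
      return candidate; // fast path: positional index is still valid
    }
    for (RelDataTypeField field : fields) {
      if (field.getName().equals(fieldName)) {
        return field.getIndex();
      }
    }
    return -1; // unresolved: a rule would skip the pushdown in this case
  }
}
```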
cc @aaarone90
Hi @LantaoJin, thank you for the changes. The dedup command now works, but it looks like the query execution plan changed: it now includes a PROJECT pushdown optimization:
curl -X POST "localhost:9200/_plugins/_ppl/_explain" \
-H "Content-Type: application/json" \
-d '{
"query": "source = big5 | dedup metrics.size | sort - @timestamp"
}'
{
"calcite": {
"logical": "LogicalSystemLimit(sort0=[$7], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(agent=[$0], process=[$6], log=[$8], message=[$11], tags=[$12], cloud=[$13], input=[$15], @timestamp=[$17], data_stream=[$18], host=[$22], metrics=[$24], aws=[$27], event=[$32])\n LogicalSort(sort0=[$17], dir0=[DESC-nulls-last])\n LogicalProject(agent=[$0], agent.ephemeral_id=[$1], agent.id=[$2], agent.name=[$3], agent.type=[$4], agent.version=[$5], process=[$6], process.name=[$7], log=[$8], log.file=[$9], log.file.path=[$10], message=[$11], tags=[$12], cloud=[$13], cloud.region=[$14], input=[$15], input.type=[$16], @timestamp=[$17], data_stream=[$18], data_stream.dataset=[$19], data_stream.namespace=[$20], data_stream.type=[$21], host=[$22], host.name=[$23], metrics=[$24], metrics.size=[$25], metrics.tmin=[$26], aws=[$27], aws.cloudwatch=[$28], aws.cloudwatch.ingestion_time=[$29], aws.cloudwatch.log_group=[$30], aws.cloudwatch.log_stream=[$31], event=[$32], event.dataset=[$33], event.id=[$34], event.ingested=[$35], _id=[$36], _index=[$37], _score=[$38], _maxscore=[$39], _sort=[$40], _routing=[$41])\n LogicalFilter(condition=[<=($42, 1)])\n LogicalProject(agent=[$0], agent.ephemeral_id=[$1], agent.id=[$2], agent.name=[$3], agent.type=[$4], agent.version=[$5], process=[$6], process.name=[$7], log=[$8], log.file=[$9], log.file.path=[$10], message=[$11], tags=[$12], cloud=[$13], cloud.region=[$14], input=[$15], input.type=[$16], @timestamp=[$17], data_stream=[$18], data_stream.dataset=[$19], data_stream.namespace=[$20], data_stream.type=[$21], host=[$22], host.name=[$23], metrics=[$24], metrics.size=[$25], metrics.tmin=[$26], aws=[$27], aws.cloudwatch=[$28], aws.cloudwatch.ingestion_time=[$29], aws.cloudwatch.log_group=[$30], aws.cloudwatch.log_stream=[$31], event=[$32], event.dataset=[$33], event.id=[$34], event.ingested=[$35], _id=[$36], _index=[$37], _score=[$38], _maxscore=[$39], _sort=[$40], _routing=[$41], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $25)])\n LogicalFilter(condition=[IS NOT NULL($25)])\n CalciteLogicalIndexScan(table=[[OpenSearch, big5]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableSort(sort0=[$7], dir0=[DESC-nulls-last])\n CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[PROJECT->[agent, process, log, message, tags, cloud, input, @timestamp, data_stream, host, metrics, metrics.size, aws, event], AGGREGATION->rel#2183:LogicalAggregate.NONE.[](input=LogicalProject#2181,group={0},agg#0=LITERAL_AGG(1))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"agent\",\"process\",\"log\",\"message\",\"tags\",\"cloud\",\"input\",\"@timestamp\",\"data_stream\",\"host\",\"metrics\",\"metrics.size\",\"aws\",\"event\"],\"excludes\":[]},\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":10000,\"sources\":[{\"metrics.size\":{\"terms\":{\"field\":\"metrics.size\",\"missing_bucket\":false,\"order\":\"asc\"}}}]},\"aggregations\":{\"$f1\":{\"top_hits\":{\"from\":0,\"size\":1,\"version\":false,\"seq_no_primary_term\":false,\"explain\":false,\"_source\":{\"includes\":[\"metrics.size\",\"agent\",\"process\",\"log\",\"message\",\"tags\",\"cloud\",\"input\",\"@timestamp\",\"data_stream\",\"host\",\"metrics\",\"aws\",\"event\"],\"excludes\":[]},\"script_fields\":{}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
> Looks like it now includes a PROJECT pushdown optimization
Did you run on the latest code? I didn't see the project pushdown action in the explain output.
yuancu left a comment
LGTM
The backport to 2.19-dev failed. To backport manually, run these commands in your terminal:

# Navigate to the root of your repository
cd $(git rev-parse --show-toplevel)
# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add ../.worktrees/sql/backport-2.19-dev 2.19-dev
# Navigate to the new working tree
pushd ../.worktrees/sql/backport-2.19-dev
# Create a new branch
git switch --create backport/backport-4991-to-2.19-dev
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 1192376a7b0856375b3dbea6c54ed3420e593b7d
# Push it to GitHub
git push --set-upstream origin backport/backport-4991-to-2.19-dev
# Go back to the original working tree
popd
# Delete the working tree
git worktree remove ../.worktrees/sql/backport-2.19-dev

Then, create a pull request where the base branch is 2.19-dev and the compare/head branch is backport/backport-4991-to-2.19-dev.
Dedup pushdown (TopHits Agg) should work with Object fields (opensearch-project#4991)

Signed-off-by: Lantao Jin <ltjin@amazon.com>
(cherry picked from commit 1192376)
Description
#4844 converted `dedup` to a TopHits aggregation, but it failed to parse the dedup column when the column is a child of an Object field.

#4360 restored the internal primitive value in a Map for the value-storing aggregates (first, last, min, max), which stored these Map objects in their accumulators. That fix is no longer necessary, since #4844 fixed the same problem in another way.
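For intuition, the failure mode is that a dedup column such as metrics.size is not a flat key in the document source but a path into a nested object. Below is a minimal, hypothetical sketch of dotted-path resolution against a nested source map; it is not the plugin's actual extraction code, and the class and method names are made up for illustration.

```java
import java.util.Map;

public class DottedPathDemo {
  // Resolve a dotted path like "metrics.size" against a nested source map.
  static Object extractDotted(Map<String, Object> source, String path) {
    Object current = source;
    for (String part : path.split("\\.")) {
      if (!(current instanceof Map)) {
        return null; // path does not resolve into a nested object
      }
      current = ((Map<?, ?>) current).get(part);
    }
    return current;
  }

  public static void main(String[] args) {
    Map<String, Object> hit = Map.of("metrics", Map.of("size", 42, "tmin", 1));
    System.out.println(extractDotted(hit, "metrics.size")); // prints 42
  }
}
```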
In this PR:

- Fix parsing of the dedup column when it is a child of an Object field.
- Revert the fix from ClassCastException for value-storing aggregates on nested PPL fields #4360, since it is superseded by #4844.

Related Issues
Resolves #4990
Check List

- Commits are signed per the DCO using --signoff or -s.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.