@@ -5,6 +5,8 @@

package org.opensearch.sql.calcite.big5;

+import static org.opensearch.sql.util.MatcherUtils.assertYamlEqualsIgnoreId;
+
import java.io.IOException;
import org.junit.FixMethodOrder;
import org.junit.Test;
@@ -42,4 +44,13 @@ public void coalesce_nonexistent_field_fallback() throws IOException {
    String ppl = sanitize(loadExpectedQuery("coalesce_nonexistent_field_fallback.ppl"));
    timing(summary, "coalesce_nonexistent_field_fallback", ppl);
  }

+  /** Tests deduplication by the metrics.size field with sorting by timestamp. */
+  @Test
+  public void dedup_metrics_size_field() throws IOException {
+    String ppl = sanitize(loadExpectedQuery("dedup_metrics_size_field.ppl"));
+    timing(summary, "dedup_metrics_size_field", ppl);
+    String expected = loadExpectedPlan("big5/dedup_metrics_size_field.yaml");
+    assertYamlEqualsIgnoreId(expected, explainQueryYaml(ppl));
+  }
}
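A note on the new assertion: the expected-plan YAML added later in this diff stores the aggregation pushdown as AGGREGATION->rel#:LogicalAggregate..., while a live explain prints a concrete id such as rel#2183:LogicalAggregate..., so assertYamlEqualsIgnoreId presumably masks those volatile rel ids before comparing. A minimal usage sketch under that assumption (hypothetical test class and strings):

import static org.opensearch.sql.util.MatcherUtils.assertYamlEqualsIgnoreId;

import org.junit.Test;

public class RelIdMaskingSketch {
  // Hedged sketch: the stored plan omits volatile rel ids ("rel#:") while a live
  // explain prints concrete ones ("rel#2183:"); the assumption is that the
  // matcher normalizes those ids away before comparing the two YAML documents.
  @Test
  public void volatile_rel_ids_are_ignored() {
    String expected = "physical: |\n  AGGREGATION->rel#:LogicalAggregate";
    String actual = "physical: |\n  AGGREGATION->rel#2183:LogicalAggregate";
    assertYamlEqualsIgnoreId(expected, actual);
  }
}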
@@ -0,0 +1,22 @@
/*
{
  "name": "dedup_metrics_size_field",
  "operation-type": "search",
  "index": "{{index_name | default('big5')}}",
  "body": {
    "query": {
      "exists": {
        "field": "metrics.size",
        "boost": 1.0
      }
    },
    "_source": {
      "includes": ["agent", "process", "log", "message", "tags", "cloud", "input", "@timestamp", "ecs", "data_stream", "meta", "host", "metrics", "metrics.size", "aws", "event"],
      "excludes": []
    }
  }
}
*/
source = big5
| dedup metrics.size
| sort - @timestamp
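As the logical plans quoted in the review thread below show, dedup metrics.size discards rows whose key is null and keeps one row per distinct metrics.size value (a ROW_NUMBER() OVER (PARTITION BY metrics.size) <= 1 filter). A minimal Java sketch of that keep-first-per-key semantics, using a hypothetical Row type:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of dedup-by-one-field semantics (hypothetical Row type): skip rows
// whose key is null, then keep only the first row seen for each distinct key.
record Row(Double metricsSize, String payload) {}

final class DedupSketch {
  static List<Row> dedupByMetricsSize(List<Row> rows) {
    Set<Double> seen = new HashSet<>();
    List<Row> kept = new ArrayList<>();
    for (Row r : rows) {
      if (r.metricsSize() == null) continue;      // LogicalFilter(IS NOT NULL($28))
      if (seen.add(r.metricsSize())) kept.add(r); // ROW_NUMBER() OVER (PARTITION BY key) <= 1
    }
    return kept;
  }
}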
@aalva500-prog (Contributor) commented on Dec 23, 2025:
Hi @LantaoJin, thank you for the changes. The dedup command now works, but the query execution plan changed: it now includes a PROJECT pushdown optimization:

curl -X POST "localhost:9200/_plugins/_ppl/_explain" \
-H "Content-Type: application/json" \
-d '{
"query": "source = big5 | dedup metrics.size | sort - @timestamp"          
}'
{
  "calcite": {
    "logical": "LogicalSystemLimit(sort0=[$7], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT])\n  LogicalProject(agent=[$0], process=[$6], log=[$8], message=[$11], tags=[$12], cloud=[$13], input=[$15], @timestamp=[$17], data_stream=[$18], host=[$22], metrics=[$24], aws=[$27], event=[$32])\n    LogicalSort(sort0=[$17], dir0=[DESC-nulls-last])\n      LogicalProject(agent=[$0], agent.ephemeral_id=[$1], agent.id=[$2], agent.name=[$3], agent.type=[$4], agent.version=[$5], process=[$6], process.name=[$7], log=[$8], log.file=[$9], log.file.path=[$10], message=[$11], tags=[$12], cloud=[$13], cloud.region=[$14], input=[$15], input.type=[$16], @timestamp=[$17], data_stream=[$18], data_stream.dataset=[$19], data_stream.namespace=[$20], data_stream.type=[$21], host=[$22], host.name=[$23], metrics=[$24], metrics.size=[$25], metrics.tmin=[$26], aws=[$27], aws.cloudwatch=[$28], aws.cloudwatch.ingestion_time=[$29], aws.cloudwatch.log_group=[$30], aws.cloudwatch.log_stream=[$31], event=[$32], event.dataset=[$33], event.id=[$34], event.ingested=[$35], _id=[$36], _index=[$37], _score=[$38], _maxscore=[$39], _sort=[$40], _routing=[$41])\n        LogicalFilter(condition=[<=($42, 1)])\n          LogicalProject(agent=[$0], agent.ephemeral_id=[$1], agent.id=[$2], agent.name=[$3], agent.type=[$4], agent.version=[$5], process=[$6], process.name=[$7], log=[$8], log.file=[$9], log.file.path=[$10], message=[$11], tags=[$12], cloud=[$13], cloud.region=[$14], input=[$15], input.type=[$16], @timestamp=[$17], data_stream=[$18], data_stream.dataset=[$19], data_stream.namespace=[$20], data_stream.type=[$21], host=[$22], host.name=[$23], metrics=[$24], metrics.size=[$25], metrics.tmin=[$26], aws=[$27], aws.cloudwatch=[$28], aws.cloudwatch.ingestion_time=[$29], aws.cloudwatch.log_group=[$30], aws.cloudwatch.log_stream=[$31], event=[$32], event.dataset=[$33], event.id=[$34], event.ingested=[$35], _id=[$36], _index=[$37], _score=[$38], _maxscore=[$39], _sort=[$40], _routing=[$41], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $25)])\n            LogicalFilter(condition=[IS NOT NULL($25)])\n              CalciteLogicalIndexScan(table=[[OpenSearch, big5]])\n",
    "physical": "EnumerableLimit(fetch=[10000])\n  EnumerableSort(sort0=[$7], dir0=[DESC-nulls-last])\n    CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[PROJECT->[agent, process, log, message, tags, cloud, input, @timestamp, data_stream, host, metrics, metrics.size, aws, event], AGGREGATION->rel#2183:LogicalAggregate.NONE.[](input=LogicalProject#2181,group={0},agg#0=LITERAL_AGG(1))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"agent\",\"process\",\"log\",\"message\",\"tags\",\"cloud\",\"input\",\"@timestamp\",\"data_stream\",\"host\",\"metrics\",\"metrics.size\",\"aws\",\"event\"],\"excludes\":[]},\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":10000,\"sources\":[{\"metrics.size\":{\"terms\":{\"field\":\"metrics.size\",\"missing_bucket\":false,\"order\":\"asc\"}}}]},\"aggregations\":{\"$f1\":{\"top_hits\":{\"from\":0,\"size\":1,\"version\":false,\"seq_no_primary_term\":false,\"explain\":false,\"_source\":{\"includes\":[\"metrics.size\",\"agent\",\"process\",\"log\",\"message\",\"tags\",\"cloud\",\"input\",\"@timestamp\",\"data_stream\",\"host\",\"metrics\",\"aws\",\"event\"],\"excludes\":[]},\"script_fields\":{}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
  }
}

@LantaoJin (Member, Author) commented on Dec 24, 2025:

> Looks like it now includes a PROJECT pushdown optimization

Did you run it on the latest code? I don't see the PROJECT pushdown action in the explain output.

@@ -0,0 +1,14 @@
calcite:
  logical: |
    LogicalSystemLimit(sort0=[$7], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT])
      LogicalProject(agent=[$0], process=[$6], log=[$8], message=[$11], tags=[$12], cloud=[$13], input=[$15], @timestamp=[$17], ecs=[$18], data_stream=[$20], meta=[$24], host=[$26], metrics=[$27], aws=[$30], event=[$35])
        LogicalSort(sort0=[$17], dir0=[DESC-nulls-last])
          LogicalProject(agent=[$0], agent.ephemeral_id=[$1], agent.id=[$2], agent.name=[$3], agent.type=[$4], agent.version=[$5], process=[$6], process.name=[$7], log=[$8], log.file=[$9], log.file.path=[$10], message=[$11], tags=[$12], cloud=[$13], cloud.region=[$14], input=[$15], input.type=[$16], @timestamp=[$17], ecs=[$18], ecs.version=[$19], data_stream=[$20], data_stream.dataset=[$21], data_stream.namespace=[$22], data_stream.type=[$23], meta=[$24], meta.file=[$25], host=[$26], metrics=[$27], metrics.size=[$28], metrics.tmin=[$29], aws=[$30], aws.cloudwatch=[$31], aws.cloudwatch.ingestion_time=[$32], aws.cloudwatch.log_group=[$33], aws.cloudwatch.log_stream=[$34], event=[$35], event.dataset=[$36], event.id=[$37], event.ingested=[$38], _id=[$39], _index=[$40], _score=[$41], _maxscore=[$42], _sort=[$43], _routing=[$44])
            LogicalFilter(condition=[<=($45, 1)])
              LogicalProject(agent=[$0], agent.ephemeral_id=[$1], agent.id=[$2], agent.name=[$3], agent.type=[$4], agent.version=[$5], process=[$6], process.name=[$7], log=[$8], log.file=[$9], log.file.path=[$10], message=[$11], tags=[$12], cloud=[$13], cloud.region=[$14], input=[$15], input.type=[$16], @timestamp=[$17], ecs=[$18], ecs.version=[$19], data_stream=[$20], data_stream.dataset=[$21], data_stream.namespace=[$22], data_stream.type=[$23], meta=[$24], meta.file=[$25], host=[$26], metrics=[$27], metrics.size=[$28], metrics.tmin=[$29], aws=[$30], aws.cloudwatch=[$31], aws.cloudwatch.ingestion_time=[$32], aws.cloudwatch.log_group=[$33], aws.cloudwatch.log_stream=[$34], event=[$35], event.dataset=[$36], event.id=[$37], event.ingested=[$38], _id=[$39], _index=[$40], _score=[$41], _maxscore=[$42], _sort=[$43], _routing=[$44], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $28)])
                LogicalFilter(condition=[IS NOT NULL($28)])
                  CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
  physical: |
    EnumerableLimit(fetch=[10000])
      EnumerableSort(sort0=[$7], dir0=[DESC-nulls-last])
        CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=LogicalProject#,group={0},agg#0=LITERAL_AGG(1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"metrics.size":{"terms":{"field":"metrics.size","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"$f1":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"_source":{"includes":["metrics.size","agent","agent.ephemeral_id","agent.id","agent.name","agent.type","agent.version","process","process.name","log","log.file","log.file.path","message","tags","cloud","cloud.region","input","input.type","@timestamp","ecs","ecs.version","data_stream","data_stream.dataset","data_stream.namespace","data_stream.type","meta","meta.file","host","metrics","metrics.tmin","aws","aws.cloudwatch","aws.cloudwatch.ingestion_time","aws.cloudwatch.log_group","aws.cloudwatch.log_stream","event","event.dataset","event.id","event.ingested","_id","_index","_score","_maxscore","_sort","_routing"],"excludes":[]},"script_fields":{}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
@@ -141,7 +141,7 @@ public boolean isBoolean() {

  @Override
  public boolean isArray() {
-    return value instanceof ArrayNode;
+    return value instanceof ArrayNode || value instanceof List;
  }

@Override
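The widened check matters when a document's _source has already been deserialized into plain Java collections: an array value then arrives as a java.util.List rather than a Jackson ArrayNode, and the old check sent it down the scalar path. A minimal sketch of the widened predicate, using a simplified stand-in for the content wrapper being patched:

import java.util.List;

import com.fasterxml.jackson.databind.node.ArrayNode;

// Sketch (simplified stand-in, not the real content class): report array-ness
// for both Jackson tree nodes and plain Java lists, so list-shaped values are
// routed to the array-parsing path.
final class ValueWrapper {
  private final Object value;

  ValueWrapper(Object value) {
    this.value = value;
  }

  boolean isArray() {
    return value instanceof ArrayNode || value instanceof List;
  }
}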
@@ -186,8 +186,7 @@ public ExprValue construct(String jsonString, boolean supportArrays) {
   * @return ExprValue
   */
  public ExprValue construct(String field, Object value, boolean supportArrays) {
-    Object extractedValue = extractFinalPrimitiveValue(value);
-    return parse(new ObjectContent(extractedValue), field, type(field), supportArrays);
+    return parse(new ObjectContent(value), field, type(field), supportArrays);
  }

private ExprValue parse(
@@ -218,7 +217,9 @@
        || type == STRUCT) {
      return parseStruct(content, field, supportArrays);
    } else if (typeActionMap.containsKey(type)) {
-      return typeActionMap.get(type).apply(content, type);
+      return content.isArray()
+          ? parseArray(content, field, type, supportArrays)
+          : typeActionMap.get(type).apply(content, type);
    } else {
      throw new IllegalStateException(
          String.format(
@@ -586,26 +587,4 @@ private ExprValue parseInnerArrayValue(
  private String makeField(String path, String field) {
    return path.equalsIgnoreCase(TOP_PATH) ? field : String.join(".", path, field);
  }

-  /**
-   * Recursively extracts the final primitive value from nested Map structures. For example:
-   * {attributes={telemetry={sdk={language=java}}}} -> "java"
-   *
-   * @param value The value to extract from
-   * @return The extracted primitive value, or the original value if extraction is not possible
-   */
-  @SuppressWarnings("unchecked")
-  private Object extractFinalPrimitiveValue(Object value) {
-    if (value == null || !(value instanceof Map)) {
-      return value;
-    }
-
-    Map<String, Object> map = (Map<String, Object>) value;
-    if (map.size() == 1) {
-      Object singleValue = map.values().iterator().next();
-      return extractFinalPrimitiveValue(singleValue);
-    }
-
-    return value;
-  }
}
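For context on the removal: the deleted helper recursively unwrapped single-entry maps down to their innermost value, so a nested struct like the Javadoc's own {attributes={telemetry={sdk={language=java}}}} collapsed to the bare string "java". With the helper gone, construct() hands the raw value straight to parse(), which (together with the isArray()/parseArray routing above) preserves struct and array shape. A standalone sketch of the removed behavior, rewritten iteratively:

import java.util.Map;

final class UnwrapSketch {
  // The removed behavior, restated: while the value is a single-entry map,
  // replace it with its only entry's value. This collapses nested struct
  // values, which is why construct() now passes the raw value to parse().
  static Object unwrapSingleEntryMaps(Object value) {
    while (value instanceof Map<?, ?> map && map.size() == 1) {
      value = map.values().iterator().next();
    }
    return value;
  }

  public static void main(String[] args) {
    Object nested = Map.of("attributes", Map.of("telemetry", Map.of("sdk", Map.of("language", "java"))));
    System.out.println(unwrapSingleEntryMaps(nested)); // prints "java" — the struct shape is lost
  }
}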
@@ -91,7 +91,7 @@ protected void apply(
.filter(pair -> ((RexInputRef) pair.getKey()).getIndex() == i)
.map(Pair::getValue)
.findFirst()
-.get())
+.orElse(projectWithWindow.getInput().getRowType().getFieldNames().get(i)))
.toList();
if (dedupColumnIndices.size() != dedupColumnNames.size()) {
return;
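On the .orElse fallback: .findFirst().get() threw NoSuchElementException whenever the projection carried no alias pair for dedup column i, aborting the rule; falling back to the input row type's field name at that index keeps the optimization applicable. A minimal sketch of the lookup, with hypothetical simplified types standing in for the Calcite ones:

import java.util.List;
import java.util.Map;
import java.util.Optional;

final class DedupNameLookupSketch {
  // Resolve the display name of dedup column i: prefer the alias recorded by
  // the projection; if no pair references index i, fall back to the input row
  // type's own field name instead of failing with NoSuchElementException.
  static String dedupColumnName(int i, Map<Integer, String> aliasByIndex, List<String> inputFieldNames) {
    return Optional.ofNullable(aliasByIndex.get(i)).orElse(inputFieldNames.get(i));
  }

  public static void main(String[] args) {
    Map<Integer, String> aliases = Map.of(0, "agent");
    List<String> fields = List.of("agent", "metrics.size");
    System.out.println(dedupColumnName(1, aliases, fields)); // "metrics.size" via the fallback
  }
}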