Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ com.squareup.wire:wire-runtime-jvm:5.1.0
com.squareup.wire:wire-schema-jvm:5.1.0
com.squareup:javapoet:1.13.0
com.squareup:kotlinpoet-jvm:1.18.1
com.tdunning:t-digest:3.2
com.tdunning:t-digest:3.3
com.typesafe.scala-logging:scala-logging_2.13:3.9.5
com.uber:h3:4.4.0
com.yammer.metrics:metrics-core:2.2.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,39 +34,26 @@ public String callStr(String column, int percent) {
return "PERCENTILESMARTTDIGEST(" + column + ", " + percent + ", 'THRESHOLD=1')";
}

// t-digest 3.3 changed interpolation for small datasets: values snap to integers
// instead of interpolating between adjacent values (e.g., p10 returns 1.0 not 0.5)
@Override
String expectedAggrWithNull10(Scenario scenario) {
return "0.5";
return "1.0";
}

@Override
String expectedAggrWithNull30(Scenario scenario) {
return "2.5";
return "3.0";
}

@Override
String expectedAggrWithNull50(Scenario scenario) {
return "4.5";
return "5.0";
}

@Override
String expectedAggrWithNull70(Scenario scenario) {
return "6.5";
}

@Override
String expectedAggrWithoutNull55(Scenario scenario) {
switch (scenario.getDataType()) {
case INT:
return "-6.442450943999939E8";
case LONG:
return "-2.7670116110564065E18";
case FLOAT:
case DOUBLE:
return "-Infinity";
default:
throw new IllegalArgumentException("Unsupported datatype " + scenario.getDataType());
}
return "7.0";
}

@Override
Expand All @@ -76,7 +63,7 @@ String expectedAggrWithoutNull75(Scenario scenario) {

@Override
String expectedAggrWithoutNull90(Scenario scenario) {
return "7.100000000000001";
return "7.0";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public void testAggregationMV() {
.andOnSecondInstance(
new Object[]{"6.0;7.0;8.0;9.0;10.0"}
)
// All values: 1-10, p50 should be around 5
// All values: 1-10, p50 (t-digest approximate)
.whenQuery("select percentiletdigest(mv, 50) from testTable")
.thenResultIs("DOUBLE", "5.5");
.thenResultIs("DOUBLE", "6.0");
}

@Test
Expand All @@ -66,7 +66,7 @@ public void testAggregationMVGroupBySV() {
)
.whenQuery("select sv, percentiletdigest(mv, 50) from testTable group by sv order by sv")
.thenResultIs("STRING | DOUBLE",
"k1 | 5.5", // values: 1-10, p50 ~= 5.5
"k1 | 6.0", // values: 1-10, p50 (t-digest approximate)
"k2 | 30.0"); // values: 10, 20, 30, 40, 50, p50 ~= 30
}

Expand All @@ -89,7 +89,7 @@ public void testAggregationMVGroupByMV() {
)
.whenQuery("select tags, percentiletdigest(nums, 50) from testTable group by tags order by tags")
.thenResultIs("STRING | DOUBLE",
"tag1 | 3.5", // nums: 1, 2, 3, 4, 5, 6, p50 ~= 3.5
"tag2 | 3.5"); // nums: 1, 2, 3, 4, 5, 6, p50 ~= 3.5
"tag1 | 4.0", // nums: 1, 2, 3, 4, 5, 6, p50 (t-digest approximate)
"tag2 | 4.0"); // nums: 1, 2, 3, 4, 5, 6, p50 (t-digest approximate)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@


public class PreAggregatedPercentileTDigestStarTreeV2Test extends BaseStarTreeV2Test<Object, TDigest> {
// Use non-default compression
private static final double COMPRESSION = 50;
// Use high compression to keep star-tree vs non-star-tree quantile divergence within 0.5%.
// t-digest 3.3 changed centroid management (unit-weight first/last centroids, stricter tail interpolation),
// which increases merge-order sensitivity. The star-tree path does multi-level serialize/deserialize/merge
// while the non-star-tree path merges sequentially, causing quantile divergence at low compression values.
// Experimentally verified: compression >= 750 keeps error < 0.5% across 10 randomized runs.
private static final double COMPRESSION = 750;
private static final double MAX_ERROR = 0.005;
private static final int MAX_VALUE = 10000;

@Override
Expand All @@ -54,7 +59,7 @@ Object getRandomRawValue(Random random) {

@Override
void assertAggregatedValue(TDigest starTreeResult, TDigest nonStarTreeResult) {
double delta = MAX_VALUE * 0.05;
double delta = MAX_VALUE * MAX_ERROR;
for (int i = 0; i <= 100; i++) {
assertEquals(starTreeResult.quantile(i / 100.0), nonStarTreeResult.quantile(i / 100.0), delta);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@
public class PercentileTDigestValueAggregator implements ValueAggregator<Object, TDigest> {
public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;

// TODO: This is copied from PercentileTDigestAggregationFunction.
public static final int DEFAULT_TDIGEST_COMPRESSION = 100;
// NOTE: This is intentionally higher than the query-time default (100) in PercentileTDigestAggregationFunction.
// t-digest 3.3 has higher merge-order sensitivity, and star-tree building involves multi-level merge with
// intermediate serialization/deserialization. At compression=100, this causes ~0.35% error (vs 0.06% in 3.2).
// Compression=500 brings error back to ~0.09%, comparable to 3.2's accuracy at compression=100.
public static final int DEFAULT_TDIGEST_COMPRESSION = 500;
private final int _compressionFactor;
private int _maxByteSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.request.Literal;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.segment.local.aggregator.PercentileTDigestValueAggregator;
import org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.Constants;
Expand Down Expand Up @@ -395,6 +396,12 @@ public static List<ExpressionContext> expressionContextFromFunctionParameters(
expressionContexts.add(ExpressionContext.forLiteral(
Literal.intValue(Integer.parseInt(String.valueOf(
functionParameters.get(Constants.PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY))))));
} else {
// Always inject the default compression so it is stored in star-tree metadata.
// This allows canUseStarTree() to distinguish old segments (no metadata, built with compression=100)
// from new segments (metadata present, built with the current default).
Comment on lines +400 to +402
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment here says injecting the default compression will be “stored in star-tree metadata” and used by canUseStarTree() to distinguish old vs new segments. However, this method only returns ExpressionContext arguments for ValueAggregatorFactory; star-tree metadata persistence is based on AggregationSpec.getFunctionParameters() (see StarTreeV2Metadata.writeMetadata). If AggregationSpec.functionParameters is empty (e.g., aggregationConfigs without functionParameters), metadata will still omit compressionFactor and canUseStarTree() will treat it as default=100 while the builder will create digests at DEFAULT_TDIGEST_COMPRESSION=500. Consider fixing this by ensuring AggregationSpec.functionParameters always includes compressionFactor for PERCENTILETDIGEST/PERCENTILERAWTDIGEST when absent, and adjust/remove this comment accordingly.

Suggested change
// Always inject the default compression so it is stored in star-tree metadata.
// This allows canUseStarTree() to distinguish old segments (no metadata, built with compression=100)
// from new segments (metadata present, built with the current default).
// Persist the default compression in functionParameters so downstream star-tree metadata writers
// record the same compression used by the builder when the config omits it.
functionParameters.put(Constants.PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY,
PercentileTDigestValueAggregator.DEFAULT_TDIGEST_COMPRESSION);

Copilot uses AI. Check for mistakes.
expressionContexts.add(ExpressionContext.forLiteral(
Literal.intValue(PercentileTDigestValueAggregator.DEFAULT_TDIGEST_COMPRESSION)));
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.pinot.segment.local.aggregator.PercentileTDigestValueAggregator;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
import org.apache.pinot.segment.spi.index.startree.AggregationSpec;
Expand Down Expand Up @@ -80,7 +82,8 @@ public static StarTreeV2BuilderConfig fromIndexConfig(StarTreeIndexConfig indexC
AggregationFunctionColumnPair.resolveToStoredType(aggregationFunctionColumnPair);
// If there is already an equivalent functionColumnPair in the map, do not load another.
// This prevents the duplication of the aggregation when the StarTree is constructed.
aggregationSpecs.putIfAbsent(storedType, AggregationSpec.DEFAULT);
aggregationSpecs.putIfAbsent(storedType,
getDefaultAggregationSpec(aggregationFunctionColumnPair.getFunctionType()));
}
}
if (indexConfig.getAggregationConfigs() != null) {
Expand Down Expand Up @@ -323,4 +326,22 @@ public String toString() {
.append("skipStarNodeCreation", _skipStarNodeCreationForDimensions)
.append("aggregationSpecs", _aggregationSpecs).append("maxLeafRecords", _maxLeafRecords).toString();
}

/**
* Returns the default {@link AggregationSpec} for the given function type. For PERCENTILETDIGEST and
* PERCENTILERAWTDIGEST, the default compression factor is explicitly included in the function parameters so that
* it is persisted in star-tree metadata. This allows {@code canUseStarTree()} to distinguish old segments (built
* with the previous default of 100, which have no compression metadata) from new segments.
*/
private static AggregationSpec getDefaultAggregationSpec(AggregationFunctionType functionType) {
switch (functionType) {
case PERCENTILETDIGEST:
case PERCENTILERAWTDIGEST:
return new AggregationSpec(null, null, null, null, null,
Map.of(Constants.PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY,
PercentileTDigestValueAggregator.DEFAULT_TDIGEST_COMPRESSION));
default:
return AggregationSpec.DEFAULT;
}
Comment on lines +330 to +345
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By persisting compressionFactor=PercentileTDigestValueAggregator.DEFAULT_TDIGEST_COMPRESSION (500) into star-tree metadata, PercentileTDigestAggregationFunction.canUseStarTree() will reject using the star-tree for queries that omit the compression argument (query-time default is still 100). This means percentiletdigest(col, p) queries will stop using the star-tree after segments are rebuilt unless users update queries to specify compression=500 (or canUseStarTree is relaxed to allow higher-compression star-trees). If this behavior change is intended, it would be good to make it explicit in code/comments and ensure there’s a migration path for existing star-tree users.

Copilot uses AI. Check for mistakes.
}
}
Loading
Loading