Add partition function expressions for chained partitioning#18165
Add partition function expressions for chained partitioning#18165xiangfu0 wants to merge 1 commit intoapache:masterfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Adds expression-mode partitioning (functionExpr) so Pinot can derive partitions and perform query-time pruning by evaluating a deterministic scalar-function pipeline against raw column values (supporting chained transforms like fnv1a_32(md5(id))), while preserving legacy functionName behavior.
Changes:
- Introduces
functionExprin table/segment partition configs and persists it through offline + realtime metadata paths. - Adds a partition-expression compiler that builds a typed
PartitionPipelineand aPartitionPipelineFunctionadapter. - Extends broker/server pruning to evaluate the same partition pipeline for EQ/IN predicates on the raw column (with fail-open behavior on evaluation errors) and adds/updates tests.
Reviewed changes
Copilot reviewed 46 out of 46 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchBackfillIngestionJobCommand.java | Uses functionExpr when computing partitions for backfill segment selection. |
| pinot-tools/src/test/java/org/apache/pinot/tools/admin/command/LaunchBackfillIngestionJobCommandTest.java | Adds coverage for segment partition matching with functionExpr. |
| pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ColumnPartitionConfig.java | Adds functionExpr mode with validation to enforce exclusive config mode. |
| pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentPartitionConfig.java | Exposes getFunctionExpr() for partition configs. |
| pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java | Adds JSON round-trip and invalid-mode tests for functionExpr. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java | Adds segment metadata key for partitionFunctionExpr. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java | Extends partition function interface to optionally expose getFunctionExpr(). |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java | Adds overloads to create partition functions from configs/metadata, including expression mode. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/metadata/ColumnPartitionMetadata.java | Persists/loads functionExpr in partition metadata JSON. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java | Reads partitionFunctionExpr from segment properties and constructs the correct partition function. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionFunctionExprCompiler.java | Implements compilation of restricted scalar-function expressions into a typed pipeline. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionPipelineFunction.java | Adapts compiled pipelines to the PartitionFunction interface. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionPipeline.java | Defines the immutable compiled pipeline and evaluation logic. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionStep.java | Defines a typed pipeline step contract and runtime validation. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionValue.java | Adds typed runtime value wrapper for pipeline evaluation. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionValueType.java | Defines supported type system for the partition pipeline compiler. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionIntNormalizer.java | Adds normalization strategies to match legacy partition-id semantics. |
| pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionFunctionExprCompilerTest.java | Adds compiler coverage (canonicalization, literals, determinism, thread-safety, legacy normalization). |
| pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/function/scalar/PartitionFunctionExprTestFunctions.java | Provides test-only scalar functions to exercise the compiler in pinot-segment-spi. |
| pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/function/scalar/PartitionFunctionExprRacyTestFunctions.java | Provides a non-static racy scalar function to validate thread-local targets. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java | Validates partition configs by constructing the partition function (including expression mode). |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java | Adds validation tests for functionExpr, including literal args and invalid output type. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/StatsCollectorConfig.java | Exposes getPartitionFunctionExpr() to segment creation stats collection. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java | Creates partition functions using either functionName or functionExpr. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java | Persists partitionFunctionExpr into segment metadata properties when present. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java | Builds realtime partition function from ColumnPartitionConfig (supports expr mode). |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java | Preserves functionExpr when reporting segment partition config. |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java | Adds coverage ensuring expression partition metadata is preserved across realtime conversion. |
| pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/TableConfigPartitioner.java | Uses factory method that supports expression-mode partition functions. |
| pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java | Uses fail-open partition evaluation for EQ/IN pruning and supports expr-based partition functions. |
| pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java | Adds pruning coverage for functionExpr and fail-open behavior when evaluation throws. |
| pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java | Builds realtime partition function from config with expression support. |
| pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java | Detects staleness when partition function expression changes (in addition to name/partition count). |
| pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java | Persists expression partition metadata into ZK using the effective partition function. |
| pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadataUtils.java | Writes partition metadata including functionExpr into ZK metadata. |
| pinot-common/src/main/java/org/apache/pinot/common/function/scalar/InternalFunctions.java | Marks request-context dependent scalar functions as non-deterministic. |
| pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DateTimeFunctions.java | Marks time-dependent scalar functions as non-deterministic. |
| pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java | Adds hash aliases (murmur2, murmur3_32, fnv1*_xx, md5_raw) for partition-expression compatibility. |
| pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionUtils.java | Treats invalid expression metadata as unprunable by catching construction exceptions. |
| pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java | Tracks partition function expression for routing metadata validation. |
| pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java | Initializes partition metadata manager with effective function name + expression. |
| pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java | Adds fail-open behavior for partition evaluation errors. |
| pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java | Adds fail-open behavior for partition evaluation errors. |
| pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/PartitionFunctionExprSegmentPrunerTest.java | Adds broker pruning tests for expression-mode partitions and fail-open on bad literals/metadata. |
| pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java | Extends existing routing/pruning tests to cover functionExpr. |
| pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java | Adds metadata-manager coverage for expression-mode partition metadata handling. |
...i/src/main/java/org/apache/pinot/segment/spi/partition/metadata/ColumnPartitionMetadata.java
Outdated
Show resolved
Hide resolved
...main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionFunctionExprCompiler.java
Show resolved
Hide resolved
645dc07 to
f230f4d
Compare
.../src/main/java/org/apache/pinot/segment/spi/partition/LegacyExpressionPartitionFunction.java
Outdated
Show resolved
Hide resolved
...main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionFunctionExprCompiler.java
Show resolved
Hide resolved
9b026ad to
ffdb486
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18165 +/- ##
==========================================
Coverage 63.13% 63.14%
- Complexity 1610 1736 +126
==========================================
Files 3213 3221 +8
Lines 195730 196629 +899
Branches 30240 30423 +183
==========================================
+ Hits 123583 124169 +586
- Misses 62281 62475 +194
- Partials 9866 9985 +119
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
ffdb486 to
2bc240a
Compare
pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java
Outdated
Show resolved
Hide resolved
pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java
Outdated
Show resolved
Hide resolved
...i/src/main/java/org/apache/pinot/segment/spi/partition/metadata/ColumnPartitionMetadata.java
Show resolved
Hide resolved
...main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionFunctionExprCompiler.java
Outdated
Show resolved
Hide resolved
...main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionFunctionExprCompiler.java
Show resolved
Hide resolved
f53f97f to
d5aef71
Compare
d5aef71 to
4c371b0
Compare
| return (int) (partition < 0 ? partition + numPartitions : partition); | ||
| } | ||
| }, | ||
| ABS { |
There was a problem hiding this comment.
There are 2 ways to do abs:
- The one showing here (used in
ModuloPartitionFunction) - First calculate abs of value, then do modulo (used in
HashCodePartitionFunction)
| } | ||
|
|
||
| public static PartitionIntNormalizer fromConfigString(@Nullable String partitionIdNormalizer) { | ||
| Preconditions.checkArgument(partitionIdNormalizer != null && !partitionIdNormalizer.trim().isEmpty(), |
There was a problem hiding this comment.
This pre-condition check means partitionIdNormalizer cannot be null
| /** | ||
| * Normalizes the final INT output from a compiled partition pipeline into a partition id. | ||
| */ | ||
| public enum PartitionIntNormalizer { |
There was a problem hiding this comment.
For each mode, we need to differentiate pre-modulo vs post-modulo.
For pre-modulo, there are 2 common ways to make the value positive:
abs()(need to handle MIN_VALUE)- mask
For post-modulo, there are 2 common ways to make the partition id positive: abs()+ numPartitions
For existing partition function, do you think we can put a default normalizer (based on current impl)? This way user can use a different normalizer if necessary for common partition function
| import static org.testng.Assert.assertTrue; | ||
|
|
||
|
|
||
| public class LaunchBackfillIngestionJobCommandTest { |
There was a problem hiding this comment.
I don't think we need this test. The partition function related tests are covered on its own
| private ScalarFunctionUtils() { | ||
| } |
There was a problem hiding this comment.
(nit) We usually put this next to the class definition for util class for readability
| @@ -72,10 +75,12 @@ public class SegmentPartitionMetadataManager implements SegmentZkMetadataFetchLi | |||
| private transient TablePartitionReplicatedServersInfo _tablePartitionReplicatedServersInfo; | |||
|
|
|||
| public SegmentPartitionMetadataManager(String tableNameWithType, String partitionColumn, String partitionFunctionName, | |||
There was a problem hiding this comment.
Directly pass in ColumnPartitionConfig
| this(functionName, numPartitions, partitions, functionConfig, null, null); | ||
| } | ||
|
|
||
| public ColumnPartitionMetadata(String functionName, int numPartitions, Set<Integer> partitions, |
There was a problem hiding this comment.
Don't add these new constructors. We should deprecate the existing constructor and always take PartitionFunction
|
|
||
| @Nullable | ||
| private static String normalizeOptionalText(@Nullable String value) { | ||
| return StringUtils.isBlank(value) || StringUtils.equalsIgnoreCase(value, "null") ? null : value; |
There was a problem hiding this comment.
(minor) StringUtils.equalsIgnoreCase() is deprecated
| public int hashCode() { | ||
| return 37 * 37 * _functionName.hashCode() + 37 * _numPartitions + _partitions.hashCode() | ||
| + Objects.hashCode(_functionConfig); | ||
| + Objects.hashCode(_functionConfig) + Objects.hashCode(_functionExpr) |
There was a problem hiding this comment.
Consider adding equals and hash to PartitionFunction, or simply remove them. I don't think we need to compare metadata
| * Compiles a restricted partition-function expression into a typed {@link PartitionPipeline} backed by deterministic | ||
| * scalar functions. | ||
| */ | ||
| public final class PartitionFunctionExprCompiler { |
There was a problem hiding this comment.
Can we reuse existing InbuiltFunctionEvaluator? We don't want to maintain multiple logic of function evaluation. With FunctionEvaluator, you can even allow groovy
What changed
This adds partition function expressions so Pinot can compute segment partitions and query-time pruning partitions from the raw column through a deterministic scalar-function pipeline instead of a single hard-coded partition function.
Key changes:
functionExprwhile preserving existingfunctionNamebehaviorpartitionIdNormalizerfor expression mode, defaulting toPOSITIVE_MODULOpinot-segment-spiWhy
Pinot's partition model assumed one partition function per raw column. That breaks down for common production layouts where the upstream partition key is derived by chaining transforms such as
md5(id) -> fnv1a_32(...) -> partitionorlower(key) -> murmur2(...) -> partition. Derived ingestion columns do not preserve correct pruning semantics when queries still filter on the raw column.This PR keeps partitioning attached to the raw column and evaluates the same deterministic pipeline against query literals so pruning remains correct.
Example table config
A table can now configure partitioning on the raw column with
functionExprinsidesegmentPartitionConfig:{ "indexingConfig": { "segmentPartitionConfig": { "columnPartitionMap": { "id": { "functionExpr": "fnv1a_32(md5(id))", "numPartitions": 128 } } } } }When
partitionIdNormalizeris omitted, Pinot usesPOSITIVE_MODULOby default.If an operator needs compatibility with an existing hash-to-partition mapping, expression mode can also configure the normalizer explicitly. For example,
ABSpreserves absolute-remainder semantics:{ "indexingConfig": { "segmentPartitionConfig": { "columnPartitionMap": { "id": { "functionExpr": "fnv1a_32(md5(id))", "numPartitions": 128, "partitionIdNormalizer": "ABS" } } } } }Or
MASKpreserves sign-bit masking semantics:{ "indexingConfig": { "segmentPartitionConfig": { "columnPartitionMap": { "id": { "functionExpr": "fnv1a_32(md5(id))", "numPartitions": 128, "partitionIdNormalizer": "MASK" } } } } }Another supported example is:
{ "indexingConfig": { "segmentPartitionConfig": { "columnPartitionMap": { "memberId": { "functionExpr": "murmur2(lower(memberId))", "numPartitions": 8 } } } } }Partition-id normalization
Expression mode now separates two concerns:
functionExprcomputes the deterministic scalar pipeline from the raw column to an integer hash candidatepartitionIdNormalizerconverts that integer into the final partition id in[0, numPartitions)Supported normalizers are:
POSITIVE_MODULO: default behavior for expression mode; uses modulo with negative remainders shifted back into[0, numPartitions)ABS: compatibility mode for layouts that expectabs(hash % numPartitions)semanticsMASK: compatibility mode for hash pipelines that expect sign-bit masking semanticsQuery behavior
Users still query the raw column. There is no query rewrite and no need to expose a derived ingestion column.
For example, with the config above users write:
or:
Pinot evaluates the same expression pipeline against the raw query literal(s) during pruning.
For the compatibility config:
{ "functionExpr": "fnv1a_32(md5(id))", "numPartitions": 128, "partitionIdNormalizer": "MASK" }The UUID
000016be-9d72-466c-9632-cfa680dc8fa3maps to partition104, so an equality predicate on that rawidprunes using partition104.Safety and correctness
This also fixes a few issues discovered during review:
Validation
Ran:
./mvnw spotless:apply -pl pinot-broker,pinot-common,pinot-controller,pinot-core,pinot-segment-local,pinot-segment-spi,pinot-spi./mvnw checkstyle:check -pl pinot-broker,pinot-common,pinot-controller,pinot-core,pinot-segment-local,pinot-segment-spi,pinot-spi./mvnw license:format -pl pinot-broker,pinot-common,pinot-controller,pinot-core,pinot-segment-local,pinot-segment-spi,pinot-spi./mvnw license:check -pl pinot-broker,pinot-common,pinot-controller,pinot-core,pinot-segment-local,pinot-segment-spi,pinot-spi./mvnw -pl pinot-broker -am -Dtest=PartitionFunctionExprSegmentPrunerTest,SegmentPartitionMetadataManagerTest -Dsurefire.failIfNoSpecifiedTests=false test./mvnw -pl pinot-core -am -Dtest=BaseTableDataManagerNeedRefreshTest -Dsurefire.failIfNoSpecifiedTests=false test./mvnw -pl pinot-segment-local -Dtest=TableConfigUtilsTest,RealtimeSegmentConverterTest test./mvnw -pl pinot-segment-spi -Dtest=PartitionFunctionExprCompilerTest test./mvnw -pl pinot-spi -Dtest=IndexingConfigTest testNotable regression coverage includes:
POSITIVE_MODULOABSnormalization for negative-hash absolute-remainder compatibilityMASKnormalization forfnv1a_32(md5(id))with 128 partitions and UUID000016be-9d72-466c-9632-cfa680dc8fa3mapping to partition 104