Add partition function expressions for chained partitioning #18165

Open
xiangfu0 wants to merge 1 commit into apache:master from xiangfu0:chain-partition-functions

xiangfu0 (Contributor) commented Apr 10, 2026

What changed

This adds partition function expressions so Pinot can compute segment partitions and query-time pruning partitions from the raw column through a deterministic scalar-function pipeline instead of a single hard-coded partition function.

Key changes:

  • add expression-mode partition config support via functionExpr while preserving existing functionName behavior
  • add optional partitionIdNormalizer for expression mode, defaulting to POSITIVE_MODULO
  • compile restricted partition expressions into a typed pipeline in pinot-segment-spi
  • allow deterministic scalar functions, including varargs with literals, while enforcing a single raw-column argument chain
  • persist expression-based partition metadata, including the partition-id normalizer, through offline and realtime paths
  • enable broker and server pruning for equality and IN predicates on the raw column using the same partition pipeline
  • add regression coverage for default normalization and explicit compatibility normalizers
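
To make the "single raw-column argument chain" restriction concrete, here is a minimal sketch of how an expression such as fnv1a_32(md5(id)) could be split into its raw column and an ordered function chain. The class name ChainedExprSketch and its parsing rules are hypothetical and are not taken from PartitionFunctionExprCompiler; in particular, this sketch accepts only single-argument calls and rejects the literal arguments that the PR itself allows.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper, not Pinot code: splits "f(g(col))" into a column and a function chain. */
final class ChainedExprSketch {
  private static final String IDENTIFIER = "[A-Za-z_][A-Za-z0-9_]*";

  final String column;
  final List<String> functions; // innermost first, e.g. [md5, fnv1a_32]

  private ChainedExprSketch(String column, List<String> functions) {
    this.column = column;
    this.functions = functions;
  }

  static ChainedExprSketch parse(String expr) {
    List<String> outermostFirst = new ArrayList<>();
    String rest = expr.trim();
    // Peel one "name( ... )" wrapper per iteration, from the outside in.
    while (rest.endsWith(")")) {
      int open = rest.indexOf('(');
      if (open <= 0) {
        throw new IllegalArgumentException("Malformed expression: " + expr);
      }
      String name = rest.substring(0, open).trim();
      if (!name.matches(IDENTIFIER)) {
        throw new IllegalArgumentException("Invalid function name in: " + expr);
      }
      outermostFirst.add(name);
      rest = rest.substring(open + 1, rest.length() - 1).trim();
    }
    // What remains must be exactly one raw column, wrapped by at least one function.
    if (outermostFirst.isEmpty() || !rest.matches(IDENTIFIER)) {
      throw new IllegalArgumentException("Expected a function chain over one column: " + expr);
    }
    Collections.reverse(outermostFirst);
    return new ChainedExprSketch(rest, outermostFirst);
  }
}
```

Evaluation order is innermost first, so the parsed chain for fnv1a_32(md5(id)) lists md5 before fnv1a_32.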

Why

Pinot's partition model assumed one partition function per raw column. That breaks down for common production layouts where the upstream partition key is derived by chaining transforms such as md5(id) -> fnv1a_32(...) -> partition or lower(key) -> murmur2(...) -> partition. Derived ingestion columns do not preserve correct pruning semantics when queries still filter on the raw column.

This PR keeps partitioning attached to the raw column and evaluates the same deterministic pipeline against query literals so pruning remains correct.
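
As an illustration of the chained-transform idea, the sketch below composes an MD5 step and a 32-bit FNV-1a step and reduces the result to a partition id. It is written against the JDK only. The assumptions are that md5 yields a lower-case hex string and that fnv1a_32 hashes the UTF-8 bytes of that string; Pinot's scalar functions may encode intermediate values differently, so the partition ids produced here should not be expected to match Pinot's.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.function.Function;

/** Hypothetical sketch of a two-step deterministic partition pipeline. */
final class ChainedPartitionSketch {
  /** Lower-case hex MD5 of the UTF-8 bytes of the input. */
  static String md5Hex(String input) {
    try {
      byte[] digest = MessageDigest.getInstance("MD5").digest(input.getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder(digest.length * 2);
      for (byte b : digest) {
        hex.append(String.format("%02x", b & 0xff));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is unavailable", e);
    }
  }

  /** 32-bit FNV-1a over the UTF-8 bytes of the input. */
  static int fnv1a32(String input) {
    int hash = 0x811c9dc5; // FNV offset basis
    for (byte b : input.getBytes(StandardCharsets.UTF_8)) {
      hash ^= (b & 0xff);
      hash *= 0x01000193; // FNV prime; int overflow wraps as intended
    }
    return hash;
  }

  /** Runs the chain on a raw value and maps the hash into [0, numPartitions). */
  static int partitionOf(String rawValue, int numPartitions) {
    Function<String, String> md5 = ChainedPartitionSketch::md5Hex;
    Function<String, Integer> pipeline = md5.andThen(ChainedPartitionSketch::fnv1a32);
    int hash = pipeline.apply(rawValue);
    return Math.floorMod(hash, numPartitions);
  }
}
```

Because every step is a pure function of its input, the same code path can be applied to an ingested row value and to a query literal, which is the property that pruning on the raw column relies on.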

Example table config

A table can now configure partitioning on the raw column with functionExpr inside segmentPartitionConfig:

{
  "indexingConfig": {
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "id": {
          "functionExpr": "fnv1a_32(md5(id))",
          "numPartitions": 128
        }
      }
    }
  }
}

When partitionIdNormalizer is omitted, Pinot uses POSITIVE_MODULO by default.

If an operator needs compatibility with an existing hash-to-partition mapping, expression mode can also configure the normalizer explicitly. For example, ABS preserves absolute-remainder semantics:

{
  "indexingConfig": {
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "id": {
          "functionExpr": "fnv1a_32(md5(id))",
          "numPartitions": 128,
          "partitionIdNormalizer": "ABS"
        }
      }
    }
  }
}

Alternatively, MASK preserves sign-bit masking semantics:

{
  "indexingConfig": {
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "id": {
          "functionExpr": "fnv1a_32(md5(id))",
          "numPartitions": 128,
          "partitionIdNormalizer": "MASK"
        }
      }
    }
  }
}

Another supported example is:

{
  "indexingConfig": {
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "memberId": {
          "functionExpr": "murmur2(lower(memberId))",
          "numPartitions": 8
        }
      }
    }
  }
}

Partition-id normalization

Expression mode now separates two concerns:

  • functionExpr computes the deterministic scalar pipeline from the raw column to an integer hash candidate
  • partitionIdNormalizer converts that integer into the final partition id in [0, numPartitions)

Supported normalizers are:

  • POSITIVE_MODULO: default behavior for expression mode; uses modulo with negative remainders shifted back into [0, numPartitions)
  • ABS: compatibility mode for layouts that expect abs(hash % numPartitions) semantics
  • MASK: compatibility mode for hash pipelines that expect sign-bit masking semantics
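
The three normalizers can be sketched as follows. This is a hedged illustration rather than the PR's PartitionIntNormalizer: the enum name NormalizerSketch and the method toPartitionId are hypothetical, and the MASK variant assumes that "sign-bit masking" means clearing the sign bit with Integer.MAX_VALUE before taking the remainder.

```java
/** Hypothetical sketch of the three normalization strategies described above. */
enum NormalizerSketch {
  POSITIVE_MODULO {
    @Override
    int toPartitionId(int hash, int numPartitions) {
      int remainder = hash % numPartitions;
      // Shift negative remainders back into [0, numPartitions).
      return remainder < 0 ? remainder + numPartitions : remainder;
    }
  },
  ABS {
    @Override
    int toPartitionId(int hash, int numPartitions) {
      // Absolute value of the remainder: abs(hash % numPartitions).
      return Math.abs(hash % numPartitions);
    }
  },
  MASK {
    @Override
    int toPartitionId(int hash, int numPartitions) {
      // Assumption: clear the sign bit first, then take the remainder.
      return (hash & Integer.MAX_VALUE) % numPartitions;
    }
  };

  abstract int toPartitionId(int hash, int numPartitions);
}
```

Under these definitions a hash of -8 with 3 partitions maps to 1 (POSITIVE_MODULO), 2 (ABS), and 0 (MASK), which is why the normalizer has to be stored alongside the expression: the same hash can land in a different partition under each mode.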

Query behavior

Users still query the raw column. There is no query rewrite and no need to expose a derived ingestion column.

For example, with the config above users write:

SELECT *
FROM myTable
WHERE id = '000016be-9d72-466c-9632-cfa680dc8fa3'

or:

SELECT *
FROM myTable
WHERE id IN (
  '000016be-9d72-466c-9632-cfa680dc8fa3',
  '00000000-0000-0000-0000-000000000001'
)

Pinot evaluates the same expression pipeline against the raw query literal(s) during pruning.
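
The pruning decision itself can be sketched in a few lines. The class PartitionPruneSketch and its canPrune method are hypothetical names, and the partition function is passed in as a plain ToIntFunction rather than Pinot's PartitionFunction interface; the sketch only shows the set-membership check for EQ (one literal) and IN (several literals).

```java
import java.util.List;
import java.util.Set;
import java.util.function.ToIntFunction;

/** Hypothetical sketch of partition-based pruning for EQ and IN predicates. */
final class PartitionPruneSketch {
  /**
   * Returns true when no literal maps to a partition held by the segment,
   * meaning the segment cannot match the predicate and may be skipped.
   */
  static boolean canPrune(Set<Integer> segmentPartitions, List<String> literals,
      ToIntFunction<String> partitionFunction) {
    for (String literal : literals) {
      if (segmentPartitions.contains(partitionFunction.applyAsInt(literal))) {
        return false; // at least one literal may live in this segment
      }
    }
    return true;
  }
}
```

An equality predicate is the one-element case of the same loop, so a single code path covers both predicate types.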

For the compatibility config:

{
  "functionExpr": "fnv1a_32(md5(id))",
  "numPartitions": 128,
  "partitionIdNormalizer": "MASK"
}

The UUID 000016be-9d72-466c-9632-cfa680dc8fa3 maps to partition 104, so an equality predicate on that raw id prunes using partition 104.

Safety and correctness

This also fixes a few issues discovered during review:

  • non-static scalar functions now use thread-local target instances in the partition-expression compiler
  • broker and server pruning fail open if partition-function evaluation throws for a query literal
  • broker partition metadata refresh treats invalid expression metadata as unprunable instead of failing the refresh path
  • partition metadata persists the configured partition-id normalizer so broker/server refresh logic can detect semantic changes correctly
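
The fail-open rule from the list above can be sketched as follows. FailOpenSketch is a hypothetical name and the partition function is again a plain ToIntFunction; the point is only that an evaluation error keeps the segment in the candidate set instead of pruning it or failing the query.

```java
import java.util.Set;
import java.util.function.ToIntFunction;

/** Hypothetical sketch of fail-open pruning for a single equality literal. */
final class FailOpenSketch {
  static boolean canPrune(Set<Integer> segmentPartitions, String literal,
      ToIntFunction<String> partitionFunction) {
    int partitionId;
    try {
      partitionId = partitionFunction.applyAsInt(literal);
    } catch (RuntimeException e) {
      // Fail open: if the literal cannot be evaluated, keep the segment.
      return false;
    }
    return !segmentPartitions.contains(partitionId);
  }
}
```

Keeping the segment costs extra work on that query but never drops rows, whereas pruning on a failed evaluation could silently return incomplete results.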

Validation

Ran:

  • ./mvnw spotless:apply -pl pinot-broker,pinot-common,pinot-controller,pinot-core,pinot-segment-local,pinot-segment-spi,pinot-spi
  • ./mvnw checkstyle:check -pl pinot-broker,pinot-common,pinot-controller,pinot-core,pinot-segment-local,pinot-segment-spi,pinot-spi
  • ./mvnw license:format -pl pinot-broker,pinot-common,pinot-controller,pinot-core,pinot-segment-local,pinot-segment-spi,pinot-spi
  • ./mvnw license:check -pl pinot-broker,pinot-common,pinot-controller,pinot-core,pinot-segment-local,pinot-segment-spi,pinot-spi
  • ./mvnw -pl pinot-broker -am -Dtest=PartitionFunctionExprSegmentPrunerTest,SegmentPartitionMetadataManagerTest -Dsurefire.failIfNoSpecifiedTests=false test
  • ./mvnw -pl pinot-core -am -Dtest=BaseTableDataManagerNeedRefreshTest -Dsurefire.failIfNoSpecifiedTests=false test
  • ./mvnw -pl pinot-segment-local -Dtest=TableConfigUtilsTest,RealtimeSegmentConverterTest test
  • ./mvnw -pl pinot-segment-spi -Dtest=PartitionFunctionExprCompilerTest test
  • ./mvnw -pl pinot-spi -Dtest=IndexingConfigTest test

Notable regression coverage includes:

  • default expression-mode normalization using POSITIVE_MODULO
  • explicit ABS normalization for negative-hash absolute-remainder compatibility
  • explicit MASK normalization for fnv1a_32(md5(id)) with 128 partitions and UUID 000016be-9d72-466c-9632-cfa680dc8fa3 mapping to partition 104
  • broker pruning for the same UUID equality predicate
  • fail-open behavior for invalid literals and invalid partition-expression metadata

Copilot AI (Contributor) left a comment

Pull request overview

Adds expression-mode partitioning (functionExpr) so Pinot can derive partitions and perform query-time pruning by evaluating a deterministic scalar-function pipeline against raw column values (supporting chained transforms like fnv1a_32(md5(id))), while preserving legacy functionName behavior.

Changes:

  • Introduces functionExpr in table/segment partition configs and persists it through offline + realtime metadata paths.
  • Adds a partition-expression compiler that builds a typed PartitionPipeline and a PartitionPipelineFunction adapter.
  • Extends broker/server pruning to evaluate the same partition pipeline for EQ/IN predicates on the raw column (with fail-open behavior on evaluation errors) and adds/updates tests.

Reviewed changes

Copilot reviewed 46 out of 46 changed files in this pull request and generated 2 comments.

Summary per file:

File | Description
pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchBackfillIngestionJobCommand.java | Uses functionExpr when computing partitions for backfill segment selection.
pinot-tools/src/test/java/org/apache/pinot/tools/admin/command/LaunchBackfillIngestionJobCommandTest.java | Adds coverage for segment partition matching with functionExpr.
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ColumnPartitionConfig.java | Adds functionExpr mode with validation to enforce exclusive config mode.
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentPartitionConfig.java | Exposes getFunctionExpr() for partition configs.
pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java | Adds JSON round-trip and invalid-mode tests for functionExpr.
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java | Adds segment metadata key for partitionFunctionExpr.
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java | Extends partition function interface to optionally expose getFunctionExpr().
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java | Adds overloads to create partition functions from configs/metadata, including expression mode.
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/metadata/ColumnPartitionMetadata.java | Persists/loads functionExpr in partition metadata JSON.
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java | Reads partitionFunctionExpr from segment properties and constructs the correct partition function.
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionFunctionExprCompiler.java | Implements compilation of restricted scalar-function expressions into a typed pipeline.
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionPipelineFunction.java | Adapts compiled pipelines to the PartitionFunction interface.
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionPipeline.java | Defines the immutable compiled pipeline and evaluation logic.
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionStep.java | Defines a typed pipeline step contract and runtime validation.
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionValue.java | Adds typed runtime value wrapper for pipeline evaluation.
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionValueType.java | Defines supported type system for the partition pipeline compiler.
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionIntNormalizer.java | Adds normalization strategies to match legacy partition-id semantics.
pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionFunctionExprCompilerTest.java | Adds compiler coverage (canonicalization, literals, determinism, thread-safety, legacy normalization).
pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/function/scalar/PartitionFunctionExprTestFunctions.java | Provides test-only scalar functions to exercise the compiler in pinot-segment-spi.
pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/function/scalar/PartitionFunctionExprRacyTestFunctions.java | Provides a non-static racy scalar function to validate thread-local targets.
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java | Validates partition configs by constructing the partition function (including expression mode).
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java | Adds validation tests for functionExpr, including literal args and invalid output type.
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/StatsCollectorConfig.java | Exposes getPartitionFunctionExpr() to segment creation stats collection.
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java | Creates partition functions using either functionName or functionExpr.
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java | Persists partitionFunctionExpr into segment metadata properties when present.
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java | Builds realtime partition function from ColumnPartitionConfig (supports expr mode).
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java | Preserves functionExpr when reporting segment partition config.
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java | Adds coverage ensuring expression partition metadata is preserved across realtime conversion.
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/TableConfigPartitioner.java | Uses factory method that supports expression-mode partition functions.
pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java | Uses fail-open partition evaluation for EQ/IN pruning and supports expr-based partition functions.
pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java | Adds pruning coverage for functionExpr and fail-open behavior when evaluation throws.
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java | Builds realtime partition function from config with expression support.
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java | Detects staleness when partition function expression changes (in addition to name/partition count).
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java | Persists expression partition metadata into ZK using the effective partition function.
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadataUtils.java | Writes partition metadata including functionExpr into ZK metadata.
pinot-common/src/main/java/org/apache/pinot/common/function/scalar/InternalFunctions.java | Marks request-context dependent scalar functions as non-deterministic.
pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DateTimeFunctions.java | Marks time-dependent scalar functions as non-deterministic.
pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java | Adds hash aliases (murmur2, murmur3_32, fnv1*_xx, md5_raw) for partition-expression compatibility.
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionUtils.java | Treats invalid expression metadata as unprunable by catching construction exceptions.
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java | Tracks partition function expression for routing metadata validation.
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java | Initializes partition metadata manager with effective function name + expression.
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java | Adds fail-open behavior for partition evaluation errors.
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java | Adds fail-open behavior for partition evaluation errors.
pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/PartitionFunctionExprSegmentPrunerTest.java | Adds broker pruning tests for expression-mode partitions and fail-open on bad literals/metadata.
pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java | Extends existing routing/pruning tests to cover functionExpr.
pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java | Adds metadata-manager coverage for expression-mode partition metadata handling.

xiangfu0 force-pushed the chain-partition-functions branch 2 times, most recently from 645dc07 to f230f4d, on April 11, 2026 00:25
xiangfu0 marked this pull request as ready for review on April 11, 2026 00:28
xiangfu0 requested a review from Copilot on April 11, 2026 00:29
xiangfu0 added the "feature" (New functionality) label on Apr 11, 2026

Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 55 out of 55 changed files in this pull request and generated 2 comments.

xiangfu0 force-pushed the chain-partition-functions branch 2 times, most recently from 9b026ad to ffdb486, on April 11, 2026 02:29

codecov-commenter commented Apr 11, 2026

Codecov Report

❌ Patch coverage is 60.53443% with 384 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.14%. Comparing base (34d3fd6) to head (4c371b0).

Files with missing lines | Patch % | Lines
...tition/pipeline/PartitionFunctionExprCompiler.java | 59.36% | 186 Missing and 68 partials ⚠️
...segment/spi/partition/pipeline/PartitionValue.java | 45.09% | 23 Missing and 5 partials ⚠️
...outing/segmentpartition/SegmentPartitionUtils.java | 44.44% | 10 Missing ⚠️
...egment/spi/partition/PartitionFunctionFactory.java | 16.66% | 9 Missing and 1 partial ⚠️
...spi/partition/pipeline/PartitionIntNormalizer.java | 54.54% | 5 Missing and 5 partials ⚠️
...mentpruner/MultiPartitionColumnsSegmentPruner.java | 0.00% | 9 Missing ⚠️
...mentpartition/SegmentPartitionMetadataManager.java | 0.00% | 8 Missing ⚠️
.../pinot/core/data/manager/BaseTableDataManager.java | 36.36% | 4 Missing and 3 partials ⚠️
...pi/partition/metadata/ColumnPartitionMetadata.java | 73.91% | 2 Missing and 4 partials ⚠️
...oker/routing/manager/BaseBrokerRoutingManager.java | 0.00% | 5 Missing ⚠️
... and 14 more
Additional details and impacted files
@@            Coverage Diff             @@
##             master   #18165    +/-   ##
==========================================
  Coverage     63.13%   63.14%            
- Complexity     1610     1736   +126     
==========================================
  Files          3213     3221     +8     
  Lines        195730   196629   +899     
  Branches      30240    30423   +183     
==========================================
+ Hits         123583   124169   +586     
- Misses        62281    62475   +194     
- Partials       9866     9985   +119     
Flag | Coverage Δ
integration | ?
integration2 | ?
java-11 | 63.12% <60.53%> (+0.01%) ⬆️
java-21 | 63.10% <60.53%> (+<0.01%) ⬆️
temurin | 63.14% <60.53%> (+<0.01%) ⬆️
unittests | 63.14% <60.53%> (+<0.01%) ⬆️
unittests1 | 55.40% <60.67%> (+0.02%) ⬆️
unittests2 | 34.65% <7.19%> (-0.12%) ⬇️

xiangfu0 force-pushed the chain-partition-functions branch from ffdb486 to 2bc240a on April 11, 2026 06:35
xiangfu0 requested a review from Jackie-Jiang on April 11, 2026 06:38
xiangfu0 force-pushed the chain-partition-functions branch 2 times, most recently from f53f97f to d5aef71, on April 11, 2026 10:38
xiangfu0 force-pushed the chain-partition-functions branch from d5aef71 to 4c371b0 on April 11, 2026 12:35

return (int) (partition < 0 ? partition + numPartitions : partition);
}
},
ABS {

Contributor commented:

There are two ways to apply abs:

  • The one shown here, abs after modulo (used in ModuloPartitionFunction)
  • Take abs of the value first, then apply modulo (used in HashCodePartitionFunction)
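
A small sketch of why the ordering matters in Java. The class and method names are hypothetical and this is not code from either partition function; it only relies on the documented behavior that Math.abs(Integer.MIN_VALUE) returns Integer.MIN_VALUE. The two orderings agree for every int except Integer.MIN_VALUE, where taking abs first can leave a negative remainder.

```java
/** Hypothetical sketch comparing the two abs orderings. */
final class AbsOrderSketch {
  /** Modulo first, then abs: abs(value % numPartitions). */
  static int absAfterModulo(int value, int numPartitions) {
    return Math.abs(value % numPartitions);
  }

  /** Abs first, then modulo: abs(value) % numPartitions, with no MIN_VALUE guard. */
  static int absBeforeModulo(int value, int numPartitions) {
    return Math.abs(value) % numPartitions;
  }
}
```

With 3 partitions, a value of -8 gives 2 under both orderings, while Integer.MIN_VALUE gives 2 with abs after modulo and -2 with the unguarded abs before modulo.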

}

public static PartitionIntNormalizer fromConfigString(@Nullable String partitionIdNormalizer) {
Preconditions.checkArgument(partitionIdNormalizer != null && !partitionIdNormalizer.trim().isEmpty(),

Contributor commented:

This precondition check means partitionIdNormalizer cannot be null, even though the parameter is annotated @Nullable.
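
One possible way to reconcile the annotation with the check, sketched under the assumption that a missing value should fall back to the POSITIVE_MODULO default stated in the PR description. The enum name NormalizerParseSketch is hypothetical and this is not the PR's implementation; dropping @Nullable and keeping the precondition would be an equally valid resolution.

```java
import java.util.Locale;

/** Hypothetical sketch of a null-tolerant parser for the normalizer config value. */
enum NormalizerParseSketch {
  POSITIVE_MODULO, ABS, MASK;

  /** Returns the default for null or blank input; throws IllegalArgumentException for unknown names. */
  static NormalizerParseSketch fromConfigString(String configValue) {
    if (configValue == null || configValue.trim().isEmpty()) {
      return POSITIVE_MODULO; // assumed default when the setting is omitted
    }
    return valueOf(configValue.trim().toUpperCase(Locale.ROOT));
  }
}
```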

/**
* Normalizes the final INT output from a compiled partition pipeline into a partition id.
*/
public enum PartitionIntNormalizer {

Contributor commented:

For each mode, we need to differentiate pre-modulo vs post-modulo.

For pre-modulo, there are 2 common ways to make the value positive:

  • abs() (need to handle MIN_VALUE)
  • mask

For post-modulo, there are 2 common ways to make the partition id positive:

  • abs()
  • + numPartitions

For the existing partition functions, do you think we can assign a default normalizer (based on the current implementation)? That way users can pick a different normalizer when necessary for a common partition function.

import static org.testng.Assert.assertTrue;


public class LaunchBackfillIngestionJobCommandTest {

Contributor commented:

I don't think we need this test. Partition-function tests are already covered on their own.

Comment on lines +36 to +37
private ScalarFunctionUtils() {
}

Contributor commented:

(nit) For util classes, we usually put this right after the class declaration for readability.

@@ -72,10 +75,12 @@ public class SegmentPartitionMetadataManager implements SegmentZkMetadataFetchLi
private transient TablePartitionReplicatedServersInfo _tablePartitionReplicatedServersInfo;

public SegmentPartitionMetadataManager(String tableNameWithType, String partitionColumn, String partitionFunctionName,

Contributor commented:

Directly pass in ColumnPartitionConfig

this(functionName, numPartitions, partitions, functionConfig, null, null);
}

public ColumnPartitionMetadata(String functionName, int numPartitions, Set<Integer> partitions,

Contributor commented:

Don't add these new constructors. We should deprecate the existing constructor and always take PartitionFunction


@Nullable
private static String normalizeOptionalText(@Nullable String value) {
return StringUtils.isBlank(value) || StringUtils.equalsIgnoreCase(value, "null") ? null : value;

Contributor commented:

(minor) StringUtils.equalsIgnoreCase() is deprecated
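
A JDK-only variant of the helper, sketched as one way to avoid the deprecated call. OptionalTextSketch is a hypothetical name, and trim().isEmpty() is used as an approximation of StringUtils.isBlank, which treats a slightly different set of characters as whitespace.

```java
/** Hypothetical sketch: normalizes blank or literal "null" text to a null reference. */
final class OptionalTextSketch {
  static String normalizeOptionalText(String value) {
    // String.equalsIgnoreCase is part of the JDK and handles a null argument safely.
    if (value == null || value.trim().isEmpty() || "null".equalsIgnoreCase(value)) {
      return null;
    }
    return value;
  }
}
```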

public int hashCode() {
  return 37 * 37 * _functionName.hashCode() + 37 * _numPartitions + _partitions.hashCode()
-     + Objects.hashCode(_functionConfig);
+     + Objects.hashCode(_functionConfig) + Objects.hashCode(_functionExpr)

Contributor commented:

Consider adding equals() and hashCode() to PartitionFunction, or simply remove them here. I don't think we need to compare metadata.

* Compiles a restricted partition-function expression into a typed {@link PartitionPipeline} backed by deterministic
* scalar functions.
*/
public final class PartitionFunctionExprCompiler {

Contributor commented:

Can we reuse the existing InbuiltFunctionEvaluator? We don't want to maintain multiple implementations of function evaluation. With FunctionEvaluator, you could even allow Groovy.


4 participants