Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ private void buildRoutingInternal(String tableNameWithType) {
tableNameWithType, partitionConfig.getKey());
partitionMetadataManager =
new SegmentPartitionMetadataManager(tableNameWithType, partitionConfig.getKey(),
partitionConfig.getValue().getFunctionName(), partitionConfig.getValue().getNumPartitions());
partitionConfig.getValue());
} else {
LOGGER.warn(
"Cannot enable SegmentPartitionMetadataManager for table: {} with multiple partition columns: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -39,6 +40,8 @@
import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo;
import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo.PartitionInfo;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -61,7 +64,7 @@ public class SegmentPartitionMetadataManager implements SegmentZkMetadataFetchLi

// static content, if anything changes for the following. a rebuild of routing table is needed.
private final String _partitionColumn;
private final String _partitionFunctionName;
private final PartitionFunction _partitionFunction;
private final int _numPartitions;

// cache-able content, only follow changes if onlineSegments list (of ideal-state) is changed.
Expand All @@ -71,12 +74,12 @@ public class SegmentPartitionMetadataManager implements SegmentZkMetadataFetchLi
private transient TablePartitionInfo _tablePartitionInfo;
private transient TablePartitionReplicatedServersInfo _tablePartitionReplicatedServersInfo;

public SegmentPartitionMetadataManager(String tableNameWithType, String partitionColumn, String partitionFunctionName,
int numPartitions) {
/**
 * Creates the metadata manager for a table with a single partition column.
 *
 * @param tableNameWithType table name with type suffix
 * @param partitionColumn the single partition column configured for the table
 * @param columnPartitionConfig partition config for the column, used to construct the table-level
 *     partition function (NOTE(review): factory construction can presumably throw on bad config —
 *     callers should guard; confirm against PartitionFunctionFactory)
 */
public SegmentPartitionMetadataManager(String tableNameWithType, String partitionColumn,
ColumnPartitionConfig columnPartitionConfig) {
_tableNameWithType = tableNameWithType;
_partitionColumn = partitionColumn;
// Build the partition function once from the config; partition count is derived from it rather
// than passed separately, so the two can never disagree.
_partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionColumn, columnPartitionConfig);
_numPartitions = _partitionFunction.getNumPartitions();
}

@Override
Expand All @@ -103,7 +106,7 @@ private int getPartitionId(String segment, @Nullable ZNRecord znRecord) {
return INVALID_PARTITION_ID;
}
PartitionFunction partitionFunction = segmentPartitionInfo.getPartitionFunction();
if (!_partitionFunctionName.equalsIgnoreCase(partitionFunction.getName())) {
if (!isMatchingPartitionFunction(partitionFunction)) {
return INVALID_PARTITION_ID;
}
if (_numPartitions != partitionFunction.getNumPartitions()) {
Expand All @@ -116,6 +119,35 @@ private int getPartitionId(String segment, @Nullable ZNRecord znRecord) {
return partitions.iterator().next();
}

/**
 * Returns a human-readable description of the table's partition function: the function expression
 * when one is configured, otherwise the plain function name.
 */
private String getPartitionFunctionDescription() {
  String expr = _partitionFunction.getFunctionExpr();
  if (expr != null) {
    return expr;
  }
  return _partitionFunction.getName();
}

/**
 * Checks whether the given segment-level partition function is equivalent to the table-level one.
 * Function name and partition-id normalizer are compared case-insensitively; function configs that
 * are both null/empty are treated as equal; function expressions must match exactly.
 */
private boolean isMatchingPartitionFunction(PartitionFunction partitionFunction) {
  if (!_partitionFunction.getName().equalsIgnoreCase(partitionFunction.getName())) {
    return false;
  }
  if (!functionConfigsMatch(_partitionFunction.getFunctionConfig(), partitionFunction.getFunctionConfig())) {
    return false;
  }
  if (!Objects.equals(_partitionFunction.getFunctionExpr(), partitionFunction.getFunctionExpr())) {
    return false;
  }
  return equalsIgnoreCaseNullable(_partitionFunction.getPartitionIdNormalizer(),
      partitionFunction.getPartitionIdNormalizer());
}

/**
 * Compares two partition function config maps, treating {@code null} and an empty map as
 * equivalent (both mean "no extra config"); otherwise falls back to map equality.
 */
private static boolean functionConfigsMatch(@Nullable Map<String, String> a, @Nullable Map<String, String> b) {
  if ((a == null || a.isEmpty()) && (b == null || b.isEmpty())) {
    return true;
  }
  return Objects.equals(a, b);
}

/**
 * Null-safe case-insensitive string comparison: two nulls are equal, null never matches non-null.
 */
private static boolean equalsIgnoreCaseNullable(@Nullable String a, @Nullable String b) {
  return a == null ? b == null : a.equalsIgnoreCase(b);
}

private static long getCreationTimeMs(@Nullable ZNRecord znRecord) {
if (znRecord == null) {
return INVALID_CREATION_TIME_MS;
Expand Down Expand Up @@ -306,7 +338,7 @@ private void computeTablePartitionReplicatedServersInfo() {
}
}
_tablePartitionReplicatedServersInfo =
new TablePartitionReplicatedServersInfo(_tableNameWithType, _partitionColumn, _partitionFunctionName,
new TablePartitionReplicatedServersInfo(_tableNameWithType, _partitionColumn, getPartitionFunctionDescription(),
_numPartitions, partitionInfoMap, segmentsWithInvalidPartition);
}

Expand Down Expand Up @@ -337,7 +369,7 @@ private void computeTablePartitionInfo() {
_tableNameWithType);
}
_tablePartitionInfo =
new TablePartitionInfo(_tableNameWithType, _partitionColumn, _partitionFunctionName, _numPartitions,
new TablePartitionInfo(_tableNameWithType, _partitionColumn, getPartitionFunctionDescription(), _numPartitions,
segmentsByPartition, segmentsWithInvalidPartition);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import javax.annotation.Nullable;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.apache.pinot.spi.utils.CommonConstants;
Expand Down Expand Up @@ -79,9 +80,15 @@ public static SegmentPartitionInfo extractPartitionInfo(String tableNameWithType
return INVALID_PARTITION_INFO;
}

return new SegmentPartitionInfo(partitionColumn,
PartitionFunctionFactory.getPartitionFunction(columnPartitionMetadata),
columnPartitionMetadata.getPartitions());
try {
PartitionFunction partitionFunction =
PartitionFunctionFactory.getPartitionFunction(partitionColumn, columnPartitionMetadata);
return new SegmentPartitionInfo(partitionColumn, partitionFunction, columnPartitionMetadata.getPartitions());
} catch (Exception e) {
LOGGER.warn("Caught exception while constructing partition function for column: {}, segment: {}, table: {}",
partitionColumn, segment, tableNameWithType, e);
return INVALID_PARTITION_INFO;
}
}

/**
Expand Down Expand Up @@ -123,10 +130,16 @@ public static Map<String, SegmentPartitionInfo> extractPartitionInfoMap(String t
segment, tableNameWithType);
continue;
}
SegmentPartitionInfo segmentPartitionInfo = new SegmentPartitionInfo(partitionColumn,
PartitionFunctionFactory.getPartitionFunction(columnPartitionMetadata),
columnPartitionMetadata.getPartitions());
columnSegmentPartitionInfoMap.put(partitionColumn, segmentPartitionInfo);
try {
PartitionFunction partitionFunction =
PartitionFunctionFactory.getPartitionFunction(partitionColumn, columnPartitionMetadata);
SegmentPartitionInfo segmentPartitionInfo =
new SegmentPartitionInfo(partitionColumn, partitionFunction, columnPartitionMetadata.getPartitions());
columnSegmentPartitionInfoMap.put(partitionColumn, segmentPartitionInfo);
} catch (Exception e) {
LOGGER.warn("Caught exception while constructing partition function for column: {}, segment: {}, table: {}",
partitionColumn, segment, tableNameWithType, e);
}
}
if (columnSegmentPartitionInfoMap.size() == 1) {
String partitionColumn = columnSegmentPartitionInfoMap.keySet().iterator().next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,17 @@
import org.apache.pinot.common.request.Identifier;
import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.sql.FilterKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* The {@code MultiPartitionColumnsSegmentPruner} prunes segments based on their partition metadata stored in ZK. The
* pruner supports queries with filter (or nested filter) of EQUALITY and IN predicates.
*/
public class MultiPartitionColumnsSegmentPruner implements SegmentPruner {
private static final Logger LOGGER = LoggerFactory.getLogger(MultiPartitionColumnsSegmentPruner.class);

private final String _tableNameWithType;
private final Set<String> _partitionColumns;
private final Map<String, Map<String, SegmentPartitionInfo>> _segmentColumnPartitionInfoMap =
Expand Down Expand Up @@ -140,8 +144,8 @@ private boolean isPartitionMatch(Expression filterExpression,
Identifier identifier = operands.get(0).getIdentifier();
if (identifier != null) {
SegmentPartitionInfo partitionInfo = columnPartitionInfoMap.get(identifier.getName());
return partitionInfo == null || partitionInfo.getPartitions().contains(
partitionInfo.getPartitionFunction().getPartition(RequestContextUtils.getStringValue(operands.get(1))));
return partitionInfo == null || isPartitionMatch(partitionInfo, RequestContextUtils.getStringValue(
operands.get(1)));
} else {
return true;
}
Expand All @@ -155,8 +159,7 @@ private boolean isPartitionMatch(Expression filterExpression,
}
int numOperands = operands.size();
for (int i = 1; i < numOperands; i++) {
if (partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
.getPartition(RequestContextUtils.getStringValue(operands.get(i))))) {
if (isPartitionMatch(partitionInfo, RequestContextUtils.getStringValue(operands.get(i)))) {
return true;
}
}
Expand All @@ -169,4 +172,14 @@ private boolean isPartitionMatch(Expression filterExpression,
return true;
}
}

/**
 * Returns true when the segment may contain the given value, i.e. the value hashes into one of the
 * segment's partitions. If the partition function cannot evaluate the value, logs a warning and
 * returns true so the segment is NOT pruned (safe fallback).
 */
private boolean isPartitionMatch(SegmentPartitionInfo partitionInfo, String value) {
  try {
    int partitionId = partitionInfo.getPartitionFunction().getPartition(value);
    return partitionInfo.getPartitions().contains(partitionId);
  } catch (RuntimeException e) {
    LOGGER.warn("Failed to evaluate partition function for table: {}, partition column: {}; skipping partition "
        + "pruning", _tableNameWithType, partitionInfo.getPartitionColumn(), e);
    return true;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@
import org.apache.pinot.common.request.Identifier;
import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.sql.FilterKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* The {@code SinglePartitionColumnSegmentPruner} prunes segments based on their partition metadata stored in ZK. The
* pruner supports queries with filter (or nested filter) of EQUALITY and IN predicates.
*/
public class SinglePartitionColumnSegmentPruner implements SegmentPruner {
private static final Logger LOGGER = LoggerFactory.getLogger(SinglePartitionColumnSegmentPruner.class);

private final String _tableNameWithType;
private final String _partitionColumn;
private final Map<String, SegmentPartitionInfo> _partitionInfoMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -129,8 +133,7 @@ private boolean isPartitionMatch(Expression filterExpression, SegmentPartitionIn
case EQUALS: {
Identifier identifier = operands.get(0).getIdentifier();
if (identifier != null && identifier.getName().equals(_partitionColumn)) {
return partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
.getPartition(RequestContextUtils.getStringValue(operands.get(1))));
return isPartitionMatch(partitionInfo, RequestContextUtils.getStringValue(operands.get(1)));
} else {
return true;
}
Expand All @@ -140,8 +143,7 @@ private boolean isPartitionMatch(Expression filterExpression, SegmentPartitionIn
if (identifier != null && identifier.getName().equals(_partitionColumn)) {
int numOperands = operands.size();
for (int i = 1; i < numOperands; i++) {
if (partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
.getPartition(RequestContextUtils.getStringValue(operands.get(i))))) {
if (isPartitionMatch(partitionInfo, RequestContextUtils.getStringValue(operands.get(i)))) {
return true;
}
}
Expand All @@ -154,4 +156,14 @@ private boolean isPartitionMatch(Expression filterExpression, SegmentPartitionIn
return true;
}
}

/**
 * Returns true when the segment may contain the given value, i.e. the value hashes into one of the
 * segment's partitions. If the partition function cannot evaluate the value, logs a warning and
 * returns true so the segment is NOT pruned (safe fallback).
 */
private boolean isPartitionMatch(SegmentPartitionInfo partitionInfo, String value) {
  try {
    int partitionId = partitionInfo.getPartitionFunction().getPartition(value);
    return partitionInfo.getPartitions().contains(partitionId);
  } catch (RuntimeException e) {
    LOGGER.warn("Failed to evaluate partition function for table: {}, partition column: {}; skipping partition "
        + "pruning", _tableNameWithType, _partitionColumn, e);
    return true;
  }
}
}
Loading
Loading