Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ public static SegmentPartitionInfo extractPartitionInfo(String tableNameWithType
}

return new SegmentPartitionInfo(partitionColumn,
PartitionFunctionFactory.getPartitionFunction(columnPartitionMetadata.getFunctionName(),
columnPartitionMetadata.getNumPartitions(), columnPartitionMetadata.getFunctionConfig()),
PartitionFunctionFactory.getPartitionFunction(columnPartitionMetadata),
columnPartitionMetadata.getPartitions());
}

Expand Down Expand Up @@ -125,8 +124,7 @@ public static Map<String, SegmentPartitionInfo> extractPartitionInfoMap(String t
continue;
}
SegmentPartitionInfo segmentPartitionInfo = new SegmentPartitionInfo(partitionColumn,
PartitionFunctionFactory.getPartitionFunction(columnPartitionMetadata.getFunctionName(),
columnPartitionMetadata.getNumPartitions(), columnPartitionMetadata.getFunctionConfig()),
PartitionFunctionFactory.getPartitionFunction(columnPartitionMetadata),
columnPartitionMetadata.getPartitions());
columnSegmentPartitionInfoMap.put(partitionColumn, segmentPartitionInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import com.google.common.base.Preconditions;
import java.util.Map;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
Expand Down Expand Up @@ -109,7 +111,12 @@ public static InstanceAssignmentConfig getInstanceAssignmentConfig(TableConfig t
String partitionColumn = replicaGroupStrategyConfig.getPartitionColumn();
boolean minimizeDataMovement = segmentConfig.isMinimizeDataMovement();
if (partitionColumn != null) {
int numPartitions = tableConfig.getIndexingConfig().getSegmentPartitionConfig().getNumPartitions(partitionColumn);
SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
Preconditions.checkState(segmentPartitionConfig != null, "Failed to find the segment partition config");
ColumnPartitionConfig columnPartitionConfig = segmentPartitionConfig.getColumnPartitionConfig(partitionColumn);
Preconditions.checkState(columnPartitionConfig != null,
"Failed to find the column partition config for column: %s", partitionColumn);
int numPartitions = columnPartitionConfig.getNumPartitions();
Preconditions.checkState(numPartitions > 0, "Number of partitions for column: %s is not properly configured",
partitionColumn);
replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, numPartitions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1225,7 +1225,7 @@ protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit)
_defaultNullHandlingEnabled);
_segmentLogger.info("Trying to build segment");
try {
converter.build(_segmentVersion, _serverMetrics);
converter.build(_segmentVersion);
} catch (Exception e) {
String errorMessage = "Could not build segment";
FileUtils.deleteQuietly(tempSegmentFolder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,12 @@
* Partitioner which computes partition values based on the ColumnPartitionConfig from the table config
*/
public class TableConfigPartitioner implements Partitioner {

private final String _column;
private final PartitionFunction _partitionFunction;

/**
 * Builds a partitioner for one column from its partition config.
 *
 * @param columnName name of the column; kept in {@code _column}
 * @param columnPartitionConfig partition config passed to {@code PartitionFunctionFactory} to obtain the partition
 *     function kept in {@code _partitionFunction}
 */
public TableConfigPartitioner(String columnName, ColumnPartitionConfig columnPartitionConfig) {
_column = columnName;
// NOTE(review): two assignments to _partitionFunction are shown here. They look like the before/after sides of one
// change (explicit function name / partition count / function config vs. handing over the whole config) - confirm
// that only the single-argument call is kept.
_partitionFunction = PartitionFunctionFactory
.getPartitionFunction(columnPartitionConfig.getFunctionName(), columnPartitionConfig.getNumPartitions(),
columnPartitionConfig.getFunctionConfig());
_partitionFunction = PartitionFunctionFactory.getPartitionFunction(columnPartitionConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class RealtimeSegmentConverter {
private final String _segmentName;
private final boolean _nullHandlingEnabled;
private final boolean _enableColumnMajor;
private final ServerMetrics _serverMetrics;

public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, SegmentZKPropsConfig segmentZKPropsConfig,
String outputPath, Schema schema, String tableName, TableConfig tableConfig, String segmentName,
Expand All @@ -78,9 +79,10 @@ public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, SegmentZKPro
} else {
_enableColumnMajor = _tableConfig.getIndexingConfig().isColumnMajorSegmentBuilderEnabled();
}
_serverMetrics = ServerMetrics.get();
}

public void build(@Nullable SegmentVersion segmentVersion, @Nullable ServerMetrics serverMetrics)
public void build(@Nullable SegmentVersion segmentVersion)
throws Exception {
SegmentGeneratorConfig genConfig = new SegmentGeneratorConfig(_tableConfig, _dataSchema);
genConfig.setInstanceType(InstanceType.SERVER);
Expand Down Expand Up @@ -136,7 +138,7 @@ public void build(@Nullable SegmentVersion segmentVersion, @Nullable ServerMetri
try (CompactedPinotSegmentRecordReader recordReader = new CompactedPinotSegmentRecordReader(validDocIds)) {
recordReader.init(_realtimeSegmentImpl, sortedDocIds);
buildSegmentWithReader(driver, genConfig, recordReader, sortedDocIds, sortedColumn, validDocIds);
publishCompactionMetrics(serverMetrics, preCommitRowCount, driver, compactionStartTime);
publishCompactionMetrics(preCommitRowCount, driver, compactionStartTime);
}
} else {
// Use regular PinotSegmentRecordReader (existing behavior)
Expand All @@ -156,11 +158,11 @@ public void build(@Nullable SegmentVersion segmentVersion, @Nullable ServerMetri
}
}

if (segmentPartitionConfig != null && serverMetrics != null) {
if (segmentPartitionConfig != null) {
Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
for (String columnName : columnPartitionMap.keySet()) {
int numPartitions = driver.getSegmentStats().getColumnProfileFor(columnName).getPartitions().size();
serverMetrics.addValueToTableGauge(_tableName, ServerGauge.REALTIME_SEGMENT_NUM_PARTITIONS, numPartitions);
_serverMetrics.addValueToTableGauge(_tableName, ServerGauge.REALTIME_SEGMENT_NUM_PARTITIONS, numPartitions);
}
}
}
Expand All @@ -175,36 +177,32 @@ private RoaringBitmap getValidDocIds() {
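* Publishes the commit-time compaction metrics for the segment that was just built: a count of compaction-enabled
* segments, the row counts before and after compaction, the rows removed, the build time in milliseconds and the
* compaction ratio percentage. Failures while publishing are logged and not propagated.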
*/
// NOTE(review): this block shows two signatures and paired serverMetrics / _serverMetrics calls. They look like the
// before/after sides of one change (metrics passed as a nullable parameter vs. read from the _serverMetrics field) -
// confirm which variant the file keeps.
private void publishCompactionMetrics(@Nullable ServerMetrics serverMetrics,
int preCommitRowCount, SegmentIndexCreationDriverImpl driver, long buildStartTime) {
// Guard of the parameter-taking variant: nothing is published when no metrics object was supplied
if (serverMetrics == null) {
return;
}
private void publishCompactionMetrics(int preCommitRowCount, SegmentIndexCreationDriverImpl driver,
long buildStartTime) {
try {
// Row count of the built segment, read from the driver's segment stats
int postCommitRowCount = driver.getSegmentStats().getTotalDocCount();
// Milliseconds elapsed between buildStartTime and now
long buildProcessingTime = System.currentTimeMillis() - buildStartTime;

// Rows present before commit that are absent from the built segment
int rowsRemoved = preCommitRowCount - postCommitRowCount;

// Only publish compaction-specific metrics when compaction is actually enabled
serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_ENABLED_SEGMENTS, 1L);
serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_ROWS_PRE_COMPACTION,
_serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_ENABLED_SEGMENTS, 1L);
_serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_ROWS_PRE_COMPACTION,
preCommitRowCount);
serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_ROWS_POST_COMPACTION,
_serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_ROWS_POST_COMPACTION,
postCommitRowCount);
serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_ROWS_REMOVED, rowsRemoved);
serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_BUILD_TIME_MS,
_serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_ROWS_REMOVED, rowsRemoved);
_serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_BUILD_TIME_MS,
buildProcessingTime);

// Calculate and publish compaction ratio percentage (only if we had rows to compact)
if (preCommitRowCount > 0) {
// Percentage is computed in floating point; the cast to long below drops the fractional part
double compactionRatioPercent = (double) rowsRemoved / preCommitRowCount * 100.0;
serverMetrics.setOrUpdateTableGauge(_tableName, ServerGauge.COMMIT_TIME_COMPACTION_RATIO_PERCENT,
_serverMetrics.setOrUpdateTableGauge(_tableName, ServerGauge.COMMIT_TIME_COMPACTION_RATIO_PERCENT,
(long) compactionRatioPercent);
}
} catch (Exception e) {
// Best effort: a failure while publishing metrics is logged as a warning and not rethrown
LOGGER.warn("Failed to publish segment build metrics for table: {}, segment: {}", _tableName,
_segmentName, e);
LOGGER.warn("Failed to publish segment build metrics for table: {}, segment: {}", _tableName, _segmentName, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.realtime.converter.stats;

import com.google.common.base.Preconditions;
import com.google.common.base.Utf8;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
Expand All @@ -37,14 +38,13 @@
/// When commit-time compaction is enabled, only valid (non-deleted) documents should be considered.
@SuppressWarnings("rawtypes")
public class CompactedColumnStatistics extends MutableColumnStatistics {
@Nullable
private final Object _minValue;
@Nullable
private final Object _maxValue;
private final Comparable _minValue;
private final Comparable _maxValue;
private final Object _uniqueValues;
private final int _cardinality;
private final int _minElementLength;
private final int _maxElementLength;
private final boolean _isAscii;
private final boolean _isSorted;
private final int _totalEntries;
private final int _maxMultiValues;
Expand All @@ -53,13 +53,15 @@ public class CompactedColumnStatistics extends MutableColumnStatistics {
public CompactedColumnStatistics(DataSource dataSource, @Nullable int[] sortedDocIds, boolean isSortedColumn,
RoaringBitmap validDocIds) {
super(dataSource, sortedDocIds, isSortedColumn);

MutableForwardIndex forwardIndex = (MutableForwardIndex) dataSource.getForwardIndex();
Dictionary dictionary = _dictionary;
DataType valueType = dictionary.getValueType();
boolean isSingleValue = forwardIndex.isSingleValue();
Preconditions.checkState(!validDocIds.isEmpty(), "Use EmptyColumnStatistics for empty column: %s",
_fieldSpec.getName());
DataType valueType = getValueType();
boolean isSingleValue = isSingleValue();
boolean isFixedWidth = valueType.isFixedWidth();
boolean isVarBytesMV = !isSingleValue && !isFixedWidth;
MutableForwardIndex forwardIndex = (MutableForwardIndex) dataSource.getForwardIndex();
Preconditions.checkState(forwardIndex != null, "Failed to find forward index for column: %s", _fieldSpec.getName());
Dictionary dictionary = _dictionary;

// Single pass over valid documents to collect used dict IDs and entry counts.
// For SV columns, sort order is tracked inline: when sortedDocIds is provided, iterate in that order; when null,
Expand All @@ -69,9 +71,9 @@ public CompactedColumnStatistics(DataSource dataSource, @Nullable int[] sortedDo
IntOpenHashSet usedDictIds = new IntOpenHashSet();
boolean isSorted = !_isSortedColumn;
int prevDictId = -1;
int maxRowLength = -1;
int maxRowLength = 0;
int totalEntries = 0;
int maxMultiValues = -1;
int maxMultiValues = 0;

if (isSingleValue) {
if (_sortedDocIds != null) {
Expand Down Expand Up @@ -127,7 +129,6 @@ public CompactedColumnStatistics(DataSource dataSource, @Nullable int[] sortedDo
_cardinality = usedDictIds.size();
_totalEntries = totalEntries;
_maxMultiValues = maxMultiValues;
_maxRowLength = maxRowLength;

// Build a typed array from the used dict IDs, sort it by natural value order, and extract min/max value and element
// lengths from the sorted results.
Expand All @@ -136,6 +137,7 @@ public CompactedColumnStatistics(DataSource dataSource, @Nullable int[] sortedDo
// For fixed-width types element length is constant; for variable-width it is tracked per entry
int minElementLength = isFixedWidth ? valueType.size() : Integer.MAX_VALUE;
int maxElementLength = isFixedWidth ? valueType.size() : 0;
boolean isAscii = false;
switch (valueType) {
case INT: {
int[] values = new int[_cardinality];
Expand Down Expand Up @@ -205,12 +207,8 @@ public CompactedColumnStatistics(DataSource dataSource, @Nullable int[] sortedDo
maxValue = values[_cardinality - 1];
for (BigDecimal value : values) {
int length = BigDecimalUtils.byteSize(value);
if (length < minElementLength) {
minElementLength = length;
}
if (length > maxElementLength) {
maxElementLength = length;
}
minElementLength = Math.min(minElementLength, length);
maxElementLength = Math.max(maxElementLength, length);
}
}
_uniqueValues = values;
Expand All @@ -226,13 +224,13 @@ public CompactedColumnStatistics(DataSource dataSource, @Nullable int[] sortedDo
Arrays.sort(values);
minValue = values[0];
maxValue = values[_cardinality - 1];
isAscii = true;
for (String value : values) {
int length = Utf8.encodedLength(value);
if (length < minElementLength) {
minElementLength = length;
}
if (length > maxElementLength) {
maxElementLength = length;
minElementLength = Math.min(minElementLength, length);
maxElementLength = Math.max(maxElementLength, length);
if (isAscii) {
isAscii = length == value.length();
}
}
}
Expand All @@ -251,12 +249,8 @@ public CompactedColumnStatistics(DataSource dataSource, @Nullable int[] sortedDo
maxValue = values[_cardinality - 1];
for (ByteArray value : values) {
int length = value.length();
if (length < minElementLength) {
minElementLength = length;
}
if (length > maxElementLength) {
maxElementLength = length;
}
minElementLength = Math.min(minElementLength, length);
maxElementLength = Math.max(maxElementLength, length);
}
}
_uniqueValues = values;
Expand All @@ -269,7 +263,14 @@ public CompactedColumnStatistics(DataSource dataSource, @Nullable int[] sortedDo
_maxValue = maxValue;
_minElementLength = minElementLength;
_maxElementLength = maxElementLength;

_isAscii = isAscii;
if (isSingleValue) {
_maxRowLength = maxElementLength;
} else if (isVarBytesMV) {
_maxRowLength = maxRowLength;
} else {
_maxRowLength = maxMultiValues * valueType.size();
}
if (_isSortedColumn) {
_isSorted = true;
} else if (!isSingleValue) {
Expand All @@ -279,15 +280,13 @@ public CompactedColumnStatistics(DataSource dataSource, @Nullable int[] sortedDo
}
}

// Returns the minimum value held in _minValue.
// NOTE(review): the constructor takes the minimum from the first element of the sorted array of used unique values;
// the assignment to _minValue itself is outside this view - confirm.
// NOTE(review): two signatures are shown (Object and Comparable return types); they look like the before/after
// sides of one change - confirm which one the file keeps.
@Nullable
@Override
public Object getMinValue() {
public Comparable getMinValue() {
return _minValue;
}

// Returns the maximum value held in _maxValue, which the constructor takes from the last element of the sorted
// array of used unique values.
// NOTE(review): two signatures are shown (Object and Comparable return types); they look like the before/after
// sides of one change - confirm which one the file keeps.
@Nullable
@Override
public Object getMaxValue() {
public Comparable getMaxValue() {
return _maxValue;
}

Expand All @@ -307,10 +306,15 @@ public int getLengthOfShortestElement() {
}

// Returns the largest element length recorded by the constructor: the value type's fixed size for fixed-width
// types, otherwise the maximum byte length seen across the used unique values.
// NOTE(review): two method names are shown (getLengthOfLargestElement and getLengthOfLongestElement); they look
// like the before/after sides of a rename - confirm which one the file keeps.
@Override
public int getLengthOfLargestElement() {
public int getLengthOfLongestElement() {
return _maxElementLength;
}

// Returns the ASCII flag computed by the constructor. The flag starts as false; in the String branch it is set to
// true and then cleared as soon as a value's UTF-8 encoded length differs from its char count.
// NOTE(review): only the String branch is seen setting the flag in this view - confirm no other value type does.
@Override
public boolean isAscii() {
return _isAscii;
}

@Override
public boolean isSorted() {
return _isSorted;
Expand Down
Loading
Loading