Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1051,10 +1051,22 @@ public List<SegmentZKMetadata> getSegmentsZKMetadata(String tableNameWithType) {
}

/// Convenience entry point keyed by table name: reads the segment ZK metadata of
/// {@code tableNameWithType} via {@link #getSegmentsZKMetadata(String)} and hands that list to the
/// list-based overload, whose result is returned unchanged.
public Collection<String> getLastLLCCompletedSegments(String tableNameWithType) {
  List<SegmentZKMetadata> segmentsZKMetadata = getSegmentsZKMetadata(tableNameWithType);
  return getLastLLCCompletedSegments(segmentsZKMetadata);
}

/// Overload that operates on a caller-supplied list of {@link SegmentZKMetadata}, avoiding a
/// redundant ZK fetch when the caller already holds the list (e.g. periodic tasks that scan all
/// segments of a table and want to derive the last-completed LLC segment per partition without
/// re-reading the property store).
public Collection<String> getLastLLCCompletedSegments(List<? extends SegmentZKMetadata> segmentZKMetadataList) {
Map<Integer, String> partitionIdToLastLLCCompletedSegmentMap = new HashMap<>();
for (SegmentZKMetadata zkMetadata : getSegmentsZKMetadata(tableNameWithType)) {
for (SegmentZKMetadata zkMetadata : segmentZKMetadataList) {
if (zkMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
LLCSegmentName llcName = LLCSegmentName.of(zkMetadata.getSegmentName());
if (llcName == null) {
// llcName can be null if the segment is uploaded through offline ingestion
continue;
}
int partitionGroupId = llcName.getPartitionGroupId();
int sequenceNumber = llcName.getSequenceNumber();
String lastCompletedSegName = partitionIdToLastLLCCompletedSegmentMap.get(partitionGroupId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,14 @@ protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableN
for (int i = 0; i < deleteSuccessful.length; i++) {
final String segmentId = segmentsToDelete.get(i);
if (!deleteSuccessful[i]) {
// remove API can fail because the prop store entry did not exist, so check first.
if (_propertyStore.exists(propStorePathList.get(i), AccessOption.PERSISTENT)) {
LOGGER.info("Could not delete {} from propertystore", propStorePathList.get(i));
// The batch remove API takes a non-recursive ZK path: it cannot delete a znode that has
// accumulated children. Fall back to the single-path remove API, which falls back to a
// recursive delete on the same NotEmpty failure. Skip when the znode is already gone
// (the batch call may have failed simply because the entry did not exist).
String segmentPath = propStorePathList.get(i);
if (_propertyStore.exists(segmentPath, AccessOption.PERSISTENT)
&& !_propertyStore.remove(segmentPath, AccessOption.PERSISTENT)) {
LOGGER.info("Could not delete {} from propertystore", segmentPath);
segmentsToRetryLater.add(segmentId);
propStoreFailedSegs.add(segmentId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,13 @@ private void manageRetentionForTable(TableConfig tableConfig) {
LOGGER.info("Segment push type is not APPEND for table: {}, skip managing retention", tableNameWithType);
return;
}
String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
String retentionTimeValue = validationConfig.getRetentionTimeValue();
int untrackedSegmentsDeletionBatchSize =
validationConfig.getUntrackedSegmentsDeletionBatchSize() != null ? Integer.parseInt(
validationConfig.getUntrackedSegmentsDeletionBatchSize()) : DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE;

RetentionStrategy retentionStrategy;
try {
retentionStrategy = new TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()),
Long.parseLong(retentionTimeValue), _useCreationTimeFallbackForRetention);
} catch (Exception e) {
LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", retentionTimeUnit, retentionTimeValue,
tableNameWithType);
RetentionStrategy retentionStrategy =
TableConfigRetentionUtils.buildRetentionStrategy(tableConfig, _useCreationTimeFallbackForRetention);
if (retentionStrategy == null) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.retention;

import java.util.Locale;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
import org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/// Utility methods for deriving retention-related objects from a {@link TableConfig}.
public class TableConfigRetentionUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(TableConfigRetentionUtils.class);

private TableConfigRetentionUtils() {
}

/// Builds a {@link RetentionStrategy} from {@code tableConfig}, or returns {@code null} when the
/// retention config is absent, empty, or malformed. A null return means no retention is configured
/// and no segment should be treated as purgeable.
///
/// @param useCreationTimeFallback when true, the strategy falls back to segment creation time
/// when segment end time is unavailable
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's follow existing convention for javadoc comments here with /** */.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Jackie-Jiang asked to add new comments in markdown style

@Nullable
public static RetentionStrategy buildRetentionStrategy(TableConfig tableConfig,
    boolean useCreationTimeFallback) {
  SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
  String unit = validationConfig.getRetentionTimeUnit();
  String value = validationConfig.getRetentionTimeValue();
  // A missing or empty unit/value cannot be parsed, so bail out before attempting to build anything.
  if (unit == null || unit.isEmpty() || value == null || value.isEmpty()) {
    LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", unit, value, tableConfig.getTableName());
    return null;
  }
  try {
    // Upper-case with Locale.ROOT so the TimeUnit lookup does not depend on the JVM default locale.
    TimeUnit retentionTimeUnit = TimeUnit.valueOf(unit.toUpperCase(Locale.ROOT));
    long retentionTimeValue = Long.parseLong(value);
    return new TimeRetentionStrategy(retentionTimeUnit, retentionTimeValue, useCreationTimeFallback);
  } catch (Exception e) {
    // Hand the exception to the logger as the trailing argument so the cause (unknown unit name vs.
    // non-numeric value) is recorded instead of being silently dropped.
    LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", unit, value, tableConfig.getTableName(),
        e);
    return null;
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;


/**
 * Exercises {@code PinotHelixResourceManager#getLastLLCCompletedSegments} on a Mockito mock whose two overloads are
 * stubbed to call the real method, with the segment ZK metadata lookup stubbed to return an in-memory list.
 */
public class PinotHelixResourceManagerLastLLCSegmentsTest {

  private static final String TABLE_NAME = "testTable";
  private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);

  /**
   * Segments uploaded through batch ingestion do not carry an LLC-style name, yet they can sit in DONE state in a
   * realtime table next to LLC-named segments. For such names {@code LLCSegmentName.of(name)} yields {@code null};
   * {@code getLastLLCCompletedSegments} has to ignore them instead of failing with an NPE.
   */
  @Test
  public void testGetLastLLCCompletedSegmentsSkipsNonLLCNamedSegments() {
    int partitionId = 3;
    long nowMs = System.currentTimeMillis();

    // Same partition, sequence numbers 0 and 1: the sequence-1 segment is the latest one.
    String olderSegmentName = new LLCSegmentName(TABLE_NAME, partitionId, 0, nowMs).getSegmentName();
    String latestSegmentName = new LLCSegmentName(TABLE_NAME, partitionId, 1, nowMs).getSegmentName();

    // The third entry is an uploaded (non-LLC-named) DONE segment that must be skipped without crashing.
    List<SegmentZKMetadata> zkMetadataList = new ArrayList<>();
    for (String segmentName : List.of(olderSegmentName, latestSegmentName, "uploaded_segment_0")) {
      zkMetadataList.add(newDoneSegment(segmentName));
    }

    PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
    when(resourceManager.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(zkMetadataList);
    when(resourceManager.getLastLLCCompletedSegments(REALTIME_TABLE_NAME)).thenCallRealMethod();
    when(resourceManager.getLastLLCCompletedSegments(anyList())).thenCallRealMethod();

    Collection<String> lastCompletedSegments = resourceManager.getLastLLCCompletedSegments(REALTIME_TABLE_NAME);
    Set<String> actualNames = new HashSet<>(lastCompletedSegments);
    Set<String> expectedNames = Set.of(latestSegmentName);
    assertEquals(actualNames, expectedNames);
  }

  /** Creates ZK metadata for {@code segmentName} with its status set to DONE. */
  private static SegmentZKMetadata newDoneSegment(String segmentName) {
    SegmentZKMetadata zkMetadata = new SegmentZKMetadata(segmentName);
    zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
    return zkMetadata;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ private PinotHelixResourceManager setupSegmentMetadataForPausedTable(TableConfig
when(pinotHelixResourceManager.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(segmentsZKMetadata);
when(pinotHelixResourceManager.getHelixClusterName()).thenReturn(HELIX_CLUSTER_NAME);
when(pinotHelixResourceManager.getLastLLCCompletedSegments(REALTIME_TABLE_NAME)).thenCallRealMethod();
when(pinotHelixResourceManager.getLastLLCCompletedSegments(anyList())).thenCallRealMethod();

HelixAdmin helixAdmin = mock(HelixAdmin.class);
when(helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, REALTIME_TABLE_NAME)).thenReturn(idealState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ public static SegmentZKMetadata mockSegmentZKMetadata(String segmentName, long n
return segmentZKMetadata;
}

/**
 * Builds a mocked {@link SegmentMetadata} through the seven-argument {@code mockSegmentMetadata} overload and
 * additionally stubs its column metadata map with a single entry for {@code partitionColumn}.
 *
 * <p>The mocked {@link ColumnMetadata} for that column reports {@code partitionId} as its only partition and a
 * {@link MurmurPartitionFunction} created with {@code numPartitions}.
 */
public static SegmentMetadata mockSegmentMetadata(String tableName, String segmentName, int numTotalDocs,
    String crc, long startTime, long endTime, TimeUnit timeUnit, String partitionColumn, int partitionId,
    int numPartitions) {
  // Column-level mock carrying the partition information.
  ColumnMetadata partitionColumnMetadata = mock(ColumnMetadata.class);
  when(partitionColumnMetadata.getPartitions()).thenReturn(Collections.singleton(partitionId));
  when(partitionColumnMetadata.getPartitionFunction())
      .thenReturn(new MurmurPartitionFunction(numPartitions, null));

  TreeMap<String, ColumnMetadata> columnsByName = new TreeMap<>();
  columnsByName.put(partitionColumn, partitionColumnMetadata);

  // Segment-level mock from the base overload, extended with the column metadata map.
  SegmentMetadata partitionedSegmentMetadata =
      mockSegmentMetadata(tableName, segmentName, numTotalDocs, crc, startTime, endTime, timeUnit);
  when(partitionedSegmentMetadata.getColumnMetadataMap()).thenReturn(columnsByName);
  return partitionedSegmentMetadata;
}

public static SegmentMetadata mockSegmentMetadataWithPartitionInfo(String rawTableName, String segmentName,
String columnName, int partitionNumber) {
ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
Expand Down