Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -94,7 +93,7 @@ public void updateLineageForRetention(TableConfig tableConfig, SegmentLineage li
} else {
// If the lineage state is 'COMPLETED' and we already preserved the original segments for the required
// retention, it is safe to delete all segments from 'segmentsFrom'
if (shouldDeleteReplacedSegments(tableConfig, lineageEntry, replacedSegmentsRetentionMs)) {
if (shouldDeleteReplacedSegments(lineageEntry, replacedSegmentsRetentionMs)) {
segmentsToDelete.addAll(sourceSegments);
}
}
Expand Down Expand Up @@ -122,24 +121,17 @@ public void updateLineageForRetention(TableConfig tableConfig, SegmentLineage li
/**
* Helper function to decide whether we should delete segmentsFrom (replaced segments) given a lineage entry.
*
* The replaced segments are safe to delete if either:
* 1) The table is not "REFRESH" (e.g. "APPEND"), in which case they are deleted immediately, or
* 2) The lineage entry has been in "COMPLETED" state for longer than {@code replacedSegmentsRetentionMs}
* (configurable via {@code replacedSegmentsRetentionPeriod} in table config, defaulting to 1 day).
* The replaced segments are safe to delete once the lineage entry has been in "COMPLETED" state for longer
* than {@code replacedSegmentsRetentionMs} (configurable via {@code replacedSegmentsRetentionPeriod} in
* table config, defaulting to 1 day). The retention period applies uniformly to all batch ingestion
* types — any replacement protocol (REFRESH-table snapshot replace, APPEND-table minion-driven replace,
* segment-group merge) gets the same configurable grace window before its replaced segments are dropped.
*
* @param tableConfig a table config
* @param lineageEntry lineage entry
* @param replacedSegmentsRetentionMs configured retention in ms for replaced segments
* @return True if we can safely delete the replaced segments. False otherwise.
*/
private boolean shouldDeleteReplacedSegments(TableConfig tableConfig, LineageEntry lineageEntry,
long replacedSegmentsRetentionMs) {
// TODO: Currently, we preserve the replaced segments for REFRESH tables only. Once we support
// data rollback for APPEND tables, we should remove this check.
String batchSegmentIngestionType = IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
if (!batchSegmentIngestionType.equalsIgnoreCase("REFRESH")) {
return true;
}
private boolean shouldDeleteReplacedSegments(LineageEntry lineageEntry, long replacedSegmentsRetentionMs) {
// Strict < means a 0ms retention won't delete on the exact same millisecond; this is intentional to
// avoid edge-case races and is consistent with the existing behavior for non-zero retention values.
return lineageEntry.getTimestamp() < (System.currentTimeMillis() - replacedSegmentsRetentionMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,22 +159,35 @@ public void testRefreshTableZeroRetentionDeletesImmediately() {
}

@Test
public void testAppendTableAlwaysDeletesRegardlessOfRetentionConfig() {
// APPEND table: source segments are always eligible for deletion regardless of retention setting
public void testAppendTableHonorsReplacedSegmentsRetentionPeriod() {
// APPEND table with a non-zero replacedSegmentsRetentionPeriod must defer deletion until the
// retention window elapses — same semantics as REFRESH. This pins the symmetric behavior after
// the REFRESH-only gate in shouldDeleteReplacedSegments was removed: any replacement protocol
// (REFRESH snapshot replace, APPEND minion replace, segment-group merge) gets the same
// configurable grace window for its replaced segments.
TableConfig tableConfig = appendTableBuilder().setReplacedSegmentsRetentionPeriod("7d").build();

String entryId = UUID.randomUUID().toString();
String recentEntryId = UUID.randomUUID().toString();
long recentTimestamp = System.currentTimeMillis();
String oldEntryId = UUID.randomUUID().toString();
long oldTimestamp = System.currentTimeMillis() - 8 * 24 * 60 * 60 * 1000L; // 8 days ago > 7d retention

SegmentLineage lineage = new SegmentLineage("testTable_OFFLINE");
lineage.addLineageEntry(entryId,
new LineageEntry(Arrays.asList("src1"), Arrays.asList("dst1"), LineageEntryState.COMPLETED, recentTimestamp));
lineage.addLineageEntry(recentEntryId,
new LineageEntry(Arrays.asList("recent_src"), Arrays.asList("recent_dst"), LineageEntryState.COMPLETED,
recentTimestamp));
lineage.addLineageEntry(oldEntryId,
new LineageEntry(Arrays.asList("old_src"), Arrays.asList("old_dst"), LineageEntryState.COMPLETED,
oldTimestamp));

List<String> segmentsToDelete = new ArrayList<>();
_lineageManager.updateLineageForRetention(tableConfig, lineage, Arrays.asList("src1", "dst1"), segmentsToDelete,
new HashSet<>());
_lineageManager.updateLineageForRetention(tableConfig, lineage,
Arrays.asList("recent_src", "recent_dst", "old_src", "old_dst"), segmentsToDelete, new HashSet<>());

assertTrue(segmentsToDelete.contains("src1"),
"APPEND table source segments must always be eligible for deletion");
assertFalse(segmentsToDelete.contains("recent_src"),
"APPEND table source segments must be retained within the configured retention window");
assertTrue(segmentsToDelete.contains("old_src"),
"APPEND table source segments must be deleted once the configured retention window has elapsed");
}

// ---------------------------------------------------------------------------
Expand Down
Loading