Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,19 @@ public static class ControllerPeriodicTasksConf {
// Untracked segments are those that exist in deep store but have no corresponding entry in the ZK property store.
public static final String ENABLE_UNTRACKED_SEGMENT_DELETION =
"controller.retentionManager.untrackedSegmentDeletionEnabled";
public static final boolean DEFAULT_ENABLE_UNTRACKED_SEGMENT_DELETION = false;
public static final String UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS =
"controller.retentionManager.untrackedSegmentsRetentionTimeInDays";
public static final int DEFAULT_UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS = 3;
public static final String AGED_SEGMENTS_DELETION_BATCH_SIZE =
"controller.retentionManager.agedSegmentsDeletionBatchSize";
public static final int DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE = 1000;

// When enabled, the retention manager will fall back to using segment creation time for retention decisions
// when the segment end time is invalid (e.g., time column populated with 0).
public static final String ENABLE_RETENTION_CREATION_TIME_FALLBACK =
"controller.retentionManager.enableCreationTimeFallback";
public static final boolean DEFAULT_ENABLE_RETENTION_CREATION_TIME_FALLBACK = false;
public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND = 60 * 60; // 1 Hour.
Expand Down Expand Up @@ -1217,7 +1224,8 @@ public int getTmpSegmentRetentionInSeconds() {
}

/**
 * Returns the configured value of the untracked-segment-deletion flag.
 *
 * <p>Untracked segments are those that exist in deep store but have no corresponding entry in the ZK property
 * store.
 *
 * @return the value looked up under {@link ControllerPeriodicTasksConf#ENABLE_UNTRACKED_SEGMENT_DELETION}, with
 *     {@link ControllerPeriodicTasksConf#DEFAULT_ENABLE_UNTRACKED_SEGMENT_DELETION} passed as the default
 */
public boolean getUntrackedSegmentDeletionEnabled() {
  // Single return that uses the named default constant. A second, hard-coded `false` lookup placed before it
  // would make this statement unreachable and duplicate the default in two places.
  return getProperty(ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION,
      ControllerPeriodicTasksConf.DEFAULT_ENABLE_UNTRACKED_SEGMENT_DELETION);
}

public int getUntrackedSegmentsRetentionTimeInDays() {
Expand All @@ -1229,6 +1237,11 @@ public void setUntrackedSegmentDeletionEnabled(boolean untrackedSegmentDeletionE
setProperty(ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION, untrackedSegmentDeletionEnabled);
}

/**
 * Reports whether the retention manager may fall back to a segment's creation time when the segment end time is
 * invalid (e.g. time column populated with 0).
 *
 * @return the value looked up under {@link ControllerPeriodicTasksConf#ENABLE_RETENTION_CREATION_TIME_FALLBACK},
 *     with {@link ControllerPeriodicTasksConf#DEFAULT_ENABLE_RETENTION_CREATION_TIME_FALLBACK} passed as the default
 */
public boolean isRetentionCreationTimeFallbackEnabled() {
  final String propertyKey = ControllerPeriodicTasksConf.ENABLE_RETENTION_CREATION_TIME_FALLBACK;
  final boolean valueIfUnset = ControllerPeriodicTasksConf.DEFAULT_ENABLE_RETENTION_CREATION_TIME_FALLBACK;
  return getProperty(propertyKey, valueIfUnset);
}

public int getAgedSegmentsDeletionBatchSize() {
return getProperty(ControllerPeriodicTasksConf.AGED_SEGMENTS_DELETION_BATCH_SIZE,
ControllerPeriodicTasksConf.DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {

private static final Logger LOGGER = LoggerFactory.getLogger(RetentionManager.class);
private volatile boolean _isHybridTableRetentionStrategyEnabled;
private volatile boolean _useCreationTimeFallbackForRetention;
private final BrokerServiceHelper _brokerServiceHelper;

public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
Expand All @@ -97,6 +98,7 @@ public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
_untrackedSegmentsRetentionTimeInDays = config.getUntrackedSegmentsRetentionTimeInDays();
_agedSegmentsDeletionBatchSize = config.getAgedSegmentsDeletionBatchSize();
_isHybridTableRetentionStrategyEnabled = config.isHybridTableRetentionStrategyEnabled();
_useCreationTimeFallbackForRetention = config.isRetentionCreationTimeFallbackEnabled();
_brokerServiceHelper = brokerServiceHelper;
LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}", getIntervalInSeconds());
}
Expand Down Expand Up @@ -146,7 +148,7 @@ private void manageRetentionForTable(TableConfig tableConfig) {
RetentionStrategy retentionStrategy;
try {
retentionStrategy = new TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()),
Long.parseLong(retentionTimeValue));
Long.parseLong(retentionTimeValue), _useCreationTimeFallbackForRetention);
} catch (Exception e) {
LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", retentionTimeUnit, retentionTimeValue,
tableNameWithType);
Expand Down Expand Up @@ -534,11 +536,32 @@ public void onChange(Set<String> changedConfigs, Map<String, String> clusterConf
updateHybridTableRetentionStrategyEnabled(
clusterConfigs.get(ControllerConf.ENABLE_HYBRID_TABLE_RETENTION_STRATEGY));
}

if (changedConfigs.contains(
ControllerConf.ControllerPeriodicTasksConf.ENABLE_RETENTION_CREATION_TIME_FALLBACK)) {
updateRetentionCreationTimeFallbackEnabled(
clusterConfigs.get(ControllerConf.ControllerPeriodicTasksConf.ENABLE_RETENTION_CREATION_TIME_FALLBACK));
}
}

private void updateUntrackedSegmentDeletionEnabled(String newValue) {
boolean oldValue = _untrackedSegmentDeletionEnabled;

// When the cluster config key is deleted, newValue will be null. Reset to default.
boolean defaultValue =
ControllerConf.ControllerPeriodicTasksConf.DEFAULT_ENABLE_UNTRACKED_SEGMENT_DELETION;
if (newValue == null) {
if (oldValue != defaultValue) {
_untrackedSegmentDeletionEnabled = defaultValue;
LOGGER.info("Cluster config for untrackedSegmentDeletionEnabled was removed, "
+ "reverting from {} to default ({})", oldValue, defaultValue);
} else {
LOGGER.info("Cluster config for untrackedSegmentDeletionEnabled was removed, "
+ "already at default ({})", defaultValue);
}
return;
}

// Validate that the value is a proper boolean string
if (!"true".equalsIgnoreCase(newValue) && !"false".equalsIgnoreCase(newValue)) {
LOGGER.warn("Invalid value for untrackedSegmentDeletionEnabled: {}, keeping current value: {}", newValue,
Expand All @@ -557,6 +580,21 @@ private void updateUntrackedSegmentDeletionEnabled(String newValue) {

private void updateUntrackedSegmentsRetentionTimeInDays(String newValue) {
int oldValue = _untrackedSegmentsRetentionTimeInDays;

// When the cluster config key is deleted, newValue will be null. Reset to default.
if (newValue == null) {
int defaultValue = ControllerConf.ControllerPeriodicTasksConf.DEFAULT_UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS;
if (oldValue != defaultValue) {
_untrackedSegmentsRetentionTimeInDays = defaultValue;
LOGGER.info("Cluster config for untrackedSegmentsRetentionTimeInDays was removed, "
+ "reverting from {} to default ({})", oldValue, defaultValue);
} else {
LOGGER.info("Cluster config for untrackedSegmentsRetentionTimeInDays was removed, "
+ "already at default ({})", defaultValue);
}
return;
}

try {
int parsedValue = Integer.parseInt(newValue);
if (parsedValue <= 0) {
Expand All @@ -577,22 +615,79 @@ private void updateUntrackedSegmentsRetentionTimeInDays(String newValue) {

private void updateHybridTableRetentionStrategyEnabled(String newValue) {
boolean oldValue = _isHybridTableRetentionStrategyEnabled;
try {
boolean parsedValue = Boolean.parseBoolean(newValue);
if (oldValue == parsedValue) {
LOGGER.info("No change in isHybridTableRetentionStrategyEnabled, current value: {}", oldValue);

// When the cluster config key is deleted, newValue will be null. Reset to default.
boolean defaultValue = ControllerConf.DEFAULT_ENABLE_HYBRID_TABLE_RETENTION_STRATEGY;
if (newValue == null) {
if (oldValue != defaultValue) {
_isHybridTableRetentionStrategyEnabled = defaultValue;
LOGGER.info("Cluster config for isHybridTableRetentionStrategyEnabled was removed, "
+ "reverting from {} to default ({})", oldValue, defaultValue);
} else {
_isHybridTableRetentionStrategyEnabled = parsedValue;
LOGGER.info("Updated isHybridTableRetentionStrategyEnabled from {} to {}", oldValue, parsedValue);
LOGGER.info("Cluster config for isHybridTableRetentionStrategyEnabled was removed, "
+ "already at default ({})", defaultValue);
}
} catch (Exception e) {
return;
}

// Validate that the value is a proper boolean string
if (!"true".equalsIgnoreCase(newValue) && !"false".equalsIgnoreCase(newValue)) {
Comment on lines +633 to +634
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let's extract this boolean-string check into a util method and reuse it everywhere it is duplicated in this class.

LOGGER.warn("Invalid value for isHybridTableRetentionStrategyEnabled: {}, keeping current value: {}", newValue,
oldValue);
return;
}

boolean parsedValue = Boolean.parseBoolean(newValue);
if (oldValue == parsedValue) {
LOGGER.info("No change in isHybridTableRetentionStrategyEnabled, current value: {}", oldValue);
} else {
_isHybridTableRetentionStrategyEnabled = parsedValue;
LOGGER.info("Updated isHybridTableRetentionStrategyEnabled from {} to {}", oldValue, parsedValue);
}
}

private void updateRetentionCreationTimeFallbackEnabled(String newValue) {
boolean oldValue = _useCreationTimeFallbackForRetention;

// When the cluster config key is deleted, newValue will be null.
// Reset to default since this flag gates destructive retention deletion.
boolean defaultValue =
ControllerConf.ControllerPeriodicTasksConf.DEFAULT_ENABLE_RETENTION_CREATION_TIME_FALLBACK;
if (newValue == null) {
if (oldValue != defaultValue) {
_useCreationTimeFallbackForRetention = defaultValue;
LOGGER.info("Cluster config for retentionCreationTimeFallbackEnabled was removed, "
+ "reverting from {} to default ({})", oldValue, defaultValue);
} else {
LOGGER.info("Cluster config for retentionCreationTimeFallbackEnabled was removed, "
+ "already at default ({})", defaultValue);
}
return;
}

// Validate that the value is a proper boolean string
if (!"true".equalsIgnoreCase(newValue) && !"false".equalsIgnoreCase(newValue)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When this cluster config key is deleted, changedConfigs will still contain it but clusterConfigs.get(...) will be null (DefaultClusterConfigChangeHandler explicitly reports deleted keys that way). This branch treats null as invalid and keeps the old value, so removing the override never reverts to the default false until restart. Because this flag gates destructive retention deletion, the current leader can keep purging segments after an operator thinks they disabled the feature. Please handle null explicitly and reset _useCreationTimeFallbackForRetention to the default.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. I made the same change for the other configs in this class as well.
Thanks for pointing this out.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^^ @xiangfu0, is this okay? If so, I will merge.

LOGGER.warn("Invalid value for retentionCreationTimeFallbackEnabled: {}, keeping current value: {}", newValue,
oldValue);
return;
}

boolean parsedValue = Boolean.parseBoolean(newValue);
if (oldValue == parsedValue) {
LOGGER.info("No change in retentionCreationTimeFallbackEnabled, current value: {}", oldValue);
} else {
_useCreationTimeFallbackForRetention = parsedValue;
LOGGER.info("Updated retentionCreationTimeFallbackEnabled from {} to {}", oldValue, parsedValue);
}
}

/**
 * Returns the current in-memory value of {@code _untrackedSegmentDeletionEnabled}.
 *
 * <p>Marked {@code @VisibleForTesting}: lets tests observe the flag after the update methods in this class have
 * changed it.
 */
@VisibleForTesting
public boolean isUntrackedSegmentDeletionEnabled() {
return _untrackedSegmentDeletionEnabled;
}

/**
 * Returns the current in-memory value of {@code _useCreationTimeFallbackForRetention}.
 *
 * <p>Marked {@code @VisibleForTesting}: lets tests observe the flag after a cluster config change has been
 * handled by {@code onChange}.
 */
@VisibleForTesting
public boolean isRetentionCreationTimeFallbackEnabled() {
return _useCreationTimeFallbackForRetention;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,50 @@ public class TimeRetentionStrategy implements RetentionStrategy {
private static final Logger LOGGER = LoggerFactory.getLogger(TimeRetentionStrategy.class);

private final long _retentionMs;
private final boolean _useCreationTimeFallback;

/**
 * Creates a strategy with the creation-time fallback disabled.
 *
 * @param timeUnit unit in which {@code timeValue} is expressed
 * @param timeValue length of the retention period, in {@code timeUnit}
 */
public TimeRetentionStrategy(TimeUnit timeUnit, long timeValue) {
// Delegate with the fallback off so existing two-argument callers keep their behavior.
this(timeUnit, timeValue, false);
}

/**
 * Creates a strategy with the given retention period and fallback setting.
 *
 * @param timeUnit unit in which {@code timeValue} is expressed
 * @param timeValue length of the retention period, in {@code timeUnit}; converted to milliseconds and stored
 * @param useCreationTimeFallback whether {@code isPurgeable} may use the segment creation time when the segment
 *     end time is not in the valid range
 */
public TimeRetentionStrategy(TimeUnit timeUnit, long timeValue, boolean useCreationTimeFallback) {
_retentionMs = timeUnit.toMillis(timeValue);
_useCreationTimeFallback = useCreationTimeFallback;
}

@Override
public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata) {

// For realtime tables, only completed segments(DONE or UPLOADED) are eligible for purging.
//For offline tables, status defaults to UPLOADED which is completed, so they proceed to normal retention
// For offline tables, status defaults to UPLOADED which is completed, so they proceed to normal retention
if (!segmentZKMetadata.getStatus().isCompleted()) {
return false; // Incomplete segments don't have final end time and should not be purged
}

return isPurgeable(tableNameWithType, segmentZKMetadata.getSegmentName(), segmentZKMetadata.getEndTimeMs());
Comment thread
noob-se7en marked this conversation as resolved.
String segmentName = segmentZKMetadata.getSegmentName();
long endTimeMs = segmentZKMetadata.getEndTimeMs();

// If end time is valid, use it directly
if (TimeUtils.timeValueInValidRange(endTimeMs)) {
return System.currentTimeMillis() - endTimeMs > _retentionMs;
}

long creationTimeMs = segmentZKMetadata.getCreationTime();

if (_useCreationTimeFallback && TimeUtils.timeValueInValidRange(creationTimeMs)) {
LOGGER.debug("Segment: {} of table: {} has invalid end time: {}. Using creation time: {} as fallback",
segmentName, tableNameWithType, endTimeMs, creationTimeMs);
return System.currentTimeMillis() - creationTimeMs > _retentionMs;
}

if (_useCreationTimeFallback) {
LOGGER.warn("Segment: {} of table: {} has invalid end time: {} and invalid creation time: {}. "
+ "Cannot determine retention, skipping", segmentName, tableNameWithType, endTimeMs, creationTimeMs);
} else {
LOGGER.warn("Segment: {} of table: {} has invalid end time in millis: {}. "
+ "Creation time fallback is disabled", segmentName, tableNameWithType, endTimeMs);
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,109 @@ private void setFileModificationTime(File file, long timestamp) {
}
}

/**
 * Exercises {@code RetentionManager.onChange} for the creation-time-fallback cluster config.
 *
 * <p>Checks, in order: the flag starts disabled, "true" enables it, "false" disables it, an unparseable value
 * leaves it unchanged, and a deleted key (reported with a {@code null} value) reverts it to disabled.
 */
@Test
public void testCreationTimeFallbackOnChange() {
  ControllerConf controllerConf = new ControllerConf();
  controllerConf.setRetentionControllerFrequencyInSeconds(0);
  controllerConf.setDeletedSegmentsRetentionInDays(0);
  ControllerMetrics metrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
  PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
  BrokerServiceHelper brokerHelper = new BrokerServiceHelper(resourceManager, controllerConf, null, null);
  LeadControllerManager leadControllerManager = mock(LeadControllerManager.class);
  RetentionManager manager =
      new RetentionManager(resourceManager, leadControllerManager, controllerConf, metrics, brokerHelper);

  // With no override present, the flag is off.
  assertFalse(manager.isRetentionCreationTimeFallbackEnabled());

  final String fallbackKey = ControllerConf.ControllerPeriodicTasksConf.ENABLE_RETENTION_CREATION_TIME_FALLBACK;
  final Set<String> changedKeys = Set.of(fallbackKey);
  Map<String, String> liveConfigs = new HashMap<>();

  // Turning the flag on through the cluster config.
  liveConfigs.put(fallbackKey, "true");
  manager.onChange(changedKeys, liveConfigs);
  assertTrue(manager.isRetentionCreationTimeFallbackEnabled());

  // Turning it back off.
  liveConfigs.put(fallbackKey, "false");
  manager.onChange(changedKeys, liveConfigs);
  assertFalse(manager.isRetentionCreationTimeFallbackEnabled());

  // A value that is neither "true" nor "false" must not change the flag.
  liveConfigs.put(fallbackKey, "invalid");
  manager.onChange(changedKeys, liveConfigs);
  assertFalse(manager.isRetentionCreationTimeFallbackEnabled());

  // Re-enable so that the deletion case below has something to revert.
  liveConfigs.put(fallbackKey, "true");
  manager.onChange(changedKeys, liveConfigs);
  assertTrue(manager.isRetentionCreationTimeFallbackEnabled());

  // Key deletion: the key is reported as changed, but its value lookup yields null.
  Map<String, String> configsAfterDeletion = new HashMap<>();
  configsAfterDeletion.put(fallbackKey, null);
  manager.onChange(changedKeys, configsAfterDeletion);
  assertFalse(manager.isRetentionCreationTimeFallbackEnabled());
}

/**
 * Runs the retention manager with the creation-time fallback enabled over two mocked segments and verifies that
 * {@code deleteSegments} is invoked exactly once for the offline table.
 *
 * <p>One segment has an invalid end time (-1) and a creation time 400 days in the past; the other has end and
 * creation times equal to "now". Only the first is listed in the expected-deleted set.
 */
@Test
public void testRetentionWithInvalidEndTimeAndCreationTimeFallback() {
long now = System.currentTimeMillis();
// Creation time must exceed the table's retention period (365 days) to be purgeable
// NOTE(review): the 365-day period is presumably set by createOfflineTableConfig(), which is not shown here.
long fourHundredDaysAgoMs = now - TimeUnit.DAYS.toMillis(400);

List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();

// Segment with invalid end time but old creation time — should be deleted when fallback is enabled
SegmentZKMetadata invalidEndTimeSeg = mock(SegmentZKMetadata.class);
when(invalidEndTimeSeg.getSegmentName()).thenReturn("seg_invalid_endtime");
when(invalidEndTimeSeg.getEndTimeMs()).thenReturn(-1L);
when(invalidEndTimeSeg.getCreationTime()).thenReturn(fourHundredDaysAgoMs);
when(invalidEndTimeSeg.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.DONE);
segmentsZKMetadata.add(invalidEndTimeSeg);

// Segment with valid end time that is recent — should NOT be deleted
SegmentZKMetadata recentSeg = mock(SegmentZKMetadata.class);
when(recentSeg.getSegmentName()).thenReturn("seg_recent");
when(recentSeg.getEndTimeMs()).thenReturn(now);
when(recentSeg.getCreationTime()).thenReturn(now);
when(recentSeg.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.DONE);
segmentsZKMetadata.add(recentSeg);

final TableConfig tableConfig = createOfflineTableConfig();
List<String> expectedDeletedSegments = List.of("seg_invalid_endtime");

LeadControllerManager leadControllerManager = mock(LeadControllerManager.class);
when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);

// Helper defined elsewhere in this test class; it is given the expected-deleted list so it can check it.
setupPinotHelixResourceManager(tableConfig, expectedDeletedSegments, pinotHelixResourceManager,
leadControllerManager);

// Stubs applied after the helper, so these are the values the retention run sees for this table.
when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(pinotHelixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(segmentsZKMetadata);
when(pinotHelixResourceManager.getDataDir()).thenReturn(_tempDir.toString());

// Build the RetentionManager with the creation-time fallback enabled through ControllerConf
ControllerConf conf = new ControllerConf();
conf.setRetentionControllerFrequencyInSeconds(0);
conf.setDeletedSegmentsRetentionInDays(0);
conf.setProperty(ControllerConf.ControllerPeriodicTasksConf.ENABLE_RETENTION_CREATION_TIME_FALLBACK, "true");
ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
// A separate, unstubbed mock is used only to construct the BrokerServiceHelper.
PinotHelixResourceManager mockResourceManager = mock(PinotHelixResourceManager.class);
BrokerServiceHelper brokerServiceHelper =
new BrokerServiceHelper(mockResourceManager, conf, null, null);
RetentionManager retentionManager =
new RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, controllerMetrics,
brokerServiceHelper);
retentionManager.start();
retentionManager.run();

// Verify deleteSegments is called — setupPinotHelixResourceManager's doAnswer
// already asserts the correct segments via TestNG assertions
verify(pinotHelixResourceManager, times(1)).deleteSegments(eq(OFFLINE_TABLE_NAME), anyList());
}

public static class FakePinotFs extends LocalPinotFS {

@Override
Expand Down
Loading
Loading