-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add support for handling scenarios where end time is invalid during RetentionManager run #18148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
d456597
7add7d2
a6e982f
bba038a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -85,6 +85,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> { | |
|
|
||
| private static final Logger LOGGER = LoggerFactory.getLogger(RetentionManager.class); | ||
| private volatile boolean _isHybridTableRetentionStrategyEnabled; | ||
| private volatile boolean _useCreationTimeFallbackForRetention; | ||
| private final BrokerServiceHelper _brokerServiceHelper; | ||
|
|
||
| public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, | ||
|
|
@@ -97,6 +98,7 @@ public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, | |
| _untrackedSegmentsRetentionTimeInDays = config.getUntrackedSegmentsRetentionTimeInDays(); | ||
| _agedSegmentsDeletionBatchSize = config.getAgedSegmentsDeletionBatchSize(); | ||
| _isHybridTableRetentionStrategyEnabled = config.isHybridTableRetentionStrategyEnabled(); | ||
| _useCreationTimeFallbackForRetention = config.isRetentionCreationTimeFallbackEnabled(); | ||
| _brokerServiceHelper = brokerServiceHelper; | ||
| LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}", getIntervalInSeconds()); | ||
| } | ||
|
|
@@ -146,7 +148,7 @@ private void manageRetentionForTable(TableConfig tableConfig) { | |
| RetentionStrategy retentionStrategy; | ||
| try { | ||
| retentionStrategy = new TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()), | ||
| Long.parseLong(retentionTimeValue)); | ||
| Long.parseLong(retentionTimeValue), _useCreationTimeFallbackForRetention); | ||
| } catch (Exception e) { | ||
| LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", retentionTimeUnit, retentionTimeValue, | ||
| tableNameWithType); | ||
|
|
@@ -534,11 +536,32 @@ public void onChange(Set<String> changedConfigs, Map<String, String> clusterConf | |
| updateHybridTableRetentionStrategyEnabled( | ||
| clusterConfigs.get(ControllerConf.ENABLE_HYBRID_TABLE_RETENTION_STRATEGY)); | ||
| } | ||
|
|
||
| if (changedConfigs.contains( | ||
| ControllerConf.ControllerPeriodicTasksConf.ENABLE_RETENTION_CREATION_TIME_FALLBACK)) { | ||
| updateRetentionCreationTimeFallbackEnabled( | ||
| clusterConfigs.get(ControllerConf.ControllerPeriodicTasksConf.ENABLE_RETENTION_CREATION_TIME_FALLBACK)); | ||
| } | ||
| } | ||
|
|
||
| private void updateUntrackedSegmentDeletionEnabled(String newValue) { | ||
| boolean oldValue = _untrackedSegmentDeletionEnabled; | ||
|
|
||
| // When the cluster config key is deleted, newValue will be null. Reset to default. | ||
| boolean defaultValue = | ||
| ControllerConf.ControllerPeriodicTasksConf.DEFAULT_ENABLE_UNTRACKED_SEGMENT_DELETION; | ||
| if (newValue == null) { | ||
| if (oldValue != defaultValue) { | ||
| _untrackedSegmentDeletionEnabled = defaultValue; | ||
| LOGGER.info("Cluster config for untrackedSegmentDeletionEnabled was removed, " | ||
| + "reverting from {} to default ({})", oldValue, defaultValue); | ||
| } else { | ||
| LOGGER.info("Cluster config for untrackedSegmentDeletionEnabled was removed, " | ||
| + "already at default ({})", defaultValue); | ||
| } | ||
| return; | ||
| } | ||
|
|
||
| // Validate that the value is a proper boolean string | ||
| if (!"true".equalsIgnoreCase(newValue) && !"false".equalsIgnoreCase(newValue)) { | ||
| LOGGER.warn("Invalid value for untrackedSegmentDeletionEnabled: {}, keeping current value: {}", newValue, | ||
|
|
@@ -557,6 +580,21 @@ private void updateUntrackedSegmentDeletionEnabled(String newValue) { | |
|
|
||
| private void updateUntrackedSegmentsRetentionTimeInDays(String newValue) { | ||
| int oldValue = _untrackedSegmentsRetentionTimeInDays; | ||
|
|
||
| // When the cluster config key is deleted, newValue will be null. Reset to default. | ||
| if (newValue == null) { | ||
| int defaultValue = ControllerConf.ControllerPeriodicTasksConf.DEFAULT_UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS; | ||
| if (oldValue != defaultValue) { | ||
| _untrackedSegmentsRetentionTimeInDays = defaultValue; | ||
| LOGGER.info("Cluster config for untrackedSegmentsRetentionTimeInDays was removed, " | ||
| + "reverting from {} to default ({})", oldValue, defaultValue); | ||
| } else { | ||
| LOGGER.info("Cluster config for untrackedSegmentsRetentionTimeInDays was removed, " | ||
| + "already at default ({})", defaultValue); | ||
| } | ||
| return; | ||
| } | ||
|
|
||
| try { | ||
| int parsedValue = Integer.parseInt(newValue); | ||
| if (parsedValue <= 0) { | ||
|
|
@@ -577,22 +615,79 @@ private void updateUntrackedSegmentsRetentionTimeInDays(String newValue) { | |
|
|
||
| private void updateHybridTableRetentionStrategyEnabled(String newValue) { | ||
| boolean oldValue = _isHybridTableRetentionStrategyEnabled; | ||
| try { | ||
| boolean parsedValue = Boolean.parseBoolean(newValue); | ||
| if (oldValue == parsedValue) { | ||
| LOGGER.info("No change in isHybridTableRetentionStrategyEnabled, current value: {}", oldValue); | ||
|
|
||
| // When the cluster config key is deleted, newValue will be null. Reset to default. | ||
| boolean defaultValue = ControllerConf.DEFAULT_ENABLE_HYBRID_TABLE_RETENTION_STRATEGY; | ||
| if (newValue == null) { | ||
| if (oldValue != defaultValue) { | ||
| _isHybridTableRetentionStrategyEnabled = defaultValue; | ||
| LOGGER.info("Cluster config for isHybridTableRetentionStrategyEnabled was removed, " | ||
| + "reverting from {} to default ({})", oldValue, defaultValue); | ||
| } else { | ||
| _isHybridTableRetentionStrategyEnabled = parsedValue; | ||
| LOGGER.info("Updated isHybridTableRetentionStrategyEnabled from {} to {}", oldValue, parsedValue); | ||
| LOGGER.info("Cluster config for isHybridTableRetentionStrategyEnabled was removed, " | ||
| + "already at default ({})", defaultValue); | ||
| } | ||
| } catch (Exception e) { | ||
| return; | ||
| } | ||
|
|
||
| // Validate that the value is a proper boolean string | ||
| if (!"true".equalsIgnoreCase(newValue) && !"false".equalsIgnoreCase(newValue)) { | ||
| LOGGER.warn("Invalid value for isHybridTableRetentionStrategyEnabled: {}, keeping current value: {}", newValue, | ||
| oldValue); | ||
| return; | ||
| } | ||
|
|
||
| boolean parsedValue = Boolean.parseBoolean(newValue); | ||
| if (oldValue == parsedValue) { | ||
| LOGGER.info("No change in isHybridTableRetentionStrategyEnabled, current value: {}", oldValue); | ||
| } else { | ||
| _isHybridTableRetentionStrategyEnabled = parsedValue; | ||
| LOGGER.info("Updated isHybridTableRetentionStrategyEnabled from {} to {}", oldValue, parsedValue); | ||
| } | ||
| } | ||
|
|
||
| private void updateRetentionCreationTimeFallbackEnabled(String newValue) { | ||
| boolean oldValue = _useCreationTimeFallbackForRetention; | ||
|
|
||
| // When the cluster config key is deleted, newValue will be null. | ||
| // Reset to default since this flag gates destructive retention deletion. | ||
| boolean defaultValue = | ||
| ControllerConf.ControllerPeriodicTasksConf.DEFAULT_ENABLE_RETENTION_CREATION_TIME_FALLBACK; | ||
| if (newValue == null) { | ||
| if (oldValue != defaultValue) { | ||
| _useCreationTimeFallbackForRetention = defaultValue; | ||
| LOGGER.info("Cluster config for retentionCreationTimeFallbackEnabled was removed, " | ||
| + "reverting from {} to default ({})", oldValue, defaultValue); | ||
| } else { | ||
| LOGGER.info("Cluster config for retentionCreationTimeFallbackEnabled was removed, " | ||
| + "already at default ({})", defaultValue); | ||
| } | ||
| return; | ||
| } | ||
|
|
||
| // Validate that the value is a proper boolean string | ||
| if (!"true".equalsIgnoreCase(newValue) && !"false".equalsIgnoreCase(newValue)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When this cluster config key is deleted,
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. Made changes for other config for this class as well.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ^^ @xiangfu0, is this okay. If so, I will merge. |
||
| LOGGER.warn("Invalid value for retentionCreationTimeFallbackEnabled: {}, keeping current value: {}", newValue, | ||
| oldValue); | ||
| return; | ||
| } | ||
|
|
||
| boolean parsedValue = Boolean.parseBoolean(newValue); | ||
| if (oldValue == parsedValue) { | ||
| LOGGER.info("No change in retentionCreationTimeFallbackEnabled, current value: {}", oldValue); | ||
| } else { | ||
| _useCreationTimeFallbackForRetention = parsedValue; | ||
| LOGGER.info("Updated retentionCreationTimeFallbackEnabled from {} to {}", oldValue, parsedValue); | ||
| } | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| public boolean isUntrackedSegmentDeletionEnabled() { | ||
| return _untrackedSegmentDeletionEnabled; | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| public boolean isRetentionCreationTimeFallbackEnabled() { | ||
| return _useCreationTimeFallbackForRetention; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: lets extract this into a util method to be used where this is duplicated here.