-
Notifications
You must be signed in to change notification settings - Fork 3.9k
CASSANDRA-21209 Rework ZSTD dictionary compression logic to create a trainer per training #4667
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,8 @@ | |
| import java.util.concurrent.ScheduledFuture; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.function.Consumer; | ||
|
|
||
| import com.google.common.annotations.VisibleForTesting; | ||
|
|
||
|
|
@@ -30,6 +32,7 @@ | |
| import org.apache.cassandra.concurrent.ScheduledExecutors; | ||
| import org.apache.cassandra.config.DatabaseDescriptor; | ||
| import org.apache.cassandra.db.ColumnFamilyStore; | ||
| import org.apache.cassandra.schema.CompressionParams; | ||
| import org.apache.cassandra.schema.SystemDistributedKeyspace; | ||
|
|
||
| /** | ||
|
|
@@ -48,7 +51,9 @@ public class CompressionDictionaryScheduler implements ICompressionDictionarySch | |
| private final String tableName; | ||
| private final String tableId; | ||
| private final ICompressionDictionaryCache cache; | ||
| private final AtomicBoolean manualTrainingInProgress = new AtomicBoolean(false); | ||
| private final AtomicBoolean trainingInProgress = new AtomicBoolean(false); | ||
| private final AtomicReference<TrainingState> lastTrainingState = new AtomicReference<>(TrainingState.notStarted()); | ||
| private volatile ICompressionDictionaryTrainer activeTrainer; | ||
|
|
||
| private volatile ScheduledFuture<?> scheduledRefreshTask; | ||
| private volatile boolean isEnabled; | ||
|
|
@@ -83,31 +88,59 @@ public void scheduleRefreshTask() | |
| } | ||
|
|
||
| @Override | ||
| public void scheduleSSTableBasedTraining(ICompressionDictionaryTrainer trainer, | ||
| ColumnFamilyStore.RefViewFragment refViewFragment, | ||
| public void scheduleSSTableBasedTraining(ColumnFamilyStore.RefViewFragment refViewFragment, | ||
| CompressionParams compressionParams, | ||
| CompressionDictionaryTrainingConfig config, | ||
| Consumer<CompressionDictionary> listener, | ||
| boolean force) | ||
| { | ||
| if (!manualTrainingInProgress.compareAndSet(false, true)) | ||
| if (!trainingInProgress.compareAndSet(false, true)) | ||
| { | ||
| refViewFragment.close(); | ||
| throw new IllegalStateException("Training already in progress for table " + keyspaceName + '.' + tableName); | ||
| } | ||
|
|
||
| logger.info("Starting SSTable-based dictionary training for {}.{} from {} SSTables", | ||
| keyspaceName, tableName, refViewFragment.sstables.size()); | ||
| ICompressionDictionaryTrainer trainer; | ||
|
|
||
| // Run the SSTableSamplingTask asynchronously | ||
| SSTableSamplingTask task = new SSTableSamplingTask(refViewFragment, trainer, config, force); | ||
| ScheduledExecutors.nonPeriodicTasks.submit(task); | ||
| try | ||
| { | ||
| trainer = ICompressionDictionaryTrainer.create(keyspaceName, tableName, compressionParams); | ||
| trainer.setDictionaryTrainedListener(listener); | ||
| } | ||
| catch (Throwable t) | ||
| { | ||
| trainingInProgress.set(false); | ||
| refViewFragment.close(); | ||
| throw t; | ||
| } | ||
|
|
||
| if (trainer.start(config)) | ||
| { | ||
| activeTrainer = trainer; | ||
| lastTrainingState.set(trainer.getTrainingState()); | ||
| logger.info("Starting SSTable-based dictionary training for {}.{} from {} SSTables", | ||
| keyspaceName, tableName, refViewFragment.sstables.size()); | ||
|
|
||
| SSTableSamplingTask task = new SSTableSamplingTask(refViewFragment, trainer, config, force); | ||
| // trainer is eventually closed here, as well as indicating | ||
| // in manualTrainingInProgress that it was finished | ||
| ScheduledExecutors.nonPeriodicTasks.submit(task); | ||
| } | ||
| else | ||
| { | ||
| finishTraining(trainer.getTrainingState()); | ||
| cleanup(refViewFragment, trainer); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Cancels the in-progress manual training task. | ||
| */ | ||
| private void cancelManualTraining() | ||
| private void finishTraining(TrainingState trainingState) | ||
| { | ||
| manualTrainingInProgress.compareAndSet(true, false); | ||
| lastTrainingState.set(trainingState); | ||
| activeTrainer = null; | ||
| trainingInProgress.compareAndSet(true, false); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -121,6 +154,15 @@ public void setEnabled(boolean enabled) | |
| this.isEnabled = enabled; | ||
| } | ||
|
|
||
| @Override | ||
| public TrainingState getLastTrainingState() | ||
| { | ||
| ICompressionDictionaryTrainer trainer = activeTrainer; | ||
| if (trainer != null) | ||
| return trainer.getTrainingState(); | ||
| return lastTrainingState.get(); | ||
| } | ||
|
|
||
| /** | ||
| * Refreshes dictionary from system table and updates the cache. | ||
| * This method is called periodically by the scheduled refresh task. | ||
|
|
@@ -153,7 +195,7 @@ public void close() | |
| scheduledRefreshTask = null; | ||
| } | ||
|
|
||
| cancelManualTraining(); | ||
| finishTraining(TrainingState.notStarted()); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -195,7 +237,6 @@ public void run() | |
| // Use the force parameter from the task | ||
| trainer.trainDictionaryAsync(force) | ||
| .addCallback((dictionary, throwable) -> { | ||
| cancelManualTraining(); | ||
| if (throwable != null) | ||
| { | ||
| logger.error("SSTable-based dictionary training failed for {}.{}: {}", | ||
|
|
@@ -206,23 +247,36 @@ public void run() | |
| logger.info("SSTable-based dictionary training completed for {}.{}", | ||
| keyspaceName, tableName); | ||
| } | ||
|
|
||
| finishTraining(trainer.getTrainingState()); | ||
| cleanup(refViewFragment, trainer); | ||
| }); | ||
| } | ||
| catch (Exception e) | ||
| { | ||
| logger.error("Failed to sample from SSTables for {}.{}", keyspaceName, tableName, e); | ||
| cancelManualTraining(); | ||
| } | ||
| finally | ||
| { | ||
| refViewFragment.close(); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not think what we did here was too smart (same concept was there before we started to Or no? trainer.trainDictionaryAsync(force).addCallback This "addCallback" makes synchronous call from that? I do not think so, it just registers what should be done after it is finished, but it is not a blocking call, I guess. |
||
| finishTraining(trainer.getTrainingState()); | ||
| cleanup(refViewFragment, trainer); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private void cleanup(ColumnFamilyStore.RefViewFragment refViewFragment, ICompressionDictionaryTrainer trainer) | ||
| { | ||
| try | ||
| { | ||
| trainer.close(); | ||
| } | ||
| catch (Throwable t) | ||
| { | ||
| logger.debug("Unable to close trainer.", t); | ||
| } | ||
| refViewFragment.close(); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| boolean isManualTrainingRunning() | ||
| boolean isTrainingRunning() | ||
| { | ||
| return manualTrainingInProgress.get(); | ||
| return trainingInProgress.get(); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whole execution chain (from manager.train) does everything to postpone trainer creation until it is absolutely necessary and all is OK, as the instantiation of a trainer might be memory-wise very demanding (when max sample size is not trivial) as it allocates a direct ByteBuffer. We do not want to create a trainer allocating a big buffer just to throw it away if something else goes south.