Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
6.0-alpha1
* Rework ZSTD dictionary compression logic to create a trainer per training (CASSANDRA-21209)
* Improve performance when calculating settled placements during range movements (CASSANDRA-21144)
* Make shadow gossip round parameters configurable for testing (CASSANDRA-21149)
* Avoid potential gossip thread deadlock during decommission (CASSANDRA-21143)
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import com.google.common.annotations.VisibleForTesting;

Expand All @@ -30,6 +32,7 @@
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.schema.SystemDistributedKeyspace;

/**
Expand All @@ -48,7 +51,9 @@ public class CompressionDictionaryScheduler implements ICompressionDictionarySch
private final String tableName;
private final String tableId;
private final ICompressionDictionaryCache cache;
private final AtomicBoolean manualTrainingInProgress = new AtomicBoolean(false);
private final AtomicBoolean trainingInProgress = new AtomicBoolean(false);
private final AtomicReference<TrainingState> lastTrainingState = new AtomicReference<>(TrainingState.notStarted());
private volatile ICompressionDictionaryTrainer activeTrainer;

private volatile ScheduledFuture<?> scheduledRefreshTask;
private volatile boolean isEnabled;
Expand Down Expand Up @@ -83,31 +88,59 @@ public void scheduleRefreshTask()
}

@Override
public void scheduleSSTableBasedTraining(ICompressionDictionaryTrainer trainer,
ColumnFamilyStore.RefViewFragment refViewFragment,
public void scheduleSSTableBasedTraining(ColumnFamilyStore.RefViewFragment refViewFragment,
CompressionParams compressionParams,
CompressionDictionaryTrainingConfig config,
Consumer<CompressionDictionary> listener,
boolean force)
{
if (!manualTrainingInProgress.compareAndSet(false, true))
if (!trainingInProgress.compareAndSet(false, true))
{
refViewFragment.close();
throw new IllegalStateException("Training already in progress for table " + keyspaceName + '.' + tableName);
}

logger.info("Starting SSTable-based dictionary training for {}.{} from {} SSTables",
keyspaceName, tableName, refViewFragment.sstables.size());
ICompressionDictionaryTrainer trainer;

// Run the SSTableSamplingTask asynchronously
SSTableSamplingTask task = new SSTableSamplingTask(refViewFragment, trainer, config, force);
ScheduledExecutors.nonPeriodicTasks.submit(task);
try
{
trainer = ICompressionDictionaryTrainer.create(keyspaceName, tableName, compressionParams);
Copy link
Copy Markdown
Contributor Author

@smiklosovic smiklosovic Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whole execution chain (from manager.train) does everything to postpone trainer creation until it is absolutely necessary and all is OK, as the instantiation of a trainer might be memory-wise very demanding (when max sample size is not trivial) as it allocates a direct ByteBuffer. We do not want to create a trainer allocating a big buffer just to throw it away if something else goes south.

trainer.setDictionaryTrainedListener(listener);
}
catch (Throwable t)
{
trainingInProgress.set(false);
refViewFragment.close();
throw t;
}

if (trainer.start(config))
{
activeTrainer = trainer;
lastTrainingState.set(trainer.getTrainingState());
logger.info("Starting SSTable-based dictionary training for {}.{} from {} SSTables",
keyspaceName, tableName, refViewFragment.sstables.size());

SSTableSamplingTask task = new SSTableSamplingTask(refViewFragment, trainer, config, force);
// trainer is eventually closed here, as well as indicating
// in manualTrainingInProgress that it was finished
ScheduledExecutors.nonPeriodicTasks.submit(task);
}
else
{
finishTraining(trainer.getTrainingState());
cleanup(refViewFragment, trainer);
}
}

/**
* Cancels the in-progress manual training task.
*/
private void cancelManualTraining()
private void finishTraining(TrainingState trainingState)
{
manualTrainingInProgress.compareAndSet(true, false);
lastTrainingState.set(trainingState);
activeTrainer = null;
trainingInProgress.compareAndSet(true, false);
}

/**
Expand All @@ -121,6 +154,15 @@ public void setEnabled(boolean enabled)
this.isEnabled = enabled;
}

@Override
public TrainingState getLastTrainingState()
{
ICompressionDictionaryTrainer trainer = activeTrainer;
if (trainer != null)
return trainer.getTrainingState();
return lastTrainingState.get();
}

/**
* Refreshes dictionary from system table and updates the cache.
* This method is called periodically by the scheduled refresh task.
Expand Down Expand Up @@ -153,7 +195,7 @@ public void close()
scheduledRefreshTask = null;
}

cancelManualTraining();
finishTraining(TrainingState.notStarted());
}

/**
Expand Down Expand Up @@ -195,7 +237,6 @@ public void run()
// Use the force parameter from the task
trainer.trainDictionaryAsync(force)
.addCallback((dictionary, throwable) -> {
cancelManualTraining();
if (throwable != null)
{
logger.error("SSTable-based dictionary training failed for {}.{}: {}",
Expand All @@ -206,23 +247,36 @@ public void run()
logger.info("SSTable-based dictionary training completed for {}.{}",
keyspaceName, tableName);
}

finishTraining(trainer.getTrainingState());
cleanup(refViewFragment, trainer);
});
}
catch (Exception e)
{
logger.error("Failed to sample from SSTables for {}.{}", keyspaceName, tableName, e);
cancelManualTraining();
}
finally
{
refViewFragment.close();
Copy link
Copy Markdown
Contributor Author

@smiklosovic smiklosovic Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think what we did here was too smart (same concept was there before we started to selectAndReference) because training is done asychronously, so this method returns and finally is called before the sampling is actually finished. We should close in callback, as done above, or only in case we catch exception, as done here.

Or no?

trainer.trainDictionaryAsync(force).addCallback

This "addCallback" makes synchronous call from that? I do not think so, it just registers what should be done after it is finished, but it is not a blocking call, I guess.

finishTraining(trainer.getTrainingState());
cleanup(refViewFragment, trainer);
}
}
}

private void cleanup(ColumnFamilyStore.RefViewFragment refViewFragment, ICompressionDictionaryTrainer trainer)
{
try
{
trainer.close();
}
catch (Throwable t)
{
logger.debug("Unable to close trainer.", t);
}
refViewFragment.close();
}

@VisibleForTesting
boolean isManualTrainingRunning()
boolean isTrainingRunning()
{
return manualTrainingInProgress.get();
return trainingInProgress.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,23 @@

package org.apache.cassandra.db.compression;

import java.util.Map;

import com.google.common.base.Preconditions;

import org.apache.cassandra.config.DataStorageSpec;
import org.apache.cassandra.config.DurationSpec;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.schema.CompressionParams;

import static java.lang.String.format;
import static org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE;
import static org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE;
import static org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MIN_FREQUENCY;
import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME;
import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME;
import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MIN_FREQUENCY_PARAMETER_NAME;

/**
* Configuration for dictionary training parameters.
*/
Expand All @@ -29,13 +44,15 @@ public class CompressionDictionaryTrainingConfig
public final int maxTotalSampleSize;
public final int acceptableTotalSampleSize;
public final int chunkSize;
public final int minTrainingFrequency;

private CompressionDictionaryTrainingConfig(Builder builder)
{
this.maxDictionarySize = builder.maxDictionarySize;
this.maxTotalSampleSize = builder.maxTotalSampleSize;
this.acceptableTotalSampleSize = builder.maxTotalSampleSize / 10 * 8;
this.chunkSize = builder.chunkSize;
this.minTrainingFrequency = builder.minTrainingFrequency;
}

public static Builder builder()
Expand All @@ -48,6 +65,7 @@ public static class Builder
private int maxDictionarySize = 65536; // 64KB default
private int maxTotalSampleSize = 10 * 1024 * 1024; // 10MB total
private int chunkSize = 64 * 1024; // 64KB default
private int minTrainingFrequency = 0; // in minutes

public Builder maxDictionarySize(int size)
{
Expand All @@ -67,12 +85,121 @@ public Builder chunkSize(int chunkSize)
return this;
}

public Builder minTrainingFrequency(int minTrainingFrequency)
{
this.minTrainingFrequency = minTrainingFrequency;
return this;
}

public CompressionDictionaryTrainingConfig build()
{
Preconditions.checkArgument(maxDictionarySize > 0, "maxDictionarySize must be positive");
Preconditions.checkArgument(maxTotalSampleSize > 0, "maxTotalSampleSize must be positive");
Preconditions.checkArgument(chunkSize > 0, "chunkSize must be positive");
Preconditions.checkArgument(minTrainingFrequency >= 0, "min training frequency must be non-negative");
return new CompressionDictionaryTrainingConfig(this);
}
}

public static int getMaxDictionarySize(Map<String, String> params)
{
return validateSizeBasedTrainingParameter(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
params.getOrDefault(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE));
}

public static int getMaxTotalSampleSize(Map<String, String> params)
{
return validateSizeBasedTrainingParameter(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
params.getOrDefault(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE));
}

public static int getMinTrainingFrequency(Map<String, String> params)
{
return validateDurationBasedTrainingParameter(TRAINING_MIN_FREQUENCY_PARAMETER_NAME,
params.getOrDefault(TRAINING_MIN_FREQUENCY_PARAMETER_NAME,
DEFAULT_TRAINING_MIN_FREQUENCY));
}

public static int getMaxDictionarySizeWithUserSuppliedParams(CompressionParams compressionParams, Map<String, String> parameters)
{
return internalTrainingParameterResolution(compressionParams,
parameters.get(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME),
TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE);
}

public static int getMaxTotalSampleSizeWithUserSuppliedParams(CompressionParams compressionParams, Map<String, String> parameters)
{
return internalTrainingParameterResolution(compressionParams,
parameters.get(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME),
TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE);
}

private static int internalTrainingParameterResolution(CompressionParams compressionParams,
String userSuppliedValue,
String parameterName,
String defaultParameterValue)
{
String resolvedValue = null;
try
{
if (userSuppliedValue == null)
resolvedValue = compressionParams.getOtherOptions().getOrDefault(parameterName, defaultParameterValue);
else
resolvedValue = userSuppliedValue;

return new DataStorageSpec.IntBytesBound(resolvedValue).toBytes();
}
catch (Throwable t)
{
throw new IllegalArgumentException(String.format("Invalid value for %s: %s", parameterName, resolvedValue));
}
}

/**
* Validates value of a parameter for training purposes. The value to validate should
* be accepted by {@link DataStorageSpec.IntKibibytesBound}. This method is used upon validation
* of input parameters in the implementations of dictionary compressor.
*
* @param parameterName name of a parameter to validate
* @param resolvedValue value to validate
* @return resolved value in bytes
*/
static int validateSizeBasedTrainingParameter(String parameterName, String resolvedValue)
{
try
{
return new DataStorageSpec.IntBytesBound(resolvedValue).toBytes();
}
catch (Throwable t)
{
throw new ConfigurationException(format("Unable to set value to parameter %s: %s. Reason: %s",
parameterName, resolvedValue, t.getMessage()));
}
}

/**
* Validates value of a parameter for training purposes. The value to validate should
* be accepted by {@link DurationSpec.IntMinutesBound}. This method is used upon validation of input parameters
* in the implementation of dictionary compressor.
*
* @param parameterName name of a parameter to validate
* @param resolvedValue value to validate
* @return resolved value in minutes
*/
static int validateDurationBasedTrainingParameter(String parameterName, String resolvedValue)
{
try
{
return new DurationSpec.IntMinutesBound(resolvedValue).toMinutes();
}
catch (Throwable t)
{
throw new ConfigurationException(format("Unable to set value to parameter %s: %s. Reason: %s",
parameterName, resolvedValue, t.getMessage()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

package org.apache.cassandra.db.compression;

import java.util.function.Consumer;

import org.apache.cassandra.db.ColumnFamilyStore.RefViewFragment;
import org.apache.cassandra.schema.CompressionParams;

/**
* Interface for managing scheduled tasks for compression dictionary operations.
Expand All @@ -38,15 +41,24 @@ public interface ICompressionDictionaryScheduler extends AutoCloseable
/**
* Schedules SSTable-based training that samples from existing SSTables.
*
* @param trainer the trainer to use
* A caller of this method should ensure that SSTables referred in {@code refViewFragment} are closed
* eventually, either directly at the end of that method or by other means, when training is running
* asynchronously.
*
* A caller of this method might assume that {@code trainer} might be closed after this method finishes, either
* directly in this method or indirectly when training is running asynchronously.
*
* @param refViewFragment the view of SSTables to sample from
* @param compressionParams parameters for compression
* @param config the training configuration
* @param listener listener invoked when a dictionary is trained
* @param force force the dictionary training even if there are not enough samples
* @throws IllegalStateException if training is already in progress
*/
void scheduleSSTableBasedTraining(ICompressionDictionaryTrainer trainer,
RefViewFragment refViewFragment,
void scheduleSSTableBasedTraining(RefViewFragment refViewFragment,
CompressionParams compressionParams,
CompressionDictionaryTrainingConfig config,
Consumer<CompressionDictionary> listener,
boolean force);

/**
Expand All @@ -55,4 +67,6 @@ void scheduleSSTableBasedTraining(ICompressionDictionaryTrainer trainer,
* @param enabled whether the scheduler should be enabled
*/
void setEnabled(boolean enabled);

TrainingState getLastTrainingState();
}
Loading
Loading