Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@
import java.io.IOException;
import java.security.DigestException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -140,7 +140,7 @@ public class LogCleaner implements BrokerReconfigurable {
private final ConcurrentMap<TopicPartition, UnifiedLog> logs;
private final LogDirFailureChannel logDirFailureChannel;
private final Time time;
private final List<CleanerThread> cleaners = new ArrayList<>();
private final List<CleanerThread> cleaners = new CopyOnWriteArrayList<>();

/**
* Log cleaner configuration which may be dynamically updated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.LongStream;
Expand Down Expand Up @@ -591,6 +592,48 @@ public void cleanerConfigUpdateTest(CompressionType compressionType) throws Exce
"log should have been compacted: startSize=" + startSize + " compactedSize=" + compactedSize);
}

@Test
public void testGaugeReadsAreNotAffectedByReconfigure() throws Exception {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    @Test
    public void testGaugeReadsAreNotAffectedByReconfigure() throws Exception {
        cleaner = makeCleaner(TOPIC_PARTITIONS, CLEANER_BACKOFF_MS, MIN_COMPACTION_LAG, SEGMENT_SIZE);
        cleaner.startup();

        AbstractConfig config1Thread = makeReconfigureConfig(1);
        AbstractConfig config2Threads = makeReconfigureConfig(2);

        var checkError = CompletableFuture.runAsync(() -> {
            var endtime = System.currentTimeMillis() + Duration.ofSeconds(5).toMillis();
            while (System.currentTimeMillis() < endtime) {
                cleaner.maxOverCleanerThreads(t -> t.lastStats().bufferUtilization());
                cleaner.deadThreadCount();
            }
        });

        var updateCleaner = CompletableFuture.runAsync(() -> {
            var useOne = true;
            var endtime = System.currentTimeMillis() + Duration.ofSeconds(5).toMillis();
            while (System.currentTimeMillis() < endtime) {
                AbstractConfig oldCfg = useOne ? config2Threads : config1Thread;
                AbstractConfig newCfg = useOne ? config1Thread : config2Threads;
                cleaner.reconfigure(oldCfg, newCfg);
                useOne = !useOne;
            }
        });

        checkError.join();
        updateCleaner.join();
    }

cleaner = makeCleaner(TOPIC_PARTITIONS, CLEANER_BACKOFF_MS, MIN_COMPACTION_LAG, SEGMENT_SIZE);
cleaner.startup();

AbstractConfig config1Thread = makeReconfigureConfig(1);
AbstractConfig config2Thread = makeReconfigureConfig(2);

var checkError = CompletableFuture.runAsync(() -> {
var endtime = System.currentTimeMillis() + Duration.ofSeconds(5).toMillis();
while (System.currentTimeMillis() < endtime) {
cleaner.maxOverCleanerThreads(t -> t.lastStats().bufferUtilization());
cleaner.deadThreadCount();
}
});

var updateCleaner = CompletableFuture.runAsync(() -> {
var useOne = true;
var endtime = System.currentTimeMillis() + Duration.ofSeconds(5).toMillis();
while (System.currentTimeMillis() < endtime) {
AbstractConfig oldCfg = useOne ? config2Thread : config1Thread;
AbstractConfig newCfg = useOne ? config1Thread : config2Thread;
cleaner.reconfigure(oldCfg, newCfg);
useOne = !useOne;
}
});

checkError.join();
updateCleaner.join();
}

private AbstractConfig makeReconfigureConfig(int numThreads) {
// Extend CleanerConfig.CONFIG_DEF with message.max.bytes, which CleanerConfig(AbstractConfig)
// reads via ServerConfigs.MESSAGE_MAX_BYTES_CONFIG but which is not part of CleanerConfig's own ConfigDef.
ConfigDef configDef = new ConfigDef(CleanerConfig.CONFIG_DEF)
.define("message.max.bytes", ConfigDef.Type.INT,
DEFAULT_MAX_MESSAGE_SIZE, ConfigDef.Importance.MEDIUM, "");
Map<String, Object> props = new HashMap<>();
props.put(CleanerConfig.LOG_CLEANER_THREADS_PROP, numThreads);
return new AbstractConfig(configDef, props);
}

private void checkLastCleaned(String topic, int partitionId, long firstDirty) throws InterruptedException {
// wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than
// TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG
Expand Down
Loading