Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/java/org/apache/cassandra/service/CassandraDaemon.java
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,11 @@ protected void setup()
// Prepared statements
QueryProcessor.instance.preloadPreparedStatements();

// Apply overrides before re-enabling auto-compaction
setCompactionStrategyOverrides(Schema.instance.getKeyspaces());
// re-enable auto-compaction after replay, so correct disk boundaries are used
enableAutoCompaction(Schema.instance.getKeyspaces());

// start server internals
StorageService.instance.registerDaemon(this);
try
Expand Down Expand Up @@ -421,11 +426,6 @@ protected void setup()
ScheduledExecutors.optionalTasks.schedule(viewRebuild, StorageService.RING_DELAY_MILLIS, TimeUnit.MILLISECONDS);
StorageService.instance.doAuthSetup();

// Apply overrides before re-enabling auto-compaction
setCompactionStrategyOverrides(Schema.instance.getKeyspaces());
// re-enable auto-compaction after replay, so correct disk boundaries are used
enableAutoCompaction(Schema.instance.getKeyspaces());

AuditLogManager.instance.initialize();

StorageService.instance.doAutoRepairSetup();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test.ring;

import java.util.concurrent.Callable;

import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;

import org.junit.Test;

import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.Constants;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
import org.apache.cassandra.tcm.sequences.SequenceState;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.junit.Assert.assertTrue;

public class BootstrapCompactionTest extends TestBaseImpl
{
@Test
public void testCompactionEnabledDuringBootstrap() throws Exception
{
int originalNodeCount = 2;
int expandedNodeCount = originalNodeCount + 1;

try (Cluster cluster = init(builder().withNodes(originalNodeCount)
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
.withInstanceInitializer(BB::install)
.withConfig(config -> config.with(NETWORK, GOSSIP))
.start()))
{
cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)"));
IInstanceConfig config = cluster.newInstanceConfig()
.set(Constants.KEY_DTEST_FULL_STARTUP, true)
.set("auto_bootstrap", true);

IInvokableInstance newInstance = cluster.bootstrap(config);
// BB below asserts that autocompaction is enabled at each step in the join sequence
newInstance.startup(cluster);
}
}

public static class BB
{
public static void install(ClassLoader cl, int i)
{
if (i == 3)
{
new ByteBuddy().rebase(BootstrapAndJoin.class)
.method(named("executeNext"))
.intercept(MethodDelegation.to(BB.class))
.make()
.load(cl, ClassLoadingStrategy.Default.INJECTION);
}
}

public static SequenceState executeNext(@SuperCall Callable<SequenceState> zuper) throws Exception
{
boolean isEnabled = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getCompactionStrategyManager().isEnabled();
assertTrue("Autocompaction should be enabled during the bootstrap", isEnabled);
return zuper.call();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,9 @@ public void testAutomaticUpgradeConcurrency() throws Exception
// inside the currentlyBackgroundUpgrading check - with max_concurrent_auto_upgrade_tasks = 1 this will make
// sure that BackgroundCompactionCandidate#maybeRunUpgradeTask returns false until the latch has been counted down
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch inFindUpgradeSSTables = new CountDownLatch(1);
AtomicInteger upgradeTaskCount = new AtomicInteger(0);
MockCFSForCSM mock = new MockCFSForCSM(cfs, latch, upgradeTaskCount);
MockCFSForCSM mock = new MockCFSForCSM(cfs, latch, inFindUpgradeSSTables, upgradeTaskCount);

CompactionManager.BackgroundCompactionCandidate r = CompactionManager.instance.getBackgroundCompactionCandidate(mock);
CompactionStrategyManager mgr = mock.getCompactionStrategyManager();
Expand All @@ -224,7 +225,7 @@ public void testAutomaticUpgradeConcurrency() throws Exception
// due to the currentlyBackgroundUpgrading count being >= max_concurrent_auto_upgrade_tasks
Thread t = new Thread(() -> r.maybeRunUpgradeTask(mgr));
t.start();
Thread.sleep(100); // let the thread start and grab the task
inFindUpgradeSSTables.await();
assertEquals(1, CompactionManager.instance.currentlyBackgroundUpgrading.get());
assertFalse(r.maybeRunUpgradeTask(mgr));
assertFalse(r.maybeRunUpgradeTask(mgr));
Expand All @@ -246,8 +247,9 @@ public void testAutomaticUpgradeConcurrency2() throws Exception
// inside the currentlyBackgroundUpgrading check - with max_concurrent_auto_upgrade_tasks = 1 this will make
// sure that BackgroundCompactionCandidate#maybeRunUpgradeTask returns false until the latch has been counted down
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch inFindUpgradeSSTables = new CountDownLatch(2);
AtomicInteger upgradeTaskCount = new AtomicInteger();
MockCFSForCSM mock = new MockCFSForCSM(cfs, latch, upgradeTaskCount);
MockCFSForCSM mock = new MockCFSForCSM(cfs, latch, inFindUpgradeSSTables, upgradeTaskCount);

CompactionManager.BackgroundCompactionCandidate r = CompactionManager.instance.getBackgroundCompactionCandidate(mock);
CompactionStrategyManager mgr = mock.getCompactionStrategyManager();
Expand All @@ -259,7 +261,7 @@ public void testAutomaticUpgradeConcurrency2() throws Exception
t.start();
Thread t2 = new Thread(() -> r.maybeRunUpgradeTask(mgr));
t2.start();
Thread.sleep(100); // let the threads start and grab the task
inFindUpgradeSSTables.await();
assertEquals(2, CompactionManager.instance.currentlyBackgroundUpgrading.get());
assertFalse(r.maybeRunUpgradeTask(mgr));
assertFalse(r.maybeRunUpgradeTask(mgr));
Expand Down Expand Up @@ -619,30 +621,34 @@ private static class MockCFS extends ColumnFamilyStore
private static class MockCFSForCSM extends ColumnFamilyStore
{
private final CountDownLatch latch;
private final CountDownLatch inFindUpgradeSSTables;
private final AtomicInteger upgradeTaskCount;

private MockCFSForCSM(ColumnFamilyStore cfs, CountDownLatch latch, AtomicInteger upgradeTaskCount)
private MockCFSForCSM(ColumnFamilyStore cfs, CountDownLatch latch, CountDownLatch inFindUpgradeSSTables, AtomicInteger upgradeTaskCount)
{
super(cfs.keyspace, cfs.name, Util.newSeqGen(10), cfs.metadata.get(), cfs.getDirectories(), true, false);
this.latch = latch;
this.inFindUpgradeSSTables = inFindUpgradeSSTables;
this.upgradeTaskCount = upgradeTaskCount;
}
@Override
public CompactionStrategyManager getCompactionStrategyManager()
{
return new MockCSM(this, latch, upgradeTaskCount);
return new MockCSM(this, latch, inFindUpgradeSSTables, upgradeTaskCount);
}
}

private static class MockCSM extends CompactionStrategyManager
{
private final CountDownLatch latch;
private final AtomicInteger upgradeTaskCount;
private final CountDownLatch inFindUpgradeSSTables;

private MockCSM(ColumnFamilyStore cfs, CountDownLatch latch, AtomicInteger upgradeTaskCount)
private MockCSM(ColumnFamilyStore cfs, CountDownLatch latch, CountDownLatch inFindUpgradeSSTables, AtomicInteger upgradeTaskCount)
{
super(cfs);
this.latch = latch;
this.inFindUpgradeSSTables = inFindUpgradeSSTables;
this.upgradeTaskCount = upgradeTaskCount;
}

Expand All @@ -651,6 +657,7 @@ public AbstractCompactionTask findUpgradeSSTableTask()
{
try
{
inFindUpgradeSSTables.countDown();
latch.await();
upgradeTaskCount.incrementAndGet();
}
Expand Down