Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,61 +17,56 @@

package org.apache.ignite.internal.processors.cache;

import java.util.function.Supplier;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;

/**
*
*/
public class ClientSlowDiscoveryAbstractTest extends GridCommonAbstractTest {
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;

/** */
abstract class ClientSlowDiscoveryAbstractTest extends GridCommonAbstractTest {
/** Cache name. */
protected static final String CACHE_NAME = "cache";

/** Cache configuration. */
private final CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setReadFromBackup(false)
.setBackups(1)
.setAffinity(new RendezvousAffinityFunction(false, 64));

/** Communication SPI supplier. */
protected Supplier<CommunicationSpi> communicationSpiSupplier = TestRecordingCommunicationSpi::new;

/** Discovery SPI supplier. */
protected Supplier<DiscoverySpi> discoverySpiSupplier = TcpDiscoverySpi::new;

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
return super.getConfiguration(igniteInstanceName)
.setConsistentId(igniteInstanceName)
.setCommunicationSpi(new TestRecordingCommunicationSpi())
.setCacheConfiguration(new CacheConfiguration<>(CACHE_NAME)
.setAtomicityMode(TRANSACTIONAL)
.setReadFromBackup(false)
.setBackups(1)
.setAffinity(new RendezvousAffinityFunction(false, 64)));
}

/** */
protected IgniteConfiguration getConfiguration(int nodeIdx, TcpDiscoverySpi discoSpi) throws Exception {
IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(nodeIdx));

cfg.setConsistentId(igniteInstanceName);
cfg.setCacheConfiguration(ccfg);
cfg.setCommunicationSpi(communicationSpiSupplier.get());
cfg.setDiscoverySpi(discoverySpiSupplier.get());
cfg.setDiscoverySpi(discoSpi.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));

return cfg;
}

/**
*
*/
/** */
static class NodeJoinInterceptingDiscoverySpi extends TcpDiscoverySpi {
/** Interceptor. */
protected volatile IgniteInClosure<TcpDiscoveryNodeAddFinishedMessage> interceptor;
private final IgniteInClosure<TcpDiscoveryNodeAddFinishedMessage> interceptor;

/** */
NodeJoinInterceptingDiscoverySpi(IgniteInClosure<TcpDiscoveryNodeAddFinishedMessage> interceptor) {
this.interceptor = interceptor;
}

/** {@inheritDoc} */
@Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
Expand All @@ -80,12 +75,15 @@ static class NodeJoinInterceptingDiscoverySpi extends TcpDiscoverySpi {
}
}

/**
*
*/
/** */
static class CustomMessageInterceptingDiscoverySpi extends TcpDiscoverySpi {
/** Interceptor. */
protected volatile IgniteInClosure<DiscoveryCustomMessage> interceptor;
private final IgniteInClosure<DiscoveryCustomMessage> interceptor;

/** */
CustomMessageInterceptingDiscoverySpi(IgniteInClosure<DiscoveryCustomMessage> interceptor) {
this.interceptor = interceptor;
}

/** {@inheritDoc} */
@Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
Expand All @@ -28,31 +29,22 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
*
*/
/** */
public class ClientSlowDiscoveryTopologyChangeTest extends ClientSlowDiscoveryAbstractTest {
/**
*
*/
@Before
public void before() throws Exception {
/** {@inheritDoc} */
@Override public void beforeTest() throws Exception {
stopAllGrids();

cleanPersistenceDir();
}

/**
*
*/
@After
public void after() throws Exception {
/** {@inheritDoc} */
@Override public void afterTest() throws Exception {
stopAllGrids();

cleanPersistenceDir();
Expand All @@ -75,53 +67,25 @@ public void testClientJoinAndCacheStop() throws Exception {
for (int k = 0; k < 64; k++)
crd.cache(CACHE_NAME).put(k, k);

TestRecordingCommunicationSpi clientCommSpi = new TestRecordingCommunicationSpi();

// Delay client join process.
clientCommSpi.blockMessages((node, msg) -> {
if (!(msg instanceof GridDhtPartitionsSingleMessage))
return false;
CountDownLatch cliDiscoSpiUnblockedLatch = new CountDownLatch(1);

GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage)msg;

return Optional.ofNullable(singleMsg.exchangeId())
.map(GridDhtPartitionExchangeId::topologyVersion)
.filter(topVer -> topVer.equals(new AffinityTopologyVersion(4, 0)))
.isPresent();
});

communicationSpiSupplier = () -> clientCommSpi;

CustomMessageInterceptingDiscoverySpi clientDiscoSpi = new CustomMessageInterceptingDiscoverySpi();

CountDownLatch clientDiscoSpiBlock = new CountDownLatch(1);

// Delay cache destroying on client node.
clientDiscoSpi.interceptor = (msg) -> {
if (!(msg instanceof DynamicCacheChangeBatch))
return;

DynamicCacheChangeBatch cacheChangeBatch = (DynamicCacheChangeBatch)msg;

boolean hasCacheStopReq = cacheChangeBatch.requests().stream()
.anyMatch(req -> req.stop() && req.cacheName().equals(CACHE_NAME));
IgniteConfiguration cliCfg = getConfiguration(3, createBlockingDiscoverySpi(cliDiscoSpiUnblockedLatch));

if (hasCacheStopReq)
U.awaitQuiet(clientDiscoSpiBlock);
};
TestRecordingCommunicationSpi commSpi = (TestRecordingCommunicationSpi)cliCfg.getCommunicationSpi();

discoverySpiSupplier = () -> clientDiscoSpi;
// Delay client join process.
blockSingleMessage(commSpi);

IgniteInternalFuture<IgniteEx> clientStartFut = GridTestUtils.runAsync(() -> startClientGrid(3));
IgniteInternalFuture<IgniteEx> clientStartFut = GridTestUtils.runAsync(() -> startClientGrid(cliCfg));

// Wait till client node starts join process.
clientCommSpi.waitForBlocked();
commSpi.waitForBlocked();

// Destroy cache on server nodes.
crd.destroyCache(CACHE_NAME);

// Resume client join.
clientCommSpi.stopBlock();
commSpi.stopBlock();

// Client join should succeed.
IgniteEx client = clientStartFut.get();
Expand All @@ -143,7 +107,7 @@ public void testClientJoinAndCacheStop() throws Exception {
}
finally {
// Resume processing cache destroy on client node.
clientDiscoSpiBlock.countDown();
cliDiscoSpiUnblockedLatch.countDown();
}

// Wait till cache destroyed on client node.
Expand All @@ -157,4 +121,35 @@ public void testClientJoinAndCacheStop() throws Exception {

Assert.assertNull("Cache should be destroyed on client node", client.cache(CACHE_NAME));
}

/** */
private TcpDiscoverySpi createBlockingDiscoverySpi(CountDownLatch discoSpiUnblockedLatch) {
return new CustomMessageInterceptingDiscoverySpi(msg -> {
if (!(msg instanceof DynamicCacheChangeBatch))
return;

DynamicCacheChangeBatch cacheChangeBatch = (DynamicCacheChangeBatch)msg;

boolean hasCacheStopReq = cacheChangeBatch.requests().stream()
.anyMatch(req -> req.stop() && req.cacheName().equals(CACHE_NAME));

if (hasCacheStopReq)
U.awaitQuiet(discoSpiUnblockedLatch);
});
}

/** */
private void blockSingleMessage(TestRecordingCommunicationSpi commSpi) {
commSpi.blockMessages((node, msg) -> {
if (!(msg instanceof GridDhtPartitionsSingleMessage))
return false;

GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage)msg;

return Optional.ofNullable(singleMsg.exchangeId())
.map(GridDhtPartitionExchangeId::topologyVersion)
.filter(topVer -> topVer.equals(new AffinityTopologyVersion(4, 0)))
.isPresent();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,12 @@
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -204,9 +201,9 @@ private static boolean shouldBeTested(TransactionConcurrency concurrency, Transa
/**
* Interface to work with cache operations within transaction.
*/
private static interface TestTransaction<K, V> {
private interface TestTransaction<K, V> {
/** Possible operations. */
static int POSSIBLE_OPERATIONS = 5;
int POSSIBLE_OPERATIONS = 5;

/**
* @param key Key.
Expand Down Expand Up @@ -347,7 +344,7 @@ public void consistencyCheck() {
public IgniteInClosure<TestTransaction<?, ?>> operation;

/** Client disco spi block. */
private CountDownLatch clientDiscoSpiBlock;
private CountDownLatch cliDiscoSpiUnblockedLatch;

/** Client node to perform operations. */
private IgniteEx clnt;
Expand All @@ -368,34 +365,25 @@ public void consistencyCheck() {
cleanPersistenceDir();
}

/** */
@Before
public void before() throws Exception {
NodeJoinInterceptingDiscoverySpi clientDiscoSpi = new NodeJoinInterceptingDiscoverySpi();

clientDiscoSpiBlock = new CountDownLatch(1);
/** {@inheritDoc} */
@Override public void beforeTest() throws Exception {
cliDiscoSpiUnblockedLatch = new CountDownLatch(1);

// Delay node join of second client.
clientDiscoSpi.interceptor = msg -> {
NodeJoinInterceptingDiscoverySpi clientDiscoSpi = new NodeJoinInterceptingDiscoverySpi(msg -> {
if (msg.nodeId().toString().endsWith("2"))
U.awaitQuiet(clientDiscoSpiBlock);
};

discoverySpiSupplier = () -> clientDiscoSpi;
U.awaitQuiet(cliDiscoSpiUnblockedLatch);
});

clnt = startClientGrid(1);
clnt = startClientGrid(getConfiguration(1, clientDiscoSpi));

for (int k = 0; k < 64; k++)
clnt.cache(CACHE_NAME).put(k, 0);

discoverySpiSupplier = TcpDiscoverySpi::new;

startClientGrid(2);
}

/** */
@After
public void after() throws Exception {
/** {@inheritDoc} */
@Override public void afterTest() throws Exception {
// Stop client nodes.
stopGrid(1);
stopGrid(2);
Expand All @@ -421,7 +409,7 @@ public void testTransactionRemap() throws Exception {
// Expected.
}
finally {
clientDiscoSpiBlock.countDown();
cliDiscoSpiUnblockedLatch.countDown();
}

// After resume second client join, transaction should succesfully await new affinity and commit.
Expand Down Expand Up @@ -451,7 +439,7 @@ public void testTransactionRemapWithTimeout() throws Exception {
// Expected.
}
finally {
clientDiscoSpiBlock.countDown();
cliDiscoSpiUnblockedLatch.countDown();
}

// After resume second client join, transaction should be timed out and rolled back.
Expand Down
Loading