Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2122,6 +2122,9 @@ protected void onExchange(DiscoveryDataPacket dataPacket, ClassLoader clsLdr) {
new MessageFactoryProvider[] { new DiscoveryMessageFactory() });

impl.spiStart(igniteInstanceName);

if (ipFinder instanceof TcpDiscoveryMulticastIpFinder)
throw new IllegalStateException("Multicast IP finder is used");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public class CacheFutureExceptionSelfTest extends GridCommonAbstractTest {

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return new IgniteConfiguration()
.setIgniteInstanceName(igniteInstanceName)
return super.getConfiguration(igniteInstanceName)
.setFailureHandler(new StopNodeFailureHandler());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,75 +17,79 @@

package org.apache.ignite.internal.processors.cache;

import java.util.function.Supplier;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;

/**
*
*/
public class ClientSlowDiscoveryAbstractTest extends GridCommonAbstractTest {
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;

/** */
abstract class ClientSlowDiscoveryAbstractTest extends GridCommonAbstractTest {
/** Cache name. */
protected static final String CACHE_NAME = "cache";

/** Cache configuration. */
private final CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setReadFromBackup(false)
.setBackups(1)
.setAffinity(new RendezvousAffinityFunction(false, 64));

/** Communication SPI supplier. */
protected Supplier<CommunicationSpi> communicationSpiSupplier = TestRecordingCommunicationSpi::new;

/** Discovery SPI supplier. */
protected Supplier<DiscoverySpi> discoverySpiSupplier = TcpDiscoverySpi::new;

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

cfg.setConsistentId(igniteInstanceName);
cfg.setCacheConfiguration(ccfg);
cfg.setCommunicationSpi(communicationSpiSupplier.get());
cfg.setDiscoverySpi(discoverySpiSupplier.get());
return super.getConfiguration(igniteInstanceName)
.setConsistentId(igniteInstanceName)
.setCommunicationSpi(new TestRecordingCommunicationSpi())
.setCacheConfiguration(new CacheConfiguration<>(CACHE_NAME)
.setAtomicityMode(TRANSACTIONAL)
.setReadFromBackup(false)
.setBackups(1)
.setAffinity(new RendezvousAffinityFunction(false, 64)));
}

return cfg;
/** */
protected IgniteConfiguration getConfiguration(int nodeIdx, TcpDiscoverySpi discoSpi) throws Exception {
return getConfiguration(getTestIgniteInstanceName(nodeIdx))
.setDiscoverySpi(discoSpi);
}

/**
*
*/
/** */
static class NodeJoinInterceptingDiscoverySpi extends TcpDiscoverySpi {
/** Interceptor. */
protected volatile IgniteInClosure<TcpDiscoveryNodeAddFinishedMessage> interceptor;
private final IgniteInClosure<TcpDiscoveryNodeAddFinishedMessage> interceptor;

/** */
private NodeJoinInterceptingDiscoverySpi(IgniteInClosure<TcpDiscoveryNodeAddFinishedMessage> interceptor) {
this.interceptor = interceptor;
}

/** {@inheritDoc} */
@Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
if (msg instanceof TcpDiscoveryNodeAddFinishedMessage && interceptor != null)
interceptor.apply((TcpDiscoveryNodeAddFinishedMessage)msg);
}

/** */
static NodeJoinInterceptingDiscoverySpi create(IgniteInClosure<TcpDiscoveryNodeAddFinishedMessage> interceptor) {
NodeJoinInterceptingDiscoverySpi spi = new NodeJoinInterceptingDiscoverySpi(interceptor);

spi.setIpFinder(sharedStaticIpFinder);

return spi;
}
}

/**
*
*/
/** */
static class CustomMessageInterceptingDiscoverySpi extends TcpDiscoverySpi {
/** Interceptor. */
protected volatile IgniteInClosure<DiscoveryCustomMessage> interceptor;
private final IgniteInClosure<DiscoveryCustomMessage> interceptor;

/** */
private CustomMessageInterceptingDiscoverySpi(IgniteInClosure<DiscoveryCustomMessage> interceptor) {
this.interceptor = interceptor;
}

/** {@inheritDoc} */
@Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
Expand All @@ -106,5 +110,14 @@ static class CustomMessageInterceptingDiscoverySpi extends TcpDiscoverySpi {
if (interceptor != null)
interceptor.apply(U.unwrapCustomMessage(cm.message()));
}

/** */
static CustomMessageInterceptingDiscoverySpi create(IgniteInClosure<DiscoveryCustomMessage> interceptor) {
CustomMessageInterceptingDiscoverySpi spi = new CustomMessageInterceptingDiscoverySpi(interceptor);

spi.setIpFinder(sharedStaticIpFinder);

return spi;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
Expand All @@ -28,31 +29,22 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
*
*/
/** */
public class ClientSlowDiscoveryTopologyChangeTest extends ClientSlowDiscoveryAbstractTest {
/**
*
*/
@Before
public void before() throws Exception {
/** {@inheritDoc} */
@Override public void beforeTest() throws Exception {
stopAllGrids();

cleanPersistenceDir();
}

/**
*
*/
@After
public void after() throws Exception {
/** {@inheritDoc} */
@Override public void afterTest() throws Exception {
stopAllGrids();

cleanPersistenceDir();
Expand All @@ -75,53 +67,24 @@ public void testClientJoinAndCacheStop() throws Exception {
for (int k = 0; k < 64; k++)
crd.cache(CACHE_NAME).put(k, k);

TestRecordingCommunicationSpi clientCommSpi = new TestRecordingCommunicationSpi();

// Delay client join process.
clientCommSpi.blockMessages((node, msg) -> {
if (!(msg instanceof GridDhtPartitionsSingleMessage))
return false;
CountDownLatch cliDiscoSpiUnblockedLatch = new CountDownLatch(1);

GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage)msg;

return Optional.ofNullable(singleMsg.exchangeId())
.map(GridDhtPartitionExchangeId::topologyVersion)
.filter(topVer -> topVer.equals(new AffinityTopologyVersion(4, 0)))
.isPresent();
});

communicationSpiSupplier = () -> clientCommSpi;

CustomMessageInterceptingDiscoverySpi clientDiscoSpi = new CustomMessageInterceptingDiscoverySpi();

CountDownLatch clientDiscoSpiBlock = new CountDownLatch(1);

// Delay cache destroying on client node.
clientDiscoSpi.interceptor = (msg) -> {
if (!(msg instanceof DynamicCacheChangeBatch))
return;

DynamicCacheChangeBatch cacheChangeBatch = (DynamicCacheChangeBatch)msg;

boolean hasCacheStopReq = cacheChangeBatch.requests().stream()
.anyMatch(req -> req.stop() && req.cacheName().equals(CACHE_NAME));
IgniteConfiguration cliCfg = getConfiguration(3, createBlockingDiscoverySpi(cliDiscoSpiUnblockedLatch));

if (hasCacheStopReq)
U.awaitQuiet(clientDiscoSpiBlock);
};
TestRecordingCommunicationSpi commSpi = (TestRecordingCommunicationSpi)cliCfg.getCommunicationSpi();

discoverySpiSupplier = () -> clientDiscoSpi;
blockSingleMessage(commSpi);

IgniteInternalFuture<IgniteEx> clientStartFut = GridTestUtils.runAsync(() -> startClientGrid(3));
IgniteInternalFuture<IgniteEx> clientStartFut = GridTestUtils.runAsync(() -> startClientGrid(cliCfg));

// Wait till client node starts join process.
clientCommSpi.waitForBlocked();
commSpi.waitForBlocked();

// Destroy cache on server nodes.
crd.destroyCache(CACHE_NAME);

// Resume client join.
clientCommSpi.stopBlock();
commSpi.stopBlock();

// Client join should succeed.
IgniteEx client = clientStartFut.get();
Expand All @@ -143,7 +106,7 @@ public void testClientJoinAndCacheStop() throws Exception {
}
finally {
// Resume processing cache destroy on client node.
clientDiscoSpiBlock.countDown();
cliDiscoSpiUnblockedLatch.countDown();
}

// Wait till cache destroyed on client node.
Expand All @@ -157,4 +120,36 @@ public void testClientJoinAndCacheStop() throws Exception {

Assert.assertNull("Cache should be destroyed on client node", client.cache(CACHE_NAME));
}

/** */
private TcpDiscoverySpi createBlockingDiscoverySpi(CountDownLatch discoSpiUnblockedLatch) {
return CustomMessageInterceptingDiscoverySpi.create(msg -> {
if (!(msg instanceof DynamicCacheChangeBatch))
return;

DynamicCacheChangeBatch cacheChangeBatch = (DynamicCacheChangeBatch)msg;

boolean hasCacheStopReq = cacheChangeBatch.requests().stream()
.anyMatch(req -> req.stop() && req.cacheName().equals(CACHE_NAME));

if (hasCacheStopReq)
U.awaitQuiet(discoSpiUnblockedLatch);
});
}

/** */
private void blockSingleMessage(TestRecordingCommunicationSpi commSpi) {
// Delay client join process.
commSpi.blockMessages((node, msg) -> {
if (!(msg instanceof GridDhtPartitionsSingleMessage))
return false;

GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage)msg;

return Optional.ofNullable(singleMsg.exchangeId())
.map(GridDhtPartitionExchangeId::topologyVersion)
.filter(topVer -> topVer.equals(new AffinityTopologyVersion(4, 0)))
.isPresent();
});
}
}
Loading
Loading