Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,36 @@ protected void waitForBridgeFormation() throws Exception {
}

protected void startAllBrokers() throws Exception {
Collection<BrokerItem> brokerList = brokers.values();
final Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
final BrokerService broker = i.next().broker;
broker.start();
broker.waitUntilStarted();
}

Thread.sleep(maxSetupTime);
// Wait for all brokers to have their transport connectors ready to accept connections
// instead of using Thread.sleep which is unreliable across different machines
for (final BrokerItem brokerItem : brokerList) {
final BrokerService broker = brokerItem.broker;
assertTrue("Broker " + broker.getBrokerName() + " transport connectors ready",
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
// Verify broker is started and has transport connectors
if (!broker.isStarted() || broker.getTransportConnectors().isEmpty()) {
return false;
}

// Try to create a test connection to verify transport is accepting connections
try (final Connection testConn = brokerItem.createConnection()) {
return true;
} catch (final Exception e) {
LOG.debug("Broker " + broker.getBrokerName() + " not ready yet: " + e.getMessage());
return false;
}
}
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.SECONDS.toMillis(2)));
}
}

protected BrokerService createBroker(String brokerName) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,8 @@ protected void testDurablePropagation5Broker() throws Exception {
// Setup consumers
Session ses = createSession("Broker_A_A");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
Thread.sleep(1000);

// let consumers propagate around the network
// let consumers propagate around the network (assertNCDurableSubsCount waits internally)
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
Expand All @@ -328,9 +327,8 @@ protected void testDurablePropagation5Broker() throws Exception {
//bring online a consumer on the other side
Session ses2 = createSession("Broker_E_E");
MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
Thread.sleep(1000);

//there will be 2 network durables, 1 for each direction of the bridge
//there will be 2 network durables, 1 for each direction of the bridge (assertNCDurableSubsCount waits internally)
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
Expand Down Expand Up @@ -384,16 +382,14 @@ protected void testDurablePropagationSpoke() throws Exception {

MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
MessageConsumer clientAB = ses.createDurableSubscriber(dest, "subAB");
Thread.sleep(1000);

// let consumers propagate around the network
// let consumers propagate around the network (assertNCDurableSubsCount waits internally)
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);

MessageConsumer clientD = ses4.createDurableSubscriber(dest, "subD");
Thread.sleep(1000);

assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
Expand Down Expand Up @@ -759,7 +755,6 @@ private void testDurablePropagation(int ttl, boolean dynamicOnly, boolean restar
Session ses2 = createSession("Broker_E_E");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
Thread.sleep(1000);

assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
Expand All @@ -768,9 +763,8 @@ private void testDurablePropagation(int ttl, boolean dynamicOnly, boolean restar
assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0);

startNetworkConnectors(nc1, nc2, nc3, nc4);
Thread.sleep(1000);

// Check that the correct network durables exist
// Check that the correct network durables exist (assertNCDurableSubsCount waits internally)
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, expected.get(0));
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, expected.get(1));
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, expected.get(2));
Expand All @@ -795,12 +789,10 @@ private void testDurablePropagation(int ttl, boolean dynamicOnly, boolean restar
// to test sync works ok. Things should work for all cases both dynamicOnly
// false and true because TTL info still exits and consumers are online
stopNetworkConnectors(nc1, nc2, nc3, nc4);
Thread.sleep(1000);
startNetworkConnectors(nc1, nc2, nc3, nc4);
Thread.sleep(1000);
}

// after restarting the bridges, check sync/demand are correct
// after restarting the bridges, check sync/demand are correct (assertNCDurableSubsCount waits internally)
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, expected.get(0));
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, expected.get(1));
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, expected.get(2));
Expand Down Expand Up @@ -846,12 +838,13 @@ public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
deletePersistentMessagesOnStartup = true;
String options = new String("?persistent=true&useJmx=false");
createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options));
createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options));
createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options));
createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" + options));
createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" + options));
final String options = "?persistent=true&useJmx=false";
// Use ephemeral ports (0) to avoid port conflicts when tests run in parallel
createBroker(new URI("broker:(tcp://localhost:0)/Broker_A_A" + options));
createBroker(new URI("broker:(tcp://localhost:0)/Broker_B_B" + options));
createBroker(new URI("broker:(tcp://localhost:0)/Broker_C_C" + options));
createBroker(new URI("broker:(tcp://localhost:0)/Broker_D_D" + options));
createBroker(new URI("broker:(tcp://localhost:0)/Broker_E_E" + options));
}

@Override
Expand Down
Loading