Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 21 additions & 36 deletions storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.Matchers.lessThan;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -45,6 +46,7 @@
import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.messaging.TransportFactory;
import org.apache.storm.utils.Utils;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -62,43 +64,24 @@ public class NettyTest {
* manually that the server and the client connections are ready before we commence testing. If we don't do this, then we will lose the
* first messages being sent between the client and the server, which will fail the tests.
*/
private void waitUntilReady(IConnection... connections) throws Exception {
private void waitUntilReady(IConnection... connections) {
LOG.info("Waiting until all Netty connections are ready...");
int intervalMs = 10;
int maxWaitMs = 5000;
int waitedMs = 0;
while (true) {
if (Arrays.stream(connections)
.allMatch(WorkerState::isConnectionReady)) {
LOG.info("All Netty connections are ready");
break;
}
if (waitedMs > maxWaitMs) {
throw new RuntimeException("Netty connections were not ready within " + maxWaitMs + " ms");
}
Thread.sleep(intervalMs);
waitedMs += intervalMs;
}
Awaitility.await("all Netty connections to be ready")
.atMost(5, TimeUnit.SECONDS)
.pollInterval(10, TimeUnit.MILLISECONDS)
.until(() -> Arrays.stream(connections).allMatch(WorkerState::isConnectionReady));
LOG.info("All Netty connections are ready");
}

private IConnectionCallback mkConnectionCallback(Consumer<TaskMessage> myFn) {
return (batch) -> batch.forEach(myFn::accept);
}

private Runnable sleep() {
return () -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw Utils.wrapInRuntime(e);
}
};
}

private void waitForNotNull(AtomicReference<TaskMessage> response) {
Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
() -> response.get() == null,
sleep());
Awaitility.await("response to be non-null")
.atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.pollInterval(10, TimeUnit.MILLISECONDS)
.until(() -> response.get() != null);
}

private void send(IConnection client, int taskId, byte[] messageBytes) {
Expand Down Expand Up @@ -199,9 +182,10 @@ private void doTestLoad(Map<String, Object> stormConf) throws Exception {
List<Integer> tasks = new ArrayList<>();
tasks.add(1);
tasks.add(2);
Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
() -> client.getLoad(tasks).isEmpty(),
sleep());
Awaitility.await("client to receive load metrics")
.atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.pollInterval(10, TimeUnit.MILLISECONDS)
.until(() -> !client.getLoad(tasks).isEmpty());
Map<Integer, Load> load = client.getLoad(tasks);
assertThat(load.get(1).getBoltLoad(), is(0.0));
assertThat(load.get(2).getBoltLoad(), is(1.0));
Expand Down Expand Up @@ -326,11 +310,12 @@ private void doTestBatch(Map<String, Object> stormConf) throws Exception {
IntStream.range(1, numMessages)
.forEach(i -> send(client, taskId, String.valueOf(i).getBytes(StandardCharsets.UTF_8)));

Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
() -> responses.size() < numMessages - 1,
() -> {
Awaitility.await("all batch messages to be received")
.atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.pollInterval(10, TimeUnit.MILLISECONDS)
.until(() -> {
LOG.info("{} of {} received", responses.size(), numMessages - 1);
sleep().run();
return responses.size() >= numMessages - 1;
});
IntStream.range(1, numMessages)
.forEach(i -> assertThat(new String(responses.get(i - 1).message(), StandardCharsets.UTF_8), is(String.valueOf(i))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,30 +87,32 @@ private static Map<String, Object> metricsConf() {
}

private static void waitForAtLeastNBuckets(int n, String compId, String metricName,
LocalCluster cluster) throws Exception {
Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
() -> {
LocalCluster cluster) {
Awaitility.with()
.pollInterval(10, TimeUnit.MILLISECONDS)
.conditionEvaluationListener(condition -> {
try {
cluster.advanceClusterTime(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
})
.atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.until(() -> {
Map<Integer, Collection<Object>> taskIdToBuckets =
FakeMetricConsumer.getTaskIdToBuckets(compId, metricName);
if (n != 0 && taskIdToBuckets == null) {
return true;
return false;
}
if (taskIdToBuckets == null) {
return false;
return true;
}
for (Collection<Object> buckets : taskIdToBuckets.values()) {
if (buckets.size() < n) {
return true;
return false;
}
}
return false;
},
() -> {
try {
cluster.advanceClusterTime(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return true;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import org.apache.storm.Config;
import org.apache.storm.generated.AuthorizationException;
Expand All @@ -39,6 +40,7 @@
import org.apache.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer.AclFunctionEntry;
import org.apache.storm.security.auth.authorizer.DenyAuthorizer;
import org.apache.storm.utils.Time;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -66,18 +68,20 @@ public static void close() {
exec.shutdownNow();
}

public static DRPCRequest getNextAvailableRequest(DRPC server, String func) throws Exception {
DRPCRequest request = null;
long timedout = System.currentTimeMillis() + 5_000;
while (System.currentTimeMillis() < timedout) {
request = server.fetchRequest(func);
if (request != null && request.get_request_id() != null && !request.get_request_id().isEmpty()) {
return request;
}
Thread.sleep(1);
}
fail("Test timed out waiting for a request on " + func);
return request;
public static DRPCRequest getNextAvailableRequest(DRPC server, String func) {
AtomicReference<DRPCRequest> result = new AtomicReference<>();
Awaitility.await("DRPC request on " + func)
.atMost(5, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.MILLISECONDS)
.until(() -> {
DRPCRequest req = server.fetchRequest(func);
if (req != null && req.get_request_id() != null && !req.get_request_id().isEmpty()) {
result.set(req);
return true;
}
return false;
});
return result.get();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
Expand Down Expand Up @@ -737,10 +738,10 @@ public void testMultipleKeysOneUser() throws Exception {

lrsrcSet = localizer.getUserFiles().get(user1);
assertEquals(2, lrsrcSet.size(), "local resource set size wrong");
long end = System.currentTimeMillis() + 100;
while ((end - System.currentTimeMillis()) >= 0 && keyFile2.exists()) {
Thread.sleep(1);
}
Awaitility.await("keyFile2 to be deleted")
.atMost(5, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.MILLISECONDS)
.until(() -> !keyFile2.exists());
assertTrue(keyFile.exists(), "blob deleted");
assertFalse(keyFile2.exists(), "blob not deleted");
assertTrue(keyFile3.exists(), "blob deleted");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.storm.metricstore.MetricException;
import org.apache.storm.metricstore.MetricStore;
import org.apache.storm.metricstore.MetricStoreConfig;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -313,15 +314,11 @@ public void testMetricCleanup() throws Exception {
assertTrue(list.contains(m2));
}

private void waitForInsertFinish(Metric m) throws Exception {
private void waitForInsertFinish(Metric m) {
Metric last = new Metric(m);
int attempts = 0;
do {
Thread.sleep(1);
attempts++;
if (attempts > 5000) {
throw new Exception("Insertion timing out");
}
} while (!store.populateValue(last));
Awaitility.await("metric insertion to complete")
.atMost(5, java.util.concurrent.TimeUnit.SECONDS)
.pollInterval(1, java.util.concurrent.TimeUnit.MILLISECONDS)
.until(() -> store.populateValue(last));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.storm.security.auth.workertoken.WorkerTokenManager;
import org.apache.storm.testing.InProcessZookeeper;
import org.apache.storm.thrift.transport.TTransportException;
import org.awaitility.Awaitility;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Time;
Expand Down Expand Up @@ -143,15 +144,10 @@ public static void withServer(String loginCfg,
LOG.info("Starting Serving...");
server.serve();
}).start();
Testing.whileTimeout(
() -> !server.isServing(),
() -> {
try {
Time.sleep(100);
} catch (InterruptedException e) {
//Ignored
}
});
Awaitility.await("ThriftServer to start serving")
.atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.until(server::isServing);
try {
LOG.info("Starting to run {}", body);
body.accept(server, conf);
Expand Down