Skip to content

Commit f65b970

Browse files
jnioche and claude authored
Replace Thread.sleep and Testing.whileTimeout with Awaitility in tests (#8455)
Replace hand-rolled polling loops and Testing.whileTimeout calls with Awaitility across 6 test files. This makes async waits exit as soon as the condition is met instead of sleeping for a fixed duration, improving both test speed and reliability on slow CI machines.

Files changed:
- RocksDbStoreTest: polling loop in waitForInsertFinish
- DRPCTest: deadline loop in getNextAvailableRequest
- AsyncLocalizerTest: polling loop waiting for file deletion
- AuthTest: Testing.whileTimeout waiting for ThriftServer
- MetricsIntegrationTest: Testing.whileTimeout in waitForAtLeastNBuckets
- NettyTest: waitUntilReady, waitForNotNull, doTestLoad, doTestBatch

The deliberate Thread.sleep(100) in NettyTest.doTestServerDelayed is left unchanged as it simulates a delayed server start, not polling.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 855a72a commit f65b970

6 files changed

Lines changed: 69 additions & 84 deletions

File tree

storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java

Lines changed: 21 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.hamcrest.Matchers.lessThan;
2222

2323
import java.nio.charset.StandardCharsets;
24+
import java.time.Duration;
2425
import java.util.ArrayList;
2526
import java.util.Arrays;
2627
import java.util.Collections;
@@ -45,6 +46,7 @@
4546
import org.apache.storm.messaging.TaskMessage;
4647
import org.apache.storm.messaging.TransportFactory;
4748
import org.apache.storm.utils.Utils;
49+
import org.awaitility.Awaitility;
4850
import org.junit.jupiter.api.Test;
4951
import org.slf4j.Logger;
5052
import org.slf4j.LoggerFactory;
@@ -62,43 +64,24 @@ public class NettyTest {
6264
* manually that the server and the client connections are ready before we commence testing. If we don't do this, then we will lose the
6365
* first messages being sent between the client and the server, which will fail the tests.
6466
*/
65-
private void waitUntilReady(IConnection... connections) throws Exception {
67+
private void waitUntilReady(IConnection... connections) {
6668
LOG.info("Waiting until all Netty connections are ready...");
67-
int intervalMs = 10;
68-
int maxWaitMs = 5000;
69-
int waitedMs = 0;
70-
while (true) {
71-
if (Arrays.stream(connections)
72-
.allMatch(WorkerState::isConnectionReady)) {
73-
LOG.info("All Netty connections are ready");
74-
break;
75-
}
76-
if (waitedMs > maxWaitMs) {
77-
throw new RuntimeException("Netty connections were not ready within " + maxWaitMs + " ms");
78-
}
79-
Thread.sleep(intervalMs);
80-
waitedMs += intervalMs;
81-
}
69+
Awaitility.await("all Netty connections to be ready")
70+
.atMost(5, TimeUnit.SECONDS)
71+
.pollInterval(10, TimeUnit.MILLISECONDS)
72+
.until(() -> Arrays.stream(connections).allMatch(WorkerState::isConnectionReady));
73+
LOG.info("All Netty connections are ready");
8274
}
8375

8476
private IConnectionCallback mkConnectionCallback(Consumer<TaskMessage> myFn) {
8577
return (batch) -> batch.forEach(myFn::accept);
8678
}
8779

88-
private Runnable sleep() {
89-
return () -> {
90-
try {
91-
Thread.sleep(10);
92-
} catch (InterruptedException e) {
93-
throw Utils.wrapInRuntime(e);
94-
}
95-
};
96-
}
97-
9880
private void waitForNotNull(AtomicReference<TaskMessage> response) {
99-
Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
100-
() -> response.get() == null,
101-
sleep());
81+
Awaitility.await("response to be non-null")
82+
.atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
83+
.pollInterval(10, TimeUnit.MILLISECONDS)
84+
.until(() -> response.get() != null);
10285
}
10386

10487
private void send(IConnection client, int taskId, byte[] messageBytes) {
@@ -199,9 +182,10 @@ private void doTestLoad(Map<String, Object> stormConf) throws Exception {
199182
List<Integer> tasks = new ArrayList<>();
200183
tasks.add(1);
201184
tasks.add(2);
202-
Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
203-
() -> client.getLoad(tasks).isEmpty(),
204-
sleep());
185+
Awaitility.await("client to receive load metrics")
186+
.atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
187+
.pollInterval(10, TimeUnit.MILLISECONDS)
188+
.until(() -> !client.getLoad(tasks).isEmpty());
205189
Map<Integer, Load> load = client.getLoad(tasks);
206190
assertThat(load.get(1).getBoltLoad(), is(0.0));
207191
assertThat(load.get(2).getBoltLoad(), is(1.0));
@@ -326,11 +310,12 @@ private void doTestBatch(Map<String, Object> stormConf) throws Exception {
326310
IntStream.range(1, numMessages)
327311
.forEach(i -> send(client, taskId, String.valueOf(i).getBytes(StandardCharsets.UTF_8)));
328312

329-
Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
330-
() -> responses.size() < numMessages - 1,
331-
() -> {
313+
Awaitility.await("all batch messages to be received")
314+
.atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
315+
.pollInterval(10, TimeUnit.MILLISECONDS)
316+
.until(() -> {
332317
LOG.info("{} of {} received", responses.size(), numMessages - 1);
333-
sleep().run();
318+
return responses.size() >= numMessages - 1;
334319
});
335320
IntStream.range(1, numMessages)
336321
.forEach(i -> assertThat(new String(responses.get(i - 1).message(), StandardCharsets.UTF_8), is(String.valueOf(i))));

storm-core/test/jvm/org/apache/storm/metric/MetricsIntegrationTest.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -87,30 +87,32 @@ private static Map<String, Object> metricsConf() {
8787
}
8888

8989
private static void waitForAtLeastNBuckets(int n, String compId, String metricName,
90-
LocalCluster cluster) throws Exception {
91-
Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
92-
() -> {
90+
LocalCluster cluster) {
91+
Awaitility.with()
92+
.pollInterval(10, TimeUnit.MILLISECONDS)
93+
.conditionEvaluationListener(condition -> {
94+
try {
95+
cluster.advanceClusterTime(1);
96+
} catch (InterruptedException e) {
97+
Thread.currentThread().interrupt();
98+
}
99+
})
100+
.atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
101+
.until(() -> {
93102
Map<Integer, Collection<Object>> taskIdToBuckets =
94103
FakeMetricConsumer.getTaskIdToBuckets(compId, metricName);
95104
if (n != 0 && taskIdToBuckets == null) {
96-
return true;
105+
return false;
97106
}
98107
if (taskIdToBuckets == null) {
99-
return false;
108+
return true;
100109
}
101110
for (Collection<Object> buckets : taskIdToBuckets.values()) {
102111
if (buckets.size() < n) {
103-
return true;
112+
return false;
104113
}
105114
}
106-
return false;
107-
},
108-
() -> {
109-
try {
110-
cluster.advanceClusterTime(1);
111-
} catch (InterruptedException e) {
112-
Thread.currentThread().interrupt();
113-
}
115+
return true;
114116
});
115117
}
116118

storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.Executors;
2727
import java.util.concurrent.Future;
2828
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicReference;
2930
import javax.security.auth.Subject;
3031
import org.apache.storm.Config;
3132
import org.apache.storm.generated.AuthorizationException;
@@ -39,6 +40,7 @@
3940
import org.apache.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer.AclFunctionEntry;
4041
import org.apache.storm.security.auth.authorizer.DenyAuthorizer;
4142
import org.apache.storm.utils.Time;
43+
import org.awaitility.Awaitility;
4244
import org.junit.jupiter.api.AfterAll;
4345
import org.junit.jupiter.api.Test;
4446

@@ -66,18 +68,20 @@ public static void close() {
6668
exec.shutdownNow();
6769
}
6870

69-
public static DRPCRequest getNextAvailableRequest(DRPC server, String func) throws Exception {
70-
DRPCRequest request = null;
71-
long timedout = System.currentTimeMillis() + 5_000;
72-
while (System.currentTimeMillis() < timedout) {
73-
request = server.fetchRequest(func);
74-
if (request != null && request.get_request_id() != null && !request.get_request_id().isEmpty()) {
75-
return request;
76-
}
77-
Thread.sleep(1);
78-
}
79-
fail("Test timed out waiting for a request on " + func);
80-
return request;
71+
public static DRPCRequest getNextAvailableRequest(DRPC server, String func) {
72+
AtomicReference<DRPCRequest> result = new AtomicReference<>();
73+
Awaitility.await("DRPC request on " + func)
74+
.atMost(5, TimeUnit.SECONDS)
75+
.pollInterval(1, TimeUnit.MILLISECONDS)
76+
.until(() -> {
77+
DRPCRequest req = server.fetchRequest(func);
78+
if (req != null && req.get_request_id() != null && !req.get_request_id().isEmpty()) {
79+
result.set(req);
80+
return true;
81+
}
82+
return false;
83+
});
84+
return result.get();
8185
}
8286

8387
@Test

storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.storm.utils.ServerUtils;
5959
import org.apache.storm.utils.Time;
6060
import org.apache.storm.utils.Utils;
61+
import org.awaitility.Awaitility;
6162
import org.junit.jupiter.api.Test;
6263
import org.mockito.Mockito;
6364
import org.slf4j.Logger;
@@ -737,10 +738,10 @@ public void testMultipleKeysOneUser() throws Exception {
737738

738739
lrsrcSet = localizer.getUserFiles().get(user1);
739740
assertEquals(2, lrsrcSet.size(), "local resource set size wrong");
740-
long end = System.currentTimeMillis() + 100;
741-
while ((end - System.currentTimeMillis()) >= 0 && keyFile2.exists()) {
742-
Thread.sleep(1);
743-
}
741+
Awaitility.await("keyFile2 to be deleted")
742+
.atMost(5, TimeUnit.SECONDS)
743+
.pollInterval(1, TimeUnit.MILLISECONDS)
744+
.until(() -> !keyFile2.exists());
744745
assertTrue(keyFile.exists(), "blob deleted");
745746
assertFalse(keyFile2.exists(), "blob not deleted");
746747
assertTrue(keyFile3.exists(), "blob deleted");

storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.storm.metricstore.MetricException;
2727
import org.apache.storm.metricstore.MetricStore;
2828
import org.apache.storm.metricstore.MetricStoreConfig;
29+
import org.awaitility.Awaitility;
2930
import org.junit.jupiter.api.AfterAll;
3031
import org.junit.jupiter.api.BeforeAll;
3132
import org.junit.jupiter.api.Test;
@@ -313,15 +314,11 @@ public void testMetricCleanup() throws Exception {
313314
assertTrue(list.contains(m2));
314315
}
315316

316-
private void waitForInsertFinish(Metric m) throws Exception {
317+
private void waitForInsertFinish(Metric m) {
317318
Metric last = new Metric(m);
318-
int attempts = 0;
319-
do {
320-
Thread.sleep(1);
321-
attempts++;
322-
if (attempts > 5000) {
323-
throw new Exception("Insertion timing out");
324-
}
325-
} while (!store.populateValue(last));
319+
Awaitility.await("metric insertion to complete")
320+
.atMost(5, java.util.concurrent.TimeUnit.SECONDS)
321+
.pollInterval(1, java.util.concurrent.TimeUnit.MILLISECONDS)
322+
.until(() -> store.populateValue(last));
326323
}
327324
}

storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.storm.security.auth.workertoken.WorkerTokenManager;
4242
import org.apache.storm.testing.InProcessZookeeper;
4343
import org.apache.storm.thrift.transport.TTransportException;
44+
import org.awaitility.Awaitility;
4445
import org.apache.storm.utils.ConfigUtils;
4546
import org.apache.storm.utils.NimbusClient;
4647
import org.apache.storm.utils.Time;
@@ -143,15 +144,10 @@ public static void withServer(String loginCfg,
143144
LOG.info("Starting Serving...");
144145
server.serve();
145146
}).start();
146-
Testing.whileTimeout(
147-
() -> !server.isServing(),
148-
() -> {
149-
try {
150-
Time.sleep(100);
151-
} catch (InterruptedException e) {
152-
//Ignored
153-
}
154-
});
147+
Awaitility.await("ThriftServer to start serving")
148+
.atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
149+
.pollInterval(100, TimeUnit.MILLISECONDS)
150+
.until(server::isServing);
155151
try {
156152
LOG.info("Starting to run {}", body);
157153
body.accept(server, conf);

0 commit comments

Comments
 (0)