Skip to content

Commit 943cd68

Browse files
committed
Improve synchronization in ReactiveClientTest
1 parent bb1fe81 commit 943cd68

File tree

1 file changed

+9
-10
lines changed

1 file changed

+9
-10
lines changed

httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.List;
4242
import java.util.Random;
4343
import java.util.concurrent.CancellationException;
44+
import java.util.concurrent.CountDownLatch;
4445
import java.util.concurrent.ExecutionException;
4546
import java.util.concurrent.Future;
4647
import java.util.concurrent.atomic.AtomicBoolean;
@@ -75,7 +76,6 @@
7576
import org.apache.hc.core5.util.Timeout;
7677
import org.hamcrest.CoreMatchers;
7778
import org.junit.jupiter.api.Assertions;
78-
import org.junit.jupiter.api.Disabled;
7979
import org.junit.jupiter.api.Test;
8080
import org.junit.jupiter.api.extension.RegisterExtension;
8181
import org.reactivestreams.Publisher;
@@ -212,17 +212,17 @@ void testRequestError() throws Exception {
212212

213213
final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () ->
214214
future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit()));
215-
Assertions.assertTrue(exception.getCause() instanceof HttpStreamResetException);
215+
Assertions.assertInstanceOf(HttpStreamResetException.class, exception.getCause());
216216
Assertions.assertSame(exceptionThrown, exception.getCause().getCause());
217217
}
218218

219219
@Test
220220
void testRequestTimeout() throws Exception {
221221
final InetSocketAddress address = startServer();
222222
final HttpAsyncRequester requester = clientResource.start();
223-
final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean();
223+
final CountDownLatch requestPublisherCancellation = new CountDownLatch(1);
224224
final Publisher<ByteBuffer> publisher = Flowable.<ByteBuffer>never()
225-
.doOnCancel(() -> requestPublisherWasCancelled.set(true));
225+
.doOnCancel(requestPublisherCancellation::countDown);
226226
final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
227227
final BasicRequestProducer request = getRequestProducer(address, producer);
228228

@@ -231,7 +231,7 @@ void testRequestTimeout() throws Exception {
231231

232232
final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () ->
233233
future.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit()));
234-
Assertions.assertTrue(requestPublisherWasCancelled.get());
234+
Assertions.assertTrue(requestPublisherCancellation.await(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit()));
235235
final Throwable cause = exception.getCause();
236236
if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_1) {
237237
Assertions.assertTrue(cause instanceof SocketTimeoutException, "Expected SocketTimeoutException, but got " + cause.getClass().getName());
@@ -243,14 +243,13 @@ void testRequestTimeout() throws Exception {
243243
}
244244

245245
@Test
246-
@Disabled("Fails intermittently in GitHub Actions")
247246
void testResponseCancellation() throws Exception {
248247
final InetSocketAddress address = startServer();
249248
final HttpAsyncRequester requester = clientResource.start();
250-
final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean();
249+
final CountDownLatch requestPublisherCancellation = new CountDownLatch(1);
251250
final AtomicReference<Throwable> requestStreamError = new AtomicReference<>();
252251
final Publisher<ByteBuffer> stream = Reactive3TestUtils.produceStream(Long.MAX_VALUE, 1024, null)
253-
.doOnCancel(() -> requestPublisherWasCancelled.set(true))
252+
.doOnCancel(requestPublisherCancellation::countDown)
254253
.doOnError(requestStreamError::set);
255254
final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
256255
final BasicRequestProducer request = getRequestProducer(address, producer);
@@ -273,8 +272,8 @@ void testResponseCancellation() throws Exception {
273272
assertThat(exception, CoreMatchers.anyOf(
274273
CoreMatchers.instanceOf(CancellationException.class),
275274
CoreMatchers.instanceOf(ExecutionException.class)));
276-
Assertions.assertTrue(exception.getCause() instanceof HttpStreamResetException);
277-
Assertions.assertTrue(requestPublisherWasCancelled.get());
275+
Assertions.assertInstanceOf(HttpStreamResetException.class, exception.getCause());
276+
Assertions.assertTrue(requestPublisherCancellation.await(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit()));
278277
Assertions.assertNull(requestStreamError.get());
279278
}
280279

0 commit comments

Comments
 (0)