Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSCRTHTTPClient-ea797a6.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT HTTP Client",
"contributor": "",
"description": "Fix connection pool exhaustion in the CRT HTTP client where connections were not released after a request abort or timeout."
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,8 @@
<!-- Ignore usage of S3MetaRequest in CrtS3ClientUploadBenchmark. !-->
<suppress checks="Regexp"
files="software.amazon.awssdk.s3benchmarks.CrtS3ClientUploadBenchmark.java"/>

<!-- ResponseHandlerHelper has helper method closeConnection() which handles safe closing of connection -->
<suppress id="NoCrtStreamCancel"
files=".*ResponseHandlerHelper\.java$"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,15 @@
<property name="ignoreComments" value="true"/>
</module>

<!-- Checks that we don't call HttpStreamBase.cancel() directly -->
<module name="Regexp">
<property name="id" value="NoCrtStreamCancel"/>
<property name="format" value="HttpStreamBase\s*(\.|::)\s*cancel"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Don't call HttpStreamBase.cancel() directly. Use ResponseHandlerHelper.closeConnection() or the response handler's closeConnection() method, which is idempotent and pairs cancel() with close() correctly."/>
<property name="ignoreComments" value="true"/>
</module>

<!-- Checks that we don't implement AutoCloseable/Closeable -->
<module name="Regexp">
<property name="format" value="(class|interface).*(implements|extends).*[^\w](Closeable|AutoCloseable)[^\w]"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpRequestBase;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
Expand Down Expand Up @@ -67,12 +66,21 @@ private void doExecute(CrtAsyncRequestContext executionContext,

HttpRequestBase crtRequest = toAsyncCrtRequest(executionContext);

HttpStreamBaseResponseHandler crtResponseHandler =
CrtResponseAdapter crtResponseHandler =
CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler());

CompletableFuture<HttpStreamBase> streamFuture =
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);

streamFuture.thenAccept(crtResponseHandler::onAcquireStream);

// Evict the connection from the pool on failure so it is not reused.
requestFuture.whenComplete((r, t) -> {
if (t != null) {
crtResponseHandler.closeConnection();
}
});

long finalAcquireStartTime = acquireStartTime;

streamFuture.whenComplete((stream, throwable) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter;
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
Expand Down Expand Up @@ -57,13 +56,23 @@ private void doExecute(CrtRequestContext executionContext, CompletableFuture<Sdk
acquireStartTime = System.nanoTime();
}

HttpStreamBaseResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(requestFuture);
InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler =
new InputStreamAdaptingHttpStreamResponseHandler(requestFuture);

HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext);

CompletableFuture<HttpStreamBase> streamFuture =
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);

streamFuture.thenAccept(crtResponseHandler::onAcquireStream);

// Evict the connection from the pool on failure so it is not reused.
requestFuture.whenComplete((r, t) -> {
if (t != null) {
crtResponseHandler.closeConnection();
}
});

long finalAcquireStartTime = acquireStartTime;

streamFuture.whenComplete((streamBase, throwable) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public CrtResponseAdapter(CompletableFuture<Void> completionFuture,
this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder);
}

public static HttpStreamBaseResponseHandler toCrtResponseHandler(
public static CrtResponseAdapter toCrtResponseHandler(
CompletableFuture<Void> requestFuture,
SdkAsyncHttpResponseHandler responseHandler) {
return new CrtResponseAdapter(requestFuture, responseHandler);
Expand Down Expand Up @@ -107,6 +107,7 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {

@Override
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
responseHandlerHelper.onResponseComplete();
if (errorCode == CRT.AWS_CRT_SUCCESS) {
onSuccessfulResponseComplete();
} else {
Expand Down Expand Up @@ -145,4 +146,12 @@ private void callResponseHandlerOnError(Throwable error) {
log.warn(() -> "Exception raised from SdkAsyncHttpResponseHandler#onError.", e);
}
}

public void onAcquireStream(HttpStreamBase stream) {
responseHandlerHelper.onAcquireStream(stream);
}

public void closeConnection() {
responseHandlerHelper.closeConnection();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {

@Override
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
responseHandlerHelper.onResponseComplete();
if (errorCode == CRT.AWS_CRT_SUCCESS) {
onSuccessfulResponseComplete();
} else {
Expand All @@ -132,4 +133,12 @@ private void onSuccessfulResponseComplete() {
simplePublisher.complete();
responseHandlerHelper.releaseConnection();
}

public void onAcquireStream(HttpStreamBase stream) {
responseHandlerHelper.onAcquireStream(stream);
}

public void closeConnection() {
responseHandlerHelper.closeConnection();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,27 @@ public class ResponseHandlerHelper {
private final SdkHttpResponse.Builder responseBuilder;
private HttpStreamBase stream;
private boolean streamClosed;
private boolean streamCompleted;
private final Object streamLock = new Object();

public ResponseHandlerHelper(SdkHttpResponse.Builder responseBuilder) {
this.responseBuilder = responseBuilder;
}

public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) {
/**
* Set the stream reference as soon as it is acquired from the pool, so that closeConnection can
* cancel it even if onResponseHeaders has not yet fired (e.g. the server is unresponsive).
*/
public void onAcquireStream(HttpStreamBase stream) {
synchronized (streamLock) {
if (this.stream == null) {
this.stream = stream;
}
}
}

public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) {
onAcquireStream(stream);
if (headerType == HttpHeaderBlock.MAIN.getValue()) {
for (HttpHeader h : nextHeaders) {
responseBuilder.appendHeader(h.getName(), h.getValue());
Expand Down Expand Up @@ -73,16 +82,32 @@ public void releaseConnection() {
}
}

/**
* Called when CRT fires onResponseComplete. After this, {@link #closeConnection()} skips
* {@code cancel()} because per {@link software.amazon.awssdk.crt.http.HttpStreamBase#cancel()}
* javadoc: "if the stream is already completing for other reasons, this call will have no effect."
*/
public void onResponseComplete() {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to add the streamCompleted check because without it, H2ErrorTest crashes with a SIGSEGV on the CRT side. The pool-exhaustion fix adds a second closeConnection() call through requestFuture.whenComplete. Combined with the existing call in onFailedResponseComplete, this results in cancel() being called after close() has already freed the native stream,
causing the crash. Setting streamCompleted = true before onFailedResponseComplete runs ensures closeConnection() skips cancel() when CRT has already completed the stream,
honoring the HttpStreamBase.cancel() javadoc contract: "if the stream is already completing for other reasons, this call will have no effect."

synchronized (streamLock) {
streamCompleted = true;
}
}

/**
* Cancel and close the stream, forcing the underlying connection to shut down rather than be returned to the
* connection pool. This should be called on error paths or when the stream is aborted before the response is
* fully consumed. {@code cancel()} must be invoked before {@code close()} per the CRT contract.
* <p>
* If CRT has already completed the stream via {@link #onResponseComplete()}, {@code cancel()} is skipped
* to avoid a native use-after-free, but {@code close()} is still called to release the Java-side handle.
*/
public void closeConnection() {
synchronized (streamLock) {
if (!streamClosed && stream != null) {
streamClosed = true;
stream.cancel();
if (!streamCompleted) {
stream.cancel();
}
stream.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.crt;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.any;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static org.assertj.core.api.Assertions.assertThat;
import static software.amazon.awssdk.http.HttpTestUtils.createProvider;
import static software.amazon.awssdk.http.crt.CrtHttpClientTestUtils.createRequest;

import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
import java.io.ByteArrayInputStream;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpMetric;
import software.amazon.awssdk.http.RecordingResponseHandler;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.metrics.MetricCollector;

/**
* Verifies connection pool behavior when requests are aborted in between.
*/
public class AwsCrtHttpClientAbortBehaviorTest {

@RegisterExtension
static WireMockExtension mockServer = WireMockExtension.newInstance()
.options(wireMockConfig().dynamicPort())
.build();

private static ScheduledExecutorService scheduler;

@BeforeAll
static void setup() {
scheduler = Executors.newScheduledThreadPool(1);
}

@AfterAll
static void tearDown() {
scheduler.shutdown();
}

/**
* Verifies that aborting in-flight requests evicts connections from the pool —
* the next request succeeds and LEASED_CONCURRENCY is 1.
*/
@Test
void syncClient_whenRequestAborted_connectionIsEvictedFromPool(WireMockRuntimeInfo wm) throws Exception {
URI uri = URI.create("http://localhost:" + wm.getHttpPort());

try (SdkHttpClient client = AwsCrtHttpClient.builder().maxConcurrency(3).build()) {
stubUnresponsiveServer();
executeAndAbort(client, uri, 3);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, why do we need to send 3 requests?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use 3 to match maxConcurrency(3) and verify all 3 dead connections are evicted and the pool can accept 3 new requests.


// allow cancel() callbacks to complete before asserting pool state
Thread.sleep(200);

stubResponsiveServer();
int successCount = 0;
MetricCollector collector = MetricCollector.create("test");
for (int i = 0; i < 3; i++) {
try {
client.prepareRequest(syncRequest(uri, i == 0 ? collector : null)).call();
successCount++;
} catch (Exception e) {
// connection not evicted
}
}

assertThat(successCount).as("%d/%d requests succeeded after aborts", successCount, 3).isEqualTo(3);
assertThat(collector.collect().metricValues(HttpMetric.LEASED_CONCURRENCY))
.as("LEASED_CONCURRENCY must be 1 after aborts, not %d", 4)
.containsExactly(1);
}
}

/**
* Verifies that when an async request future completes exceptionally, the connection is
* evicted from the pool and LEASED_CONCURRENCY is 1 for the next request.
*/
@Test
void asyncClient_whenRequestAborted_connectionIsEvictedFromPool(WireMockRuntimeInfo wm) throws Exception {
URI uri = URI.create("http://localhost:" + wm.getHttpPort());

try (SdkAsyncHttpClient client = AwsCrtAsyncHttpClient.builder().maxConcurrency(3).build()) {
stubUnresponsiveServer();
for (int i = 0; i < 3; i++) {
RecordingResponseHandler recorder = new RecordingResponseHandler();
CompletableFuture<Void> future = client.execute(AsyncExecuteRequest.builder()
.request(createRequest(uri))
.requestContentPublisher(createProvider(""))
.responseHandler(recorder)
.build());
// abort() equivalent for async: complete the future exceptionally after stream is acquired
scheduler.schedule(() -> future.completeExceptionally(new RuntimeException("timeout")),
100, TimeUnit.MILLISECONDS);
try {
future.get(2, TimeUnit.SECONDS);
} catch (Exception e) {
// expected
}
// wait for the response handler to finish so cancel() has completed before next iteration
try {
recorder.completeFuture().get(2, TimeUnit.SECONDS);
} catch (Exception e) {
// expected — handler receives the error
}
}

stubResponsiveServer();
MetricCollector collector = MetricCollector.create("test");
RecordingResponseHandler recorder = new RecordingResponseHandler();
client.execute(AsyncExecuteRequest.builder()
.request(createRequest(uri))
.requestContentPublisher(createProvider(""))
.responseHandler(recorder)
.metricCollector(collector)
.build());
recorder.completeFuture().get(5, TimeUnit.SECONDS);

assertThat(collector.collect().metricValues(HttpMetric.LEASED_CONCURRENCY))
.as("LEASED_CONCURRENCY must be 1 after exceptionally-completed futures, not %d", 4)
.containsExactly(1);
}
}

private void executeAndAbort(SdkHttpClient client, URI uri, int count) {
for (int i = 0; i < count; i++) {
ExecutableHttpRequest req = client.prepareRequest(syncRequest(uri, null));
// abort() must be called from another thread while call() is blocking
scheduler.schedule(req::abort, 100, TimeUnit.MILLISECONDS);
try {
req.call();
} catch (Exception e) {
// expected — aborted
}
}
}

private HttpExecuteRequest syncRequest(URI uri, MetricCollector collector) {
HttpExecuteRequest.Builder builder = HttpExecuteRequest.builder()
.request(createRequest(uri))
.contentStreamProvider(() -> new ByteArrayInputStream(new byte[0]));
if (collector != null) {
builder.metricCollector(collector);
}
return builder.build();
}

private void stubUnresponsiveServer() {
mockServer.stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withFixedDelay(5000).withBody("slow")));
}

private void stubResponsiveServer() {
mockServer.stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withStatus(200).withBody("OK")));
}
}
Loading
Loading