Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-fad0dab.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS SDK for Java v2",
"contributor": "",
"description": "Add `WRITE_THROUGHPUT` metric to measure request body upload speed (bytes/sec). This metric is reported at the API call attempt level for requests with a body."
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,32 @@ public final class SdkInternalExecutionAttribute extends SdkExecutionAttribute {
public static final ExecutionAttribute<AtomicLong> RESPONSE_BYTES_READ =
new ExecutionAttribute<>("ResponseBytesRead");

/**
* The running count of bytes in the request body that have been written by the client. This is updated atomically as the
* request is being sent.
* <p>
* This attribute is set before every API call attempt.
*/
public static final ExecutionAttribute<AtomicLong> REQUEST_BYTES_WRITTEN =
new ExecutionAttribute<>("RequestBytesWritten");

/**
* The nano time when the first byte of the request body was written. This is set atomically on the first read from the
* request body stream.
* <p>
* This attribute is set before every API call attempt.
*/
public static final ExecutionAttribute<AtomicLong> REQUEST_BODY_FIRST_BYTE_WRITTEN_NANO_TIME =
new ExecutionAttribute<>("RequestBodyFirstByteWrittenNanoTime");

/**
* Nano time when the last byte of the request body was written. Used to calculate WRITE_THROUGHPUT metric.
* <p>
* This attribute is set before every API call attempt.
*/
public static final ExecutionAttribute<AtomicLong> REQUEST_BODY_LAST_BYTE_WRITTEN_NANO_TIME =
new ExecutionAttribute<>("RequestBodyLastByteWrittenNanoTime");

/**
* The auth scheme provider used to resolve the auth scheme for a request.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public Response<OutputT> execute(SdkHttpFullRequest input, RequestExecutionConte
reportBackoffDelay(context);

resetBytesRead(context);
resetBytesWritten(context);
try {
Response<OutputT> response = wrapped.execute(input, context);
collectHttpMetrics(apiCallAttemptMetrics, response.httpResponse());
Expand All @@ -69,6 +70,14 @@ private void resetBytesRead(RequestExecutionContext context) {
context.executionAttributes().putAttribute(SdkInternalExecutionAttribute.RESPONSE_BYTES_READ, new AtomicLong(0));
}

private void resetBytesWritten(RequestExecutionContext context) {
context.executionAttributes().putAttribute(SdkInternalExecutionAttribute.REQUEST_BYTES_WRITTEN, new AtomicLong(0));
context.executionAttributes().putAttribute(SdkInternalExecutionAttribute.REQUEST_BODY_FIRST_BYTE_WRITTEN_NANO_TIME,
new AtomicLong(0));
context.executionAttributes().putAttribute(SdkInternalExecutionAttribute.REQUEST_BODY_LAST_BYTE_WRITTEN_NANO_TIME,
new AtomicLong(0));
}

private void reportBackoffDelay(RequestExecutionContext context) {
Duration lastBackoffDelay = context.executionAttributes().getAttribute(RetryableStageHelper.LAST_BACKOFF_DELAY_DURATION);
if (lastBackoffDelay != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.core.internal.http.pipeline.stages;

import java.time.Duration;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.Response;
Expand Down Expand Up @@ -64,11 +65,26 @@ private void collectMetrics(RequestExecutionContext context) {
long ttlb = now - attemptStartTime;
attemptMetricCollector.reportMetric(CoreMetric.TIME_TO_LAST_BYTE, Duration.ofNanos(ttlb));

long responseBytesRead = MetricUtils.apiCallAttemptResponseBytesRead(context).getAsLong();
long responseReadStart = MetricUtils.responseHeadersReadEndNanoTime(context).getAsLong();
double throughput = MetricUtils.bytesPerSec(responseBytesRead, responseReadStart, now);

attemptMetricCollector.reportMetric(CoreMetric.READ_THROUGHPUT, throughput);
long responseBytesRead = MetricUtils.apiCallAttemptResponseBytesRead(context).getAsLong();
double readThroughput = MetricUtils.bytesPerSec(responseBytesRead, responseReadStart, now);
attemptMetricCollector.reportMetric(CoreMetric.READ_THROUGHPUT, readThroughput);

reportWriteThroughput(context, attemptMetricCollector);
}

private void reportWriteThroughput(RequestExecutionContext context, MetricCollector attemptMetricCollector) {
OptionalLong firstByteWrittenTimeOpt = MetricUtils.requestBodyFirstByteWrittenNanoTime(context);
OptionalLong requestBytesWrittenOpt = MetricUtils.apiCallAttemptRequestBytesWritten(context);
if (firstByteWrittenTimeOpt.isPresent() && requestBytesWrittenOpt.isPresent()) {
long lastByteWrittenTime = context.executionAttributes()
.getAttribute(SdkInternalExecutionAttribute.REQUEST_BODY_LAST_BYTE_WRITTEN_NANO_TIME).get();
double writeThroughput = MetricUtils.bytesPerSec(requestBytesWrittenOpt.getAsLong(),
firstByteWrittenTimeOpt.getAsLong(),
lastByteWrittenTime);
attemptMetricCollector.reportMetric(CoreMetric.WRITE_THROUGHPUT, writeThroughput);
}
}

private SdkHttpFullResponse trackBytesRead(SdkHttpFullResponse httpFullResponse, RequestExecutionContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@

import static software.amazon.awssdk.http.Header.CONTENT_LENGTH;

import java.io.InputStream;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.client.config.SdkClientOption;
import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute;
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
import software.amazon.awssdk.core.internal.http.InterruptMonitor;
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
import software.amazon.awssdk.core.internal.metrics.BytesWrittenTrackingInputStream;
import software.amazon.awssdk.core.internal.util.MetricUtils;
import software.amazon.awssdk.core.metrics.CoreMetric;
import software.amazon.awssdk.http.ContentStreamProvider;
Expand Down Expand Up @@ -72,7 +75,7 @@ private HttpExecuteResponse executeHttpRequest(SdkHttpFullRequest request, Reque

MetricCollector httpMetricCollector = MetricUtils.createHttpMetricsCollector(context);

request = enforceContentLengthIfPresent(request);
request = wrapRequestContentStream(request, context);

ExecutableHttpRequest requestCallable = sdkHttpClient
.prepareRequest(HttpExecuteRequest.builder()
Expand All @@ -97,33 +100,42 @@ private HttpExecuteResponse executeHttpRequest(SdkHttpFullRequest request, Reque
return measuredExecute.left();
}

private static long updateMetricCollectionAttributes(RequestExecutionContext context) {
long now = System.nanoTime();
context.executionAttributes().putAttribute(SdkInternalExecutionAttribute.API_CALL_ATTEMPT_START_NANO_TIME,
now);
return now;
}

private static SdkHttpFullRequest enforceContentLengthIfPresent(SdkHttpFullRequest request) {
Optional<ContentStreamProvider> requestContentStreamProviderOptional = request.contentStreamProvider();

if (!requestContentStreamProviderOptional.isPresent()) {
private SdkHttpFullRequest wrapRequestContentStream(SdkHttpFullRequest request, RequestExecutionContext context) {
Optional<ContentStreamProvider> contentStreamProvider = request.contentStreamProvider();
if (!contentStreamProvider.isPresent()) {
return request;
}

AtomicLong bytesWritten = context.executionAttributes()
.getAttribute(SdkInternalExecutionAttribute.REQUEST_BYTES_WRITTEN);
AtomicLong firstByteWrittenTime = context.executionAttributes()
.getAttribute(SdkInternalExecutionAttribute.REQUEST_BODY_FIRST_BYTE_WRITTEN_NANO_TIME);
AtomicLong lastByteWrittenTime = context.executionAttributes()
.getAttribute(SdkInternalExecutionAttribute.REQUEST_BODY_LAST_BYTE_WRITTEN_NANO_TIME);

Optional<Long> contentLength = contentLength(request);
if (!contentLength.isPresent()) {
LOG.debug(() -> String.format("Request contains a body but does not have a Content-Length header. Not validating "
+ "the amount of data sent to the service: %s", request));
return request;
}

ContentStreamProvider requestContentProvider = requestContentStreamProviderOptional.get();
ContentStreamProvider lengthVerifyingProvider = () -> new LengthAwareInputStream(requestContentProvider.newStream(),
contentLength.get());
return request.toBuilder()
.contentStreamProvider(lengthVerifyingProvider)
.build();
ContentStreamProvider wrapped = () -> {
InputStream stream = contentStreamProvider.get().newStream();
stream = new BytesWrittenTrackingInputStream(stream, bytesWritten, firstByteWrittenTime, lastByteWrittenTime);
if (contentLength.isPresent()) {
stream = new LengthAwareInputStream(stream, contentLength.get());
}
return stream;
};

return request.toBuilder().contentStreamProvider(wrapped).build();
}

private static long updateMetricCollectionAttributes(RequestExecutionContext context) {
long now = System.nanoTime();
context.executionAttributes().putAttribute(SdkInternalExecutionAttribute.API_CALL_ATTEMPT_START_NANO_TIME,
now);
return now;
}

private static Optional<Long> contentLength(SdkHttpFullRequest request) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.metrics;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.io.SdkFilterInputStream;

/**
* An input stream that tracks the number of bytes read from it. When the HTTP client reads from this stream to send the request
* body, we count those bytes as "written" to the service.
*/
@SdkInternalApi
public final class BytesWrittenTrackingInputStream extends SdkFilterInputStream {
private final AtomicLong bytesWritten;
private final AtomicLong firstByteWrittenTime;
private final AtomicLong lastByteWrittenTime;

public BytesWrittenTrackingInputStream(InputStream in, AtomicLong bytesWritten, AtomicLong firstByteWrittenTime,
AtomicLong lastByteWrittenTime) {
super(in);
this.bytesWritten = bytesWritten;
this.firstByteWrittenTime = firstByteWrittenTime;
this.lastByteWrittenTime = lastByteWrittenTime;
}

@Override
public int read() throws IOException {
recordFirstByteWritten();
int read = super.read();
if (read >= 0) {
bytesWritten.incrementAndGet();
lastByteWrittenTime.set(System.nanoTime());
}
return read;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
recordFirstByteWritten();
int read = super.read(b, off, len);
updateBytesWritten(read);
return read;
}

@Override
public long skip(long n) throws IOException {
recordFirstByteWritten();
long skipped = super.skip(n);
updateBytesWritten(skipped);
return skipped;
}

private void recordFirstByteWritten() {
firstByteWrittenTime.compareAndSet(0, System.nanoTime());
}

private void updateBytesWritten(long bytesRead) {
if (bytesRead > 0) {
bytesWritten.addAndGet(bytesRead);
lastByteWrittenTime.set(System.nanoTime());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,23 @@ public static OptionalLong apiCallAttemptResponseBytesRead(RequestExecutionConte
return OptionalLong.of(read.get());
}

public static OptionalLong apiCallAttemptRequestBytesWritten(RequestExecutionContext context) {
AtomicLong written = context.executionAttributes().getAttribute(SdkInternalExecutionAttribute.REQUEST_BYTES_WRITTEN);
if (written == null || written.get() == 0) {
return OptionalLong.empty();
}
return OptionalLong.of(written.get());
}

public static OptionalLong requestBodyFirstByteWrittenNanoTime(RequestExecutionContext context) {
AtomicLong firstByteWrittenTime = context.executionAttributes()
.getAttribute(SdkInternalExecutionAttribute.REQUEST_BODY_FIRST_BYTE_WRITTEN_NANO_TIME);
if (firstByteWrittenTime == null || firstByteWrittenTime.get() == 0) {
return OptionalLong.empty();
}
return OptionalLong.of(firstByteWrittenTime.get());
}

public static OptionalLong responseHeadersReadEndNanoTime(RequestExecutionContext context) {
Long startTime = context.executionAttributes().getAttribute(SdkInternalExecutionAttribute.HEADERS_READ_END_NANO_TIME);
if (startTime == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,23 @@ public final class CoreMetric {
public static final SdkMetric<Double> READ_THROUGHPUT =
metric("ReadThroughput", Double.class, MetricLevel.TRACE);

/**
* The write throughput of the client, defined as {@code NumberOfRequestBytesWritten / (LastReadTime - WriteStartTime)},
* where WriteStartTime is when the first byte is read from the request body and LastReadTime is when the last byte is
* read. This value is in bytes per second.
* <p>
* This metric measures the rate at which bytes are read from the request body stream. It excludes connection setup,
* TLS handshake time, and server processing time.
* <p>
* Note: This metric does not account for buffering in the HTTP client layer. The actual network transmission rate may
* be lower if the HTTP client buffers data before sending. This metric represents an upper bound of the network
* throughput.
* <p>
* This metric is only reported for requests that have a body.
*/
public static final SdkMetric<Double> WRITE_THROUGHPUT =
metric("WriteThroughput", Double.class, MetricLevel.TRACE);

/**
* The duration of time it took to resolve the endpoint used for the API call.
*/
Expand Down
Loading
Loading