Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "CloudWatch Metric Publisher",
"contributor": "",
"description": "Add `taskQueue` configuration option to `CloudWatchMetricPublisher.Builder` to allow customizing the internal executor queue. This enables high-throughput applications to use a larger queue to prevent dropped metrics."
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,10 @@
@SdkPublicApi
public final class CloudWatchMetricPublisher implements MetricPublisher {
/**
* The maximum queue size for the internal {@link #executor} that is used to aggregate metric data and upload it to
* The default maximum queue size for the internal {@link #executor} that is used to aggregate metric data and upload it to
* CloudWatch. If this value is too high, memory is wasted. If this value is too low, metrics could be dropped.
*
* This value is not currently configurable, because it's unlikely that this is a value that customers should need to modify.
* If customers really need control over this value, we might consider letting them instead configure the
* {@link BlockingQueue} used on the executor. The value here depends on the type of {@code BlockingQueue} in use, and
* we should probably not indirectly couple people to the type of blocking queue we're using.
*/
private static final int MAXIMUM_TASK_QUEUE_SIZE = 128;
private static final int DEFAULT_TASK_QUEUE_SIZE = 128;

private static final String DEFAULT_NAMESPACE = "AwsSdk/JavaSdk2";
private static final int DEFAULT_MAXIMUM_CALLS_PER_UPLOAD = 10;
Expand Down Expand Up @@ -241,7 +236,7 @@ private CloudWatchMetricPublisher(Builder builder) {

// Do not increase above 1 thread: access to MetricCollectionAggregator is not thread safe.
this.executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(MAXIMUM_TASK_QUEUE_SIZE),
resolveTaskQueue(builder),
threadFactory);

long flushFrequencyInMillis = resolveUploadFrequency(builder).toMillis();
Expand Down Expand Up @@ -285,6 +280,10 @@ private int resolveMaximumCallsPerUpload(Builder builder) {
return builder.maximumCallsPerUpload == null ? DEFAULT_MAXIMUM_CALLS_PER_UPLOAD : builder.maximumCallsPerUpload;
}

private BlockingQueue<Runnable> resolveTaskQueue(Builder builder) {
return builder.taskQueue == null ? new ArrayBlockingQueue<>(DEFAULT_TASK_QUEUE_SIZE) : builder.taskQueue;
}

@Override
public void publish(MetricCollection metricCollection) {
try {
Expand Down Expand Up @@ -395,6 +394,7 @@ public static final class Builder {
private Collection<MetricCategory> metricCategories;
private MetricLevel metricLevel;
private Collection<SdkMetric<?>> detailedMetrics;
private BlockingQueue<Runnable> taskQueue;

private Builder() {
}
Expand Down Expand Up @@ -467,6 +467,22 @@ public Builder maximumCallsPerUpload(Integer maximumCallsPerUpload) {
return this;
}

/**
* Configure the {@link BlockingQueue} used by the internal executor for queuing metric aggregation and upload tasks.
*
* <p>If this is not specified, a blocking queue with a capacity of 128 is used.
*
* <p>High-throughput applications may need a larger queue to prevent dropped metrics. When the queue is full, new
* metrics are dropped and a warning is logged.
*
* @param taskQueue the blocking queue to use for the internal executor
* @return This object for method chaining.
*/
public Builder taskQueue(BlockingQueue<Runnable> taskQueue) {
this.taskQueue = taskQueue;
return this;
}

/**
* Configure the {@link SdkMetric}s that are used to define the {@link Dimension}s metrics are aggregated under.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.junit.Before;
Expand Down Expand Up @@ -268,6 +270,18 @@ public void detailedMetricsSettingIsHonored() {
assertThat(availableConcurrency.statisticValues()).isNull();
}

@Test
public void taskQueueSettingIsHonored() {
ArrayBlockingQueue<Runnable> customQueue = Mockito.spy(new ArrayBlockingQueue<>(128));
try (CloudWatchMetricPublisher publisher = publisherBuilder.taskQueue(customQueue).build()) {
MetricCollector collector = newCollector();
collector.reportMetric(HttpMetric.AVAILABLE_CONCURRENCY, 5);
publisher.publish(new FixedTimeMetricCollection(collector.collect()));
}

Mockito.verify(customQueue, Mockito.atLeastOnce()).offer(any(Runnable.class));
}

private MetricDatum getDatum(PutMetricDataRequest call, SdkMetric<?> metric) {
return call.metricData().stream().filter(m -> m.metricName().equals(metric.name())).findAny().get();
}
Expand Down
Loading