Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "Amazon CloudWatch EMF Metric Publisher",
"contributor": "humanzz",
"description": "Add `propertiesSupplier` to `EmfMetricLoggingPublisher.Builder`, enabling users to enrich EMF records with custom key-value properties that are searchable in CloudWatch Logs Insights. See [#6595](https://github.com/aws/aws-sdk-java-v2/issues/6595)."
}
@@ -20,6 +20,8 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import software.amazon.awssdk.annotations.Immutable;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.annotations.ThreadSafe;
@@ -82,6 +84,7 @@ private EmfMetricLoggingPublisher(Builder builder) {
.dimensions(builder.dimensions)
.metricLevel(builder.metricLevel)
.metricCategories(builder.metricCategories)
.propertiesSupplier(builder.propertiesSupplier)
.build();

this.metricConverter = new MetricEmfConverter(config);
@@ -123,6 +126,7 @@ public static final class Builder {
private Collection<SdkMetric<String>> dimensions;
private Collection<MetricCategory> metricCategories;
private MetricLevel metricLevel;
private Supplier<Map<String, String>> propertiesSupplier;

private Builder() {
}
@@ -217,6 +221,27 @@ public Builder metricLevel(MetricLevel metricLevel) {
}


/**
* Configure a supplier of custom properties to include in each EMF record.
* The supplier is invoked on each {@link #publish(MetricCollection)} call,
* and the returned map entries are written as top-level key-value pairs
* in the EMF JSON output. These appear as searchable fields in
* CloudWatch Logs Insights.
*
* <p>Keys that collide with reserved EMF fields ({@code _aws}), configured
* dimension names, or reported metric names are silently skipped.
*
* <p>If this is not specified, no custom properties are added.
*
* @param propertiesSupplier a supplier returning a map of property names to values,
* or {@code null} to disable custom properties
* @return this builder
*/
public Builder propertiesSupplier(Supplier<Map<String, String>> propertiesSupplier) {
Contributor

Hmm what is the advantage of taking a Supplier as opposed to a Map directly in this case?

Author

Because the properties are not necessarily static and known at publisher initialization time. An example is if I want to include a request id to correlate with other log lines/metrics within the same overall request.
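
A minimal sketch of the request-id case described above. `RequestContext` and the `RequestId` property name are hypothetical and not part of the SDK; the sketch also assumes that `publish` runs on the same thread that set the request id, which may not hold for asynchronous clients.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper (not part of the SDK): holds a per-thread request id
// that a properties supplier can read at publish time.
final class RequestContext {
    private static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    private RequestContext() {
    }

    // Call at the start of a request, on the thread that will publish metrics.
    static void setRequestId(String requestId) {
        REQUEST_ID.set(requestId);
    }

    // Call at the end of a request so the id does not leak into the next one.
    static void clear() {
        REQUEST_ID.remove();
    }

    // Has the same shape as the argument of propertiesSupplier(...).
    // Returns an empty map when no request id is set on the calling thread.
    static Supplier<Map<String, String>> propertiesSupplier() {
        return () -> {
            String requestId = REQUEST_ID.get();
            if (requestId == null) {
                return Collections.emptyMap();
            }
            Map<String, String> properties = new HashMap<>();
            properties.put("RequestId", requestId);
            return properties;
        };
    }
}
```

The supplier returned by `RequestContext.propertiesSupplier()` could then be passed to `propertiesSupplier(...)` on the publisher builder; a static `Map` could not express this because its contents would be fixed at build time.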

Contributor

Yeah, that makes sense, but it seems like it would be hard in many scenarios to correctly determine that data, especially in a multi-tenant or highly concurrent environment, given that it doesn't take any input.

Author

I'm taking inspiration from, while simplifying, what aws-embedded-metrics-java does to inject ambient context into EMF output.

In that library, the Environment interface defines a configureContext(MetricsContext) method that each environment implementation uses to enrich EMF records with runtime properties. For example, LambdaEnvironment.configureContext() injects executionEnvironment, functionVersion, logStreamId, and traceId by reading environment variables — notably, it also takes no per-request input, it just reads from ambient state. These properties end up as top-level keys in the EMF JSON.

The Supplier<Map<String, String>> is the same concept, just simplified. Instead of requiring users to implement a full Environment interface, we provide a lightweight Supplier callback that does the same thing — provide ambient context at publish time. The supplier implementer would have the responsibility to define their own strategy for dealing with multi-tenancy or concurrency, if required — though in Lambda environments, where this publisher makes the most sense, each invocation runs in its own execution context so ambient state is naturally request-scoped.
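
As a rough illustration of the "ambient context" idea above, the following sketch builds a supplier that reads Lambda runtime environment variables at publish time. The class name and the mapping from variable names to property keys are assumptions made for this example, not copied from aws-embedded-metrics-java or the SDK. The environment lookup is injected so the sketch can be exercised without real environment variables.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper: derives custom properties from ambient environment state.
final class LambdaAmbientProperties {
    private LambdaAmbientProperties() {
    }

    // envLookup would typically be System::getenv; it is a parameter here
    // only so the sketch can be tested with a plain map.
    static Supplier<Map<String, String>> fromEnvironment(Function<String, String> envLookup) {
        return () -> {
            Map<String, String> properties = new LinkedHashMap<>();
            putIfPresent(properties, "executionEnvironment", envLookup.apply("AWS_EXECUTION_ENV"));
            putIfPresent(properties, "functionVersion", envLookup.apply("AWS_LAMBDA_FUNCTION_VERSION"));
            putIfPresent(properties, "logStreamId", envLookup.apply("AWS_LAMBDA_LOG_STREAM_NAME"));
            putIfPresent(properties, "traceId", envLookup.apply("_X_AMZN_TRACE_ID"));
            return properties;
        };
    }

    // Skips variables that are unset or empty instead of emitting blank properties.
    private static void putIfPresent(Map<String, String> target, String key, String value) {
        if (value != null && !value.isEmpty()) {
            target.put(key, value);
        }
    }
}
```

Because the lookup happens inside the lambda, each `get()` call re-reads the environment, which matches the behavior described above of invoking the supplier on every publish.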

Author

Also, another thing I wanna call out — I went with Map<String, String> rather than Map<String, Object>. The EMF spec allows any valid JSON type for root node members, and the aws-embedded-metrics-java library uses Map<String, Object> for its putProperty. I chose String to keep the implementation simple — the primary use case is contextual metadata like request IDs and trace IDs, and String covers that well. Using Object would require type-dispatching in the serializer since JsonWriter has separate overloads for String, long, double, boolean, etc., plus handling for unsupported types. If needed, we can always widen to Object later without breaking backward compatibility (narrowing would be a breaking change). Happy to change if you'd prefer Object from the start.
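
To make the type-dispatching cost mentioned above concrete, here is a small sketch of what writing `Map<String, Object>` values might involve. `TinyWriter` is a hypothetical stand-in with one overload per JSON type; it is not the SDK's `JsonWriter`, and its string escaping covers only quotes and backslashes.

```java
// Hypothetical sketch of dispatching Object values to typed writer overloads.
final class PropertyDispatchSketch {
    private PropertyDispatchSketch() {
    }

    // Stand-in writer with separate overloads for String, long, double and boolean.
    static final class TinyWriter {
        private final StringBuilder out = new StringBuilder();

        void writeValue(String value) {
            out.append('"')
               .append(value.replace("\\", "\\\\").replace("\"", "\\\""))
               .append('"');
        }

        void writeValue(long value) {
            out.append(value);
        }

        void writeValue(double value) {
            out.append(value);
        }

        void writeValue(boolean value) {
            out.append(value);
        }

        String result() {
            return out.toString();
        }
    }

    // Picks an overload based on the runtime type. Returns false for types
    // without a matching overload so the caller can skip or log them.
    static boolean writeProperty(TinyWriter writer, Object value) {
        if (value instanceof String) {
            writer.writeValue((String) value);
        } else if (value instanceof Integer || value instanceof Long) {
            writer.writeValue(((Number) value).longValue());
        } else if (value instanceof Float || value instanceof Double) {
            writer.writeValue(((Number) value).doubleValue());
        } else if (value instanceof Boolean) {
            writer.writeValue(((Boolean) value).booleanValue());
        } else {
            return false;
        }
        return true;
    }
}
```

With `Map<String, String>` none of this branching is needed, since every value goes through the `String` overload.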

Contributor

Thanks for the detailed reply!

  1. I did take a look at the interface you linked previously, and I agree on the Map<String, String> over Map<String, Object> as it does greatly simplify the interface while addressing what we foresee as the majority use case. And as you said, it's not a one-way door.
  2. I agree that in a Lambda environment, the Supplier is more than sufficient; I was imagining other scenarios where the interface might benefit from taking input in order to compute the correct map values. One case I could think of is a user wanting to emit some high-cardinality information like ServiceEndpoint as a property, which would be difficult to plumb through with this interface. Even in this case, though, I think this isn't a one-way door, since we could overload it with a Function or something similar in the future. Let me take this discussion back to the rest of the team to get their input and get back to you.
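
The possible `Function` overload mentioned in point 2 might look roughly like the sketch below. Nothing here exists in the PR: the class, the method names, and the use of a plain `Map<String, String>` as the input "context" are all assumptions made for illustration. The point is only that an input-free `Supplier` can be lifted into a `Function`, so the two forms could coexist.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch of a Function-based alternative to the Supplier.
final class PropertiesFunctionSketch {
    private PropertiesFunctionSketch() {
    }

    // Adapts an existing supplier to a function that ignores its input,
    // so a Function-based overload would not need a second code path.
    static <C> Function<C, Map<String, String>> fromSupplier(Supplier<Map<String, String>> supplier) {
        return ignored -> supplier.get();
    }

    // A function that does use its input. The input here is a plain map of
    // reported values keyed by name, standing in for whatever context type
    // a real overload would receive.
    static Function<Map<String, String>, Map<String, String>> endpointProperty() {
        return reported -> {
            String endpoint = reported.get("ServiceEndpoint");
            if (endpoint == null) {
                return Collections.emptyMap();
            }
            return Collections.singletonMap("ServiceEndpoint", endpoint);
        };
    }
}
```
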

this.propertiesSupplier = propertiesSupplier;
return this;
}

/**
* Build a {@link EmfMetricLoggingPublisher} using the configuration currently configured on this publisher.
*/
@@ -18,7 +18,9 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import software.amazon.awssdk.annotations.SdkInternalApi;
@@ -43,13 +45,17 @@ public final class EmfMetricConfiguration {
private final Set<SdkMetric<String>> dimensions;
private final Collection<MetricCategory> metricCategories;
private final MetricLevel metricLevel;
private final Supplier<Map<String, String>> propertiesSupplier;

private EmfMetricConfiguration(Builder builder) {
this.namespace = builder.namespace == null ? DEFAULT_NAMESPACE : builder.namespace;
this.logGroupName = Validate.paramNotNull(resolveLogGroupName(builder), "logGroupName");
this.dimensions = builder.dimensions == null ? DEFAULT_DIMENSIONS : new HashSet<>(builder.dimensions);
this.metricCategories = builder.metricCategories == null ? DEFAULT_CATEGORIES : new HashSet<>(builder.metricCategories);
this.metricLevel = builder.metricLevel == null ? DEFAULT_METRIC_LEVEL : builder.metricLevel;
this.propertiesSupplier = builder.propertiesSupplier == null
? Collections::emptyMap
: builder.propertiesSupplier;
}


@@ -59,6 +65,7 @@ public static class Builder {
private Collection<SdkMetric<String>> dimensions;
private Collection<MetricCategory> metricCategories;
private MetricLevel metricLevel;
private Supplier<Map<String, String>> propertiesSupplier;

public Builder namespace(String namespace) {
this.namespace = namespace;
@@ -85,6 +92,11 @@ public Builder metricLevel(MetricLevel metricLevel) {
return this;
}

public Builder propertiesSupplier(Supplier<Map<String, String>> propertiesSupplier) {
this.propertiesSupplier = propertiesSupplier;
return this;
}

public EmfMetricConfiguration build() {
return new EmfMetricConfiguration(this);
}
@@ -110,6 +122,10 @@ public MetricLevel metricLevel() {
return metricLevel;
}

public Supplier<Map<String, String>> propertiesSupplier() {
return propertiesSupplier;
}

private String resolveLogGroupName(Builder builder) {
return builder.logGroupName != null ? builder.logGroupName :
SystemSettingUtils.resolveEnvironmentVariable("AWS_LAMBDA_LOG_GROUP_NAME").orElse(null);
@@ -19,12 +19,15 @@
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.function.Supplier;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.metrics.MetricCategory;
@@ -61,17 +64,21 @@ public class MetricEmfConverter {
*/
private static final int MAX_METRIC_NUM = 100;

private static final String AWS_METADATA_KEY = "_aws";

private static final Logger logger = Logger.loggerFor(MetricEmfConverter.class);
private final List<String> dimensions = new ArrayList<>();
private final EmfMetricConfiguration config;
private final boolean metricCategoriesContainsAll;
private final Clock clock;
private final Supplier<Map<String, String>> propertiesSupplier;

@SdkTestInternalApi
public MetricEmfConverter(EmfMetricConfiguration config, Clock clock) {
this.config = config;
this.clock = clock;
this.metricCategoriesContainsAll = config.metricCategories().contains(MetricCategory.ALL);
this.propertiesSupplier = config.propertiesSupplier();
}

public MetricEmfConverter(EmfMetricConfiguration config) {
@@ -136,7 +143,18 @@ public List<String> convertMetricCollectionToEmf(MetricCollection metricCollecti
}
}

- return createEmfStrings(aggregatedMetrics);
+ Map<String, String> properties = resolveProperties();
+ return createEmfStrings(aggregatedMetrics, properties);
}

private Map<String, String> resolveProperties() {
try {
Map<String, String> result = propertiesSupplier.get();
return result == null ? Collections.emptyMap() : result;
} catch (Exception e) {
logger.warn(() -> "Properties supplier threw an exception, publishing without custom properties", e);
return Collections.emptyMap();
}
}

/**
@@ -188,7 +206,8 @@ private void processAndWriteValue(JsonWriter jsonWriter, MetricRecord<?> mRecord
}
}

- private List<String> createEmfStrings(Map<SdkMetric<?>, List<MetricRecord<?>>> aggregatedMetrics) {
+ private List<String> createEmfStrings(Map<SdkMetric<?>, List<MetricRecord<?>>> aggregatedMetrics,
+ Map<String, String> properties) {
List<String> emfStrings = new ArrayList<>();
Map<SdkMetric<?>, List<MetricRecord<?>>> currentMetricBatch = new HashMap<>();

@@ -204,35 +223,55 @@ private List<String> createEmfStrings(Map<SdkMetric<?>, List<MetricRecord<?>>> a
}

if (currentMetricBatch.size() == MAX_METRIC_NUM) {
- emfStrings.add(createEmfString(currentMetricBatch));
+ emfStrings.add(createEmfString(currentMetricBatch, properties));
currentMetricBatch = new HashMap<>();
}

currentMetricBatch.put(metric, records);
}

- emfStrings.add(createEmfString(currentMetricBatch));
+ emfStrings.add(createEmfString(currentMetricBatch, properties));

return emfStrings;
}


- private String createEmfString(Map<SdkMetric<?>, List<MetricRecord<?>>> metrics) {
+ private String createEmfString(Map<SdkMetric<?>, List<MetricRecord<?>>> metrics,
+ Map<String, String> properties) {

JsonWriter jsonWriter = JsonWriter.create();
jsonWriter.writeStartObject();

writeAwsObject(jsonWriter, metrics.keySet());
writeMetricValues(jsonWriter, metrics);
writeCustomProperties(jsonWriter, properties, metrics.keySet());

jsonWriter.writeEndObject();

return new String(jsonWriter.getBytes(), StandardCharsets.UTF_8);

}

private void writeCustomProperties(JsonWriter jsonWriter, Map<String, String> properties,
Set<SdkMetric<?>> metrics) {
if (properties.isEmpty()) {
return;
}
Set<String> reservedKeys = new HashSet<>();
reservedKeys.add(AWS_METADATA_KEY);
for (SdkMetric<?> metric : metrics) {
reservedKeys.add(metric.name());
}
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (!reservedKeys.contains(entry.getKey())) {
jsonWriter.writeFieldName(entry.getKey());
jsonWriter.writeValue(entry.getValue());
}
}
}

private void writeAwsObject(JsonWriter jsonWriter, Set<SdkMetric<?>> metricNames) {
- jsonWriter.writeFieldName("_aws");
+ jsonWriter.writeFieldName(AWS_METADATA_KEY);
jsonWriter.writeStartObject();

jsonWriter.writeFieldName("Timestamp");
@@ -18,6 +18,11 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.http.HttpMetric;
@@ -97,4 +102,96 @@ void Publish_multipleMetrics() {
assertThat(loggedEvents()).hasSize(2);
}

@Test
void publish_propertiesSupplierThrowsException_publishesWithoutCustomProperties() {
EmfMetricLoggingPublisher publisher = publisherBuilder
.logGroupName("/aws/lambda/emfMetricTest")
.propertiesSupplier(() -> { throw new RuntimeException("supplier failed"); })
.build();

MetricCollector metricCollector = MetricCollector.create("test");
metricCollector.reportMetric(HttpMetric.AVAILABLE_CONCURRENCY, 5);
publisher.publish(metricCollector.collect());

// Should have: 1 warning about supplier + 1 EMF info log
boolean hasWarning = loggedEvents().stream()
.anyMatch(e -> e.getLevel() == Level.WARN
&& e.getMessage().getFormattedMessage().contains("Properties supplier threw an exception"));
assertThat(hasWarning).isTrue();

boolean hasEmfOutput = loggedEvents().stream()
.anyMatch(e -> e.getLevel() == Level.INFO
&& e.getMessage().getFormattedMessage().contains("\"_aws\":{"));
assertThat(hasEmfOutput).isTrue();

// EMF output should not contain any custom properties
String emfLog = loggedEvents().stream()
.filter(e -> e.getLevel() == Level.INFO
&& e.getMessage().getFormattedMessage().contains("\"_aws\":{"))
.findFirst().get().getMessage().getFormattedMessage();
assertThat(emfLog).contains("\"AvailableConcurrency\":5");
}

@Test
void publish_propertiesSupplierReturnsNull_publishesWithoutCustomProperties() {
EmfMetricLoggingPublisher publisher = publisherBuilder
.logGroupName("/aws/lambda/emfMetricTest")
.propertiesSupplier(() -> null)
.build();

MetricCollector metricCollector = MetricCollector.create("test");
metricCollector.reportMetric(HttpMetric.AVAILABLE_CONCURRENCY, 5);
publisher.publish(metricCollector.collect());

// Should have EMF output without custom properties
boolean hasEmfOutput = loggedEvents().stream()
.anyMatch(e -> e.getLevel() == Level.INFO
&& e.getMessage().getFormattedMessage().contains("\"_aws\":{"));
assertThat(hasEmfOutput).isTrue();

String emfLog = loggedEvents().stream()
.filter(e -> e.getLevel() == Level.INFO
&& e.getMessage().getFormattedMessage().contains("\"_aws\":{"))
.findFirst().get().getMessage().getFormattedMessage();
assertThat(emfLog).contains("\"AvailableConcurrency\":5");
// No warning should be logged for null return
boolean hasWarning = loggedEvents().stream()
.anyMatch(e -> e.getLevel() == Level.WARN);
assertThat(hasWarning).isFalse();
}

@Test
void publish_statefulSupplier_eachPublishUsesCurrentMap() {
AtomicInteger counter = new AtomicInteger(0);
EmfMetricLoggingPublisher publisher = publisherBuilder
.logGroupName("/aws/lambda/emfMetricTest")
.propertiesSupplier(() -> {
int count = counter.incrementAndGet();
Map<String, String> map = new HashMap<String, String>();
map.put("InvocationCount", String.valueOf(count));
return map;
})
.build();

// First publish
MetricCollector mc1 = MetricCollector.create("test1");
mc1.reportMetric(HttpMetric.AVAILABLE_CONCURRENCY, 5);
publisher.publish(mc1.collect());

// Second publish
MetricCollector mc2 = MetricCollector.create("test2");
mc2.reportMetric(HttpMetric.AVAILABLE_CONCURRENCY, 10);
publisher.publish(mc2.collect());

// Collect all EMF info logs
List<String> emfLogs = loggedEvents().stream()
.filter(e -> e.getLevel() == Level.INFO
&& e.getMessage().getFormattedMessage().contains("\"_aws\":{"))
.map(e -> e.getMessage().getFormattedMessage())
.collect(java.util.stream.Collectors.toList());

assertThat(emfLogs).hasSize(2);
assertThat(emfLogs.get(0)).contains("\"InvocationCount\":\"1\"");
assertThat(emfLogs.get(1)).contains("\"InvocationCount\":\"2\"");
}
}