Skip to content

Commit f3be2b9

Browse files
committed
Implement request attributes.
1 parent ff86408 commit f3be2b9

2 files changed

Lines changed: 368 additions & 14 deletions

File tree

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 129 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import com.google.common.annotations.VisibleForTesting;
77
import com.google.common.collect.ImmutableList;
8+
import com.google.common.collect.ImmutableMap;
89
import com.google.common.io.BaseEncoding;
910
import com.google.common.io.ByteStreams;
1011
import com.google.common.util.concurrent.MoreExecutors;
@@ -15,6 +16,8 @@
1516
import com.google.protobuf.Duration;
1617
import com.google.protobuf.InvalidProtocolBufferException;
1718
import com.google.protobuf.Message;
19+
import com.google.protobuf.Struct;
20+
import com.google.protobuf.Value;
1821
import com.google.protobuf.util.Durations;
1922
import io.envoyproxy.envoy.config.core.v3.GrpcService;
2023
import io.envoyproxy.envoy.config.core.v3.HeaderMap;
@@ -156,14 +159,16 @@ public ConfigOrError<ExternalProcessorFilterConfig> parseFilterConfigOverride(
156159

157160
@Nullable
158161
@Override
159-
public ClientInterceptor buildClientInterceptor(FilterConfig filterConfig,
162+
public ClientInterceptor buildClientInterceptor(@Nullable FilterConfig filterConfig,
160163
@Nullable FilterConfig overrideConfig, ScheduledExecutorService scheduler) {
161164
ExternalProcessorFilterConfig config = (ExternalProcessorFilterConfig) filterConfig;
162-
checkNotNull(config, "filterConfig");
163165
if (overrideConfig instanceof ExternalProcessorFilterConfig) {
164-
ExtProcOverrides overrides = ((ExternalProcessorFilterConfig) overrideConfig).getExtProcOverrides();
165-
if (overrides != null) {
166+
ExternalProcessorFilterConfig over = (ExternalProcessorFilterConfig) overrideConfig;
167+
ExtProcOverrides overrides = over.getExtProcOverrides();
168+
if (overrides != null && config != null) {
166169
config = mergeConfigs(config, overrides);
170+
} else {
171+
config = over;
167172
}
168173
}
169174
checkNotNull(config, "config");
@@ -225,6 +230,7 @@ static final class ExternalProcessorFilterConfig implements FilterConfig {
225230
private final GrpcServiceConfig grpcServiceConfig;
226231
private final Optional<HeaderMutationRulesConfig> mutationRulesConfig;
227232
private final Optional<HeaderForwardingRulesConfig> forwardRulesConfig;
233+
private final ImmutableList<String> requestAttributes;
228234
private final boolean disableImmediateResponse;
229235
private final long deferredCloseTimeoutNanos;
230236
private final FilterContext filterContext;
@@ -248,13 +254,15 @@ private static ConfigOrError<ExternalProcessorFilterConfig> createInternal(
248254
GrpcService grpcService;
249255
HeaderMutationRulesConfig mutationRulesConfig = null;
250256
HeaderForwardingRulesConfig forwardRulesConfig = null;
257+
ImmutableList<String> requestAttributes = ImmutableList.of();
251258
long deferredCloseTimeoutNanos = TimeUnit.SECONDS.toNanos(5);
252259
boolean disableImmediateResponse = false;
253260

254261
if (externalProcessor != null) {
255262
mode = externalProcessor.getProcessingMode();
256263
grpcService = externalProcessor.getGrpcService();
257264
disableImmediateResponse = externalProcessor.getDisableImmediateResponse();
265+
requestAttributes = ImmutableList.copyOf(externalProcessor.getRequestAttributesList());
258266

259267
if (externalProcessor.hasMutationRules()) {
260268
try {
@@ -283,6 +291,7 @@ private static ConfigOrError<ExternalProcessorFilterConfig> createInternal(
283291
} else if (overrides != null) {
284292
mode = overrides.getProcessingMode();
285293
grpcService = overrides.getGrpcService();
294+
requestAttributes = ImmutableList.copyOf(overrides.getRequestAttributesList());
286295
} else {
287296
return ConfigOrError.fromError("Either externalProcessor or overrides must be non-null");
288297
}
@@ -309,7 +318,7 @@ private static ConfigOrError<ExternalProcessorFilterConfig> createInternal(
309318

310319
return ConfigOrError.fromConfig(new ExternalProcessorFilterConfig(
311320
externalProcessor, overrides, grpcServiceConfig, Optional.ofNullable(mutationRulesConfig),
312-
Optional.ofNullable(forwardRulesConfig),
321+
Optional.ofNullable(forwardRulesConfig), requestAttributes,
313322
disableImmediateResponse, deferredCloseTimeoutNanos, context));
314323
} catch (GrpcServiceParseException e) {
315324
return ConfigOrError.fromError("Error parsing GrpcService config: " + e.getMessage());
@@ -322,6 +331,7 @@ private static ConfigOrError<ExternalProcessorFilterConfig> createInternal(
322331
GrpcServiceConfig grpcServiceConfig,
323332
Optional<HeaderMutationRulesConfig> mutationRulesConfig,
324333
Optional<HeaderForwardingRulesConfig> forwardRulesConfig,
334+
ImmutableList<String> requestAttributes,
325335
boolean disableImmediateResponse,
326336
long deferredCloseTimeoutNanos,
327337
FilterContext filterContext) {
@@ -330,6 +340,7 @@ private static ConfigOrError<ExternalProcessorFilterConfig> createInternal(
330340
this.grpcServiceConfig = grpcServiceConfig;
331341
this.mutationRulesConfig = mutationRulesConfig;
332342
this.forwardRulesConfig = forwardRulesConfig;
343+
this.requestAttributes = requestAttributes;
333344
this.disableImmediateResponse = disableImmediateResponse;
334345
this.deferredCloseTimeoutNanos = deferredCloseTimeoutNanos;
335346
this.filterContext = filterContext;
@@ -362,6 +373,10 @@ Optional<HeaderForwardingRulesConfig> getForwardRulesConfig() {
362373
return forwardRulesConfig;
363374
}
364375

376+
ImmutableList<String> getRequestAttributes() {
377+
return requestAttributes;
378+
}
379+
365380
boolean getDisableImmediateResponse() {
366381
return disableImmediateResponse;
367382
}
@@ -518,7 +533,7 @@ public void start(Listener<ExtRespT> responseListener, Metadata headers) {
518533

519534
ExtProcClientCall extProcCall = new ExtProcClientCall(
520535
delayedCall, rawCall, stub, filterConfig, filterConfig.getMutationRulesConfig(),
521-
scheduler);
536+
scheduler, method, next);
522537

523538
return new ClientCall<ReqT, RespT>() {
524539
@Override
@@ -625,6 +640,106 @@ private static HeaderMap toHeaderMap(Metadata metadata, Optional<HeaderForwardin
625640
return builder.build();
626641
}
627642

643+
private static ImmutableMap<String, Struct> collectAttributes(
644+
ImmutableList<String> requestedAttributes,
645+
MethodDescriptor<?, ?> method,
646+
Channel channel,
647+
Metadata headers) {
648+
if (requestedAttributes.isEmpty()) {
649+
return ImmutableMap.of();
650+
}
651+
ImmutableMap.Builder<String, Struct> attributes = ImmutableMap.builder();
652+
for (String attr : requestedAttributes) {
653+
switch (attr) {
654+
case "request.path":
655+
case "request.url_path":
656+
attributes.put(attr, encodeAttribute("/" + method.getFullMethodName()));
657+
break;
658+
case "request.host":
659+
attributes.put(attr, encodeAttribute(channel.authority()));
660+
break;
661+
case "request.method":
662+
attributes.put(attr, encodeAttribute("POST"));
663+
break;
664+
case "request.headers":
665+
attributes.put(attr, encodeHeaders(headers));
666+
break;
667+
case "request.referer":
668+
String referer = getHeaderValue(headers, "referer");
669+
if (referer != null) {
670+
attributes.put(attr, encodeAttribute(referer));
671+
}
672+
break;
673+
case "request.useragent":
674+
String ua = getHeaderValue(headers, "user-agent");
675+
if (ua != null) {
676+
attributes.put(attr, encodeAttribute(ua));
677+
}
678+
break;
679+
case "request.id":
680+
String id = getHeaderValue(headers, "x-request-id");
681+
if (id != null) {
682+
attributes.put(attr, encodeAttribute(id));
683+
}
684+
break;
685+
case "request.query":
686+
attributes.put(attr, encodeAttribute(""));
687+
break;
688+
default:
689+
// "Not set" attributes or unrecognized ones (already validated) are skipped.
690+
break;
691+
}
692+
}
693+
return attributes.buildOrThrow();
694+
}
695+
696+
private static Struct encodeAttribute(String value) {
697+
return Struct.newBuilder()
698+
.putFields("", Value.newBuilder().setStringValue(value).build())
699+
.build();
700+
}
701+
702+
private static Struct encodeHeaders(Metadata headers) {
703+
Struct.Builder builder = Struct.newBuilder();
704+
for (String key : headers.keys()) {
705+
String value = getHeaderValue(headers, key);
706+
if (value != null) {
707+
builder.putFields(key.toLowerCase(Locale.ROOT),
708+
Value.newBuilder().setStringValue(value).build());
709+
}
710+
}
711+
return builder.build();
712+
}
713+
714+
@Nullable
715+
private static String getHeaderValue(Metadata headers, String headerName) {
716+
if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
717+
Metadata.Key<byte[]> key;
718+
try {
719+
key = Metadata.Key.of(headerName, Metadata.BINARY_BYTE_MARSHALLER);
720+
} catch (IllegalArgumentException e) {
721+
return null;
722+
}
723+
Iterable<byte[]> values = headers.getAll(key);
724+
if (values == null) {
725+
return null;
726+
}
727+
java.util.List<String> encoded = new ArrayList<>();
728+
for (byte[] v : values) {
729+
encoded.add(BaseEncoding.base64().omitPadding().encode(v));
730+
}
731+
return com.google.common.base.Joiner.on(",").join(encoded);
732+
}
733+
Metadata.Key<String> key;
734+
try {
735+
key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
736+
} catch (IllegalArgumentException e) {
737+
return null;
738+
}
739+
Iterable<String> values = headers.getAll(key);
740+
return values == null ? null : com.google.common.base.Joiner.on(",").join(values);
741+
}
742+
628743
/**
629744
* A local subclass to expose the protected constructor of DelayedClientCall.
630745
*/
@@ -652,6 +767,8 @@ private static class ExtProcClientCall extends SimpleForwardingClientCall<InputS
652767
private final HeaderMutator mutator = HeaderMutator.create();
653768
private final AtomicInteger pendingRequests = new AtomicInteger(0);
654769
private final ProcessingMode currentProcessingMode;
770+
private final MethodDescriptor<?, ?> method;
771+
private final Channel channel;
655772

656773
private volatile Metadata requestHeaders;
657774
final AtomicBoolean activated = new AtomicBoolean(false);
@@ -670,7 +787,9 @@ protected ExtProcClientCall(
670787
ExternalProcessorGrpc.ExternalProcessorStub stub,
671788
ExternalProcessorFilterConfig config,
672789
Optional<HeaderMutationRulesConfig> mutationRulesConfig,
673-
ScheduledExecutorService scheduler) {
790+
ScheduledExecutorService scheduler,
791+
MethodDescriptor<?, ?> method,
792+
Channel channel) {
674793
super(delayedCall);
675794
this.delayedCall = delayedCall;
676795
this.rawCall = rawCall;
@@ -679,6 +798,8 @@ protected ExtProcClientCall(
679798
this.currentProcessingMode = config.getExternalProcessor().getProcessingMode();
680799
this.mutationFilter = new HeaderMutationFilter(mutationRulesConfig);
681800
this.scheduler = scheduler;
801+
this.method = method;
802+
this.channel = channel;
682803
}
683804

684805
private void activateCall() {
@@ -864,6 +985,7 @@ public void onCompleted() {
864985
.setHeaders(toHeaderMap(headers, config.getForwardRulesConfig()))
865986
.setEndOfStream(false)
866987
.build())
988+
.putAllAttributes(collectAttributes(config.getRequestAttributes(), method, channel, headers))
867989
.build());
868990
}
869991

0 commit comments

Comments
 (0)