-
Notifications
You must be signed in to change notification settings - Fork 4k
MCS connection scaling interop tests for Java #12651
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
fb52bbf
f518d5e
3874631
0bad82f
51b38e2
4da06a4
df70a27
bee005f
efce818
2ac2d47
4214889
e099d1d
3d75bf8
84d9528
3136bca
c3fc7c3
93cb9ad
3719011
dbb3881
0216d3b
3346bf8
d27128b
a8d66e1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -70,6 +70,7 @@ | |
| import io.grpc.testing.integration.Messages.TestOrcaReport; | ||
| import java.io.File; | ||
| import java.io.FileInputStream; | ||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.nio.charset.Charset; | ||
| import java.util.Arrays; | ||
|
|
@@ -563,7 +564,46 @@ private void runTest(TestCases testCase) throws Exception { | |
| tester.testOrcaOob(); | ||
| break; | ||
| } | ||
|
|
||
|
|
||
| case MAX_CONCURRENT_STREAMS_CONNECTION_SCALING: { | ||
| ChannelCredentials channelCredentials; | ||
| if (useTls) { | ||
| if (!useTestCa) { | ||
| channelCredentials = TlsChannelCredentials.create(); | ||
| } else { | ||
| try { | ||
| channelCredentials = TlsChannelCredentials.newBuilder() | ||
| .trustManager(TlsTesting.loadCert("ca.pem")) | ||
| .build(); | ||
| } catch (Exception ex) { | ||
| throw new RuntimeException(ex); | ||
| } | ||
| } | ||
| } else { | ||
| channelCredentials = InsecureChannelCredentials.create(); | ||
| } | ||
| ManagedChannelBuilder<?> channelBuilder; | ||
| if (serverPort == 0) { | ||
| channelBuilder = Grpc.newChannelBuilder(serverHost, channelCredentials); | ||
| } else { | ||
| channelBuilder = | ||
| Grpc.newChannelBuilderForAddress(serverHost, serverPort, channelCredentials); | ||
| } | ||
| if (serverHostOverride != null) { | ||
| channelBuilder.overrideAuthority(serverHostOverride); | ||
| } | ||
| channelBuilder.disableServiceConfigLookUp(); | ||
| try { | ||
| @SuppressWarnings("unchecked") | ||
| Map<String, ?> serviceConfigMap = (Map<String, ?>) JsonParser.parse( | ||
| "{\"connection_scaling\":{\"max_connections_per_subchannel\": 2}}"); | ||
| channelBuilder.defaultServiceConfig(serviceConfigMap); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| tester.testMcs(TestServiceGrpc.newStub(channelBuilder.build())); | ||
| break; | ||
| } | ||
| default: | ||
| throw new IllegalArgumentException("Unknown test case: " + testCase); | ||
| } | ||
|
|
@@ -596,6 +636,7 @@ private ClientInterceptor maybeCreateAdditionalMetadataInterceptor( | |
| } | ||
|
|
||
| private class Tester extends AbstractInteropTest { | ||
|
|
||
| @Override | ||
| protected ManagedChannelBuilder<?> createChannelBuilder() { | ||
| boolean useGeneric = false; | ||
|
|
@@ -979,31 +1020,17 @@ public void testOrcaOob() throws Exception { | |
| .build(); | ||
|
|
||
| final int retryLimit = 5; | ||
| BlockingQueue<Object> queue = new LinkedBlockingQueue<>(); | ||
| final Object lastItem = new Object(); | ||
| StreamingOutputCallResponseObserver streamingOutputCallResponseObserver = | ||
| new StreamingOutputCallResponseObserver(lastItem); | ||
| StreamObserver<StreamingOutputCallRequest> streamObserver = | ||
| asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() { | ||
|
|
||
| @Override | ||
| public void onNext(StreamingOutputCallResponse value) { | ||
| queue.add(value); | ||
| } | ||
|
|
||
| @Override | ||
| public void onError(Throwable t) { | ||
| queue.add(t); | ||
| } | ||
|
|
||
| @Override | ||
| public void onCompleted() { | ||
| queue.add(lastItem); | ||
| } | ||
| }); | ||
| asyncStub.fullDuplexCall(streamingOutputCallResponseObserver); | ||
|
|
||
| streamObserver.onNext(StreamingOutputCallRequest.newBuilder() | ||
| .setOrcaOobReport(answer) | ||
| .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); | ||
| assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class); | ||
| assertThat(streamingOutputCallResponseObserver.take()) | ||
| .isInstanceOf(StreamingOutputCallResponse.class); | ||
| int i = 0; | ||
| for (; i < retryLimit; i++) { | ||
| Thread.sleep(1000); | ||
|
|
@@ -1016,7 +1043,8 @@ public void onCompleted() { | |
| streamObserver.onNext(StreamingOutputCallRequest.newBuilder() | ||
| .setOrcaOobReport(answer2) | ||
| .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); | ||
| assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class); | ||
| assertThat(streamingOutputCallResponseObserver.take()) | ||
| .isInstanceOf(StreamingOutputCallResponse.class); | ||
|
|
||
| for (i = 0; i < retryLimit; i++) { | ||
| Thread.sleep(1000); | ||
|
|
@@ -1027,7 +1055,7 @@ public void onCompleted() { | |
| } | ||
| assertThat(i).isLessThan(retryLimit); | ||
| streamObserver.onCompleted(); | ||
| assertThat(queue.take()).isSameInstanceAs(lastItem); | ||
| assertThat(streamingOutputCallResponseObserver.take()).isSameInstanceAs(lastItem); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -1054,6 +1082,85 @@ protected ServerBuilder<?> getHandshakerServerBuilder() { | |
| protected int operationTimeoutMillis() { | ||
| return 15000; | ||
| } | ||
|
|
||
| class StreamingOutputCallResponseObserver implements | ||
| StreamObserver<StreamingOutputCallResponse> { | ||
| private final Object lastItem; | ||
| private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>(); | ||
ejona86 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| public StreamingOutputCallResponseObserver(Object lastItem) { | ||
| this.lastItem = lastItem; | ||
| } | ||
|
|
||
| @Override | ||
| public void onNext(StreamingOutputCallResponse value) { | ||
| queue.add(value); | ||
| } | ||
|
|
||
| @Override | ||
| public void onError(Throwable t) { | ||
| queue.add(t); | ||
| } | ||
|
|
||
| @Override | ||
| public void onCompleted() { | ||
| queue.add(lastItem); | ||
| } | ||
|
|
||
| Object take() throws InterruptedException { | ||
| return queue.take(); | ||
| } | ||
| } | ||
|
|
||
| public void testMcs(TestServiceGrpc.TestServiceStub asyncStub) throws Exception { | ||
| final Object lastItem = new Object(); | ||
| StreamingOutputCallResponseObserver responseObserver1 = | ||
| new StreamingOutputCallResponseObserver(lastItem); | ||
| StreamObserver<StreamingOutputCallRequest> streamObserver1 = | ||
| asyncStub.fullDuplexCall(responseObserver1); | ||
| StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() | ||
| .addResponseParameters(ResponseParameters.newBuilder() | ||
| .setFillPeerSocketAddress( | ||
| Messages.BoolValue.newBuilder().setValue(true).build()) | ||
| .build()) | ||
| .build(); | ||
| streamObserver1.onNext(request); | ||
| Object responseObj = responseObserver1.take(); | ||
| StreamingOutputCallResponse callResponse = (StreamingOutputCallResponse) responseObj; | ||
| String clientSocketAddressInCall1 = callResponse.getPeerSocketAddress(); | ||
| assertThat(clientSocketAddressInCall1).isNotEmpty(); | ||
|
|
||
| StreamingOutputCallResponseObserver responseObserver2 = | ||
| new StreamingOutputCallResponseObserver(lastItem); | ||
| StreamObserver<StreamingOutputCallRequest> streamObserver2 = | ||
| asyncStub.fullDuplexCall(responseObserver2); | ||
| streamObserver2.onNext(request); | ||
| callResponse = (StreamingOutputCallResponse) responseObserver2.take(); | ||
| String clientSocketAddressInCall2 = callResponse.getPeerSocketAddress(); | ||
|
|
||
| assertThat(clientSocketAddressInCall1).isEqualTo(clientSocketAddressInCall2); | ||
|
|
||
| // The first connection is at max rpc call count of 2, so the 3rd rpc will cause a new | ||
| // connection to be created in the same subchannel and not get queued. | ||
| StreamingOutputCallResponseObserver responseObserver3 = | ||
| new StreamingOutputCallResponseObserver(lastItem); | ||
| StreamObserver<StreamingOutputCallRequest> streamObserver3 = | ||
| asyncStub.fullDuplexCall(responseObserver3); | ||
| streamObserver3.onNext(request); | ||
| callResponse = (StreamingOutputCallResponse) responseObserver3.take(); | ||
| String clientSocketAddressInCall3 = callResponse.getPeerSocketAddress(); | ||
|
|
||
| // This assertion is currently failing because connection scaling when MCS limit has been | ||
| // reached is not yet implemented in gRPC Java. | ||
| assertThat(clientSocketAddressInCall3).isNotEqualTo(clientSocketAddressInCall1); | ||
|
|
||
| streamObserver1.onCompleted(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: complete all three RPCs, then verify all three. That way the RPCs complete in parallel.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| streamObserver2.onCompleted(); | ||
| streamObserver3.onCompleted(); | ||
| assertThat(responseObserver1.take()).isSameInstanceAs(lastItem); | ||
| assertThat(responseObserver2.take()).isSameInstanceAs(lastItem); | ||
| assertThat(responseObserver3.take()).isSameInstanceAs(lastItem); | ||
| } | ||
| } | ||
|
|
||
| private static String validTestCasesHelpText() { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,13 +16,18 @@ | |
|
|
||
| package io.grpc.testing.integration; | ||
|
|
||
| import static io.grpc.Grpc.TRANSPORT_ATTR_REMOTE_ADDR; | ||
|
|
||
| import com.google.common.base.Preconditions; | ||
| import com.google.common.collect.Queues; | ||
| import com.google.errorprone.annotations.concurrent.GuardedBy; | ||
| import com.google.protobuf.ByteString; | ||
| import io.grpc.Context; | ||
| import io.grpc.Contexts; | ||
| import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; | ||
| import io.grpc.Metadata; | ||
| import io.grpc.ServerCall; | ||
| import io.grpc.ServerCall.Listener; | ||
| import io.grpc.ServerCallHandler; | ||
| import io.grpc.ServerInterceptor; | ||
| import io.grpc.Status; | ||
|
|
@@ -42,10 +47,12 @@ | |
| import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; | ||
| import io.grpc.testing.integration.Messages.TestOrcaReport; | ||
| import io.grpc.testing.integration.TestServiceGrpc.AsyncService; | ||
| import java.net.SocketAddress; | ||
| import java.util.ArrayDeque; | ||
| import java.util.Arrays; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.Iterator; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Queue; | ||
|
|
@@ -61,8 +68,8 @@ | |
| * sent in response streams. | ||
| */ | ||
| public class TestServiceImpl implements io.grpc.BindableService, AsyncService { | ||
| static Context.Key<SocketAddress> PEER_ADDRESS_CONTEXT_KEY = Context.key("peer-address"); | ||
| private final Random random = new Random(); | ||
|
|
||
| private final ScheduledExecutorService executor; | ||
| private final ByteString compressableBuffer; | ||
| private final MetricRecorder metricRecorder; | ||
|
|
@@ -235,9 +242,27 @@ public void onNext(StreamingOutputCallRequest request) { | |
| .asRuntimeException()); | ||
| return; | ||
| } | ||
| if (whetherSendClientSocketAddressInResponse(request)) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So we just ignore most of the request? It looks like this should go through
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The other parts of the request are about chunking behavior which doesn't apply for this test. I don't need to use |
||
| responseObserver.onNext( | ||
| StreamingOutputCallResponse.newBuilder() | ||
| .setPeerSocketAddress(PEER_ADDRESS_CONTEXT_KEY.get().toString()) | ||
| .build()); | ||
| return; | ||
| } | ||
| dispatcher.enqueue(toChunkQueue(request)); | ||
| } | ||
|
|
||
| private boolean whetherSendClientSocketAddressInResponse(StreamingOutputCallRequest request) { | ||
| Iterator<ResponseParameters> responseParametersIterator = | ||
| request.getResponseParametersList().iterator(); | ||
| while (responseParametersIterator.hasNext()) { | ||
| if (responseParametersIterator.next().getFillPeerSocketAddress().getValue()) { | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public void onCompleted() { | ||
| if (oobTestLocked) { | ||
|
|
@@ -507,7 +532,8 @@ public static List<ServerInterceptor> interceptors() { | |
| return Arrays.asList( | ||
| echoRequestHeadersInterceptor(Util.METADATA_KEY), | ||
| echoRequestMetadataInHeaders(Util.ECHO_INITIAL_METADATA_KEY), | ||
| echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY)); | ||
| echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY), | ||
| new McsScalingTestcaseInterceptor()); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -539,6 +565,22 @@ public void close(Status status, Metadata trailers) { | |
| }; | ||
| } | ||
|
|
||
| static class McsScalingTestcaseInterceptor implements ServerInterceptor { | ||
| @Override | ||
| public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, | ||
| Metadata headers, ServerCallHandler<ReqT, RespT> next) { | ||
| SocketAddress peerAddress = call.getAttributes().get(TRANSPORT_ATTR_REMOTE_ADDR); | ||
|
|
||
| // Create a new context with the peer address value | ||
| Context newContext = Context.current().withValue(PEER_ADDRESS_CONTEXT_KEY, peerAddress); | ||
| try { | ||
| return Contexts.interceptCall(newContext, call, headers, next); | ||
| } catch (Exception ex) { | ||
| throw new RuntimeException(ex); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Echoes request headers with the specified key(s) from a client into response headers only. | ||
| */ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need this? Otherwise we are orphaning the RPC.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.