Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions cronet/src/main/java/io/grpc/cronet/CronetClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
* Client stream for the cronet transport.
*/
class CronetClientStream extends AbstractClientStream {
private static final int READ_BUFFER_CAPACITY = 4 * 1024;
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
private static final String LOG_TAG = "grpc-java-cronet";

Expand All @@ -69,6 +68,12 @@ class CronetClientStream extends AbstractClientStream {

static final CallOptions.Key<Collection<Object>> CRONET_ANNOTATIONS_KEY =
CallOptions.Key.create("cronet-annotations");
/**
* Sets the read buffer size which the GRPC layer will use to read data from Cronet. Higher buffer
* size leads to less overhead but more memory consumption. The current default value is 4KB.
*/
public static final CallOptions.Key<Integer> CRONET_READ_BUFFER_SIZE_KEY =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this is not accessible to others, because it is in a package-private class. We want this class to be package-private. If we want this API for internal-only (at least for now), then you can add accessor methods to InternalCronetCallOptions.

CallOptions.Key.createWithDefault("cronet-read-buffer-size", 4 * 1024);

private final String url;
private final String userAgent;
Expand All @@ -85,6 +90,8 @@ class CronetClientStream extends AbstractClientStream {
private final Collection<Object> annotations;
private final TransportState state;
private final Sink sink = new Sink();
@VisibleForTesting
final int readBufferSize;
private StreamBuilderFactory streamFactory;

CronetClientStream(
Expand Down Expand Up @@ -120,6 +127,7 @@ class CronetClientStream extends AbstractClientStream {
this.annotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY);
this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer,
callOptions);
this.readBufferSize = callOptions.getOption(CRONET_READ_BUFFER_SIZE_KEY);

// Tests expect the "plain" deframer behavior, not MigratingDeframer
// https://github.com/grpc/grpc-java/issues/7140
Expand Down Expand Up @@ -309,7 +317,7 @@ public void bytesRead(int processedBytes) {
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
Log.v(LOG_TAG, "BidirectionalStream.read");
}
stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
stream.read(ByteBuffer.allocateDirect(readBufferSize));
}
}

Expand Down Expand Up @@ -429,7 +437,7 @@ public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInf
Log.v(LOG_TAG, "BidirectionalStream.read");
}
reportHeaders(info.getAllHeadersAsList(), false);
stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
stream.read(ByteBuffer.allocateDirect(readBufferSize));
}

@Override
Expand Down
36 changes: 36 additions & 0 deletions cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.grpc.cronet;

import static io.grpc.cronet.CronetClientStream.CRONET_READ_BUFFER_SIZE_KEY;
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
Expand Down Expand Up @@ -92,6 +93,41 @@ public void alwaysUsePut_defaultsToFalse() throws Exception {
assertFalse(stream.idempotent);
}

@Test
public void channelBuilderReadBufferSize_defaultsTo4Kb() throws Exception {
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
CronetTransportFactory transportFactory =
(CronetTransportFactory) builder.buildTransportFactory();
CronetClientTransport transport =
(CronetClientTransport)
transportFactory.newClientTransport(
new InetSocketAddress("localhost", 443),
new ClientTransportOptions(),
channelLogger);
CronetClientStream stream = transport.newStream(
method, new Metadata(), CallOptions.DEFAULT, tracers);

assertEquals(4 * 1024, stream.readBufferSize);
}

@Test
public void channelBuilderReadBufferSize_changeReflected() throws Exception {
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
CronetTransportFactory transportFactory =
(CronetTransportFactory) builder.buildTransportFactory();
CronetClientTransport transport =
(CronetClientTransport)
transportFactory.newClientTransport(
new InetSocketAddress("localhost", 443),
new ClientTransportOptions(),
channelLogger);
CronetClientStream stream = transport.newStream(
method, new Metadata(),
CallOptions.DEFAULT.withOption(CRONET_READ_BUFFER_SIZE_KEY, 32 * 1024), tracers);

assertEquals(32 * 1024, stream.readBufferSize);
}

@Test
public void scheduledExecutorService_default() {
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
Expand Down
Loading