Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions core/src/main/java/io/grpc/internal/AbstractClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;

Expand Down Expand Up @@ -155,6 +156,9 @@ public final void setDecompressorRegistry(DecompressorRegistry decompressorRegis
public final void start(ClientStreamListener listener) {
transportState().setListener(listener);
if (!useGet) {
// Capture the message encoding before headers are cleared, so we can warn
// if the server doesn't advertise support for it (issue #1804)
transportState().setSentMessageEncoding(headers.get(MESSAGE_ENCODING_KEY));
abstractClientStreamSink().writeHeaders(headers, null);
headers = null;
}
Expand Down Expand Up @@ -224,6 +228,8 @@ protected abstract static class TransportState extends AbstractStream.TransportS
private ClientStreamListener listener;
private boolean fullStreamDecompression;
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
/** The message encoding sent by the client, or null if identity/none. */
@Nullable private String sentMessageEncoding;

private boolean deframerClosed = false;
private Runnable deframerClosedTask;
Expand Down Expand Up @@ -261,6 +267,16 @@ private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry)
checkNotNull(decompressorRegistry, "decompressorRegistry");
}

/**
* Sets the message encoding that the client is using for outbound messages.
* Used to warn if the server doesn't advertise support for this encoding.
*
* @param messageEncoding the encoding name (e.g., "gzip"), or null for identity
*/
protected void setSentMessageEncoding(@Nullable String messageEncoding) {
this.sentMessageEncoding = messageEncoding;
}

@VisibleForTesting
public final void setListener(ClientStreamListener listener) {
checkState(this.listener == null, "Already called setListener");
Expand Down Expand Up @@ -342,6 +358,19 @@ protected void inboundHeadersReceived(Metadata headers) {
}
}

// Warn if client sent compressed messages but server didn't advertise support
if (sentMessageEncoding != null) {
byte[] acceptEncoding = headers.get(MESSAGE_ACCEPT_ENCODING_KEY);
if (acceptEncoding == null) {
log.log(
Level.FINE,
"Server did not include grpc-accept-encoding header in response. "
+ "Client sent messages with encoding [{0}]. "
+ "The server may not support this encoding.",
sentMessageEncoding);
}
}

listener().headersRead(headers);
}

Expand Down
140 changes: 140 additions & 0 deletions core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -527,6 +533,140 @@ public void resetOnReadyThreshold() {
assertNull(options.clearOnReadyThreshold().getOnReadyThreshold());
}

@Test
public void inboundHeadersReceived_warnsWhenServerDoesNotAdvertiseAcceptEncoding() {
// Set up log capture
Logger logger = Logger.getLogger(AbstractClientStream.class.getName());
Level originalLevel = logger.getLevel();
List<LogRecord> logs = new ArrayList<>();
Handler handler = new Handler() {
@Override
public void publish(LogRecord record) {
logs.add(record);
}

@Override
public void flush() {}

@Override
public void close() {}
};
logger.addHandler(handler);
logger.setLevel(Level.FINE);

try {
AbstractClientStream stream =
new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
stream.start(mockListener);

// Simulate that client sent gzip-compressed messages
stream.transportState().setSentMessageEncoding("gzip");

// Server responds without grpc-accept-encoding header
Metadata headers = new Metadata();
stream.transportState().inboundHeadersReceived(headers);

// Verify warning was logged
verify(mockListener).headersRead(headers);
assertThat(logs).hasSize(1);
LogRecord record = logs.get(0);
assertThat(record.getLevel()).isEqualTo(Level.FINE);
assertThat(record.getMessage()).contains("grpc-accept-encoding");
// The parameter {0} contains the encoding
assertThat(record.getParameters()).asList().contains("gzip");
} finally {
logger.removeHandler(handler);
logger.setLevel(originalLevel);
}
}

@Test
public void inboundHeadersReceived_noWarningWhenServerAdvertisesAcceptEncoding() {
// Set up log capture
Logger logger = Logger.getLogger(AbstractClientStream.class.getName());
Level originalLevel = logger.getLevel();
List<LogRecord> logs = new ArrayList<>();
Handler handler = new Handler() {
@Override
public void publish(LogRecord record) {
logs.add(record);
}

@Override
public void flush() {}

@Override
public void close() {}
};
logger.addHandler(handler);
logger.setLevel(Level.FINE);

try {
AbstractClientStream stream =
new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
stream.start(mockListener);

// Simulate that client sent gzip-compressed messages
stream.transportState().setSentMessageEncoding("gzip");

// Server responds with grpc-accept-encoding header
Metadata headers = new Metadata();
headers.put(
GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY,
"gzip".getBytes(java.nio.charset.StandardCharsets.US_ASCII));
stream.transportState().inboundHeadersReceived(headers);

// Verify no warning was logged
verify(mockListener).headersRead(headers);
assertThat(logs).isEmpty();
} finally {
logger.removeHandler(handler);
logger.setLevel(originalLevel);
}
}

@Test
public void inboundHeadersReceived_noWarningWhenClientDidNotUseCompression() {
// Set up log capture
Logger logger = Logger.getLogger(AbstractClientStream.class.getName());
Level originalLevel = logger.getLevel();
List<LogRecord> logs = new ArrayList<>();
Handler handler = new Handler() {
@Override
public void publish(LogRecord record) {
logs.add(record);
}

@Override
public void flush() {}

@Override
public void close() {}
};
logger.addHandler(handler);
logger.setLevel(Level.FINE);

try {
AbstractClientStream stream =
new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
stream.start(mockListener);

// Client did not set any encoding (using identity/no compression)
// sentMessageEncoding is null by default

// Server responds without grpc-accept-encoding header
Metadata headers = new Metadata();
stream.transportState().inboundHeadersReceived(headers);

// Verify no warning was logged (client didn't use compression)
verify(mockListener).headersRead(headers);
assertThat(logs).isEmpty();
} finally {
logger.removeHandler(handler);
logger.setLevel(originalLevel);
}
}

/**
* No-op base class for testing.
*/
Expand Down