feat(http-client): introduce ServerSentEvent type and spec-compliant SSE parser#853
feat(http-client): introduce ServerSentEvent type and spec-compliant SSE parser#853ehsavoie wants to merge 1 commit intoa2aproject:mainfrom
Conversation
There was a problem hiding this comment.
Code Review
This pull request refactors the SSE handling mechanism by introducing a dedicated ServerSentEvent record and a unified ServerSentEventParser across the JDK, Android, and Vert.x HTTP client implementations. The A2AHttpClient interface and its consumers have been updated to use this new event structure. Additionally, a deadlock issue in Vert.x routes was addressed by disabling ordered execution for blocking handlers. Review feedback highlights that non-SSE responses are incorrectly delivered line-by-line in several client implementations, which could break JSON parsing for multi-line bodies. There is also a concern regarding the ServerSentEventParser dispatching pending data upon flushing, which violates the SSE specification's requirement to discard such data when a stream ends mid-event.
I am having trouble creating individual review comments. Click here to see my feedback.
http-client/src/main/java/org/a2aproject/sdk/client/http/JdkA2AHttpClient.java (161-163)
Delivering non-SSE bodies line-by-line to the messageConsumer will cause parsing failures in SSEEventListener if the response (e.g., a JSON error) is multi-line or pretty-printed. For non-SSE responses, the entire body should be buffered and delivered as a single ServerSentEvent to ensure it can be parsed correctly as a single JSON object.
extras/http-client-vertx/src/main/java/org/a2aproject/sdk/client/http/VertxA2AHttpClient.java (613-615)
Delivering non-SSE bodies line-by-line to the messageConsumer will cause parsing failures in SSEEventListener if the response is multi-line JSON. Similar to the implementation for error bodies in AndroidA2AHttpClient, the entire non-SSE body should be buffered and delivered as a single ServerSentEvent.
extras/http-client-android/src/main/java/org/a2aproject/sdk/client/http/AndroidA2AHttpClient.java (188-192)
Delivering non-SSE bodies line-by-line to the messageConsumer will cause parsing failures in SSEEventListener if the response is multi-line JSON. While error bodies are correctly handled as a single block at line 167, 2xx non-SSE responses are still split by line here. It is recommended to buffer the entire body and deliver it as a single ServerSentEvent.
http-client/src/main/java/org/a2aproject/sdk/client/http/ServerSentEventParser.java (171-173)
The SSE specification (Section 9.2.6) states that "Once the end of the file is reached, any pending data must be discarded." Dispatching the buffered event on flush() deviates from this requirement. This also introduces inconsistent behavior across client implementations, as those relying on standard-compliant parsers (like Vert.x) will not dispatch a partial event if the stream ends without a trailing empty line.
public void flush() {
// Per SSE spec: any pending data must be discarded if the stream ends mid-event
dataBuffer.clear();
dataBufferChars = 0;
eventType = null;
skippingCurrentEvent = false;
}14a480f to
bc9e6e7
Compare
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request refactors the SSE handling logic across the SDK to use a unified ServerSentEvent record and a dedicated ServerSentEventParser. This change standardizes SSE processing, improves error handling, and introduces support for SSE fields like event types, IDs, and retry intervals. Additionally, the PR adds support for Android and Vert.x HTTP clients and includes necessary configuration updates for testing. My review highlights critical improvements needed for error propagation in the JDK client and security enhancements to prevent potential Denial of Service vulnerabilities through response body size limiting.
| return httpClient.sendAsync(request, bodyHandler) | ||
| .thenAccept(response -> { | ||
| // Handle non-authentication/non-authorization errors here | ||
| if (!isSuccessStatus(response.statusCode()) && | ||
| response.statusCode() != HTTP_UNAUTHORIZED && | ||
| response.statusCode() != HTTP_FORBIDDEN) { | ||
| subscriber.onError(new IOException("Request failed with status " + response.statusCode() + ":" + response.body())); | ||
| } | ||
| }); | ||
| .thenRun(() -> {}); |
There was a problem hiding this comment.
The errorConsumer is not notified if the asynchronous request fails (e.g., due to a connection error) before the body handler is reached. Propagating the failure of the sendAsync future to the subscriber ensures consistent error handling across all client implementations.
return httpClient.sendAsync(request, bodyHandler)
.whenComplete((resp, ex) -> {
if (ex != null) {
subscriber.onError(ex);
}
})
.thenRun(() -> {});| } else if (!item.isEmpty()) { | ||
| if (nonSseBodyBuffer.length() > 0) { | ||
| nonSseBodyBuffer.append('\n'); | ||
| } | ||
| // Other SSE control lines (event:, id:, retry:, :) are ignored | ||
| } else { | ||
| // Plain error body: deliver so SSEEventListener can parse the typed error | ||
| messageConsumer.accept(item); | ||
| nonSseBodyBuffer.append(item); | ||
| } |
There was a problem hiding this comment.
The nonSseBodyBuffer lacks a size limit when accumulating non-SSE response bodies. This could lead to memory exhaustion if a server sends an unexpectedly large response. It is recommended to enforce a hard cap (e.g., 1 MB) and signal an error if exceeded to prevent Denial of Service (DoS) via OutOfMemoryError.
} else if (!item.isEmpty()) {
if (nonSseBodyBuffer.length() + item.length() > 1024 * 1024) {
subscriber.onError(new IOException("Response body exceeded maximum allowed size"));
return;
}
if (nonSseBodyBuffer.length() > 0) {
nonSseBodyBuffer.append('\n');
}
nonSseBodyBuffer.append(item);
}References
- When a class is designed to hold the entire content of a file or response in memory, an explicit size validation is necessary to prevent Denial of Service (DoS) via OutOfMemoryError for arbitrarily large inputs.
| try (InputStream is = connection.getInputStream(); | ||
| BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { | ||
| String line; | ||
| while ((line = reader.readLine()) != null) { | ||
| if (line.startsWith("data:")) { | ||
| String data = line.substring(5).trim(); | ||
| if (!data.isEmpty()) { | ||
| messageConsumer.accept(data); | ||
| if (isSse) { | ||
| ServerSentEventParser sseParser = new ServerSentEventParser(messageConsumer, errorConsumer); | ||
| while ((line = reader.readLine()) != null) { | ||
| sseParser.processLine(line); | ||
| } | ||
| sseParser.flush(); | ||
| } else { | ||
| StringBuilder bodyBuffer = new StringBuilder(); | ||
| while ((line = reader.readLine()) != null) { | ||
| if (!line.isEmpty()) { | ||
| if (bodyBuffer.length() > 0) { | ||
| bodyBuffer.append('\n'); | ||
| } | ||
| bodyBuffer.append(line); | ||
| } | ||
| } | ||
| String body = bodyBuffer.toString(); | ||
| if (!body.isEmpty()) { | ||
| messageConsumer.accept(new ServerSentEvent(body)); | ||
| } | ||
| } | ||
| completeRunnable.run(); | ||
| } |
There was a problem hiding this comment.
The current implementation for non-SSE responses in processSSEResponse buffers the entire body without a size limit, which is a potential DoS vulnerability. Reusing the readStreamWithLimit method provides a safe 10 MB cap and simplifies the logic by only opening the BufferedReader when an SSE stream is actually detected.
try (InputStream is = connection.getInputStream()) {
if (isSse) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
ServerSentEventParser sseParser = new ServerSentEventParser(messageConsumer, errorConsumer);
String line;
while ((line = reader.readLine()) != null) {
sseParser.processLine(line);
}
sseParser.flush();
}
} else {
String body = readStreamWithLimit(is);
if (!body.isEmpty()) {
messageConsumer.accept(new ServerSentEvent(body));
}
}
completeRunnable.run();
}References
- When a class is designed to hold the entire content of a file or response in memory, an explicit size validation is necessary to prevent Denial of Service (DoS) via OutOfMemoryError for arbitrarily large inputs.
…SSE parser Replace raw string SSE callbacks with a structured ServerSentEvent record across all HTTP client implementations (JDK, Vert.x, Android). Add ServerSentEventParser with full SSE spec compliance (multi-line data, event id/type, retry, DoS limits). Expand integration tests for typed SSE events and add Android transport variants for existing JSONRPC and REST reference tests. This fixes a2aproject#839 Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
Replace raw string SSE callbacks with a structured ServerSentEvent record across
all HTTP client implementations (JDK, Vert.x, Android). Add ServerSentEventParser
with full SSE spec compliance (multi-line data, event id/type, retry, DoS limits).
Expand integration tests for typed SSE events and add Android transport variants
for existing JSONRPC and REST reference tests.
Fixes #839 🦕