Skip to content

feat(http-client): introduce ServerSentEvent type and spec-compliant SSE parser#853

Open
ehsavoie wants to merge 1 commit intoa2aproject:mainfrom
ehsavoie:issue-839
Open

feat(http-client): introduce ServerSentEvent type and spec-compliant SSE parser#853
ehsavoie wants to merge 1 commit intoa2aproject:mainfrom
ehsavoie:issue-839

Conversation

@ehsavoie
Copy link
Copy Markdown
Collaborator

@ehsavoie ehsavoie commented May 6, 2026

Replace raw string SSE callbacks with a structured ServerSentEvent record across
all HTTP client implementations (JDK, Vert.x, Android). Add ServerSentEventParser
with full SSE spec compliance (multi-line data, event id/type, retry, DoS limits).
Expand integration tests for typed SSE events and add Android transport variants
for existing JSONRPC and REST reference tests.

Fixes #839 🦕

@ehsavoie ehsavoie requested review from jmesnil and kabir May 6, 2026 15:20
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the SSE handling mechanism by introducing a dedicated ServerSentEvent record and a unified ServerSentEventParser across the JDK, Android, and Vert.x HTTP client implementations. The A2AHttpClient interface and its consumers have been updated to use this new event structure. Additionally, a deadlock issue in Vert.x routes was addressed by disabling ordered execution for blocking handlers. Review feedback highlights that non-SSE responses are incorrectly delivered line-by-line in several client implementations, which could break JSON parsing for multi-line bodies. There is also a concern regarding the ServerSentEventParser dispatching pending data upon flushing, which violates the SSE specification's requirement to discard such data when a stream ends mid-event.

I am having trouble creating individual review comments. Click here to see my feedback.

http-client/src/main/java/org/a2aproject/sdk/client/http/JdkA2AHttpClient.java (161-163)

high

Delivering non-SSE bodies line-by-line to the messageConsumer will cause parsing failures in SSEEventListener if the response (e.g., a JSON error) is multi-line or pretty-printed. For non-SSE responses, the entire body should be buffered and delivered as a single ServerSentEvent to ensure it can be parsed correctly as a single JSON object.

extras/http-client-vertx/src/main/java/org/a2aproject/sdk/client/http/VertxA2AHttpClient.java (613-615)

high

Delivering non-SSE bodies line-by-line to the messageConsumer will cause parsing failures in SSEEventListener if the response is multi-line JSON. Similar to the implementation for error bodies in AndroidA2AHttpClient, the entire non-SSE body should be buffered and delivered as a single ServerSentEvent.

extras/http-client-android/src/main/java/org/a2aproject/sdk/client/http/AndroidA2AHttpClient.java (188-192)

high

Delivering non-SSE bodies line-by-line to the messageConsumer will cause parsing failures in SSEEventListener if the response is multi-line JSON. While error bodies are correctly handled as a single block at line 167, 2xx non-SSE responses are still split by line here. It is recommended to buffer the entire body and deliver it as a single ServerSentEvent.

http-client/src/main/java/org/a2aproject/sdk/client/http/ServerSentEventParser.java (171-173)

medium

The SSE specification (Section 9.2.6) states that "Once the end of the file is reached, any pending data must be discarded." Dispatching the buffered event on flush() deviates from this requirement. This also introduces inconsistent behavior across client implementations, as those relying on standard-compliant parsers (like Vert.x) will not dispatch a partial event if the stream ends without a trailing empty line.

    public void flush() {
        // Per SSE spec: any pending data must be discarded if the stream ends mid-event
        dataBuffer.clear();
        dataBufferChars = 0;
        eventType = null;
        skippingCurrentEvent = false;
    }

@ehsavoie ehsavoie force-pushed the issue-839 branch 2 times, most recently from 14a480f to bc9e6e7 Compare May 7, 2026 06:07
@ehsavoie
Copy link
Copy Markdown
Collaborator Author

ehsavoie commented May 7, 2026

/gemini review

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the SSE handling logic across the SDK to use a unified ServerSentEvent record and a dedicated ServerSentEventParser. This change standardizes SSE processing, improves error handling, and introduces support for SSE fields like event types, IDs, and retry intervals. Additionally, the PR adds support for Android and Vert.x HTTP clients and includes necessary configuration updates for testing. My review highlights critical improvements needed for error propagation in the JDK client and security enhancements to prevent potential Denial of Service vulnerabilities through response body size limiting.

Comment on lines +246 to +247
return httpClient.sendAsync(request, bodyHandler)
.thenAccept(response -> {
// Handle non-authentication/non-authorization errors here
if (!isSuccessStatus(response.statusCode()) &&
response.statusCode() != HTTP_UNAUTHORIZED &&
response.statusCode() != HTTP_FORBIDDEN) {
subscriber.onError(new IOException("Request failed with status " + response.statusCode() + ":" + response.body()));
}
});
.thenRun(() -> {});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The errorConsumer is not notified if the asynchronous request fails (e.g., due to a connection error) before the body handler is reached. Propagating the failure of the sendAsync future to the subscriber ensures consistent error handling across all client implementations.

            return httpClient.sendAsync(request, bodyHandler)
                    .whenComplete((resp, ex) -> {
                        if (ex != null) {
                            subscriber.onError(ex);
                        }
                    })
                    .thenRun(() -> {});

Comment on lines +162 to 167
} else if (!item.isEmpty()) {
if (nonSseBodyBuffer.length() > 0) {
nonSseBodyBuffer.append('\n');
}
// Other SSE control lines (event:, id:, retry:, :) are ignored
} else {
// Plain error body: deliver so SSEEventListener can parse the typed error
messageConsumer.accept(item);
nonSseBodyBuffer.append(item);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The nonSseBodyBuffer lacks a size limit when accumulating non-SSE response bodies. This could lead to memory exhaustion if a server sends an unexpectedly large response. It is recommended to enforce a hard cap (e.g., 1 MB) and signal an error if exceeded to prevent Denial of Service (DoS) via OutOfMemoryError.

                        } else if (!item.isEmpty()) {
                            if (nonSseBodyBuffer.length() + item.length() > 1024 * 1024) {
                                subscriber.onError(new IOException("Response body exceeded maximum allowed size"));
                                return;
                            }
                            if (nonSseBodyBuffer.length() > 0) {
                                nonSseBodyBuffer.append('\n');
                            }
                            nonSseBodyBuffer.append(item);
                        }
References
  1. When a class is designed to hold the entire content of a file or response in memory, an explicit size validation is necessary to prevent Denial of Service (DoS) via OutOfMemoryError for arbitrarily large inputs.

Comment on lines 178 to 203
try (InputStream is = connection.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
if (line.startsWith("data:")) {
String data = line.substring(5).trim();
if (!data.isEmpty()) {
messageConsumer.accept(data);
if (isSse) {
ServerSentEventParser sseParser = new ServerSentEventParser(messageConsumer, errorConsumer);
while ((line = reader.readLine()) != null) {
sseParser.processLine(line);
}
sseParser.flush();
} else {
StringBuilder bodyBuffer = new StringBuilder();
while ((line = reader.readLine()) != null) {
if (!line.isEmpty()) {
if (bodyBuffer.length() > 0) {
bodyBuffer.append('\n');
}
bodyBuffer.append(line);
}
}
String body = bodyBuffer.toString();
if (!body.isEmpty()) {
messageConsumer.accept(new ServerSentEvent(body));
}
}
completeRunnable.run();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The current implementation for non-SSE responses in processSSEResponse buffers the entire body without a size limit, which is a potential DoS vulnerability. Reusing the readStreamWithLimit method provides a safe 10 MB cap and simplifies the logic by only opening the BufferedReader when an SSE stream is actually detected.

        try (InputStream is = connection.getInputStream()) {
          if (isSse) {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
              ServerSentEventParser sseParser = new ServerSentEventParser(messageConsumer, errorConsumer);
              String line;
              while ((line = reader.readLine()) != null) {
                sseParser.processLine(line);
              }
              sseParser.flush();
            }
          } else {
            String body = readStreamWithLimit(is);
            if (!body.isEmpty()) {
              messageConsumer.accept(new ServerSentEvent(body));
            }
          }
          completeRunnable.run();
        }
References
  1. When a class is designed to hold the entire content of a file or response in memory, an explicit size validation is necessary to prevent Denial of Service (DoS) via OutOfMemoryError for arbitrarily large inputs.

…SSE parser

Replace raw string SSE callbacks with a structured ServerSentEvent record across
  all HTTP client implementations (JDK, Vert.x, Android). Add ServerSentEventParser
  with full SSE spec compliance (multi-line data, event id/type, retry, DoS limits).
Expand integration tests for typed SSE events and add Android transport variants
  for existing JSONRPC and REST reference tests.

This fixes a2aproject#839

Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Improve SSE implementation

1 participant