SSE implementation #767
Conversation
feat: Introduced ApplicationIntegrationToolset in JavaADK
…g both streaming and non-streaming capabilities
feat: Add BaseToolset and update McpToolset to use the new interface
Integration of Amazon Bedrock for Claude Models. Example and readme.
google adk merge
Long Term Session memory
adk-core google
… BaseMemoryService constructor
Reverting CassandraRagRetrival extra changes
Google adk merge
…, Remove DB credential constants, use literals directly, Reduce ConnectionTimeout, HikariCP connection for high-performance connection pooling
Updated the PostgresArtifactService
Google fork syncing
Removed create table for artifacts
…ative)
- Make HttpServer SSE default endpoint on port 9085
- Add Spring SSE alternative endpoint on port 9086
- Fix JSON parsing: Change from Gson to Jackson ObjectMapper
- Add comprehensive framework comparison documentation
- Add testing scripts and documentation
- Add unit and integration tests

Changes:
- HttpServerSseController: Use Jackson ObjectMapper for JSON parsing
- HttpServerSseConfig: Default port 9085, enabled by default
- ExecutionController: Spring endpoint renamed to /run_sse_spring
- application.properties: Configure Spring port 9086, HttpServer port 9085

Documentation:
- SSE_FRAMEWORK_COMPARISON.md: Comprehensive framework comparison
- TEST_SSE_ENDPOINT.md: Testing guide
- QUICK_START_SSE.md: Quick start guide
- COMMIT_GUIDE.md: Commit instructions
- TEST_RESULTS.md: Test results
- TESTING_SUMMARY.md: Test summary

Tests:
- HttpServerSseControllerTest: Unit tests
- HttpServerSseControllerIntegrationTest: Integration tests
- Updated existing SseEventStreamService tests

Author: Sandeep Belgavi
Date: January 24, 2026
Summary of Changes
Hello @Sandeep-BA, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly expands the Agent Development Kit (ADK) for Java by introducing a flexible and performant Server-Sent Events (SSE) streaming mechanism, offering both a lightweight Java HttpServer option and a feature-rich Spring-based alternative. Concurrently, it integrates a diverse set of persistence solutions, including PostgreSQL, MapDB, MongoDB, Redis, and Cassandra, for robust storage of artifacts and session data. The changes also broaden LLM model compatibility with new Bedrock and Ollama integrations, enhance Gemma's function calling, and introduce a Kafka-based event processing pipeline, establishing a comprehensive multi-tier architecture for scalable and resilient agent development.
…ative)
- Make HttpServer SSE default endpoint on port 9085
- Add Spring SSE alternative endpoint on port 9086
- Fix JSON parsing: Change from Gson to Jackson ObjectMapper
- Add comprehensive guide with pros/cons and usage instructions
- Add testing scripts and unit/integration tests

Changes:
- HttpServerSseController: Use Jackson ObjectMapper for JSON parsing
- HttpServerSseConfig: Default port 9085, enabled by default
- ExecutionController: Spring endpoint renamed to /run_sse_spring
- application.properties: Configure Spring port 9086, HttpServer port 9085

Documentation:
- SSE_GUIDE.md: Comprehensive guide with pros/cons and usage

Tests:
- HttpServerSseControllerTest: Unit tests
- HttpServerSseControllerIntegrationTest: Integration tests
- Updated existing SseEventStreamService tests

Author: Sandeep Belgavi
Date: January 24, 2026
Code Review
This pull request adds SSE streaming with HttpServer (default) and Spring (alternative) endpoints, along with configuration, tests, and documentation. The review highlights several issues: potential data loss from an incorrect primary key on the event_content_parts table, fragile string splitting for filenames and versions, non-atomic Redis operations, blocking HTTP requests in a streaming path, and gaps in error handling. It also suggests improvements to configuration management, JSON handling, and adherence to the Bedrock API specification.
LlmAgent.builder()
    .name(NAME)
    .model(new com.google.adk.models.OllamaBaseLM("qwen3:0.6b")) // .model(new RedbusADG("40"))
    .description("Agent to calculate trigonometric functions (sine, cosine, tangent) for given angles.") // Updated description
Using event_id as the PRIMARY KEY for the event_content_parts table implies that each event can only have one content part. If an event can have multiple parts (e.g., text, image, function call), then event_id alone cannot uniquely identify each row. This would lead to data loss, as only the last part inserted for a given event_id would be retained due to the ON CONFLICT (event_id) DO UPDATE SET clause in KafkaEventRepository. Consider using a composite primary key, such as (event_id, part_index) or (event_id, part_type) if part_type is unique per event, or introduce a separate unique part_id column.
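A minimal sketch of the composite-key variant, written as the JDBC string constants the repository could hold (the part_index column, the trimmed column list, and the constant names are illustrative assumptions, not the PR's actual schema):

// Hypothetical DDL: part_index distinguishes the multiple parts of one event.
private static final String CREATE_PARTS_TABLE =
    "CREATE TABLE IF NOT EXISTS event_content_parts ("
        + "event_id TEXT NOT NULL, "
        + "part_index INT NOT NULL, "
        + "part_type TEXT NOT NULL, "
        + "text_content TEXT, "
        + "PRIMARY KEY (event_id, part_index))"; // composite key: one row per part

// The upsert then conflicts on the full key, so sibling parts survive.
private static final String UPSERT_PART =
    "INSERT INTO event_content_parts (event_id, part_index, part_type, text_content) "
        + "VALUES (?, ?, ?, ?) "
        + "ON CONFLICT (event_id, part_index) DO UPDATE SET "
        + "part_type = EXCLUDED.part_type, text_content = EXCLUDED.text_content";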
for (KafkaEventData.Part part : parts) {
  String partType = part.getPartType();
  String textContent = null;
  String functionCallId = null;
  String functionCallName = null;
  String functionCallArgs = null;
  String functionResponseId = null;
  String functionResponseName = null;
  String functionResponseData = null;

  switch (partType) {
    case "text":
      textContent = part.getText();
      break;
    case "functionCall":
      if (part.getFunctionCall() != null) {
        functionCallId = part.getFunctionCall().getId();
        functionCallName = part.getFunctionCall().getName();
        functionCallArgs = part.getFunctionCall().getArgs();
      }
      break;
    case "functionResponse":
      if (part.getFunctionResponse() != null) {
        functionResponseId = part.getFunctionResponse().getId();
        functionResponseName = part.getFunctionResponse().getName();
        functionResponseData = part.getFunctionResponse().getResponse();
      }
      break;
    default:
      logger.warn("Unknown content part type for event ID: {} - {}", eventId, partType);
      continue;
  }

  stmt.setString(1, eventId);
  stmt.setString(2, sessionId);
  stmt.setString(3, partType);
  stmt.setString(4, textContent);
  stmt.setString(5, functionCallId);
  stmt.setString(6, functionCallName);
  stmt.setString(7, safeJsonString(functionCallArgs));
  stmt.setString(8, functionResponseId);
  stmt.setString(9, functionResponseName);
  stmt.setString(10, safeJsonString(functionResponseData));

  stmt.addBatch();
}

int[] rowsAffected = stmt.executeBatch();
conn.commit();
The event_content_parts table schema defines event_id as the PRIMARY KEY. However, the upsertEventContentParts method iterates through a List<KafkaEventData.Part> and attempts to insert each part using the same event_id. Due to the ON CONFLICT (event_id) DO UPDATE SET clause, this will result in only the last part from the list being stored for a given event_id, effectively discarding all other parts. If an event can have multiple content parts, the primary key for event_content_parts must be a composite key (e.g., (event_id, part_index) or (event_id, part_type) if part_type is unique per event) to allow multiple entries per event.
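If the composite key sketched above were adopted, the write loop would also need to bind the index; for instance (the parameter positions and the part_index counter are assumptions layered on the loop shown earlier):

int partIndex = 0;
for (KafkaEventData.Part part : parts) {
  stmt.setString(1, eventId);
  stmt.setInt(2, partIndex++); // hypothetical part_index keeps sibling parts distinct
  stmt.setString(3, part.getPartType());
  // ... bind the remaining columns as in the existing loop ...
  stmt.addBatch();
}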
        .required()
        .ifPresent(requiredList -> parametersMap.put("required", requiredList));
    toolMap.put("parameters", parametersMap);
}
The tool schema is being put under the key "parameters" in the toolMap. However, the Bedrock converse API expects the tool schema to be directly under "inputSchema" within the toolSpec object, not nested under "parameters". This incorrect structure will likely lead to API errors when attempting to use tool calling. Please adjust the JSON structure to match Bedrock's expected format for tool declarations.
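For reference, a hedged sketch of the shape this comment describes, built with plain maps (the tool name and description are placeholders, and reusing parametersMap as the JSON schema is an assumption; consult the Bedrock Converse documentation for the authoritative format):

import java.util.LinkedHashMap;
import java.util.Map;

Map<String, Object> toolSpec = new LinkedHashMap<>();
toolSpec.put("name", "calculate_sine"); // placeholder tool name
toolSpec.put("description", "Computes the sine of an angle in degrees");
// The JSON schema sits under inputSchema.json rather than under "parameters".
toolSpec.put("inputSchema", Map.of("json", parametersMap));
Map<String, Object> tool = Map.of("toolSpec", toolSpec);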
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>cassandra</artifactId>
    <version>1.19.7</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>1.19.7</version>
    <scope>test</scope>
</dependency>
The testcontainers dependencies (cassandra and junit-jupiter) are correctly scoped as test. However, the core testcontainers dependency itself is not explicitly declared with test scope, which means it will be included in the runtime classpath of the core module. Test-scoped dependencies should not be part of the main application's runtime dependencies; adding <scope>test</scope> to that declaration as well keeps it out of the runtime classpath.
HttpResponse<InputStream> response =
    httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofInputStream());
int statusCode = response.statusCode();
The httpClient.send() method is a blocking operation. In a streaming context, blocking the thread while waiting for the entire response to be received can negate the benefits of streaming and lead to performance bottlenecks. For true streaming, HttpClient.sendAsync() should be used, and the HttpResponse.BodyHandlers.ofInputStream() should be processed asynchronously as data arrives.
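A minimal sketch of the non-blocking alternative (the endpoint URL and the handleEvent callback are placeholders; BodyHandlers.ofLines() delivers each line as it arrives, so "data:" frames can be processed mid-stream):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:9085/run_sse")).build();
client.sendAsync(request, HttpResponse.BodyHandlers.ofLines())
    .thenAccept(resp -> resp.body()
        .filter(line -> line.startsWith("data:")) // SSE payload lines
        .forEach(line -> handleEvent(line.substring("data:".length()).trim())));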
}

JSONObject corrected = new JSONObject("{" + "\"events\":" + raw + "}");
JSONArray events = corrected.getJSONArray("events");
| + "session_id=EXCLUDED.session_id, author=EXCLUDED.author, actions_state_delta=EXCLUDED.actions_state_delta, " | ||
| + "actions_artifact_delta=EXCLUDED.actions_artifact_delta, actions_requested_auth_configs=EXCLUDED.actions_requested_auth_configs, " | ||
| + "actions_transfer_to_agent=EXCLUDED.actions_transfer_to_agent, content_role=EXCLUDED.content_role, " | ||
| + "timestamp=EXCLUDED.timestamp, invocation_id=EXCLUDED.invocation_id"); |
The line stmtEvents.setString(4, actions.opt("stateDelta").toString()); uses opt("stateDelta").toString() to get the JSON string for a JSONB column. If stateDelta is a complex JSON object, toString() might not produce a valid JSON string suitable for direct insertion into a JSONB column, or it might produce a string with unwanted escaping. It's safer to explicitly serialize the stateDelta object to a JSON string using objectMapper.writeValueAsString(actions.optJSONObject("stateDelta")) to ensure correct JSON formatting.
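A sketch of the suggested binding, assuming objectMapper is a Jackson ObjectMapper field and a method signature that propagates JsonProcessingException (JSONObject.toMap() bridges org.json into a plain Map that Jackson can serialize; the same pattern applies to the function-call args binding flagged below):

JSONObject stateDelta = actions.optJSONObject("stateDelta");
stmtEvents.setString(4, stateDelta != null
    ? objectMapper.writeValueAsString(stateDelta.toMap()) // canonical JSON text for the JSONB column
    : null);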
for (int i = 0; i < events.length(); i++) {
  JSONObject ev = events.getJSONObject(i);
  // logger.info("Processing event for insertion: {}", ev.toString());
  JSONObject actions = ev.getJSONObject("actions");
The line stmtParts.setString(7, fc != null ? fc.opt("args").toString() : null); uses fc.opt("args").toString() to get the JSON string for a JSONB column. If args is a complex JSON object, toString() might not produce a valid JSON string suitable for direct insertion into a JSONB column, or it might produce a string with unwanted escaping. It's safer to explicitly serialize the args object to a JSON string using objectMapper.writeValueAsString(fc.optJSONObject("args")) to ensure correct JSON formatting.
    stmtParts.executeUpdate();
  }
}
conn.commit();
The conn.commit() is called inside the loop, meaning a separate transaction is committed for each event. This is inefficient if there are many events to insert for a single session, as each commit incurs overhead. It would be more efficient to commit once after all events for a session have been inserted, or to use batch updates and commit the entire batch.
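A sketch of the single-commit version (INSERT_EVENT_SQL and the column bindings are illustrative; the point is that addBatch/executeBatch plus one commit replaces the per-event commits):

conn.setAutoCommit(false);
try (PreparedStatement stmtEvents = conn.prepareStatement(INSERT_EVENT_SQL)) {
  for (int i = 0; i < events.length(); i++) {
    JSONObject ev = events.getJSONObject(i);
    stmtEvents.setString(1, ev.getString("id"));
    // ... bind the remaining columns ...
    stmtEvents.addBatch();
  }
  stmtEvents.executeBatch();
  conn.commit(); // one transaction for the whole session's events
} catch (SQLException e) {
  conn.rollback(); // keep the session's events all-or-nothing
  throw e;
}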
log.info("Using MapDbSessionService");
return new MapDbSessionService("map.db");
} catch (Exception ex) {
  java.util.logging.Logger.getLogger(AdkWebServer.class.getName()).log(Level.SEVERE, null, ex);
The default value for event_data is an empty JSON array []. However, the KafkaEventRepository's parseEventData method expects event_data to contain an events array within a JSON object (e.g., {"events": [...]}). This discrepancy might lead to parsing errors or unexpected behavior if event_data is initialized with this default and later processed. Consider if event_data should default to {}, or if the parsing logic should explicitly handle an empty array as a valid input for an empty list of events.
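A tolerant parser is one way to reconcile the two shapes; a minimal sketch assuming org.json (the method body is illustrative, not the PR's actual parseEventData):

import java.util.ArrayList;
import java.util.List;
import org.json.JSONArray;
import org.json.JSONObject;

private List<JSONObject> parseEventData(String raw) {
  String trimmed = raw == null ? "" : raw.trim();
  JSONArray events;
  if (trimmed.isEmpty() || trimmed.equals("[]")) {
    events = new JSONArray(); // the column default: no events yet
  } else if (trimmed.startsWith("[")) {
    events = new JSONArray(trimmed); // bare array form
  } else {
    events = new JSONObject(trimmed).optJSONArray("events"); // {"events": [...]} wrapper
    if (events == null) {
      events = new JSONArray();
    }
  }
  List<JSONObject> result = new ArrayList<>();
  for (int i = 0; i < events.length(); i++) {
    result.add(events.getJSONObject(i));
  }
  return result;
}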