
Conversation

@Sandeep-BA

No description provided.

manojkumarredbus and others added 30 commits July 7, 2025 12:42
feat: Introduced ApplicationIntegrationToolset in JavaADK
…g both streaming and non-streaming capabilities
feat: Add BaseToolset and update McpToolset to use the new interface
Integration of Amazon Bedrock for Claude Models. Example and readme.
Yashas Shetty and others added 25 commits January 5, 2026 16:10
Reverting CassandraRagRetrival extra changes
…, Remove DB credential constants, use literals directly, Reduce ConnectionTimeout, HikariCP connection for high-performance connection pooling
Updated the PostgresArtifactService
Removed create table for artifacts
…ative)

- Make HttpServer SSE default endpoint on port 9085
- Add Spring SSE alternative endpoint on port 9086
- Fix JSON parsing: Change from Gson to Jackson ObjectMapper
- Add comprehensive framework comparison documentation
- Add testing scripts and documentation
- Add unit and integration tests

Changes:
- HttpServerSseController: Use Jackson ObjectMapper for JSON parsing
- HttpServerSseConfig: Default port 9085, enabled by default
- ExecutionController: Spring endpoint renamed to /run_sse_spring
- application.properties: Configure Spring port 9086, HttpServer port 9085

Documentation:
- SSE_FRAMEWORK_COMPARISON.md: Comprehensive framework comparison
- TEST_SSE_ENDPOINT.md: Testing guide
- QUICK_START_SSE.md: Quick start guide
- COMMIT_GUIDE.md: Commit instructions
- TEST_RESULTS.md: Test results
- TESTING_SUMMARY.md: Test summary

Tests:
- HttpServerSseControllerTest: Unit tests
- HttpServerSseControllerIntegrationTest: Integration tests
- Updated existing SseEventStreamService tests

Author: Sandeep Belgavi
Date: January 24, 2026
@gemini-code-assist
Contributor

Summary of Changes

Hello @Sandeep-BA, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly expands the Agent Development Kit (ADK) for Java by introducing a flexible and performant Server-Sent Events (SSE) streaming mechanism, offering both a lightweight Java HttpServer option and a feature-rich Spring-based alternative. Concurrently, it integrates a diverse set of persistence solutions including PostgreSQL, MapDB, MongoDB, Redis, and Cassandra for robust storage of artifacts and session data. The changes also broaden LLM model compatibility with new Bedrock and Ollama integrations, enhance Gemma's function calling, and introduce a Kafka-based event processing pipeline, thereby establishing a comprehensive multi-tier architecture for scalable and resilient agent development.

Highlights

  • Server-Sent Events (SSE) Implementation: Introduced two distinct SSE implementations: a lightweight Java HttpServer-based option (default, on port 9085) and a Spring-based alternative (on port 9086). Both can run simultaneously and share the same request/response format (a minimal HttpServer sketch follows this list).
  • Enhanced Persistence Layer: Integrated multiple persistence solutions including PostgreSQL, MapDB, MongoDB, Redis, and Cassandra for storing artifacts and session data, offering diverse choices for different use cases.
  • Expanded LLM Model Support: Added new integrations for Bedrock and Ollama LLMs, enhanced Gemma with function calling emulation, and introduced Redbus AD Gateway for Azure LLM access, broadening the range of supported models.
  • Kafka Event Processing Pipeline: Implemented a Kafka consumer service to process ADK events in real-time and persist them to PostgreSQL, supporting a robust multi-tier architecture for event streaming and data consistency.
  • Comprehensive Documentation: Provided extensive new Markdown documentation covering SSE implementation details, framework comparisons, quick start guides, testing procedures, and architectural overviews for the new features.
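
To make the HttpServer option concrete, here is a minimal sketch of an SSE endpoint built on com.sun.net.httpserver, using port 9085 from the commits above; the endpoint path, payload, and event loop are placeholders rather than the controller actually added in this PR.

import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

public class HttpServerSseSketch {
  public static void main(String[] args) throws Exception {
    HttpServer server = HttpServer.create(new InetSocketAddress(9085), 0);
    server.createContext("/run_sse", exchange -> {
      // Standard SSE headers: a text/event-stream body that is flushed event by event.
      exchange.getResponseHeaders().add("Content-Type", "text/event-stream");
      exchange.getResponseHeaders().add("Cache-Control", "no-cache");
      exchange.sendResponseHeaders(200, 0);                  // length 0 => streamed (chunked) body
      try (OutputStream out = exchange.getResponseBody()) {
        for (int i = 0; i < 3; i++) {                        // placeholder for the real agent event loop
          out.write(("data: {\"event\":" + i + "}\n\n").getBytes(StandardCharsets.UTF_8));
          out.flush();                                       // push each event as it is produced
        }
      }
    });
    server.start();
  }
}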

…ative)

- Make HttpServer SSE default endpoint on port 9085
- Add Spring SSE alternative endpoint on port 9086
- Fix JSON parsing: Change from Gson to Jackson ObjectMapper
- Add comprehensive guide with pros/cons and usage instructions
- Add testing scripts and unit/integration tests

Changes:
- HttpServerSseController: Use Jackson ObjectMapper for JSON parsing
- HttpServerSseConfig: Default port 9085, enabled by default
- ExecutionController: Spring endpoint renamed to /run_sse_spring
- application.properties: Configure Spring port 9086, HttpServer port 9085

Documentation:
- SSE_GUIDE.md: Comprehensive guide with pros/cons and usage

Tests:
- HttpServerSseControllerTest: Unit tests
- HttpServerSseControllerIntegrationTest: Integration tests
- Updated existing SseEventStreamService tests

Author: Sandeep Belgavi
Date: January 24, 2026
Contributor

@gemini-code-assist bot left a comment

Code Review

This pull request implements SSE with HttpServer (default) and Spring (alternative), including implementation, configuration, testing, and documentation. The code review highlights several issues, including potential data loss due to incorrect primary key usage in the event_content_parts table, fragile string splitting for filenames and versions, non-atomic operations in Redis, blocking HTTP requests, and the need for more robust error handling. It also suggests improvements to configuration management, JSON handling, and adherence to Bedrock API specifications.

LlmAgent.builder()
.name(NAME)
.model(new com.google.adk.models.OllamaBaseLM("qwen3:0.6b"))//.model(new RedbusADG("40"))
.description("Agent to calculate trigonometric functions (sine, cosine, tangent) for given angles.") // Updated description

critical

Using event_id as the PRIMARY KEY for the event_content_parts table implies that each event can only have one content part. If an event can have multiple parts (e.g., text, image, function call), then event_id alone cannot uniquely identify each row. This would lead to data loss, as only the last part inserted for a given event_id would be retained due to the ON CONFLICT (event_id) DO UPDATE SET clause in KafkaEventRepository. Consider using a composite primary key, such as (event_id, part_index) or (event_id, part_type) if part_type is unique per event, or introduce a separate unique part_id column.
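
A minimal sketch of the suggested composite key, with hypothetical column names (the real event_content_parts schema in this PR has more columns):

// Hypothetical DDL: a composite primary key lets one event own many parts.
private static final String CREATE_EVENT_CONTENT_PARTS =
    "CREATE TABLE IF NOT EXISTS event_content_parts ("
        + " event_id     TEXT NOT NULL,"
        + " part_index   INT  NOT NULL,"
        + " part_type    TEXT,"
        + " text_content TEXT,"
        + " PRIMARY KEY (event_id, part_index))";

// The upsert must then conflict on the same composite key instead of event_id alone.
private static final String UPSERT_PART =
    "INSERT INTO event_content_parts (event_id, part_index, part_type, text_content)"
        + " VALUES (?, ?, ?, ?)"
        + " ON CONFLICT (event_id, part_index) DO UPDATE SET"
        + "   part_type = EXCLUDED.part_type,"
        + "   text_content = EXCLUDED.text_content";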

Comment on lines +209 to +258

for (KafkaEventData.Part part : parts) {
  String partType = part.getPartType();
  String textContent = null;
  String functionCallId = null;
  String functionCallName = null;
  String functionCallArgs = null;
  String functionResponseId = null;
  String functionResponseName = null;
  String functionResponseData = null;

  switch (partType) {
    case "text":
      textContent = part.getText();
      break;
    case "functionCall":
      if (part.getFunctionCall() != null) {
        functionCallId = part.getFunctionCall().getId();
        functionCallName = part.getFunctionCall().getName();
        functionCallArgs = part.getFunctionCall().getArgs();
      }
      break;
    case "functionResponse":
      if (part.getFunctionResponse() != null) {
        functionResponseId = part.getFunctionResponse().getId();
        functionResponseName = part.getFunctionResponse().getName();
        functionResponseData = part.getFunctionResponse().getResponse();
      }
      break;
    default:
      logger.warn("Unknown content part type for event ID: {} - {}", eventId, partType);
      continue;
  }

  stmt.setString(1, eventId);
  stmt.setString(2, sessionId);
  stmt.setString(3, partType);
  stmt.setString(4, textContent);
  stmt.setString(5, functionCallId);
  stmt.setString(6, functionCallName);
  stmt.setString(7, safeJsonString(functionCallArgs));
  stmt.setString(8, functionResponseId);
  stmt.setString(9, functionResponseName);
  stmt.setString(10, safeJsonString(functionResponseData));

  stmt.addBatch();
}

int[] rowsAffected = stmt.executeBatch();
conn.commit();

critical

The event_content_parts table schema defines event_id as the PRIMARY KEY. However, the upsertEventContentParts method iterates through a List<KafkaEventData.Part> and attempts to insert each part using the same event_id. Due to the ON CONFLICT (event_id) DO UPDATE SET clause, this will result in only the last part from the list being stored for a given event_id, effectively discarding all other parts. If an event can have multiple content parts, the primary key for event_content_parts must be a composite key (e.g., (event_id, part_index) or (event_id, part_type) if part_type is unique per event) to allow multiple entries per event.

.required()
.ifPresent(requiredList -> parametersMap.put("required", requiredList));
toolMap.put("parameters", parametersMap);
}

high

The tool schema is being put under the key "parameters" in the toolMap. However, the Bedrock converse API expects the tool schema to be directly under "inputSchema" within the toolSpec object, not nested under "parameters". This incorrect structure will likely lead to API errors when attempting to use tool calling. Please adjust the JSON structure to match Bedrock's expected format for tool declarations.
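
For reference, a sketch of the tool shape the Converse API expects, assembled with plain maps; the helper and class names are illustrative, not the builder code in this PR:

import java.util.List;
import java.util.Map;

final class BedrockToolShapeSketch {
  // Hypothetical helper: wraps a JSON-Schema style parameters map the way the Converse API expects.
  static Map<String, Object> toBedrockTool(String name, String description, Map<String, Object> parametersMap) {
    return Map.of(
        "toolSpec", Map.of(
            "name", name,
            "description", description,
            // The schema belongs under inputSchema.json, not under a top-level "parameters" key.
            "inputSchema", Map.of("json", parametersMap)));
  }

  public static void main(String[] args) {
    Map<String, Object> parametersMap = Map.of(
        "type", "object",
        "properties", Map.of("angle", Map.of("type", "number")),
        "required", List.of("angle"));
    // The resulting list becomes the "tools" array inside the request's "toolConfig" object.
    List<Map<String, Object>> tools = List.of(toBedrockTool("cosine", "Computes cos(angle)", parametersMap));
    System.out.println(tools);
  }
}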

Comment on lines +235 to +245
<groupId>org.testcontainers</groupId>
<artifactId>cassandra</artifactId>
<version>1.19.7</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.19.7</version>
<scope>test</scope>
</dependency>

high

The testcontainers dependencies (cassandra and junit-jupiter) are correctly scoped as test. However, the testcontainers dependency itself is not explicitly declared with test scope, which means it will be included in the runtime classpath of the core module. Test-scoped dependencies should not be part of the main application's runtime dependencies.

Comment on lines +794 to +795
httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofInputStream());
int statusCode = response.statusCode();

high

The httpClient.send() method is a blocking operation. In a streaming context, blocking the thread while waiting for the entire response to be received can negate the benefits of streaming and lead to performance bottlenecks. For true streaming, HttpClient.sendAsync() should be used, and the HttpResponse.BodyHandlers.ofInputStream() should be processed asynchronously as data arrives.
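
A minimal sketch of the non-blocking variant using java.net.http; the URL and per-chunk handling are placeholders for the actual streaming code in this PR:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;

public class StreamingClientSketch {
  public static void main(String[] args) {
    HttpClient httpClient = HttpClient.newHttpClient();
    HttpRequest httpRequest = HttpRequest.newBuilder(URI.create("https://example.invalid/stream")).build();

    // sendAsync returns immediately; ofLines() exposes the body as a lazy stream of lines,
    // so each chunk can be handled as it arrives instead of blocking for the full response.
    CompletableFuture<Void> done = httpClient
        .sendAsync(httpRequest, HttpResponse.BodyHandlers.ofLines())
        .thenAccept(response -> {
          int statusCode = response.statusCode();
          if (statusCode == 200) {
            response.body().forEach(line -> System.out.println("chunk: " + line));
          }
        });

    done.join(); // only this demo blocks here; real callers would compose further async stages
  }
}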

}

JSONObject corrected = new JSONObject("{" + "\"events\":" + raw + "}");
JSONArray events = corrected.getJSONArray("events");

medium

The line `JSONObject corrected = new JSONObject("{" + "\"events\":" + raw + "}")` builds JSON by concatenating strings. If `raw` is not a well-formed JSON array, or contains stray characters, the constructor will throw or produce a corrupted document. It is safer to parse `raw` with the JSON library and let it assemble the wrapper object.
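
A minimal sketch of the library-built alternative, assuming `raw` holds the array text as in the snippet above and using the org.json types already present in this class:

import org.json.JSONArray;
import org.json.JSONObject;

final class EventsWrapperSketch {
  // Hypothetical helper mirroring the lines above: parse the array once, then let the
  // library assemble the wrapper object instead of concatenating JSON text by hand.
  static JSONObject wrapEvents(String raw) {
    JSONArray events = new JSONArray(raw);             // throws JSONException if raw is not a valid array
    return new JSONObject().put("events", events);
  }
}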

+ "session_id=EXCLUDED.session_id, author=EXCLUDED.author, actions_state_delta=EXCLUDED.actions_state_delta, "
+ "actions_artifact_delta=EXCLUDED.actions_artifact_delta, actions_requested_auth_configs=EXCLUDED.actions_requested_auth_configs, "
+ "actions_transfer_to_agent=EXCLUDED.actions_transfer_to_agent, content_role=EXCLUDED.content_role, "
+ "timestamp=EXCLUDED.timestamp, invocation_id=EXCLUDED.invocation_id");

medium

The line stmtEvents.setString(4, actions.opt("stateDelta").toString()); uses opt("stateDelta").toString() to get the JSON string for a JSONB column. If stateDelta is a complex JSON object, toString() might not produce a valid JSON string suitable for direct insertion into a JSONB column, or it might produce a string with unwanted escaping. It's safer to explicitly serialize the stateDelta object to a JSON string using objectMapper.writeValueAsString(actions.optJSONObject("stateDelta")) to ensure correct JSON formatting.
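
One way to guarantee a valid jsonb value, sketched with the org.json types used in this snippet; the Jackson route suggested above also works once the value is converted to a Jackson-serializable type (for example via JSONObject.toMap() before objectMapper.writeValueAsString):

import org.json.JSONObject;

final class JsonbParamSketch {
  // Hypothetical helper: always produce valid JSON for a jsonb column.
  // JSONObject.toString() is valid JSON, but opt(...) can return null or a non-JSON value,
  // so the absent/non-object case is handled explicitly instead of calling toString() blindly.
  static String jsonbValue(JSONObject actions, String key) {
    JSONObject value = actions.optJSONObject(key);     // null if absent or not a JSON object
    return value != null ? value.toString() : "{}";    // fall back to an empty JSON object
  }
  // Usage (illustrative): stmtEvents.setString(4, jsonbValue(actions, "stateDelta"));
}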

for (int i = 0; i < events.length(); i++) {
JSONObject ev = events.getJSONObject(i);
// logger.info("Processing event for insertion: {}", ev.toString());
JSONObject actions = ev.getJSONObject("actions");

medium

The line stmtParts.setString(7, fc != null ? fc.opt("args").toString() : null); uses fc.opt("args").toString() to get the JSON string for a JSONB column. If args is a complex JSON object, toString() might not produce a valid JSON string suitable for direct insertion into a JSONB column, or it might produce a string with unwanted escaping. It's safer to explicitly serialize the args object to a JSON string using objectMapper.writeValueAsString(fc.optJSONObject("args")) to ensure correct JSON formatting.

stmtParts.executeUpdate();
}
}
conn.commit();

medium

The conn.commit() is called inside the loop, meaning a separate transaction is committed for each event. This is inefficient if there are many events to insert for a single session, as each commit incurs overhead. It would be more efficient to commit once after all events for a session have been inserted, or to use batch updates and commit the entire batch.
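
A sketch of the batched alternative, with the column bindings reduced to a single placeholder (the real statement in this PR binds many more columns):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.json.JSONArray;
import org.json.JSONObject;

final class BatchCommitSketch {
  // Hypothetical rewrite of the loop: queue every event with addBatch and commit once,
  // instead of an executeUpdate plus commit per event.
  static void insertEvents(Connection conn, PreparedStatement stmtEvents, JSONArray events) throws SQLException {
    conn.setAutoCommit(false);
    for (int i = 0; i < events.length(); i++) {
      JSONObject ev = events.getJSONObject(i);
      stmtEvents.setString(1, ev.getString("id"));   // bind the remaining columns as in the existing code
      stmtEvents.addBatch();                         // queue instead of executing immediately
    }
    stmtEvents.executeBatch();
    conn.commit();                                   // one transaction covers the whole session's events
  }
}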

log.info("Using MapDbSessionService");
return new MapDbSessionService("map.db");
} catch (Exception ex) {
java.util.logging.Logger.getLogger(AdkWebServer.class.getName()).log(Level.SEVERE, null, ex);

medium

The default value for event_data is an empty JSON array []. However, the KafkaEventRepository's parseEventData method expects event_data to contain an events array within a JSON object (e.g., {"events": [...]}). This discrepancy might lead to parsing errors or unexpected behavior if event_data is initialized with this default and later processed. Consider if event_data should default to {}, or if the parsing logic should explicitly handle an empty array as a valid input for an empty list of events.
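
A sketch of a tolerant parser that accepts both shapes, assuming the method's job is to return the events array; the actual parseEventData in this PR may behave differently:

import org.json.JSONArray;
import org.json.JSONObject;

final class EventDataParsingSketch {
  // Hypothetical tolerant parser: accepts both the "[]" column default and the {"events":[...]} wrapper,
  // so a freshly initialised row does not break downstream processing.
  static JSONArray parseEvents(String eventData) {
    if (eventData == null || eventData.isBlank()) {
      return new JSONArray();
    }
    String trimmed = eventData.trim();
    if (trimmed.startsWith("[")) {
      return new JSONArray(trimmed);                 // bare array, e.g. the "[]" default
    }
    JSONArray events = new JSONObject(trimmed).optJSONArray("events");
    return events != null ? events : new JSONArray();
  }
}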
