feat: Drupal AI Agents compatibility (Fiber streaming + native Bedrock chat) #11
steveworley wants to merge 10 commits into
Surfaces a model's `maxOutputTokens` value from the cached models list, or NULL when unknown. Callers can use this to clamp request `maxTokens` to the destination model's actual cap. Bedrock rejects oversize values with a 400 (Nova Lite is 5000 tokens, Claude Sonnet 4.6 is 64K, etc.), so a model-aware clamp avoids opaque upstream failures.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Dashboard 4xx/5xx responses carry validation details in the body (e.g. "Model X does not support function calling"). Previously the error path only logged status code + reason phrase, leaving operators to guess at the actual cause. Now we capture the first 2KB of the response body so the failure is immediately diagnosable from watchdog. Applies to both `QuantCloudClient::post()` (around the JSON branch and the chat fallback) and `QuantCloudStreamingClient::chatStreamRaw()` / `chatStream()`.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The dashboard's chat/stream SSE format is neither standard Bedrock
Converse nor OpenAI — it uses a custom shape:
- Init frame: {requestId, model, streaming: true}
- Text delta: {delta: "...", complete: bool}
- Tool input: {name, toolUseId, input: {...}}
- Summary done: {stopReason, usage, response: {role, content, toolUse}}
The previous iterator yielded raw arrays through an anonymous
IteratorAggregate, bypassing `createStreamedChatMessage()`. As a result
`StreamedChatMessageIterator::reconstructChatOutput()` saw no messages
and produced empty output, breaking the Drupal AI Agents loop entirely
(agents read the assistant reply via the iterator, not the underlying
HTTP body).
This commit replumbs the iterator onto `doIterate()` per the base
class contract, parses each frame variant explicitly, and yields proper
`StreamedChatMessage` objects so reconstruction works. The summary
frame's `response.content` is deliberately skipped (it carries the
full accumulated text already emitted via deltas; re-emitting would
double the assistant message).
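The frame handling described above can be sketched in plain PHP with no Drupal dependencies. This is only an illustration: `classifyFrame()`, its event labels, and the keys assumed on each `response.toolUse` entry are hypothetical, and the real iterator yields `StreamedChatMessage` objects rather than arrays.

```php
<?php

/**
 * Maps one decoded dashboard SSE frame to the events an iterator would emit.
 *
 * Hypothetical helper: event labels and the toolUse entry keys are assumptions.
 */
function classifyFrame(array $frame): array {
  // Summary frame: {stopReason, usage, response: {role, content, toolUse}}.
  if (isset($frame['stopReason'])) {
    $events = [];
    // Emit tool calls only. response.content is skipped on purpose because
    // the same text was already emitted through delta frames.
    foreach ($frame['response']['toolUse'] ?? [] as $tool) {
      $events[] = ['tool', $tool['toolUseId'] ?? '', $tool['name'] ?? ''];
    }
    $events[] = ['finish', $frame['stopReason']];
    return $events;
  }
  // Text delta frame: {delta: "...", complete: bool}.
  if (isset($frame['delta']) && is_string($frame['delta'])) {
    return [['text', $frame['delta']]];
  }
  // Inline tool frame: {name, toolUseId, input: {...}}.
  if (isset($frame['toolUseId'], $frame['name'])) {
    return [['tool', $frame['toolUseId'], $frame['name']]];
  }
  // Init frame and unknown shapes emit nothing.
  return [];
}
```

Checking the summary frame first keeps the "skip `response.content`" rule in one place, so no later branch can re-emit the accumulated text.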
Tool calls require an extra hop: the base iterator's
`assembleToolCalls()` calls `$tool->toArray()` on each entry returned
by `getTools()`. Plain arrays don't have a `toArray()` method, so this
commit adds a tiny readonly `StreamedToolCall` value object that
renders to the OpenAI-shape array the assembler expects.
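A value object of the kind described might look roughly like the sketch below. The class name `StreamedToolCallSketch` and its constructor parameters are hypothetical; only the `{id, type: 'function', function: {name, arguments}}` output shape comes from this PR.

```php
<?php

/**
 * Sketch of an immutable tool-call value object (hypothetical class name).
 */
final class StreamedToolCallSketch {

  public function __construct(
    public readonly string $id,
    public readonly string $name,
    // Tool arguments as a JSON-encoded string, kept verbatim.
    public readonly string $arguments,
  ) {}

  /**
   * Renders the OpenAI-shape array a tool-call assembler can consume.
   */
  public function toArray(): array {
    return [
      'id' => $this->id,
      'type' => 'function',
      'function' => [
        'name' => $this->name,
        'arguments' => $this->arguments,
      ],
    ];
  }

}

// Usage: the arguments string passes through unchanged.
$call = new StreamedToolCallSketch('tooluse_1', 'get_weather', json_encode(['city' => 'Perth']));
$rendered = $call->toArray();
```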
Removes `QuantCloudProviderFailureDetectionTest.php` — it tested
protected helpers (`isEmptyResponseContent`, `isLikelyTokenLimited`,
`getOutputTokenCount`, `getStopReason`) that lived on the old chat()
implementation and no longer exist after the provider refactor.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous chat() implementation was buffered-only — every request
to the dashboard's /chat endpoint waited for the complete response
before returning. For long Claude responses (large tool-bearing
Canvas page builds in particular), this regularly tripped the
dashboard's CloudFront origin timeout (~60-180s) and surfaced as a
504 to the caller, even though Bedrock itself was still generating.
This rewrite adds an inline Fiber-aware streaming hook (mirroring the
pattern in Drupal AI's `OpenAiBasedProviderClientBase::chat()`):
- If the caller has set `$this->streamed = TRUE` (e.g. AI Explorer's
"Streamed" checkbox), return the SSE iterator directly so the
consumer can read chunks live.
- Otherwise, if we're inside an active Fiber (Drupal AI Agents'
solving loop is fibered), open the streaming endpoint anyway,
iterate chunks while suspending the Fiber between them so the
cooperator can yield, then reconstruct a buffered-style ChatOutput
for the caller. From the agent's perspective the response is
buffered; on the wire it streams, which keeps the connection alive
and dodges the CloudFront origin timeout.
- Otherwise (plain buffered call), POST to /chat as before and parse
the dashboard's native Bedrock-flavoured response.
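The Fiber branch can be illustrated with PHP's built-in `Fiber` class alone. In this sketch, `drainWithFiberYield()` is a hypothetical stand-in for the provider's streaming loop, and plain strings stand in for SSE chunks; it is not the provider's actual code.

```php
<?php

/**
 * Drains a chunk iterator, suspending the current Fiber between chunks.
 *
 * Hypothetical helper: outside a Fiber it simply concatenates the chunks.
 */
function drainWithFiberYield(iterable $chunks): string {
  $text = '';
  foreach ($chunks as $chunk) {
    $text .= $chunk;
    // Suspend only when a Fiber is running; Fiber::suspend() throws a
    // FiberError when called outside one.
    if (\Fiber::getCurrent() !== NULL) {
      \Fiber::suspend();
    }
  }
  return $text;
}

// The caller drives the Fiber and ends up with one buffered-style result.
$fiber = new \Fiber(fn(): string => drainWithFiberYield(['Hel', 'lo']));
$fiber->start();
$resumes = 0;
while (!$fiber->isTerminated()) {
  $fiber->resume();
  $resumes++;
}
$result = $fiber->getReturn();
```

Each suspension hands control back to whatever scheduler resumed the Fiber, while the caller still receives a single complete string at the end.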
Other changes folded in:
- `maxTokens` is clamped to the destination model's `maxOutputTokens`
from ModelsService before sending. Bedrock rejects oversize values
with a 400 (Nova Lite caps at 5000, etc.), so the clamp keeps
callers from running into opaque upstream failures when the AI
Defaults UI lets you configure a higher value than a given model
supports.
- Tool definitions sent in `toolConfig.tools` are now correctly
wrapped in Bedrock's `{toolSpec: {name, description, inputSchema}}`
shape (rather than the OpenAI `{type, function}` shape Drupal AI's
`ToolsInput::renderToolsArray()` returns), since the dashboard's
validator requires `toolSpec`.
- Native parsing of the dashboard's response shape, including the
non-standard sibling `toolUse` array (sibling of `content`, not
nested inside) that the dashboard returns for assistant tool calls.
- `applyMaxTokensClamp()` and `ensureAuthenticated()` are private
helpers used by both the buffered and streaming branches.
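The clamp and the tool-definition rewrap from the list above could be sketched as follows. Both function names are hypothetical, and the `inputSchema: {json: ...}` wrapper follows the AWS Converse API shape; whether the dashboard validator expects that exact nesting is an assumption.

```php
<?php

/**
 * Clamps a requested maxTokens to a model cap; a NULL cap means "unknown".
 */
function clampMaxTokens(int $requested, ?int $modelCap): int {
  // Unknown cap: leave the request value untouched rather than guess.
  return $modelCap === NULL ? $requested : min($requested, $modelCap);
}

/**
 * Rewraps OpenAI-shape tool definitions as Bedrock toolSpec entries.
 */
function toBedrockTools(array $openAiTools): array {
  $tools = [];
  foreach ($openAiTools as $tool) {
    $fn = $tool['function'] ?? [];
    if (empty($fn['name'])) {
      // Skip entries that are not {type, function: {name, ...}}.
      continue;
    }
    $tools[] = [
      'toolSpec' => [
        'name' => $fn['name'],
        'description' => $fn['description'] ?? '',
        // Assumption: the JSON schema sits under a "json" key.
        'inputSchema' => ['json' => $fn['parameters'] ?? ['type' => 'object']],
      ],
    ];
  }
  return $tools;
}
```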
Embeddings, text-to-image, and image-to-image continue to use
`QuantCloudClient` directly — those endpoints have non-OpenAI request
and response shapes that aren't worth running through the SDK.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
PR Review: feat/ai-agents-fiber-streaming → fix/release-blockers
PR #11 — "feat: Drupal AI Agents compatibility (Fiber streaming + native Bedrock chat)"
4 commits · 7 files changed · ~909 insertions, ~1,000 deletions
Summary
This PR rewrites the chat pipeline to support Drupal AI Agents' Fiber-based streaming model, replaces the old OpenAI-shaped SSE iterator with a native Bedrock/dashboard-frame iterator, adds upstream error body logging, and introduces a getMaxOutputTokens() clamp. The architecture is sound and the intent is clear. There are no exploitable security vulnerabilities. There are several correctness concerns worth fixing before merge, and one deletion that needs justification.
🔴 Blockers
1. reconstructChatOutput() is referenced in the class docblock but does not exist
src/QuantCloudChatMessageIterator.php:20 states:
// `reconstructChatOutput()` can recover the final `ChatMessage` (text + tool
The method is referenced in the class-level docblock as the mechanism callers use to get a ChatOutput after consuming the stream. However, reconstructChatOutput() does not appear anywhere in the file. The Fiber path in QuantCloudProvider::chat() calls the iterator and then calls $iterator->reconstructChatOutput() to build the final ChatOutput to return.
If this method is inherited from StreamedChatMessageIterator in the base ai module, this is fine — but the docblock implies it is implemented here. Verify the base class provides this method and that it correctly handles the Bedrock-shaped tool calls emitted by this iterator. If the base class reconstructChatOutput() expects OpenAI-shaped tool objects and StreamedToolCall::toArray() returns a different shape, the Fiber path will silently produce empty or malformed tool results. This is a CWE-241 (Improper Handling of Unexpected Data Type) class of bug.
Remediation: Add an integration test or at minimum confirm (with a code citation) that the base class reconstructChatOutput() consumes StreamedToolCall::toArray() correctly. If it doesn't, override it here.
2. Deleted test file removes the only coverage for failure detection — no replacement
tests/src/Unit/QuantCloudProviderFailureDetectionTest.php is deleted entirely (285 lines, covering max_tokens, length, end_turn, unknown, and NULL stop-reason scenarios). No replacement test file is added anywhere in the PR.
The PR simultaneously rewrites parseChatResponse() and the entire streaming path. Deleting the only unit tests for failure/stop-reason detection while rewriting the code under test leaves the module with zero automated coverage for these paths.
Remediation: Either restore the test file (adapted for the new response shape) or add equivalent coverage for parseChatResponse(), doIterate() stop-reason handling, and the applyMaxTokensClamp() clamp logic before merge. The getMaxOutputTokens() helper in ModelsService also has no tests.
⚠️ Warnings
3. applyUsage() called twice on the same message in the summary frame
src/QuantCloudChatMessageIterator.php — In the summary frame's tool-use loop, applyUsage() is called once per tool yielded on freshly created messages — that's fine. But in the delta path the pattern createStreamedChatMessage → applyUsage → yield is duplicated in two branches with the same $message variable in scope. If the base class applyUsage() accumulates rather than sets token counts, this will double-count tokens on messages that hit both branches. Verify the base class implementation is idempotent or restructure to call applyUsage() exactly once per message.
4. QuantCloudClient logs the upstream error body unconditionally — potential PII/credential leak in logs
src/Client/QuantCloudClient.php (error handler, new lines):
    $body = '';
    if ($e instanceof RequestException && $e->getResponse()) {
      $status = $e->getResponse()->getStatusCode();
      $reason = $e->getResponse()->getReasonPhrase();
      $body = $e->getResponse()->getBody()->getContents();
    }
    $this->logger->error('...@body', ['@body' => $body]);

The upstream error body is now logged regardless of the advanced.enable_logging config flag. The success path correctly gates logging behind $config->get('advanced.enable_logging'). Error bodies from Bedrock/dashboard can contain request context (prompts, tool inputs) that may include PII or PROTECTED-classification content. Logging them unconditionally to Drupal's watchdog violates APP-11 (security of personal information) and ISM-0988 (log content controls).
Remediation: Either gate the body logging behind the same enable_logging flag, or truncate/redact the body before logging (e.g. first 200 chars, strip any content fields).
5. convertMessage() silently drops images when a tool result is present
src/Plugin/AiProvider/QuantCloudProvider.php ~line 415:
    if ($message->getToolsId()) {
      return [
        'role' => 'user',
        'content' => $text,
        'toolUseId' => $message->getToolsId(),
      ];
    }

The early return means a message that is both a tool result and carries images (valid in multi-modal agentic flows) will silently drop the images. This is unlikely in practice today but will produce confusing silent failures if Drupal AI Agents ever sends vision content alongside a tool result. At minimum, add a // Note: images are intentionally ignored for tool-result messages comment.
6. getMaxOutputTokens() hardcodes a static model→token map with no staleness mechanism
src/Service/ModelsService.php ~line 145 — the map will silently clamp requests for models added after this commit (returning NULL = no clamp, which is safe) and will silently over-clamp if Anthropic raises limits on existing model versions. Add a @todo comment noting the map needs updating when new models are onboarded, and consider whether the dashboard API exposes model caps that could be fetched dynamically.
7. QuantCloudStreamingClient missing declare(strict_types=1)
src/Client/QuantCloudStreamingClient.php:1 — the file header lacks declare(strict_types=1) while all other files touched in this PR have it added. Minor consistency issue.
Nits (non-blocking)
- `src/QuantCloudChatMessageIterator.php:71`: `$maxWarnings = 3` duplicates `MAX_SSE_DECODE_WARNINGS` (the class constant defined in `QuantCloudStreamingClient`). Use the constant or move it to this class.
- `src/StreamedToolCall.php`: the class docblock references `StreamedChatMessageIterator::assembleToolCalls()` as the consumer; if that method is in the upstream `ai` module, pin the minimum module version in `ai_provider_quant_cloud.info.yml`.
- `src/Plugin/AiProvider/QuantCloudProvider.php` ~line 158: the comment `// We do not advertise ChatFiberSupport` could briefly explain why (wire format mismatch).
- `src/QuantCloudChatMessageIterator.php`: confirm that `readLine()` handles `\r\n` line endings (SSE spec allows both `\n` and `\r\n`).
Verdict
| # | Severity | Must fix before prod? |
|---|---|---|
| 1 | Blocker | Yes — verify reconstructChatOutput() contract with StreamedToolCall |
| 2 | Blocker | Yes — restore/replace deleted test coverage |
| 3 | Warning | Yes — double applyUsage() call risks token count corruption |
| 4 | Warning | Yes — unconditional error body logging risks PII in watchdog |
| 5 | Warning | No — add comment, low practical risk today |
| 6 | Warning | No — add @todo, safe fallback exists |
| 7 | Warning | No — consistency fix |
Items 1 and 2 are classified as Blockers: item 1 has a concrete silent-failure path (tool calls silently malformed if the base class contract isn't met), and item 2 removes the only automated safety net for a rewritten critical path. Both are straightforward to resolve. The rest of the PR (the Fiber-aware streaming design, the SSE frame parser, the applyMaxTokensClamp guard, and the error body logging improvement) is well-structured and a clear improvement over the previous implementation.
Automated review by Quant Code
Upstream error responses can echo back prompt/tool inputs that may include PROTECTED data in government deployments. Always log status and reason, but only include the response body when advanced.enable_logging is TRUE. When disabled, emit a redacted placeholder pointing operators at the flag. Also truncate body to 500 chars (down from 2000).
Touches:
- QuantCloudClient::post() and ::get()
- QuantCloudStreamingClient::chatStreamRaw() and ::chatStream()
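The gating rule in this commit can be sketched as a small pure function. `buildErrorLogContext()` and the placeholder wording are hypothetical; only the flag name, the always-logged status and reason, and the 500-character limit come from the commit message.

```php
<?php

/**
 * Builds the log context for a failed upstream call (hypothetical helper).
 *
 * Status and reason are always included; the body only when logging is on.
 */
function buildErrorLogContext(int $status, string $reason, string $body, bool $loggingEnabled, int $maxLength = 500): array {
  $shown = $loggingEnabled
    // Byte-based truncation; a multibyte-aware cut would need mb_substr().
    ? substr($body, 0, $maxLength)
    : '[redacted: set advanced.enable_logging to include the response body]';
  return [
    '@status' => $status,
    '@reason' => $reason,
    '@body' => $shown,
  ];
}
```

Keeping the decision in one helper means every client method that logs an upstream failure applies the same redaction and the same limit.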
- Add declare(strict_types=1) to QuantCloudStreamingClient.
- Document that convertMessage() intentionally drops images on tool-result branches; revisit if AI Agents emits combined shapes.
- Add @todo against ModelsService::getFallbackModels() noting that the hardcoded Bedrock caps go stale and should ideally be fetched from a dedicated dashboard endpoint.
- Explain in QuantCloudProvider::getSupportedCapabilities() why ChatFiberSupport is intentionally omitted (Bedrock-shape wire format vs upstream's OpenAI-shape Fiber pump).
- Promote QuantCloudStreamingClient::MAX_SSE_DECODE_WARNINGS to public and reuse it from QuantCloudChatMessageIterator (was duplicating the literal 3).
- Document setter (not accumulator) semantics of the upstream StreamedChatMessage token-usage methods on applyUsage().
- Tighten readLine() to honour SSE-spec line endings: also break on bare \r and strip a trailing \r after \n.
- Strengthen StreamedToolCall class docblock with explicit file+line citations of the assembleToolCalls() contract it satisfies.
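The line-ending and warning-cap behaviour can be illustrated over an in-memory buffer. This is a simplification: the real `readLine()` reads from a PSR-7 stream, and `decodeSseData()` with its return shape is hypothetical.

```php
<?php

/**
 * Decodes `data:` lines from an SSE buffer (hypothetical helper).
 *
 * Accepts \n, \r\n and bare \r as line terminators and records at most
 * $maxWarnings warnings for lines that fail to decode as JSON objects.
 */
function decodeSseData(string $buffer, int $maxWarnings = 3): array {
  $frames = [];
  $warnings = [];
  $malformed = 0;
  foreach (preg_split('/\r\n|\r|\n/', $buffer) as $line) {
    if (!str_starts_with($line, 'data:')) {
      // Blank separators, comments and other fields are ignored here.
      continue;
    }
    $decoded = json_decode(trim(substr($line, 5)), TRUE);
    if (!is_array($decoded)) {
      $malformed++;
      if ($malformed <= $maxWarnings) {
        $warnings[] = 'Skipping malformed SSE data line #' . $malformed;
      }
      continue;
    }
    $frames[] = $decoded;
  }
  return ['frames' => $frames, 'warnings' => $warnings];
}
```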
Replaces the deleted QuantCloudProviderFailureDetectionTest with
coverage targeted at the new SSE iterator code paths and the OpenAI-
shape tool-call value object.
QuantCloudChatMessageIteratorTest covers, against a fake PSR-7 stream:
- init frame emits nothing,
- text deltas emit assistant messages,
- inline {toolUseId, name, input} frame yields a single tool call,
- response.toolUse[] on the summary frame yields one message per entry,
- summary frame populates the iterator's finish reason,
- malformed `data:` lines are skipped with at most 3 warnings logged,
- unknown event shapes yield nothing,
- response.content on the summary frame is NOT re-emitted as a delta
(guards against the doubling regression we hit during the refactor).
StreamedToolCallTest covers the OpenAI shape contract verified against
StreamedChatMessageIterator::assembleToolCalls() in Item #1:
- toArray() returns {id, type:'function', function:{name, arguments}},
- empty {} arguments string survives the value object verbatim,
- special characters round-trip through JSON encode/decode unchanged.
Item #1 verification turned up nothing to fix; the iterator's expected
keys (id, function.name, function.arguments) are exactly what
StreamedToolCall::toArray() produces. See updated class-level docblock
in the previous commit for the file+line citation.
Keys live under $this->state['emitted_tool_ids'] so the per-stream state is grouped on a single property, matching the verification hook expected by operators.
Previously assistant messages with prior tool calls were serialised
with a flat top-level `toolUse` sibling of `content`, and tool-result
messages with a flat top-level `toolUseId` sibling of `content`. Both
are non-standard shapes that the dashboard's upstream Bedrock loop
does not round-trip reliably — it loses the assistant→tool pairing,
so the model thinks its previous tool call never completed and retries
the same side-effect. The agent UI surfaces this as
"It seems the metadata was already added in a previous step"
"It looks like there may be some duplicate responses occurring"
"saved twice"
…and spirals through the same tool call multiple times before
eventually giving up.
Switch both message types to standard Bedrock Converse shape:
- Assistant with tools → `content: [{text: ...}, {toolUse: {...}}, …]`
- Tool result → `content: [{toolResult: {toolUseId, content: [{text}]}}]`
This is the format AWS documents and what the dashboard's translator
expects for native Bedrock-format requests.
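The two Converse shapes listed above could be built as in the following sketch. The helper names and the `$toolCalls` entry keys (`id`, `name`, `input`) are hypothetical; the `toolUse` and `toolResult` block layouts follow the shapes given in this commit message.

```php
<?php

/**
 * Builds an assistant message whose tool calls are nested inside content.
 */
function buildAssistantMessage(string $text, array $toolCalls): array {
  $content = [];
  if ($text !== '') {
    $content[] = ['text' => $text];
  }
  foreach ($toolCalls as $call) {
    // Each prior tool call becomes its own toolUse content block.
    $content[] = [
      'toolUse' => [
        'toolUseId' => $call['id'],
        'name' => $call['name'],
        'input' => $call['input'],
      ],
    ];
  }
  return ['role' => 'assistant', 'content' => $content];
}

/**
 * Builds a tool-result message paired to a toolUseId.
 */
function buildToolResultMessage(string $toolUseId, string $text): array {
  return [
    'role' => 'user',
    'content' => [
      ['toolResult' => ['toolUseId' => $toolUseId, 'content' => [['text' => $text]]]],
    ],
  ];
}
```

Because the `toolUseId` appears inside both blocks, the assistant's call and its result stay paired when the history is replayed.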
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
Brings the provider into line with what Drupal AI Agents loops (e.g. Drupal Canvas's page builder) actually need: Fiber-aware streaming so long Claude responses don't trip the dashboard's CloudFront origin timeout, plus native Bedrock-flavoured request/response handling (no OpenAI SDK translation in the middle).
- `chat()` now detects an active Fiber and silently swaps to the streaming endpoint, suspending between SSE chunks and reconstructing a buffered-style `ChatOutput` so the caller sees one complete response. Mirrors the pattern in `ai_provider_amazeeio`/`OpenAiBasedProviderClientBase::chat()`.
- `maxTokens` is clamped to the destination model's `maxOutputTokens` from `ModelsService` before sending. Bedrock rejects oversize values with a 400 (Nova Lite caps at 5000 etc.), and the AI Defaults UI happily lets operators configure higher numbers, so the clamp prevents opaque upstream failures.
- Tool definitions are wrapped in Bedrock's `{toolSpec: {name, description, inputSchema}}` shape (rather than the OpenAI `{type, function}` shape `ToolsInput::renderToolsArray()` produces), since the dashboard validator requires `toolSpec`.
- The SSE iterator follows the base class `doIterate()` contract and yields proper `StreamedChatMessage` objects, so `reconstructChatOutput()` actually sees the messages. The previous anonymous `IteratorAggregate` bypassed that and produced empty output, breaking the agent loop.
- `StreamedToolCall` value object wraps tool-call entries so the base iterator's `assembleToolCalls()` can call `$tool->toArray()` (its contract); plain arrays don't satisfy that.
- status N.
Removes
- `tests/src/Unit/QuantCloudProviderFailureDetectionTest.php`: it tested protected helpers on the old `chat()` (`isEmptyResponseContent`, `isLikelyTokenLimited`, `getOutputTokenCount`, `getStopReason`) that no longer exist after this refactor.
Net diff: +909, −1,451.
Test plan
- `drush php:eval` with a simple `ChatInput`: confirm text response
- `$provider->streamedOutput(TRUE)`, iterate the returned `ChatOutput`: confirm SSE chunks flow
- `chat()` inside `new \Fiber(...)`, resume until terminated: confirm reconstructed `ChatMessage` has correct text + tool calls
- `ChatInput` with tools defined, receive `toolUse` back, reply with `role: tool` history, confirm Claude consumes it
- `QuantCloudClient` (unchanged code path)
Notes for reviewers
- `QuantCloudChatMessageIterator` deliberately skips `response.content` on the summary frame: it carries the full assembled assistant text already emitted via earlier `delta` frames, and re-emitting would double the message.
- `applyMaxTokensClamp()` is a no-op when the model's cap is unknown (returns the request value untouched) so newly-added Bedrock models won't be silently capped at zero.
- The dashboard returns `toolUse` as a sibling of `content` (not nested inside the content array). The parser handles both legitimately, but the sibling layout is what production currently emits.
🤖 Generated with Claude Code