feat(migration): migrate lb4-llm-chat-extension from LangGraph to Mastra #21
feat(migration): migrate lb4-llm-chat-extension from LangGraph to Mastra #21piyushsinghgaur1 wants to merge 4 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
This PR migrates the chat generation pipeline from the LangGraph-based ChatGraph execution model to a Mastra workflow-based model, introducing a new ChatWorkflow pipeline and a LoopBack↔Mastra bridge to stream events back to the existing transports.
Changes:
- Replaced
ChatGraphusage inGenerationServicewith a new request-scopedWorkflowRunnerthat executes Mastra workflows and yieldsLLMStreamEvents. - Added a Mastra
ChatWorkflow(init → prepare context → file processing → agent reasoning → persistence → end session) plus bridging utilities (async event queue, token accumulator, context trimming). - Added Mastra LLM binding keys and updated dependencies to include
@mastra/core, with associated test refactors.
Reviewed changes
Copilot reviewed 23 out of 24 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| src/services/generation.service.ts | Switches generation execution from ChatGraph to WorkflowRunner async event streaming. |
| src/mastra/workflows/chat/steps/prepare-context.step.ts | Loads/paginates message history and trims it for Mastra agent context. |
| src/mastra/workflows/chat/steps/persist-conversation.step.ts | Persists final assistant output and tool call results back to the DB. |
| src/mastra/workflows/chat/steps/init-session.step.ts | Initializes/resumes session, emits Init event, persists user message. |
| src/mastra/workflows/chat/steps/file-processing.step.ts | Summarizes uploaded files and merges summaries into the user prompt/context. |
| src/mastra/workflows/chat/steps/end-session.step.ts | Updates token counts and emits the TokenCount event. |
| src/mastra/workflows/chat/steps/agent-reasoning.step.ts | Streams Mastra agent execution, emits tool/error events and final message event. |
| src/mastra/workflows/chat/chat.workflow.ts | Defines the Mastra workflow pipeline and documents expected RequestContext keys. |
| src/mastra/workflows/chat/chat-workflow-schemas.ts | Adds Zod schemas for workflow input/output and step boundaries. |
| src/mastra/types.ts | Adds Mastra-specific types (workflow request context, tool interface, outputs). |
| src/mastra/index.ts | Barrel export for Mastra migration layer components/types/workflow/agent. |
| src/mastra/bridge/workflow-runner.ts | Implements the LoopBack↔Mastra bridge and merges workflow + queue event streams. |
| src/mastra/bridge/workflow-request-context.ts | Adds typed RequestContext wrapper for workflow steps/agent. |
| src/mastra/bridge/token-usage-accumulator.ts | New per-request token usage accumulator for Mastra step-finish usage events. |
| src/mastra/bridge/context-window-manager.ts | New context trimming utility intended to match prior LangGraph trimming behavior. |
| src/mastra/bridge/async-event-queue.ts | Async queue for agent-callback events that can’t use Mastra step writer. |
| src/mastra/agents/chat-reasoning.agent.ts | Defines Mastra agent and adapts existing IGraphTool tools into Mastra tools. |
| src/keys.ts | Adds new DI binding keys for Mastra chat/file LLMs. |
| src/graphs/event.types.ts | Extends LLMStreamEvent union with an explicit Error event type. |
| src/graphs/chat/chat.store.ts | Adds Mastra-compatible message helpers and CoreMessage conversion. |
| src/component.ts | Registers WorkflowRunner into the component service list. |
| src/tests/integration/generation.service.integration.ts | Refactors integration tests to stub WorkflowRunner async generator behavior. |
| package.json | Adds @mastra/core dependency (and PR version appears aligned to 3.0.0). |
| package-lock.json | Lockfile updates for Mastra dependency tree and version bump. |
Comments suppressed due to low confidence (2)
src/mastra/workflows/chat/steps/prepare-context.step.ts:60
- This reads
aiConfig.maxTokens, but the existing config usesmaxTokenCount. If you keepmaxTokens, it should be added to the config type and documented; otherwise switch this tomaxTokenCountto preserve previous behavior.
const maxTokens =
(aiConfig as {maxTokens?: number} | undefined)?.maxTokens ??
ContextWindowManager.DEFAULT_MAX_TOKENS;
src/mastra/workflows/chat/steps/agent-reasoning.step.ts:65
maxStepsis pulled fromaiConfig.maxSteps, butAIIntegrationConfigdoesn’t currently define it. Consider extending the config type (and binding docs) or using an existing config field to avoid silently ignoring configuration.
{
maxSteps: (aiConfig as {maxSteps?: number} | undefined)?.maxSteps ?? 20,
abortSignal,
requestContext: ctx,
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| while (!wsDone || !qDone) { | ||
| if (abortController.signal.aborted) { | ||
| debug('WorkflowRunner: abort signal received, stopping merge'); | ||
| break; | ||
| } | ||
|
|
||
| // Build the list of active (not yet done) promises | ||
| const active: Promise<SlotResult>[] = []; | ||
| if (!wsDone) active.push(wsPromise); | ||
| if (!qDone) active.push(qPromise); | ||
|
|
||
| if (!active.length) break; | ||
|
|
||
| const result = await Promise.race(active); | ||
|
|
||
| if (result.source === 'ws') { | ||
| if (result.done) { | ||
| wsDone = true; | ||
| debug('WorkflowRunner: workflow stream exhausted'); | ||
| } else { | ||
| // Extract LLMStreamEvent from workflow-step-output chunks | ||
| const chunk = result.value as { | ||
| type?: string; | ||
| payload?: {output?: unknown}; | ||
| }; | ||
| if (chunk?.type === 'workflow-step-output') { | ||
| const output = chunk.payload?.output; | ||
| if (isLLMStreamEvent(output)) { | ||
| if (output.type !== LLMStreamEventType.Log) { | ||
| yield output; | ||
| } else { | ||
| debug( | ||
| 'WorkflowRunner: Log event (not forwarded):', | ||
| output.data, | ||
| ); | ||
| } | ||
| } | ||
| } | ||
| // Schedule the next read from the workflow stream | ||
| wsPromise = wsIter | ||
| .next() | ||
| .then(r => ({done: r.done, value: r.value, source: 'ws' as const})); | ||
| } | ||
| } else { | ||
| // source === 'queue' | ||
| if (result.done) { | ||
| qDone = true; | ||
| debug('WorkflowRunner: AsyncEventQueue exhausted'); | ||
| } else { | ||
| const event = result.value as LLMStreamEvent; | ||
| if (event.type !== LLMStreamEventType.Log) { | ||
| yield event; | ||
| } else { | ||
| debug( | ||
| 'WorkflowRunner: Log event from queue (not forwarded):', | ||
| event.data, | ||
| ); | ||
| } | ||
| // Schedule the next read from the queue | ||
| qPromise = qIter.next().then(r => ({ | ||
| done: r.done, | ||
| value: r.value, | ||
| source: 'queue' as const, | ||
| })); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| debug('WorkflowRunner: merge complete'); |
| * Using `RequestContext<WorkflowRequestContext>` enables fully typed `.get()` and `.set()` | ||
| * calls throughout all workflow steps and the ChatReasoningAgent — zero `any` casts needed. | ||
| * | ||
| * All keys follow the snake_case convention matching the RequestContext.set() calls in |
| /** AbortSignal propagated from the HTTP request's abort controller */ | ||
| abortSignal: AbortSignal; | ||
| /** Authenticated user resolved from LoopBack auth middleware */ | ||
| currentUser: IAuthUserWithPermissions | undefined; | ||
| } |
| "@loopback/context": "^8.0.11", | ||
| "@loopback/repository": "^8.0.11", | ||
| "@loopback/sequelize": "^0.8.8", | ||
| "@mastra/core": "^1.32.1", |
| @inject.getter(AuthenticationBindings.CURRENT_USER) | ||
| private readonly getCurrentUser: Getter<IAuthUserWithPermissions>, | ||
| @repository(ChatRepository) | ||
| private readonly chatRepository: ChatRepository, | ||
| ) {} |
| execute: async ({inputData, requestContext}) => { | ||
| const ctx = asWorkflowContext(requestContext); | ||
| const chatStore = ctx.get('chatStore'); | ||
| const aiConfig = ctx.get('aiConfig') as {maxTokens?: number} | undefined; |
| const aiConfig = ctx.get('aiConfig') as | ||
| | {maxSteps?: number; modelName?: string} | ||
| | undefined; |
| * All SSE events are routed through the AsyncEventQueue stored in RequestContext. | ||
| * Token usage is accumulated in TokenUsageAccumulator stored in RequestContext. | ||
| * | ||
| * The workflow does NOT directly interact with the SSE transport — that is the |
| const toolCalls = message.messages | ||
| ?.filter( | ||
| (v): v is Message & {metadata: ToolMessageMetadata} => | ||
| v.metadata.type === MessageMetadataType.Tool, |
| static trim( | ||
| messages: CoreMessageLike[], | ||
| maxTokens: number = DEFAULT_MAX_TOKEN_COUNT, | ||
| ): CoreMessageLike[] { | ||
| const totalTokens = messages.reduce( | ||
| (sum, m) => sum + ContextWindowManager._countMessageTokens(m), |
rohit-sourcefuse
left a comment
There was a problem hiding this comment.
Review: feat(migration): LangGraph → Mastra Phase 1
Good direction overall. The typed WorkflowRequestContext bridge, function-based model injection on the Agent, and ContextWindowManager are all well-designed. However there are several blockers that prevent this from functioning at runtime, plus two correctness bugs in the concurrency layer.
BLOCKERS — must fix before merge
1. run.stream() not awaited (workflow-runner.ts:111)
run.stream() returns Promise<{stream: AsyncIterable<...>}>. The result is not awaited and stream is not destructured — workflowStream holds a Promise, not an AsyncIterable. workflowStream[Symbol.asyncIterator]() throws TypeError on every chat request. TypeScript misses this because @mastra/core is not installed (return type is any).
// Fix:
const run = chatWorkflow.createRun();
const {stream: workflowStream} = await run.stream({
inputData: {prompt, files, sessionId},
requestContext,
});
yield* this._mergeStreams(workflowStream, eventQueue, abortController);2. AsyncEventQueue single-slot resolver race condition (async-event-queue.ts:17)
_resolve is a single slot. If two pushes happen before the consumer resumes (two simultaneous tool calls), the second push() overwrites _resolve = null and drops the first wakeup. The while(true) loop in the iterator stalls permanently — the SSE stream hangs silently.
// Fix: replace _resolve: (() => void) | null with an array
private readonly _resolvers: Array<() => void> = [];
push(event: LLMStreamEvent): void {
if (this._closed) return;
this._queue.push(event);
this._resolvers.splice(0).forEach(r => r());
}
close(): void {
this._closed = true;
this._resolvers.splice(0).forEach(r => r());
}
// In iterator:
await new Promise<void>(resolve => { this._resolvers.push(resolve); });3. @mastra/core not installed
node_modules/@mastra does not exist. Every import from @mastra/core/workflows, @mastra/core/agent, @mastra/core/request-context fails with Cannot find module. Both CI checks fail for this reason. Run npm install, confirm subpath exports resolve, commit the updated lock-file.
4. @langchain/openrouter does not exist (openrouter.provider.ts:2)
@langchain/openrouter is not a published npm package. This new sub-module does not compile. Either remove it from this PR or implement via @ai-sdk/openai with baseURL: 'https://openrouter.ai/api/v1'.
5. @langchain/classic/vectorstores/memory does not exist (inmemory.vector.ts:2)
No such package. Should be @langchain/community/vectorstores/memory.
6. LLMStreamErrorEvent.data.message breaks the SSE contract (event.types.ts:65)
The new type uses { data: { message: string } }. The TDD spec (§11.4) and existing client expectations use { data: { error: string } }. Any client checking event.data.error receives undefined silently. Change the field to error: string and update the emit site in agent-reasoning.step.ts.
IMPORTANT — address before Phase 2
7. _mergeStreams leaks unhandled promise rejections (workflow-runner.ts:131)
When Promise.race() settles on one source, the other pending .next() promise is abandoned. If it later rejects, Node.js emits UnhandledPromiseRejection. Add a try/finally that calls wsIter.return?.() and qIter.return?.() to properly close both iterators on exit.
8. Log events silently dropped (workflow-runner.ts:180, 202)
LLMStreamEventType.Log events are filtered out from both sources. The existing LangGraph system forwards Log events — they drive "Reading file: ..." and "Context compressed: ..." UI messages. Dropping them silently is a backward-compat regression. Remove the filter or document it explicitly as a breaking change.
9. Unused @repository(ChatRepository) injection (workflow-runner.ts:73)
Injected but never used anywhere in the class body. Remove it.
10. Dead LangGraph services still registered (component.ts:87-103)
CallLLMNode, RunToolNode, InitSessionNode, SummariseFileNode, ContextCompressionNode, EndSessionNode, ChatGraph are all still in this.services. Since GenerationService now uses WorkflowRunner exclusively, these nodes are instantiated per request but never called. Remove them or gate with a rollback feature flag.
11. Silent tool message loss (persist-conversation.step.ts:52-70)
When addAIMessageText() returns undefined, the if (aiMessage) guard silently skips persisting all tool call records. Throw explicitly instead:
if (!aiMessage) {
throw new Error(
`PersistConversationStep: failed to persist AI message for session ${sessionId}`,
);
}12. AiIntegrationBindings.Checkpointer not marked deprecated (keys.ts:37)
Per TDD §26.8 this key has no Mastra equivalent and should be @deprecated now.
Test coverage
The only new test file stubs WorkflowRunner and verifies call counts — it provides no regression protection for the new Mastra layer. Zero tests for: AsyncEventQueue (push/close/concurrency), ContextWindowManager (system-message-preserved, array-typed content, boundary conditions), TokenUsageAccumulator, WorkflowRunner._mergeStreams (abort, one-source-exhausted), all 6 workflow steps, and ChatStore.toCoreMessage() (especially the tool role branch that constructs the tool-result content array passed directly to the LLM API).
AsyncEventQueue and ContextWindowManager are dependency-free pure classes — adding tests for them would have caught the resolver race before review.
Must fix before merge: items 1–6
Fix before Phase 2: items 7–12 + unit tests for pure utility classes
rohit-sourcefuse
left a comment
There was a problem hiding this comment.
Inline comments on specific issues. See the full request-changes review above for the complete list.
| // The iterator yields WorkflowStreamEvent — steps emit via writer.write() which | ||
| // surfaces as {type: 'workflow-step-output', payload: {output: <our event>}}. | ||
| const workflowStream = run.stream({ | ||
| inputData: {prompt, files, sessionId}, |
There was a problem hiding this comment.
[BLOCKER] run.stream() is not awaited — runtime crash on every request
Per @mastra/core ^1.32.1, run.stream() returns Promise<{stream: AsyncIterable<WorkflowStreamEvent>}>. The result must be awaited and the stream property destructured. As written workflowStream holds a Promise — workflowStream[Symbol.asyncIterator]() throws TypeError on every request.
const run = chatWorkflow.createRun();
const {stream: workflowStream} = await run.stream({
inputData: {prompt, files, sessionId},
requestContext,
});
yield* this._mergeStreams(workflowStream, eventQueue, abortController);| private readonly getCurrentUser: Getter<IAuthUserWithPermissions>, | ||
| @repository(ChatRepository) | ||
| private readonly chatRepository: ChatRepository, | ||
| ) {} |
There was a problem hiding this comment.
Unused injection — remove chatRepository
@repository(ChatRepository) is injected here but never referenced in the class body. Remove it to eliminate the dead DI dependency.
| const output = chunk.payload?.output; | ||
| if (isLLMStreamEvent(output)) { | ||
| if (output.type !== LLMStreamEventType.Log) { | ||
| yield output; |
There was a problem hiding this comment.
Log events silently dropped — backward-compat regression
The LangGraph system forwarded Log events to clients, driving 'Reading file: ...' and 'Context compressed: ...' UI messages. Filtering them here without a changelog entry silently breaks that behavior. Remove the filter or document it explicitly as a breaking change.
| export class AsyncEventQueue implements AsyncIterable<LLMStreamEvent> { | ||
| private readonly _queue: LLMStreamEvent[] = []; | ||
| private _resolve: (() => void) | null = null; | ||
| private _closed = false; |
There was a problem hiding this comment.
[BLOCKER] Single-slot resolver — SSE stream stalls on concurrent tool calls
_resolve is a single slot. If two push() calls happen before the consumer resumes (parallel tool calls), the second call drops the first wakeup and the while(true) loop stalls permanently — SSE stream hangs silently with no error.
Replace with an array:
private readonly _resolvers: Array<() => void> = [];
push(event: LLMStreamEvent): void {
if (this._closed) return;
this._queue.push(event);
this._resolvers.splice(0).forEach(r => r());
}
close(): void {
this._closed = true;
this._resolvers.splice(0).forEach(r => r());
}
// In iterator:
await new Promise<void>(resolve => { this._resolvers.push(resolve); });|
|
||
| export type LLMStreamErrorEvent = { | ||
| type: LLMStreamEventType.Error; | ||
| data: { |
There was a problem hiding this comment.
[BLOCKER] Error event field name breaks SSE contract
The field is message: string here, but the TDD spec (§11.4) and existing client expectations use error: string. Any client checking event.data.error receives undefined silently.
export type LLMStreamErrorEvent = {
type: LLMStreamEventType.Error;
data: { error: string }; // not 'message'
};Update the emit site in agent-reasoning.step.ts to match: data: {error: errMsg}.
| const aiMessage = await chatStore.addAIMessageText(sessionId, finalText); | ||
|
|
||
| if (!aiMessage) { | ||
| debug('PersistConversation: addAIMessageText returned undefined'); |
There was a problem hiding this comment.
Silent tool message loss when addAIMessageText returns undefined
If the AI message persistence fails, the if (aiMessage) guard on line 70 silently skips all tool call records for the turn. Data is lost with only a debug log.
if (!aiMessage) {
throw new Error(
`PersistConversationStep: failed to persist AI message for session ${sessionId}`,
);
}| ChatStore, | ||
| // mastra migration | ||
| WorkflowRunner, | ||
| // graph |
There was a problem hiding this comment.
Dead LangGraph services still registered — remove or gate with a rollback flag
Since GenerationService now delegates entirely to WorkflowRunner, all six LangGraph nodes and ChatGraph in this services array are instantiated per request by LB4 DI but never called. Remove them, or keep them behind a WORKFLOW_ENGINE=langgraph feature flag for rollback safety (TDD §22.5).
| ); | ||
|
|
||
| // ── Mastra LLM bindings (Phase 1 migration) ────────────────────────────── | ||
| /** |
There was a problem hiding this comment.
Mark Checkpointer as @deprecated now (TDD §26.8)
This binding has no Mastra equivalent and will be removed in Phase 5. Consumers need advance notice before @langchain/langgraph is dropped.
/**
* @deprecated No Mastra equivalent. Never wired in graph.compile({}).
* Will be removed in Phase 5 when @langchain/langgraph is dropped.
*/
export const Checkpointer = ...
SonarQube reviewer guideSummary: Add Mastra framework integration to replace LangGraph with Phase 1-3 workflows (Chat, DBQuery, Visualization) and migrate GenerationService to use WorkflowRunner instead of ChatGraph. Review Focus:
Start review at:
|




This pull request introduces a significant refactor to support the Mastra workflow engine, improve compatibility with Mastra's language model interface, and enhance the testability and maintainability of the codebase. The changes include dependency updates, new service bindings, expanded test coverage, and new utility methods for chat message handling. The most important changes are grouped below:
Mastra Integration and Dependency Updates
@mastra/coreas a dependency inpackage.jsonto enable Mastra workflow and LLM support.WorkflowRunneras a provider in the component and replaced usage ofChatGraphwithWorkflowRunnerin integration tests and component registration. [1] [2] [3] [4] [5]Service Bindings for Mastra LLMs
MastraChatLLMandMastraFileLLMinkeys.tsfor plugging in Mastra-compatible language models, with detailed documentation for application integration. [1] [2]ChatStore Enhancements for Mastra Compatibility
ChatStorefor adding human, AI, and tool messages using plain strings, and for retrieving messages and converting them to a Mastra-compatible format (CoreMessageLike). These methods avoid LangChain types and facilitate seamless workflow migration. [1] [2] [3]Test Refactoring for WorkflowRunner
GenerationServiceto useWorkflowRunnerand async generators, improving test reliability and aligning with the new Mastra workflow execution model. Utility functions for empty and error event streams were added for robust testing. [1] [2] [3]Event Type Extensions
LLMStreamEventinevent.types.tsto include a newLLMStreamErrorEventtype, improving error handling and event stream expressiveness.