feat(migration): migrate lb4-llm-chat-extension from LangGraph to Mastra #21

Draft
piyushsinghgaur1 wants to merge 4 commits into main from feat/mastra-migration

Conversation

@piyushsinghgaur1
Contributor

This pull request introduces a significant refactor to support the Mastra workflow engine, improve compatibility with Mastra's language model interface, and enhance the testability and maintainability of the codebase. The changes include dependency updates, new service bindings, expanded test coverage, and new utility methods for chat message handling. The most important changes are grouped below:

Mastra Integration and Dependency Updates

  • Added @mastra/core as a dependency in package.json to enable Mastra workflow and LLM support.
  • Registered WorkflowRunner as a provider in the component and replaced usage of ChatGraph with WorkflowRunner in integration tests and component registration.

Service Bindings for Mastra LLMs

  • Introduced new binding keys MastraChatLLM and MastraFileLLM in keys.ts for plugging in Mastra-compatible language models, with detailed documentation for application integration.

ChatStore Enhancements for Mastra Compatibility

  • Added Mastra-compatible methods to ChatStore for adding human, AI, and tool messages using plain strings, and for retrieving messages and converting them to a Mastra-compatible format (CoreMessageLike). These methods avoid LangChain types and facilitate seamless workflow migration.

Test Refactoring for WorkflowRunner

  • Refactored integration tests for GenerationService to use WorkflowRunner and async generators, improving test reliability and aligning with the new Mastra workflow execution model. Utility functions for empty and error event streams were added for robust testing.

Event Type Extensions

  • Extended LLMStreamEvent in event.types.ts to include a new LLMStreamErrorEvent type, improving error handling and event stream expressiveness.

Copilot AI left a comment

Pull request overview

This PR migrates the chat generation pipeline from the LangGraph-based ChatGraph execution model to a Mastra workflow-based model, introducing a new ChatWorkflow pipeline and a LoopBack↔Mastra bridge to stream events back to the existing transports.

Changes:

  • Replaced ChatGraph usage in GenerationService with a new request-scoped WorkflowRunner that executes Mastra workflows and yields LLMStreamEvents.
  • Added a Mastra ChatWorkflow (init → prepare context → file processing → agent reasoning → persistence → end session) plus bridging utilities (async event queue, token accumulator, context trimming).
  • Added Mastra LLM binding keys and updated dependencies to include @mastra/core, with associated test refactors.

Reviewed changes

Copilot reviewed 23 out of 24 changed files in this pull request and generated 10 comments.

File | Description
src/services/generation.service.ts | Switches generation execution from ChatGraph to WorkflowRunner async event streaming.
src/mastra/workflows/chat/steps/prepare-context.step.ts | Loads/paginates message history and trims it for Mastra agent context.
src/mastra/workflows/chat/steps/persist-conversation.step.ts | Persists final assistant output and tool call results back to the DB.
src/mastra/workflows/chat/steps/init-session.step.ts | Initializes/resumes session, emits Init event, persists user message.
src/mastra/workflows/chat/steps/file-processing.step.ts | Summarizes uploaded files and merges summaries into the user prompt/context.
src/mastra/workflows/chat/steps/end-session.step.ts | Updates token counts and emits the TokenCount event.
src/mastra/workflows/chat/steps/agent-reasoning.step.ts | Streams Mastra agent execution, emits tool/error events and final message event.
src/mastra/workflows/chat/chat.workflow.ts | Defines the Mastra workflow pipeline and documents expected RequestContext keys.
src/mastra/workflows/chat/chat-workflow-schemas.ts | Adds Zod schemas for workflow input/output and step boundaries.
src/mastra/types.ts | Adds Mastra-specific types (workflow request context, tool interface, outputs).
src/mastra/index.ts | Barrel export for Mastra migration layer components/types/workflow/agent.
src/mastra/bridge/workflow-runner.ts | Implements the LoopBack↔Mastra bridge and merges workflow + queue event streams.
src/mastra/bridge/workflow-request-context.ts | Adds typed RequestContext wrapper for workflow steps/agent.
src/mastra/bridge/token-usage-accumulator.ts | New per-request token usage accumulator for Mastra step-finish usage events.
src/mastra/bridge/context-window-manager.ts | New context trimming utility intended to match prior LangGraph trimming behavior.
src/mastra/bridge/async-event-queue.ts | Async queue for agent-callback events that can’t use Mastra step writer.
src/mastra/agents/chat-reasoning.agent.ts | Defines Mastra agent and adapts existing IGraphTool tools into Mastra tools.
src/keys.ts | Adds new DI binding keys for Mastra chat/file LLMs.
src/graphs/event.types.ts | Extends LLMStreamEvent union with an explicit Error event type.
src/graphs/chat/chat.store.ts | Adds Mastra-compatible message helpers and CoreMessage conversion.
src/component.ts | Registers WorkflowRunner into the component service list.
src/tests/integration/generation.service.integration.ts | Refactors integration tests to stub WorkflowRunner async generator behavior.
package.json | Adds @mastra/core dependency (and PR version appears aligned to 3.0.0).
package-lock.json | Lockfile updates for Mastra dependency tree and version bump.
Comments suppressed due to low confidence (2)

src/mastra/workflows/chat/steps/prepare-context.step.ts:60

  • This reads aiConfig.maxTokens, but the existing config uses maxTokenCount. If you keep maxTokens, it should be added to the config type and documented; otherwise switch this to maxTokenCount to preserve previous behavior.
    const maxTokens =
      (aiConfig as {maxTokens?: number} | undefined)?.maxTokens ??
      ContextWindowManager.DEFAULT_MAX_TOKENS;
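
One way to follow this suggestion without a cast is a small resolver that prefers the existing field. This is a sketch only: `AiConfigLike`, `resolveMaxTokens`, and the default of 8000 are hypothetical names and values, and the real config type may carry other fields.

```typescript
// Hypothetical shape: maxTokenCount is the existing field named in the comment,
// maxTokens is the new field this PR reads.
type AiConfigLike = {maxTokenCount?: number; maxTokens?: number};

// Placeholder default; the PR's ContextWindowManager defines its own value.
const DEFAULT_MAX_TOKENS = 8000;

// Prefer the existing field so previously configured limits keep working.
function resolveMaxTokens(
  config: AiConfigLike | undefined,
  fallback: number = DEFAULT_MAX_TOKENS,
): number {
  return config?.maxTokenCount ?? config?.maxTokens ?? fallback;
}
```

With this ordering, an application that already sets maxTokenCount sees no behavior change, and maxTokens only takes effect when the older field is absent.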

src/mastra/workflows/chat/steps/agent-reasoning.step.ts:65

  • maxSteps is pulled from aiConfig.maxSteps, but AIIntegrationConfig doesn’t currently define it. Consider extending the config type (and binding docs) or using an existing config field to avoid silently ignoring configuration.
      {
        maxSteps: (aiConfig as {maxSteps?: number} | undefined)?.maxSteps ?? 20,
        abortSignal,
        requestContext: ctx,

Comment on lines +152 to +220
while (!wsDone || !qDone) {
  if (abortController.signal.aborted) {
    debug('WorkflowRunner: abort signal received, stopping merge');
    break;
  }

  // Build the list of active (not yet done) promises
  const active: Promise<SlotResult>[] = [];
  if (!wsDone) active.push(wsPromise);
  if (!qDone) active.push(qPromise);

  if (!active.length) break;

  const result = await Promise.race(active);

  if (result.source === 'ws') {
    if (result.done) {
      wsDone = true;
      debug('WorkflowRunner: workflow stream exhausted');
    } else {
      // Extract LLMStreamEvent from workflow-step-output chunks
      const chunk = result.value as {
        type?: string;
        payload?: {output?: unknown};
      };
      if (chunk?.type === 'workflow-step-output') {
        const output = chunk.payload?.output;
        if (isLLMStreamEvent(output)) {
          if (output.type !== LLMStreamEventType.Log) {
            yield output;
          } else {
            debug(
              'WorkflowRunner: Log event (not forwarded):',
              output.data,
            );
          }
        }
      }
      // Schedule the next read from the workflow stream
      wsPromise = wsIter
        .next()
        .then(r => ({done: r.done, value: r.value, source: 'ws' as const}));
    }
  } else {
    // source === 'queue'
    if (result.done) {
      qDone = true;
      debug('WorkflowRunner: AsyncEventQueue exhausted');
    } else {
      const event = result.value as LLMStreamEvent;
      if (event.type !== LLMStreamEventType.Log) {
        yield event;
      } else {
        debug(
          'WorkflowRunner: Log event from queue (not forwarded):',
          event.data,
        );
      }
      // Schedule the next read from the queue
      qPromise = qIter.next().then(r => ({
        done: r.done,
        value: r.value,
        source: 'queue' as const,
      }));
    }
  }
}

debug('WorkflowRunner: merge complete');
* Using `RequestContext<WorkflowRequestContext>` enables fully typed `.get()` and `.set()`
* calls throughout all workflow steps and the ChatReasoningAgent — zero `any` casts needed.
*
* All keys follow the snake_case convention matching the RequestContext.set() calls in
Comment on lines +39 to +43
/** AbortSignal propagated from the HTTP request's abort controller */
abortSignal: AbortSignal;
/** Authenticated user resolved from LoopBack auth middleware */
currentUser: IAuthUserWithPermissions | undefined;
}
Comment thread package.json
"@loopback/context": "^8.0.11",
"@loopback/repository": "^8.0.11",
"@loopback/sequelize": "^0.8.8",
"@mastra/core": "^1.32.1",
Comment on lines +70 to +74
@inject.getter(AuthenticationBindings.CURRENT_USER)
private readonly getCurrentUser: Getter<IAuthUserWithPermissions>,
@repository(ChatRepository)
private readonly chatRepository: ChatRepository,
) {}
execute: async ({inputData, requestContext}) => {
const ctx = asWorkflowContext(requestContext);
const chatStore = ctx.get('chatStore');
const aiConfig = ctx.get('aiConfig') as {maxTokens?: number} | undefined;
Comment on lines +46 to +48
const aiConfig = ctx.get('aiConfig') as
| {maxSteps?: number; modelName?: string}
| undefined;
Comment on lines +19 to +22
* All SSE events are routed through the AsyncEventQueue stored in RequestContext.
* Token usage is accumulated in TokenUsageAccumulator stored in RequestContext.
*
* The workflow does NOT directly interact with the SSE transport — that is the
const toolCalls = message.messages
?.filter(
(v): v is Message & {metadata: ToolMessageMetadata} =>
v.metadata.type === MessageMetadataType.Tool,
Comment on lines +35 to +40
static trim(
messages: CoreMessageLike[],
maxTokens: number = DEFAULT_MAX_TOKEN_COUNT,
): CoreMessageLike[] {
const totalTokens = messages.reduce(
(sum, m) => sum + ContextWindowManager._countMessageTokens(m),

@rohit-sourcefuse rohit-sourcefuse left a comment

Review: feat(migration): LangGraph → Mastra Phase 1

Good direction overall. The typed WorkflowRequestContext bridge, function-based model injection on the Agent, and ContextWindowManager are all well-designed. However there are several blockers that prevent this from functioning at runtime, plus two correctness bugs in the concurrency layer.


BLOCKERS — must fix before merge

1. run.stream() not awaited (workflow-runner.ts:111)

run.stream() returns Promise<{stream: AsyncIterable<...>}>. The result is not awaited and stream is not destructured — workflowStream holds a Promise, not an AsyncIterable. workflowStream[Symbol.asyncIterator]() throws TypeError on every chat request. TypeScript misses this because @mastra/core is not installed (return type is any).

// Fix:
const run = chatWorkflow.createRun();
const {stream: workflowStream} = await run.stream({
  inputData: {prompt, files, sessionId},
  requestContext,
});
yield* this._mergeStreams(workflowStream, eventQueue, abortController);
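
The failure mode in item 1 can be reproduced without Mastra. The sketch below uses a hypothetical `fakeRun` stub whose `stream()` return shape follows the description above; it is not checked against @mastra/core, and `isAsyncIterable` and `collect` are illustration-only helpers.

```typescript
// Hypothetical stand-in for a Mastra run; the return shape follows the
// review's description and is not verified against @mastra/core.
type StreamResult<T> = {stream: AsyncIterable<T>};

const fakeRun = {
  async stream(): Promise<StreamResult<string>> {
    async function* events(): AsyncGenerator<string> {
      yield 'init';
      yield 'message';
    }
    return {stream: events()};
  },
};

// True when the value exposes Symbol.asyncIterator and can be used with `for await`.
function isAsyncIterable(value: unknown): value is AsyncIterable<unknown> {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as Record<symbol, unknown>)[Symbol.asyncIterator] === 'function'
  );
}

async function collect(): Promise<string[]> {
  // Without await this is a Promise, which has no Symbol.asyncIterator.
  const notAwaited: unknown = fakeRun.stream();
  if (isAsyncIterable(notAwaited)) {
    throw new Error('unexpected: a Promise should not be async-iterable');
  }
  // Awaited and destructured, the stream can be consumed normally.
  const {stream} = await fakeRun.stream();
  const seen: string[] = [];
  for await (const event of stream) seen.push(event);
  return seen;
}
```

A check like `isAsyncIterable` is only a runtime guard; once the dependency is installed and typed, the compiler should flag the missing await on its own.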

2. AsyncEventQueue single-slot resolver race condition (async-event-queue.ts:17)

_resolve is a single slot. If two pushes happen before the consumer resumes (two simultaneous tool calls), the second push() overwrites _resolve = null and drops the first wakeup. The while(true) loop in the iterator stalls permanently — the SSE stream hangs silently.

// Fix: replace _resolve: (() => void) | null with an array
private readonly _resolvers: Array<() => void> = [];

push(event: LLMStreamEvent): void {
  if (this._closed) return;
  this._queue.push(event);
  this._resolvers.splice(0).forEach(r => r());
}

close(): void {
  this._closed = true;
  this._resolvers.splice(0).forEach(r => r());
}

// In iterator:
await new Promise<void>(resolve => { this._resolvers.push(resolve); });
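
Put together, the fragments above could look like the following self-contained sketch. `MultiWaiterQueue` and `QueueEvent` are hypothetical stand-ins for AsyncEventQueue and LLMStreamEvent, so the snippet runs without the rest of the codebase; the PR's actual class may differ.

```typescript
// Simplified event type; the real LLMStreamEvent union lives in event.types.ts.
type QueueEvent = {type: string; data?: unknown};

class MultiWaiterQueue implements AsyncIterable<QueueEvent> {
  private readonly queue: QueueEvent[] = [];
  private readonly resolvers: Array<() => void> = [];
  private closed = false;

  push(event: QueueEvent): void {
    if (this.closed) return;
    this.queue.push(event);
    this.wakeAll();
  }

  close(): void {
    this.closed = true;
    this.wakeAll();
  }

  // Empty the waiter list before calling it, so no resolver runs twice.
  private wakeAll(): void {
    this.resolvers.splice(0).forEach(resolve => resolve());
  }

  async *[Symbol.asyncIterator](): AsyncGenerator<QueueEvent> {
    while (true) {
      // Flush everything buffered before deciding whether to wait.
      while (this.queue.length > 0) {
        yield this.queue.shift() as QueueEvent;
      }
      if (this.closed) return;
      await new Promise<void>(resolve => {
        this.resolvers.push(resolve);
      });
    }
  }
}

// Two pushes land in the same tick, before the consumer gets to resume.
async function demo(): Promise<string[]> {
  const q = new MultiWaiterQueue();
  const seen: string[] = [];
  const consumer = (async () => {
    for await (const e of q) seen.push(e.type);
  })();
  q.push({type: 'tool-a'});
  q.push({type: 'tool-b'});
  q.close();
  await consumer;
  return seen;
}
```

The inner flush loop matters as much as the resolver array: one wakeup may correspond to several buffered events, so the consumer drains the buffer fully before waiting again.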

3. @mastra/core not installed

node_modules/@mastra does not exist. Every import from @mastra/core/workflows, @mastra/core/agent, @mastra/core/request-context fails with Cannot find module. Both CI checks fail for this reason. Run npm install, confirm subpath exports resolve, commit the updated lock-file.

4. @langchain/openrouter does not exist (openrouter.provider.ts:2)

@langchain/openrouter is not a published npm package. This new sub-module does not compile. Either remove it from this PR or implement via @ai-sdk/openai with baseURL: 'https://openrouter.ai/api/v1'.

5. @langchain/classic/vectorstores/memory does not exist (inmemory.vector.ts:2)

No such package. Should be @langchain/community/vectorstores/memory.

6. LLMStreamErrorEvent.data.message breaks the SSE contract (event.types.ts:65)

The new type uses { data: { message: string } }. The TDD spec (§11.4) and existing client expectations use { data: { error: string } }. Any client checking event.data.error receives undefined silently. Change the field to error: string and update the emit site in agent-reasoning.step.ts.


IMPORTANT — address before Phase 2

7. _mergeStreams leaks unhandled promise rejections (workflow-runner.ts:131)

When Promise.race() settles on one source, the other pending .next() promise is abandoned. If it later rejects, Node.js emits UnhandledPromiseRejection. Add a try/finally that calls wsIter.return?.() and qIter.return?.() to properly close both iterators on exit.
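
A minimal sketch of the try/finally shape suggested in item 7, written as a generic two-source merge. `mergeTwo` and `Tagged` are hypothetical names, and the abort handling and Log filtering of the real _mergeStreams are left out.

```typescript
type Tagged<T> = {source: 'a' | 'b'; result: IteratorResult<T>};

async function* mergeTwo<T>(
  a: AsyncIterable<T>,
  b: AsyncIterable<T>,
): AsyncGenerator<T> {
  const aIter = a[Symbol.asyncIterator]();
  const bIter = b[Symbol.asyncIterator]();
  // Tag each read so the race winner can be attributed to its source.
  const read = (iter: AsyncIterator<T>, source: 'a' | 'b'): Promise<Tagged<T>> =>
    iter.next().then(result => ({source, result}));

  let aNext: Promise<Tagged<T>> | null = read(aIter, 'a');
  let bNext: Promise<Tagged<T>> | null = read(bIter, 'b');
  try {
    while (aNext || bNext) {
      const pending = [aNext, bNext].filter(
        (p): p is Promise<Tagged<T>> => p !== null,
      );
      const {source, result} = await Promise.race(pending);
      const iter = source === 'a' ? aIter : bIter;
      // Schedule the next read only for the source that just produced a value.
      const next = result.done ? null : read(iter, source);
      if (source === 'a') aNext = next;
      else bNext = next;
      if (!result.done) yield result.value;
    }
  } finally {
    // A read that was scheduled but never raced must not reject unobserved.
    aNext?.catch(() => undefined);
    bNext?.catch(() => undefined);
    // Ask both sources to clean up. Not awaited: a generator with a pending
    // next() only handles return() after that read settles.
    aIter.return?.()?.catch(() => undefined);
    bIter.return?.()?.catch(() => undefined);
  }
}

async function collectMerged(): Promise<number[]> {
  async function* left(): AsyncGenerator<number> {
    yield 1;
    yield 2;
  }
  async function* right(): AsyncGenerator<number> {
    yield 10;
  }
  const out: number[] = [];
  for await (const n of mergeTwo(left(), right())) out.push(n);
  return out.sort((x, y) => x - y);
}
```

The finally block runs on normal completion, on an error from either source, and when the consumer stops iterating early, which is the case the abort path in the PR needs to cover.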

8. Log events silently dropped (workflow-runner.ts:180, 202)

LLMStreamEventType.Log events are filtered out from both sources. The existing LangGraph system forwards Log events — they drive "Reading file: ..." and "Context compressed: ..." UI messages. Dropping them silently is a backward-compat regression. Remove the filter or document it explicitly as a breaking change.

9. Unused @repository(ChatRepository) injection (workflow-runner.ts:73)

Injected but never used anywhere in the class body. Remove it.

10. Dead LangGraph services still registered (component.ts:87-103)

CallLLMNode, RunToolNode, InitSessionNode, SummariseFileNode, ContextCompressionNode, EndSessionNode, ChatGraph are all still in this.services. Since GenerationService now uses WorkflowRunner exclusively, these nodes are instantiated per request but never called. Remove them or gate with a rollback feature flag.

11. Silent tool message loss (persist-conversation.step.ts:52-70)

When addAIMessageText() returns undefined, the if (aiMessage) guard silently skips persisting all tool call records. Throw explicitly instead:

if (!aiMessage) {
  throw new Error(
    `PersistConversationStep: failed to persist AI message for session ${sessionId}`,
  );
}

12. AiIntegrationBindings.Checkpointer not marked deprecated (keys.ts:37)

Per TDD §26.8 this key has no Mastra equivalent and should be @deprecated now.


Test coverage

The only new test file stubs WorkflowRunner and verifies call counts — it provides no regression protection for the new Mastra layer. Zero tests for: AsyncEventQueue (push/close/concurrency), ContextWindowManager (system-message-preserved, array-typed content, boundary conditions), TokenUsageAccumulator, WorkflowRunner._mergeStreams (abort, one-source-exhausted), all 6 workflow steps, and ChatStore.toCoreMessage() (especially the tool role branch that constructs the tool-result content array passed directly to the LLM API).

AsyncEventQueue and ContextWindowManager are dependency-free pure classes — adding tests for them would have caught the resolver race before review.
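
To illustrate how cheap such tests are, here is a sketch of a trimming function together with the cases named above (system message preserved, array-typed content, boundary). It is not the PR's ContextWindowManager: `trimMessages`, `MessageLike`, the drop-oldest policy, and the four-characters-per-token estimate are all assumptions made for the example.

```typescript
type TextPart = {type: 'text'; text: string};
type MessageLike = {
  role: 'system' | 'user' | 'assistant' | 'tool';
  content: string | TextPart[];
};

// Rough estimate (assumption): about four characters per token.
function countTokens(m: MessageLike): number {
  const text =
    typeof m.content === 'string'
      ? m.content
      : m.content.map(p => p.text).join('');
  return Math.ceil(text.length / 4);
}

// Drops the oldest non-system messages until the estimate fits the budget.
function trimMessages(messages: MessageLike[], maxTokens: number): MessageLike[] {
  const kept = [...messages];
  let total = kept.reduce((sum, m) => sum + countTokens(m), 0);
  while (total > maxTokens) {
    const idx = kept.findIndex(m => m.role !== 'system');
    if (idx === -1) break; // only system messages remain
    total -= countTokens(kept[idx]);
    kept.splice(idx, 1);
  }
  return kept;
}

const history: MessageLike[] = [
  {role: 'system', content: 'abcd'}, // 1 token
  {role: 'user', content: 'aaaaaaaa'}, // 2 tokens
  {role: 'assistant', content: [{type: 'text', text: 'bbbb'}]}, // 1 token
  {role: 'user', content: 'cccc'}, // 1 token
];

// Over budget: the oldest user message is dropped, the system message stays.
const trimmedRoles = trimMessages(history, 3).map(m => m.role);
// Exactly at budget: nothing is dropped.
const untouchedCount = trimMessages(history, 5).length;
```

Because the function takes plain data and returns plain data, the same cases translate directly into unit tests with no DI container or database stubs.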


Must fix before merge: items 1–6
Fix before Phase 2: items 7–12 + unit tests for pure utility classes

rohit-sourcefuse

This comment was marked as resolved.

rohit-sourcefuse

This comment was marked as off-topic.

rohit-sourcefuse

This comment was marked as duplicate.

@rohit-sourcefuse rohit-sourcefuse left a comment

Inline comments on specific issues. See the full request-changes review above for the complete list.

// The iterator yields WorkflowStreamEvent — steps emit via writer.write() which
// surfaces as {type: 'workflow-step-output', payload: {output: <our event>}}.
const workflowStream = run.stream({
inputData: {prompt, files, sessionId},

[BLOCKER] run.stream() is not awaited — runtime crash on every request

Per @mastra/core ^1.32.1, run.stream() returns Promise<{stream: AsyncIterable<WorkflowStreamEvent>}>. The result must be awaited and the stream property destructured. As written workflowStream holds a Promise — workflowStream[Symbol.asyncIterator]() throws TypeError on every request.

const run = chatWorkflow.createRun();
const {stream: workflowStream} = await run.stream({
  inputData: {prompt, files, sessionId},
  requestContext,
});
yield* this._mergeStreams(workflowStream, eventQueue, abortController);

private readonly getCurrentUser: Getter<IAuthUserWithPermissions>,
@repository(ChatRepository)
private readonly chatRepository: ChatRepository,
) {}

Unused injection — remove chatRepository

@repository(ChatRepository) is injected here but never referenced in the class body. Remove it to eliminate the dead DI dependency.

const output = chunk.payload?.output;
if (isLLMStreamEvent(output)) {
if (output.type !== LLMStreamEventType.Log) {
yield output;

Log events silently dropped — backward-compat regression

The LangGraph system forwarded Log events to clients, driving 'Reading file: ...' and 'Context compressed: ...' UI messages. Filtering them here without a changelog entry silently breaks that behavior. Remove the filter or document it explicitly as a breaking change.

export class AsyncEventQueue implements AsyncIterable<LLMStreamEvent> {
  private readonly _queue: LLMStreamEvent[] = [];
  private _resolve: (() => void) | null = null;
  private _closed = false;

[BLOCKER] Single-slot resolver — SSE stream stalls on concurrent tool calls

_resolve is a single slot. If two push() calls happen before the consumer resumes (parallel tool calls), the second call drops the first wakeup and the while(true) loop stalls permanently — SSE stream hangs silently with no error.

Replace with an array:

private readonly _resolvers: Array<() => void> = [];

push(event: LLMStreamEvent): void {
  if (this._closed) return;
  this._queue.push(event);
  this._resolvers.splice(0).forEach(r => r());
}

close(): void {
  this._closed = true;
  this._resolvers.splice(0).forEach(r => r());
}

// In iterator:
await new Promise<void>(resolve => { this._resolvers.push(resolve); });

Comment thread src/graphs/event.types.ts

export type LLMStreamErrorEvent = {
type: LLMStreamEventType.Error;
data: {

[BLOCKER] Error event field name breaks SSE contract

The field is message: string here, but the TDD spec (§11.4) and existing client expectations use error: string. Any client checking event.data.error receives undefined silently.

export type LLMStreamErrorEvent = {
  type: LLMStreamEventType.Error;
  data: { error: string };  // not 'message'
};

Update the emit site in agent-reasoning.step.ts to match: data: {error: errMsg}.
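
A sketch of how the emit site and a consumer could share the `error` field through a discriminated union. The enum and event types below are local stand-ins with assumed string values, not the definitions in event.types.ts; `toErrorEvent` and `summarize` are hypothetical helpers.

```typescript
// Local stand-ins for the real enum and union in event.types.ts;
// the string values are assumptions.
enum StreamEventType {
  Message = 'message',
  Error = 'error',
}

type StreamErrorEvent = {
  type: StreamEventType.Error;
  data: {error: string};
};

type StreamMessageEvent = {
  type: StreamEventType.Message;
  data: {message: string};
};

type StreamEvent = StreamErrorEvent | StreamMessageEvent;

// Emit side: normalize any thrown value into the contract's shape.
function toErrorEvent(err: unknown): StreamErrorEvent {
  const error = err instanceof Error ? err.message : String(err);
  return {type: StreamEventType.Error, data: {error}};
}

// Consumer side: narrowing on `type` makes `data.error` available
// only on error events.
function summarize(event: StreamEvent): string {
  switch (event.type) {
    case StreamEventType.Error:
      return `error: ${event.data.error}`;
    case StreamEventType.Message:
      return `message: ${event.data.message}`;
  }
}
```

Routing every emit through one helper such as `toErrorEvent` keeps the field name in a single place, so a later rename cannot drift between the type and the emit site.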

const aiMessage = await chatStore.addAIMessageText(sessionId, finalText);

if (!aiMessage) {
  debug('PersistConversation: addAIMessageText returned undefined');

Silent tool message loss when addAIMessageText returns undefined

If the AI message persistence fails, the if (aiMessage) guard on line 70 silently skips all tool call records for the turn. Data is lost with only a debug log.

if (!aiMessage) {
  throw new Error(
    `PersistConversationStep: failed to persist AI message for session ${sessionId}`,
  );
}

Comment thread src/component.ts
ChatStore,
// mastra migration
WorkflowRunner,
// graph

Dead LangGraph services still registered — remove or gate with a rollback flag

Since GenerationService now delegates entirely to WorkflowRunner, all six LangGraph nodes and ChatGraph in this services array are instantiated per request by LB4 DI but never called. Remove them, or keep them behind a WORKFLOW_ENGINE=langgraph feature flag for rollback safety (TDD §22.5).
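
The rollback flag could be wired roughly as follows. Only the WORKFLOW_ENGINE=langgraph value comes from the comment above; `resolveEngine`, `selectServices`, the default of 'mastra', and the use of plain strings in place of service classes are assumptions for the sketch.

```typescript
type WorkflowEngine = 'mastra' | 'langgraph';

// Any value other than 'langgraph' (including unset) selects Mastra.
function resolveEngine(env: Record<string, string | undefined>): WorkflowEngine {
  return env.WORKFLOW_ENGINE === 'langgraph' ? 'langgraph' : 'mastra';
}

// Registers the shared services plus exactly one engine's services.
function selectServices<T>(
  engine: WorkflowEngine,
  shared: T[],
  mastra: T[],
  langgraph: T[],
): T[] {
  return [...shared, ...(engine === 'langgraph' ? langgraph : mastra)];
}

// Strings stand in for the service classes listed in component.ts.
const registered = selectServices(
  resolveEngine({WORKFLOW_ENGINE: 'langgraph'}),
  ['ChatStore'],
  ['WorkflowRunner'],
  ['ChatGraph', 'CallLLMNode', 'RunToolNode'],
);
```

In the component, the environment record would typically be process.env, and GenerationService would need the same switch so that it calls the engine that was actually registered.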

Comment thread src/keys.ts
);

// ── Mastra LLM bindings (Phase 1 migration) ──────────────────────────────
/**

Mark Checkpointer as @deprecated now (TDD §26.8)

This binding has no Mastra equivalent and will be removed in Phase 5. Consumers need advance notice before @langchain/langgraph is dropped.

/**
 * @deprecated No Mastra equivalent. Never wired in graph.compile({}).
 * Will be removed in Phase 5 when @langchain/langgraph is dropped.
 */
export const Checkpointer = ...

@sonarqubecloud

SonarQube reviewer guide

Summary: Add Mastra framework integration to replace LangGraph with Phase 1-3 workflows (Chat, DBQuery, Visualization) and migrate GenerationService to use WorkflowRunner instead of ChatGraph.

Review Focus:

  • Core architecture change: GenerationService now delegates to WorkflowRunner which orchestrates Mastra workflows instead of LangGraph
  • New AsyncEventQueue and TokenUsageAccumulator for concurrent event streaming and token tracking
  • Request-scoped context injection enabling per-request LLM/tool/config resolution
  • Large addition of Mastra-native workflow/step/tool implementations (~6000 LOC); verify workflow composition, error handling, and event forwarding
  • Legacy tool compatibility layer converting LangChain IGraphTool → Mastra tools

Start review at: src/services/generation.service.ts. This is the entry point showing the migration from ChatGraph.execute() to WorkflowRunner.executeChatWorkflow(). Understanding this change clarifies the overall direction and helps frame expectations for all downstream workflow implementations. Then review src/mastra/bridge/workflow-runner.ts to understand request context binding, stream merging, and event delivery orchestration.

Quality Gate failed

Failed conditions
1 Security Hotspot
21 New Critical Issues (required ≤ 0)
13 New Major Issues (required ≤ 5)

See analysis details on SonarQube Cloud

3 participants