Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9432bb1
feat(workflow-executor): add UpdateRecordStepExecutor with confirmati…
Mar 19, 2026
a70b36a
refactor(workflow-executor): simplify interruption lookup in UpdateRe…
Mar 19, 2026
6aee69c
refactor(workflow-executor): rename toolConfirmationInterruption to p…
Mar 19, 2026
545bbd0
refactor(workflow-executor): throw on missing pendingUpdate instead o…
Mar 19, 2026
ebf033f
refactor(workflow-executor): type executionResult as union instead of…
Mar 19, 2026
61e2cb0
fix(workflow-executor): preserve pendingUpdate after confirmation for…
Mar 19, 2026
31a27c6
refactor(workflow-executor): remove unsafe cast in UpdateRecordStepEx…
Mar 19, 2026
f7a3edc
refactor(workflow-executor): simplify UpdateRecordStepExecutor and im…
Mar 19, 2026
9819917
refactor(workflow-executor): remove redundant buildSuccessResult/buil…
Mar 19, 2026
11b41b5
refactor(workflow-executor): rename history to previousSteps and docu…
Mar 19, 2026
239ca2f
docs(workflow-executor): document pendingUpdate field on UpdateRecord…
Mar 19, 2026
50905bc
refactor(workflow-executor): extract findField helper to deduplicate …
Mar 19, 2026
129290c
refactor(workflow-executor): apply PR review fixes across executors
Mar 19, 2026
bc3d0c6
refactor(workflow-executor): rename AiTask to RecordTask for clarity
Mar 19, 2026
d895154
refactor(workflow-executor): remove unused fields from RecordTaskStep…
Mar 19, 2026
d1651db
feat(workflow-executor): add ToolTaskStepDefinition for MCP tool steps
Mar 19, 2026
6a49657
fix(workflow-executor): fix prettier formatting on StepDefinition uni…
Mar 19, 2026
949d77a
refactor(workflow-executor): introduce UpdateTarget to reduce resolve…
Mar 20, 2026
b939ac7
test(workflow-executor): address PR review gaps on UpdateRecordStepEx…
Mar 20, 2026
180f3fd
refactor(workflow-executor): rename ToolTask→McpTask and automaticCom…
Mar 20, 2026
1309b54
test(workflow-executor): rename misleading stepId 'ai-step' to 'read-…
Mar 20, 2026
d99fe27
refactor(workflow-executor): simplify userInput to userConfirmed boolean
Mar 20, 2026
c884d37
feat(workflow-executor): add TriggerActionStepExecutor with confirmat…
Mar 20, 2026
b28c467
refactor(workflow-executor): propagate actionName through ActionTarge…
Mar 20, 2026
21e4bac
refactor(workflow-executor): extract ActionRef and FieldRef, align Re…
Mar 20, 2026
e4ad28f
refactor(workflow-executor): rename fieldNames→fieldDisplayNames and …
Mar 20, 2026
30687a8
refactor(workflow-executor): use FieldRef[] in ReadRecord executionPa…
Mar 20, 2026
fd6250e
style(workflow-executor): fix prettier formatting in read-record-step…
Mar 20, 2026
b0bb7c8
refactor(workflow-executor): rename Zod schema keys to remove "displa…
Mar 20, 2026
9b9a1df
refactor(workflow-executor): introduce executor hierarchy and central…
Mar 20, 2026
c8b9247
refactor(workflow-executor): apply Template Method pattern and consol…
Mar 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions packages/workflow-executor/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Front ◀──▶ Orchestrator ◀──pull/push──▶ Executor ──

```
src/
├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError
├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError, NoWritableFieldsError, NoActionsError
├── runner.ts # Runner class — main entry point (start/stop/triggerPoll, HTTP server wiring)
├── types/ # Core type definitions (@draft)
│ ├── step-definition.ts # StepType enum + step definition interfaces
Expand All @@ -60,7 +60,9 @@ src/
├── executors/ # Step executor implementations
│ ├── base-step-executor.ts # Abstract base class (context injection + shared helpers)
│ ├── condition-step-executor.ts # AI-powered condition step (chooses among options)
│ └── read-record-step-executor.ts # AI-powered record field reading step
│ ├── read-record-step-executor.ts # AI-powered record field reading step
│ ├── update-record-step-executor.ts # AI-powered record field update step (with confirmation flow)
│ └── trigger-action-step-executor.ts # AI-powered action trigger step (with confirmation flow)
├── http/ # HTTP server (optional, for frontend data access)
│ └── executor-http-server.ts # Koa server: GET /runs/:runId, POST /runs/:runId/trigger
└── index.ts # Barrel exports
Expand Down
25 changes: 25 additions & 0 deletions packages/workflow-executor/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,28 @@ export class NoResolvedFieldsError extends WorkflowExecutorError {
super(`None of the requested fields could be resolved: ${fieldNames.join(', ')}`);
}
}

export class NoWritableFieldsError extends WorkflowExecutorError {
constructor(collectionName: string) {
super(`No writable fields on record from collection "${collectionName}"`);
}
}

export class NoActionsError extends WorkflowExecutorError {
constructor(collectionName: string) {
super(`No actions available on collection "${collectionName}"`);
}
}

/**
* Thrown when a step's side effect succeeded (action/update/decision)
* but the resulting state could not be persisted to the RunStore.
*/
export class StepPersistenceError extends WorkflowExecutorError {
readonly cause?: unknown;

constructor(message: string, cause?: unknown) {
super(message);
if (cause !== undefined) this.cause = cause;
}
}
138 changes: 129 additions & 9 deletions packages/workflow-executor/src/executors/base-step-executor.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,67 @@
import type { ExecutionContext, StepExecutionResult } from '../types/execution';
import type { CollectionSchema, FieldSchema, RecordRef } from '../types/record';
import type { StepDefinition } from '../types/step-definition';
import type { StepExecutionData } from '../types/step-execution-data';
import type { StepOutcome } from '../types/step-outcome';
import type {
LoadRelatedRecordStepExecutionData,
StepExecutionData,
} from '../types/step-execution-data';
import type { BaseStepStatus, StepOutcome } from '../types/step-outcome';
import type { AIMessage, BaseMessage } from '@langchain/core/messages';
import type { DynamicStructuredTool } from '@langchain/core/tools';

import { SystemMessage } from '@langchain/core/messages';
import { HumanMessage, SystemMessage } from '@langchain/core/messages';
import { DynamicStructuredTool } from '@langchain/core/tools';
import { z } from 'zod';

import { MalformedToolCallError, MissingToolCallError } from '../errors';
import {
MalformedToolCallError,
MissingToolCallError,
NoRecordsError,
WorkflowExecutorError,
} from '../errors';
import { isExecutedStepOnExecutor } from '../types/step-execution-data';

export default abstract class BaseStepExecutor<TStep extends StepDefinition = StepDefinition> {
protected readonly context: ExecutionContext<TStep>;

protected readonly schemaCache = new Map<string, CollectionSchema>();

constructor(context: ExecutionContext<TStep>) {
this.context = context;
}

abstract execute(): Promise<StepExecutionResult>;
async execute(): Promise<StepExecutionResult> {
try {
return await this.doExecute();
} catch (error) {
if (error instanceof WorkflowExecutorError) {
return this.buildOutcomeResult({ status: 'error', error: error.message });
}
throw error;

Check failure on line 39 in packages/workflow-executor/src/executors/base-step-executor.ts

View workflow job for this annotation

GitHub Actions / Linting & Testing (workflow-executor)

Expected blank line before this statement

Check failure on line 39 in packages/workflow-executor/src/executors/base-step-executor.ts

View workflow job for this annotation

GitHub Actions / Linting & Testing (workflow-executor)

Expected blank line before this statement
}
}

protected abstract doExecute(): Promise<StepExecutionResult>;

/** Find a field by displayName first, then fallback to fieldName. */
protected findField(schema: CollectionSchema, name: string): FieldSchema | undefined {
return (
schema.fields.find(f => f.displayName === name) ??
schema.fields.find(f => f.fieldName === name)
);
}

/** Builds a StepExecutionResult with the step-type-specific outcome shape. */
protected abstract buildOutcomeResult(outcome: {
status: BaseStepStatus;
error?: string;
}): StepExecutionResult;

/**
* Returns a SystemMessage array summarizing previously executed steps.
* Empty array when there is no history. Ready to spread into a messages array.
*/
protected async buildPreviousStepsMessages(): Promise<SystemMessage[]> {
if (!this.context.history.length) return [];
if (!this.context.previousSteps.length) return [];

const summary = await this.summarizePreviousSteps();

Expand All @@ -40,7 +77,7 @@
private async summarizePreviousSteps(): Promise<string> {
const allStepExecutions = await this.context.runStore.getStepExecutions(this.context.runId);

return this.context.history
return this.context.previousSteps
.map(({ stepDefinition, stepOutcome }) => {
const execution = allStepExecutions.find(e => e.stepIndex === stepOutcome.stepIndex);

Expand All @@ -61,6 +98,8 @@
if (isExecutedStepOnExecutor(execution)) {
if (execution.executionParams !== undefined) {
lines.push(` Input: ${JSON.stringify(execution.executionParams)}`);
} else if ('pendingData' in execution && execution.pendingData !== undefined) {
lines.push(` Pending: ${JSON.stringify(execution.pendingData)}`);
}

if (execution.executionResult) {
Expand Down Expand Up @@ -94,7 +133,14 @@
*/
private extractToolCallArgs<T = Record<string, unknown>>(response: AIMessage): T {
const toolCall = response.tool_calls?.[0];
if (toolCall?.args) return toolCall.args as T;

if (toolCall !== undefined) {
if (toolCall.args !== undefined && toolCall.args !== null) {
return toolCall.args as T;
}

throw new MalformedToolCallError(toolCall.name ?? 'unknown', 'args field is missing or null');
}

const invalidCall = response.invalid_tool_calls?.[0];

Expand All @@ -107,4 +153,78 @@

throw new MissingToolCallError();
}

/** Returns baseRecordRef + any related records loaded by previous steps. */
protected async getAvailableRecordRefs(): Promise<RecordRef[]> {
const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId);
const relatedRecords = stepExecutions
.filter((e): e is LoadRelatedRecordStepExecutionData => e.type === 'load-related-record')
.map(e => e.record);

return [this.context.baseRecordRef, ...relatedRecords];
}

/** Selects a record ref via AI when multiple are available, returns directly when only one. */
protected async selectRecordRef(
records: RecordRef[],
prompt: string | undefined,
): Promise<RecordRef> {
if (records.length === 0) throw new NoRecordsError();
if (records.length === 1) return records[0];

const identifiers = await Promise.all(records.map(r => this.toRecordIdentifier(r)));
const identifierTuple = identifiers as [string, ...string[]];

const tool = new DynamicStructuredTool({
name: 'select-record',
description: 'Select the most relevant record for this workflow step.',
schema: z.object({
recordIdentifier: z.enum(identifierTuple),
}),
func: undefined,
});

const messages = [
...(await this.buildPreviousStepsMessages()),
new SystemMessage(
'You are an AI agent selecting the most relevant record for a workflow step.\n' +
'Choose the record whose collection best matches the user request.\n' +
'Pay attention to the collection name of each record.',
),
new HumanMessage(prompt ?? 'Select the most relevant record.'),
];

const { recordIdentifier } = await this.invokeWithTool<{ recordIdentifier: string }>(
messages,
tool,
);

const selectedIndex = identifiers.indexOf(recordIdentifier);

if (selectedIndex === -1) {
throw new WorkflowExecutorError(
`AI selected record "${recordIdentifier}" which does not match any available record`,
);
}

return records[selectedIndex];
}

/** Fetches a collection schema from WorkflowPort, with caching. */
protected async getCollectionSchema(collectionName: string): Promise<CollectionSchema> {
const cached = this.schemaCache.get(collectionName);
if (cached) return cached;

const schema = await this.context.workflowPort.getCollectionSchema(collectionName);
this.schemaCache.set(collectionName, schema);

return schema;
}

/** Formats a record ref as "Step X - CollectionDisplayName #id". */
protected async toRecordIdentifier(record: RecordRef): Promise<string> {
const schema = await this.getCollectionSchema(record.collectionName);

return `Step ${record.stepIndex} - ${schema.collectionDisplayName} #${record.recordId}`;
}
}
73 changes: 34 additions & 39 deletions packages/workflow-executor/src/executors/condition-step-executor.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import type { StepExecutionResult } from '../types/execution';
import type { ConditionStepDefinition } from '../types/step-definition';
import type { ConditionStepStatus } from '../types/step-outcome';

import { HumanMessage, SystemMessage } from '@langchain/core/messages';
import { DynamicStructuredTool } from '@langchain/core/tools';
import { z } from 'zod';

import { StepPersistenceError } from '../errors';
import BaseStepExecutor from './base-step-executor';

interface GatewayToolArgs {
Expand Down Expand Up @@ -36,7 +38,22 @@ const GATEWAY_SYSTEM_PROMPT = `You are an AI agent selecting the correct option
- Do not refer to yourself as "I" in the response, use a passive formulation instead.`;

export default class ConditionStepExecutor extends BaseStepExecutor<ConditionStepDefinition> {
async execute(): Promise<StepExecutionResult> {
protected buildOutcomeResult(outcome: {
status: ConditionStepStatus;
error?: string;
selectedOption?: string;
}): StepExecutionResult {
return {
stepOutcome: {
type: 'condition',
stepId: this.context.stepId,
stepIndex: this.context.stepIndex,
...outcome,
},
};
}

protected async doExecute(): Promise<StepExecutionResult> {
const { stepDefinition: step } = this.context;

const tool = new DynamicStructuredTool({
Expand All @@ -62,50 +79,28 @@ export default class ConditionStepExecutor extends BaseStepExecutor<ConditionSte
new HumanMessage(`**Question**: ${step.prompt ?? 'Choose the most appropriate option.'}`),
];

let args: GatewayToolArgs;
const args = await this.invokeWithTool<GatewayToolArgs>(messages, tool);
const { option: selectedOption, reasoning } = args;

try {
args = await this.invokeWithTool<GatewayToolArgs>(messages, tool);
} catch (error: unknown) {
return {
stepOutcome: {
type: 'condition',
stepId: this.context.stepId,
stepIndex: this.context.stepIndex,
status: 'error',
error: (error as Error).message,
},
};
await this.context.runStore.saveStepExecution(this.context.runId, {
type: 'condition',
stepIndex: this.context.stepIndex,
executionParams: { answer: selectedOption, reasoning },
executionResult: selectedOption ? { answer: selectedOption } : undefined,
});
} catch (cause) {
throw new StepPersistenceError(
`Condition step state could not be persisted ` +
`(run "${this.context.runId}", step ${this.context.stepIndex})`,
cause,
);
}

const { option: selectedOption, reasoning } = args;

await this.context.runStore.saveStepExecution(this.context.runId, {
type: 'condition',
stepIndex: this.context.stepIndex,
executionParams: { answer: selectedOption, reasoning },
executionResult: selectedOption ? { answer: selectedOption } : undefined,
});

if (!selectedOption) {
return {
stepOutcome: {
type: 'condition',
stepId: this.context.stepId,
stepIndex: this.context.stepIndex,
status: 'manual-decision',
},
};
return this.buildOutcomeResult({ status: 'manual-decision' });
}

return {
stepOutcome: {
type: 'condition',
stepId: this.context.stepId,
stepIndex: this.context.stepIndex,
status: 'success',
selectedOption,
},
};
return this.buildOutcomeResult({ status: 'success', selectedOption });
}
}
Loading
Loading