Skip to content
Open
5 changes: 3 additions & 2 deletions packages/workflow-executor/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Front ◀──▶ Orchestrator ◀──pull/push──▶ Executor ──

```
src/
├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError, NoWritableFieldsError
├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError, NoWritableFieldsError, NoActionsError
├── runner.ts # Runner class — main entry point (start/stop/triggerPoll, HTTP server wiring)
├── types/ # Core type definitions (@draft)
│ ├── step-definition.ts # StepType enum + step definition interfaces
Expand All @@ -61,7 +61,8 @@ src/
│ ├── base-step-executor.ts # Abstract base class (context injection + shared helpers)
│ ├── condition-step-executor.ts # AI-powered condition step (chooses among options)
│ ├── read-record-step-executor.ts # AI-powered record field reading step
│ └── update-record-step-executor.ts # AI-powered record field update step (with confirmation flow)
│ ├── update-record-step-executor.ts # AI-powered record field update step (with confirmation flow)
│ └── trigger-action-step-executor.ts # AI-powered action trigger step (with confirmation flow)
├── http/ # HTTP server (optional, for frontend data access)
│ └── executor-http-server.ts # Koa server: GET /runs/:runId, POST /runs/:runId/trigger
└── index.ts # Barrel exports
Expand Down
19 changes: 19 additions & 0 deletions packages/workflow-executor/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,22 @@ export class NoWritableFieldsError extends WorkflowExecutorError {
super(`No writable fields on record from collection "${collectionName}"`);
}
}

export class NoActionsError extends WorkflowExecutorError {
constructor(collectionName: string) {
super(`No actions available on collection "${collectionName}"`);
}
}

/**
* Thrown when a step's side effect succeeded (action/update/decision)
* but the resulting state could not be persisted to the RunStore.
*/
export class StepPersistenceError extends WorkflowExecutorError {
readonly cause?: unknown;

constructor(message: string, cause?: unknown) {
super(message);
if (cause !== undefined) this.cause = cause;
}
}
52 changes: 28 additions & 24 deletions packages/workflow-executor/src/executors/base-step-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
LoadRelatedRecordStepExecutionData,
StepExecutionData,
} from '../types/step-execution-data';
import type { StepOutcome } from '../types/step-outcome';
import type { BaseStepStatus, StepOutcome } from '../types/step-outcome';
import type { AIMessage, BaseMessage } from '@langchain/core/messages';

import { HumanMessage, SystemMessage } from '@langchain/core/messages';
Expand All @@ -29,7 +29,18 @@
this.context = context;
}

abstract execute(): Promise<StepExecutionResult>;
async execute(): Promise<StepExecutionResult> {
try {
return await this.doExecute();
} catch (error) {
if (error instanceof WorkflowExecutorError) {
return this.buildOutcomeResult({ status: 'error', error: error.message });
}
throw error;

Check failure on line 39 in packages/workflow-executor/src/executors/base-step-executor.ts

View workflow job for this annotation

GitHub Actions / Linting & Testing (workflow-executor)

Expected blank line before this statement

Check failure on line 39 in packages/workflow-executor/src/executors/base-step-executor.ts

View workflow job for this annotation

GitHub Actions / Linting & Testing (workflow-executor)

Expected blank line before this statement
}
}

protected abstract doExecute(): Promise<StepExecutionResult>;

/** Find a field by displayName first, then fallback to fieldName. */
protected findField(schema: CollectionSchema, name: string): FieldSchema | undefined {
Expand All @@ -39,25 +50,11 @@
);
}

/**
* Builds a StepExecutionResult with the given status and optional error.
* Only for record-task executors — hardcodes type: 'record-task'.
* ConditionStepExecutor and future non-record-task executors must NOT call this method.
*/
protected buildOutcomeResult(
status: 'success' | 'error' | 'awaiting-input',
error?: string,
): StepExecutionResult {
return {
stepOutcome: {
type: 'record-task',
stepId: this.context.stepId,
stepIndex: this.context.stepIndex,
status,
...(error !== undefined && { error }),
},
};
}
/** Builds a StepExecutionResult with the step-type-specific outcome shape. */
protected abstract buildOutcomeResult(outcome: {
status: BaseStepStatus;
error?: string;
}): StepExecutionResult;

/**
* Returns a SystemMessage array summarizing previously executed steps.
Expand Down Expand Up @@ -101,8 +98,8 @@
if (isExecutedStepOnExecutor(execution)) {
if (execution.executionParams !== undefined) {
lines.push(` Input: ${JSON.stringify(execution.executionParams)}`);
} else if (execution.type === 'update-record' && execution.pendingUpdate) {
lines.push(` Pending: ${JSON.stringify(execution.pendingUpdate)}`);
} else if ('pendingData' in execution && execution.pendingData !== undefined) {
lines.push(` Pending: ${JSON.stringify(execution.pendingData)}`);
}

if (execution.executionResult) {
Expand Down Expand Up @@ -136,7 +133,14 @@
*/
private extractToolCallArgs<T = Record<string, unknown>>(response: AIMessage): T {
const toolCall = response.tool_calls?.[0];
if (toolCall?.args) return toolCall.args as T;

if (toolCall !== undefined) {
if (toolCall.args !== undefined && toolCall.args !== null) {
return toolCall.args as T;
}

throw new MalformedToolCallError(toolCall.name ?? 'unknown', 'args field is missing or null');
}

const invalidCall = response.invalid_tool_calls?.[0];

Expand Down
78 changes: 34 additions & 44 deletions packages/workflow-executor/src/executors/condition-step-executor.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import type { StepExecutionResult } from '../types/execution';
import type { ConditionStepDefinition } from '../types/step-definition';
import type { ConditionStepStatus } from '../types/step-outcome';

import { HumanMessage, SystemMessage } from '@langchain/core/messages';
import { DynamicStructuredTool } from '@langchain/core/tools';
import { z } from 'zod';

import { WorkflowExecutorError } from '../errors';
import { StepPersistenceError } from '../errors';
import BaseStepExecutor from './base-step-executor';

interface GatewayToolArgs {
Expand Down Expand Up @@ -37,7 +38,22 @@ const GATEWAY_SYSTEM_PROMPT = `You are an AI agent selecting the correct option
- Do not refer to yourself as "I" in the response, use a passive formulation instead.`;

export default class ConditionStepExecutor extends BaseStepExecutor<ConditionStepDefinition> {
async execute(): Promise<StepExecutionResult> {
protected buildOutcomeResult(outcome: {
status: ConditionStepStatus;
error?: string;
selectedOption?: string;
}): StepExecutionResult {
return {
stepOutcome: {
type: 'condition',
stepId: this.context.stepId,
stepIndex: this.context.stepIndex,
...outcome,
},
};
}

protected async doExecute(): Promise<StepExecutionResult> {
const { stepDefinition: step } = this.context;

const tool = new DynamicStructuredTool({
Expand All @@ -63,54 +79,28 @@ export default class ConditionStepExecutor extends BaseStepExecutor<ConditionSte
new HumanMessage(`**Question**: ${step.prompt ?? 'Choose the most appropriate option.'}`),
];

let args: GatewayToolArgs;
const args = await this.invokeWithTool<GatewayToolArgs>(messages, tool);
const { option: selectedOption, reasoning } = args;

try {
args = await this.invokeWithTool<GatewayToolArgs>(messages, tool);
} catch (error) {
if (error instanceof WorkflowExecutorError) {
return {
stepOutcome: {
type: 'condition',
stepId: this.context.stepId,
stepIndex: this.context.stepIndex,
status: 'error',
error: error.message,
},
};
}

throw error;
await this.context.runStore.saveStepExecution(this.context.runId, {
type: 'condition',
stepIndex: this.context.stepIndex,
executionParams: { answer: selectedOption, reasoning },
executionResult: selectedOption ? { answer: selectedOption } : undefined,
});
} catch (cause) {
throw new StepPersistenceError(
`Condition step state could not be persisted ` +
`(run "${this.context.runId}", step ${this.context.stepIndex})`,
cause,
);
}

const { option: selectedOption, reasoning } = args;

await this.context.runStore.saveStepExecution(this.context.runId, {
type: 'condition',
stepIndex: this.context.stepIndex,
executionParams: { answer: selectedOption, reasoning },
executionResult: selectedOption ? { answer: selectedOption } : undefined,
});

if (!selectedOption) {
return {
stepOutcome: {
type: 'condition',
stepId: this.context.stepId,
stepIndex: this.context.stepIndex,
status: 'manual-decision',
},
};
return this.buildOutcomeResult({ status: 'manual-decision' });
}

return {
stepOutcome: {
type: 'condition',
stepId: this.context.stepId,
stepIndex: this.context.stepIndex,
status: 'success',
selectedOption,
},
};
return this.buildOutcomeResult({ status: 'success', selectedOption });
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import type { StepExecutionResult } from '../types/execution';
import type { CollectionSchema, RecordRef } from '../types/record';
import type { CollectionSchema } from '../types/record';
import type { RecordTaskStepDefinition } from '../types/step-definition';
import type { FieldReadResult } from '../types/step-execution-data';

import { HumanMessage, SystemMessage } from '@langchain/core/messages';
import { DynamicStructuredTool } from '@langchain/core/tools';
import { z } from 'zod';

import { NoReadableFieldsError, NoResolvedFieldsError, WorkflowExecutorError } from '../errors';
import BaseStepExecutor from './base-step-executor';
import { NoReadableFieldsError, NoResolvedFieldsError } from '../errors';
import RecordTaskStepExecutor from './record-task-step-executor';

const READ_RECORD_SYSTEM_PROMPT = `You are an AI agent reading fields from a record to answer a user request.
Select the field(s) that best answer the request. You can read one field or multiple fields at once.
Expand All @@ -18,50 +18,40 @@ Important rules:
- Final answer is definitive, you won't receive any other input from the user.
- Do not refer to yourself as "I" in the response, use a passive formulation instead.`;

export default class ReadRecordStepExecutor extends BaseStepExecutor<RecordTaskStepDefinition> {
async execute(): Promise<StepExecutionResult> {
export default class ReadRecordStepExecutor extends RecordTaskStepExecutor<RecordTaskStepDefinition> {
protected async doExecute(): Promise<StepExecutionResult> {
const { stepDefinition: step } = this.context;
const records = await this.getAvailableRecordRefs();

let selectedRecordRef: RecordRef;
let schema: CollectionSchema;
let fieldResults: FieldReadResult[];

try {
selectedRecordRef = await this.selectRecordRef(records, step.prompt);
schema = await this.getCollectionSchema(selectedRecordRef.collectionName);
const selectedDisplayNames = await this.selectFields(schema, step.prompt);
const resolvedFieldNames = selectedDisplayNames
.map(name => this.findField(schema, name)?.fieldName)
.filter((name): name is string => name !== undefined);

if (resolvedFieldNames.length === 0) {
throw new NoResolvedFieldsError(selectedDisplayNames);
}

const recordData = await this.context.agentPort.getRecord(
selectedRecordRef.collectionName,
selectedRecordRef.recordId,
resolvedFieldNames,
);
fieldResults = this.formatFieldResults(recordData.values, schema, selectedDisplayNames);
} catch (error) {
if (error instanceof WorkflowExecutorError) {
return this.buildOutcomeResult('error', error.message);
}

throw error;
const selectedRecordRef = await this.selectRecordRef(records, step.prompt);
const schema = await this.getCollectionSchema(selectedRecordRef.collectionName);
const selectedDisplayNames = await this.selectFields(schema, step.prompt);
const resolvedFieldNames = selectedDisplayNames
.map(name => this.findField(schema, name)?.fieldName)
.filter((name): name is string => name !== undefined);

if (resolvedFieldNames.length === 0) {
throw new NoResolvedFieldsError(selectedDisplayNames);
}

const recordData = await this.context.agentPort.getRecord(
selectedRecordRef.collectionName,
selectedRecordRef.recordId,
resolvedFieldNames,
);
const fieldResults = this.formatFieldResults(recordData.values, schema, selectedDisplayNames);

await this.context.runStore.saveStepExecution(this.context.runId, {
type: 'read-record',
stepIndex: this.context.stepIndex,
executionParams: { fieldNames: fieldResults.map(f => f.fieldName) },
executionParams: {
fields: fieldResults.map(({ name, displayName }) => ({ name, displayName })),
},
executionResult: { fields: fieldResults },
selectedRecordRef,
});

return this.buildOutcomeResult('success');
return this.buildOutcomeResult({ status: 'success' });
}

private async selectFields(
Expand Down Expand Up @@ -114,16 +104,16 @@ export default class ReadRecordStepExecutor extends BaseStepExecutor<RecordTaskS
private formatFieldResults(
values: Record<string, unknown>,
schema: CollectionSchema,
fieldNames: string[],
fieldDisplayNames: string[],
): FieldReadResult[] {
return fieldNames.map(name => {
return fieldDisplayNames.map(name => {
const field = this.findField(schema, name);

if (!field) return { error: `Field not found: ${name}`, fieldName: name, displayName: name };
if (!field) return { error: `Field not found: ${name}`, name, displayName: name };

return {
value: values[field.fieldName],
fieldName: field.fieldName,
name: field.fieldName,
displayName: field.displayName,
};
});
Expand Down
Loading
Loading