The Workglow Task Graph engine is the foundational execution layer of the framework. It models computation as a directed acyclic graph (DAG) where nodes are tasks and edges are dataflows that carry typed data between output ports and input ports. The engine is responsible for resolving execution order via topological sorting, propagating data through the graph along dataflow edges, and managing the lifecycle of every task from PENDING through COMPLETED or FAILED.
The engine lives in the @workglow/task-graph package and provides two primary abstractions:
- TaskGraph -- a low-level DAG container with explicit
addTask,addDataflow, andrunmethods. - Workflow -- a high-level builder that chains tasks with
pipe(),parallel(), and control-flow helpers, then compiles down to a TaskGraph for execution.
Both abstractions share the same runtime: TaskGraphRunner executes the graph, TaskRunner executes individual tasks, and Dataflow objects shuttle data between them.
A Task is the atomic unit of computation. Every task is an instance of a class that extends the base Task<Input, Output, Config> class. Tasks declare their shape through static properties and JSON Schema definitions, and provide execution logic via the execute() method.
Key characteristics of a task:
- Statically typed ports: Input and output ports are defined by
inputSchema()andoutputSchema()static methods that return JSON Schema (DataPortSchema) objects. - Lifecycle-managed: Each task transitions through well-defined statuses (
PENDING,PROCESSING,STREAMING,COMPLETED,FAILED,ABORTING,DISABLED). - Independently runnable: A task can be executed standalone via
task.run()or as part of a graph. - Event-driven: Tasks emit events (
start,complete,error,progress,status,stream_start,stream_chunk,stream_end) that allow external code to observe execution.
A TaskGraph wraps a DirectedAcyclicGraph data structure specialized for tasks and dataflows. It enforces the acyclic invariant at the structural level -- you cannot add an edge that would create a cycle.
The TaskGraph provides:
- Node management (
addTask,removeTask,getTask,getTasks) - Edge management (
addDataflow,removeDataflow,getDataflow,getDataflows) - Topological ordering (
topologicallySortedNodes) - Execution (
run,runPreview) - Serialization (
toJSON,toDependencyJSON) - Event subscription (
subscribe,subscribeToTaskStatus,subscribeToTaskProgress,subscribeToDataflowStatus,subscribeToTaskStreaming)
A Dataflow is a directed edge that connects one task's output port to another task's input port. It is identified by four components:
sourceTaskId[sourceTaskPortId] ==> targetTaskId[targetTaskPortId]
For example, a dataflow from task A's result port to task B's value port:
new Dataflow("taskA", "result", "taskB", "value");Dataflows carry a value property that is populated during execution and can also carry streaming data via a ReadableStream<StreamEvent>.
When a TaskGraph runs, the TaskGraphRunner retrieves all tasks in topological order -- an ordering that guarantees every task executes only after all of its upstream dependencies have completed. For each task in order, the runner:
- Copies output data from incoming dataflows into the task's
runInputData. - Executes the task.
- Pushes the task's output data onto all outgoing dataflows.
This ensures deterministic, dependency-respecting execution without the caller needing to manually manage ordering.
Every task class must declare several static properties and two static schema methods. Here is the minimal structure:
import { Task } from "@workglow/task-graph";
import type { DataPortSchema } from "@workglow/util/schema";
interface MyInput {
text: string;
}
interface MyOutput {
wordCount: number;
}
class WordCountTask extends Task<MyInput, MyOutput> {
static readonly type = "WordCountTask";
static readonly category = "Text";
static readonly title = "Word Count";
static readonly description = "Counts words in a string";
static readonly cachePolicy = { kind: "deterministic" } as const;
static inputSchema(): DataPortSchema {
return {
type: "object",
properties: {
text: { type: "string", title: "Input Text" },
},
required: ["text"],
} as const satisfies DataPortSchema;
}
static outputSchema(): DataPortSchema {
return {
type: "object",
properties: {
wordCount: { type: "integer", title: "Word Count" },
},
} as const satisfies DataPortSchema;
}
async execute(input: MyInput): Promise<MyOutput> {
const words = input.text.trim().split(/\s+/);
return { wordCount: words.length };
}
}| Property | Type | Description |
|---|---|---|
type |
string |
Unique identifier for this task class in the registry |
category |
string |
Grouping label for UI organization |
title |
string |
Human-readable name |
description |
string |
Brief description of the task's purpose |
cachePolicy |
object |
Whether and where results can be cached across runs |
| Property | Type | Default | Description |
|---|---|---|---|
hasDynamicSchemas |
boolean |
false |
Set true if schemas change at runtime |
passthroughInputsToOutputs |
boolean |
false |
Mirror dynamic input ports to output |
customizable |
boolean |
false |
Allow saving as a preset in the builder |
isGraphOutput |
boolean |
false |
Mark as the definitive output node of a graph |
hasDynamicEntitlements |
boolean |
false |
Entitlements depend on runtime state |
The execute() method receives the validated input and an IExecuteContext object:
async execute(input: Input, context: IExecuteContext): Promise<Output | undefined> {
// context.signal -- AbortSignal for cancellation
// context.updateProgress -- report progress (0-100)
// context.own -- register a child task
// context.registry -- ServiceRegistry for DI lookups
return { result: computeSomething(input) };
}If the task returns undefined, the output is treated as an empty object {}.
Tasks may optionally override executePreview() for lightweight, sub-millisecond preview updates:
async executePreview(
input: Input,
context: IExecutePreviewContext
): Promise<Output | undefined> {
// Return a quick preview based on input
return { preview: input.text.substring(0, 100) };
}This method is called only by runPreview() and must complete in under 1 millisecond. Heavy computation belongs exclusively in execute().
run() and runPreview() are strictly orthogonal paths: run() invokes execute() (or executeStream()) and never calls executePreview(); runPreview() invokes executePreview() and never calls execute() or executeStream(). There is no post-execute() overlay, and cache hits during run() return the cached value verbatim.
const graph = new TaskGraph();
// Or with an output cache:
const graph = new TaskGraph({ outputCache: myOutputRepository });// Single task
graph.addTask(new WordCountTask({ defaults: { text: "hello world" } }));
// Multiple tasks
graph.addTasks([taskA, taskB, taskC]);// Connect taskA's "result" output to taskB's "value" input
graph.addDataflow(new Dataflow(taskA.id, "result", taskB.id, "value"));
// Bulk add
graph.addDataflows([
new Dataflow(taskA.id, "output", taskB.id, "input"),
new Dataflow(taskB.id, "output", taskC.id, "input"),
]);| Constant | Value | Purpose |
|---|---|---|
DATAFLOW_ALL_PORTS |
"*" |
Pass entire output object as input |
DATAFLOW_ERROR_PORT |
"[error]" |
Route error objects between tasks |
graph.getTask(taskId); // Get task by ID
graph.getTasks(); // All tasks
graph.topologicallySortedNodes(); // Tasks in execution order
graph.getDataflow(dataflowId); // Get dataflow by ID
graph.getDataflows(); // All dataflows
graph.getSourceDataflows(taskId); // Incoming dataflows for a task
graph.getTargetDataflows(taskId); // Outgoing dataflows from a task
graph.getSourceTasks(taskId); // Upstream tasks
graph.getTargetTasks(taskId); // Downstream tasks// Full execution
const results = await graph.run<MyOutput>(
{ text: "hello world" }, // Input for root tasks
{
outputCache: true, // Enable caching
timeout: 30000, // 30 second timeout
maxTasks: 100, // Safety limit
parentSignal: controller.signal, // External abort
}
);
// Preview execution (lightweight UI updates)
const previews = await graph.runPreview<MyOutput>({ text: "hello world" });The TaskGraphRunConfig interface provides these options:
| Option | Type | Description |
|---|---|---|
outputCache |
TaskOutputRepository | boolean |
Cache backend or true to use global |
parentSignal |
AbortSignal |
Signal to abort the entire graph |
registry |
ServiceRegistry |
DI registry for this execution |
accumulateLeafOutputs |
boolean |
Accumulate streaming output for leaf nodes (default true) |
timeout |
number |
Max execution time in milliseconds |
maxTasks |
number |
Maximum number of tasks allowed |
enforceEntitlements |
boolean |
Check entitlements before execution |
graph.abort(); // Abort all running tasks
graph.resetGraph(); // Reset all tasks to PENDINGTaskGraphRunner is a thin facade that owns three internal collaborator classes plus a per-run state object. Each collaborator has one responsibility; the facade orchestrates them.
TaskGraphRunner (facade)
long-lived: graph, outputCache, processScheduler, previewScheduler,
registry, resourceScope, accumulateLeafOutputs
per-run anchor: currentCtx?: RunContext
owns instances of:
├── RunScheduler — run-loop, abort/timeout, disabled cascade, progress
├── EdgeMaterializer — edge read/write, transforms, error routing
└── StreamPump — streaming inputs, tee fan-out, accumulation
│
▼
RunContext ── per-run, built by handleStart,
discarded by terminal handlers
| Class | Responsibility | Lifetime |
|---|---|---|
TaskGraphRunner |
Public API, lifecycle handlers, per-task orchestration in runTask, runGraphPreview body |
Whole runner |
RunScheduler |
For-await run loop, pushStatusFromNodeToEdges, disabled cascade, progress aggregation, timeout arm/clear |
Whole runner |
EdgeMaterializer |
copyInputFromEdgesToNode, pushOutputFromNodeToEdges, error-port routing, resetTask |
Whole runner |
StreamPump |
prepareStreamingInputs (tee), runStreamingTask, accumulation policy, stream fan-out |
Whole runner |
RunContext |
One run's mutable state — abort controller, in-progress maps, error map, telemetry span, timeout timer, entitlement enforcer | Single runGraph() call |
All four collaborator classes are marked @internal and re-exported from @workglow/task-graph so unit tests can construct them directly.
These are easy to confuse — both relate to a single run, but they live on opposite sides of the start of execution.
TaskGraphRunConfig |
RunContext |
|
|---|---|---|
| Direction | Caller → runner (input) | Runner-internal (state) |
| Lifetime | Provided once before the run starts | Created at run start, destroyed at run end |
| Mutability | Caller-built; runner reads it | Runner-mutated throughout the run |
| Visibility | Public (part of runGraph API) |
@internal collaborator state |
| Purpose | Wishes/policy for the run | Bookkeeping while the run is in flight |
handleStart translates the caller's wishes into runtime state:
config.parentSignal → ctx.abortController (wired listener)
config.timeout → ctx.graphTimeoutTimer + ctx.pendingGraphTimeoutError on fire
config.enforceEntitlements + registry → ctx.activeEnforcer
(no config equivalent) → ctx.runId, ctx.inProgressTasks, ctx.failedTaskErrors, ctx.telemetrySpan
Long-lived state from config (registry, outputCache, resourceScope, accumulateLeafOutputs) goes onto the facade — not into RunContext. Only state that exists only while a run is in flight belongs in RunContext.
Mnemonic: TaskGraphRunConfig is what the caller asks for; RunContext is what the runner is doing right now.
resourceScope is special: the runner sets this.resourceScope from
config.resourceScope when present, but if the caller did not provide one,
runGraph allocates a private ResourceScope and disposeAll()s it in a
finally block when the run terminates (success, error, or abort). Casual
callers get automatic cleanup of heavyweight resources (model unload, AI
session close, browser disconnect) with no ceremony. Caller-passed scopes are
never touched by the runner — expert callers retain full lifecycle control.
The same auto-ownership pattern applies to TaskRunner.run for the bare-task
path. Nested runs (GraphAsTask, IteratorTask, runTask, subGraph.run)
always forward this.resourceScope, which is defined post-handleStart, so
child runners observe a "caller-passed" scope and never own it. Disposal
happens exactly once, at the top of the call stack.
TaskGraph.run(input, config)
|
v
TaskGraphRunner.runGraph(input, config)
1. handleStart(config) -- Build RunContext, arm timeout, start telemetry
2. runScheduler.runLoop(input, config, ctx, edgeMat)
For each task from processScheduler.tasks():
a. streamPump.prepareStreamingInputs(task) -- Tee streaming inputs
b. streamPump.awaitStreamInputs(task, registry) -- Materialize pending streams
c. edgeMaterializer.copyInputFromEdgesToNode(task) -- Pull edge values
d. ctx.activeEnforcer?.checkTask(task) -- Runtime entitlement
e. Streaming branch: streamPump.runStreamingTask(...)
Non-streaming: task.runner.run(...) → edgeMaterializer.pushOutputFromNodeToEdges
f. runScheduler.pushStatusFromNodeToEdges + processScheduler.onTaskCompleted
3. Terminal precedence: timeout > task error > abort > complete
|
v
Collect results from ending nodes (no outgoing dataflows)
Return GraphResultArray<Output>
Task.run(overrides, runConfig)
|
v
TaskRunner.run(overrides, config)
|
v
1. handleStart() -- Set status to PROCESSING, create AbortController
2. setInput(overrides) -- Merge overrides into runInputData
3. resolveSchemaInputs() -- Resolve format annotations (models, repositories)
4. validateInput() -- Validate against compiled JSON Schema
5. Check cache -- If the cache policy allows it, look up cached result
6. executeTask() -- Call task.execute(input, context)
7. Cache result -- If the cache policy allows it, store in output cache
8. handleComplete() -- Set status to COMPLETED, emit events
|
v
Return runOutputData (locked, immutable)
PENDING --> PROCESSING --> STREAMING --> COMPLETED
| ^
+---> COMPLETED -----------+
|
+---> FAILED
|
+---> ABORTING ---> FAILED
PENDING --> DISABLED
| Status | Description |
|---|---|
PENDING |
Task has not started. Inputs can be modified freely. |
PROCESSING |
Task is currently executing its execute() method. |
STREAMING |
Task has begun producing streaming output chunks. |
COMPLETED |
Execution finished successfully. Output is locked and immutable. |
FAILED |
Execution threw an error. |
ABORTING |
Abort has been requested; cleanup is in progress. |
DISABLED |
Task was disabled (e.g., by a ConditionalTask that deactivated it). |
Once a task reaches COMPLETED, its runOutputData is considered immutable. This is a core invariant of the engine. Preview execution (runPreview) will not modify a completed task's output -- it returns the locked output unchanged without invoking executePreview().
The Workflow class provides a fluent API for constructing task graphs without manually creating dataflow edges.
import { Workflow } from "@workglow/task-graph";
const workflow = new Workflow()
.addTask(new FetchUrlTask({ defaults: { url: "https://example.com" } }))
.pipe(new ExtractTextTask())
.pipe(new WordCountTask());
const results = await workflow.run();| Method | Description |
|---|---|
addTask(task) |
Add a task to the workflow |
pipe(...tasks) |
Chain tasks sequentially with auto-wired dataflows |
parallel(tasks) |
Run tasks in parallel from the current position |
group(config) |
Start a sub-group (wraps in GraphAsTask) |
endGroup() |
Close the current group |
map(config) |
Start a map loop over array inputs |
endMap() |
Close the map loop |
reduce(config) |
Start a reduce loop with accumulator |
endReduce() |
Close the reduce loop |
while(config) |
Start a conditional loop |
endWhile() |
Close the while loop |
run(input?, config?) |
Build the graph and execute |
runPreview(input?) |
Build the graph and run preview-only execution |
TaskGraph emits events for structural changes and task lifecycle updates:
| Event | Parameters | Description |
|---|---|---|
task_added |
taskId |
A task was added to the graph |
task_removed |
taskId |
A task was removed from the graph |
dataflow_added |
dataflowId |
A dataflow was added |
dataflow_removed |
dataflowId |
A dataflow was removed |
| Event | Parameters | Description |
|---|---|---|
task_stream_start |
taskId |
A streaming task began output |
task_stream_chunk |
taskId, StreamEvent |
A streaming chunk was produced |
task_stream_end |
taskId, output |
Streaming completed |
// Subscribe to all task status changes
const unsub = graph.subscribeToTaskStatus((taskId, status) => {
console.log(`Task ${taskId}: ${status}`);
});
// Subscribe to progress updates
graph.subscribeToTaskProgress((taskId, progress, message) => {
console.log(`Task ${taskId}: ${progress}% - ${message}`);
});
// Subscribe to streaming events
graph.subscribeToTaskStreaming({
onStreamStart: (taskId) => console.log(`Stream started: ${taskId}`),
onStreamChunk: (taskId, event) => console.log(`Chunk:`, event),
onStreamEnd: (taskId, output) => console.log(`Stream ended:`, output),
});
// Clean up
unsub();TaskGraph supports JSON serialization for persistence and debugging:
// Standard JSON with full structure
const json = graph.toJSON();
// { tasks: [...], dataflows: [...] }
// Dependency-oriented JSON (easier to read)
const deps = graph.toDependencyJSON();
// [{ id, type, defaults, dependencies: { portName: { id, output } } }, ...]Both methods accept an optional TaskGraphJsonOptions parameter:
graph.toJSON({ withBoundaryNodes: true }); // Include input/output boundary nodesFor simple linear pipelines where all tasks share the same port names, the serialGraph helper creates a TaskGraph with serial dataflows:
import { serialGraph } from "@workglow/task-graph";
const graph = serialGraph(
[taskA, taskB, taskC],
"input", // input port name
"output" // output port name
);This creates dataflows taskA[input] ==> taskB[output] and taskB[input] ==> taskC[output] automatically.
import { Task, TaskGraph, Dataflow, TaskRegistry } from "@workglow/task-graph";
import type { DataPortSchema } from "@workglow/util/schema";
// Define tasks
class DoubleTask extends Task<{ value: number }, { result: number }> {
static readonly type = "DoubleTask";
static readonly category = "Math";
static readonly title = "Double";
static readonly description = "Doubles the input value";
static readonly cachePolicy = { kind: "deterministic" } as const;
static inputSchema(): DataPortSchema {
return {
type: "object",
properties: { value: { type: "number" } },
required: ["value"],
} as const satisfies DataPortSchema;
}
static outputSchema(): DataPortSchema {
return {
type: "object",
properties: { result: { type: "number" } },
} as const satisfies DataPortSchema;
}
async execute(input: { value: number }) {
return { result: input.value * 2 };
}
}
class AddTask extends Task<{ a: number; b: number }, { sum: number }> {
static readonly type = "AddTask";
static readonly category = "Math";
static readonly title = "Add";
static readonly description = "Adds two numbers";
static readonly cachePolicy = { kind: "deterministic" } as const;
static inputSchema(): DataPortSchema {
return {
type: "object",
properties: {
a: { type: "number" },
b: { type: "number" },
},
required: ["a", "b"],
} as const satisfies DataPortSchema;
}
static outputSchema(): DataPortSchema {
return {
type: "object",
properties: { sum: { type: "number" } },
} as const satisfies DataPortSchema;
}
async execute(input: { a: number; b: number }) {
return { sum: input.a + input.b };
}
}
// Register tasks
TaskRegistry.registerTask(DoubleTask);
TaskRegistry.registerTask(AddTask);
// Build graph: double two values, then add them
const doubleA = new DoubleTask({ id: "doubleA", defaults: { value: 5 } });
const doubleB = new DoubleTask({ id: "doubleB", defaults: { value: 3 } });
const add = new AddTask({ id: "add" });
const graph = new TaskGraph();
graph.addTasks([doubleA, doubleB, add]);
graph.addDataflows([
new Dataflow("doubleA", "result", "add", "a"),
new Dataflow("doubleB", "result", "add", "b"),
]);
// Execute
const results = await graph.run();
// results = [{ id: "add", type: "AddTask", data: { sum: 16 } }]