The Workglow Task Graph engine is the foundational execution layer of the framework. It models computation as a directed acyclic graph (DAG) where nodes are tasks and edges are dataflows that carry typed data between output ports and input ports. The engine is responsible for resolving execution order via topological sorting, propagating data through the graph along dataflow edges, and managing the lifecycle of every task from PENDING through COMPLETED or FAILED.
The engine lives in the @workglow/task-graph package and provides two primary abstractions:
- `TaskGraph` -- a low-level DAG container with explicit `addTask`, `addDataflow`, and `run` methods.
- `Workflow` -- a high-level builder that chains tasks with `pipe()`, `parallel()`, and control-flow helpers, then compiles down to a `TaskGraph` for execution.
Both abstractions share the same runtime: TaskGraphRunner executes the graph, TaskRunner executes individual tasks, and Dataflow objects shuttle data between them.
A Task is the atomic unit of computation. Every task is an instance of a class that extends the base Task<Input, Output, Config> class. Tasks declare their shape through static properties and JSON Schema definitions, and provide execution logic via the execute() method.
Key characteristics of a task:
- Statically typed ports: Input and output ports are defined by `inputSchema()` and `outputSchema()` static methods that return JSON Schema (`DataPortSchema`) objects.
- Lifecycle-managed: Each task transitions through well-defined statuses (`PENDING`, `PROCESSING`, `STREAMING`, `COMPLETED`, `FAILED`, `ABORTING`, `DISABLED`).
- Independently runnable: A task can be executed standalone via `task.run()` or as part of a graph.
- Event-driven: Tasks emit events (`start`, `complete`, `error`, `progress`, `status`, `stream_start`, `stream_chunk`, `stream_end`) that allow external code to observe execution.
A TaskGraph wraps a DirectedAcyclicGraph data structure specialized for tasks and dataflows. It enforces the acyclic invariant at the structural level -- you cannot add an edge that would create a cycle.
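The acyclic invariant can be illustrated with a small stand-alone sketch. `TinyDag` is a hypothetical class written for this example, not the library's `DirectedAcyclicGraph`; it assumes a cycle is detected by checking whether the source is already reachable from the target before the edge is added.

```typescript
// Hypothetical minimal DAG used only to illustrate cycle rejection.
class TinyDag {
  private readonly edges = new Map<string, Set<string>>();

  addNode(id: string): void {
    if (!this.edges.has(id)) this.edges.set(id, new Set<string>());
  }

  // True if `to` can be reached from `from` by following existing edges.
  private reaches(from: string, to: string): boolean {
    const stack: string[] = [from];
    const seen = new Set<string>();
    while (stack.length > 0) {
      const current = stack.pop() as string;
      if (current === to) return true;
      if (seen.has(current)) continue;
      seen.add(current);
      const next = this.edges.get(current);
      if (next) next.forEach((id) => stack.push(id));
    }
    return false;
  }

  // Adding source -> target closes a cycle exactly when source is already
  // reachable from target, so that case is rejected up front.
  addEdge(source: string, target: string): void {
    this.addNode(source);
    this.addNode(target);
    if (this.reaches(target, source)) {
      throw new Error(`Edge ${source} -> ${target} would create a cycle`);
    }
    (this.edges.get(source) as Set<string>).add(target);
  }
}

const dag = new TinyDag();
dag.addEdge("a", "b");
dag.addEdge("b", "c");

let cycleRejected = false;
try {
  dag.addEdge("c", "a"); // would close the loop a -> b -> c -> a
} catch (_err) {
  cycleRejected = true;
}
```

Checking reachability at insertion time keeps every later topological sort valid without a separate validation pass.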
The TaskGraph provides:
- Node management (
addTask,removeTask,getTask,getTasks) - Edge management (
addDataflow,removeDataflow,getDataflow,getDataflows) - Topological ordering (
topologicallySortedNodes) - Execution (
run,runReactive) - Serialization (
toJSON,toDependencyJSON) - Event subscription (
subscribe,subscribeToTaskStatus,subscribeToTaskProgress,subscribeToDataflowStatus,subscribeToTaskStreaming)
A Dataflow is a directed edge that connects one task's output port to another task's input port. It is identified by four components:
```text
sourceTaskId[sourceTaskPortId] ==> targetTaskId[targetTaskPortId]
```

For example, a dataflow from task A's `result` port to task B's `value` port:

```typescript
new Dataflow("taskA", "result", "taskB", "value");
```

Dataflows carry a `value` property that is populated during execution and can also carry streaming data via a `ReadableStream<StreamEvent>`.
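The four-part notation can be made concrete with a small formatter. This is only a sketch: `DataflowEndpoints` and `formatDataflowId` are hypothetical names, and the way the library actually builds its identifier string may differ.

```typescript
// Hypothetical shape holding the four components of a dataflow.
interface DataflowEndpoints {
  sourceTaskId: string;
  sourceTaskPortId: string;
  targetTaskId: string;
  targetTaskPortId: string;
}

// Formats the components in the notation used in this document.
function formatDataflowId(d: DataflowEndpoints): string {
  return `${d.sourceTaskId}[${d.sourceTaskPortId}] ==> ${d.targetTaskId}[${d.targetTaskPortId}]`;
}

const dataflowId = formatDataflowId({
  sourceTaskId: "taskA",
  sourceTaskPortId: "result",
  targetTaskId: "taskB",
  targetTaskPortId: "value",
});
// dataflowId === "taskA[result] ==> taskB[value]"
```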
When a TaskGraph runs, the TaskGraphRunner retrieves all tasks in topological order -- an ordering that guarantees every task executes only after all of its upstream dependencies have completed. For each task in order, the runner:
- Copies output data from incoming dataflows into the task's `runInputData`.
- Executes the task.
- Pushes the task's output data onto all outgoing dataflows.
This ensures deterministic, dependency-respecting execution without the caller needing to manually manage ordering.
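The three steps above can be sketched without the library. The code below is an illustration under stated assumptions: `SketchTask`, `SketchEdge`, `topologicalOrder`, and `runSketch` are hypothetical names, execution is synchronous for brevity, and Kahn's algorithm stands in for whatever sorting strategy the engine actually uses.

```typescript
type Data = Record<string, unknown>;

interface SketchTask {
  id: string;
  execute(input: Data): Data;
}

interface SketchEdge {
  sourceTaskId: string;
  sourcePort: string;
  targetTaskId: string;
  targetPort: string;
  value?: unknown;
}

// Kahn's algorithm: repeatedly emit tasks whose upstream edges are all processed.
function topologicalOrder(tasks: SketchTask[], edges: SketchEdge[]): SketchTask[] {
  const inDegree = new Map<string, number>();
  tasks.forEach((t) => inDegree.set(t.id, 0));
  edges.forEach((e) => inDegree.set(e.targetTaskId, (inDegree.get(e.targetTaskId) ?? 0) + 1));

  const ready = tasks.filter((t) => inDegree.get(t.id) === 0);
  const ordered: SketchTask[] = [];
  while (ready.length > 0) {
    const task = ready.shift() as SketchTask;
    ordered.push(task);
    for (const e of edges) {
      if (e.sourceTaskId !== task.id) continue;
      const remaining = (inDegree.get(e.targetTaskId) ?? 0) - 1;
      inDegree.set(e.targetTaskId, remaining);
      if (remaining === 0) {
        const next = tasks.find((t) => t.id === e.targetTaskId);
        if (next) ready.push(next);
      }
    }
  }
  if (ordered.length !== tasks.length) throw new Error("Graph contains a cycle");
  return ordered;
}

function runSketch(tasks: SketchTask[], edges: SketchEdge[]): Map<string, Data> {
  const outputs = new Map<string, Data>();
  for (const task of topologicalOrder(tasks, edges)) {
    // 1. Copy data from incoming dataflows into the task's input.
    const input: Data = {};
    edges
      .filter((e) => e.targetTaskId === task.id)
      .forEach((e) => {
        input[e.targetPort] = e.value;
      });
    // 2. Execute the task.
    const output = task.execute(input);
    outputs.set(task.id, output);
    // 3. Push the task's output onto all outgoing dataflows.
    edges
      .filter((e) => e.sourceTaskId === task.id)
      .forEach((e) => {
        e.value = output[e.sourcePort];
      });
  }
  return outputs;
}

const sketchOutputs = runSketch(
  [
    // Listed out of dependency order on purpose; the sort corrects it.
    { id: "add", execute: (i) => ({ sum: (i.a as number) + (i.b as number) }) },
    { id: "five", execute: () => ({ result: 5 }) },
    { id: "three", execute: () => ({ result: 3 }) },
  ],
  [
    { sourceTaskId: "five", sourcePort: "result", targetTaskId: "add", targetPort: "a" },
    { sourceTaskId: "three", sourcePort: "result", targetTaskId: "add", targetPort: "b" },
  ]
);
// sketchOutputs.get("add") is { sum: 8 }
```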
Every task class must declare several static properties and two static schema methods. Here is the minimal structure:
```typescript
import { Task } from "@workglow/task-graph";
import type { DataPortSchema } from "@workglow/util/schema";

interface MyInput { text: string }
interface MyOutput { wordCount: number }

class WordCountTask extends Task<MyInput, MyOutput> {
  static readonly type = "WordCountTask";
  static readonly category = "Text";
  static readonly title = "Word Count";
  static readonly description = "Counts words in a string";
  static readonly cacheable = true;

  static inputSchema(): DataPortSchema {
    return {
      type: "object",
      properties: {
        text: { type: "string", title: "Input Text" },
      },
      required: ["text"],
    } as const satisfies DataPortSchema;
  }

  static outputSchema(): DataPortSchema {
    return {
      type: "object",
      properties: {
        wordCount: { type: "integer", title: "Word Count" },
      },
    } as const satisfies DataPortSchema;
  }

  async execute(input: MyInput): Promise<MyOutput> {
    const trimmed = input.text.trim();
    // "".split(/\s+/) yields [""], so an empty string must be handled explicitly.
    const wordCount = trimmed === "" ? 0 : trimmed.split(/\s+/).length;
    return { wordCount };
  }
}
```

| Property | Type | Description |
|---|---|---|
| `type` | `string` | Unique identifier for this task class in the registry |
| `category` | `string` | Grouping label for UI organization |
| `title` | `string` | Human-readable name |
| `description` | `string` | Brief description of the task's purpose |
| `cacheable` | `boolean` | Whether results can be cached across runs |

| Property | Type | Default | Description |
|---|---|---|---|
| `hasDynamicSchemas` | `boolean` | `false` | Set `true` if schemas change at runtime |
| `passthroughInputsToOutputs` | `boolean` | `false` | Mirror dynamic input ports to output |
| `customizable` | `boolean` | `false` | Allow saving as a preset in the builder |
| `isGraphOutput` | `boolean` | `false` | Mark as the definitive output node of a graph |
| `hasDynamicEntitlements` | `boolean` | `false` | Entitlements depend on runtime state |

The execute() method receives the validated input and an IExecuteContext object:
```typescript
async execute(input: Input, context: IExecuteContext): Promise<Output | undefined> {
  // context.signal         -- AbortSignal for cancellation
  // context.updateProgress -- report progress (0-100)
  // context.own            -- register a child task
  // context.registry       -- ServiceRegistry for DI lookups
  return { result: computeSomething(input) };
}
```

If the task returns `undefined`, the output is treated as an empty object `{}`.
Tasks may optionally override executeReactive() for lightweight, sub-millisecond preview updates:
```typescript
async executeReactive(
  input: Input,
  output: Output,
  context: IExecuteReactiveContext
): Promise<Output | undefined> {
  // Return a quick preview based on input
  return { ...output, preview: input.text.substring(0, 100) };
}
```

This method is called during `runReactive()` and must complete in under 1 millisecond. Heavy computation belongs exclusively in `execute()`.
```typescript
const graph = new TaskGraph();

// Or with an output cache (named differently here to avoid redeclaring `graph`):
const cachedGraph = new TaskGraph({ outputCache: myOutputRepository });
```

```typescript
// Single task
graph.addTask(new WordCountTask({ defaults: { text: "hello world" } }));

// Multiple tasks
graph.addTasks([taskA, taskB, taskC]);
```

```typescript
// Connect taskA's "result" output to taskB's "value" input
graph.addDataflow(new Dataflow(taskA.id, "result", taskB.id, "value"));

// Bulk add
graph.addDataflows([
  new Dataflow(taskA.id, "output", taskB.id, "input"),
  new Dataflow(taskB.id, "output", taskC.id, "input"),
]);
```

| Constant | Value | Purpose |
|---|---|---|
| `DATAFLOW_ALL_PORTS` | `"*"` | Pass entire output object as input |
| `DATAFLOW_ERROR_PORT` | `"[error]"` | Route error objects between tasks |

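To make the difference between a named port and the all-ports constant concrete, here is a rough sketch. The merging behavior is an assumption made for illustration (the document only states that `"*"` passes the entire output object as input), and `applyDataflow` is a hypothetical helper, not a library function.

```typescript
const DATAFLOW_ALL_PORTS = "*"; // value taken from the table above

type PortData = Record<string, unknown>;

// Assumption: a "*" source port forwards the whole output object and a "*"
// target port merges it into the input; named ports move a single property.
function applyDataflow(
  sourceOutput: PortData,
  sourcePort: string,
  targetInput: PortData,
  targetPort: string
): PortData {
  const value = sourcePort === DATAFLOW_ALL_PORTS ? sourceOutput : sourceOutput[sourcePort];
  if (targetPort === DATAFLOW_ALL_PORTS) {
    // Merging only makes sense when the forwarded value is an object.
    return { ...targetInput, ...(value as PortData) };
  }
  return { ...targetInput, [targetPort]: value };
}

const upstreamOutput = { text: "hello", lang: "en" };
const wholeObject = applyDataflow(upstreamOutput, "*", {}, "*");
const singlePort = applyDataflow(upstreamOutput, "text", {}, "value");
// wholeObject is { text: "hello", lang: "en" }; singlePort is { value: "hello" }
```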
```typescript
graph.getTask(taskId);            // Get task by ID
graph.getTasks();                 // All tasks
graph.topologicallySortedNodes(); // Tasks in execution order
graph.getDataflow(dataflowId);    // Get dataflow by ID
graph.getDataflows();             // All dataflows
graph.getSourceDataflows(taskId); // Incoming dataflows for a task
graph.getTargetDataflows(taskId); // Outgoing dataflows from a task
graph.getSourceTasks(taskId);     // Upstream tasks
graph.getTargetTasks(taskId);     // Downstream tasks
```

```typescript
// Controller whose signal lets external code abort the run
const controller = new AbortController();

// Full execution
const results = await graph.run<MyOutput>(
  { text: "hello world" },           // Input for root tasks
  {
    outputCache: true,               // Enable caching
    timeout: 30000,                  // 30 second timeout
    maxTasks: 100,                   // Safety limit
    parentSignal: controller.signal, // External abort
  }
);

// Reactive execution (previews)
const previews = await graph.runReactive<MyOutput>(
  { text: "hello world" }
);
```

The `TaskGraphRunConfig` interface provides these options:
| Option | Type | Description |
|---|---|---|
| `outputCache` | `TaskOutputRepository \| boolean` | Cache backend or `true` to use global |
| `parentSignal` | `AbortSignal` | Signal to abort the entire graph |
| `registry` | `ServiceRegistry` | DI registry for this execution |
| `accumulateLeafOutputs` | `boolean` | Accumulate streaming output for leaf nodes (default `true`) |
| `timeout` | `number` | Max execution time in milliseconds |
| `maxTasks` | `number` | Maximum number of tasks allowed |
| `enforceEntitlements` | `boolean` | Check entitlements before execution |

```typescript
graph.abort();      // Abort all running tasks
graph.resetGraph(); // Reset all tasks to PENDING
```

```text
TaskGraph.run(input, config)
  |
  v
TaskGraphRunner.runGraph(input, config)
  |
  v
For each task in topological order:
  1. copyInputFromEdgesToNode(task)  -- Pull data from incoming dataflows
  2. runTask(task, input)            -- Execute via TaskRunner.run()
  3. pushOutputFromNodeToEdges(task) -- Push output to outgoing dataflows
  |
  v
Collect results from ending nodes (no outgoing dataflows)
Return GraphResultArray<Output>
```

```text
Task.run(overrides, runConfig)
  |
  v
TaskRunner.run(overrides, config)
  |
  v
1. handleStart()          -- Set status to PROCESSING, create AbortController
2. setInput(overrides)    -- Merge overrides into runInputData
3. resolveSchemaInputs()  -- Resolve format annotations (models, repositories)
4. validateInput()        -- Validate against compiled JSON Schema
5. Check cache            -- If cacheable, look up cached result
6. executeTask()          -- Call task.execute(input, context)
7. Cache result           -- If cacheable, store in output cache
8. handleComplete()       -- Set status to COMPLETED, emit events
  |
  v
Return runOutputData (locked, immutable)
```

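The validate, cache, execute, and store steps of this sequence can be sketched in isolation. Everything here is an assumption made for illustration: `RunnableSketch`, `runTaskSketch`, and `outputCacheSketch` are hypothetical, the cache key is simply the task type plus the JSON-encoded input, input merging and schema resolution (steps 2 and 3) are left out, and the code is synchronous although the real runner is asynchronous.

```typescript
type Status = "PENDING" | "PROCESSING" | "COMPLETED" | "FAILED";

interface RunnableSketch<I, O> {
  cacheable: boolean;
  status: Status;
  validate(input: I): boolean;
  execute(input: I): O;
}

// Hypothetical in-memory stand-in for an output cache.
const outputCacheSketch = new Map<string, unknown>();

function runTaskSketch<I, O>(type: string, task: RunnableSketch<I, O>, input: I): Readonly<O> {
  task.status = "PROCESSING"; // step 1: handleStart
  try {
    if (!task.validate(input)) throw new Error("Invalid input"); // step 4: validateInput
    const key = `${type}:${JSON.stringify(input)}`;
    let output: O;
    if (task.cacheable && outputCacheSketch.has(key)) {
      output = outputCacheSketch.get(key) as O; // step 5: cache hit skips execution
    } else {
      output = task.execute(input); // step 6: executeTask
      if (task.cacheable) outputCacheSketch.set(key, output); // step 7: cache result
    }
    task.status = "COMPLETED"; // step 8: handleComplete
    return Object.freeze(output); // returned output is locked
  } catch (err) {
    task.status = "FAILED";
    throw err;
  }
}

let executions = 0;
const doubler: RunnableSketch<{ value: number }, { result: number }> = {
  cacheable: true,
  status: "PENDING",
  validate: (input) => typeof input.value === "number",
  execute: (input) => {
    executions += 1;
    return { result: input.value * 2 };
  },
};

const first = runTaskSketch("DoubleTask", doubler, { value: 5 });
const second = runTaskSketch("DoubleTask", doubler, { value: 5 }); // served from the cache
```

After both calls `executions` is 1, because the second run finds the cached result and never reaches `execute`.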
```text
PENDING --> PROCESSING --> STREAMING --> COMPLETED
                |                            ^
                +---> COMPLETED -------------+
                |
                +---> FAILED
                |
                +---> ABORTING ---> FAILED

PENDING --> DISABLED
```

| Status | Description |
|---|---|
| `PENDING` | Task has not started. Inputs can be modified freely. |
| `PROCESSING` | Task is currently executing its `execute()` method. |
| `STREAMING` | Task has begun producing streaming output chunks. |
| `COMPLETED` | Execution finished successfully. Output is locked and immutable. |
| `FAILED` | Execution threw an error. |
| `ABORTING` | Abort has been requested; cleanup is in progress. |
| `DISABLED` | Task was disabled (e.g., by a `ConditionalTask` that deactivated it). |

Once a task reaches COMPLETED, its runOutputData is considered immutable. This is a core invariant of the engine. Reactive execution (runReactive) will not modify a completed task's output -- it returns the cached result unchanged.
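The lifecycle can be expressed as a transition table. This sketch encodes only the arrows drawn in the status diagram earlier in this section; the engine may allow further transitions (for example, `resetGraph()` returning tasks to `PENDING` is not modeled). Using `Object.freeze` to lock output is an assumption; the document does not say how immutability is enforced.

```typescript
type TaskStatus =
  | "PENDING"
  | "PROCESSING"
  | "STREAMING"
  | "COMPLETED"
  | "FAILED"
  | "ABORTING"
  | "DISABLED";

// Transitions read off the status diagram; hypothetical table, not library code.
const allowedTransitions: Record<TaskStatus, TaskStatus[]> = {
  PENDING: ["PROCESSING", "DISABLED"],
  PROCESSING: ["STREAMING", "COMPLETED", "FAILED", "ABORTING"],
  STREAMING: ["COMPLETED"],
  COMPLETED: [],
  FAILED: [],
  ABORTING: ["FAILED"],
  DISABLED: [],
};

function canTransition(from: TaskStatus, to: TaskStatus): boolean {
  return allowedTransitions[from].indexOf(to) !== -1;
}

// One possible way to lock a completed task's output (an assumption).
const completedOutput = Object.freeze({ wordCount: 2 });
const isLocked = Object.isFrozen(completedOutput);
```

With this table, `canTransition("COMPLETED", "PROCESSING")` is `false`, which matches the rule that a completed task's output is not recomputed.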
The Workflow class provides a fluent API for constructing task graphs without manually creating dataflow edges.
```typescript
import { Workflow } from "@workglow/task-graph";

const workflow = new Workflow()
  .addTask(new FetchUrlTask({ defaults: { url: "https://example.com" } }))
  .pipe(new ExtractTextTask())
  .pipe(new WordCountTask());

const results = await workflow.run();
```

| Method | Description |
|---|---|
| `addTask(task)` | Add a task to the workflow |
| `pipe(...tasks)` | Chain tasks sequentially with auto-wired dataflows |
| `parallel(tasks)` | Run tasks in parallel from the current position |
| `group(config)` | Start a sub-group (wraps in `GraphAsTask`) |
| `endGroup()` | Close the current group |
| `map(config)` | Start a map loop over array inputs |
| `endMap()` | Close the map loop |
| `reduce(config)` | Start a reduce loop with accumulator |
| `endReduce()` | Close the reduce loop |
| `while(config)` | Start a conditional loop |
| `endWhile()` | Close the while loop |
| `run(input?, config?)` | Build the graph and execute |
| `runReactive(input?)` | Build the graph and execute reactively |

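As a rough picture of what "auto-wired dataflows" could mean for `pipe()`, the sketch below connects each output of the previous task to a same-named input of the next one. That matching rule is purely an assumption for illustration, since the document does not describe how `Workflow` pairs ports, and `PipelineSketch`, `PipeTask`, and `PipeEdge` are hypothetical types.

```typescript
interface PipeTask {
  id: string;
  inputs: string[];
  outputs: string[];
}

interface PipeEdge {
  from: string;
  fromPort: string;
  to: string;
  toPort: string;
}

class PipelineSketch {
  readonly tasks: PipeTask[] = [];
  readonly edges: PipeEdge[] = [];

  addTask(task: PipeTask): this {
    this.tasks.push(task);
    return this;
  }

  // Assumption: wire every output of the previous task to a same-named
  // input of the next task; unmatched ports are left unconnected.
  pipe(...next: PipeTask[]): this {
    for (const task of next) {
      const previous = this.tasks[this.tasks.length - 1];
      this.tasks.push(task);
      if (!previous) continue;
      for (const port of previous.outputs) {
        if (task.inputs.indexOf(port) !== -1) {
          this.edges.push({ from: previous.id, fromPort: port, to: task.id, toPort: port });
        }
      }
    }
    return this;
  }
}

const pipeline = new PipelineSketch()
  .addTask({ id: "fetch", inputs: ["url"], outputs: ["text"] })
  .pipe({ id: "extract", inputs: ["text"], outputs: ["text"] })
  .pipe({ id: "count", inputs: ["text"], outputs: ["wordCount"] });
// pipeline.edges now holds fetch[text] ==> extract[text] and extract[text] ==> count[text]
```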
TaskGraph emits events for structural changes and task lifecycle updates:
| Event | Parameters | Description |
|---|---|---|
| `task_added` | `taskId` | A task was added to the graph |
| `task_removed` | `taskId` | A task was removed from the graph |
| `dataflow_added` | `dataflowId` | A dataflow was added |
| `dataflow_removed` | `dataflowId` | A dataflow was removed |

| Event | Parameters | Description |
|---|---|---|
| `task_stream_start` | `taskId` | A streaming task began output |
| `task_stream_chunk` | `taskId`, `StreamEvent` | A streaming chunk was produced |
| `task_stream_end` | `taskId`, `output` | Streaming completed |

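Event subscription of this kind commonly hands back a function that removes the listener again. The following is a minimal sketch of that pattern, not the library's event system: `StatusEmitterSketch` and its methods are hypothetical names.

```typescript
type StatusListener = (taskId: string, status: string) => void;

class StatusEmitterSketch {
  private readonly listeners = new Set<StatusListener>();

  // Registers a listener and returns a function that unregisters it.
  subscribe(listener: StatusListener): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  emit(taskId: string, status: string): void {
    this.listeners.forEach((listener) => listener(taskId, status));
  }
}

const emitter = new StatusEmitterSketch();
const seen: string[] = [];
const unsubscribe = emitter.subscribe((taskId, status) => {
  seen.push(`${taskId}:${status}`);
});

emitter.emit("add", "PROCESSING");
emitter.emit("add", "COMPLETED");
unsubscribe();
emitter.emit("add", "FAILED"); // not recorded: the listener was removed
// seen is ["add:PROCESSING", "add:COMPLETED"]
```

Returning the cleanup function from `subscribe` means callers do not need to keep a reference to the original listener in order to detach it.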
```typescript
// Subscribe to all task status changes
const unsub = graph.subscribeToTaskStatus((taskId, status) => {
  console.log(`Task ${taskId}: ${status}`);
});

// Subscribe to progress updates
graph.subscribeToTaskProgress((taskId, progress, message) => {
  console.log(`Task ${taskId}: ${progress}% - ${message}`);
});

// Subscribe to streaming events
graph.subscribeToTaskStreaming({
  onStreamStart: (taskId) => console.log(`Stream started: ${taskId}`),
  onStreamChunk: (taskId, event) => console.log(`Chunk:`, event),
  onStreamEnd: (taskId, output) => console.log(`Stream ended:`, output),
});

// Clean up
unsub();
```

`TaskGraph` supports JSON serialization for persistence and debugging:
```typescript
// Standard JSON with full structure
const json = graph.toJSON();
// { tasks: [...], dataflows: [...] }

// Dependency-oriented JSON (easier to read)
const deps = graph.toDependencyJSON();
// [{ id, type, defaults, dependencies: { portName: { id, output } } }, ...]
```

Both methods accept an optional `TaskGraphJsonOptions` parameter:

```typescript
graph.toJSON({ withBoundaryNodes: true }); // Include input/output boundary nodes
```

For simple linear pipelines where all tasks share the same port names, the `serialGraph` helper creates a `TaskGraph` with serial dataflows:
```typescript
import { serialGraph } from "@workglow/task-graph";

const graph = serialGraph(
  [taskA, taskB, taskC],
  "input",  // input port name
  "output"  // output port name
);
```

This creates dataflows `taskA[input] ==> taskB[output]` and `taskB[input] ==> taskC[output]` automatically.
```typescript
import { Task, TaskGraph, Dataflow, TaskRegistry } from "@workglow/task-graph";
import type { DataPortSchema } from "@workglow/util/schema";

// Define tasks
class DoubleTask extends Task<{ value: number }, { result: number }> {
  static readonly type = "DoubleTask";
  static readonly category = "Math";
  static readonly title = "Double";
  static readonly description = "Doubles the input value";
  static readonly cacheable = true;

  static inputSchema(): DataPortSchema {
    return {
      type: "object",
      properties: { value: { type: "number" } },
      required: ["value"],
    } as const satisfies DataPortSchema;
  }

  static outputSchema(): DataPortSchema {
    return {
      type: "object",
      properties: { result: { type: "number" } },
    } as const satisfies DataPortSchema;
  }

  async execute(input: { value: number }) {
    return { result: input.value * 2 };
  }
}

class AddTask extends Task<{ a: number; b: number }, { sum: number }> {
  static readonly type = "AddTask";
  static readonly category = "Math";
  static readonly title = "Add";
  static readonly description = "Adds two numbers";
  static readonly cacheable = true;

  static inputSchema(): DataPortSchema {
    return {
      type: "object",
      properties: {
        a: { type: "number" },
        b: { type: "number" },
      },
      required: ["a", "b"],
    } as const satisfies DataPortSchema;
  }

  static outputSchema(): DataPortSchema {
    return {
      type: "object",
      properties: { sum: { type: "number" } },
    } as const satisfies DataPortSchema;
  }

  async execute(input: { a: number; b: number }) {
    return { sum: input.a + input.b };
  }
}

// Register tasks
TaskRegistry.registerTask(DoubleTask);
TaskRegistry.registerTask(AddTask);

// Build graph: double two values, then add them
const doubleA = new DoubleTask({ id: "doubleA", defaults: { value: 5 } });
const doubleB = new DoubleTask({ id: "doubleB", defaults: { value: 3 } });
const add = new AddTask({ id: "add" });

const graph = new TaskGraph();
graph.addTasks([doubleA, doubleB, add]);
graph.addDataflows([
  new Dataflow("doubleA", "result", "add", "a"),
  new Dataflow("doubleB", "result", "add", "b"),
]);

// Execute: doubleA yields 10, doubleB yields 6, and add sums them
const results = await graph.run();
// results = [{ id: "add", type: "AddTask", data: { sum: 16 } }]
```