The Workglow task graph engine provides two distinct execution modes that serve fundamentally different purposes:
-
Full Execution (
run) -- Runs the task'sexecute()method, produces cached and immutable results, and transitions the task toCOMPLETEDstatus. This is the primary execution path for production workloads. -
Reactive Execution (
runReactive) -- Runs the task'sexecuteReactive()method for lightweight, sub-millisecond UI previews. Tasks remain in their current status (typicallyPENDING), and output is treated as temporary and mutable.
This dual-mode architecture allows the same task graph to serve both batch processing and interactive UI scenarios. A user editing an input node in a visual builder sees instant preview updates via runReactive(), while the final "Run" button triggers run() to produce the authoritative, cached output.
Full execution is the authoritative execution path. It:
- Invokes the task's
execute()method with full input validation - Produces deterministic, cacheable results
- Transitions the task to
COMPLETEDstatus - Locks the output as immutable
- Supports caching, streaming, abort signals, timeouts, and telemetry
When you call task.run(overrides, runConfig), execution is delegated to TaskRunner.run():
TaskRunner.run(overrides, config)
|
v
1. handleStart(config)
- Set status = PROCESSING
- Create AbortController
- Link parent signal if provided
- Configure output cache
- Start telemetry span
- Emit "start" and "status" events
|
v
2. setInput(overrides)
- Merge overrides into runInputData
|
v
3. Resolve config schema annotations
- Resolve format annotations (e.g., "mcp-server" references) in task.config
- Uses originalConfig as the resolution source for re-runs
|
v
4. resolveSchemaInputs()
- Resolve format annotations in input (e.g., format:"model", format:"storage:tabular")
- Replace string identifiers with resolved instances from the ServiceRegistry
|
v
5. validateInput()
- Validate runInputData against the compiled JSON Schema (inputSchema)
- Throw TaskInvalidInputError if validation fails
|
v
6. Check abort
- If the signal is already aborted, throw TaskAbortedError
|
v
7. Check cache (if cacheable)
- Look up cached output by (task.type, input)
- If found: use cached result, call executeReactive for UI state, skip execute()
|
v
8. Execute
- If streamable: executeStreamingTask(input) -- consume executeStream() async iterable
- Otherwise: executeTask(input) -- call task.execute(input, context)
- Then: executeTaskReactive(input, output) -- merge reactive overlay
|
v
9. Store in cache (if cacheable and output is new)
|
v
10. handleComplete()
- Set status = COMPLETED
- Record completedAt timestamp
- Set progress = 100
- End telemetry span
- Emit "complete" and "status" events
|
v
Return runOutputData (locked, immutable)
When a TaskGraph executes via run(), the TaskGraphRunner orchestrates all tasks:
TaskGraph.run(input, config)
|
v
TaskGraphRunner.runGraph(input, config)
|
v
For each task in topological order:
|
v
1. copyInputFromEdgesToNode(task)
- For each incoming dataflow, read the dataflow's value
- Write it into task.runInputData at the target port
|
v
2. task.run(taskInput, runConfig)
- Root tasks (no incoming dataflows) receive the graph-level input
- Non-root tasks receive an empty override (dataflows provide data)
|
v
3. pushOutputFromNodeToEdges(task, output)
- For each outgoing dataflow, copy the relevant output port value
- Set the dataflow's status to COMPLETED
|
v
Collect results from ending nodes (tasks with no outgoing dataflows)
Return GraphResultArray<Output>
If any task throws during execute(), the TaskRunner catches the error and:
- Sets
task.status = FAILED - Stores the error on
task.error - Aborts any child subgraphs
- Emits
"error"and"status"events - Re-throws the error to the caller
The TaskGraphRunner catches per-task errors and can abort the entire graph depending on the failure mode.
Tasks support per-task timeouts via the timeout config property:
const task = new MyTask({ timeout: 5000 }); // 5 secondsWhen the timeout elapses, the TaskRunner aborts the task and throws a TaskTimeoutError. The graph-level timeout option in TaskGraphRunConfig applies to the entire graph execution.
Reactive execution exists for UI previews and interactive feedback. When a user edits a task's input in a visual builder, the system calls runReactive() to propagate lightweight updates through the graph without running heavy computation.
Key characteristics:
- Calls
executeReactive()instead ofexecute() - Does not change the task's status (PENDING stays PENDING)
- Output is temporary and mutable (not cached)
- Only affects
PENDINGtasks;COMPLETEDtasks return cached output unchanged - Must complete in under 1 millisecond per task
TaskRunner.runReactive(overrides)
|
v
1. Guard: if status == PROCESSING, return existing output (no re-entry)
|
v
2. setInput(overrides)
- Merge overrides into runInputData
|
v
3. Resolve config and input schema annotations
- Same resolution as full run (models, repositories, etc.)
|
v
4. handleStartReactive()
- Mark reactiveRunning = true (internal flag, no status change)
|
v
5. validateInput()
- Validate against input schema
|
v
6. executeTaskReactive(input, output)
- Call task.executeReactive(input, output, context)
- Merge result with existing output: Object.assign({}, output, reactiveResult)
|
v
7. handleCompleteReactive()
- Mark reactiveRunning = false
|
v
Return runOutputData (temporary, mutable)
The graph-level reactive execution has critical differences from the full run:
TaskGraph.runReactive(input, config)
|
v
TaskGraphRunner.runGraphReactive(input, config)
|
v
For each task in topological order:
|
v
IF task.status == PENDING:
1. task.resetInputData() -- Reset to construction defaults
2. copyInputFromEdgesToNode(task) -- Pull from incoming dataflows
ELSE (COMPLETED):
Skip input modification -- Output is locked, do not touch
|
v
IF isRootTask (no incoming dataflows):
taskInput = input -- Pass graph input to root tasks
ELSE:
taskInput = {}
|
v
task.runReactive(taskInput)
|
v
pushOutputFromNodeToEdges() -- Push output to dataflows
|
v
Return results from ending nodes
The executeReactive() method must be extremely fast. Its contract:
- Target: Complete in under 1 millisecond
- Allowed: Simple transformations, string formatting, preview generation
- Forbidden: Network requests, file I/O, heavy computation, model inference
// CORRECT: Quick preview computation
async executeReactive(input, output, context) {
return { ...output, preview: input.text.substring(0, 200) };
}
// INCORRECT: Heavy computation in reactive path
async executeReactive(input, output, context) {
// This takes 30 seconds and violates the contract
const result = await this.trainNeuralNetwork(input);
return { result };
}The default implementation simply returns the existing output unchanged:
async executeReactive(input, output, context) {
return output;
}Tasks that do not need preview updates can leave this default in place.
The most important invariant in the dual-mode system is: COMPLETED tasks are immutable.
Once run() finishes and a task transitions to COMPLETED:
runOutputDatais locked and cachedrunInputDatamust not be modifiedrunReactive()returns the cached output without callingexecuteReactive()- Dataflows from COMPLETED tasks carry fixed values
This invariant enables several guarantees:
- Cache correctness: The mapping from
(type, input)tooutputis stable. - Determinism: Re-running reactive execution on a partially-completed graph produces consistent results.
- UI consistency: COMPLETED nodes in a visual builder show their final, authoritative output regardless of how many reactive passes occur.
In interactive scenarios, a graph often contains a mix of COMPLETED and PENDING tasks. Consider a three-task pipeline:
[TaskA: COMPLETED] --> [TaskB: COMPLETED] --> [TaskC: PENDING]
If the user edits TaskC's input and triggers runReactive():
- TaskA (COMPLETED): Input is not modified. Returns cached output. Dataflow carries the locked value.
- TaskB (COMPLETED): Same behavior -- returns cached output unchanged.
- TaskC (PENDING): Input is reset to defaults, then dataflow from TaskB populates it.
executeReactive()runs with the new data.
If the user edits TaskA's defaults and wants to re-run the entire pipeline, the graph must first be reset (resetGraph()) to return all tasks to PENDING.
The output cache is a TaskOutputRepository that maps (taskType, input) to output. During run():
// Check cache
if (task.cacheable) {
const cached = await outputCache.getOutput(task.type, inputs);
if (cached) {
// Cache hit: use cached result, skip execute()
task.runOutputData = cached;
// Still call executeReactive() for UI state update
task.runOutputData = await executeTaskReactive(inputs, cached);
return;
}
}
// Cache miss: execute and store
const result = await task.execute(input, context);
if (task.cacheable && result !== undefined) {
await outputCache.saveOutput(task.type, inputs, result);
}Caching can be configured at multiple levels:
// Task-level: static property
class MyTask extends Task {
static readonly cacheable = true;
}
// Instance-level: config override
const task = new MyTask({ cacheable: false });
// Runtime-level: runConfig override
await task.run({}, { cacheable: true });
// Graph-level: enable global cache
await graph.run({}, { outputCache: true });
// Graph-level: custom cache backend
await graph.run({}, { outputCache: myPostgresCache });The precedence order (highest to lowest):
runConfig.cacheable(runtime override)config.cacheable(instance-level)static cacheable(class-level)
Reactive execution never reads from or writes to the cache. It operates purely on temporary in-memory state.
- No performance constraint beyond reasonable execution time
- Use
context.updateProgress(progress, message)to report progress - Check
context.signal.abortedperiodically for long operations - Use the
timeoutconfig property for safety bounds
async execute(input, context) {
for (let i = 0; i < items.length; i++) {
if (context.signal.aborted) throw new TaskAbortedError();
await processItem(items[i]);
await context.updateProgress(Math.round((i / items.length) * 100));
}
return { processed: items.length };
}- Must complete in under 1 millisecond
- No async I/O, no network calls, no heavy computation
- Return previews, summaries, or the existing output unchanged
- Use synchronous operations only when possible
async executeReactive(input, output, context) {
// Quick string preview: microseconds
if (input.text) {
return { ...output, preview: `${input.text.length} characters` };
}
return output;
}- Use
runReactive()for interactive feedback during editing - Call
run()only when the user explicitly requests execution - Use
graph.subscribeToTaskProgress()to show a progress bar - Set
timeoutandmaxTasksto prevent runaway execution
| Method | Mode | Description |
|---|---|---|
task.run(overrides?, runConfig?) |
Full | Execute and return immutable output |
task.runReactive(overrides?) |
Reactive | Execute reactive preview and return temporary output |
task.execute(input, context) |
Full | Override this to implement task logic |
task.executeReactive(input, output, ctx) |
Reactive | Override for sub-1ms preview logic |
task.abort() |
Both | Abort the task via its AbortController |
task.disable() |
Both | Set task to DISABLED status |
task.resetInputData() |
Both | Reset runInputData to construction defaults |
task.setInput(partial) |
Both | Merge partial input into runInputData |
task.setDefaults(partial) |
Both | Update default input values |
task.validateInput(input) |
Both | Validate input against compiled schema |
| Method | Mode | Description |
|---|---|---|
graph.run(input?, config?) |
Full | Execute all tasks in topological order |
graph.runReactive(input?, config?) |
Reactive | Reactive execution for UI previews |
graph.abort() |
Both | Abort all running tasks |
graph.resetGraph() |
Both | Reset all tasks to PENDING |
| Property | Type | Description |
|---|---|---|
running |
boolean |
Whether a full run is in progress |
reactiveRunning |
boolean |
Whether a reactive run is in progress |
task |
ITask |
The task being managed |
inputStreams |
Map<string, Stream> |
Input streams for pass-through streaming |
shouldAccumulate |
boolean |
Whether to accumulate streaming deltas |
Provided to execute():
interface IExecuteContext {
signal: AbortSignal; // For cancellation
updateProgress: (progress: number, message?: string) => Promise<void>;
own: <T>(task: T) => T; // Register child task
registry: ServiceRegistry; // DI container
inputStreams?: Map<string, ReadableStream>; // Upstream streams
}Provided to executeReactive():
interface IExecuteReactiveContext {
own: <T>(task: T) => T; // Register child task
}Update the preview as the user types, then run full execution on submit:
class TextAnalysisTask extends Task<{ text: string }, { wordCount: number; analysis: string }> {
// Full execution: call an AI model
async execute(input, context) {
const analysis = await callAIModel(input.text, { signal: context.signal });
return { wordCount: input.text.split(/\s+/).length, analysis };
}
// Reactive preview: instant word count, placeholder analysis
async executeReactive(input, output) {
return {
wordCount: (input.text || "").split(/\s+/).filter(Boolean).length,
analysis: output.analysis ?? "Run to generate analysis...",
};
}
}Only compute a preview when the input has actually changed:
class ImageFilterTask extends Task<{ image: Blob; filter: string }, { preview: string }> {
private lastFilter: string | undefined;
async executeReactive(input, output) {
if (input.filter === this.lastFilter) {
return output; // No change, return existing output
}
this.lastFilter = input.filter;
return { preview: `Preview: ${input.filter} applied` };
}
}Handle a graph where some tasks are complete and others are pending:
const graph = new TaskGraph();
// ... add tasks and dataflows ...
// First run completes everything
await graph.run({ text: "hello" });
// All tasks are now COMPLETED
// User edits a middle task's input -- need to re-run from that point
// Reset only downstream tasks (or reset entire graph)
graph.resetGraph();
// Now all tasks are PENDING again; run with new input
await graph.run({ text: "hello world" });| Aspect | run() |
runReactive() |
|---|---|---|
| Purpose | Full, authoritative run | UI previews |
| Method called | execute() |
executeReactive() |
| Final status | COMPLETED | Unchanged (stays PENDING) |
| Output | Locked, immutable | Temporary, mutable |
| Caching | Read + Write | Neither |
| Dataflow updates | Always applied | Only for PENDING tasks |
| Performance target | No constraint | < 1ms per task |
| Telemetry | Spans recorded | No telemetry |
| Abort support | Full (signal + timeout) | No abort support |
| Progress events | Emitted | Not emitted |