Task Graph DAG Engine

Overview

The Workglow Task Graph engine is the foundational execution layer of the framework. It models computation as a directed acyclic graph (DAG) where nodes are tasks and edges are dataflows that carry typed data between output ports and input ports. The engine is responsible for resolving execution order via topological sorting, propagating data through the graph along dataflow edges, and managing the lifecycle of every task from PENDING through COMPLETED or FAILED.

The engine lives in the @workglow/task-graph package and provides two primary abstractions:

  • TaskGraph -- a low-level DAG container with explicit addTask, addDataflow, and run methods.
  • Workflow -- a high-level builder that chains tasks with pipe(), parallel(), and control-flow helpers, then compiles down to a TaskGraph for execution.

Both abstractions share the same runtime: TaskGraphRunner executes the graph, TaskRunner executes individual tasks, and Dataflow objects shuttle data between them.


Core Concepts

Task

A Task is the atomic unit of computation. Every task is an instance of a class that extends the base Task<Input, Output, Config> class. Tasks declare their shape through static properties and JSON Schema definitions, and provide execution logic via the execute() method.

Key characteristics of a task:

  • Statically typed ports: Input and output ports are defined by inputSchema() and outputSchema() static methods that return JSON Schema (DataPortSchema) objects.
  • Lifecycle-managed: Each task transitions through well-defined statuses (PENDING, PROCESSING, STREAMING, COMPLETED, FAILED, ABORTING, DISABLED).
  • Independently runnable: A task can be executed standalone via task.run() or as part of a graph.
  • Event-driven: Tasks emit events (start, complete, error, progress, status, stream_start, stream_chunk, stream_end) that allow external code to observe execution.

TaskGraph

A TaskGraph wraps a DirectedAcyclicGraph data structure specialized for tasks and dataflows. It enforces the acyclic invariant at the structural level -- you cannot add an edge that would create a cycle.

The TaskGraph provides:

  • Node management (addTask, removeTask, getTask, getTasks)
  • Edge management (addDataflow, removeDataflow, getDataflow, getDataflows)
  • Topological ordering (topologicallySortedNodes)
  • Execution (run, runReactive)
  • Serialization (toJSON, toDependencyJSON)
  • Event subscription (subscribe, subscribeToTaskStatus, subscribeToTaskProgress, subscribeToDataflowStatus, subscribeToTaskStreaming)
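
The acyclic invariant mentioned above can be illustrated with a small stand-alone sketch. `TinyDag` and its methods are hypothetical names invented for this example, not the library's DirectedAcyclicGraph; the only idea shown is that adding source -> target is rejected when target can already reach source.

```typescript
// Hypothetical miniature DAG; not the library's DirectedAcyclicGraph.
class TinyDag {
  private readonly edges = new Map<string, Set<string>>();

  addNode(id: string): void {
    if (!this.edges.has(id)) this.edges.set(id, new Set());
  }

  // True if `to` can be reached from `from` by following existing edges.
  private reaches(from: string, to: string): boolean {
    const stack = [from];
    const seen = new Set<string>();
    while (stack.length > 0) {
      const current = stack.pop()!;
      if (current === to) return true;
      if (seen.has(current)) continue;
      seen.add(current);
      for (const next of this.edges.get(current) ?? []) stack.push(next);
    }
    return false;
  }

  // Adding source -> target closes a cycle exactly when target already reaches source.
  addEdge(source: string, target: string): void {
    this.addNode(source);
    this.addNode(target);
    if (this.reaches(target, source)) {
      throw new Error(`Edge ${source} -> ${target} would create a cycle`);
    }
    this.edges.get(source)!.add(target);
  }
}

const dag = new TinyDag();
dag.addEdge("A", "B");
dag.addEdge("B", "C");

let rejected = false;
try {
  dag.addEdge("C", "A"); // would close the loop A -> B -> C -> A
} catch {
  rejected = true;
}
console.log(rejected); // true
```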

Dataflow

A Dataflow is a directed edge that connects one task's output port to another task's input port. It is identified by four components:

sourceTaskId[sourceTaskPortId] ==> targetTaskId[targetTaskPortId]

For example, a dataflow from task A's result port to task B's value port:

new Dataflow("taskA", "result", "taskB", "value");

Dataflows carry a value property that is populated during execution and can also carry streaming data via a ReadableStream<StreamEvent>.
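
As a rough illustration of the four-component identifier, the sketch below formats and parses a string in the notation shown above. `formatDataflowId` and `parseDataflowId` are hypothetical helpers written for this example; the library's own id handling may differ in detail.

```typescript
// Hypothetical helpers; they only mirror the notation used in this document.
interface DataflowParts {
  sourceTaskId: string;
  sourceTaskPortId: string;
  targetTaskId: string;
  targetTaskPortId: string;
}

function formatDataflowId(p: DataflowParts): string {
  return `${p.sourceTaskId}[${p.sourceTaskPortId}] ==> ${p.targetTaskId}[${p.targetTaskPortId}]`;
}

function parseDataflowId(id: string): DataflowParts {
  const match = /^(.+?)\[(.+?)\] ==> (.+?)\[(.+)\]$/.exec(id);
  if (!match) throw new Error(`Not a dataflow id: ${id}`);
  const [, sourceTaskId, sourceTaskPortId, targetTaskId, targetTaskPortId] = match;
  return { sourceTaskId, sourceTaskPortId, targetTaskId, targetTaskPortId };
}

const dataflowId = formatDataflowId({
  sourceTaskId: "taskA",
  sourceTaskPortId: "result",
  targetTaskId: "taskB",
  targetTaskPortId: "value",
});
console.log(dataflowId); // taskA[result] ==> taskB[value]
console.log(parseDataflowId(dataflowId).targetTaskPortId); // value
```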

Topological Execution

When a TaskGraph runs, the TaskGraphRunner retrieves all tasks in topological order -- an ordering that guarantees every task executes only after all of its upstream dependencies have completed. For each task in order, the runner:

  1. Copies output data from incoming dataflows into the task's runInputData.
  2. Executes the task.
  3. Pushes the task's output data onto all outgoing dataflows.

This ensures deterministic, dependency-respecting execution without the caller needing to manually manage ordering.
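
The three runner steps above can be sketched without the library. This is a minimal illustration, assuming Kahn's algorithm for the ordering (the document does not say which algorithm the engine uses); `MiniTask`, `MiniDataflow`, `topologicalOrder`, and `runGraph` are names made up for the example and stand in for the real TaskGraphRunner.

```typescript
// Illustrative only: a tiny stand-in for the runner loop, not TaskGraphRunner itself.
type Ports = Record<string, unknown>;

interface MiniTask {
  id: string;
  defaults: Ports;
  execute(input: Ports): Ports;
}

interface MiniDataflow {
  sourceTaskId: string;
  sourcePort: string;
  targetTaskId: string;
  targetPort: string;
}

// Kahn's algorithm: repeatedly emit tasks whose remaining in-degree is zero.
function topologicalOrder(tasks: MiniTask[], flows: MiniDataflow[]): MiniTask[] {
  const inDegree = new Map<string, number>();
  for (const t of tasks) inDegree.set(t.id, 0);
  for (const f of flows) inDegree.set(f.targetTaskId, (inDegree.get(f.targetTaskId) ?? 0) + 1);

  const ready = tasks.filter((t) => inDegree.get(t.id) === 0);
  const ordered: MiniTask[] = [];
  while (ready.length > 0) {
    const task = ready.shift()!;
    ordered.push(task);
    for (const f of flows.filter((e) => e.sourceTaskId === task.id)) {
      const remaining = (inDegree.get(f.targetTaskId) ?? 0) - 1;
      inDegree.set(f.targetTaskId, remaining);
      if (remaining === 0) ready.push(tasks.find((t) => t.id === f.targetTaskId)!);
    }
  }
  if (ordered.length !== tasks.length) throw new Error("Graph contains a cycle");
  return ordered;
}

function runGraph(tasks: MiniTask[], flows: MiniDataflow[]): Map<string, Ports> {
  const outputs = new Map<string, Ports>();
  for (const task of topologicalOrder(tasks, flows)) {
    // Step 1: copy values from incoming dataflows into the task input.
    const input: Ports = { ...task.defaults };
    for (const f of flows.filter((e) => e.targetTaskId === task.id)) {
      input[f.targetPort] = outputs.get(f.sourceTaskId)?.[f.sourcePort];
    }
    // Steps 2 and 3: execute, then expose the output to outgoing dataflows.
    outputs.set(task.id, task.execute(input));
  }
  return outputs;
}

const double = (input: Ports): Ports => ({ result: (input.value as number) * 2 });
const tasks: MiniTask[] = [
  // Deliberately listed first to show that ordering comes from the edges.
  { id: "add", defaults: {}, execute: (i) => ({ sum: (i.a as number) + (i.b as number) }) },
  { id: "doubleA", defaults: { value: 5 }, execute: double },
  { id: "doubleB", defaults: { value: 3 }, execute: double },
];
const flows: MiniDataflow[] = [
  { sourceTaskId: "doubleA", sourcePort: "result", targetTaskId: "add", targetPort: "a" },
  { sourceTaskId: "doubleB", sourcePort: "result", targetTaskId: "add", targetPort: "b" },
];

const outputs = runGraph(tasks, flows);
console.log(outputs.get("add")); // { sum: 16 }
```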


Task Definition

Every task class must declare several static properties and two static schema methods. Here is the minimal structure:

import { Task } from "@workglow/task-graph";
import type { DataPortSchema } from "@workglow/util/schema";

interface MyInput { text: string }
interface MyOutput { wordCount: number }

class WordCountTask extends Task<MyInput, MyOutput> {
  static readonly type = "WordCountTask";
  static readonly category = "Text";
  static readonly title = "Word Count";
  static readonly description = "Counts words in a string";
  static readonly cacheable = true;

  static inputSchema(): DataPortSchema {
    return {
      type: "object",
      properties: {
        text: { type: "string", title: "Input Text" },
      },
      required: ["text"],
    } as const satisfies DataPortSchema;
  }

  static outputSchema(): DataPortSchema {
    return {
      type: "object",
      properties: {
        wordCount: { type: "integer", title: "Word Count" },
      },
    } as const satisfies DataPortSchema;
  }

  async execute(input: MyInput): Promise<MyOutput> {
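    // "".split(/\s+/) yields [""], so blank input must be handled explicitly.
    const trimmed = input.text.trim();
    const words = trimmed === "" ? [] : trimmed.split(/\s+/);
    return { wordCount: words.length };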
  }
}

Required Static Properties

Property     Type     Description
type         string   Unique identifier for this task class in the registry
category     string   Grouping label for UI organization
title        string   Human-readable name
description  string   Brief description of the task's purpose
cacheable    boolean  Whether results can be cached across runs

Optional Static Properties

Property                    Type     Default  Description
hasDynamicSchemas           boolean  false    Set true if schemas change at runtime
passthroughInputsToOutputs  boolean  false    Mirror dynamic input ports to output
customizable                boolean  false    Allow saving as a preset in the builder
isGraphOutput               boolean  false    Mark as the definitive output node of a graph
hasDynamicEntitlements      boolean  false    Entitlements depend on runtime state

The execute() Method

The execute() method receives the validated input and an IExecuteContext object:

async execute(input: Input, context: IExecuteContext): Promise<Output | undefined> {
  // context.signal -- AbortSignal for cancellation
  // context.updateProgress -- report progress (0-100)
  // context.own -- register a child task
  // context.registry -- ServiceRegistry for DI lookups
  return { result: computeSomething(input) };
}

If the task returns undefined, the output is treated as an empty object {}.
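
To make the role of context.signal and context.updateProgress more concrete, here is a minimal sketch of cooperative cancellation and progress reporting. `ExecuteContextLike` is a hypothetical, trimmed-down stand-in for IExecuteContext, and the `updateProgress(percent, message?)` signature is an assumption. The function is synchronous to keep the example short, whereas a real execute() is async.

```typescript
// Hypothetical stand-in for IExecuteContext; only two members are modeled.
interface ExecuteContextLike {
  signal: AbortSignal;
  updateProgress(percent: number, message?: string): void;
}

// Sums numbers in chunks, checking for cancellation between chunks.
function sumWithProgress(values: number[], context: ExecuteContextLike): { total: number } {
  const chunkSize = 2;
  let total = 0;
  for (let i = 0; i < values.length; i += chunkSize) {
    if (context.signal.aborted) throw new Error("Task aborted");
    for (const v of values.slice(i, i + chunkSize)) total += v;
    const done = Math.min(i + chunkSize, values.length);
    context.updateProgress(Math.round((done / values.length) * 100));
  }
  return { total };
}

const progressLog: number[] = [];
const controller = new AbortController();
const summed = sumWithProgress([1, 2, 3, 4, 5], {
  signal: controller.signal,
  updateProgress: (percent) => progressLog.push(percent),
});
console.log(summed.total); // 15
console.log(progressLog); // [ 40, 80, 100 ]
```

Checking the signal between chunks rather than once at the start is what lets a long-running task stop promptly after an abort is requested.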

The executeReactive() Method

Tasks may optionally override executeReactive() for lightweight, sub-millisecond preview updates:

async executeReactive(
  input: Input,
  output: Output,
  context: IExecuteReactiveContext
): Promise<Output | undefined> {
  // Return a quick preview based on input
  return { ...output, preview: input.text.substring(0, 100) };
}

This method is called during runReactive() and must complete in under 1 millisecond. Heavy computation belongs exclusively in execute().


TaskGraph API Reference

Construction

const graph = new TaskGraph();
// Or with an output cache:
const cachedGraph = new TaskGraph({ outputCache: myOutputRepository });

Adding Tasks

// Single task
graph.addTask(new WordCountTask({ defaults: { text: "hello world" } }));

// Multiple tasks
graph.addTasks([taskA, taskB, taskC]);

Adding Dataflows

// Connect taskA's "result" output to taskB's "value" input
graph.addDataflow(new Dataflow(taskA.id, "result", taskB.id, "value"));

// Bulk add
graph.addDataflows([
  new Dataflow(taskA.id, "output", taskB.id, "input"),
  new Dataflow(taskB.id, "output", taskC.id, "input"),
]);

Special Port Identifiers

Constant             Value      Purpose
DATAFLOW_ALL_PORTS   "*"        Pass entire output object as input
DATAFLOW_ERROR_PORT  "[error]"  Route error objects between tasks
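
One plausible reading of the "*" port is sketched below: a named port copies a single property, while the all-ports value spreads the whole output object into the target input. `applyDataflow` is a hypothetical helper, and the merge semantics are an assumption made for illustration rather than the engine's documented behavior.

```typescript
const DATAFLOW_ALL_PORTS = "*";

type PortData = Record<string, unknown>;

interface PortLink {
  sourcePort: string;
  targetPort: string;
}

// Hypothetical helper: applies one dataflow's source output to a target input object.
function applyDataflow(sourceOutput: PortData, link: PortLink, targetInput: PortData): PortData {
  if (link.sourcePort === DATAFLOW_ALL_PORTS) {
    // Assumed semantics: every output property becomes an input property.
    return { ...targetInput, ...sourceOutput };
  }
  // Named port: copy exactly one property, renaming it to the target port.
  return { ...targetInput, [link.targetPort]: sourceOutput[link.sourcePort] };
}

const upstreamOutput: PortData = { result: 10, unit: "ms" };

console.log(applyDataflow(upstreamOutput, { sourcePort: "result", targetPort: "value" }, {}));
// { value: 10 }
console.log(applyDataflow(upstreamOutput, { sourcePort: "*", targetPort: "*" }, {}));
// { result: 10, unit: 'ms' }
```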

Querying the Graph

graph.getTask(taskId);                    // Get task by ID
graph.getTasks();                         // All tasks
graph.topologicallySortedNodes();         // Tasks in execution order
graph.getDataflow(dataflowId);            // Get dataflow by ID
graph.getDataflows();                     // All dataflows
graph.getSourceDataflows(taskId);         // Incoming dataflows for a task
graph.getTargetDataflows(taskId);         // Outgoing dataflows from a task
graph.getSourceTasks(taskId);             // Upstream tasks
graph.getTargetTasks(taskId);             // Downstream tasks

Running the Graph

// Full execution
const controller = new AbortController(); // lets the caller cancel the run
const results = await graph.run<MyOutput>(
  { text: "hello world" },  // Input for root tasks
  {
    outputCache: true,       // Enable caching
    timeout: 30000,          // 30 second timeout
    maxTasks: 100,           // Safety limit
    parentSignal: controller.signal, // External abort
  }
);

// Reactive execution (previews)
const previews = await graph.runReactive<MyOutput>(
  { text: "hello world" }
);

Run Configuration

The TaskGraphRunConfig interface provides these options:

Option                 Type                            Description
outputCache            TaskOutputRepository | boolean  Cache backend, or true to use the global cache
parentSignal           AbortSignal                     Signal to abort the entire graph
registry               ServiceRegistry                 DI registry for this execution
accumulateLeafOutputs  boolean                         Accumulate streaming output for leaf nodes (default true)
timeout                number                          Max execution time in milliseconds
maxTasks               number                          Maximum number of tasks allowed
enforceEntitlements    boolean                         Check entitlements before execution
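
The parentSignal and timeout options both end a run early. The sketch below shows one way such options could be folded into a single AbortSignal using only standard AbortController and timer APIs. `createRunSignal` is a hypothetical helper; how the engine combines the two internally is not described in this document.

```typescript
// Hypothetical helper: derive one AbortSignal from an optional parent signal and timeout.
function createRunSignal(
  parentSignal?: AbortSignal,
  timeoutMs?: number
): { signal: AbortSignal; dispose: () => void } {
  const runController = new AbortController();
  const onParentAbort = () => runController.abort();

  // Propagate an abort that already happened, then listen for a future one.
  if (parentSignal?.aborted) runController.abort();
  parentSignal?.addEventListener("abort", onParentAbort, { once: true });

  // Abort on our own once the time budget is exhausted.
  const timer =
    timeoutMs === undefined ? undefined : setTimeout(() => runController.abort(), timeoutMs);

  return {
    signal: runController.signal,
    // Call when the run finishes so the timer and listener do not linger.
    dispose: () => {
      if (timer !== undefined) clearTimeout(timer);
      parentSignal?.removeEventListener("abort", onParentAbort);
    },
  };
}

const parentController = new AbortController();
const runSignal = createRunSignal(parentController.signal, 30000);

console.log(runSignal.signal.aborted); // false
parentController.abort();
console.log(runSignal.signal.aborted); // true
runSignal.dispose();
```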

Abort and Reset

graph.abort();       // Abort all running tasks
graph.resetGraph();  // Reset all tasks to PENDING

Execution Flow

Graph-Level Execution (run)

TaskGraph.run(input, config)
  |
  v
TaskGraphRunner.runGraph(input, config)
  |
  v
For each task in topological order:
  1. copyInputFromEdgesToNode(task)  -- Pull data from incoming dataflows
  2. runTask(task, input)            -- Execute via TaskRunner.run()
  3. pushOutputFromNodeToEdges(task) -- Push output to outgoing dataflows
  |
  v
Collect results from ending nodes (no outgoing dataflows)
Return GraphResultArray<Output>

Task-Level Execution (run)

Task.run(overrides, runConfig)
  |
  v
TaskRunner.run(overrides, config)
  |
  v
1. handleStart()            -- Set status to PROCESSING, create AbortController
2. setInput(overrides)      -- Merge overrides into runInputData
3. resolveSchemaInputs()    -- Resolve format annotations (models, repositories)
4. validateInput()          -- Validate against compiled JSON Schema
5. Check cache              -- If cacheable, look up cached result
6. executeTask()            -- Call task.execute(input, context)
7. Cache result             -- If cacheable, store in output cache
8. handleComplete()         -- Set status to COMPLETED, emit events
  |
  v
Return runOutputData (locked, immutable)
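
Steps 5 through 7 of the task-level flow (check cache, execute, store result) follow a familiar memoization pattern. The sketch below models it with an in-memory map. `MemoryOutputCache` and `runCached` are hypothetical names, and keying the cache on task type plus JSON-serialized input is an assumption made for the example, not a description of TaskOutputRepository.

```typescript
// Hypothetical in-memory cache keyed by task type plus serialized input.
class MemoryOutputCache {
  private readonly entries = new Map<string, unknown>();

  private key(taskType: string, input: unknown): string {
    return `${taskType}:${JSON.stringify(input)}`;
  }

  get<T>(taskType: string, input: unknown): T | undefined {
    return this.entries.get(this.key(taskType, input)) as T | undefined;
  }

  set(taskType: string, input: unknown, output: unknown): void {
    this.entries.set(this.key(taskType, input), output);
  }
}

let executions = 0;

function runCached(input: { value: number }, outputCache: MemoryOutputCache): { result: number } {
  // Step 5: return the cached output when one exists for this input.
  const cached = outputCache.get<{ result: number }>("DoubleTask", input);
  if (cached !== undefined) return cached;

  // Step 6: otherwise run the actual computation.
  executions += 1;
  const output = { result: input.value * 2 };

  // Step 7: store the output for later runs.
  outputCache.set("DoubleTask", input, output);
  return output;
}

const outputCache = new MemoryOutputCache();
console.log(runCached({ value: 5 }, outputCache)); // { result: 10 }
console.log(runCached({ value: 5 }, outputCache)); // { result: 10 }, served from the cache
console.log(executions); // 1
```

Note that JSON.stringify is sensitive to property order, so a production cache key would normally use a canonical serialization.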

Task Lifecycle States

PENDING --> PROCESSING --> STREAMING --> COMPLETED
                |                          ^
                +---> COMPLETED -----------+
                |
                +---> FAILED
                |
                +---> ABORTING ---> FAILED

PENDING --> DISABLED

Status      Description
PENDING     Task has not started. Inputs can be modified freely.
PROCESSING  Task is currently executing its execute() method.
STREAMING   Task has begun producing streaming output chunks.
COMPLETED   Execution finished successfully. Output is locked and immutable.
FAILED      Execution threw an error.
ABORTING    Abort has been requested; cleanup is in progress.
DISABLED    Task was disabled (e.g., by a ConditionalTask that deactivated it).
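
The lifecycle diagram in this section can be transcribed into a small transition table. This is only a sketch of the arrows drawn there: the engine may permit transitions the diagram omits (for example, resetGraph() returning tasks to PENDING), and `canTransition` is a hypothetical helper.

```typescript
type TaskStatus =
  | "PENDING"
  | "PROCESSING"
  | "STREAMING"
  | "COMPLETED"
  | "FAILED"
  | "ABORTING"
  | "DISABLED";

// Transitions transcribed from the lifecycle diagram; nothing beyond it is modeled.
const ALLOWED_TRANSITIONS: Record<TaskStatus, readonly TaskStatus[]> = {
  PENDING: ["PROCESSING", "DISABLED"],
  PROCESSING: ["STREAMING", "COMPLETED", "FAILED", "ABORTING"],
  STREAMING: ["COMPLETED"],
  ABORTING: ["FAILED"],
  COMPLETED: [],
  FAILED: [],
  DISABLED: [],
};

function canTransition(from: TaskStatus, to: TaskStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}

console.log(canTransition("PENDING", "PROCESSING")); // true
console.log(canTransition("PROCESSING", "COMPLETED")); // true
console.log(canTransition("COMPLETED", "PROCESSING")); // false
```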

Immutability After Completion

Once a task reaches COMPLETED, its runOutputData is considered immutable. This is a core invariant of the engine. Reactive execution (runReactive) will not modify a completed task's output -- it returns the cached result unchanged.
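
A minimal sketch of this invariant, assuming Object.freeze as the locking mechanism (the document does not say how the engine locks output): once the state is COMPLETED, a reactive pass returns it untouched. `MiniTaskState`, `complete`, and `runReactivePreview` are hypothetical names.

```typescript
interface MiniTaskState {
  status: "PENDING" | "COMPLETED";
  runOutputData: Readonly<Record<string, unknown>>;
}

// Marks the task as completed and freezes a copy of its output.
function complete(output: Record<string, unknown>): MiniTaskState {
  return { status: "COMPLETED", runOutputData: Object.freeze({ ...output }) };
}

// Reactive pass: merges a preview into pending tasks, leaves completed ones alone.
function runReactivePreview(state: MiniTaskState, preview: Record<string, unknown>): MiniTaskState {
  if (state.status === "COMPLETED") return state;
  return { ...state, runOutputData: { ...state.runOutputData, ...preview } };
}

const pendingState: MiniTaskState = { status: "PENDING", runOutputData: {} };
const previewed = runReactivePreview(pendingState, { preview: "hello" });
console.log(previewed.runOutputData); // { preview: 'hello' }

const completedState = complete({ wordCount: 2 });
const afterReactive = runReactivePreview(completedState, { preview: "ignored" });
console.log(afterReactive === completedState); // true
console.log(Object.isFrozen(afterReactive.runOutputData)); // true
```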


Workflow Builder

The Workflow class provides a fluent API for constructing task graphs without manually creating dataflow edges.

import { Workflow } from "@workglow/task-graph";

const workflow = new Workflow()
  .addTask(new FetchUrlTask({ defaults: { url: "https://example.com" } }))
  .pipe(new ExtractTextTask())
  .pipe(new WordCountTask());

const results = await workflow.run();

Builder Methods

Method                Description
addTask(task)         Add a task to the workflow
pipe(...tasks)        Chain tasks sequentially with auto-wired dataflows
parallel(tasks)       Run tasks in parallel from the current position
group(config)         Start a sub-group (wraps in GraphAsTask)
endGroup()            Close the current group
map(config)           Start a map loop over array inputs
endMap()              Close the map loop
reduce(config)        Start a reduce loop with accumulator
endReduce()           Close the reduce loop
while(config)         Start a conditional loop
endWhile()            Close the while loop
run(input?, config?)  Build the graph and execute
runReactive(input?)   Build the graph and execute reactively

Event System

TaskGraph emits events for structural changes and task lifecycle updates:

Graph Structural Events

Event             Parameters  Description
task_added        taskId      A task was added to the graph
task_removed      taskId      A task was removed from the graph
dataflow_added    dataflowId  A dataflow was added
dataflow_removed  dataflowId  A dataflow was removed

Streaming Events

Event              Parameters           Description
task_stream_start  taskId               A streaming task began output
task_stream_chunk  taskId, StreamEvent  A streaming chunk was produced
task_stream_end    taskId, output       Streaming completed

Subscription Helpers

// Subscribe to all task status changes
const unsub = graph.subscribeToTaskStatus((taskId, status) => {
  console.log(`Task ${taskId}: ${status}`);
});

// Subscribe to progress updates
graph.subscribeToTaskProgress((taskId, progress, message) => {
  console.log(`Task ${taskId}: ${progress}% - ${message}`);
});

// Subscribe to streaming events
graph.subscribeToTaskStreaming({
  onStreamStart: (taskId) => console.log(`Stream started: ${taskId}`),
  onStreamChunk: (taskId, event) => console.log(`Chunk:`, event),
  onStreamEnd: (taskId, output) => console.log(`Stream ended:`, output),
});

// Clean up
unsub();
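
The subscribe-returns-unsubscribe pattern used by these helpers can be sketched in a few lines. `StatusHub` is a hypothetical class written for illustration; it is not how TaskGraph implements its event system.

```typescript
type StatusListener = (taskId: string, status: string) => void;

// Hypothetical minimal hub showing the "subscribe returns a cleanup function" pattern.
class StatusHub {
  private readonly listeners = new Set<StatusListener>();

  subscribe(listener: StatusListener): () => void {
    this.listeners.add(listener);
    // The returned function removes exactly this listener.
    return () => {
      this.listeners.delete(listener);
    };
  }

  emit(taskId: string, status: string): void {
    for (const listener of this.listeners) listener(taskId, status);
  }
}

const hub = new StatusHub();
const received: string[] = [];
const stopListening = hub.subscribe((taskId, status) => received.push(`${taskId}:${status}`));

hub.emit("doubleA", "PROCESSING");
stopListening();
hub.emit("doubleA", "COMPLETED"); // not recorded: the listener was removed

console.log(received); // [ 'doubleA:PROCESSING' ]
```

Returning the cleanup function from subscribe keeps the caller from having to hold on to the listener reference just to remove it later.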

Serialization

TaskGraph supports JSON serialization for persistence and debugging:

// Standard JSON with full structure
const json = graph.toJSON();
// { tasks: [...], dataflows: [...] }

// Dependency-oriented JSON (easier to read)
const deps = graph.toDependencyJSON();
// [{ id, type, defaults, dependencies: { portName: { id, output } } }, ...]

Both methods accept an optional TaskGraphJsonOptions parameter:

graph.toJSON({ withBoundaryNodes: true }); // Include input/output boundary nodes
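
To show how the two shapes relate, the sketch below converts a { tasks, dataflows } structure into the dependency-oriented form from the comment above. The field names on `TaskJson` and `DataflowJson` are assumptions chosen for the example, and `toDependencyShape` is a hypothetical function, not the library's toDependencyJSON.

```typescript
// Assumed shapes: field names are illustrative, not the library's exact JSON types.
interface TaskJson {
  id: string;
  type: string;
  defaults?: Record<string, unknown>;
}

interface DataflowJson {
  sourceTaskId: string;
  sourceTaskPortId: string;
  targetTaskId: string;
  targetTaskPortId: string;
}

interface DependencyTaskJson extends TaskJson {
  dependencies?: Record<string, { id: string; output: string }>;
}

// For each task, list which upstream task and output port feeds each input port.
function toDependencyShape(tasks: TaskJson[], dataflows: DataflowJson[]): DependencyTaskJson[] {
  return tasks.map((task) => {
    const dependencies: Record<string, { id: string; output: string }> = {};
    for (const flow of dataflows) {
      if (flow.targetTaskId === task.id) {
        dependencies[flow.targetTaskPortId] = {
          id: flow.sourceTaskId,
          output: flow.sourceTaskPortId,
        };
      }
    }
    return Object.keys(dependencies).length > 0 ? { ...task, dependencies } : { ...task };
  });
}

const dependencyJson = toDependencyShape(
  [
    { id: "doubleA", type: "DoubleTask", defaults: { value: 5 } },
    { id: "add", type: "AddTask" },
  ],
  [{ sourceTaskId: "doubleA", sourceTaskPortId: "result", targetTaskId: "add", targetTaskPortId: "a" }]
);

console.log(JSON.stringify(dependencyJson[1]));
// {"id":"add","type":"AddTask","dependencies":{"a":{"id":"doubleA","output":"result"}}}
```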

Helper: serialGraph

For simple linear pipelines where all tasks share the same port names, the serialGraph helper creates a TaskGraph with serial dataflows:

import { serialGraph } from "@workglow/task-graph";

const graph = serialGraph(
  [taskA, taskB, taskC],
  "input",   // input port name
  "output"   // output port name
);

This automatically creates the dataflows taskA[output] ==> taskB[input] and taskB[output] ==> taskC[input], following the sourceTaskId[sourceTaskPortId] ==> targetTaskId[targetTaskPortId] notation.


Complete Example

import { Task, TaskGraph, Dataflow, TaskRegistry } from "@workglow/task-graph";
import type { DataPortSchema } from "@workglow/util/schema";

// Define tasks
class DoubleTask extends Task<{ value: number }, { result: number }> {
  static readonly type = "DoubleTask";
  static readonly category = "Math";
  static readonly title = "Double";
  static readonly description = "Doubles the input value";
  static readonly cacheable = true;

  static inputSchema(): DataPortSchema {
    return {
      type: "object",
      properties: { value: { type: "number" } },
      required: ["value"],
    } as const satisfies DataPortSchema;
  }

  static outputSchema(): DataPortSchema {
    return {
      type: "object",
      properties: { result: { type: "number" } },
    } as const satisfies DataPortSchema;
  }

  async execute(input: { value: number }) {
    return { result: input.value * 2 };
  }
}

class AddTask extends Task<{ a: number; b: number }, { sum: number }> {
  static readonly type = "AddTask";
  static readonly category = "Math";
  static readonly title = "Add";
  static readonly description = "Adds two numbers";
  static readonly cacheable = true;

  static inputSchema(): DataPortSchema {
    return {
      type: "object",
      properties: {
        a: { type: "number" },
        b: { type: "number" },
      },
      required: ["a", "b"],
    } as const satisfies DataPortSchema;
  }

  static outputSchema(): DataPortSchema {
    return {
      type: "object",
      properties: { sum: { type: "number" } },
    } as const satisfies DataPortSchema;
  }

  async execute(input: { a: number; b: number }) {
    return { sum: input.a + input.b };
  }
}

// Register tasks
TaskRegistry.registerTask(DoubleTask);
TaskRegistry.registerTask(AddTask);

// Build graph: double two values, then add them
const doubleA = new DoubleTask({ id: "doubleA", defaults: { value: 5 } });
const doubleB = new DoubleTask({ id: "doubleB", defaults: { value: 3 } });
const add = new AddTask({ id: "add" });

const graph = new TaskGraph();
graph.addTasks([doubleA, doubleB, add]);
graph.addDataflows([
  new Dataflow("doubleA", "result", "add", "a"),
  new Dataflow("doubleB", "result", "add", "b"),
]);

// Execute
const results = await graph.run();
// results = [{ id: "add", type: "AddTask", data: { sum: 16 } }]