The Workglow task graph engine connects tasks through dataflow edges that carry typed data between output and input ports. While the DAG structure defines the dependency order, dataflows define the data contracts -- which output port feeds which input port, how values are propagated, and whether data flows as a complete materialized value or as a stream of incremental events.
This document covers two interconnected systems:
- Dataflow -- the edge abstraction that connects task ports, carries values, and manages lifecycle status.
- Streaming -- the system for incremental output delivery, including stream modes, stream events, the `x-stream` schema annotation, and delta accumulation.
Together, these systems enable everything from simple port-to-port value passing to real-time token-by-token streaming of AI model output through a multi-task pipeline.
A Dataflow represents a single directed edge from one task's output port to another task's input port. It is defined by four identifiers:
```typescript
const dataflow = new Dataflow(
  sourceTaskId,     // ID of the source task
  sourceTaskPortId, // Name of the output port on the source task
  targetTaskId,     // ID of the target task
  targetTaskPortId  // Name of the input port on the target task
);
```

The dataflow's `id` is a deterministic string derived from these four components:

```
sourceTaskId[sourceTaskPortId] ==> targetTaskId[targetTaskPortId]
```
For example: "task-abc[result] ==> task-def[value]".
Each dataflow maintains its own lifecycle state, mirroring the task lifecycle:
| Property | Type | Description |
|---|---|---|
| `value` | `any` | The materialized data carried by the dataflow |
| `status` | `TaskStatus` | Current lifecycle status |
| `error` | `TaskError` | Error object if the dataflow failed |
| `stream` | `ReadableStream<StreamEvent>` | Active stream for streaming tasks |
The dataflow status transitions mirror the source task's execution:
```
PENDING --> PROCESSING --> STREAMING --> COMPLETED
                |
                +--> FAILED
```
When the TaskGraphRunner pushes output from a completed task onto outgoing dataflows, it calls setPortData(entireDataBlock):
```typescript
// If sourceTaskPortId is a specific port name:
dataflow.value = entireDataBlock[sourceTaskPortId];

// If sourceTaskPortId is "*" (DATAFLOW_ALL_PORTS):
dataflow.value = entireDataBlock; // Entire output object

// If sourceTaskPortId is "[error]" (DATAFLOW_ERROR_PORT):
dataflow.error = entireDataBlock;
```

When the runner copies input from dataflows into a target task, it calls `getPortData()`:
```typescript
// If targetTaskPortId is a specific port name:
return { [targetTaskPortId]: dataflow.value };

// If targetTaskPortId is "*" (DATAFLOW_ALL_PORTS):
return dataflow.value; // Entire value object

// If targetTaskPortId is "[error]" (DATAFLOW_ERROR_PORT):
return { "[error]": dataflow.error };
```

| Constant | Value | Description |
|---|---|---|
| `DATAFLOW_ALL_PORTS` | `"*"` | Pass the entire output/input object, not a single port |
| `DATAFLOW_ERROR_PORT` | `"[error]"` | Route error objects between tasks for error handling |
The wildcard port "*" is used when a task should receive the complete output object from its upstream dependency, rather than a single named property.
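For intuition, here is a worked sketch of both routing rules (the output object is hypothetical):

```typescript
// Hypothetical upstream output block:
const entireDataBlock = { text: "hi", confidence: 0.9 };

// Named ports ("text" -> "input"):
//   setPortData(entireDataBlock) stores dataflow.value = "hi"
//   getPortData() returns { input: "hi" } -- keyed by the *target* port name
//
// Wildcard ports ("*" -> "*"):
//   setPortData(entireDataBlock) stores the whole object in dataflow.value
//   getPortData() returns { text: "hi", confidence: 0.9 }
```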
Dataflows validate that source and target ports are semantically compatible by inspecting their JSON Schema types:
```typescript
dataflow.semanticallyCompatible(graph, dataflow);
// Returns: "static" | "runtime" | "incompatible"
```

| Result | Meaning |
|---|---|
| `"static"` | Types are statically compatible |
| `"runtime"` | Compatibility can only be determined at runtime |
| `"incompatible"` | Types are known to be incompatible |
Compatibility results are cached for tasks with stable schemas. Tasks with hasDynamicSchemas = true bypass the cache because their schemas may change between checks.
Calling dataflow.reset() returns the dataflow to its initial state:
```typescript
dataflow.reset();
// value = undefined
// status = PENDING
// error = undefined
// stream = undefined
// compatibility cache cleared
```

Task input and output ports are defined by the properties of the JSON Schema returned by `inputSchema()` and `outputSchema()`. Each property name is a port identifier, and the property's schema defines the port's type contract.
```typescript
static outputSchema(): DataPortSchema {
  return {
    type: "object",
    properties: {
      text: { type: "string", title: "Generated Text" },
      confidence: { type: "number", minimum: 0, maximum: 1 },
    },
  } as const satisfies DataPortSchema;
}
```

This task has two output ports: `text` (string) and `confidence` (number).
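`inputSchema()` works the same way on the consuming side. A hypothetical consumer of those two ports might declare:

```typescript
// Hypothetical consumer-side schema; port names chosen to match the producer above.
static inputSchema(): DataPortSchema {
  return {
    type: "object",
    properties: {
      text: { type: "string", title: "Text to Process" },
      confidence: { type: "number", minimum: 0, maximum: 1 },
    },
  } as const satisfies DataPortSchema;
}
```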
Ports support several custom annotations beyond standard JSON Schema:
| Annotation | Type | Description |
|---|---|---|
| `x-stream` | `string` | Streaming mode: `"append"`, `"replace"`, or `"object"` |
| `x-ui-hidden` | `boolean` | Hide from UI display |
| `x-ui-iteration` | `boolean` | Iteration context port (hidden from parent display) |
| `x-ui-manual` | `boolean` | User-added port (dynamic) |
| `x-auto-generated` | `boolean` | Auto-generated primary key |
| `x-structured-output` | `boolean` | Port schema used for structured AI output |
| `format` | `string` | Semantic type hint (e.g., `"model"`, `"storage:tabular"`, `"knowledge-base"`) |
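As an illustration (the port names here are invented), several annotations can combine on one schema:

```typescript
properties: {
  rowId: {
    type: "string",
    "x-auto-generated": true, // engine-assigned primary key
    "x-ui-hidden": true,      // not rendered in the UI
  },
  dataset: {
    type: "object",
    format: "storage:tabular", // semantic hint for UI and tooling
  },
}
```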
Workglow's streaming layer uses three primitives that serve distinct, non-overlapping roles. Authors only ever write one of them. The other two are engine internals.
| Primitive | Role | Where it appears | Who writes it |
|---|---|---|---|
| `AsyncIterable<StreamEvent>` | Authoring | `Task.executeStream()`, `AiProviderStreamFn` | Task / provider authors |
| `ReadableStream<StreamEvent>` | Engine-internal tee | `Dataflow.stream`, `TaskGraphRunner` fan-out | Engine only (do not use in tasks) |
| `EventEmitter` (task events) | Observation only | `stream_start`, `stream_chunk`, `stream_end` on `Task` | Engine emits; consumers subscribe |
All streaming tasks and providers MUST return `AsyncIterable<StreamEvent>` -- typically as an `async function*` generator.

```typescript
// Task: use `async *executeStream()`
async *executeStream(input, context): AsyncIterable<StreamEvent> {
  yield { type: "text-delta", port: "text", textDelta: "Hello" };
  yield { type: "finish", data: {} as Output };
}

// Provider: the `AiProviderStreamFn` signature IS an AsyncIterable-returning function
export const MyProvider_Stream: AiProviderStreamFn = async function* (input, model, signal) {
  for await (const delta of underlyingSdk.stream(input, { signal })) {
    yield { type: "text-delta", port: "text", textDelta: delta };
  }
  yield { type: "finish", data: {} };
};
```

Why `AsyncIterable` for authoring:

- Pull-based backpressure is inherent. When the consumer is slow, `yield` suspends the generator. No buffer bloat, without writing any extra code.
- Cancel cleanup is natural. A `try { ... } finally { ... }` in the generator runs when the consumer stops iterating -- including on `AbortSignal` firing. See the sketch below.
- Trivial to write. One `async function*`; no manual controller plumbing.
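A minimal sketch of that cleanup pattern -- `openResource` is a stand-in for whatever I/O the task opens:

```typescript
async *executeStream(input, context): AsyncIterable<StreamEvent> {
  const resource = await openResource(input, context.signal); // hypothetical helper
  try {
    for await (const chunk of resource) {
      yield { type: "text-delta", port: "text", textDelta: chunk };
    }
    yield { type: "finish", data: {} as Output };
  } finally {
    // Runs on normal completion, a thrown error, or when the consumer stops
    // iterating after an abort -- so the resource is always released.
    await resource.close();
  }
}
```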
`ReadableStream<StreamEvent>` exists in exactly two engine-internal places:

- `Dataflow.stream` -- the edge-level stream attached to dataflows that carry streaming data between tasks.
- `TaskGraphRunner` fan-out -- when one upstream streaming task feeds multiple downstream consumers, the engine uses `ReadableStream.tee()` to split the stream cleanly.
Why `ReadableStream` is the right tool here:

- `tee()` is the correct fan-out primitive -- splitting an `AsyncIterable` to N consumers would require building (and maintaining) a custom broadcaster.
- `cancel()` back-propagates to the producer, matching the existing abort plumbing.
- It is a Web-platform standard available uniformly in browsers, workers, Node, and Bun.
Authors do not write `ReadableStream` directly. The engine wraps task-authored `AsyncIterable`s into `ReadableStream`s at the dataflow edge. If you find yourself reaching for `new ReadableStream(...)` in a task, you are on the wrong layer.
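For intuition only, the wrapping is roughly equivalent to this sketch (not the engine's actual code):

```typescript
// Pull-based wrapper: backpressure is preserved because pull() only advances
// the generator when the downstream consumer requests the next event.
function toReadableStream<T>(iterable: AsyncIterable<T>): ReadableStream<T> {
  const iterator = iterable[Symbol.asyncIterator]();
  return new ReadableStream<T>({
    async pull(controller) {
      const { done, value } = await iterator.next();
      if (done) controller.close();
      else controller.enqueue(value);
    },
    async cancel(reason) {
      // Back-propagates cancellation into the generator, running its finally block.
      await iterator.return?.(reason);
    },
  });
}
```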
One exception lives in `HFT_Pipeline.ts`: that code wraps an HTTP `Response.body` (already a `ReadableStream<Uint8Array>`) into an abort-aware, pull-based `ReadableStream<Uint8Array>` for multi-GB model downloads. That wrapping is below the task layer -- it is shaping an external I/O primitive, not authoring a task stream.
Tasks emit stream_start, stream_chunk, and stream_end via EventEmitter for UI and telemetry observers. These events:
- MUST NOT be used for data transfer between tasks. Data transfer goes through dataflows (AsyncIterable -> engine tee -> downstream consumer).
- Carry no backpressure. A slow listener cannot slow a producer. This is intentional -- observers must never be able to stall the pipeline.
- Are safe to subscribe to from any number of listeners.
| Task | Answer |
|---|---|
| Writing a new task that streams output | async *executeStream() returning AsyncIterable |
| Writing a new AI provider stream function | AiProviderStreamFn (async generator) |
| Passing streaming data between two tasks | Don't -- the engine handles this via dataflows |
| Forking one stream to multiple downstream tasks | Don't -- the engine tees via ReadableStream.tee() |
| Subscribing to streaming progress from the UI | graph.subscribeToTaskStreaming({...}) (EventEmitter) |
| Logging or telemetry on streaming lifecycle | task.on("stream_chunk", ...) (EventEmitter) |
The x-stream annotation on output port schemas declares how a task produces streaming output. When a task's output schema includes ports with x-stream, and the task implements executeStream(), the TaskRunner uses the streaming execution path.
`"none"` -- no streaming. The task returns its output as a complete `Promise<Output>` from `execute()`. This is the default when `x-stream` is absent.
```typescript
// No x-stream annotation: standard execution
properties: {
  result: {
    type: "string",
  },
}
```

`"append"` -- each chunk is a text delta: a new token or fragment to be appended to the accumulated output. The consumer is responsible for concatenating deltas.
```typescript
properties: {
  text: {
    type: "string",
    "x-stream": "append", // Token-by-token streaming
  }
}
```

Produces `StreamTextDelta` events:

```typescript
{ type: "text-delta", port: "text", textDelta: " Hello" }
{ type: "text-delta", port: "text", textDelta: " world" }
{ type: "text-delta", port: "text", textDelta: "!" }
// Accumulated: " Hello world!"
```

`"replace"` -- each chunk is a complete snapshot of the output so far. The consumer replaces its state with the latest snapshot.
```typescript
properties: {
  image: {
    type: "object",
    "x-stream": "replace", // Progressive refinement
  }
}
```

Produces `StreamSnapshot` events:

```typescript
{ type: "snapshot", data: { image: lowResVersion } }
{ type: "snapshot", data: { image: mediumResVersion } }
{ type: "snapshot", data: { image: highResVersion } }
```

`"object"` -- each chunk is a progressively more complete partial object. The consumer replaces (not merges) its state with each update.
```typescript
properties: {
  structured: {
    type: "object",
    "x-stream": "object", // Structured streaming
  }
}
```

Produces `StreamObjectDelta` events:

```typescript
{ type: "object-delta", port: "structured", objectDelta: { name: "Al" } }
{ type: "object-delta", port: "structured", objectDelta: { name: "Alice", age: 30 } }
{ type: "object-delta", port: "structured", objectDelta: { name: "Alice", age: 30, role: "Engineer" } }
```

When multiple output ports use different stream modes, the overall task stream mode is `"mixed"`. This is detected automatically by `getOutputStreamMode()`:
```typescript
properties: {
  text: { type: "string", "x-stream": "append" },
  metadata: { type: "object", "x-stream": "object" },
}
// getOutputStreamMode() returns "mixed"
```

All streaming data flows through the `StreamEvent` discriminated union type:
```typescript
type StreamEvent<Output = Record<string, any>> =
  | StreamTextDelta
  | StreamObjectDelta
  | StreamSnapshot<Output>
  | StreamFinish<Output>
  | StreamError;
```

```typescript
interface StreamTextDelta {
  type: "text-delta";
  port: string;      // Output port name
  textDelta: string; // Incremental text fragment
}
```

Used with `x-stream: "append"`. Each event carries a fragment of text that should be appended to the accumulated result for the named port.
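For intuition, per-port accumulation looks roughly like this (illustrative; `task`, `input`, and `context` are assumed in scope) -- it is what the TaskRunner does for you when `shouldAccumulate` is true:

```typescript
const accumulated: Record<string, string> = {};

for await (const event of task.executeStream(input, context)) {
  if (event.type === "text-delta") {
    // Concatenate fragments per port, in arrival order.
    accumulated[event.port] = (accumulated[event.port] ?? "") + event.textDelta;
  }
}
```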
```typescript
interface StreamObjectDelta {
  type: "object-delta";
  port: string; // Output port name
  objectDelta: Record<string, unknown> | unknown[]; // Progressive partial object
}
```

Used with `x-stream: "object"`. Each event carries a progressively more complete object snapshot. Consumers should replace (not merge) their state with the latest delta.
```typescript
interface StreamSnapshot<Output = Record<string, any>> {
  type: "snapshot";
  data: Output; // Complete snapshot of current output state
}
```

Used with `x-stream: "replace"`. Each event carries a full snapshot of the output. During graph execution, the runner updates `task.runOutputData` with the snapshot before emitting the `stream_chunk` event.
```typescript
interface StreamFinish<Output = Record<string, any>> {
  type: "finish";
  data: Output; // Final output data
}
```

Signals that the stream has ended. In append mode, the TaskRunner enriches this event with accumulated text (when `shouldAccumulate` is true). In replace mode, `data` contains the final output.
Provider convention: Provider stream functions must yield `{ type: "finish", data: {} as Output }` -- an empty finish event. The TaskRunner handles accumulation. Providers must not accumulate deltas themselves.
```typescript
interface StreamError {
  type: "error";
  error: Error; // The error that occurred
}
```

Signals that the stream encountered a fatal error. The TaskRunner throws this error, transitioning the task to FAILED status.
Before executing a task, the TaskRunner checks whether streaming is appropriate:
```typescript
function isTaskStreamable(task): boolean {
  // Must implement executeStream()
  if (typeof task.executeStream !== "function") return false;

  // Must have at least one x-stream annotated output port
  return getOutputStreamMode(task.outputSchema()) !== "none";
}
```

If a task declares streaming output via `x-stream` but does not implement `executeStream()`, the runner falls back to non-streaming `execute()` and logs a warning.
When streaming begins, the TaskRunner:
- Validates the output schema has appropriate `x-stream` annotations
- Emits `stream_start` event on the task
- Calls `task.executeStream(input, context)` to obtain the async iterable
- Begins consuming events
For each event from the async iterable:
- `text-delta`:
  - Accumulate text per-port (if `shouldAccumulate`)
  - Emit `stream_chunk` on the task
  - Update progress (asymptotic curve: `1 - e^(-0.05 * chunkCount)`)
- `object-delta`:
  - Accumulate per-port (if `shouldAccumulate`)
  - Update `runOutputData` progressively
  - Emit `stream_chunk`
- `snapshot`:
  - Update `runOutputData` BEFORE emitting (so listeners see the latest state)
  - Emit `stream_chunk`
- `finish`:
  - If accumulating: merge accumulated text/objects into finish data
  - Set `finalOutput`
  - Emit enriched `stream_chunk` with complete data
- `error`:
  - Throw the error (handled by `run()`'s catch block)
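The progress curve in the `text-delta` step approaches, but never reaches, 1 as chunks arrive. A quick sketch of the formula above:

```typescript
const progress = (chunkCount: number) => 1 - Math.exp(-0.05 * chunkCount);

progress(10);  // ~0.39
progress(50);  // ~0.92
progress(200); // ~0.99995
```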
After the first chunk, the task status transitions to STREAMING.
After all events are consumed:
- Check if the task was aborted during streaming
- Set `task.runOutputData` to the final accumulated output
- Emit `stream_end` event with the complete output
- Call `executeTaskReactive()` for any reactive overlay
- Return the final output
The TaskRunner's shouldAccumulate flag controls whether text-delta and object-delta events are accumulated into a final output value:
- `true` (default): Text deltas are concatenated per-port. Object deltas are stored per-port. The finish event is enriched with accumulated data before emission.
- `false`: All events pass through unmodified. No accumulation maps are maintained.
The graph runner sets shouldAccumulate based on whether any downstream edge needs materialized data:
- Accumulate: When the task has downstream dataflows that need complete values, or when caching is enabled.
- Don't accumulate: When all downstream edges are also streaming (pure pass-through) and no cache is needed.
Given a streaming AI task with x-stream: "append" on the text port:
```typescript
Event 1: { type: "text-delta", port: "text", textDelta: "Hello" }
Event 2: { type: "text-delta", port: "text", textDelta: " world" }
Event 3: { type: "text-delta", port: "text", textDelta: "!" }
Event 4: { type: "finish", data: { model: "gpt-4" } } // Provider finish (no text)
```
With shouldAccumulate = true, the runner:
- Accumulates: `text -> "Hello world!"`
- On finish: merges accumulated text into finish data
- Emits enriched finish: `{ type: "finish", data: { text: "Hello world!", model: "gpt-4" } }`
- Downstream dataflows receive `{ text: "Hello world!", model: "gpt-4" }` as the materialized value
When a dataflow carries a stream (rather than a materialized value), calling dataflow.awaitStreamValue() consumes the stream to completion:
```typescript
await dataflow.awaitStreamValue();
// After: dataflow.value contains the materialized port data
// After: dataflow.stream is cleared (set to undefined)
```

The method prioritizes events:

- `snapshot` events: Use the last snapshot data
- `finish` events: Use the finish data (which may include accumulated text from the source)
- `text-delta` / `object-delta`: Ignored here (source task handles accumulation)
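Conceptually, the consumption loop looks like this (a sketch, not the actual implementation):

```typescript
async function materializeStream(stream: ReadableStream<StreamEvent>): Promise<unknown> {
  const reader = stream.getReader();
  let value: unknown;
  while (true) {
    const { done, value: event } = await reader.read();
    if (done) break;
    if (event.type === "snapshot" || event.type === "finish") {
      value = event.data; // later snapshots win; finish data wins overall
    }
    // text-delta / object-delta events are skipped: the source task accumulates them.
  }
  return value;
}
```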
The edgeNeedsAccumulation() function determines whether a specific dataflow edge needs its source's stream to be accumulated:
```typescript
function edgeNeedsAccumulation(sourceSchema, sourcePort, targetSchema, targetPort): boolean {
  const sourceMode = getPortStreamMode(sourceSchema, sourcePort);
  if (sourceMode === "none") return false;

  const targetMode = getPortStreamMode(targetSchema, targetPort);
  return sourceMode !== targetMode;
}
```

If the source port streams in `"append"` mode but the target port does not declare `"append"` on its input, the edge needs accumulation to materialize the value.
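For example, with hypothetical schemas where the source's `text` port streams in `"append"` mode and the target's `input` port declares no `x-stream`:

```typescript
const sourceSchema = {
  type: "object",
  properties: { text: { type: "string", "x-stream": "append" } },
} as const;

const targetSchema = {
  type: "object",
  properties: { input: { type: "string" } }, // no x-stream: expects a complete value
} as const;

edgeNeedsAccumulation(sourceSchema, "text", targetSchema, "input");
// => true: source mode "append" !== target mode "none", so the edge materializes
```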
Every task execution carries an AbortSignal. What happens when it fires, and what each task is responsible for, is defined here.
Each TaskRunner owns an AbortController, created in handleStart() and wired into the task via IExecuteContext.signal:
```
┌──────────────────┐   AbortSignal   ┌──────────────────────────┐
│ TaskRunner       │ ──────────────> │ task.execute(input, ctx) │
│ AbortController  │                 │ task.executeStream(...)  │
└──────────────────┘                 │  ctx.signal ─────────────┼──> provider / fetch / SDK
                                     └──────────────────────────┘
```
- `taskRunner.abort()` calls `AbortController.abort()` on its own controller, and recursively aborts any owned subgraph (`task.subGraph.abort()`).
- Graph-level `graph.abort()` aborts every active task runner in the graph.
- Aborting an upstream task causes its dataflows to transition to `ABORTING` (the terminal aborted state); downstream tasks that have not yet started will never start.
- Either poll `context.signal.aborted` at safe loop boundaries, or forward `context.signal` to any I/O it performs (fetch, SDK client, subprocess, child task) -- see the polling sketch after this list.
- Stop producing output promptly. A streaming task's generator must return (or throw) on the next `yield` point after abort, not continue producing events.
- Release resources. Close readers, cancel timers, unregister listeners. A `try { ... } finally { ... }` block in an `async *executeStream()` generator is the idiomatic cleanup hook -- `finally` runs when the consumer stops iterating (including on abort).
- Throw `TaskAbortedError` if the abort is detected inside `execute()` / `executeStream()`. The TaskRunner also detects abort on its own and converts it to a `TaskAbortedError` when the generator finishes after the signal fires, via the post-stream abort check in `TaskRunner.executeStreamingTask`.
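A polling sketch for a compute-bound loop (assuming `TaskAbortedError` takes no required constructor arguments, and `computeChunks` is a stand-in for the task's own work):

```typescript
async *executeStream(input, context): AsyncIterable<StreamEvent> {
  for (const chunk of computeChunks(input)) {
    if (context.signal.aborted) {
      throw new TaskAbortedError(); // detected at a safe loop boundary
    }
    yield { type: "text-delta", port: "text", textDelta: chunk };
  }
  yield { type: "finish", data: {} as Output };
}
```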
Tasks do not need to manually set the task status -- the runner's handleAbort() transitions the task to ABORTING (the terminal aborted state) and attaches a TaskAbortedError.
When a streaming task is aborted mid-stream:
- `task.runOutputData` may contain partially accumulated data (whatever text deltas or object deltas were received before the abort).
- `task.status` transitions to `ABORTING` (the terminal aborted state -- there is no separate `ABORTED` value).
- `task.error` is set to a `TaskAbortedError`.
- Downstream consumers MUST treat `ABORTING` status as a failure, not as a valid result, even if `runOutputData` looks superficially complete.
If you need guaranteed-complete results for a downstream computation, check task.status === COMPLETED before reading runOutputData.
When an upstream task aborts, the graph runner's behavior depends on the downstream task's current state:
| Downstream state | What happens |
|---|---|
| `PENDING` | Task is never started. Its dataflows transition to `ABORTING`. |
| `PROCESSING` | Receives abort via its own `AbortSignal` (which was derived from the graph). Task follows the normal cancel path. |
| `STREAMING` | Receives an `error` StreamEvent on its input stream; runner throws `TaskAbortedError`. |
| `COMPLETED` | No effect -- completed tasks are immutable. |
Any task overriding executeStream() (or execute() with non-trivial cancel behavior) SHOULD document its cancel contract in a @cancel JSDoc tag on the class or method. Minimum contents:
- What I/O or resources are opened during execution.
- What happens to partial state on abort (discarded, flushed, etc.).
- Whether side effects are reversible.
Example:
```typescript
/**
 * Streams text from an HTTP endpoint.
 *
 * @cancel Forwards `context.signal` to the underlying `fetch`. On abort:
 * the HTTP connection is torn down by the browser/runtime, the reader is
 * released in the generator's `finally` block, and any partial text
 * accumulated on the task is discarded (status becomes `ABORTING`).
 * No side effects to clean up.
 */
async *executeStream(input, context): AsyncIterable<StreamEvent> { ... }
```

For provider stream functions (`AiProviderStreamFn`), the equivalent contract is documented on the function type itself -- signal forwarding to the underlying SDK is mandatory.
- Per-provider cancel forwarding audit -- most providers (Anthropic, OpenAI, Gemini, Ollama, llamacpp) rely on their SDK's handling of `AbortSignal`. A systematic audit confirming each SDK promptly stops yielding after abort is still open.
- Bounded tee buffer -- `ReadableStream.tee()` buffers for the slowest consumer with no cap. A `maxBufferedEvents` safety limit is a possible future hardening.
Dataflows emit events for lifecycle changes:
| Event | Parameters | Description |
|---|---|---|
| `start` | -- | Dataflow status set to PROCESSING |
| `streaming` | -- | Dataflow status set to STREAMING |
| `complete` | -- | Dataflow status set to COMPLETED |
| `error` | `TaskError` | Dataflow status set to FAILED |
| `abort` | -- | Dataflow status set to ABORTING |
| `disabled` | -- | Dataflow status set to DISABLED |
| `reset` | -- | Dataflow reset to initial state |
| `status` | `TaskStatus` | Any status change |
```typescript
const unsub = dataflow.subscribe("status", (status) => {
  console.log(`Dataflow ${dataflow.id}: ${status}`);
});

// One-time listener
dataflow.once("complete", () => {
  console.log(`Value: ${dataflow.value}`);
});

// Promise-based wait
await dataflow.waitOn("complete");
```

Graph-level subscription to all dataflow status changes:

```typescript
const unsub = graph.subscribeToDataflowStatus((dataflowId, status) => {
  console.log(`Dataflow ${dataflowId}: ${status}`);
});
```

The `Dataflow` class API:

```typescript
class Dataflow {
  // Construction
  constructor(sourceTaskId, sourceTaskPortId, targetTaskId, targetTaskPortId);
  static createId(sourceTaskId, sourcePortId, targetTaskId, targetPortId): DataflowIdType;

  // Identity
  readonly id: DataflowIdType;
  sourceTaskId: TaskIdType;
  sourceTaskPortId: string;
  targetTaskId: TaskIdType;
  targetTaskPortId: string;

  // State
  value: any;
  status: TaskStatus;
  error: TaskError | undefined;
  stream: ReadableStream<StreamEvent> | undefined;

  // Value management
  setPortData(entireDataBlock: any): void;
  getPortData(): TaskOutput;
  setStatus(status: TaskStatus): void;
  reset(): void;

  // Stream management
  setStream(stream: ReadableStream<StreamEvent>): void;
  getStream(): ReadableStream<StreamEvent> | undefined;
  awaitStreamValue(): Promise<void>;

  // Compatibility
  semanticallyCompatible(graph, dataflow): "static" | "runtime" | "incompatible";
  invalidateCompatibilityCache(): void;

  // Events
  subscribe(event, callback): () => void;
  on(event, callback): void;
  off(event, callback): void;
  once(event, callback): void;
  waitOn(event): Promise<any>;
  emit(event, ...args): void;

  // Serialization
  toJSON(): DataflowJson;
}
```

For constructing dataflows from the string ID format:
```typescript
const dataflow = new DataflowArrow("taskA[result] ==> taskB[value]");
// Equivalent to: new Dataflow("taskA", "result", "taskB", "value")
```

Streaming helper functions:

```typescript
// Get the stream mode of a specific port
function getPortStreamMode(schema: DataPortSchema, portId: string): StreamMode;
// Get all streaming ports with their modes
function getStreamingPorts(schema: DataPortSchema): Array<{ port: string; mode: StreamMode }>;
// Get the dominant output stream mode for a task
function getOutputStreamMode(outputSchema: DataPortSchema): StreamMode;
// Check if a task supports streaming execution
function isTaskStreamable(task: { outputSchema(); executeStream? }): boolean;
// Get the first append-mode port name
function getAppendPortId(schema: DataPortSchema): string | undefined;
// Get the first object-mode port name
function getObjectPortId(schema: DataPortSchema): string | undefined;
// Check if a dataflow edge needs value accumulation
function edgeNeedsAccumulation(sourceSchema, sourcePort, targetSchema, targetPort): boolean;
// Get schemas for structured output ports
function getStructuredOutputSchemas(schema: DataPortSchema): Map<string, JsonSchema>;
// Check if any port has structured output
function hasStructuredOutput(schema: DataPortSchema): boolean;
```

A basic producer-to-consumer dataflow:

```typescript
import { Task, TaskGraph, Dataflow } from "@workglow/task-graph";
const producer = new ProducerTask({ id: "producer" });
const consumer = new ConsumerTask({ id: "consumer" });
const graph = new TaskGraph();
graph.addTasks([producer, consumer]);
graph.addDataflow(new Dataflow("producer", "output", "consumer", "input"));
const results = await graph.run();
```

A streaming text task using append mode:

```typescript
class StreamingTextTask extends Task<{ prompt: string }, { text: string }> {
  static readonly type = "StreamingTextTask";
  static readonly cacheable = false;

  static outputSchema(): DataPortSchema {
    return {
      type: "object",
      properties: {
        text: {
          type: "string",
          "x-stream": "append", // Enable append-mode streaming
        },
      },
    } as const satisfies DataPortSchema;
  }

  async *executeStream(input, context): AsyncIterable<StreamEvent> {
    const response = await fetch("/api/generate", {
      method: "POST",
      body: JSON.stringify({ prompt: input.prompt }),
      signal: context.signal,
    });

    const reader = response.body!.getReader();
    const decoder = new TextDecoder();

    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        yield {
          type: "text-delta",
          port: "text",
          textDelta: decoder.decode(value),
        };
      }
    } finally {
      // Release the reader even if the consumer stops iterating (e.g., on abort).
      reader.releaseLock();
    }

    yield { type: "finish", data: {} as any };
  }
}
```

Subscribing to streaming progress from the UI:

```typescript
const graph = new TaskGraph();
// ... add streaming tasks ...

// Listen for streaming events at the graph level
const unsub = graph.subscribeToTaskStreaming({
  onStreamStart: (taskId) => {
    console.log(`Stream started: ${taskId}`);
    showSpinner(taskId);
  },
  onStreamChunk: (taskId, event) => {
    if (event.type === "text-delta") {
      appendToUI(taskId, event.textDelta);
    } else if (event.type === "object-delta") {
      updateStructuredView(taskId, event.objectDelta);
    }
  },
  onStreamEnd: (taskId, output) => {
    console.log(`Stream ended: ${taskId}`, output);
    hideSpinner(taskId);
  },
});

await graph.run({ prompt: "Explain quantum computing" });
unsub();
```

Routing an entire output object with wildcard ports:

```typescript
// Pass entire output object as input using DATAFLOW_ALL_PORTS
import { DATAFLOW_ALL_PORTS } from "@workglow/task-graph";
graph.addDataflow(
  new Dataflow(
    "producer",
    DATAFLOW_ALL_PORTS, // All output properties
    "consumer",
    DATAFLOW_ALL_PORTS  // Spread into all input properties
  )
);
```

Checking port compatibility:

```typescript
const dataflow = new Dataflow("taskA", "text", "taskB", "input");
graph.addDataflow(dataflow);
const compat = dataflow.semanticallyCompatible(graph, dataflow);
if (compat === "incompatible") {
  console.warn(`Port types are incompatible: ${dataflow.id}`);
}
```

A structured-output task streaming progressive object deltas:

```typescript
class StructuredOutputTask extends Task<{ query: string }, { result: object }> {
  static outputSchema(): DataPortSchema {
    return {
      type: "object",
      properties: {
        result: {
          type: "object",
          "x-stream": "object",
          "x-structured-output": true,
          properties: {
            name: { type: "string" },
            age: { type: "number" },
            skills: { type: "array", items: { type: "string" } },
          },
        },
      },
    } as const satisfies DataPortSchema;
  }

  async *executeStream(input, context): AsyncIterable<StreamEvent> {
    // Simulate progressive object construction
    yield { type: "object-delta", port: "result", objectDelta: { name: "Alice" } };
    yield { type: "object-delta", port: "result", objectDelta: { name: "Alice", age: 30 } };
    yield {
      type: "object-delta",
      port: "result",
      objectDelta: { name: "Alice", age: 30, skills: ["TypeScript", "Rust"] },
    };
    yield { type: "finish", data: {} as any };
  }
}
```