# @workglow/tasks

A package of task types for common operations, workflow management, and data processing. This package provides building blocks for creating complex task graphs with support for HTTP requests, JavaScript execution, delays, logging, and dynamic task creation.
## Table of Contents

- Installation
- Quick Start
- Available Tasks
- Workflow Integration
- Error Handling
- Configuration
- Testing
- License
## Installation

```bash
bun add @workglow/tasks
```

## Quick Start

```ts
import { Workflow, fetch, debugLog, delay } from "@workglow/tasks";

// Simple workflow example (fluent API)
const workflow = new Workflow()
  .fetch({ url: "https://api.example.com/data", response_type: "json" })
  .debugLog(undefined, { log_level: "info" })
  .delay(undefined, { delay: 1000 });

const results = await workflow.run();
```

```ts
import { FetchUrlTask, DebugLogTask, DelayTask } from "@workglow/tasks";

// Simple sequence using Task classes directly
const fetchResult = await new FetchUrlTask({
  url: "https://api.example.com/data",
  response_type: "json",
}).run();

await new DebugLogTask({ console: fetchResult.json }, { log_level: "info" }).run();
await new DelayTask({}, { delay: 1000 }).run();
```

```ts
import { fetch, debugLog, delay } from "@workglow/tasks";

const data = await fetch({
  url: "https://example.com/readme.txt",
  response_type: "text",
});

await debugLog({ console: data.text }, { log_level: "info" });
```

## Available Tasks

### FetchUrlTask

Makes HTTP requests with built-in retry logic, progress tracking, and multiple response types.
Input Schema:
- `url` (string, required): The URL to fetch from
- `method` (string, optional): HTTP method ("GET", "POST", "PUT", "DELETE", "PATCH"). Default: "GET"
- `headers` (object, optional): Headers to send with the request
- `body` (string, optional): Request body for POST/PUT requests
- `response_type` (string, optional): Response format ("json", "text", "blob", "arraybuffer"). Default: "json"
- `timeout` (number, optional): Request timeout in milliseconds
- `queue` (boolean|string, optional): Queue handling (`false` runs inline when possible, `true` uses the task's default queue, strings target a specific registered queue). Default: `true`
Output Schema:
- `json` (any, optional): JSON response data
- `text` (string, optional): Text response data
- `blob` (Blob, optional): Blob response data
- `arraybuffer` (ArrayBuffer, optional): ArrayBuffer response data
Examples:
```ts
// Simple GET request
const response = await new FetchUrlTask({
  url: "https://api.example.com/users",
  response_type: "json",
}).run();
console.log(response.json);

// POST request with headers
const postResponse = await new FetchUrlTask({
  url: "https://api.example.com/users",
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    Authorization: "Bearer token",
  },
  body: JSON.stringify({ name: "John", email: "john@example.com" }),
  response_type: "json",
}).run();

// Text response
const textResponse = await new FetchUrlTask({
  url: "https://example.com/readme.txt",
  response_type: "text",
}).run();
console.log(textResponse.text);
```

Features:
- Automatic retry on 429/503 status codes with Retry-After header support (requires a `@workglow/job-queue` instance)
- Progress tracking for large downloads
- Request timeout handling
- Queue-based rate limiting (requires a `@workglow/job-queue` instance)
- Comprehensive error handling
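For context on the Retry-After support above: the header can carry either a delay in delta-seconds or an HTTP-date. A minimal standalone sketch of how such a value might be interpreted (the helper name is hypothetical and not part of this package):

```ts
// Hypothetical helper: convert a Retry-After header value into a wait in ms.
// The header may be delta-seconds ("120") or an HTTP-date (e.g. RFC 9110 format).
function retryAfterToMs(headerValue: string, now: Date = new Date()): number {
  const seconds = Number(headerValue);
  if (headerValue.trim() !== "" && Number.isFinite(seconds)) {
    return Math.max(0, seconds * 1000);
  }
  const date = new Date(headerValue);
  if (!Number.isNaN(date.getTime())) {
    return Math.max(0, date.getTime() - now.getTime());
  }
  return 0; // unparseable: retry immediately (or apply a default backoff)
}
```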
### DebugLogTask

Provides console logging functionality with multiple log levels for debugging task graphs.
Input Schema:
- Any inputs are accepted and passed through to outputs unchanged.
Config Schema:
- `log_level` (string, optional): Log level ("dir", "log", "debug", "info", "warn", "error"). Default: "log"
Output Schema:
- All inputs passed through unchanged.
Examples:
```ts
// Basic logging
await new DebugLogTask({ console: "Processing user data" }, { log_level: "info" }).run();

// Object inspection with dir
await new DebugLogTask(
  { console: { user: { id: 1, name: "John" }, status: "active" } },
  { log_level: "dir" }
).run();

// In workflow with data flow
const workflow = new Workflow()
  .fetch({ url: "https://api.example.com/data" })
  .debugLog(undefined, { log_level: "dir" }) // Logs the fetched data
  .delay(undefined, { delay: 1000 });
```

Features:
- Multiple log levels for different debugging needs
- Deep object inspection with the `dir` level
- Pass-through functionality preserves data flow
- Non-cacheable for real-time debugging
### DelayTask

Introduces timed delays in workflows with progress tracking and cancellation support.
Input Schema:
- Any inputs are accepted and passed through to outputs unchanged.
Config Schema:
- `delay` (number, optional): Delay duration in milliseconds. Default: 1
Output Schema:
- All inputs passed through unchanged.
Examples:
```ts
// Simple delay
await new DelayTask({}, { delay: 5000 }).run(); // 5 second delay

// Delay with data pass-through
const result = await new DelayTask(
  { message: "Data preserved through delay" },
  { delay: 3000 }
).run();
console.log(result); // { message: "Data preserved through delay" }

// In workflow
const workflow = new Workflow()
  .fetch({ url: "https://api.example.com/data" })
  .delay(undefined, { delay: 2000 }) // 2 second delay
  .debugLog(undefined, { log_level: "info" });
```

Features:
- Progress tracking for delays over 100ms
- Cancellation support via AbortSignal
- Chunked delay execution for responsiveness
- All inputs passed through to outputs
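The chunked, cancellable delay described above can be sketched in plain TypeScript. This is an illustrative stand-in, not the package's actual implementation:

```ts
// Sleep in small chunks so an AbortSignal can interrupt promptly and
// progress can be reported between chunks.
async function chunkedDelay(
  totalMs: number,
  opts: { chunkMs?: number; signal?: AbortSignal; onProgress?: (pct: number) => void } = {}
): Promise<void> {
  const chunkMs = opts.chunkMs ?? 100;
  let elapsed = 0;
  while (elapsed < totalMs) {
    if (opts.signal?.aborted) throw new Error("Delay aborted");
    const step = Math.min(chunkMs, totalMs - elapsed);
    await new Promise((resolve) => setTimeout(resolve, step));
    elapsed += step;
    opts.onProgress?.(Math.min(100, Math.round((elapsed / totalMs) * 100)));
  }
}
```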
### JavaScriptTask

Executes JavaScript code strings using a safe interpreter with input/output handling.
Input Schema:
- `code` (string, required): JavaScript code to execute
- `input` (any, optional): Input data available to the code
Output Schema:
- `output` (any): The result of the JavaScript execution
Examples:
```ts
// Simple calculation
const result = await new JavaScriptTask({
  code: "2 + 3 * 4",
}).run();
console.log(result.output); // 14

// Using input data
const processed = await new JavaScriptTask({
  code: `
    const numbers = input.values;
    const sum = numbers.reduce((a, b) => a + b, 0);
    const average = sum / numbers.length;
    return { sum, average, count: numbers.length };
  `,
  input: { values: [1, 2, 3, 4, 5] },
}).run();
console.log(processed.output); // { sum: 15, average: 3, count: 5 }

// In workflow
const workflow = new Workflow()
  .fetch({ url: "https://api.example.com/data" })
  .javaScript({
    code: `
      const data = input.json;
      return data.filter(item => item.active === true);
    `,
  })
  .debugLog(undefined, { log_level: "info" });
```

Features:
- Safe JavaScript execution using interpreter
- Access to input data within code
- Error handling and logging
- Suitable for data transformation and filtering
### LambdaTask

Executes custom JavaScript functions with full access to task context and configuration.
Input Schema:
- Accepts any input data (flexible schema)
Output Schema:
- Returns whatever the provided function outputs
Configuration:
- `execute`: Function for standard execution
- `executeReactive`: Function for reactive execution with an output parameter
Examples:
```ts
// Function with execute pattern
const result = await new LambdaTask(
  { numbers: [1, 2, 3, 4, 5] },
  {
    execute: async (input, context) => {
      const sum = input.numbers.reduce((a, b) => a + b, 0);
      await context.updateProgress(50, "Calculating sum");
      const average = sum / input.numbers.length;
      await context.updateProgress(100, "Complete");
      return { sum, average };
    },
  }
).run();

// Reactive pattern with output parameter
const reactiveResult = await new LambdaTask(
  { message: "Hello" },
  {
    executeReactive: async (input, output, context) => {
      output.processed = input.message.toUpperCase();
      output.timestamp = new Date().toISOString();
      return output;
    },
  }
).run();

// Data transformation function
const transformer = await new LambdaTask(
  {
    data: [
      { name: "John", age: 30 },
      { name: "Jane", age: 25 },
    ],
  },
  {
    execute: async (input) => {
      return {
        users: input.data.map((user) => ({
          ...user,
          isAdult: user.age >= 18,
          category: user.age < 30 ? "young" : "mature",
        })),
      };
    },
  }
).run();

// Async operation with external API
const apiProcessor = await new LambdaTask(
  { userId: 123 },
  {
    execute: async (input, context) => {
      await context.updateProgress(25, "Fetching user data");
      const userData = await fetch(`/api/users/${input.userId}`).then((r) => r.json());
      await context.updateProgress(75, "Processing data");
      const enrichedData = {
        ...userData,
        processedAt: new Date().toISOString(),
        isActive: userData.lastLogin > Date.now() - 86400000, // 24 hours
      };
      await context.updateProgress(100, "Complete");
      return enrichedData;
    },
  }
).run();
```

Features:
- Full access to execution context and progress tracking
- Support for both standard and reactive execution patterns
- Async/await support
- Flexible input/output schemas
- Cacheable by default
### JsonTask

Creates and executes task graphs from JSON configurations, enabling dynamic workflow creation.
Input Schema:
- `json` (string, required): JSON string defining tasks and their dependencies
Output Schema:
- `output` (any): Output depends on the generated task graph
JSON Format:
```ts
interface JsonTaskItem {
  id: string; // Unique task identifier
  type: string; // Task type (e.g., "FetchUrlTask", "DelayTask")
  input?: any; // Task input data
  config?: any; // Task configuration
  dependencies?: {
    // Input dependencies from other tasks
    [inputField: string]:
      | {
          id: string; // Source task ID
          output: string; // Output field from source task
        }
      | Array<{ id: string; output: string }>;
  };
}
```

Examples:
```ts
// Simple linear workflow
const linearWorkflow = await new JsonTask({
  json: JSON.stringify([
    {
      id: "fetch-data",
      type: "FetchUrlTask",
      input: {
        url: "https://api.example.com/users",
        response_type: "json",
      },
    },
    {
      id: "log-data",
      type: "DebugLogTask",
      config: { log_level: "info" },
      dependencies: {
        console: { id: "fetch-data", output: "json" },
      },
    },
    {
      id: "delay",
      type: "DelayTask",
      config: { delay: 1000 },
    },
  ]),
}).run();

// Complex workflow with data dependencies
const complexWorkflow = await new JsonTask({
  json: JSON.stringify([
    {
      id: "fetch-users",
      type: "FetchUrlTask",
      input: {
        url: "https://api.example.com/users",
        response_type: "json",
      },
    },
    {
      id: "fetch-posts",
      type: "FetchUrlTask",
      input: {
        url: "https://api.example.com/posts",
        response_type: "json",
      },
    },
    {
      id: "combine-data",
      type: "JavaScriptTask",
      input: {
        code: `
          const users = input.users;
          const posts = input.posts;
          return users.map(user => ({
            ...user,
            posts: posts.filter(post => post.userId === user.id)
          }));
        `,
      },
      dependencies: {
        input: [
          { id: "fetch-users", output: "json" },
          { id: "fetch-posts", output: "json" },
        ],
      },
    },
    {
      id: "log-result",
      type: "DebugLogTask",
      config: { log_level: "dir" },
      dependencies: {
        console: { id: "combine-data", output: "output" },
      },
    },
  ]),
}).run();

// Dynamic task creation from external config
const configResponse = await fetch("/api/workflow-config");
const workflowConfig = await configResponse.json();

const dynamicWorkflow = await new JsonTask({
  json: JSON.stringify(workflowConfig.tasks),
}).run();
```

Features:
- Dynamic task graph creation from JSON
- Support for complex dependency relationships
- All registered task types are available
- Automatic data flow between tasks
- Enables configuration-driven workflows
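Given the `JsonTaskItem` shape, resolving an execution order from the `dependencies` map is essentially a topological sort. A minimal standalone sketch of that idea (the helper is hypothetical, not the package's actual scheduler):

```ts
type Dep = { id: string; output: string };
interface JsonTaskItem {
  id: string;
  type: string;
  input?: any;
  config?: any;
  dependencies?: { [inputField: string]: Dep | Dep[] };
}

// Hypothetical helper: order tasks so every task runs after the tasks it
// depends on. Throws on cycles or references to unknown task ids.
function executionOrder(items: JsonTaskItem[]): string[] {
  const deps = new Map<string, string[]>();
  for (const item of items) {
    const sources = Object.values(item.dependencies ?? {})
      .flatMap((d) => (Array.isArray(d) ? d : [d]))
      .map((d) => d.id);
    deps.set(item.id, sources);
  }
  const order: string[] = [];
  const state = new Map<string, "visiting" | "done">();
  const visit = (id: string) => {
    if (state.get(id) === "done") return;
    if (state.get(id) === "visiting") throw new Error(`Cycle at ${id}`);
    if (!deps.has(id)) throw new Error(`Unknown task id: ${id}`);
    state.set(id, "visiting");
    for (const dep of deps.get(id)!) visit(dep);
    state.set(id, "done");
    order.push(id);
  };
  for (const item of items) visit(item.id);
  return order;
}
```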
### ArrayTask

A compound task that processes arrays by either executing directly for non-array inputs or creating parallel task instances for array inputs. Supports parallel processing of array elements and combination generation when multiple inputs are arrays.
Key Features:
- Automatically handles single values or arrays
- Parallel execution for array inputs
- Generates all combinations when multiple inputs are arrays
- Uses the `x-replicate` annotation to mark array-capable inputs
Examples:
```ts
import { ArrayTask, DataPortSchema } from "@workglow/tasks";

class ArrayProcessorTask extends ArrayTask<{ items: string[] }, { results: string[] }> {
  static readonly type = "ArrayProcessorTask";

  static inputSchema() {
    return {
      type: "object",
      properties: {
        items: {
          type: "array",
          items: { type: "string" },
        },
      },
      required: ["items"],
      additionalProperties: false,
    } as const satisfies DataPortSchema;
  }

  static outputSchema() {
    return {
      type: "object",
      properties: {
        results: {
          type: "array",
          items: { type: "string" },
        },
      },
      required: ["results"],
      additionalProperties: false,
    } as const satisfies DataPortSchema;
  }

  async execute(input: { items: string[] }) {
    return { results: input.items.map((item) => item.toUpperCase()) };
  }
}

// Process array items in parallel
const task = new ArrayProcessorTask({
  items: ["hello", "world", "foo", "bar"],
});
const result = await task.run();
// { results: ["HELLO", "WORLD", "FOO", "BAR"] }
```

Features:
- Parallel processing of array elements
- Automatic task instance creation per array element
- Combination generation for multiple array inputs
- Seamless single-value and array handling
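The combination behavior, where one task instance runs per combination when several inputs are arrays, amounts to a cartesian product over the array-valued inputs. A simplified standalone sketch (illustrative only; the real implementation only replicates inputs marked with `x-replicate`):

```ts
// Expand an input object whose values may be arrays into one plain input
// object per combination, e.g. { a: [1, 2], b: "x" } -> two input objects.
function expandCombinations(input: Record<string, unknown>): Record<string, unknown>[] {
  let combos: Record<string, unknown>[] = [{}];
  for (const [key, value] of Object.entries(input)) {
    const options = Array.isArray(value) ? value : [value];
    combos = combos.flatMap((combo) => options.map((opt) => ({ ...combo, [key]: opt })));
  }
  return combos;
}
```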
## Workflow Integration

All tasks can be used standalone or integrated into workflows:
```ts
import { Workflow } from "@workglow/tasks";

// Fluent workflow API
const workflow = new Workflow()
  .fetch({
    url: "https://api.example.com/data",
    response_type: "json",
  })
  .javaScript({
    code: "return input.json.filter(item => item.status === 'active');",
  })
  .debugLog(undefined, { log_level: "info" })
  .delay(undefined, { delay: 500 })
  .lambda(
    {},
    {
      execute: async (input) => ({
        processed: true,
        count: input.output.length,
        timestamp: new Date().toISOString(),
      }),
    }
  );

const result = await workflow.run();
```

## Error Handling

Tasks include comprehensive error handling:
```ts
try {
  const result = await new FetchUrlTask({
    url: "https://api.example.com/data",
    response_type: "json",
    timeout: 5000,
  }).run();
} catch (error) {
  if (error instanceof TaskInvalidInputError) {
    console.error("Invalid input:", error.message);
  } else if (error instanceof RetryableJobError) {
    console.error("Retryable error:", error.message);
    // Will be retried automatically
  } else if (error instanceof PermanentJobError) {
    console.error("Permanent error:", error.message);
    // Will not be retried
  }
}
```

## Configuration

Tasks support various configuration options:
```ts
// Task-specific configuration
const fetchTask = new FetchUrlTask(
  {
    url: "https://api.example.com/data",
  },
  {
    queue: "api-requests",
    timeout: 10000,
    retryAttempts: 3,
  }
);

// Global workflow configuration
const workflow = new Workflow({
  maxConcurrency: 5,
  timeout: 30000,
});
```

## Testing

```bash
bun test
```

## License

Apache 2.0 - See LICENSE for details.