The Workglow storage layer (@workglow/storage) provides four unified interfaces -- IKvStorage, ITabularStorage, IQueueStorage, and IVectorStorage -- that abstract over multiple persistence backends. Application code programs against these interfaces, and the concrete backend is selected at configuration time. This design allows the same pipeline to run with in-memory storage during development, SQLite for single-machine deployments, PostgreSQL for production clusters, IndexedDB in the browser, and Supabase for managed cloud infrastructure.
Every storage interface is event-driven, schema-typed, and backend-agnostic. The storage layer also integrates with Workglow's service registry for dependency injection and with the task system's input resolution mechanism for runtime schema-based lookups.
IKvStorage is the simplest storage interface. It maps typed keys to typed values, similar to a Map but with persistence, events, and bulk operations.
```typescript
interface IKvStorage<
  Key extends string | number = string,
  Value extends any = any,
  Combined = { key: Key; value: Value },
> {
  put(key: Key, value: Value): Promise<void>;
  putBulk(items: Array<{ key: Key; value: Value }>): Promise<void>;
  get(key: Key): Promise<Value | undefined>;
  delete(key: Key): Promise<void>;
  getAll(): Promise<Combined[] | undefined>;
  deleteAll(): Promise<void>;
  size(): Promise<number>;
  getObjectAsIdString(object: JSONValue): Promise<string>;

  // Event methods
  on<Event extends KvEventName>(name: Event, fn: KvEventListener<...>): void;
  off<Event extends KvEventName>(name: Event, fn: KvEventListener<...>): void;
  emit<Event extends KvEventName>(name: Event, ...args: ...): void;
  once<Event extends KvEventName>(name: Event, fn: KvEventListener<...>): void;
  waitOn<Event extends KvEventName>(name: Event): Promise<...>;
}
```

| Event | Parameters | Description |
|---|---|---|
| `put` | `(key, value)` | A value was stored |
| `get` | `(key, value \| undefined)` | A value was retrieved |
| `getAll` | `(results \| undefined)` | All values were retrieved |
| `delete` | `(key)` | A value was deleted |
| `deleteall` | `()` | All values were deleted |
| Backend | Class | Runtime | Notes |
|---|---|---|---|
| In-Memory | `InMemoryKvStorage` | All | Fast, non-persistent |
| SQLite | `SqliteKvStorage` | Node, Bun | File-based persistence |
| PostgreSQL | `PostgresKvStorage` | Node, Bun | Network-based |
| Supabase | `SupabaseKvStorage` | All | Cloud PostgreSQL |
| IndexedDB | `IndexedDbKvStorage` | Browser | Browser-native |
| Filesystem (JSON) | `FsFolderJsonKvStorage` | Node, Bun | One JSON file per key |
| Filesystem | `FsFolderKvStorage` | Node, Bun | Raw file storage |
| Via Tabular | `KvViaTabularStorage` | All | Adapts tabular storage as KV |
| Telemetry | `TelemetryKvStorage` | All | Wraps another KV with tracing |
```typescript
import { InMemoryKvStorage } from "@workglow/storage";

const cache = new InMemoryKvStorage();

await cache.put("user:123", { name: "Alice", role: "admin" });
const user = await cache.get("user:123");
// { name: "Alice", role: "admin" }

cache.on("put", (key, value) => {
  console.log(`Cached: ${key}`);
});
```

ITabularStorage is the workhorse interface for structured data, with typed schemas, compound primary keys, querying, pagination, change subscriptions, and auto-generated keys. It is the foundation for document storage, knowledge base metadata, model registries, and more.
```typescript
interface ITabularStorage<
  Schema extends DataPortSchemaObject,
  PrimaryKeyNames extends ReadonlyArray<keyof Schema["properties"]>,
  Entity = FromSchema<Schema, TypedArraySchemaOptions>,
  PrimaryKey = SimplifyPrimaryKey<Entity, PrimaryKeyNames>,
  InsertType = InsertEntity<Entity, AutoGeneratedKeys<Schema>>,
> {
  put(value: InsertType): Promise<Entity>;
  putBulk(values: InsertType[]): Promise<Entity[]>;
  get(key: PrimaryKey): Promise<Entity | undefined>;
  delete(key: PrimaryKey | Entity): Promise<void>;
  getAll(options?: QueryOptions<Entity>): Promise<Entity[] | undefined>;
  deleteAll(): Promise<void>;
  size(): Promise<number>;
  deleteSearch(criteria: DeleteSearchCriteria<Entity>): Promise<void>;
  getBulk(offset: number, limit: number): Promise<Entity[] | undefined>;
  records(pageSize?: number): AsyncGenerator<Entity, void, undefined>;
  pages(pageSize?: number): AsyncGenerator<Entity[], void, undefined>;
  query(criteria: SearchCriteria<Entity>, options?: QueryOptions<Entity>): Promise<Entity[] | undefined>;
  subscribeToChanges(callback: (change: TabularChangePayload<Entity>) => void, options?: TabularSubscribeOptions): () => void;
  setupDatabase(): Promise<void>;
  destroy(): void;
}
```

Schemas are JSON Schema objects (`DataPortSchemaObject`) that define both the shape of the data and metadata used by the storage layer:
```typescript
import type { DataPortSchemaObject } from "@workglow/util/schema";

const UserSchema = {
  type: "object",
  properties: {
    user_id: { type: "string", "x-auto-generated": true },
    name: { type: "string" },
    email: { type: "string" },
    created_at: { type: "string" },
  },
  required: ["user_id", "name", "email", "created_at"],
  additionalProperties: false,
} as const satisfies DataPortSchemaObject;

const UserPrimaryKey = ["user_id"] as const;
```

The `"x-auto-generated": true` annotation marks fields whose values are assigned by the storage backend. For integer types, values auto-increment; for string types, a UUID is generated. When inserting, these fields are optional in the `InsertType`.
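The effect of the annotation can be sketched with a small standalone helper. This is illustrative only, not the library's implementation: it shows the insert-time behavior described above, where integer columns auto-increment and string columns receive a UUID.

```typescript
import { randomUUID } from "node:crypto";

// Illustrative sketch (not @workglow/storage code): fill "x-auto-generated"
// fields that the caller omitted. Integer columns auto-increment; string
// columns get a freshly generated UUID.
type AutoGenSchema = {
  properties: Record<string, { type: string; "x-auto-generated"?: boolean }>;
};

function applyAutoGenerated(
  schema: AutoGenSchema,
  row: Record<string, unknown>,
  counter: { next: number },
): Record<string, unknown> {
  const out = { ...row };
  for (const [name, prop] of Object.entries(schema.properties)) {
    if (prop["x-auto-generated"] && out[name] === undefined) {
      out[name] = prop.type === "integer" ? counter.next++ : randomUUID();
    }
  }
  return out;
}

const inserted = applyAutoGenerated(
  {
    properties: {
      user_id: { type: "string", "x-auto-generated": true },
      name: { type: "string" },
    },
  },
  { name: "Alice" }, // user_id omitted, as the InsertType allows
  { next: 1 },
);
// inserted.user_id is now a generated UUID string
```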
The query method supports equality and comparison operators:
```typescript
// Equality match
const results = await storage.query({ category: "electronics" });

// With comparison operator
const recent = await storage.query({
  created_at: { value: "2025-01-01", operator: ">=" },
});

// Multiple criteria (AND logic)
const filtered = await storage.query(
  {
    category: "electronics",
    price: { value: 100, operator: "<" },
  },
  {
    orderBy: [{ column: "price", direction: "ASC" }],
    limit: 10,
    offset: 0,
  }
);
```

Two async generator methods support efficient pagination over large datasets:
```typescript
// Iterate individual records
for await (const record of storage.records(100)) {
  process(record);
}

// Iterate pages
for await (const page of storage.pages(100)) {
  await processBatch(page);
}
```

Subscribe to INSERT, UPDATE, and DELETE events -- including changes from other processes:
```typescript
const unsubscribe = storage.subscribeToChanges((change) => {
  console.log(change.type); // "INSERT" | "UPDATE" | "DELETE"
  console.log(change.old);  // previous entity (for UPDATE/DELETE)
  console.log(change.new);  // new entity (for INSERT/UPDATE)
}, { pollingIntervalMs: 1000 });

// Later: stop listening
unsubscribe();
```

| Event | Parameters | Description |
|---|---|---|
| `put` | `(entity)` | An entity was inserted or updated |
| `get` | `(key, entity \| undefined)` | An entity was retrieved |
| `query` | `(criteria, entities \| undefined)` | A query was executed |
| `delete` | `(key)` | An entity was deleted |
| `clearall` | `()` | All entities were deleted |
| Backend | Class | Runtime | Notes |
|---|---|---|---|
| In-Memory | `InMemoryTabularStorage` | All | Fast, non-persistent |
| Shared In-Memory | `SharedInMemoryTabularStorage` | All | Shared between instances |
| SQLite | `SqliteTabularStorage` | Node, Bun | File-based persistence |
| PostgreSQL | `PostgresTabularStorage` | Node, Bun | Network-based |
| Supabase | `SupabaseTabularStorage` | All | Cloud PostgreSQL |
| IndexedDB | `IndexedDbTabularStorage` | Browser | Browser-native |
| Filesystem | `FsFolderTabularStorage` | Node, Bun | Directory-based |
| HuggingFace | `HuggingFaceTabularStorage` | All | HuggingFace datasets |
| Cached | `CachedTabularStorage` | All | Read-through cache wrapper |
| Telemetry | `TelemetryTabularStorage` | All | Wraps with tracing |
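The read-through cache pattern behind the cached wrapper can be sketched generically. This toy class is illustrative, not the `CachedTabularStorage` implementation: reads are served from an in-memory map when possible and otherwise loaded from the backing store.

```typescript
// Toy read-through cache (illustrative, not CachedTabularStorage itself):
// get() checks the cache first, falls back to the backing loader on a miss,
// and remembers the loaded value for subsequent reads.
class ReadThroughCache<K, V> {
  private cache = new Map<K, V>();
  constructor(private load: (key: K) => Promise<V | undefined>) {}

  async get(key: K): Promise<V | undefined> {
    if (this.cache.has(key)) return this.cache.get(key);
    const value = await this.load(key);
    if (value !== undefined) this.cache.set(key, value);
    return value;
  }

  // Writes must invalidate (or update) the cached entry to stay consistent.
  invalidate(key: K): void {
    this.cache.delete(key);
  }
}
```

The key design point is that the cache is transparent to readers: a miss and a hit return the same value, only the latency differs.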
IQueueStorage is specialized storage for job queue persistence. It extends beyond basic CRUD to support atomic job claiming, abort signaling, progress tracking, and TTL-based cleanup.
```typescript
interface IQueueStorage<Input, Output> {
  add(job: JobStorageFormat<Input, Output>): Promise<unknown>;
  get(id: unknown): Promise<JobStorageFormat<Input, Output> | undefined>;
  next(workerId: string): Promise<JobStorageFormat<Input, Output> | undefined>;
  peek(status?: JobStatus, num?: number): Promise<Array<JobStorageFormat<Input, Output>>>;
  size(status?: JobStatus): Promise<number>;
  complete(job: JobStorageFormat<Input, Output>): Promise<void>;
  deleteAll(): Promise<void>;
  outputForInput(input: Input): Promise<Output | null>;
  abort(id: unknown): Promise<void>;
  getByRunId(runId: string): Promise<Array<JobStorageFormat<Input, Output>>>;
  saveProgress(id: unknown, progress: number, message: string, details: Record<string, any> | null): Promise<void>;
  delete(id: unknown): Promise<void>;
  deleteJobsByStatusAndAge(status: JobStatus, olderThanMs: number): Promise<void>;
  setupDatabase(): Promise<void>;
  subscribeToChanges(callback: (change: QueueChangePayload<Input, Output>) => void, options?: QueueSubscribeOptions): () => void;
}
```

`next(workerId)` is the critical method. It atomically finds the next PENDING job whose `run_after` time has passed, sets its status to PROCESSING, increments `run_attempts`, records the `workerId`, and returns it. This atomic claim prevents two workers from processing the same job.
`abort(id)` sets a job's status to ABORTING. The worker detects this on its next `checkForAbortingJobs` pass and fires the abort controller.

`saveProgress(id, progress, message, details)` persists progress information without changing the job's status.

`deleteJobsByStatusAndAge(status, olderThanMs)` enables TTL-based cleanup of completed or failed jobs.
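The claim semantics of `next` can be illustrated with a minimal in-memory model. This is a sketch of the state transition only, not the library's implementation; real backends perform the find-and-update inside a transaction so the claim stays atomic across processes.

```typescript
// Minimal in-memory sketch of the atomic claim performed by next(workerId):
// PENDING -> PROCESSING, run_attempts incremented, worker_id recorded.
type Job = {
  id: number;
  status: "PENDING" | "PROCESSING" | "COMPLETED";
  run_after: number | null; // epoch ms; null means runnable now
  run_attempts: number;
  worker_id: string | null;
};

function next(jobs: Job[], workerId: string, now = Date.now()): Job | undefined {
  // The whole find-and-update runs in one synchronous step here, so no two
  // callers can claim the same job (a real backend uses a DB transaction).
  const job = jobs.find(
    (j) => j.status === "PENDING" && (j.run_after === null || j.run_after <= now),
  );
  if (!job) return undefined;
  job.status = "PROCESSING";
  job.run_attempts += 1;
  job.worker_id = workerId;
  return job;
}

const jobs: Job[] = [
  { id: 1, status: "PENDING", run_after: null, run_attempts: 0, worker_id: null },
];
const claimed = next(jobs, "worker-a");
const again = next(jobs, "worker-b"); // undefined: the job is already claimed
```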
Jobs are stored in a flat format matching database column layouts:
```typescript
type JobStorageFormat<Input, Output> = {
  id?: unknown;
  job_run_id?: string;
  queue?: string;
  input: Input;
  output?: Output | null;
  error?: string | null;
  error_code?: string | null;
  fingerprint?: string;
  max_retries?: number;
  status?: JobStatus;
  run_after: string | null;
  completed_at: string | null;
  worker_id?: string | null;
  progress?: number;
  progress_message?: string;
  // ...
};
```

```typescript
const JobStatus = {
  PENDING: "PENDING",
  PROCESSING: "PROCESSING",
  COMPLETED: "COMPLETED",
  ABORTING: "ABORTING",
  FAILED: "FAILED",
  DISABLED: "DISABLED",
} as const;
```

Queue storage supports prefix columns for multi-tenant scenarios:
```typescript
const storage = new SqliteQueueStorage(db, "jobs", {
  prefixes: [
    { name: "user_id", type: "uuid" },
    { name: "project_id", type: "uuid" },
  ],
  prefixValues: {
    user_id: "abc-123",
    project_id: "proj-456",
  },
});
```

Subscriptions can filter by prefix:
```typescript
// Only this user's jobs
storage.subscribeToChanges(callback);

// All projects for this user
storage.subscribeToChanges(callback, {
  prefixFilter: { user_id: "abc-123" },
});

// All jobs (admin view)
storage.subscribeToChanges(callback, { prefixFilter: {} });
```

| Backend | Class | Runtime |
|---|---|---|
| In-Memory | `InMemoryQueueStorage` | All |
| SQLite | `SqliteQueueStorage` | Node, Bun |
| PostgreSQL | `PostgresQueueStorage` | Node, Bun |
| Supabase | `SupabaseQueueStorage` | All |
| IndexedDB | `IndexedDbQueueStorage` | Browser |
| Telemetry | `TelemetryQueueStorage` | All |
IVectorStorage extends ITabularStorage with vector similarity search capabilities. It stores vector embeddings alongside metadata and supports both pure vector search and hybrid (vector + full-text) search.
```typescript
interface IVectorStorage<
  Metadata extends Record<string, unknown> | undefined,
  Schema extends DataPortSchemaObject,
  Entity,
  PrimaryKeyNames,
  PrimaryKey,
  InsertType,
> extends ITabularStorage<Schema, PrimaryKeyNames, Entity, PrimaryKey, InsertType> {
  getVectorDimensions(): number;
  similaritySearch(query: TypedArray, options?: VectorSearchOptions<Metadata>): Promise<(Entity & { score: number })[]>;
  hybridSearch?(query: TypedArray, options: HybridSearchOptions<Metadata>): Promise<(Entity & { score: number })[]>;
}
```

```typescript
const results = await vectorStorage.similaritySearch(queryVector, {
  topK: 10,
  filter: { doc_id: "doc-123" },
  scoreThreshold: 0.7,
});

for (const result of results) {
  console.log(result.metadata.text, result.score);
}
```

Hybrid search combines vector similarity with full-text search for improved retrieval quality:
```typescript
const results = await vectorStorage.hybridSearch(queryVector, {
  textQuery: "machine learning transformers",
  topK: 10,
  vectorWeight: 0.7, // 70% vector, 30% text relevance
});
```

Not all backends support hybrid search. Check with `typeof storage.hybridSearch === "function"` before calling.
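That guard can be wrapped in a small helper, sketched here against a structural subset of the interface. The helper name (`searchBestEffort`) and the fall-back-to-vector-only behavior are my own choices for illustration, not part of the library.

```typescript
// Sketch: prefer hybridSearch when the backend implements it, otherwise
// fall back to pure vector similaritySearch.
type Hit = { score: number };

interface VectorSearchLike {
  similaritySearch(query: Float32Array, options?: { topK?: number }): Promise<Hit[]>;
  hybridSearch?(query: Float32Array, options: { textQuery: string; topK?: number }): Promise<Hit[]>;
}

async function searchBestEffort(
  store: VectorSearchLike,
  query: Float32Array,
  textQuery: string,
  topK = 10,
): Promise<Hit[]> {
  if (typeof store.hybridSearch === "function") {
    return store.hybridSearch(query, { textQuery, topK });
  }
  return store.similaritySearch(query, { topK }); // hybrid unsupported here
}
```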
```typescript
interface VectorSearchOptions<Metadata> {
  readonly topK?: number;             // Max results (default varies by backend)
  readonly filter?: Partial<Metadata>; // Metadata filter
  readonly scoreThreshold?: number;    // Minimum similarity score
}

interface HybridSearchOptions<Metadata> extends VectorSearchOptions<Metadata> {
  readonly textQuery: string;     // Full-text search query
  readonly vectorWeight?: number; // Weight for vector vs text (0-1)
}
```

The module provides helper functions for introspecting vector schemas:

```typescript
import { getVectorProperty, getMetadataProperty } from "@workglow/storage";

const vectorColumn = getVectorProperty(schema);     // e.g., "vector"
const metadataColumn = getMetadataProperty(schema); // e.g., "metadata"
```

| Backend | Class | Runtime | Notes |
|---|---|---|---|
| In-Memory | `InMemoryVectorStorage` | All | Brute-force cosine similarity |
| SQLite | `SqliteVectorStorage` | Node, Bun | sqlite-vec extension |
| SQLite AI | `SqliteAiVectorStorage` | Node, Bun | Optimized for AI workflows |
| PostgreSQL | `PostgresVectorStorage` | Node, Bun | pgvector extension |
| IndexedDB | `IndexedDbVectorStorage` | Browser | Brute-force in browser |
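The brute-force strategy used by the in-memory and browser backends comes down to scoring every stored vector with cosine similarity and keeping the top K. A minimal sketch of that idea (illustrative, not the library's code):

```typescript
// Cosine similarity: dot(a, b) / (|a| * |b|). Brute-force search scores
// every stored row against the query, sorts by score, and keeps the top K.
function cosine(a: Float32Array, b: Float32Array): number {
  let dot = 0, na = 0, nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return dot / (Math.sqrt(na) * Math.sqrt(nb));
}

function bruteForceTopK(
  rows: { id: string; vector: Float32Array }[],
  query: Float32Array,
  topK: number,
): { id: string; score: number }[] {
  return rows
    .map((r) => ({ id: r.id, score: cosine(r.vector, query) }))
    .sort((x, y) => y.score - x.score)
    .slice(0, topK);
}

const rows = [
  { id: "a", vector: new Float32Array([1, 0]) },
  { id: "b", vector: new Float32Array([0, 1]) },
];
const top = bruteForceTopK(rows, new Float32Array([1, 0]), 1);
// top[0].id === "a" (same direction as the query, score 1)
```

This is O(n) per query, which is why the dedicated SQLite and PostgreSQL backends lean on sqlite-vec and pgvector indexes instead.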
Storage instances can be registered in the global service registry and resolved at runtime by tasks through the input resolution system. Task schemas use format annotations to declare the kind of storage they need:
```typescript
static inputSchema(): DataPortSchema {
  return {
    type: "object",
    properties: {
      storage: {
        type: "string",
        format: "storage:tabular",
        title: "Storage",
      },
    },
  } as const satisfies DataPortSchema;
}
```

When a task runs, the framework's input resolver maps the string identifier to the actual storage instance from the registry. This decouples task definitions from concrete storage backends.

| Token | Type | Description |
|---|---|---|
| `QUEUE_STORAGE` | `IQueueStorage<any, any>` | Default queue storage |
| `RATE_LIMITER_STORAGE` | `IRateLimiterStorage` | Default rate limiter storage |
All non-in-memory backends require a setupDatabase() call before use. This creates tables, indices, and any required schema. For production deployments, this setup should be done through proper database migrations rather than calling setupDatabase() at runtime.
```typescript
const storage = new SqliteTabularStorage(db, "users", UserSchema, UserPrimaryKey);
await storage.setupDatabase(); // Creates table and indices

// Now safe to use
await storage.put({ name: "Alice", email: "alice@example.com", created_at: new Date().toISOString() });
```

IKvStorage:

- `put(key, value): Promise<void>` -- Store a value.
- `putBulk(items): Promise<void>` -- Store multiple values.
- `get(key): Promise<Value | undefined>` -- Retrieve a value.
- `delete(key): Promise<void>` -- Delete a value.
- `getAll(): Promise<Combined[] | undefined>` -- Retrieve all entries.
- `deleteAll(): Promise<void>` -- Delete all entries.
- `size(): Promise<number>` -- Count entries.
ITabularStorage:

- `put(value): Promise<Entity>` -- Insert or update (returns entity with auto-generated fields).
- `putBulk(values): Promise<Entity[]>` -- Bulk insert/update.
- `get(key): Promise<Entity | undefined>` -- Get by primary key.
- `delete(key): Promise<void>` -- Delete by primary key.
- `getAll(options?): Promise<Entity[] | undefined>` -- Get all with optional ordering/limit.
- `deleteAll(): Promise<void>` -- Delete all entities.
- `size(): Promise<number>` -- Count entities.
- `deleteSearch(criteria): Promise<void>` -- Delete by search criteria.
- `getBulk(offset, limit): Promise<Entity[] | undefined>` -- Paginated fetch.
- `records(pageSize?): AsyncGenerator<Entity>` -- Iterate individual records.
- `pages(pageSize?): AsyncGenerator<Entity[]>` -- Iterate pages.
- `query(criteria, options?): Promise<Entity[] | undefined>` -- Search with criteria and options.
- `subscribeToChanges(callback, options?): () => void` -- Subscribe to change notifications.
- `setupDatabase(): Promise<void>` -- Initialize the database schema.
- `destroy(): void` -- Free resources.
IQueueStorage:

- `add(job): Promise<unknown>` -- Add a job, returns its ID.
- `get(id): Promise<JobStorageFormat | undefined>` -- Get a job by ID.
- `next(workerId): Promise<JobStorageFormat | undefined>` -- Atomically claim the next pending job.
- `peek(status?, num?): Promise<JobStorageFormat[]>` -- Peek at jobs without claiming.
- `size(status?): Promise<number>` -- Count jobs by status.
- `complete(job): Promise<void>` -- Update a job's final state.
- `abort(id): Promise<void>` -- Mark a job for abortion.
- `getByRunId(runId): Promise<JobStorageFormat[]>` -- Get jobs by run ID.
- `saveProgress(id, progress, message, details): Promise<void>` -- Update progress.
- `delete(id): Promise<void>` -- Delete a specific job.
- `deleteJobsByStatusAndAge(status, olderThanMs): Promise<void>` -- TTL cleanup.
- `deleteAll(): Promise<void>` -- Delete all jobs.
- `setupDatabase(): Promise<void>` -- Initialize schema.
- `subscribeToChanges(callback, options?): () => void` -- Subscribe to changes.
IVectorStorage inherits all ITabularStorage methods, plus:
- `getVectorDimensions(): number` -- Get the configured vector dimension.
- `similaritySearch(query, options?): Promise<(Entity & { score: number })[]>` -- Find similar vectors.
- `hybridSearch?(query, options): Promise<(Entity & { score: number })[]>` -- Combined vector + text search.