Storage Abstraction Layer

Overview

The Workglow storage layer (@workglow/storage) provides four unified interfaces -- IKvStorage, ITabularStorage, IQueueStorage, and IVectorStorage -- that abstract over multiple persistence backends. Application code programs against these interfaces, and the concrete backend is selected at configuration time. This design allows the same pipeline to run with in-memory storage during development, SQLite for single-machine deployments, PostgreSQL for production clusters, IndexedDB in the browser, and Supabase for managed cloud infrastructure.

Every storage interface is event-driven, schema-typed, and backend-agnostic. The storage layer also integrates with Workglow's service registry for dependency injection and with the task system's input resolution mechanism for runtime schema-based lookups.

IKvStorage -- Key-Value Storage

The simplest storage interface. It maps typed keys to typed values, similar to a Map but with persistence, events, and bulk operations.

interface IKvStorage<
  Key extends string | number = string,
  Value extends any = any,
  Combined = { key: Key; value: Value },
> {
  put(key: Key, value: Value): Promise<void>;
  putBulk(items: Array<{ key: Key; value: Value }>): Promise<void>;
  get(key: Key): Promise<Value | undefined>;
  delete(key: Key): Promise<void>;
  getAll(): Promise<Combined[] | undefined>;
  deleteAll(): Promise<void>;
  size(): Promise<number>;
  getObjectAsIdString(object: JSONValue): Promise<string>;

  // Event methods
  on<Event extends KvEventName>(name: Event, fn: KvEventListener<...>): void;
  off<Event extends KvEventName>(name: Event, fn: KvEventListener<...>): void;
  emit<Event extends KvEventName>(name: Event, ...args: ...): void;
  once<Event extends KvEventName>(name: Event, fn: KvEventListener<...>): void;
  waitOn<Event extends KvEventName>(name: Event): Promise<...>;
}

Events

| Event | Parameters | Description |
|---|---|---|
| put | (key, value) | A value was stored |
| get | (key, value \| undefined) | A value was retrieved |
| getAll | (results \| undefined) | All values were retrieved |
| delete | (key) | A value was deleted |
| deleteall | () | All values were deleted |

Available Backends

| Backend | Class | Runtime | Notes |
|---|---|---|---|
| In-Memory | InMemoryKvStorage | All | Fast, non-persistent |
| SQLite | SqliteKvStorage | Node, Bun | File-based persistence |
| PostgreSQL | PostgresKvStorage | Node, Bun | Network-based |
| Supabase | SupabaseKvStorage | All | Cloud PostgreSQL |
| IndexedDB | IndexedDbKvStorage | Browser | Browser-native |
| Filesystem (JSON) | FsFolderJsonKvStorage | Node, Bun | One JSON file per key |
| Filesystem | FsFolderKvStorage | Node, Bun | Raw file storage |
| Via Tabular | KvViaTabularStorage | All | Adapts tabular storage as KV |
| Telemetry | TelemetryKvStorage | All | Wraps another KV with tracing |

Usage Example

import { InMemoryKvStorage } from "@workglow/storage";

const cache = new InMemoryKvStorage();

// Register listeners first: a listener only sees events emitted after it is attached.
cache.on("put", (key, value) => {
  console.log(`Cached: ${key}`);
});

await cache.put("user:123", { name: "Alice", role: "admin" }); // triggers the "put" listener

const user = await cache.get("user:123");
// { name: "Alice", role: "admin" }
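The "Map with events" model can be sketched in a few lines. This is an illustrative stand-in, not InMemoryKvStorage. The class name TinyKvStore and its loosely typed listener signature are hypothetical. Persistence, bulk operations, once, and waitOn are omitted. Event names follow the events table above.

```typescript
// Hypothetical sketch, not the @workglow/storage implementation:
// a Map-backed store whose operations notify registered listeners.
type KvEventName = "put" | "get" | "delete" | "deleteall";
type Listener = (...args: any[]) => void;

class TinyKvStore<K extends string | number, V> {
  private readonly data = new Map<K, V>();
  private readonly listeners = new Map<KvEventName, Set<Listener>>();

  on(name: KvEventName, fn: Listener): void {
    if (!this.listeners.has(name)) this.listeners.set(name, new Set());
    this.listeners.get(name)!.add(fn);
  }

  off(name: KvEventName, fn: Listener): void {
    this.listeners.get(name)?.delete(fn);
  }

  private emit(name: KvEventName, ...args: unknown[]): void {
    const set = this.listeners.get(name);
    if (!set) return;
    // Copy first so a listener that unsubscribes itself does not disturb iteration.
    Array.from(set).forEach((fn) => fn(...args));
  }

  async put(key: K, value: V): Promise<void> {
    this.data.set(key, value);
    this.emit("put", key, value);
  }

  async get(key: K): Promise<V | undefined> {
    const value = this.data.get(key);
    this.emit("get", key, value);
    return value;
  }

  async delete(key: K): Promise<void> {
    this.data.delete(key);
    this.emit("delete", key);
  }

  async deleteAll(): Promise<void> {
    this.data.clear();
    this.emit("deleteall");
  }

  async size(): Promise<number> {
    return this.data.size;
  }
}
```

The methods never await internally, so each event fires synchronously during the call, before the returned promise settles.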

ITabularStorage -- Tabular Storage

The workhorse interface for structured data with typed schemas, compound primary keys, querying, pagination, change subscriptions, and auto-generated keys. It is the foundation for document storage, knowledge base metadata, model registries, and more.

interface ITabularStorage<
  Schema extends DataPortSchemaObject,
  PrimaryKeyNames extends ReadonlyArray<keyof Schema["properties"]>,
  Entity = FromSchema<Schema, TypedArraySchemaOptions>,
  PrimaryKey = SimplifyPrimaryKey<Entity, PrimaryKeyNames>,
  InsertType = InsertEntity<Entity, AutoGeneratedKeys<Schema>>,
> {
  put(value: InsertType): Promise<Entity>;
  putBulk(values: InsertType[]): Promise<Entity[]>;
  get(key: PrimaryKey): Promise<Entity | undefined>;
  delete(key: PrimaryKey | Entity): Promise<void>;
  getAll(options?: QueryOptions<Entity>): Promise<Entity[] | undefined>;
  deleteAll(): Promise<void>;
  size(): Promise<number>;
  deleteSearch(criteria: DeleteSearchCriteria<Entity>): Promise<void>;
  getBulk(offset: number, limit: number): Promise<Entity[] | undefined>;
  records(pageSize?: number): AsyncGenerator<Entity, void, undefined>;
  pages(pageSize?: number): AsyncGenerator<Entity[], void, undefined>;
  query(criteria: SearchCriteria<Entity>, options?: QueryOptions<Entity>): Promise<Entity[] | undefined>;
  subscribeToChanges(callback: (change: TabularChangePayload<Entity>) => void, options?: TabularSubscribeOptions): () => void;
  setupDatabase(): Promise<void>;
  destroy(): void;
}

Schema Integration

Schemas are JSON Schema objects (DataPortSchemaObject) that define both the shape of the data and metadata used by the storage layer:

import type { DataPortSchemaObject } from "@workglow/util/schema";

const UserSchema = {
  type: "object",
  properties: {
    user_id: { type: "string", "x-auto-generated": true },
    name: { type: "string" },
    email: { type: "string" },
    created_at: { type: "string" },
  },
  required: ["user_id", "name", "email", "created_at"],
  additionalProperties: false,
} as const satisfies DataPortSchemaObject;

const UserPrimaryKey = ["user_id"] as const;

The x-auto-generated: true annotation marks fields whose values are assigned by the storage backend. For integer types, values auto-increment. For string types, a UUID is generated. When inserting, these fields are optional in the InsertType.
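Below is a minimal sketch of the insert-time rule. A hand-written User type stands in for the schema-derived Entity. WithOptional, fillAutoGenerated, and makeAutoIncrement are hypothetical helpers, not the library's InsertEntity or AutoGeneratedKeys. In the real layer, the backend (not the caller) assigns these values.

```typescript
import { randomUUID } from "node:crypto";

// Hypothetical helper: make the listed keys optional and leave the rest unchanged.
type WithOptional<T, K extends keyof T> = Omit<T, K> & Partial<Pick<T, K>>;

interface User {
  user_id: string;
  name: string;
  email: string;
  created_at: string;
}

// user_id is marked x-auto-generated, so callers may omit it on insert.
type UserInsert = WithOptional<User, "user_id">;

// Hypothetical fill step for a string key: generate a UUID when none was given.
function fillAutoGenerated(row: UserInsert): User {
  return { ...row, user_id: row.user_id ?? randomUUID() };
}

// Integer keys auto-increment instead; a per-table counter is the simplest model.
function makeAutoIncrement(start = 1): () => number {
  let next = start;
  return () => next++;
}
```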

Querying

The query method supports equality and comparison operators:

// Equality match
const results = await storage.query({ category: "electronics" });

// With comparison operator
const recent = await storage.query({
  created_at: { value: "2025-01-01", operator: ">=" },
});

// Multiple criteria (AND logic)
const filtered = await storage.query(
  {
    category: "electronics",
    price: { value: 100, operator: "<" },
  },
  {
    orderBy: [{ column: "price", direction: "ASC" }],
    limit: 10,
    offset: 0,
  }
);
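The criteria semantics can be modeled over a plain array:

- A bare value means equality.
- A { value, operator } pair means comparison.
- Multiple columns are combined with AND.
- orderBy, offset, and limit are applied afterwards.

This is a hypothetical sketch; queryArray and its types are not library names. The operator set =, <, <=, >, >= is an assumption, since the examples above only show >= and <.

```typescript
// Hypothetical types modeling the query shapes shown above.
type Operator = "=" | "<" | "<=" | ">" | ">=";
type Primitive = string | number;
type Condition = Primitive | { value: Primitive; operator: Operator };
type Criteria<T> = Partial<Record<keyof T, Condition>>;
interface QueryOpts<T> {
  orderBy?: Array<{ column: keyof T; direction: "ASC" | "DESC" }>;
  limit?: number;
  offset?: number;
}

function compare(a: Primitive, b: Primitive): number {
  return a < b ? -1 : a > b ? 1 : 0;
}

// A bare value means equality; a { value, operator } pair means comparison.
function matches(actual: unknown, cond: Condition): boolean {
  if (typeof cond !== "object") return actual === cond;
  const c = compare(actual as Primitive, cond.value);
  if (cond.operator === "=") return c === 0;
  if (cond.operator === "<") return c < 0;
  if (cond.operator === "<=") return c <= 0;
  if (cond.operator === ">") return c > 0;
  return c >= 0; // ">="
}

function queryArray<T extends Record<string, unknown>>(
  rows: T[],
  criteria: Criteria<T>,
  options: QueryOpts<T> = {},
): T[] {
  // AND logic: every listed column must satisfy its condition.
  const hits = rows.filter((row) =>
    Object.entries(criteria).every(
      ([col, cond]) => cond === undefined || matches(row[col], cond as Condition),
    ),
  );
  // Multi-column ordering: later columns break ties in earlier ones.
  const order = options.orderBy ?? [];
  hits.sort((x, y) => {
    for (const { column, direction } of order) {
      const c = compare(x[column] as Primitive, y[column] as Primitive);
      if (c !== 0) return direction === "ASC" ? c : -c;
    }
    return 0;
  });
  const start = options.offset ?? 0;
  const end = options.limit === undefined ? undefined : start + options.limit;
  return hits.slice(start, end);
}
```

A SQL backend would express the same query as a WHERE clause joined with AND, followed by ORDER BY, LIMIT, and OFFSET.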

Pagination

Two async generator methods support efficient pagination over large datasets:

// Iterate individual records
for await (const record of storage.records(100)) {
  handleRecord(record); // hypothetical handler; a function named `process` would shadow Node's global
}

// Iterate pages
for await (const page of storage.pages(100)) {
  await processBatch(page);
}
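One plausible way to layer the two generators on top of getBulk(offset, limit) is sketched below. It assumes getBulk returns undefined (or a short page) once the offset passes the end, as its Promise<Entity[] | undefined> signature suggests. fakeGetBulk, pages, and records here are standalone hypothetical functions over an array, not the library's methods.

```typescript
// Hypothetical stand-in for getBulk(offset, limit) over an in-memory array.
async function fakeGetBulk<T>(rows: T[], offset: number, limit: number): Promise<T[] | undefined> {
  const page = rows.slice(offset, offset + limit);
  return page.length > 0 ? page : undefined;
}

// pages(): fetch fixed-size slices until the source runs dry.
async function* pages<T>(rows: T[], pageSize = 100): AsyncGenerator<T[], void, undefined> {
  for (let offset = 0; ; offset += pageSize) {
    const page = await fakeGetBulk(rows, offset, pageSize);
    if (!page) return; // nothing left
    yield page;
    if (page.length < pageSize) return; // a short page means we reached the end
  }
}

// records(): flatten pages into individual entities.
async function* records<T>(rows: T[], pageSize = 100): AsyncGenerator<T, void, undefined> {
  for await (const page of pages(rows, pageSize)) {
    yield* page;
  }
}
```

Only one page is held in memory at a time, which is what makes this pattern suitable for large tables.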

Change Subscriptions

Subscribe to INSERT, UPDATE, and DELETE events -- including changes from other processes:

const unsubscribe = storage.subscribeToChanges((change) => {
  console.log(change.type); // "INSERT" | "UPDATE" | "DELETE"
  console.log(change.old);  // previous entity (for UPDATE/DELETE)
  console.log(change.new);  // new entity (for INSERT/UPDATE)
}, { pollingIntervalMs: 1000 });

// Later: stop listening
unsubscribe();
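A backend without native change events can still report cross-process changes by polling. On each tick it takes a snapshot keyed by primary key and diffs it against the previous one. This is an assumption about how a pollingIntervalMs-style subscription might work, not a description of a specific backend. diffSnapshots and pollChanges are hypothetical, and the payload fields mirror the example above.

```typescript
// Payload fields mirror the subscribeToChanges example above.
type ChangePayload<T> =
  | { type: "INSERT"; new: T }
  | { type: "UPDATE"; old: T; new: T }
  | { type: "DELETE"; old: T };

// Compare two snapshots keyed by primary key and report what changed.
function diffSnapshots<T>(
  before: Map<string, T>,
  after: Map<string, T>,
  // Simple structural check; it is sensitive to property order.
  equal: (a: T, b: T) => boolean = (a, b) => JSON.stringify(a) === JSON.stringify(b),
): ChangePayload<T>[] {
  const changes: ChangePayload<T>[] = [];
  after.forEach((next, key) => {
    const prev = before.get(key);
    if (prev === undefined) changes.push({ type: "INSERT", new: next });
    else if (!equal(prev, next)) changes.push({ type: "UPDATE", old: prev, new: next });
  });
  before.forEach((prev, key) => {
    if (!after.has(key)) changes.push({ type: "DELETE", old: prev });
  });
  return changes;
}

// Hypothetical polling loop; snapshot() must return fresh copies, or in-place edits go unseen.
function pollChanges<T>(
  snapshot: () => Map<string, T>,
  callback: (change: ChangePayload<T>) => void,
  pollingIntervalMs: number,
): () => void {
  let previous = snapshot();
  const timer = setInterval(() => {
    const current = snapshot();
    diffSnapshots(previous, current).forEach((change) => callback(change));
    previous = current;
  }, pollingIntervalMs);
  return () => clearInterval(timer); // the unsubscribe function
}
```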

Events

| Event | Parameters | Description |
|---|---|---|
| put | (entity) | An entity was inserted or updated |
| get | (key, entity \| undefined) | An entity was retrieved |
| query | (criteria, entities \| undefined) | A query was executed |
| delete | (key) | An entity was deleted |
| clearall | () | All entities were deleted |

Available Backends

| Backend | Class | Runtime | Notes |
|---|---|---|---|
| In-Memory | InMemoryTabularStorage | All | Fast, non-persistent |
| Shared In-Memory | SharedInMemoryTabularStorage | All | Shared between instances |
| SQLite | SqliteTabularStorage | Node, Bun | File-based persistence |
| PostgreSQL | PostgresTabularStorage | Node, Bun | Network-based |
| Supabase | SupabaseTabularStorage | All | Cloud PostgreSQL |
| IndexedDB | IndexedDbTabularStorage | Browser | Browser-native |
| Filesystem | FsFolderTabularStorage | Node, Bun | Directory-based |
| HuggingFace | HuggingFaceTabularStorage | All | HuggingFace datasets |
| Cached | CachedTabularStorage | All | Read-through cache wrapper |
| Telemetry | TelemetryTabularStorage | All | Wraps with tracing |

IQueueStorage -- Queue Storage

Specialized storage for job queue persistence. It extends beyond basic CRUD to support atomic job claiming, abort signaling, progress tracking, and TTL-based cleanup.

interface IQueueStorage<Input, Output> {
  add(job: JobStorageFormat<Input, Output>): Promise<unknown>;
  get(id: unknown): Promise<JobStorageFormat<Input, Output> | undefined>;
  next(workerId: string): Promise<JobStorageFormat<Input, Output> | undefined>;
  peek(status?: JobStatus, num?: number): Promise<Array<JobStorageFormat<Input, Output>>>;
  size(status?: JobStatus): Promise<number>;
  complete(job: JobStorageFormat<Input, Output>): Promise<void>;
  deleteAll(): Promise<void>;
  outputForInput(input: Input): Promise<Output | null>;
  abort(id: unknown): Promise<void>;
  getByRunId(runId: string): Promise<Array<JobStorageFormat<Input, Output>>>;
  saveProgress(id: unknown, progress: number, message: string, details: Record<string, any> | null): Promise<void>;
  delete(id: unknown): Promise<void>;
  deleteJobsByStatusAndAge(status: JobStatus, olderThanMs: number): Promise<void>;
  setupDatabase(): Promise<void>;
  subscribeToChanges(callback: (change: QueueChangePayload<Input, Output>) => void, options?: QueueSubscribeOptions): () => void;
}

Key Operations

next(workerId) is the critical method. It atomically finds the next PENDING job whose run_after time has passed, sets its status to PROCESSING, increments run_attempts, records the workerId, and returns it. This atomic claim prevents two workers from processing the same job.
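The claim step can be sketched in memory. A synchronous JavaScript function runs to completion before any other callback on the same event loop. As a result, the find-and-mark below cannot interleave with another in-process claim. A backend shared by several processes needs a database transaction or row lock for the same guarantee. claimNext and the Job shape are hypothetical. Treating a null run_after as "eligible now" and taking jobs in array order are assumptions.

```typescript
// Status values from the JobStatus constant in this section.
type JobStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "ABORTING" | "FAILED" | "DISABLED";

// Hypothetical subset of the stored job fields that next() touches.
interface Job {
  id: number;
  status: JobStatus;
  run_after: string | null; // ISO timestamp; null assumed to mean "run now"
  run_attempts: number;
  worker_id: string | null;
}

// Find, mark, and return in one synchronous step, so no other claim can interleave.
function claimNext(jobs: Job[], workerId: string, now: Date = new Date()): Job | undefined {
  const job = jobs.find(
    (j) =>
      j.status === "PENDING" &&
      (j.run_after === null || new Date(j.run_after).getTime() <= now.getTime()),
  );
  if (!job) return undefined;
  job.status = "PROCESSING";
  job.run_attempts += 1;
  job.worker_id = workerId;
  return job;
}
```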

abort(id) sets a job's status to ABORTING. The worker detects this on its next checkForAbortingJobs pass and fires the abort controller.
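The abort hand-off can be sketched as a sweep over the jobs a worker is running, where each running job owns an AbortController. sweepAbortingJobs is a hypothetical stand-in for the checkForAbortingJobs pass. This sketch does not show how the real worker schedules that pass or looks up statuses.

```typescript
type JobStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "ABORTING" | "FAILED" | "DISABLED";

// Hypothetical sweep: abort every running job whose stored status is ABORTING.
// `running` maps job IDs to the AbortController handed to that job's task.
function sweepAbortingJobs(
  running: Map<number, AbortController>,
  statusOf: (id: number) => JobStatus | undefined,
): number[] {
  const aborted: number[] = [];
  running.forEach((controller, id) => {
    if (statusOf(id) === "ABORTING" && !controller.signal.aborted) {
      controller.abort(); // "abort" listeners on controller.signal fire synchronously
      aborted.push(id);
    }
  });
  return aborted;
}
```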

saveProgress(id, progress, message, details) persists progress information without changing the job's status.

deleteJobsByStatusAndAge(status, olderThanMs) enables TTL-based cleanup of completed or failed jobs.
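Below is an in-memory model of the TTL rule. It assumes age is measured from completed_at and that jobs without a completed_at are never removed. The function returns the surviving jobs instead of mutating storage. Its name mirrors the method, but the body is a hypothetical sketch.

```typescript
type JobStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "ABORTING" | "FAILED" | "DISABLED";

interface StoredJob {
  id: number;
  status: JobStatus;
  completed_at: string | null; // ISO timestamp
}

// Keep a job unless it has `status` and finished more than `olderThanMs` ago.
function deleteJobsByStatusAndAge(
  jobs: StoredJob[],
  status: JobStatus,
  olderThanMs: number,
  now: number = Date.now(),
): StoredJob[] {
  const cutoff = now - olderThanMs;
  return jobs.filter(
    (job) =>
      !(job.status === status && job.completed_at !== null && Date.parse(job.completed_at) < cutoff),
  );
}
```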

JobStorageFormat

Jobs are stored in a flat format matching database column layouts:

type JobStorageFormat<Input, Output> = {
  id?: unknown;
  job_run_id?: string;
  queue?: string;
  input: Input;
  output?: Output | null;
  error?: string | null;
  error_code?: string | null;
  fingerprint?: string;
  max_retries?: number;
  status?: JobStatus;
  run_after: string | null;
  completed_at: string | null;
  worker_id?: string | null;
  progress?: number;
  progress_message?: string;
  // ...
};

JobStatus

const JobStatus = {
  PENDING: "PENDING",
  PROCESSING: "PROCESSING",
  COMPLETED: "COMPLETED",
  ABORTING: "ABORTING",
  FAILED: "FAILED",
  DISABLED: "DISABLED",
} as const;

Prefix Filtering

Queue storage supports prefix columns for multi-tenant scenarios:

const storage = new SqliteQueueStorage(db, "jobs", {
  prefixes: [
    { name: "user_id", type: "uuid" },
    { name: "project_id", type: "uuid" },
  ],
  prefixValues: {
    user_id: "abc-123",
    project_id: "proj-456",
  },
});

Subscriptions can filter by prefix:

// Default: only jobs matching this instance's prefixValues (this user and this project)
storage.subscribeToChanges(callback);

// All projects for this user
storage.subscribeToChanges(callback, {
  prefixFilter: { user_id: "abc-123" },
});

// All jobs (admin view)
storage.subscribeToChanges(callback, { prefixFilter: {} });
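The three scopes above reduce to one rule. This sketch assumes that a missing prefixFilter falls back to the instance's own prefixValues. A change matches when every key in the filter equals the change's prefix value, so an empty filter matches everything. matchesPrefix and effectiveFilter are hypothetical helpers.

```typescript
type PrefixValues = Record<string, string>;

// A change matches when every key in the filter equals the row's prefix value.
// An empty filter has no keys to check, so it matches every row.
function matchesPrefix(row: PrefixValues, filter: PrefixValues): boolean {
  return Object.keys(filter).every((key) => row[key] === filter[key]);
}

// Assumption: without an explicit prefixFilter, the instance's prefixValues apply.
function effectiveFilter(instancePrefixes: PrefixValues, prefixFilter?: PrefixValues): PrefixValues {
  return prefixFilter ?? instancePrefixes;
}
```

The fallback is keyed on the filter being absent, not empty. That distinction is what lets { prefixFilter: {} } act as the admin view.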

Available Backends

| Backend | Class | Runtime |
|---|---|---|
| In-Memory | InMemoryQueueStorage | All |
| SQLite | SqliteQueueStorage | Node, Bun |
| PostgreSQL | PostgresQueueStorage | Node, Bun |
| Supabase | SupabaseQueueStorage | All |
| IndexedDB | IndexedDbQueueStorage | Browser |
| Telemetry | TelemetryQueueStorage | All |

IVectorStorage -- Vector Storage

Extends ITabularStorage with vector similarity search capabilities. It stores vector embeddings alongside metadata and supports both pure vector search and hybrid (vector + full-text) search.

interface IVectorStorage<
  Metadata extends Record<string, unknown> | undefined,
  Schema extends DataPortSchemaObject,
  Entity,
  PrimaryKeyNames extends ReadonlyArray<keyof Schema["properties"]>,
  PrimaryKey,
  InsertType,
> extends ITabularStorage<Schema, PrimaryKeyNames, Entity, PrimaryKey, InsertType> {
  getVectorDimensions(): number;
  similaritySearch(query: TypedArray, options?: VectorSearchOptions<Metadata>): Promise<(Entity & { score: number })[]>;
  hybridSearch?(query: TypedArray, options: HybridSearchOptions<Metadata>): Promise<(Entity & { score: number })[]>;
}

Similarity Search

const results = await vectorStorage.similaritySearch(queryVector, {
  topK: 10,
  filter: { doc_id: "doc-123" },
  scoreThreshold: 0.7,
});

for (const result of results) {
  console.log(result.metadata.text, result.score);
}

Hybrid Search

Combines vector similarity with full-text search for improved retrieval quality:

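// hybridSearch is optional, so fall back to pure vector search when it is missing.
const results =
  typeof vectorStorage.hybridSearch === "function"
    ? await vectorStorage.hybridSearch(queryVector, {
        textQuery: "machine learning transformers",
        topK: 10,
        vectorWeight: 0.7, // 70% vector, 30% text relevance
      })
    : await vectorStorage.similaritySearch(queryVector, { topK: 10 });

Not all backends support hybrid search, which is why the interface declares hybridSearch as optional. Check typeof vectorStorage.hybridSearch === "function" before calling it, as above.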
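The "70% vector, 30% text" comment implies a linear blend of the two relevance signals. A minimal sketch of that fusion follows. It assumes both scores are already normalized to [0, 1]. fuseScores is hypothetical, and individual backends may normalize or combine scores differently; reciprocal rank fusion is a common alternative.

```typescript
// Hypothetical candidate with both relevance signals, each assumed to be in [0, 1].
interface Scored {
  id: string;
  vectorScore: number;
  textScore: number;
}

// Linear fusion: score = vectorWeight * vector + (1 - vectorWeight) * text.
function fuseScores(
  candidates: Scored[],
  vectorWeight: number,
  topK: number,
): Array<{ id: string; score: number }> {
  if (vectorWeight < 0 || vectorWeight > 1) throw new RangeError("vectorWeight must be in [0, 1]");
  return candidates
    .map((c) => ({ id: c.id, score: vectorWeight * c.vectorScore + (1 - vectorWeight) * c.textScore }))
    .sort((a, b) => b.score - a.score)
    .slice(0, topK);
}
```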

Search Options

interface VectorSearchOptions<Metadata> {
  readonly topK?: number;           // Max results (default varies by backend)
  readonly filter?: Partial<Metadata>;  // Metadata filter
  readonly scoreThreshold?: number;     // Minimum similarity score
}

interface HybridSearchOptions<Metadata> extends VectorSearchOptions<Metadata> {
  readonly textQuery: string;       // Full-text search query
  readonly vectorWeight?: number;   // Weight for vector vs text (0-1)
}
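The Available Backends table for this interface lists the In-Memory and IndexedDB backends as brute-force. A brute-force search that applies the three options can be sketched in four steps:

1. Filter on metadata equality.
2. Score every remaining row by cosine similarity.
3. Drop rows under scoreThreshold.
4. Keep the topK best.

bruteForceSearch, the Row shape, and the default topK of 10 are assumptions for illustration.

```typescript
// Hypothetical row shape: an embedding plus its metadata.
interface Row<M> {
  id: string;
  vector: Float32Array;
  metadata: M;
}

function cosine(a: Float32Array, b: Float32Array): number {
  if (a.length !== b.length) throw new Error("vector dimension mismatch");
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return normA === 0 || normB === 0 ? 0 : dot / Math.sqrt(normA * normB);
}

function bruteForceSearch<M extends Record<string, unknown>>(
  rows: Row<M>[],
  query: Float32Array,
  options: { topK?: number; filter?: Partial<M>; scoreThreshold?: number } = {},
): Array<Row<M> & { score: number }> {
  const topK = options.topK ?? 10; // assumed default; the real default varies by backend
  const threshold = options.scoreThreshold ?? -Infinity;
  const filter = options.filter ?? {};
  return rows
    .filter((r) => Object.entries(filter).every(([k, v]) => r.metadata[k] === v)) // metadata equality
    .map((r) => ({ ...r, score: cosine(r.vector, query) })) // score every candidate
    .filter((r) => r.score >= threshold)
    .sort((a, b) => b.score - a.score)
    .slice(0, topK);
}
```

The cost is one dot product per stored row per query. This is why indexed backends such as sqlite-vec and pgvector are listed for larger collections.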

Schema Helpers

The module provides helper functions for introspecting vector schemas:

import { getVectorProperty, getMetadataProperty } from "@workglow/storage";

const vectorColumn = getVectorProperty(schema);   // e.g., "vector"
const metadataColumn = getMetadataProperty(schema); // e.g., "metadata"

Available Backends

| Backend | Class | Runtime | Notes |
|---|---|---|---|
| In-Memory | InMemoryVectorStorage | All | Brute-force cosine similarity |
| SQLite | SqliteVectorStorage | Node, Bun | sqlite-vec extension |
| SQLite AI | SqliteAiVectorStorage | Node, Bun | Optimized for AI workflows |
| PostgreSQL | PostgresVectorStorage | Node, Bun | pgvector extension |
| IndexedDB | IndexedDbVectorStorage | Browser | Brute-force in browser |

Registry and Input Resolution

Storage instances can be registered in the global service registry and resolved at runtime by tasks through the input resolution system. Task schemas use format annotations to declare the kind of storage they need:

static inputSchema(): DataPortSchema {
  return {
    type: "object",
    properties: {
      storage: {
        type: "string",
        format: "storage:tabular",
        title: "Storage",
      },
    },
  } as const satisfies DataPortSchema;
}

When a task runs, the framework's input resolver maps the string identifier to the actual storage instance from the registry. This decouples task definitions from concrete storage backends.
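Here is a hypothetical sketch of the resolution step. A registry maps identifiers to instances. Any string input whose schema format begins with "storage:" is swapped for the registered instance before the task runs. StorageRegistry and resolveStorageInputs are illustrative names, not the framework's API. A real resolver would likely also check that the instance matches the requested kind, for example storage:tabular.

```typescript
// Hypothetical registry mapping string identifiers to storage instances.
class StorageRegistry {
  private readonly byId = new Map<string, unknown>();

  register(id: string, instance: unknown): void {
    this.byId.set(id, instance);
  }

  get(id: string): unknown {
    const found = this.byId.get(id);
    if (found === undefined) throw new Error(`No storage registered as "${id}"`);
    return found;
  }
}

interface PropertySchema {
  type?: string;
  format?: string;
}
interface ObjectSchema {
  properties: Record<string, PropertySchema>;
}

// Replace each string input whose schema format starts with "storage:" by its instance.
function resolveStorageInputs(
  schema: ObjectSchema,
  input: Record<string, unknown>,
  registry: StorageRegistry,
): Record<string, unknown> {
  const resolved: Record<string, unknown> = { ...input };
  for (const [name, prop] of Object.entries(schema.properties)) {
    const value = input[name];
    if (prop.format?.startsWith("storage:") && typeof value === "string") {
      resolved[name] = registry.get(value);
    }
  }
  return resolved;
}
```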

Service Tokens

| Token | Type | Description |
|---|---|---|
| QUEUE_STORAGE | IQueueStorage<any, any> | Default queue storage |
| RATE_LIMITER_STORAGE | IRateLimiterStorage | Default rate limiter storage |

Database Setup

All non-in-memory backends require a setupDatabase() call before use. This creates tables, indices, and any required schema. For production deployments, this setup should be done through proper database migrations rather than calling setupDatabase() at runtime.

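// `db` is an already-opened SQLite database handle.
const storage = new SqliteTabularStorage(db, "users", UserSchema, UserPrimaryKey);
await storage.setupDatabase(); // Creates table and indices

// Now safe to use. user_id is omitted because it is x-auto-generated.
const user = await storage.put({ name: "Alice", email: "alice@example.com", created_at: new Date().toISOString() });
// user.user_id now holds the generated UUID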

API Reference

IKvStorage

  • put(key, value): Promise<void> -- Store a value.
  • putBulk(items): Promise<void> -- Store multiple values.
  • get(key): Promise<Value | undefined> -- Retrieve a value.
  • delete(key): Promise<void> -- Delete a value.
  • getAll(): Promise<Combined[] | undefined> -- Retrieve all entries.
  • deleteAll(): Promise<void> -- Delete all entries.
  • size(): Promise<number> -- Count entries.

ITabularStorage

  • put(value): Promise<Entity> -- Insert or update (returns entity with auto-generated fields).
  • putBulk(values): Promise<Entity[]> -- Bulk insert/update.
  • get(key): Promise<Entity | undefined> -- Get by primary key.
  • delete(key): Promise<void> -- Delete by primary key.
  • getAll(options?): Promise<Entity[] | undefined> -- Get all with optional ordering/limit.
  • deleteAll(): Promise<void> -- Delete all entities.
  • size(): Promise<number> -- Count entities.
  • deleteSearch(criteria): Promise<void> -- Delete by search criteria.
  • getBulk(offset, limit): Promise<Entity[] | undefined> -- Paginated fetch.
  • records(pageSize?): AsyncGenerator<Entity> -- Iterate individual records.
  • pages(pageSize?): AsyncGenerator<Entity[]> -- Iterate pages.
  • query(criteria, options?): Promise<Entity[] | undefined> -- Search with criteria and options.
  • subscribeToChanges(callback, options?): () => void -- Subscribe to change notifications.
  • setupDatabase(): Promise<void> -- Initialize the database schema.
  • destroy(): void -- Free resources.

IQueueStorage

  • add(job): Promise<unknown> -- Add a job, returns its ID.
  • get(id): Promise<JobStorageFormat | undefined> -- Get a job by ID.
  • next(workerId): Promise<JobStorageFormat | undefined> -- Atomically claim the next pending job.
  • peek(status?, num?): Promise<JobStorageFormat[]> -- Peek at jobs without claiming.
  • size(status?): Promise<number> -- Count jobs by status.
  • complete(job): Promise<void> -- Update a job's final state.
  • abort(id): Promise<void> -- Request that a job be aborted (sets its status to ABORTING).
  • getByRunId(runId): Promise<JobStorageFormat[]> -- Get jobs by run ID.
  • saveProgress(id, progress, message, details): Promise<void> -- Update progress.
  • delete(id): Promise<void> -- Delete a specific job.
  • deleteJobsByStatusAndAge(status, olderThanMs): Promise<void> -- TTL cleanup.
  • deleteAll(): Promise<void> -- Delete all jobs.
  • setupDatabase(): Promise<void> -- Initialize schema.
  • subscribeToChanges(callback, options?): () => void -- Subscribe to changes.

IVectorStorage

Inherits all ITabularStorage methods, plus:

  • getVectorDimensions(): number -- Get the configured vector dimension.
  • similaritySearch(query, options?): Promise<(Entity & { score: number })[]> -- Find similar vectors.
  • hybridSearch?(query, options): Promise<(Entity & { score: number })[]> -- Combined vector + text search.