Modular storage solutions for Workglow.AI platform with multiple backend implementations. Provides consistent interfaces for key-value storage, tabular data storage, and job queue persistence.
- Quick Start
- Installation
- Core Concepts
- Storage Types
- Environment-Specific Usage
- Advanced Features
- API Reference
- Examples
- Testing
- License
// Key-Value Storage (simple data)
import { InMemoryKvStorage } from "@workglow/storage";
const kvStore = new InMemoryKvStorage<string, { name: string; age: number }>();
await kvStore.put("user:123", { name: "Alice", age: 30 });
const kvUser = await kvStore.get("user:123"); // { name: "Alice", age: 30 }// Tabular Storage (structured data with schemas)
import { InMemoryTabularStorage } from "@workglow/storage";
import { JsonSchema } from "@workglow/util";
const userSchema = {
type: "object",
properties: {
id: { type: "string" },
name: { type: "string" },
email: { type: "string" },
age: { type: "number" },
},
required: ["id", "name", "email", "age"],
additionalProperties: false,
} as const satisfies JsonSchema;
const userRepo = new InMemoryTabularStorage<typeof userSchema, ["id"]>(
userSchema,
["id"], // primary key
["email"] // additional indexes
);
await userRepo.put({ id: "123", name: "Alice", email: "alice@example.com", age: 30 });
const user = await userRepo.get({ id: "123" });# Using bun (recommended)
bun install @workglow/storage
# Using npm
npm install @workglow/storage
# Using yarn
yarn add @workglow/storageAll storage implementations are fully typed using TypeScript and JSON Schema for runtime validation:
import { JsonSchema, FromSchema } from "@workglow/util";
// Define your data structure
const ProductSchema = {
type: "object",
properties: {
id: { type: "string" },
name: { type: "string" },
price: { type: "number" },
category: { type: "string" },
inStock: { type: "boolean" },
},
required: ["id", "name", "price", "category", "inStock"],
additionalProperties: false,
} as const satisfies JsonSchema;
// TypeScript automatically infers:
// Entity = FromSchema<typeof ProductSchema>
// PrimaryKey = { id: string }| Storage Type | Node.js | Bun | Browser | Persistence |
|---|---|---|---|---|
| InMemory | ✅ | ✅ | ✅ | ❌ |
| IndexedDB | ❌ | ❌ | ✅ | ✅ |
| SQLite | ✅ | ✅ | ❌ | ✅ |
| PostgreSQL | ✅ | ✅ | ❌ | ✅ |
| Supabase | ✅ | ✅ | ✅ | ✅ |
| FileSystem | ✅ | ✅ | ❌ | ✅ |
The package uses conditional exports, so importing from @workglow/storage automatically selects the right build for your runtime (browser, Node.js, or Bun).
// Import from the top-level package; it resolves to the correct target per environment
import { InMemoryKvStorage, SqliteTabularStorage } from "@workglow/storage";Before you open SQLite with a file path or construct new Sqlite.Database(...), call await Sqlite.init() once per runtime (Node.js, Bun, or browser). Export is available from workglow and @workglow/storage/sqlite. The call is idempotent. On the browser it loads the SQLite WASM build; on Node.js it loads better-sqlite3; on Bun it resolves bun:sqlite.
Simple key-value storage for unstructured or semi-structured data.
import { InMemoryKvStorage, FsFolderJsonKvRepository } from "@workglow/storage";
// In-memory (for testing/caching)
const cache = new InMemoryKvStorage<string, any>();
await cache.put("config", { theme: "dark", language: "en" });
// File-based JSON (persistent)
const settings = new FsFolderJsonKvRepository("./data/settings");
await settings.put("user:preferences", { notifications: true });// Browser (using IndexedDB)
import { IndexedDbKvRepository } from "@workglow/storage";
const browserStore = new IndexedDbKvRepository("my-app-storage");
// Node.js/Bun (using SQLite)
import { SqliteKvRepository } from "@workglow/storage";
import { Sqlite } from "@workglow/storage/sqlite";
await Sqlite.init();
// Pass a file path or a Sqlite.Database instance (see @workglow/storage/sqlite)
const sqliteStore = new SqliteKvRepository("./data.db", "config_table");
// PostgreSQL (Node.js/Bun)
import { PostgresKvRepository } from "@workglow/storage";
import type { Pool } from "@workglow/storage/postgres";
const pool = new Pool({ connectionString: "postgresql://..." });
const pgStore = new PostgresKvRepository(pool, "settings");
// Supabase (Node.js/Bun)
import { SupabaseKvRepository } from "@workglow/storage";
import { createClient } from "@supabase/supabase-js";
const supabase = createClient("https://your-project.supabase.co", "your-anon-key");
const supabaseStore = new SupabaseKvRepository(supabase, "settings");const store = new InMemoryKvStorage<string, { name: string; score: number }>();
// Bulk insert
await store.putBulk([
{ key: "player1", value: { name: "Alice", score: 100 } },
{ key: "player2", value: { name: "Bob", score: 85 } },
]);
// Get all data
const allPlayers = await store.getAll();
// Result: [{ key: "player1", value: { name: "Alice", score: 100 } }, ...]
// Get size
const count = await store.size(); // 2const store = new InMemoryKvStorage<string, any>();
// Listen to storage events
store.on("put", (key, value) => {
console.log(`Stored: ${key} = ${JSON.stringify(value)}`);
});
store.on("get", (key, value) => {
console.log(`Retrieved: ${key} = ${value ? "found" : "not found"}`);
});
await store.put("test", { data: "example" }); // Triggers 'put' event
await store.get("test"); // Triggers 'get' eventStructured storage with schemas, primary keys, and indexing for complex data relationships.
import { JsonSchema } from "@workglow/util";
import { InMemoryTabularStorage } from "@workglow/storage";
// Define your entity schema
const UserSchema = {
type: "object",
properties: {
id: { type: "string" },
email: { type: "string" },
name: { type: "string" },
age: { type: "number" },
department: { type: "string" },
createdAt: { type: "string" },
},
required: ["id", "email", "name", "age", "department", "createdAt"],
additionalProperties: false,
} as const satisfies JsonSchema;
// Create repository with primary key and indexes
const userRepo = new InMemoryTabularStorage<typeof UserSchema, ["id"]>(
UserSchema,
["id"], // Primary key (can be compound: ["dept", "id"])
["email", "department", ["department", "age"]] // Indexes for fast lookups
);TabularStorage supports automatic ID generation by marking schema properties with x-auto-generated: true:
const UserSchema = {
type: "object",
properties: {
id: { type: "integer", "x-auto-generated": true }, // Auto-increment
name: { type: "string" },
email: { type: "string" },
},
required: ["id", "name", "email"],
} as const;
const storage = new PostgresTabularStorage(db, "users", UserSchema, ["id"] as const);
// Insert without providing ID - database generates it
const user = await storage.put({ name: "Alice", email: "alice@example.com" });
console.log(user.id); // 1 (auto-generated)Generation Strategy (inferred from type):
type: "integer"→ Auto-increment (SERIAL, INTEGER PRIMARY KEY, counter)type: "string"→ UUID viauuid4()from@workglow/util
Configuration Options:
new PostgresTabularStorage(
db,
"users",
UserSchema,
["id"],
[],
{ clientProvidedKeys: "if-missing" } // "never" | "if-missing" | "always"
);"if-missing"(default): Use client value if provided, generate otherwise"never": Always generate (most secure)"always": Require client value (for testing)
See Tabular Storage README for detailed documentation.
// Create
await userRepo.put({
id: "user_123",
email: "alice@company.com",
name: "Alice Johnson",
age: 28,
department: "Engineering",
createdAt: new Date().toISOString(),
});
// Read by primary key
const user = await userRepo.get({ id: "user_123" });
// Update (put with same primary key)
await userRepo.put({
...user!,
age: 29, // Birthday!
});
// Delete
await userRepo.delete({ id: "user_123" });// Bulk insert
await userRepo.putBulk([
{
id: "1",
email: "alice@co.com",
name: "Alice",
age: 28,
department: "Engineering",
createdAt: "2024-01-01",
},
{
id: "2",
email: "bob@co.com",
name: "Bob",
age: 32,
department: "Sales",
createdAt: "2024-01-02",
},
{
id: "3",
email: "carol@co.com",
name: "Carol",
age: 26,
department: "Engineering",
createdAt: "2024-01-03",
},
]);
// Get all records
const allUsers = await userRepo.getAll();
// Get repository size
const userCount = await userRepo.size();// Search by partial match (uses indexes when available)
const engineeringUsers = await userRepo.search({ department: "Engineering" });
const adultUsers = await userRepo.search({ age: 25 }); // Exact match
// Delete by search criteria (supports multiple columns)
await userRepo.deleteSearch({ department: "Sales" }); // Equality
await userRepo.deleteSearch({ age: { value: 65, operator: ">=" } }); // Delete users 65 and older
// Multiple criteria (AND logic)
await userRepo.deleteSearch({
department: "Sales",
age: { value: 30, operator: "<" },
}); // Delete young Sales employees// SQLite (Node.js/Bun)
import { SqliteTabularStorage } from "@workglow/storage";
import { Sqlite } from "@workglow/storage/sqlite";
await Sqlite.init();
const sqliteUsers = new SqliteTabularStorage<typeof UserSchema, ["id"]>(
"./users.db",
"users",
UserSchema,
["id"],
["email"]
);
// PostgreSQL (Node.js/Bun)
import { PostgresTabularStorage } from "@workglow/storage";
import type { Pool } from "@workglow/storage/postgres";
const pool = new Pool({ connectionString: "postgresql://..." });
const pgUsers = new PostgresTabularStorage<typeof UserSchema, ["id"]>(
pool,
"users",
UserSchema,
["id"],
["email"]
);
// Supabase (Node.js/Bun)
import { SupabaseTabularStorage } from "@workglow/storage";
import { createClient } from "@supabase/supabase-js";
const supabase = createClient("https://your-project.supabase.co", "your-anon-key");
const supabaseUsers = new SupabaseTabularStorage<typeof UserSchema, ["id"]>(
supabase,
"users",
UserSchema,
["id"],
["email"]
);
// IndexedDB (Browser)
import { IndexedDbTabularStorage } from "@workglow/storage";
const browserUsers = new IndexedDbTabularStorage<typeof UserSchema, ["id"]>(
"users",
UserSchema,
["id"],
["email"]
);
// File-based (Node.js/Bun)
import { FsFolderTabularStorage } from "@workglow/storage";
const fileUsers = new FsFolderTabularStorage<typeof UserSchema, ["id"]>(
"./data/users",
UserSchema,
["id"],
["email"]
);Persistent job queue storage for background processing and task management.
Note: Queue storage is primarily used internally by the job queue system. Direct usage is for advanced scenarios.
import { InMemoryQueueStorage, JobStatus } from "@workglow/storage";
// Define job input/output types
type ProcessingInput = { text: string; options: any };
type ProcessingOutput = { result: string; metadata: any };
const jobQueue = new InMemoryQueueStorage<ProcessingInput, ProcessingOutput>();
// Add job to queue
const jobId = await jobQueue.add({
input: { text: "Hello world", options: { uppercase: true } },
run_after: null, // Run immediately
max_retries: 3,
});
// Get next job for processing
const job = await jobQueue.next();
if (job) {
// Process the job...
const result = { result: "HELLO WORLD", metadata: { processed: true } };
// Mark as complete
await jobQueue.complete({
...job,
output: result,
status: JobStatus.COMPLETED,
});
}// Check queue status
const pendingCount = await jobQueue.size(JobStatus.PENDING);
const processingCount = await jobQueue.size(JobStatus.PROCESSING);
// Peek at jobs without removing them
const nextJobs = await jobQueue.peek(JobStatus.PENDING, 5);
// Progress tracking
await jobQueue.saveProgress(jobId, 50, "Processing...", { step: 1 });
// Handle job failures
await jobQueue.abort(jobId);
// Cleanup old completed jobs
await jobQueue.deleteJobsByStatusAndAge(JobStatus.COMPLETED, 24 * 60 * 60 * 1000); // 24 hoursimport {
IndexedDbKvRepository,
IndexedDbTabularStorage,
IndexedDbQueueStorage,
SupabaseKvRepository,
SupabaseTabularStorage,
SupabaseQueueStorage,
} from "@workglow/storage";
import { createClient } from "@supabase/supabase-js";
// Local browser storage with IndexedDB
const settings = new IndexedDbKvRepository("app-settings");
const userData = new IndexedDbTabularStorage("users", UserSchema, ["id"]);
const jobQueue = new IndexedDbQueueStorage<any, any>("background-jobs");
// Or use Supabase for cloud storage from the browser
const supabase = createClient("https://your-project.supabase.co", "your-anon-key");
const cloudSettings = new SupabaseKvRepository(supabase, "app-settings");
const cloudUserData = new SupabaseTabularStorage(supabase, "users", UserSchema, ["id"]);
const cloudJobQueue = new SupabaseQueueStorage(supabase, "background-jobs");import {
SqliteKvRepository,
PostgresTabularStorage,
FsFolderJsonKvRepository,
} from "@workglow/storage";
// Mix and match storage backends
const cache = new FsFolderJsonKvRepository("./cache");
const users = new PostgresTabularStorage(pool, "users", UserSchema, ["id"]);// Bun has access to all implementations
import {
SqliteTabularStorage,
FsFolderJsonKvRepository,
PostgresQueueStorage,
SupabaseTabularStorage,
} from "@workglow/storage";
import { Sqlite } from "@workglow/storage/sqlite";
import { createClient } from "@supabase/supabase-js";
await Sqlite.init();
const db = new Sqlite.Database("./app.db");
const data = new SqliteTabularStorage(db, "items", ItemSchema, ["id"]);
// Or use Supabase for cloud storage
const supabase = createClient("https://your-project.supabase.co", "your-anon-key");
const cloudData = new SupabaseTabularStorage(supabase, "items", ItemSchema, ["id"]);Repositories can be registered globally by ID, allowing tasks to reference them by name rather than passing direct instances. This is useful for configuring repositories once at application startup and referencing them throughout your task graphs.
import {
registerTabularStorage,
getTabularStorage,
InMemoryTabularStorage,
} from "@workglow/storage";
// Define your schema
const userSchema = {
type: "object",
properties: {
id: { type: "string" },
name: { type: "string" },
email: { type: "string" },
},
required: ["id", "name", "email"],
additionalProperties: false,
} as const;
// Create and register a repository
const userRepo = new InMemoryTabularStorage(userSchema, ["id"] as const);
registerTabularStorage("users", userRepo);
// Later, retrieve the repository by ID
const repo = getTabularStorage("users");When using repositories with tasks, you can pass either the repository ID or a direct instance. The TaskRunner automatically resolves string IDs using the registry.
import { TypeTabularStorage } from "@workglow/storage";
// In your task's input schema, use TypeTabularStorage
static inputSchema() {
return {
type: "object",
properties: {
dataSource: TypeTabularStorage({
title: "User Repository",
description: "Repository containing user records",
}),
},
required: ["dataSource"],
};
}
// Both approaches work:
await task.run({ dataSource: "users" }); // Resolved from registry
await task.run({ dataSource: userRepoInstance }); // Direct instanceThe package provides schema helper functions for defining repository inputs with proper format annotations:
import { TypeTabularStorage } from "@workglow/storage";
// Tabular repository (format: "storage:tabular")
const tabularSchema = TypeTabularStorage({
title: "Data Source",
description: "Tabular data repository",
});All storage implementations support event emission for monitoring and reactive programming:
const store = new InMemoryTabularStorage(UserSchema, ["id"]);
// Monitor all operations
store.on("put", (entity) => console.log("User created/updated:", entity));
store.on("delete", (key) => console.log("User deleted:", key));
store.on("get", (key, entity) => console.log("User accessed:", entity ? "found" : "not found"));
// Wait for specific events
const [entity] = await store.waitOn("put"); // Waits for next put operationimport { JsonSchema } from "@workglow/util";
const OrderLineSchema = {
type: "object",
properties: {
orderId: { type: "string" },
lineNumber: { type: "number" },
productId: { type: "string" },
quantity: { type: "number" },
price: { type: "number" },
},
required: ["orderId", "lineNumber", "productId", "quantity", "price"],
additionalProperties: false,
} as const satisfies JsonSchema;
const orderLines = new InMemoryTabularStorage<typeof OrderLineSchema, ["orderId", "lineNumber"]>(
OrderLineSchema,
["orderId", "lineNumber"], // Compound primary key
["productId"] // Additional index
);
// Use compound keys
await orderLines.put({
orderId: "ORD-123",
lineNumber: 1,
productId: "PROD-A",
quantity: 2,
price: 19.99,
});
const line = await orderLines.get({ orderId: "ORD-123", lineNumber: 1 });import { FsFolderKvRepository } from "@workglow/storage";
import { JsonSchema } from "@workglow/util";
// Control how keys map to file paths and value encoding via schemas
const keySchema = { type: "string" } as const satisfies JsonSchema;
const valueSchema = { type: "string" } as const satisfies JsonSchema;
const files = new FsFolderKvRepository<string, string>(
"./data/files",
(key) => `${key}.txt`,
keySchema,
valueSchema
);
await files.put("note-1", "Hello world");Core interface for key-value storage:
interface IKvStorage<Key, Value> {
// Core operations
put(key: Key, value: Value): Promise<void>;
putBulk(items: Array<{ key: Key; value: Value }>): Promise<void>;
get(key: Key): Promise<Value | undefined>;
delete(key: Key): Promise<void>;
getAll(): Promise<Array<{ key: Key; value: Value }> | undefined>;
deleteAll(): Promise<void>;
size(): Promise<number>;
// Event handling
on(event: "put" | "get" | "getAll" | "delete" | "deleteall", callback: Function): void;
off(event: string, callback: Function): void;
once(event: string, callback: Function): void;
waitOn(event: string): Promise<any[]>;
emit(event: string, ...args: any[]): void;
}Core interface for tabular storage:
interface ITabularStorage<Schema, PrimaryKeyNames, Entity, PrimaryKey, Value> {
// Core operations
put(entity: Entity): Promise<void>;
putBulk(entities: Entity[]): Promise<void>;
get(key: PrimaryKey): Promise<Entity | undefined>;
delete(key: PrimaryKey | Entity): Promise<void>;
getAll(): Promise<Entity[] | undefined>;
deleteAll(): Promise<void>;
size(): Promise<number>;
// Search operations
search(criteria: Partial<Entity>): Promise<Entity[] | undefined>;
deleteSearch(criteria: DeleteSearchCriteria<Entity>): Promise<void>;
// Event handling
on(event: "put" | "get" | "search" | "delete" | "clearall", callback: Function): void;
off(event: string, callback: Function): void;
once(event: string, callback: Function): void;
waitOn(event: string): Promise<any[]>;
emit(event: string, ...args: any[]): void;
}The deleteSearch method accepts a criteria object that supports multiple columns with optional comparison operators:
// Type definitions
type SearchOperator = "=" | "<" | "<=" | ">" | ">=";
interface SearchCondition<T> {
readonly value: T;
readonly operator: SearchOperator;
}
type DeleteSearchCriteria<Entity> = {
readonly [K in keyof Entity]?: Entity[K] | SearchCondition<Entity[K]>;
};
// Usage examples
// Equality match (direct value)
await repo.deleteSearch({ category: "electronics" });
// With comparison operator
await repo.deleteSearch({ createdAt: { value: date, operator: "<" } });
// Multiple criteria (AND logic)
await repo.deleteSearch({
category: "electronics",
value: { value: 100, operator: ">=" },
});Core interface for job queue storage:
interface IQueueStorage<Input, Output> {
add(job: JobStorageFormat<Input, Output>): Promise<unknown>;
get(id: unknown): Promise<JobStorageFormat<Input, Output> | undefined>;
next(): Promise<JobStorageFormat<Input, Output> | undefined>;
complete(job: JobStorageFormat<Input, Output>): Promise<void>;
peek(status?: JobStatus, num?: number): Promise<JobStorageFormat<Input, Output>[]>;
size(status?: JobStatus): Promise<number>;
abort(id: unknown): Promise<void>;
saveProgress(id: unknown, progress: number, message: string, details: any): Promise<void>;
deleteAll(): Promise<void>;
getByRunId(runId: string): Promise<Array<JobStorageFormat<Input, Output>>>;
outputForInput(input: Input): Promise<Output | null>;
delete(id: unknown): Promise<void>;
deleteJobsByStatusAndAge(status: JobStatus, olderThanMs: number): Promise<void>;
}import { JsonSchema, FromSchema } from "@workglow/util";
import { InMemoryTabularStorage, InMemoryKvStorage } from "@workglow/storage";
// User profile with tabular storage
const UserSchema = {
type: "object",
properties: {
id: { type: "string" },
username: { type: "string" },
email: { type: "string" },
firstName: { type: "string" },
lastName: { type: "string" },
role: {
type: "string",
enum: ["admin", "user", "guest"],
},
createdAt: { type: "string" },
lastLoginAt: { type: "string" },
},
required: ["id", "username", "email", "firstName", "lastName", "role", "createdAt"],
additionalProperties: false,
} as const satisfies JsonSchema;
const userRepo = new InMemoryTabularStorage<typeof UserSchema, ["id"]>(
UserSchema,
["id"],
["email", "username"]
);
// User sessions with KV storage
const sessionStore = new InMemoryKvStorage<string, { userId: string; expiresAt: string }>();
// User management class
class UserManager {
constructor(
private userRepo: typeof userRepo,
private sessionStore: typeof sessionStore
) {}
async createUser(userData: Omit<FromSchema<typeof UserSchema>, "id" | "createdAt">) {
const user = {
...userData,
id: crypto.randomUUID(),
createdAt: new Date().toISOString(),
};
await this.userRepo.put(user);
return user;
}
async loginUser(email: string): Promise<string> {
const users = await this.userRepo.search({ email });
if (!users?.length) throw new Error("User not found");
const sessionId = crypto.randomUUID();
await this.sessionStore.put(sessionId, {
userId: users[0].id,
expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000).toISOString(),
});
// Update last login
await this.userRepo.put({
...users[0],
lastLoginAt: new Date().toISOString(),
});
return sessionId;
}
async getSessionUser(sessionId: string) {
const session = await this.sessionStore.get(sessionId);
if (!session || new Date(session.expiresAt) < new Date()) {
return null;
}
return this.userRepo.get({ id: session.userId });
}
}// Application settings with typed configuration
type AppConfig = {
database: {
host: string;
port: number;
name: string;
};
features: {
enableNewUI: boolean;
maxUploadSize: number;
};
integrations: {
stripe: { apiKey: string; webhook: string };
sendgrid: { apiKey: string };
};
};
const configStore = new FsFolderJsonKvRepository<string, AppConfig>("./config");
class ConfigManager {
private cache = new Map<string, AppConfig>();
constructor(private store: typeof configStore) {
// Listen for config changes
store.on("put", (key, value) => {
this.cache.set(key, value);
console.log(`Configuration updated: ${key}`);
});
}
async getConfig(environment: string): Promise<AppConfig> {
if (this.cache.has(environment)) {
return this.cache.get(environment)!;
}
const config = await this.store.get(environment);
if (!config) throw new Error(`No configuration for environment: ${environment}`);
this.cache.set(environment, config);
return config;
}
async updateConfig(environment: string, updates: Partial<AppConfig>) {
const current = await this.getConfig(environment);
const updated = { ...current, ...updates };
await this.store.put(environment, updated);
}
}import { createClient } from "@supabase/supabase-js";
import { JsonSchema } from "@workglow/util";
import {
SupabaseTabularStorage,
SupabaseKvRepository,
SupabaseQueueStorage,
} from "@workglow/storage";
// Initialize Supabase client
const supabase = createClient(process.env.SUPABASE_URL!, process.env.SUPABASE_ANON_KEY!);
// Define schemas
const ProductSchema = {
type: "object",
properties: {
id: { type: "string" },
name: { type: "string" },
price: { type: "number" },
category: { type: "string" },
stock: { type: "number", minimum: 0 },
createdAt: { type: "string", format: "date-time" },
},
required: ["id", "name", "price", "category", "stock", "createdAt"],
additionalProperties: false,
} as const satisfies JsonSchema;
const OrderSchema = {
type: "object",
properties: {
id: { type: "string" },
customerId: { type: "string" },
productId: { type: "string" },
quantity: { type: "number", minimum: 1 },
status: {
type: "string",
enum: ["pending", "processing", "completed", "cancelled"],
},
createdAt: { type: "string", format: "date-time" },
},
required: ["id", "customerId", "productId", "quantity", "status", "createdAt"],
additionalProperties: false,
} as const satisfies JsonSchema;
// Create repositories
const products = new SupabaseTabularStorage<typeof ProductSchema, ["id"]>(
supabase,
"products",
ProductSchema,
["id"],
["category", "name"] // Indexed columns for fast searching
);
const orders = new SupabaseTabularStorage<typeof OrderSchema, ["id"]>(
supabase,
"orders",
OrderSchema,
["id"],
["customerId", "status", ["customerId", "status"]] // Compound index
);
// Use KV for caching
const cache = new SupabaseKvRepository(supabase, "cache");
// Use queue for background processing
type EmailJob = { to: string; subject: string; body: string };
const emailQueue = new SupabaseQueueStorage<EmailJob, void>(supabase, "emails");
// Example usage
async function createOrder(customerId: string, productId: string, quantity: number) {
// Check product availability
const product = await products.get({ id: productId });
if (!product || product.stock < quantity) {
throw new Error("Insufficient stock");
}
// Create order
const order = {
id: crypto.randomUUID(),
customerId,
productId,
quantity,
status: "pending" as const,
createdAt: new Date().toISOString(),
};
await orders.put(order);
// Update stock
await products.put({
...product,
stock: product.stock - quantity,
});
// Queue email notification
await emailQueue.add({
input: {
to: customerId,
subject: "Order Confirmation",
body: `Your order ${order.id} has been confirmed!`,
},
run_after: null,
max_retries: 3,
});
return order;
}
// Get customer's orders
async function getCustomerOrders(customerId: string) {
return await orders.search({ customerId });
}
// Get orders by status
async function getOrdersByStatus(status: string) {
return await orders.search({ status });
}Important Note The implementations assume you have an exec_sql RPC function in your Supabase database for table creation, or that you've created the tables through Supabase migrations. For production use, it's recommended to:
- Create tables using Supabase migrations rather than runtime table creation
- Set up proper Row Level Security (RLS) policies in Supabase
- Use service role keys for server-side operations that need elevated permissions
The package includes comprehensive test suites for all storage implementations:
# Run all tests
bun test
# Run specific test suites
bun test --grep "KvRepository"
bun test --grep "TabularStorage"
bun test --grep "QueueStorage"
# Test specific environments
bun test --grep "InMemory" # Cross-platform tests
bun test --grep "IndexedDb" # Browser tests
bun test --grep "Sqlite" # Native testsimport { describe, test, expect, beforeEach } from "vitest";
import { InMemoryTabularStorage } from "@workglow/storage";
describe("UserRepository", () => {
let userRepo: InMemoryTabularStorage<typeof UserSchema, ["id"]>;
beforeEach(() => {
userRepo = new InMemoryTabularStorage<typeof UserSchema, ["id"]>(UserSchema, ["id"], ["email"]);
});
test("should create and retrieve user", async () => {
const user = {
id: "test-123",
email: "test@example.com",
name: "Test User",
age: 25,
department: "Engineering",
createdAt: new Date().toISOString(),
};
await userRepo.put(user);
const retrieved = await userRepo.get({ id: "test-123" });
expect(retrieved).toEqual(user);
});
test("should find users by department", async () => {
const users = [
{
id: "1",
email: "alice@co.com",
name: "Alice",
age: 28,
department: "Engineering",
createdAt: "2024-01-01",
},
{
id: "2",
email: "bob@co.com",
name: "Bob",
age: 32,
department: "Sales",
createdAt: "2024-01-02",
},
];
await userRepo.putBulk(users);
const engineers = await userRepo.search({ department: "Engineering" });
expect(engineers).toHaveLength(1);
expect(engineers![0].name).toBe("Alice");
});
});Apache 2.0 - See LICENSE for details