The Workglow rate limiting system (@workglow/job-queue/limiter) provides a composable set of limiters that control how fast and how many jobs can execute within a queue. Rate limiting is critical when interacting with external APIs that enforce request quotas, when protecting shared infrastructure from overload, or when ensuring fair resource allocation across multiple queues.
The system is built around a single interface -- ILimiter -- that every limiter implements. Limiters can be used individually or combined into a CompositeLimiter that enforces all constraints simultaneously. The job queue worker checks the limiter before every job claim, and the limiter's lifecycle hooks (recordJobStart, recordJobCompletion) keep internal state synchronized with actual execution.
Every limiter in the system implements this interface:
```typescript
interface ILimiter {
  canProceed(): Promise<boolean>;
  recordJobStart(): Promise<void>;
  recordJobCompletion(): Promise<void>;
  getNextAvailableTime(): Promise<Date>;
  setNextAvailableTime(date: Date): Promise<void>;
  clear(): Promise<void>;
}
```

| Method | Called By | Purpose |
|---|---|---|
| `canProceed()` | Worker (before claiming) | Returns `true` if the limiter allows a new job to start right now |
| `recordJobStart()` | Worker (after claiming) | Notifies the limiter that a job has started executing |
| `recordJobCompletion()` | Worker (in `finally` block) | Notifies the limiter that a job has finished (success or failure) |
| `getNextAvailableTime()` | Worker (when rescheduling) | Returns the earliest `Date` at which the next job could execute |
| `setNextAvailableTime(date)` | External callers | Externally imposes a delay (e.g., from a 429 `Retry-After` header) |
| `clear()` | Cleanup / testing | Resets the limiter to its initial state |
The worker integration is straightforward. In the processJobs loop:
```
while running:
    if limiter.canProceed():
        job = storage.next(workerId)
        if job:
            limiter.recordJobStart()
            try:
                execute(job)
            finally:
                limiter.recordJobCompletion()
    else:
        wait for notify or poll timeout
```
The NullLimiter is the default limiter when none is configured. It imposes no restrictions.

```typescript
import { NullLimiter } from "@workglow/job-queue";

const limiter = new NullLimiter();
await limiter.canProceed(); // always true
```

All methods are no-ops: `canProceed()` always returns `true`, and `getNextAvailableTime()` always returns `new Date()` (now). This is the limiter the server uses when no explicit limiter is provided.
The ConcurrencyLimiter controls the maximum number of jobs that can execute simultaneously. It is a pure in-memory limiter with no storage dependency.

```typescript
import { ConcurrencyLimiter } from "@workglow/job-queue";

const limiter = new ConcurrencyLimiter(5); // max 5 concurrent jobs
```

The limiter maintains a `currentRunningJobs` counter. `canProceed()` returns `true` only when this counter is below the configured maximum and the current time is past any externally set `nextAllowedStartTime`. `recordJobStart()` increments the counter; `recordJobCompletion()` decrements it (with a floor of 0).
This limiter is ideal for controlling parallelism within a single process -- for example, limiting concurrent GPU inference tasks or database connections.
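The counter logic described above can be sketched as follows (a simplified illustration, not the library's actual source):

```typescript
// Minimal in-memory concurrency limiter sketch: a running-jobs counter
// gated by an optional externally imposed next-allowed-start time.
class SimpleConcurrencyLimiter {
  private running = 0;
  private nextAllowedStartTime = new Date(0);

  constructor(private readonly maxConcurrentJobs: number) {}

  async canProceed(): Promise<boolean> {
    return (
      this.running < this.maxConcurrentJobs &&
      Date.now() >= this.nextAllowedStartTime.getTime()
    );
  }

  async recordJobStart(): Promise<void> {
    this.running++;
  }

  async recordJobCompletion(): Promise<void> {
    this.running = Math.max(0, this.running - 1); // floor of 0
  }

  async setNextAvailableTime(date: Date): Promise<void> {
    this.nextAllowedStartTime = date;
  }
}
```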
| Parameter | Type | Description |
|---|---|---|
| `maxConcurrentJobs` | `number` | Maximum number of jobs executing at the same time |
```typescript
const limiter = new ConcurrencyLimiter(3);

await limiter.canProceed(); // true (0 running)
await limiter.recordJobStart(); // 1 running
await limiter.recordJobStart(); // 2 running
await limiter.recordJobStart(); // 3 running
await limiter.canProceed(); // false (at capacity)
await limiter.recordJobCompletion(); // 2 running
await limiter.canProceed(); // true
```

The RateLimiter is a sliding-window rate limiter with exponential backoff and jitter. Unlike the ConcurrencyLimiter, it uses an `IRateLimiterStorage` backend to persist execution timestamps, making it suitable for cross-process rate limiting.
```typescript
import { RateLimiter } from "@workglow/job-queue";
import { InMemoryRateLimiterStorage } from "@workglow/storage";

const storage = new InMemoryRateLimiterStorage();
await storage.setupDatabase();

const limiter = new RateLimiter(storage, "openai-embeddings", {
  maxExecutions: 100,
  windowSizeInSeconds: 60,
  initialBackoffDelay: 1_000,
  backoffMultiplier: 2,
  maxBackoffDelay: 600_000,
});
```

The limiter tracks individual execution timestamps in its storage backend. When `canProceed()` is called, it counts how many executions have occurred within the current window (`now - windowSizeInSeconds`). If the count is below `maxExecutions`, the job may proceed and any existing backoff is cleared.
When `recordJobStart()` detects that the window is full, it applies an exponential backoff delay with full jitter:

```
backoffDelay = min(currentBackoff * backoffMultiplier, maxBackoffDelay)
jitteredDelay = backoffDelay + random(0, backoffDelay)
nextAvailableTime = now + jitteredDelay
```
The jitter prevents thundering-herd problems when multiple workers hit the rate limit simultaneously. Each worker will retry at a slightly different time.
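The backoff formula can be expressed as a pure function (a sketch; the injectable `random` parameter exists only for deterministic testing and is not part of the library API):

```typescript
// Full-jitter exponential backoff, per the formula above: grow the
// delay geometrically, cap it, then add a uniform random extra delay
// in [0, backoffDelay) so concurrent workers spread out their retries.
function nextBackoff(
  currentBackoff: number,
  backoffMultiplier: number,
  maxBackoffDelay: number,
  random: () => number = Math.random // injectable for testing
): number {
  const backoffDelay = Math.min(currentBackoff * backoffMultiplier, maxBackoffDelay);
  return backoffDelay + random() * backoffDelay;
}
```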
The RateLimiter delegates persistence to an IRateLimiterStorage implementation. Available backends include:
- `InMemoryRateLimiterStorage` -- Fast, single-process.
- `SqliteRateLimiterStorage` -- Persistent, single-machine.
- `PostgresRateLimiterStorage` -- Persistent, multi-machine.
- `IndexedDbRateLimiterStorage` -- Browser environments.
- `SupabaseRateLimiterStorage` -- Cloud-hosted PostgreSQL.
The storage interface provides:
```typescript
interface IRateLimiterStorage {
  setupDatabase(): Promise<void>;
  recordExecution(queueName: string): Promise<void>;
  getExecutionCount(queueName: string, windowStartTime: string): Promise<number>;
  getOldestExecutionAtOffset(queueName: string, offset: number): Promise<string | undefined>;
  getNextAvailableTime(queueName: string): Promise<string | undefined>;
  setNextAvailableTime(queueName: string, nextAvailableAt: string): Promise<void>;
  clear(queueName: string): Promise<void>;
}
```

| Option | Type | Default | Description |
|---|---|---|---|
| `maxExecutions` | `number` | (required) | Maximum executions allowed per window |
| `windowSizeInSeconds` | `number` | (required) | Length of the sliding window |
| `initialBackoffDelay` | `number` | `1000` | Initial backoff delay in ms |
| `backoffMultiplier` | `number` | `2` | Multiplier applied on each successive backoff |
| `maxBackoffDelay` | `number` | `600000` | Maximum backoff delay (10 minutes) |
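To make the storage contract concrete, here is a toy in-memory implementation of the window-counting portion of `IRateLimiterStorage` (an illustration only, not the shipped `InMemoryRateLimiterStorage`):

```typescript
// Toy in-memory backend covering the window-counting part of the
// IRateLimiterStorage contract: record timestamps, count those that
// fall on or after a given window-start cutoff.
class ToyRateLimiterStorage {
  private executions = new Map<string, string[]>(); // queueName -> ISO timestamps

  async recordExecution(queueName: string): Promise<void> {
    const list = this.executions.get(queueName) ?? [];
    list.push(new Date().toISOString());
    this.executions.set(queueName, list);
  }

  async getExecutionCount(queueName: string, windowStartTime: string): Promise<number> {
    const list = this.executions.get(queueName) ?? [];
    // ISO-8601 strings sort lexicographically, so string comparison
    // is equivalent to date comparison here.
    return list.filter((t) => t >= windowStartTime).length;
  }

  async clear(queueName: string): Promise<void> {
    this.executions.delete(queueName);
  }
}
```

Because ISO-8601 timestamps sort lexicographically, the interface can traffic in strings rather than `Date` objects without losing ordering.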
The DelayLimiter is a simple limiter that enforces a minimum delay between consecutive job starts. It does not limit concurrency or track a window -- it simply prevents the next job from starting until a fixed interval has elapsed since the last `recordJobStart()`.

```typescript
import { DelayLimiter } from "@workglow/job-queue";

const limiter = new DelayLimiter(200); // 200ms between job starts
```

On `recordJobStart()`, the limiter sets `nextAvailableTime = now + delayInMilliseconds`. Subsequent calls to `canProceed()` return `false` until that time has passed. This is useful for inserting a small cooldown between requests to avoid burst behavior.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `delayInMilliseconds` | `number` | `50` | Minimum delay between consecutive starts |
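The DelayLimiter's behavior can be sketched in a few lines (illustrative, not the shipped implementation):

```typescript
// Minimal fixed-delay limiter sketch: each start pushes the next
// allowed start out by delayInMilliseconds.
class SimpleDelayLimiter {
  private nextAvailable = 0; // epoch ms

  constructor(private readonly delayInMilliseconds = 50) {}

  async canProceed(): Promise<boolean> {
    return Date.now() >= this.nextAvailable;
  }

  async recordJobStart(): Promise<void> {
    this.nextAvailable = Date.now() + this.delayInMilliseconds;
  }

  async getNextAvailableTime(): Promise<Date> {
    // Never report a time in the past.
    return new Date(Math.max(Date.now(), this.nextAvailable));
  }
}
```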
The EvenlySpacedRateLimiter distributes job starts evenly across a time window, taking actual execution duration into account. Rather than allowing a burst of `maxExecutions` jobs and then blocking for the remainder of the window, it spaces starts at regular intervals.

```typescript
import { EvenlySpacedRateLimiter } from "@workglow/job-queue";

const limiter = new EvenlySpacedRateLimiter({
  maxExecutions: 60,
  windowSizeInSeconds: 60,
});
// Ideal interval = 60s / 60 = 1 start per second
```

The ideal interval between starts is `windowSizeMs / maxExecutions`. However, if jobs take measurable time to complete, the limiter adjusts the wait time:
```
waitMs = max(0, idealInterval - averageDuration)
nextAvailableTime = now + waitMs
```
The limiter maintains a running window of the most recent maxExecutions job durations. recordJobCompletion() records the duration of the just-finished job. recordJobStart() uses the average of these durations to calculate the next available time.
This approach is ideal for API calls where you need a steady request rate rather than bursty behavior. For example, an API with a 60 RPM limit benefits from one request per second rather than 60 requests in the first second followed by 59 seconds of waiting.
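The wait-time calculation can be sketched as a pure function (illustrative; the names are not the library's internals):

```typescript
// Duration-aware spacing, per the formula above: the wait before the
// next start is the ideal interval minus the average recent duration,
// floored at zero.
function spacedWaitMs(
  maxExecutions: number,
  windowSizeInSeconds: number,
  recentDurationsMs: number[]
): number {
  const idealInterval = (windowSizeInSeconds * 1000) / maxExecutions;
  const averageDuration =
    recentDurationsMs.length === 0
      ? 0
      : recentDurationsMs.reduce((a, b) => a + b, 0) / recentDurationsMs.length;
  return Math.max(0, idealInterval - averageDuration);
}
```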
| Option | Type | Description |
|---|---|---|
| `maxExecutions` | `number` | Maximum executions allowed per window |
| `windowSizeInSeconds` | `number` | Length of the time window |
The CompositeLimiter combines multiple limiters using AND semantics: a job can only proceed if every constituent limiter approves. This is the primary mechanism for layering different rate limiting strategies.
```typescript
import { CompositeLimiter, ConcurrencyLimiter, RateLimiter } from "@workglow/job-queue";

const limiter = new CompositeLimiter([
  new ConcurrencyLimiter(5),
  new RateLimiter(storage, "openai", {
    maxExecutions: 100,
    windowSizeInSeconds: 60,
  }),
]);
```

- `canProceed()` iterates through all limiters and returns `false` at the first one that denies. Short-circuit evaluation means expensive checks can be placed later in the list.
- `recordJobStart()` and `recordJobCompletion()` are forwarded to all limiters.
- `getNextAvailableTime()` returns the latest (most restrictive) time across all limiters.
- `setNextAvailableTime(date)` is forwarded to all limiters.
- `clear()` clears all limiters.
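The AND semantics can be sketched like this (a simplified illustration over a reduced `MiniLimiter` interface, not the library's source):

```typescript
// The two methods that define composite behavior.
interface MiniLimiter {
  canProceed(): Promise<boolean>;
  getNextAvailableTime(): Promise<Date>;
}

// AND semantics: deny at the first child that denies; the composite's
// next available time is the latest (most restrictive) child time.
class MiniCompositeLimiter implements MiniLimiter {
  constructor(private readonly limiters: MiniLimiter[] = []) {}

  async canProceed(): Promise<boolean> {
    for (const l of this.limiters) {
      if (!(await l.canProceed())) return false; // short-circuit
    }
    return true;
  }

  async getNextAvailableTime(): Promise<Date> {
    let latest = new Date();
    for (const l of this.limiters) {
      const t = await l.getNextAvailableTime();
      if (t > latest) latest = t;
    }
    return latest;
  }
}
```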
Limiters can be added at runtime:
```typescript
const composite = new CompositeLimiter();
composite.addLimiter(new ConcurrencyLimiter(10));
composite.addLimiter(new DelayLimiter(100));
```

The limiter is passed to the JobQueueServer via the `limiter` option. The server passes it to every worker it creates. The worker consults the limiter in two places:
- **Main processing loop** -- Before calling `storage.next()`, the worker calls `limiter.canProceed()`. If the limiter denies, the worker waits for a wake signal or poll timeout.
- **Job rescheduling** -- When a `RetryableJobError` is caught and the job still has retries remaining, the worker calls `limiter.getNextAvailableTime()` to determine the `runAfter` date for the rescheduled job (unless the error provides its own `retryDate`).
```typescript
const server = new JobQueueServer(MyJob, {
  storage,
  queueName: "my-queue",
  limiter: new CompositeLimiter([
    new ConcurrencyLimiter(3),
    new EvenlySpacedRateLimiter({
      maxExecutions: 30,
      windowSizeInSeconds: 60,
    }),
  ]),
  workerCount: 3,
});
```

When an external API returns a rate limit response (e.g., HTTP 429 with `Retry-After`), your job can throw a `RetryableJobError` with a specific `retryDate`. You can also directly set the limiter's next available time via `setNextAvailableTime()`:
```typescript
import { RetryableJobError } from "@workglow/job-queue";

class ApiJob extends Job<ApiInput, ApiOutput> {
  async execute(input: ApiInput, context: IJobExecuteContext): Promise<ApiOutput> {
    const response = await fetch(input.url);
    if (response.status === 429) {
      const retryAfter = parseInt(response.headers.get("Retry-After") || "60", 10);
      throw new RetryableJobError(
        "Rate limited by API",
        new Date(Date.now() + retryAfter * 1000)
      );
    }
    return await response.json();
  }
}
```

Limit concurrent requests to an AI provider while respecting their per-minute quota:
```typescript
const anthropicLimiter = new CompositeLimiter([
  new ConcurrencyLimiter(5), // max 5 in-flight requests
  new RateLimiter(rateLimiterStorage, "anthropic", {
    maxExecutions: 50, // 50 requests per minute
    windowSizeInSeconds: 60,
    initialBackoffDelay: 2_000,
    backoffMultiplier: 2,
    maxBackoffDelay: 120_000,
  }),
]);
```

Space out requests evenly with a small additional delay:
```typescript
const crawlLimiter = new CompositeLimiter([
  new EvenlySpacedRateLimiter({
    maxExecutions: 10,
    windowSizeInSeconds: 60,
  }),
  new DelayLimiter(500), // at least 500ms between starts
]);
```

Use different limiters for different queues, each respecting the provider's specific limits:
```typescript
const openaiServer = new JobQueueServer(OpenAiJob, {
  storage: openaiStorage,
  queueName: "openai",
  limiter: new CompositeLimiter([
    new ConcurrencyLimiter(10),
    new RateLimiter(storage, "openai", {
      maxExecutions: 500,
      windowSizeInSeconds: 60,
    }),
  ]),
  workerCount: 4,
});

const ollamaServer = new JobQueueServer(OllamaJob, {
  storage: ollamaStorage,
  queueName: "ollama",
  limiter: new ConcurrencyLimiter(2), // local GPU, limit concurrency only
  workerCount: 1,
});
```

For local compute tasks that do not need throttling:
```typescript
const server = new JobQueueServer(LocalJob, {
  storage,
  queueName: "local-compute",
  // No limiter option = NullLimiter (no restrictions)
  workerCount: 8,
});
```

The limiter system uses Workglow's dependency injection to register default instances:
| Token | Description |
|---|---|
| `JOB_LIMITER` | Generic limiter token (`"jobqueue.limiter"`) |
| `CONCURRENT_JOB_LIMITER` | Concurrency limiter token (`"jobqueue.limiter.concurrent"`) |
| `EVENLY_SPACED_JOB_RATE_LIMITER` | Evenly-spaced limiter token (`"jobqueue.limiter.rate.evenlyspaced"`) |
| `NULL_JOB_LIMITER` | Null limiter token (`"jobqueue.limiter.null"`) |
- `canProceed(): Promise<boolean>` -- Check whether a new job can start now.
- `recordJobStart(): Promise<void>` -- Record that a job has begun executing.
- `recordJobCompletion(): Promise<void>` -- Record that a job has finished.
- `getNextAvailableTime(): Promise<Date>` -- Earliest time a new job could start.
- `setNextAvailableTime(date: Date): Promise<void>` -- Externally impose a delay.
- `clear(): Promise<void>` -- Reset the limiter to its initial state.

- `new NullLimiter()` -- No-op limiter; all methods are no-ops.
- `new ConcurrencyLimiter(maxConcurrentJobs: number)` -- Create a concurrency limiter.
- `new RateLimiter(storage: IRateLimiterStorage, queueName: string, options: RateLimiterWithBackoffOptions)` -- Create a sliding-window rate limiter with exponential backoff.
- `new DelayLimiter(delayInMilliseconds?: number)` -- Create a fixed-delay limiter (default: 50ms).
- `new EvenlySpacedRateLimiter(options: RateLimiterOptions)` -- Create an evenly-spaced rate limiter.
- `new CompositeLimiter(limiters?: ILimiter[])` -- Create a composite limiter from an array of limiters.
- `addLimiter(limiter: ILimiter): void` -- Add a limiter at runtime.
The ILimiter interface is intentionally flat. Every limiter -- whether it tracks concurrency, sliding windows, or fixed delays -- implements the same six methods. This makes composition trivial: a CompositeLimiter just iterates its children. There is no need for adapter classes, strategy patterns, or complex inheritance hierarchies. If you need a new limiting strategy, you implement ILimiter and slot it into a CompositeLimiter alongside existing limiters.
All ILimiter methods return Promise, even for in-memory limiters where the operation is synchronous. This is deliberate: the RateLimiter delegates to storage backends that may involve network I/O (PostgreSQL, Supabase), and the interface must accommodate the most general case. In-memory limiters simply resolve their promises immediately.
The ConcurrencyLimiter, DelayLimiter, and EvenlySpacedRateLimiter maintain in-memory state. They are accurate within a single process but do not coordinate across multiple processes. If you run multiple server processes that share a queue, use the RateLimiter with a shared storage backend (PostgreSQL or Supabase) to ensure global rate limiting.
For concurrency limiting across processes, the IQueueStorage.next(workerId) method itself provides implicit concurrency control through atomic claims. The ConcurrencyLimiter provides additional throttling within a single process to limit resource consumption (e.g., GPU memory, file descriptors).