feat(core,server): add agent rollouts and experimentation#15356
feat(core,server): add agent rollouts and experimentation#15356DanielSLew wants to merge 7 commits intomainfrom
Conversation
Adds canary rollouts and A/B experiments for agents. Traffic is split across agent versions using deterministic hash-based routing so users get a consistent version throughout a rollout. A background accumulator monitors scorer results and auto-rolls back if scores drop below configured thresholds. New rollout storage domain (mastra_rollouts) with implementations for LibSQL, Postgres, MongoDB, and in-memory. HTTP endpoints under /agents/:agentId/rollout for the full lifecycle: start, update weights, promote, rollback, cancel, and query experiment results. Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
🦋 Changeset detectedLatest commit: 56c52d7 The changes in this PR will be included in the next version bump. This PR includes changesets to release 24 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughAdds rollout/experiment features: deterministic routing and allocation selection, an in-memory rolling-score accumulator with periodic rule evaluation and auto-rollback, a rollouts storage domain with InMemory/LibSQL/PG/MongoDB adapters, server REST endpoints/schemas, editor/Mastra integration, and extensive unit/lifecycle tests. Changes
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/server/src/server/handlers/agents.ts (1)
1057-1081:⚠️ Potential issue | 🟠 MajorResolve the rollout after the routing key is populated into
requestContext.These handlers pick the agent version before they merge
body.requestContextor derive the effectiveresourceId. With the defaultroutingKey = 'resourceId', requests that only sendmemory.resource, legacyresourceId, orbody.requestContext.resourceIdwill resolve against an empty context and fall back tostableVersionId, so the rollout never applies on the common generate/stream paths.Move the request-context merge / effective-resource derivation ahead of
getAgentFromSystem(...), or build a temporary context for rollout resolution first.Also applies to: 1142-1159, 1203-1220, 1353-1376
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/server/src/server/handlers/agents.ts` around lines 1057 - 1081, The agent version is being resolved by getAgentFromSystem before bodyRequestContext (and derived resourceId/memory.resource) are merged into serverRequestContext, causing rollout/routingKey (e.g., routingKey='resourceId') to miss client-supplied values; fix by moving the block that merges bodyRequestContext into serverRequestContext (the for...of loop referencing serverRequestContext.set/get and bodyRequestContext) and any effective-resource derivation ahead of the call to getAgentFromSystem, or alternatively construct a temporary RequestContext that merges serverRequestContext with bodyRequestContext/memory.resource and pass that to getAgentFromSystem so rollout resolution uses client-provided resourceId/routingKey; apply same change where getAgentFromSystem is called at the other locations noted (around lines 1142-1159, 1203-1220, 1353-1376).
🧹 Nitpick comments (6)
packages/core/src/storage/types.ts (1)
2469-2481: TightenRolloutRecord.completedAtto a single nullable shape.
completedAtis currently optional, which broadens the contract toDate | null | undefined. Since persisted records are expected to carry lifecycle timestamps, make it consistently nullable (Date | null) to avoid unnecessary undefined-branching in consumers.♻️ Proposed type-contract tweak
export interface RolloutRecord { id: string; agentId: string; type: RolloutType; status: RolloutStatus; stableVersionId: string; allocations: RolloutAllocation[]; routingKey?: string; rules?: RolloutRule[]; createdAt: Date; updatedAt: Date; - completedAt?: Date | null; + completedAt: Date | null; }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/core/src/storage/types.ts` around lines 2469 - 2481, The RolloutRecord type defines completedAt as optional which yields Date | null | undefined; change the field to a non-optional nullable shape (completedAt: Date | null) so consumers only handle Date or null. Update the RolloutRecord interface definition (and any related types/usages that create or read RolloutRecord) to remove the optional marker for completedAt, ensure any constructors/factories/setters explicitly set null instead of leaving it undefined, and adjust serialization/deserialization logic if necessary to always emit null when no completion timestamp exists.stores/mongodb/src/storage/domains/rollouts/index.ts (3)
231-234: Consider usingfindOneAndUpdatefor atomicity.The current pattern does a separate
updateOnefollowed byfindOne, which is not atomic. Another process could modify the document between these calls. UsingfindOneAndUpdatewithreturnDocument: 'after'would return the updated document atomically.♻️ Proposed atomic update
- await collection.updateOne({ id: input.id }, { $set: updateData }); - - const updated = await collection.findOne({ id: input.id }); - return transformRolloutRow(updated as Record<string, unknown>); + const updated = await collection.findOneAndUpdate( + { id: input.id }, + { $set: updateData }, + { returnDocument: 'after' } + ); + return transformRolloutRow(updated as Record<string, unknown>);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stores/mongodb/src/storage/domains/rollouts/index.ts` around lines 231 - 234, Replace the non-atomic updateOne + findOne sequence with a single atomic call: use collection.findOneAndUpdate with the filter { id: input.id }, the update { $set: updateData }, and options to return the post-update document (e.g., returnDocument: 'after' or returnOriginal: false depending on driver version); then pass the returned document directly to transformRolloutRow so you no longer call collection.findOne after collection.updateOne.
254-257: Same error handling inconsistency incompleteRollout.Apply the same
MastraErrorpattern here for the not-found case to maintain consistency with the PostgreSQL implementation.♻️ Proposed fix
if (!existing) { - throw new Error(`Rollout not found: ${id}`); + throw new MastraError({ + id: createStorageErrorId('MONGODB', 'COMPLETE_ROLLOUT', 'NOT_FOUND'), + domain: ErrorDomain.STORAGE, + category: ErrorCategory.USER, + details: { rolloutId: id }, + }); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stores/mongodb/src/storage/domains/rollouts/index.ts` around lines 254 - 257, The completeRollout code throws a generic Error when a rollout isn’t found after calling collection.findOne({ id }); replace that with the MastraError pattern used in PostgreSQL to maintain consistency: throw a new MastraError with the same not-found message/metadata (e.g., "Rollout not found", include id) instead of Error. Update the error construction where existing is null in completeRollout so callers can rely on MastraError semantics.
220-225: Inconsistent error handling compared to PostgreSQL implementation.This uses plain
Errorfor not-found and invalid-status cases, while the PostgreSQL implementation usesMastraErrorwith proper error categories (ErrorCategory.USERfor user errors). This causes inconsistent error semantics across storage backends.♻️ Proposed fix for consistent error handling
+import { ErrorCategory, ErrorDomain, MastraError } from '@mastra/core/error'; +import { createStorageErrorId } from '@mastra/core/storage'; // In updateRollout: if (!existing) { - throw new Error(`Rollout not found: ${input.id}`); + throw new MastraError({ + id: createStorageErrorId('MONGODB', 'UPDATE_ROLLOUT', 'NOT_FOUND'), + domain: ErrorDomain.STORAGE, + category: ErrorCategory.USER, + details: { rolloutId: input.id }, + }); } if (existing.status !== 'active') { - throw new Error(`Cannot update rollout with status: ${existing.status}`); + throw new MastraError({ + id: createStorageErrorId('MONGODB', 'UPDATE_ROLLOUT', 'INVALID_STATUS'), + domain: ErrorDomain.STORAGE, + category: ErrorCategory.USER, + details: { rolloutId: input.id, status: existing.status }, + }); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stores/mongodb/src/storage/domains/rollouts/index.ts` around lines 220 - 225, The code throws plain Error for missing or invalid-status rollouts; change these to throw MastraError using ErrorCategory.USER so errors match the PostgreSQL backend: replace throw new Error(`Rollout not found: ${input.id}`) with throw new MastraError(ErrorCategory.USER, `Rollout not found: ${input.id}`) and replace throw new Error(`Cannot update rollout with status: ${existing.status}`) with throw new MastraError(ErrorCategory.USER, `Cannot update rollout with status: ${existing.status}`), and add the necessary imports for MastraError and ErrorCategory at the top of the file.packages/server/src/server/handlers/agent-rollouts.ts (1)
477-495: Large pagination limit may cause performance issues.Fetching up to 10,000 scores per allocation could cause memory pressure and slow responses for high-volume agents. Consider either:
- Streaming/cursor-based aggregation
- Pre-aggregated score statistics stored during score writes
- A lower limit with time-windowed queries
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/server/src/server/handlers/agent-rollouts.ts` around lines 477 - 495, The current code in agent-rollouts uses scoresStore.listScoresByEntityId with pagination { perPage: 10000 } to load allScores into memory (used in the byScorer grouping for alloc.versionId), which risks memory/latency problems; change the implementation to page through results or use a cursor/streaming API instead of a single huge page: call listScoresByEntityId with a reasonable perPage (e.g., 100-1000) and loop over pages until empty, performing the same versionId filter and updating the byScorer map incrementally (or switch to a server-side/worker-produced pre-aggregated stats table written at score write time and query that instead of allScores); update references to listScoresByEntityId, allScores, byScorer and the alloc.versionId comparison so aggregation happens incrementally rather than loading 10k records at once.stores/pg/src/storage/domains/rollouts/index.ts (1)
158-173: Timestamp columns inconsistency with update/complete methods.
createRolloutonly setscreatedAtandupdatedAt, butupdateRollout(lines 222-223) andcompleteRollout(lines 271-272) also set the*Zcolumns (updatedAtZ,completedAtZ). This meanstransformRowmay return stale dates when it falls back to the*Zcolumns for rows created by this method and later updated.For consistency, either:
- Set
*Zcolumns here as well, or- Remove the
*Zcolumn updates fromupdateRolloutandcompleteRolloutif they're not needed.♻️ Proposed fix to add `*Z` columns
await this.#db.insert({ tableName: TABLE_ROLLOUTS, record: { id, agentId: input.agentId, type: input.type, status: 'active', stableVersionId: input.stableVersionId, allocations: JSON.stringify(input.allocations), routingKey: input.routingKey ?? null, rules: input.rules ? JSON.stringify(input.rules) : null, createdAt: nowIso, + createdAtZ: nowIso, updatedAt: nowIso, + updatedAtZ: nowIso, completedAt: null, + completedAtZ: null, }, });🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stores/pg/src/storage/domains/rollouts/index.ts` around lines 158 - 173, createRollout inserts a new row into TABLE_ROLLOUTS but only sets createdAt/updatedAt, causing transformRow to read stale *_Z fallback columns used by updateRollout and completeRollout; update createRollout (the insert block in the method) to also populate the timezone/UTC columns (e.g., createdAtZ and updatedAtZ) with the same ISO timestamp and set completedAtZ to null so rows created by createRollout are consistent with updateRollout and completeRollout and transformRow will return correct dates.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.changeset/lazy-deer-stare.md:
- Around line 9-17: The changeset currently lists internal implementation
details (createRollout, getActiveRollout, getRollout, updateRollout,
completeRollout, listRollouts and indexing notes); replace that with a short,
user-facing one- or two-sentence summary that highlights the outcome (e.g.,
support for full rollout lifecycle and faster lookups) and remove the
method-by-method and index details so the entry is concise and outcome-focused.
In @.changeset/lovely-glasses-bet.md:
- Around line 1-66: Condense this changeset to a 1–2 sentence summary and remove
all API tables and code examples: briefly state that HTTP endpoints for managing
agent rollouts (canary deployments and A/B tests) were added and that
generate/stream endpoints (GENERATE_AGENT_ROUTE, STREAM_GENERATE_ROUTE,
GENERATE_LEGACY_ROUTE, STREAM_GENERATE_LEGACY_ROUTE, NETWORK_ROUTE) now resolve
agent versions from active rollouts when agentVersionId is omitted; if other
packages (`@mastra/core`, `@mastra/editor`, `@mastra/libsql`, `@mastra/pg`,
`@mastra/mongodb`) have separate changes, create distinct, minimal changeset files
for each.
In @.changeset/sharp-friends-appear.md:
- Around line 5-83: The changeset .changeset/sharp-friends-appear.md is too
detailed for a release note; replace the long schema/type/function/design
content (search for ROLLOUTS_SCHEMA,
RolloutAllocation/RolloutRule/RolloutRecord, RolloutAccumulator,
resolveVersionFromRollout, evaluateRules) with a one- or two-sentence,
outcome-focused summary like "Added agent rollout and experimentation support:
canary rollouts, A/B experiments, and sticky routing for deterministic version
assignment" and move all implementation details (schema, types, accumulator,
helper signatures) into the repo docs or design doc instead.
In `@packages/core/src/agent/__tests__/rollout.test.ts`:
- Around line 70-75: The test asserting different buckets for the same user with
different agentIds is non-deterministic; update the test in rollout.test.ts to
be deterministic by either asserting a fixed expected bucket for a known input
pair using deterministicBucket('user-1', 'agent_a') (compare to the known
numeric bucket you compute once) or by verifying divergence across many samples
(e.g., generate buckets for a large set of distinct agentIds with
deterministicBucket and assert more than one unique bucket exists). Target the
deterministicBucket function and replace the flaky expect(a).not.toBe(b) with
one of these deterministic checks so the test never randomly fails.
In `@packages/core/src/agent/rollout.ts`:
- Around line 50-60: The pickAllocation function can crash when allocations is
empty; add a defensive check at the top of pickAllocation to handle an empty
allocations array (the allocations parameter of type RolloutAllocation[]), and
if it's empty either throw a clear Error (e.g., "pickAllocation: allocations
must contain at least one entry") or return a safe sentinel (e.g., undefined or
empty string) consistently used by callers; ensure the change is applied in the
pickAllocation function so the fallback access allocations[allocations.length -
1]!.versionId is never reached when allocations.length === 0.
In `@packages/core/src/mastra/hooks.ts`:
- Around line 90-97: The rollout accumulator push is skipped if
validateAndSaveScore(storage, payload) throws, so move or protect the
mastra.getRolloutAccumulator()?.push call to ensure it's executed regardless of
legacy score-store failures: capture resolvedVersionId (from
hookData.entity.resolvedVersionId) and runResult.score, then either (a) call
mastra.getRolloutAccumulator()?.push(entityId, resolvedVersionId, scorerId,
runResult.score) before invoking validateAndSaveScore, or (b) wrap
validateAndSaveScore in try/catch/finally and invoke the push in the finally (or
in the catch after logging) so rollout accumulation always runs even if
validateAndSaveScore fails.
In `@packages/core/src/storage/domains/rollouts/inmemory.ts`:
- Around line 25-37: The in-memory store is leaking mutable rollout state
because getActiveRollout, getRollout, listRollouts (and createRollout) only
shallow-copy RolloutRecord, leaving nested arrays/objects like allocations and
rules shared with the backing Map; fix by returning and storing fully
deep-cloned RolloutRecord instances (clone nested allocations/rules) so callers
cannot mutate internal state and behavior matches SQL stores—update
getActiveRollout, getRollout, listRollouts, and createRollout to use a
deep-clone mechanism (e.g., structured cloning or JSON round-trip) for both
reads and writes before returning/storing.
In `@packages/server/src/server/handlers/agent-rollouts.ts`:
- Around line 551-554: The handler is passing a 1-indexed API page directly to
storage (listRollouts) which expects 0-indexed pagination; update the
agent-rollouts handler where you call rolloutsStore.listRollouts to convert the
validated page from listRolloutsQuerySchema into 0-indexed before sending to
storage (e.g., compute storagePage = (page ?? 1) - 1 or if page is present use
page - 1, default 0) and pass pagination: { page: storagePage, perPage },
leaving the schema (listRolloutsQuerySchema) unchanged so API remains 1-indexed.
In `@packages/server/src/server/server-adapter/routes/index.ts`:
- Line 146: The ServerRoutes tuple is missing the rollout routes, so add the
rollout entries to the tuple type by including ...typeof AGENT_ROLLOUT_ROUTES
inside the ServerRoutes tuple definition (where ServerRoutes is declared) so
route-level type inference picks up those endpoints; update the tuple to include
the spread of AGENT_ROLLOUT_ROUTES (and adjust any surrounding tuple order if
needed) to restore complete typing for the rollout routes.
In `@stores/libsql/src/storage/domains/rollouts/index.ts`:
- Around line 34-41: The current non-unique index idx_rollouts_agentid_status
doesn't prevent concurrent inserts of multiple active rollouts; change the DB
layer to enforce uniqueness by creating a unique partial index/constraint on
TABLE_ROLLOUTS for (agentId) WHERE status='active' (add this in init() alongside
`#db.createTable` and remove/replace the existing idx_rollouts_agentid_status),
and update createRollout() to handle conflicts (use an INSERT that respects the
unique partial index—e.g., ON CONFLICT DO NOTHING/RETURN or surface a clear
error) and make getActiveRollout() deterministic (ensure it queries WHERE
status='active' and handles the single-row guarantee). Also apply the same
DB-level uniqueness and conflict-handling changes referenced around the other
affected blocks (the createRollout()/getActiveRollout() logic noted in the
review) so the single-active-rollout invariant is enforced at the database level
rather than by application convention.
---
Outside diff comments:
In `@packages/server/src/server/handlers/agents.ts`:
- Around line 1057-1081: The agent version is being resolved by
getAgentFromSystem before bodyRequestContext (and derived
resourceId/memory.resource) are merged into serverRequestContext, causing
rollout/routingKey (e.g., routingKey='resourceId') to miss client-supplied
values; fix by moving the block that merges bodyRequestContext into
serverRequestContext (the for...of loop referencing serverRequestContext.set/get
and bodyRequestContext) and any effective-resource derivation ahead of the call
to getAgentFromSystem, or alternatively construct a temporary RequestContext
that merges serverRequestContext with bodyRequestContext/memory.resource and
pass that to getAgentFromSystem so rollout resolution uses client-provided
resourceId/routingKey; apply same change where getAgentFromSystem is called at
the other locations noted (around lines 1142-1159, 1203-1220, 1353-1376).
---
Nitpick comments:
In `@packages/core/src/storage/types.ts`:
- Around line 2469-2481: The RolloutRecord type defines completedAt as optional
which yields Date | null | undefined; change the field to a non-optional
nullable shape (completedAt: Date | null) so consumers only handle Date or null.
Update the RolloutRecord interface definition (and any related types/usages that
create or read RolloutRecord) to remove the optional marker for completedAt,
ensure any constructors/factories/setters explicitly set null instead of leaving
it undefined, and adjust serialization/deserialization logic if necessary to
always emit null when no completion timestamp exists.
In `@packages/server/src/server/handlers/agent-rollouts.ts`:
- Around line 477-495: The current code in agent-rollouts uses
scoresStore.listScoresByEntityId with pagination { perPage: 10000 } to load
allScores into memory (used in the byScorer grouping for alloc.versionId), which
risks memory/latency problems; change the implementation to page through results
or use a cursor/streaming API instead of a single huge page: call
listScoresByEntityId with a reasonable perPage (e.g., 100-1000) and loop over
pages until empty, performing the same versionId filter and updating the
byScorer map incrementally (or switch to a server-side/worker-produced
pre-aggregated stats table written at score write time and query that instead of
allScores); update references to listScoresByEntityId, allScores, byScorer and
the alloc.versionId comparison so aggregation happens incrementally rather than
loading 10k records at once.
In `@stores/mongodb/src/storage/domains/rollouts/index.ts`:
- Around line 231-234: Replace the non-atomic updateOne + findOne sequence with
a single atomic call: use collection.findOneAndUpdate with the filter { id:
input.id }, the update { $set: updateData }, and options to return the
post-update document (e.g., returnDocument: 'after' or returnOriginal: false
depending on driver version); then pass the returned document directly to
transformRolloutRow so you no longer call collection.findOne after
collection.updateOne.
- Around line 254-257: The completeRollout code throws a generic Error when a
rollout isn’t found after calling collection.findOne({ id }); replace that with
the MastraError pattern used in PostgreSQL to maintain consistency: throw a new
MastraError with the same not-found message/metadata (e.g., "Rollout not found",
include id) instead of Error. Update the error construction where existing is
null in completeRollout so callers can rely on MastraError semantics.
- Around line 220-225: The code throws plain Error for missing or invalid-status
rollouts; change these to throw MastraError using ErrorCategory.USER so errors
match the PostgreSQL backend: replace throw new Error(`Rollout not found:
${input.id}`) with throw new MastraError(ErrorCategory.USER, `Rollout not found:
${input.id}`) and replace throw new Error(`Cannot update rollout with status:
${existing.status}`) with throw new MastraError(ErrorCategory.USER, `Cannot
update rollout with status: ${existing.status}`), and add the necessary imports
for MastraError and ErrorCategory at the top of the file.
In `@stores/pg/src/storage/domains/rollouts/index.ts`:
- Around line 158-173: createRollout inserts a new row into TABLE_ROLLOUTS but
only sets createdAt/updatedAt, causing transformRow to read stale *_Z fallback
columns used by updateRollout and completeRollout; update createRollout (the
insert block in the method) to also populate the timezone/UTC columns (e.g.,
createdAtZ and updatedAtZ) with the same ISO timestamp and set completedAtZ to
null so rows created by createRollout are consistent with updateRollout and
completeRollout and transformRow will return correct dates.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: f6d5b176-2124-446f-949a-f67b355e0b05
📒 Files selected for processing (33)
.changeset/lazy-deer-stare.md.changeset/lovely-glasses-bet.md.changeset/sharp-friends-appear.mdpackages/core/src/agent/__tests__/rollout-lifecycle.test.tspackages/core/src/agent/__tests__/rollout.test.tspackages/core/src/agent/agent.tspackages/core/src/agent/index.tspackages/core/src/agent/rollout.tspackages/core/src/editor/types.tspackages/core/src/mastra/hooks.tspackages/core/src/mastra/index.tspackages/core/src/storage/base.tspackages/core/src/storage/constants.tspackages/core/src/storage/domains/index.tspackages/core/src/storage/domains/inmemory-db.tspackages/core/src/storage/domains/operations/inmemory.tspackages/core/src/storage/domains/rollouts/base.tspackages/core/src/storage/domains/rollouts/index.tspackages/core/src/storage/domains/rollouts/inmemory.tspackages/core/src/storage/mock.tspackages/core/src/storage/types.tspackages/editor/src/namespaces/agent.tspackages/server/src/server/handlers/agent-rollouts.tspackages/server/src/server/handlers/agents.tspackages/server/src/server/schemas/agent-rollouts.tspackages/server/src/server/server-adapter/routes/agent-rollouts.tspackages/server/src/server/server-adapter/routes/index.tsstores/libsql/src/storage/domains/rollouts/index.tsstores/libsql/src/storage/index.tsstores/mongodb/src/storage/domains/rollouts/index.tsstores/mongodb/src/storage/index.tsstores/pg/src/storage/domains/rollouts/index.tsstores/pg/src/storage/index.ts
- Add TABLE_ROLLOUTS to Cloudflare RecordTypes and ClickHouse TABLE_ENGINES - Deep-clone rollout records in in-memory store to prevent mutable state leaks - Fix 1-indexed API page vs 0-indexed storage pagination mismatch - Merge bodyRequestContext before getAgentFromSystem in generate/stream routes so rollout routing sees client-supplied resourceId - Add unique partial index (WHERE status='active') in LibSQL, PG, and MongoDB to enforce single active rollout per agent - Replace plain Error with MastraError in MongoDB rollouts domain - Use findOneAndUpdate for atomic update+read in MongoDB rollouts - Page through scores in GET_ROLLOUT_RESULTS_ROUTE instead of loading 10K at once - Fix completedAt type from Date | null | undefined to Date | null - Set createdAtZ/updatedAtZ/completedAtZ in PG createRollout Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
All 8 rollout handlers now call requireEditor(mastra) upfront, returning a 400 with a clear message if the editor is not configured. Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
There was a problem hiding this comment.
Actionable comments posted: 4
♻️ Duplicate comments (1)
packages/core/src/storage/domains/rollouts/inmemory.ts (1)
13-18:⚠️ Potential issue | 🟠 MajorDefensive cloning is still incomplete here.
Most of the earlier shallow-copy leak is fixed, but this path still shares mutable objects:
cloneRollout()returns the sameDateinstances,updateRollout()stores caller-ownedallocations/rules, andcompleteRollout()stores a caller-ownedcompletedAt. Mutating any of those after the call still rewrites the backingMapoutside the domain API.Suggested fix
function cloneRollout(r: RolloutRecord): RolloutRecord { return { ...r, + createdAt: new Date(r.createdAt), + updatedAt: new Date(r.updatedAt), + completedAt: r.completedAt ? new Date(r.completedAt) : null, allocations: r.allocations.map(a => ({ ...a })), rules: r.rules?.map(rule => ({ ...rule })), }; } @@ const updated: RolloutRecord = { ...existing, - allocations: input.allocations ?? existing.allocations, - rules: input.rules ?? existing.rules, + allocations: input.allocations?.map(a => ({ ...a })) ?? existing.allocations, + rules: input.rules?.map(rule => ({ ...rule })) ?? existing.rules, updatedAt: new Date(), }; @@ - const now = completedAt ?? new Date(); + const now = completedAt ? new Date(completedAt) : new Date();Also applies to: 66-77, 89-95
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/core/src/storage/domains/rollouts/inmemory.ts` around lines 13 - 18, cloneRollout, updateRollout, and completeRollout are still storing caller-owned mutable objects (Date instances, allocation/rule objects/arrays, completedAt) causing external mutations to affect the in-memory Map; fix by defensively deep-cloning all nested mutable values before returning or storing: in cloneRollout create fresh Date copies for any Date fields and deep-clone allocations and rules (new objects for each nested property), and in updateRollout and completeRollout ensure you store copies of the incoming allocations, rules, and completedAt (not the original references) so the internal Map never holds caller-owned mutable objects.
🧹 Nitpick comments (5)
packages/server/src/server/handlers/agent-rollouts.ts (3)
61-86: Fire-and-forget binding has potential race condition.The
ensureAccumulatorfunction usesvoid storage.getStore('rollouts').then(...)which executes asynchronously without awaiting. If a request arrives before binding completes, the accumulator won't have storage bound yet. Whileaccumulator.boundcheck prevents re-binding, scores could be lost during the binding window.Consider awaiting the binding or adding a bound-check before score operations.
♻️ Alternative: Make ensureAccumulator async and await it
-function ensureAccumulator(mastra: Mastra): void { +async function ensureAccumulator(mastra: Mastra): Promise<void> { const accumulator = mastra.getRolloutAccumulator(); if (!accumulator || accumulator.bound) return; const storage = mastra.getStorage(); if (!storage) return; - void storage.getStore('rollouts').then((rolloutsStore: RolloutsStorage | undefined) => { + const rolloutsStore = await storage.getStore('rollouts'); + if (!rolloutsStore) return; + + accumulator.bind(rolloutsStore, async (agentId: string, rolloutId: string) => { // ... rollback handler + }); + + accumulator.start(); - }); }Then update callers:
// Ensure the accumulator is running for rule evaluation - ensureAccumulator(mastra); + await ensureAccumulator(mastra);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/server/src/server/handlers/agent-rollouts.ts` around lines 61 - 86, The current ensureAccumulator uses fire-and-forget storage.getStore('rollouts') which can leave the accumulator unbound when requests occur; update ensureAccumulator to be async and await the store binding (await mastra.getStorage()?.getStore('rollouts')) before calling accumulator.bind(...) and accumulator.start(), or alternatively ensure callers await ensureAccumulator so binding completes before any score operations; also keep the accumulator.bound guard (from getRolloutAccumulator()/accumulator.bound) to avoid double-binding and ensure any code that updates scores checks accumulator.bound before acting if you choose not to make callers await.
477-504: Consider adding a maximum iteration limit to prevent unbounded memory growth.The pagination loop fetches all scores for an agent, which could be unbounded for long-running agents. While pagination limits per-request memory, the accumulated
scoresByVersionmap could grow very large.♻️ Optional: Add safety limit
let page = 0; const pageSize = 500; let hasMore = true; + const maxPages = 100; // Limit to 50k scores max - while (hasMore) { + while (hasMore && page < maxPages) { const result = await scoresStore.listScoresByEntityId({ // ... }); // ... hasMore = result.pagination.hasMore; page++; } + + if (page >= maxPages) { + mastra.getLogger()?.warn('Rollout results truncated due to score volume', { agentId, pagesProcessed: page }); + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/server/src/server/handlers/agent-rollouts.ts` around lines 477 - 504, The pagination loop that uses scoresStore.listScoresByEntityId can accumulate an unbounded scoresByVersion map; add a safety cap (e.g., MAX_PAGES or MAX_TOTAL_ENTRIES) and enforce it inside the while loop (track pagesFetched and/or totalScoresCollected), break out and log a warning/error when the cap is reached to avoid unbounded memory growth; update references around page, pageSize, hasMore, and scoresByVersion so you stop fetching further pages and return partial results or an error when the limit is hit.
341-348: A/B test promotion defaults to stable version when none specified.When
versionIdis not provided for an A/B test promotion, the code defaults torollout.stableVersionId(line 343). This could be confusing since A/B tests typically require an explicit winner selection.Consider requiring
versionIdfor A/B test promotions or documenting this fallback behavior clearly.♻️ Optional: Require explicit winner for A/B tests
} else { // A/B test: caller specifies which version wins - promoteVersionId = versionId ?? rollout.stableVersionId; + if (!versionId) { + throw new HTTPException(400, { message: 'A/B test promotion requires specifying the winning versionId' }); + } + promoteVersionId = versionId; // Validate the version is part of the rollout🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/server/src/server/handlers/agent-rollouts.ts` around lines 341 - 348, The current A/B promotion branch silently falls back to rollout.stableVersionId when versionId is omitted; change this to require an explicit versionId for A/B test promotions by throwing a 400 if versionId is null/undefined, rather than assigning rollout.stableVersionId. Locate the block using promoteVersionId, versionId, and rollout.stableVersionId (in the agent-rollouts handler) and replace the fallback logic with a validation that ensures versionId is provided and is present in rollout.allocations before setting promoteVersionId.stores/mongodb/src/storage/domains/rollouts/index.ts (2)
221-259: TOCTOU race in updateRollout between status check and update.The status validation (
existing.status !== 'active') at line 233 happens in a separate query from the update at line 246. A concurrent request could complete/rollback the rollout between these operations, causing the update to succeed on a non-active rollout.Consider combining the check into the atomic update filter:
♻️ Atomic status check in update
const updateData: Record<string, unknown> = { updatedAt: new Date() }; if (input.allocations) updateData.allocations = input.allocations; if (input.rules) updateData.rules = input.rules; const updated = await collection.findOneAndUpdate( - { id: input.id }, + { id: input.id, status: 'active' }, { $set: updateData }, { returnDocument: 'after' }, ); if (!updated) { throw new MastraError({ - id: createStorageErrorId('ROLLOUTS', 'UPDATE', 'NOT_FOUND'), + id: createStorageErrorId('ROLLOUTS', 'UPDATE', 'NOT_FOUND_OR_NOT_ACTIVE'), domain: ErrorDomain.STORAGE, category: ErrorCategory.USER, details: { rolloutId: input.id }, }); }This removes the TOCTOU window and the separate findOne can still be kept for more specific error messages if desired (not found vs. not active).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stores/mongodb/src/storage/domains/rollouts/index.ts` around lines 221 - 259, The updateRollout method has a TOCTOU race: it reads existing via collection.findOne then updates via collection.findOneAndUpdate, so status could change between calls; fix by making the status check part of the atomic update filter in collection.findOneAndUpdate (e.g., include { id: input.id, status: 'active' } in the query) and handle the returned result: if null => decide between not found vs not active (you can still optionally do a preliminary findOne for more specific MastraError using createStorageErrorId), ensure updateData is used with $set and then pass the result to transformRolloutRow; update references: updateRollout, TABLE_ROLLOUTS, collection.findOneAndUpdate, existing, createStorageErrorId, MastraError, transformRolloutRow.
30-34: Consider stricter date handling to avoid silent failures.The
toDatefunction returnsnew Date()as a fallback for invalid input, which could mask data corruption or bugs by silently substituting the current time. Consider logging a warning or throwing when encountering unexpected input types.♻️ Optional: Add warning for unexpected date input
function toDate(value: unknown): Date { if (value instanceof Date) return value; if (typeof value === 'string' || typeof value === 'number') return new Date(value); + // Log warning for unexpected input type that defaults to now + console.warn('toDate received unexpected value type, defaulting to current time:', typeof value); return new Date(); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stores/mongodb/src/storage/domains/rollouts/index.ts` around lines 30 - 34, The toDate function currently masks invalid inputs by returning new Date() which can hide bugs; update the toDate(value: unknown): Date implementation to validate inputs and fail loudly: if value is a Date return it, if it's a parsable string/number convert and verify resulting Date is valid (isNaN(date.getTime()) check), and for any other type or invalid parse either throw a descriptive Error (e.g., "Invalid date input in toDate") or at minimum log a warning via the module logger before throwing; ensure callers of toDate (same module) handle or propagate the thrown error appropriately.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/core/src/storage/domains/rollouts/inmemory.ts`:
- Around line 84-89: completeRollout currently overwrites rollouts even when
they're already in a terminal state; update completeRollout(id, status,
completedAt?) so it only transitions when the existing record is active: fetch
existing via this.db.rollouts.get(id), if existing.status is not
RolloutStatus.Active then return the existing RolloutRecord unchanged (do not
modify completedAt or status), otherwise proceed to set the terminal status and
completedAt and persist; reference the completeRollout method, RolloutStatus
enum (e.g. cancelled/rolled_back), and this.db.rollouts to locate and implement
the guard to make terminal transitions idempotent and history immutable.
- Around line 47-63: createRollout currently allows overwriting an existing
rollout by id and permits multiple active rollouts for the same agentId; update
createRollout to enforce the same invariants as the persistent store by (1)
checking this.db.rollouts for an existing record with the same id and throwing
an error (or rejecting) instead of overwriting, and (2) scanning
this.db.rollouts values for any record with the same agentId and status ===
'active' and throwing an error if one exists so only one active rollout per
agentId is allowed; perform these checks before calling this.db.rollouts.set and
still return cloneRollout(rollout).
In `@stores/libsql/src/storage/domains/rollouts/index.ts`:
- Around line 126-149: The updateRollout implementation currently checks status
then calls this.#db.update unconditionally which allows races; modify the UPDATE
call in updateRollout (and likewise in completeRollout) to include the status
predicate (e.g., WHERE id = ? AND status = 'active') by passing a
conditional/update filter to this.#db.update for TABLE_ROLLOUTS (use the id and
status constraint together), then inspect the returned affected-rows count and
throw a "not found / no longer active" error when 0 rows were updated so stale
readers cannot overwrite terminal state.
- Around line 35-45: The rollout storage uses the completedAt column before it's
defined in the shared schema: update the shared ROLLOUTS_SCHEMA (the schema
constant referenced by init() / createTable for TABLE_ROLLOUTS) to include a
completedAt column with the appropriate type, then rebuild/regen any derived
select/insert/update helpers (e.g., buildSelectColumns or column lists used in
insert/update/select in this module) so operations that reference completedAt
(in insert, update, and reads around idx_rollouts_one_active_per_agent and other
queries) will succeed; ensure the completedAt field name exactly matches the
references in this file.
---
Duplicate comments:
In `@packages/core/src/storage/domains/rollouts/inmemory.ts`:
- Around line 13-18: cloneRollout, updateRollout, and completeRollout are still
storing caller-owned mutable objects (Date instances, allocation/rule
objects/arrays, completedAt) causing external mutations to affect the in-memory
Map; fix by defensively deep-cloning all nested mutable values before returning
or storing: in cloneRollout create fresh Date copies for any Date fields and
deep-clone allocations and rules (new objects for each nested property), and in
updateRollout and completeRollout ensure you store copies of the incoming
allocations, rules, and completedAt (not the original references) so the
internal Map never holds caller-owned mutable objects.
---
Nitpick comments:
In `@packages/server/src/server/handlers/agent-rollouts.ts`:
- Around line 61-86: The current ensureAccumulator uses fire-and-forget
storage.getStore('rollouts') which can leave the accumulator unbound when
requests occur; update ensureAccumulator to be async and await the store binding
(await mastra.getStorage()?.getStore('rollouts')) before calling
accumulator.bind(...) and accumulator.start(), or alternatively ensure callers
await ensureAccumulator so binding completes before any score operations; also
keep the accumulator.bound guard (from
getRolloutAccumulator()/accumulator.bound) to avoid double-binding and ensure
any code that updates scores checks accumulator.bound before acting if you
choose not to make callers await.
- Around line 477-504: The pagination loop that uses
scoresStore.listScoresByEntityId can accumulate an unbounded scoresByVersion
map; add a safety cap (e.g., MAX_PAGES or MAX_TOTAL_ENTRIES) and enforce it
inside the while loop (track pagesFetched and/or totalScoresCollected), break
out and log a warning/error when the cap is reached to avoid unbounded memory
growth; update references around page, pageSize, hasMore, and scoresByVersion so
you stop fetching further pages and return partial results or an error when the
limit is hit.
- Around line 341-348: The current A/B promotion branch silently falls back to
rollout.stableVersionId when versionId is omitted; change this to require an
explicit versionId for A/B test promotions by throwing a 400 if versionId is
null/undefined, rather than assigning rollout.stableVersionId. Locate the block
using promoteVersionId, versionId, and rollout.stableVersionId (in the
agent-rollouts handler) and replace the fallback logic with a validation that
ensures versionId is provided and is present in rollout.allocations before
setting promoteVersionId.
In `@stores/mongodb/src/storage/domains/rollouts/index.ts`:
- Around line 221-259: The updateRollout method has a TOCTOU race: it reads
existing via collection.findOne then updates via collection.findOneAndUpdate, so
status could change between calls; fix by making the status check part of the
atomic update filter in collection.findOneAndUpdate (e.g., include { id:
input.id, status: 'active' } in the query) and handle the returned result: if
null => decide between not found vs not active (you can still optionally do a
preliminary findOne for more specific MastraError using createStorageErrorId),
ensure updateData is used with $set and then pass the result to
transformRolloutRow; update references: updateRollout, TABLE_ROLLOUTS,
collection.findOneAndUpdate, existing, createStorageErrorId, MastraError,
transformRolloutRow.
- Around line 30-34: The toDate function currently masks invalid inputs by
returning new Date() which can hide bugs; update the toDate(value: unknown):
Date implementation to validate inputs and fail loudly: if value is a Date
return it, if it's a parsable string/number convert and verify resulting Date is
valid (isNaN(date.getTime()) check), and for any other type or invalid parse
either throw a descriptive Error (e.g., "Invalid date input in toDate") or at
minimum log a warning via the module logger before throwing; ensure callers of
toDate (same module) handle or propagate the thrown error appropriately.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 7a98c479-d83d-40cb-92a7-1d345e0a3a37
📒 Files selected for processing (9)
packages/core/src/storage/domains/rollouts/inmemory.tspackages/core/src/storage/types.tspackages/server/src/server/handlers/agent-rollouts.tspackages/server/src/server/handlers/agents.tsstores/clickhouse/src/storage/db/utils.tsstores/cloudflare/src/kv/storage/types.tsstores/libsql/src/storage/domains/rollouts/index.tsstores/mongodb/src/storage/domains/rollouts/index.tsstores/pg/src/storage/domains/rollouts/index.ts
✅ Files skipped from review due to trivial changes (1)
- packages/core/src/storage/types.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- stores/pg/src/storage/domains/rollouts/index.ts
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (2)
packages/server/src/server/handlers/agent-rollouts.ts (2)
77-94: Add.catch()to handle promise rejection.If
storage.getStore('rollouts')rejects, the promise chain has no error handler, resulting in an unhandled promise rejection. The innertry/catch(lines 82-90) only handles errors within the.then()callback.♻️ Proposed fix
void storage.getStore('rollouts').then((rolloutsStore: RolloutsStorage | undefined) => { if (!rolloutsStore) return; accumulator.bind(rolloutsStore, async (agentId: string, rolloutId: string) => { // Rollback handler: mark rollout as rolled_back try { await rolloutsStore.completeRollout(rolloutId, 'rolled_back', new Date()); accumulator.clearAgent(agentId); mastra.getLogger()?.info('Rollout auto-rolled back', { agentId, rolloutId }); // Clear editor cache so subsequent requests use the stable version mastra.getEditor()?.agent.clearCache(agentId); } catch (err) { mastra.getLogger()?.error('Failed to auto-rollback rollout', { agentId, rolloutId, error: err }); } }); accumulator.start(); - }); + }).catch(err => { + mastra.getLogger()?.error('Failed to initialize rollout accumulator', { error: err }); + });🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/server/src/server/handlers/agent-rollouts.ts` around lines 77 - 94, storage.getStore('rollouts') is called without a rejection handler so a rejected promise will cause an unhandled rejection; add a .catch() after the .then(...) to handle errors from storage.getStore('rollouts'), log the error using mastra.getLogger()?.error with context (e.g., mentioning rollouts store load failure), and ensure no further work (like accumulator.start()) runs on failure; update the chain where storage.getStore('rollouts') is invoked (the promise that leads into accumulator.bind and accumulator.start) to include this catch.
354-361: Consider requiring explicitversionIdfor A/B test promotion.Defaulting to
stableVersionIdwhen no winner is specified could lead to unintentional outcomes. For A/B tests, users should typically make a deliberate choice about which version won. Consider throwing a 400 error ifversionIdis not provided for A/B tests.💡 Proposed change
} else { // A/B test: caller specifies which version wins - promoteVersionId = versionId ?? rollout.stableVersionId; + if (!versionId) { + throw new HTTPException(400, { message: 'A/B test promotion requires specifying the winning versionId' }); + } + promoteVersionId = versionId; // Validate the version is part of the rollout if (!rollout.allocations.some(a => a.versionId === promoteVersionId)) { throw new HTTPException(400, { message: `Version ${promoteVersionId} is not part of this rollout` }); } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/server/src/server/handlers/agent-rollouts.ts` around lines 354 - 361, Change the A/B promotion logic so a winner must be explicitly provided instead of defaulting to rollout.stableVersionId: when handling the branch that currently sets promoteVersionId = versionId ?? rollout.stableVersionId, require that versionId is present for A/B test rollouts (check rollout.type for the A/B value used in this codebase) and throw an HTTPException(400) if it is missing; keep the subsequent validation that promoteVersionId exists in rollout.allocations (referencing promoteVersionId, rollout.stableVersionId, and rollout.allocations) unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/server/src/server/handlers/agent-rollouts.ts`:
- Around line 448-454: The CANCEL_ROLLOUT_ROUTE completes the rollout and clears
the accumulator but misses clearing the editor cache like ROLLBACK_ROLLOUT_ROUTE
does; add the same editor-cache clearing call used in ROLLBACK_ROLLOUT_ROUTE
(the mastra editor cache clear invocation) into CANCEL_ROLLOUT_ROUTE immediately
after rolloutsStore.completeRollout(...) and before returning so the editor
cache is consistent—use the same identifiers (rollout.id and agentId) as in the
rollback handler.
---
Nitpick comments:
In `@packages/server/src/server/handlers/agent-rollouts.ts`:
- Around line 77-94: storage.getStore('rollouts') is called without a rejection
handler so a rejected promise will cause an unhandled rejection; add a .catch()
after the .then(...) to handle errors from storage.getStore('rollouts'), log the
error using mastra.getLogger()?.error with context (e.g., mentioning rollouts
store load failure), and ensure no further work (like accumulator.start()) runs
on failure; update the chain where storage.getStore('rollouts') is invoked (the
promise that leads into accumulator.bind and accumulator.start) to include this
catch.
- Around line 354-361: Change the A/B promotion logic so a winner must be
explicitly provided instead of defaulting to rollout.stableVersionId: when
handling the branch that currently sets promoteVersionId = versionId ??
rollout.stableVersionId, require that versionId is present for A/B test rollouts
(check rollout.type for the A/B value used in this codebase) and throw an
HTTPException(400) if it is missing; keep the subsequent validation that
promoteVersionId exists in rollout.allocations (referencing promoteVersionId,
rollout.stableVersionId, and rollout.allocations) unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 5dc891a1-c422-437c-a27a-0bac6959cb1c
📒 Files selected for processing (1)
packages/server/src/server/handlers/agent-rollouts.ts
- Add active-rollout invariant to in-memory createRollout - Guard completeRollout against non-active rollouts (in-memory) - Add WHERE status='active' to LibSQL/PG/MongoDB updateRollout/completeRollout - Decouple rollout accumulation from legacy score-store persistence - Add defensive check for empty allocations in pickAllocation - Make hash test deterministic - Trim changesets to outcome-focused summaries Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/core/src/storage/domains/rollouts/inmemory.ts`:
- Around line 84-91: updateRollout is storing input.allocations and input.rules
directly into the in-memory Map which allows callers to mutate stored state;
instead, clone the arrays/objects before assigning to the RolloutRecord (similar
to createRollout's protection) so the backing this.db.rollouts holds copies,
e.g. create a new RolloutRecord in updateRollout that spreads existing and uses
cloned allocations and rules (or call cloneRollout on the assembled record)
before calling this.db.rollouts.set(input.id, ...).
In `@stores/libsql/src/storage/domains/rollouts/index.ts`:
- Around line 166-192: The listRollouts implementation always applies "LIMIT ?
OFFSET ?" and computes hasMore from perPage even when the caller passed perPage:
false (meaning "return all"), so change listRollouts to detect the original
perPageInput (or perPage === false) and in that case run the SELECT without
LIMIT/OFFSET and without adding those args, set pagination.perPage to false (or
perPageForResponse === false) and set hasMore to false; otherwise keep the
current behavior (build SQL with ORDER BY ... LIMIT ? OFFSET ? and pass args
[agentId, perPage, offset]) — update the SQL/args construction and the returned
pagination.hasMore logic in listRollouts to branch on perPageInput/perPage to
preserve the in-memory semantics.
In `@stores/mongodb/src/storage/domains/rollouts/index.ts`:
- Around line 104-113: The code currently swallows errors in
createDefaultIndexes() when collection.createIndex(...) fails, which can leave
out the required unique partial index used by createRollout() and
getActiveRollout(); change the behavior so failures for required indexes fail
initialization instead of only warning. In practice, in createDefaultIndexes()
(and while iterating getDefaultIndexDefinitions()) detect index creation
failures and rethrow the error (or throw a new Error) after logging so startup
aborts; ensure you still respect `#skipDefaultIndexes` but do not suppress errors
from createIndex for the unique partial index definition returned by
getDefaultIndexDefinitions(), so callers of createDefaultIndexes() cannot
proceed if the index wasn't created. Reference: createDefaultIndexes,
getDefaultIndexDefinitions, collection.createIndex, createRollout,
getActiveRollout, and this.logger.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: eb027a78-ce41-4fb4-a3df-fda3407537e6
📒 Files selected for processing (12)
.changeset/lazy-deer-stare.md.changeset/lovely-glasses-bet.md.changeset/sharp-friends-appear.mdpackages/core/src/agent/__tests__/rollout-lifecycle.test.tspackages/core/src/agent/__tests__/rollout.test.tspackages/core/src/agent/rollout.tspackages/core/src/mastra/hooks.tspackages/core/src/storage/domains/rollouts/inmemory.tspackages/server/src/server/handlers/agent-rollouts.tsstores/libsql/src/storage/domains/rollouts/index.tsstores/mongodb/src/storage/domains/rollouts/index.tsstores/pg/src/storage/domains/rollouts/index.ts
✅ Files skipped from review due to trivial changes (4)
- .changeset/lovely-glasses-bet.md
- .changeset/sharp-friends-appear.md
- .changeset/lazy-deer-stare.md
- packages/core/src/agent/tests/rollout-lifecycle.test.ts
🚧 Files skipped from review as they are similar to previous changes (4)
- packages/core/src/agent/tests/rollout.test.ts
- stores/pg/src/storage/domains/rollouts/index.ts
- packages/server/src/server/handlers/agent-rollouts.ts
- packages/core/src/agent/rollout.ts
…ollout on version activate - Add ensureAgentExists helper to all 8 rollout API handlers so they return 404 for non-existent agents (matching other agent routes) - Auto-cancel any active rollout when a version is explicitly activated via the publish/activate endpoint — publishing is a deliberate override - Add getRolloutAccumulator to mock editor in server-adapter test helpers - Pre-populate an active rollout fixture for test-agent in test context - Exclude start-rollout and promote-rollout routes from auto-generated adapter tests (require complex setup: discriminated union body, agent in store) Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/server/src/server/handlers/agent-rollouts.ts`:
- Around line 535-550: The loop currently accumulates all historical scores for
agentId (via scoresStore.listScoresByEntityId) and groups them into
scoresByVersion without regard to when they were produced; restrict accumulation
to only scores created during the current rollout window by checking each
score's timestamp (e.g., score.createdAt or score.timestamp) against the
rollout's start/end times before grouping. Update the inner loop inside the
while(hasMore) to parse the score timestamp and continue (skip) if it's before
rollout.startTime or after rollout.endTime (or if endTime is not set, only allow
>= startTime); then only add the score to scoresByVersion when it falls inside
the window. Ensure you reference the existing symbols:
scoresStore.listScoresByEntityId, score.entity.resolvedVersionId,
scoresByVersion, and the rollout window variables you have in scope (e.g.,
rollout.startTime / rollout.endTime).
- Around line 240-251: The code allows candidateVersionId to equal
stableVersionId which creates duplicate allocations; add a validation in the
agent-rollouts handler (before building allocations/persisting) that rejects
when body.candidateVersionId === stableVersionId (e.g., throw an HTTPException
400 with a clear message) after fetching candidateVersion via
agentsStore.getVersion and before constructing the allocations array so
UPDATE_ROLLOUT_ROUTE and PROMOTE_ROLLOUT_ROUTE retain a distinct candidate.
- Around line 207-228: The current logic allows rollouts for code-only agents by
using mastra.getAgentById().toRawConfig().resolvedVersionId as stableVersionId
but later calls agentsStore.update(...) during promotion which will fail if no
DB row exists; change the flow so that if agentsStore.getById(agentId) returns
null you either (A) reject the request with an HTTPException (400) requiring a
persisted agent, or (B) create/persist a minimal agent record before proceeding
so subsequent agentsStore.update(...) succeeds; implement the chosen approach by
checking the result of agentsStore.getById, and if absent use
agentsStore.create(...) to insert a minimal agent row (including id and
resolvedVersionId) or throw the error, and then continue using stableVersionId
from toRawConfig().
In `@packages/server/src/server/handlers/agent-versions.ts`:
- Around line 290-305: The activation currently swallows all errors during
rollout cancellation so failures in rolloutsStore.completeRollout() still let
activation set activeVersionId and return 200; change this so only the "rollouts
store unavailable" case is ignored and any other errors abort the request.
Specifically, when calling storage.getStore('rollouts') /
rolloutsStore.getActiveRollout(agentId) and then
rolloutsStore.completeRollout(activeRollout.id,...), do not catch and ignore all
exceptions — detect the specific error that indicates the rollouts store is
missing (or check rolloutsStore === undefined) and swallow only that; otherwise
rethrow or return an error response so activation does not proceed (ensure
cancelledRolloutId remains undefined on failure and propagate the error up). Use
the same symbols shown (storage.getStore, rolloutsStore, getActiveRollout,
completeRollout, cancelledRolloutId) to locate the code to change.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 88f90918-ccfa-40a3-92d0-35255a3ec32a
📒 Files selected for processing (4)
packages/server/src/server/handlers/agent-rollouts.tspackages/server/src/server/handlers/agent-versions.tsserver-adapters/_test-utils/src/route-adapter-test-suite.tsserver-adapters/_test-utils/src/test-helpers.ts
- Clone input.allocations/rules in in-memory updateRollout to prevent mutation leak - Fix LibSQL/MongoDB listRollouts to skip LIMIT/OFFSET when perPage is false - Re-throw unique index creation failures in MongoDB (critical for invariants) - Reject rollouts for code-only agents that have no stored record - Disallow candidateVersionId === stableVersionId in canary rollouts - Filter rollout results to only include scores within the rollout window - Narrow try/catch in activate-version so completeRollout failures propagate Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
|
|
||
| export const rolloutAllocationSchema = z.object({ | ||
| versionId: z.string().describe('Agent version ID for this allocation'), | ||
| weight: z.number().int().min(0).max(100).describe('Traffic weight (0-100)'), |
There was a problem hiding this comment.
I'd suggest making weight fractional... IE between 0 and 1.
For large customers starting, 1% may be too much traffic for an initial candidate.
Agent Rollouts & Experimentation
Adds canary rollouts and A/B experiments for agents so you can gradually shift traffic between agent versions and measure which performs better.
How it works
Traffic is split using deterministic hash-based routing —
hash(resourceId + agentId) % 100— so each user gets a consistent version throughout a rollout. No version flip-flopping mid-conversation.When an active rollout exists and no explicit
agentVersionIdis in the request, generate and stream endpoints automatically resolve the version from the rollout's traffic split. This is transparent to callers.A background accumulator watches scorer results every 30s and auto-rolls back to the stable version if scores drop below configured thresholds. The accumulator uses in-memory circular buffers (O(1) push) so there's no per-request DB overhead for rule evaluation.
New storage schema:
mastra_rolloutsNew endpoints
GET/agents/:agentId/rolloutPOST/agents/:agentId/rolloutPATCH/agents/:agentId/rolloutPOST/agents/:agentId/rollout/promotePOST/agents/:agentId/rollout/rollbackDELETE/agents/:agentId/rolloutGET/agents/:agentId/rollout/resultsGET/agents/:agentId/rolloutsUsage examples
Start a canary rollout — route 10% of traffic to a new version, auto-rollback if quality drops:
Start an A/B experiment — 50/50 split to compare two versions:
Ramp up a canary:
Promote the candidate:
POST /agents/:agentId/rollout/promoteKey types
Auto-rollback
The
RolloutAccumulatorcollects scorer results in-memory during active rollouts. Every 30s it checks rollout rules against accumulated scores:Packages changed
@mastra/core@mastra/server@mastra/editorEditorAgentNamespace@mastra/libsqlmastra_rollouts@mastra/pgmastra_rollouts@mastra/mongodbmastra_rolloutsTests
55 new tests (34 unit + 21 integration), zero regressions on the existing 6173 test suite.
ELI5
This PR lets you roll out new agent versions to a subset of traffic (canary or A/B), monitor their performance, and automatically or manually promote/rollback based on live scorer metrics so bad versions can be removed before they impact everyone.
Overview
Implements deterministic, sticky agent rollouts and experiments with transparent version resolution for generate/stream calls when no explicit version is provided. Adds in-memory live score accumulation, periodic rule evaluation for auto-rollback, full rollout lifecycle APIs, storage schema and adapters, and test coverage.
Key Features
HTTP API Endpoints (under /agents/:agentId/rollout*)
API handlers validate agent existence, prevent creating multiple active rollouts per agent, enforce allocation/ownership/weight rules, and wire accumulator lifecycle.
Storage
Types & Schemas
Server & Editor Integration
Packages Changed
Testing
Notable Hardening & Fixes