Skip to content

Commit dc7b9ff

Browse files
committed
fix(tll): atomic append with advisory lock + unique position index (review #1)
Replace the read-then-insert pattern in TllRepository.append() with an atomic INSERT...SELECT serialized by pg_advisory_xact_lock keyed on (user_id, entity_id). Concurrent appends to the same entity chain now serialize on the lock and compute predecessor + position from the latest committed row, eliminating the TOCTOU race where two parallel callers read the same MAX(position_in_chain) and both wrote at tip+1. Add a UNIQUE (user_id, entity_id, position_in_chain) index as defense-in-depth so any future code path that bypasses the lock fails loudly at the DB layer rather than silently producing duplicate positions. Add an integration test that fires three concurrent appends to the same chain via Promise.all and asserts positions 0,1,2 with correctly wired predecessor pointers.
1 parent 6980e87 commit dc7b9ff

3 files changed

Lines changed: 90 additions & 41 deletions

File tree

src/db/__tests__/repository-tll.test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,27 @@ describe('TllRepository', () => {
124124

125125
expect(await countRows(ent.id, mem.id)).toBe(1);
126126
});
127+
128+
it('serializes concurrent appends to the same chain without duplicate positions', async () => {
129+
const ent = await makeEntity('Concurrent', 121);
130+
const mA = await makeMemory('concA', 122, new Date('2026-05-01'));
131+
const mB = await makeMemory('concB', 123, new Date('2026-05-02'));
132+
const mC = await makeMemory('concC', 124, new Date('2026-05-03'));
133+
134+
await Promise.all([
135+
tllRepo.append(TEST_USER, mA.id, [ent.id], mA.date),
136+
tllRepo.append(TEST_USER, mB.id, [ent.id], mB.date),
137+
tllRepo.append(TEST_USER, mC.id, [ent.id], mC.date),
138+
]);
139+
140+
const events = await tllRepo.chain(TEST_USER, ent.id);
141+
expect(events).toHaveLength(3);
142+
expect(events.map((e) => e.positionInChain)).toEqual([0, 1, 2]);
143+
expect(new Set(events.map((e) => e.memoryId)).size).toBe(3);
144+
expect(events[0].predecessorMemoryId).toBeNull();
145+
expect(events[1].predecessorMemoryId).toBe(events[0].memoryId);
146+
expect(events[2].predecessorMemoryId).toBe(events[1].memoryId);
147+
});
127148
});
128149

129150
describe('chain()', () => {

src/db/repository-tll.ts

Lines changed: 62 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@
66
* paying the full graph-DB cost. Each new memory referencing an entity
77
* appends an event node; the predecessor pointer lets us traverse the
88
* chain backward at query time for EO/MSR/TR questions.
9-
*
10-
* The CROME proposal calls this primitive necessary for temporal reasoning
11-
* at 10M scale; Mem0 explicitly admits their architecture lacks it.
129
*/
1310

1411
import pg from 'pg';
@@ -20,13 +17,25 @@ export interface TLLEvent {
2017
positionInChain: number;
2118
}
2219

20+
// Stable namespace for pg_advisory_xact_lock keying. Keeps TLL appends from
21+
// colliding with unrelated advisory-lock callers in the same process.
22+
const TLL_ADVISORY_LOCK_NAMESPACE = 0x544c4c00; // "TLL\0"
23+
2324
export class TllRepository {
2425
constructor(private pool: pg.Pool) {}
2526

2627
/**
2728
* Append an event node to each entity's chain. Idempotent on
2829
* (user_id, entity_id, memory_id). Predecessor is the most-recent
2930
* existing event for the entity; position is len(chain).
31+
*
32+
* Race-safety: each (user_id, entity_id) append runs inside a transaction
33+
* guarded by `pg_advisory_xact_lock`. Concurrent appends targeting the
34+
* same chain serialize on the lock, then compute the next position from
35+
* committed rows via INSERT...SELECT. The
36+
* `(user_id, entity_id, position_in_chain)` unique index is the
37+
* defense-in-depth backstop that fails loudly if any caller bypasses the
38+
* lock path.
3039
*/
3140
async append(
3241
userId: string,
@@ -37,47 +46,59 @@ export class TllRepository {
3746
if (entityIds.length === 0) return;
3847
const uniqueEntities = [...new Set(entityIds)];
3948

40-
// Find current chain tip per entity in one query
41-
const tipResult = await this.pool.query(
42-
`SELECT entity_id,
43-
memory_id AS predecessor_memory_id,
44-
position_in_chain
45-
FROM temporal_linkage_list t1
46-
WHERE user_id = $1 AND entity_id = ANY($2::uuid[])
47-
AND position_in_chain = (
48-
SELECT MAX(position_in_chain)
49-
FROM temporal_linkage_list t2
50-
WHERE t2.user_id = t1.user_id AND t2.entity_id = t1.entity_id
51-
)`,
52-
[userId, uniqueEntities],
53-
);
54-
const tips = new Map<string, { predecessorId: string; position: number }>();
55-
for (const row of tipResult.rows) {
56-
tips.set(row.entity_id, {
57-
predecessorId: row.predecessor_memory_id,
58-
position: row.position_in_chain,
59-
});
49+
// Per-entity transactions keep the advisory-lock scope narrow and let
50+
// independent chains proceed in parallel.
51+
for (const entityId of uniqueEntities) {
52+
await this.appendOne(userId, memoryId, entityId, observationDate);
6053
}
54+
}
6155

62-
// Batch insert
63-
const values: string[] = [];
64-
const params: unknown[] = [];
65-
let p = 1;
66-
for (const entityId of uniqueEntities) {
67-
const tip = tips.get(entityId);
68-
const predecessor = tip ? tip.predecessorId : null;
69-
const position = tip ? tip.position + 1 : 0;
70-
values.push(`($${p}, $${p + 1}, $${p + 2}, $${p + 3}, $${p + 4}, $${p + 5})`);
71-
params.push(userId, entityId, memoryId, predecessor, observationDate, position);
72-
p += 6;
56+
/**
57+
* Append one (entity, memory) row under an advisory lock keyed on the
58+
* chain. The INSERT...SELECT computes predecessor + position inline from
59+
* the latest committed row, so the read-then-write window the previous
60+
* implementation exposed cannot reorder concurrent appends.
61+
*/
62+
private async appendOne(
63+
userId: string,
64+
memoryId: string,
65+
entityId: string,
66+
observationDate: Date,
67+
): Promise<void> {
68+
const client = await this.pool.connect();
69+
try {
70+
await client.query('BEGIN');
71+
await client.query('SELECT pg_advisory_xact_lock($1, hashtext($2))', [
72+
TLL_ADVISORY_LOCK_NAMESPACE,
73+
`${userId}:${entityId}`,
74+
]);
75+
await client.query(
76+
`INSERT INTO temporal_linkage_list
77+
(user_id, entity_id, memory_id, predecessor_memory_id,
78+
observation_date, position_in_chain)
79+
SELECT
80+
$1, $2, $3,
81+
(SELECT memory_id FROM temporal_linkage_list
82+
WHERE user_id = $1 AND entity_id = $2
83+
ORDER BY position_in_chain DESC LIMIT 1),
84+
$4,
85+
COALESCE(
86+
(SELECT MAX(position_in_chain) FROM temporal_linkage_list
87+
WHERE user_id = $1 AND entity_id = $2),
88+
-1
89+
) + 1
90+
ON CONFLICT (user_id, entity_id, memory_id) DO NOTHING`,
91+
[userId, entityId, memoryId, observationDate],
92+
);
93+
await client.query('COMMIT');
94+
} catch (err) {
95+
await client.query('ROLLBACK').catch((rollbackErr) =>
96+
console.error('[tll] rollback failed:', rollbackErr instanceof Error ? rollbackErr.message : rollbackErr),
97+
);
98+
throw err;
99+
} finally {
100+
client.release();
73101
}
74-
await this.pool.query(
75-
`INSERT INTO temporal_linkage_list
76-
(user_id, entity_id, memory_id, predecessor_memory_id, observation_date, position_in_chain)
77-
VALUES ${values.join(', ')}
78-
ON CONFLICT (user_id, entity_id, memory_id) DO NOTHING`,
79-
params,
80-
);
81102
}
82103

83104
/**

src/db/schema.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,13 @@ CREATE INDEX IF NOT EXISTS idx_tll_entity_chain
346346
CREATE INDEX IF NOT EXISTS idx_tll_memory
347347
ON temporal_linkage_list (memory_id);
348348

349+
-- Defense-in-depth: unique (chain, position) so any future code path that
350+
-- bypasses the advisory-lock append fails at the DB layer instead of
351+
-- silently producing duplicate positions. Idempotent for fresh and
352+
-- existing schemas.
353+
CREATE UNIQUE INDEX IF NOT EXISTS idx_tll_chain_position_unique
354+
ON temporal_linkage_list (user_id, entity_id, position_in_chain);
355+
349356
-- =====================================================================
350357
-- First-mention events (chronological topic-introduction list)
351358
-- =====================================================================

0 commit comments

Comments
 (0)