66 * paying the full graph-DB cost. Each new memory referencing an entity
77 * appends an event node; the predecessor pointer lets us traverse the
88 * chain backward at query time for EO/MSR/TR questions.
9- *
10- * The CROME proposal calls this primitive necessary for temporal reasoning
11- * at 10M scale; Mem0 explicitly admits their architecture lacks it.
129 */
1310
1411import pg from 'pg' ;
@@ -20,13 +17,25 @@ export interface TLLEvent {
2017 positionInChain : number ;
2118}
2219
20+ // Stable namespace for pg_advisory_xact_lock keying. Keeps TLL appends from
21+ // colliding with unrelated advisory-lock callers in the same process.
22+ const TLL_ADVISORY_LOCK_NAMESPACE = 0x544c4c00 ; // "TLL\0"
23+
2324export class TllRepository {
2425 constructor ( private pool : pg . Pool ) { }
2526
2627 /**
2728 * Append an event node to each entity's chain. Idempotent on
2829 * (user_id, entity_id, memory_id). Predecessor is the most-recent
2930 * existing event for the entity; position is len(chain).
31+ *
32+ * Race-safety: each (user_id, entity_id) append runs inside a transaction
33+ * guarded by `pg_advisory_xact_lock`. Concurrent appends targeting the
34+ * same chain serialize on the lock, then compute the next position from
35+ * committed rows via INSERT...SELECT. The
36+ * `(user_id, entity_id, position_in_chain)` unique index is the
37+ * defense-in-depth backstop that fails loudly if any caller bypasses the
38+ * lock path.
3039 */
3140 async append (
3241 userId : string ,
@@ -37,47 +46,59 @@ export class TllRepository {
3746 if ( entityIds . length === 0 ) return ;
3847 const uniqueEntities = [ ...new Set ( entityIds ) ] ;
3948
40- // Find current chain tip per entity in one query
41- const tipResult = await this . pool . query (
42- `SELECT entity_id,
43- memory_id AS predecessor_memory_id,
44- position_in_chain
45- FROM temporal_linkage_list t1
46- WHERE user_id = $1 AND entity_id = ANY($2::uuid[])
47- AND position_in_chain = (
48- SELECT MAX(position_in_chain)
49- FROM temporal_linkage_list t2
50- WHERE t2.user_id = t1.user_id AND t2.entity_id = t1.entity_id
51- )` ,
52- [ userId , uniqueEntities ] ,
53- ) ;
54- const tips = new Map < string , { predecessorId : string ; position : number } > ( ) ;
55- for ( const row of tipResult . rows ) {
56- tips . set ( row . entity_id , {
57- predecessorId : row . predecessor_memory_id ,
58- position : row . position_in_chain ,
59- } ) ;
49+ // Per-entity transactions keep the advisory-lock scope narrow and let
50+ // independent chains proceed in parallel.
51+ for ( const entityId of uniqueEntities ) {
52+ await this . appendOne ( userId , memoryId , entityId , observationDate ) ;
6053 }
54+ }
6155
62- // Batch insert
63- const values : string [ ] = [ ] ;
64- const params : unknown [ ] = [ ] ;
65- let p = 1 ;
66- for ( const entityId of uniqueEntities ) {
67- const tip = tips . get ( entityId ) ;
68- const predecessor = tip ? tip . predecessorId : null ;
69- const position = tip ? tip . position + 1 : 0 ;
70- values . push ( `($${ p } , $${ p + 1 } , $${ p + 2 } , $${ p + 3 } , $${ p + 4 } , $${ p + 5 } )` ) ;
71- params . push ( userId , entityId , memoryId , predecessor , observationDate , position ) ;
72- p += 6 ;
56+ /**
57+ * Append one (entity, memory) row under an advisory lock keyed on the
58+ * chain. The INSERT...SELECT computes predecessor + position inline from
59+ * the latest committed row, so the read-then-write window the previous
60+ * implementation exposed cannot reorder concurrent appends.
61+ */
62+ private async appendOne (
63+ userId : string ,
64+ memoryId : string ,
65+ entityId : string ,
66+ observationDate : Date ,
67+ ) : Promise < void > {
68+ const client = await this . pool . connect ( ) ;
69+ try {
70+ await client . query ( 'BEGIN' ) ;
71+ await client . query ( 'SELECT pg_advisory_xact_lock($1, hashtext($2))' , [
72+ TLL_ADVISORY_LOCK_NAMESPACE ,
73+ `${ userId } :${ entityId } ` ,
74+ ] ) ;
75+ await client . query (
76+ `INSERT INTO temporal_linkage_list
77+ (user_id, entity_id, memory_id, predecessor_memory_id,
78+ observation_date, position_in_chain)
79+ SELECT
80+ $1, $2, $3,
81+ (SELECT memory_id FROM temporal_linkage_list
82+ WHERE user_id = $1 AND entity_id = $2
83+ ORDER BY position_in_chain DESC LIMIT 1),
84+ $4,
85+ COALESCE(
86+ (SELECT MAX(position_in_chain) FROM temporal_linkage_list
87+ WHERE user_id = $1 AND entity_id = $2),
88+ -1
89+ ) + 1
90+ ON CONFLICT (user_id, entity_id, memory_id) DO NOTHING` ,
91+ [ userId , entityId , memoryId , observationDate ] ,
92+ ) ;
93+ await client . query ( 'COMMIT' ) ;
94+ } catch ( err ) {
95+ await client . query ( 'ROLLBACK' ) . catch ( ( rollbackErr ) =>
96+ console . error ( '[tll] rollback failed:' , rollbackErr instanceof Error ? rollbackErr . message : rollbackErr ) ,
97+ ) ;
98+ throw err ;
99+ } finally {
100+ client . release ( ) ;
73101 }
74- await this . pool . query (
75- `INSERT INTO temporal_linkage_list
76- (user_id, entity_id, memory_id, predecessor_memory_id, observation_date, position_in_chain)
77- VALUES ${ values . join ( ', ' ) }
78- ON CONFLICT (user_id, entity_id, memory_id) DO NOTHING` ,
79- params ,
80- ) ;
81102 }
82103
83104 /**
0 commit comments