
PR #5450 edge delta merge incomplete: SqliteMemory.put() still causes edge loss under real workloads #5475

@ahzan-dev


Summary

PR #5450 (issue_5446 branch) adds delta-based edge merging to SqliteMemory.sync() to prevent concurrent walkers from overwriting each other's edge changes. The fix works for simple scenarios but fails under real workloads (e.g., Jac Builder IDE with AI chat) because SqliteMemory.put() — called from Jac.save() — still does immediate INSERT OR REPLACE without any delta merge.

Related Issues

What PR #5450 Fixes

| Scenario | Result |
|---|---|
| Minimal repro (add_item + 30 concurrent touch, 10 rounds) | 0 edges lost |
| Jac Builder API (create project + 12 concurrent IDE requests, no AI chat) | 0 edges lost |

What PR #5450 Does NOT Fix

| Scenario | Result |
|---|---|
| Jac Builder with AI chat (create project + ai_chat start + concurrent requests) | Root→UserProfile edge lost every time |

Evidence

With PR #5450 installed (jaclang from issue_5446 branch), starting AI chat on a project causes the Root→UserProfile edge to be erased within seconds:

```
# First 3 me calls: correct profile
{'user_id': '87d20ed0-b577-42af-8996-c2ed3d47ce2e', 'display_name': 'aitest_8f8e7f'}
{'user_id': '87d20ed0-b577-42af-8996-c2ed3d47ce2e', 'display_name': 'aitest_8f8e7f'}
{'user_id': '87d20ed0-b577-42af-8996-c2ed3d47ce2e', 'display_name': 'aitest_8f8e7f'}

# After ai_chat start + concurrent requests: profile lost, new one created each time
{'user_id': '5e052641-6b1f-4eb6-a1e7-78c3f72b5cef', 'display_name': ''}  ← NEW
{'user_id': 'fc688e48-1a5e-4a43-90d2-56adadae1a71', 'display_name': ''}  ← NEW
{'user_id': 'ef7499aa-8451-4977-b544-cccd807afbe4', 'display_name': ''}  ← NEW
# ... every subsequent me call creates yet another profile
```

The profile is lost permanently. All projects (attached to the original profile) become inaccessible.

Root Cause

Two write paths to SQLite — only one has delta merge

Path 1: sync() (commit-time) — HAS delta merge (PR #5450)

```
request ends → ctx.mem.commit() → TieredMemory.commit() → SqliteMemory.sync()
  → BEGIN IMMEDIATE
  → reads stored from DB
  → computes delta (added/removed edges)
  → merges: (stored_edges ∪ added) - removed
  → writes merged result
  → COMMIT
```
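The merge rule in this path can be checked in isolation. The sketch below assumes edges are plain string IDs (jaclang's real NodeAnchor edge list is richer), and `compute_delta` / `merge_edges` are illustrative names, not jaclang functions. It shows that two walkers which loaded the same node both keep their additions when each one commits through a delta merge.

```python
# Sketch of the sync() merge rule: merged = (stored_edges ∪ added) - removed.
# Assumption: edges are hashable string IDs; jaclang's representation differs.

def compute_delta(loaded: set[str], current: set[str]) -> tuple[set[str], set[str]]:
    """Edges this walker added and removed relative to what it originally loaded."""
    added = current - loaded
    removed = loaded - current
    return added, removed


def merge_edges(stored: set[str], added: set[str], removed: set[str]) -> set[str]:
    """Apply one walker's delta on top of whatever is currently stored."""
    return (stored | added) - removed


# Walkers A and B both loaded a node whose edges were {"profile"}.
loaded = {"profile"}
a_current = {"profile", "msg_a"}  # A added msg_a
b_current = {"profile", "msg_b"}  # B added msg_b

stored = set(loaded)
# A commits first.
stored = merge_edges(stored, *compute_delta(loaded, a_current))
# B commits second, merging against the row A just wrote.
stored = merge_edges(stored, *compute_delta(loaded, b_current))
print(sorted(stored))  # ['msg_a', 'msg_b', 'profile']
```

Because each commit re-reads the stored set and applies only its own delta, the commit order does not matter for additions.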

Path 2: put() (immediate) — NO delta merge

```
Jac.save(anchor) → ctx.mem.put(anchor) → TieredMemory.put() → SqliteMemory.put()
  → INSERT OR REPLACE INTO anchors (id, data) VALUES (?, dumps(anchor))
  → commit()
  // Full document replace, no merge, no transaction isolation
```

SqliteMemory.put() at memory.impl.jac:237:

```jac
self.__conn__.execute(
    "INSERT OR REPLACE INTO anchors (id, data) VALUES (?, ?)",
    (str(anchor.id), dumps(anchor))  # entire anchor blob, including edge list
);
self.__conn__.commit();  # immediate, outside BEGIN IMMEDIATE
```
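The effect of this full-blob write can be reproduced with the standard `sqlite3` module. This is a minimal sketch under the assumption that an anchor is a JSON blob with an `"edges"` list (jaclang's `dumps(anchor)` uses a different serialization, but the overwrite behaviour is the same). Two requests load their own copy of a node, one adds an edge, and the other writes its stale copy back.

```python
import json
import sqlite3

# Assumption: anchors are JSON blobs with an "edges" list, not jaclang's format.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE anchors (id TEXT PRIMARY KEY, data TEXT)")


def put(anchor_id: str, anchor: dict) -> None:
    """Same pattern as SqliteMemory.put(): replace the whole row, then commit."""
    conn.execute(
        "INSERT OR REPLACE INTO anchors (id, data) VALUES (?, ?)",
        (anchor_id, json.dumps(anchor)),
    )
    conn.commit()


def load(anchor_id: str) -> dict:
    row = conn.execute("SELECT data FROM anchors WHERE id = ?", (anchor_id,)).fetchone()
    return json.loads(row[0])


put("root", {"edges": ["profile"]})

# Two requests each load their own copy of root.
copy_a = load("root")
copy_b = load("root")

copy_a["edges"].append("project")  # request A adds an edge
put("root", copy_a)

put("root", copy_b)  # request B writes its stale copy back

print(load("root")["edges"])  # ['profile']: A's "project" edge is gone
```

No error is raised at any point. The last writer simply wins, which is why the loss is silent.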

When put() is called

Jac.save() → TieredMemory.put() → SqliteMemory.put() is triggered by:

  1. build_edge() → save(edge) → save(target) cascade: when an edge is created via ++>, save() is called on the edge anchor, which cascades to save the target node (if it is not yet persistent). This immediately writes the target's entire anchor blob, including its edge list.

  2. Any explicit Jac.save(node) — Direct save calls from walker code or libraries (e.g., jac-coder calls save(root()) and commit() during session management).

Why the real workload triggers it

The Jac Builder IDE fires 12+ concurrent walker requests on page load. When ai_chat start runs:

  1. Walker loads Root → UserProfile → Project into L1 memory
  2. Walker creates Project ++> JacCoderMessage(...) — cascading save writes the new message node via put()
  3. Walker commits via sync() (with delta merge)
  4. Meanwhile, 5+ concurrent walkers (git_ops, me, version_ops, etc.) each have their own TieredMemory with stale copies of the root
  5. These concurrent requests' sync() calls interleave with the put() calls
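  6. The put() writes run outside BEGIN IMMEDIATE and never re-read the stored row. SQLite still serializes the individual statements, but a put() can land between concurrent sync() transactions and replace the row with a stale blob.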
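The interleaving above can be replayed deterministically. This single-threaded sketch assumes edges are string IDs and the "database" is a dict. `sync` and `put` are simplified stand-ins for the two write paths, not jaclang code, and the put() on root models the explicit `save(root())` case mentioned earlier. It shows that the delta merge in sync() cannot bring back an edge that an unmerged put() has already overwritten.

```python
# Simplified stand-ins for the two write paths; the dict plays the database.
db: dict[str, set[str]] = {"root": {"profile"}}


def sync(anchor_id: str, loaded: set[str], current: set[str]) -> None:
    """Commit-time write with delta merge (the PR #5450 behaviour)."""
    added, removed = current - loaded, loaded - current
    db[anchor_id] = (db[anchor_id] | added) - removed


def put(anchor_id: str, current: set[str]) -> None:
    """Immediate write with full replace (the current put() behaviour)."""
    db[anchor_id] = set(current)


# Two walkers load root at the same moment.
a_loaded, b_loaded = set(db["root"]), set(db["root"])

# Walker B adds an edge and commits through sync(): merged correctly.
b_current = b_loaded | {"git_state"}
sync("root", b_loaded, b_current)  # db["root"] == {"profile", "git_state"}

# Walker A saves root with put(), writing its stale copy.
a_current = a_loaded | {"message"}
put("root", a_current)  # db["root"] == {"profile", "message"}

# Walker A later commits through sync(); its delta only mentions "message".
sync("root", a_loaded, a_current)

print(sorted(db["root"]))  # ['message', 'profile']: "git_state" is lost
```

A's delta is correct, but the delta is computed against what A loaded, not against what put() destroyed. So the merge has nothing to restore.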

Suggested Fix

Option A: Add delta merge to put() for NodeAnchors

When SqliteMemory.put() receives a NodeAnchor, use the same delta merge logic as sync():

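```python
def put(self, anchor):
    # Proposed: persistent node anchors take the same read-merge-write path
    # as sync(). edge_delta() and _put_node_atomic() are named as in this
    # proposal; adjust to whatever PR #5450 actually exposes.
    if isinstance(anchor, NodeAnchor) and anchor.persistent:
        delta = anchor.edge_delta()
        if delta is not None:
            # BEGIN IMMEDIATE; read stored row; apply delta; write merged; COMMIT
            self._put_node_atomic(anchor, delta)
            return
    # Fallback for non-node anchors or new nodes: plain full replace
    self.__conn__.execute(
        "INSERT OR REPLACE INTO anchors (id, data) VALUES (?, ?)",
        (str(anchor.id), dumps(anchor)),
    )
    self.__conn__.commit()
```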
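One possible shape for the atomic helper, shown with the standard `sqlite3` module. The function name `put_node_atomic` is hypothetical. The sketch assumes anchors are JSON blobs with an `"edges"` list and that the delta arrives as `(added, removed)` sets; jaclang's serialization and `edge_delta()` return type may differ. The key point is that the read and the write happen inside one `BEGIN IMMEDIATE` transaction, so no other writer can slip in between them.

```python
import json
import sqlite3

# Hypothetical helper; assumes JSON blobs with an "edges" list and a delta
# given as (added, removed) sets.


def put_node_atomic(conn: sqlite3.Connection, anchor_id: str, blob: dict,
                    added: set[str], removed: set[str]) -> None:
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading
    try:
        row = conn.execute(
            "SELECT data FROM anchors WHERE id = ?", (anchor_id,)
        ).fetchone()
        stored_edges = set(json.loads(row[0])["edges"]) if row else set()
        merged = dict(blob)
        merged["edges"] = sorted((stored_edges | added) - removed)
        conn.execute(
            "INSERT OR REPLACE INTO anchors (id, data) VALUES (?, ?)",
            (anchor_id, json.dumps(merged)),
        )
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise


conn = sqlite3.connect(":memory:", isolation_level=None)  # manual transactions
conn.execute("CREATE TABLE anchors (id TEXT PRIMARY KEY, data TEXT)")
conn.execute("INSERT INTO anchors VALUES ('root', ?)",
             (json.dumps({"edges": ["git_state", "profile"]}),))

# A stale copy that never saw "git_state" only adds "message"; nothing is lost.
put_node_atomic(conn, "root", {"edges": ["message", "profile"]}, {"message"}, set())
print(json.loads(conn.execute("SELECT data FROM anchors").fetchone()[0])["edges"])
# ['git_state', 'message', 'profile']
```

`isolation_level=None` switches the Python driver to autocommit so that the explicit `BEGIN IMMEDIATE` / `COMMIT` pair fully controls the transaction.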

Option B: Remove immediate writes from put()

Make put() only write to L1 memory (__mem__), deferring all SQLite writes to sync(). This is simpler but changes the persistence semantics (data could be lost if the process crashes before sync()).
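A minimal sketch of the deferred-write idea, assuming a simplified two-tier store. `DeferredStore` and its attributes are hypothetical, not jaclang's TieredMemory API. put() only updates the in-process dict and marks the anchor dirty. sync() flushes all dirty anchors in one transaction; a real implementation would apply the PR #5450 delta merge at that point instead of the plain replace used here.

```python
import json
import sqlite3

# Hypothetical two-tier store: an L1 dict plus SQLite, flushed only on sync().


class DeferredStore:
    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        self.l1: dict[str, dict] = {}
        self.dirty: set[str] = set()

    def put(self, anchor_id: str, blob: dict) -> None:
        # No SQLite write here: the change lives only in memory until sync().
        self.l1[anchor_id] = blob
        self.dirty.add(anchor_id)

    def sync(self) -> None:
        # One transaction for all dirty anchors; a real version would merge
        # edge deltas here (as PR #5450 does) instead of replacing blobs.
        with self.conn:  # commits on success, rolls back on exception
            for anchor_id in sorted(self.dirty):
                self.conn.execute(
                    "INSERT OR REPLACE INTO anchors (id, data) VALUES (?, ?)",
                    (anchor_id, json.dumps(self.l1[anchor_id])),
                )
        self.dirty.clear()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE anchors (id TEXT PRIMARY KEY, data TEXT)")
store = DeferredStore(conn)
store.put("root", {"edges": ["profile"]})
print(conn.execute("SELECT COUNT(*) FROM anchors").fetchone()[0])  # 0: not persisted yet
store.sync()
print(conn.execute("SELECT COUNT(*) FROM anchors").fetchone()[0])  # 1
```

The first count shows the trade-off mentioned above. Between put() and sync(), the change exists only in process memory and would be lost on a crash.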

Option C: Per-anchor write lock

Add a threading.Lock per anchor ID (or per user root) to serialize all writes to the same anchor, whether from put() or sync().
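A sketch of a per-anchor lock registry, assuming all walkers run as threads in a single process. `AnchorLocks` and `add_edge` are hypothetical names. A `threading.Lock` does not serialize across processes, so a multi-worker deployment would still need database-level locking. Both write paths would wrap their read-modify-write of an anchor in `locks.for_anchor(anchor_id)`.

```python
import threading
from collections import defaultdict

# Hypothetical registry: one lock per anchor ID, process-local only.


class AnchorLocks:
    def __init__(self) -> None:
        self._guard = threading.Lock()
        self._locks = defaultdict(threading.Lock)

    def for_anchor(self, anchor_id: str):
        with self._guard:  # make lock creation itself race-free
            return self._locks[anchor_id]


locks = AnchorLocks()
db = {"root": set()}


def add_edge(edge: str) -> None:
    # Read-modify-write of the same anchor, serialized by that anchor's lock.
    with locks.for_anchor("root"):
        current = set(db["root"])
        current.add(edge)
        db["root"] = current


threads = [threading.Thread(target=add_edge, args=(f"e{i}",)) for i in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(db["root"]))  # 50
```

Locking per user root rather than per anchor would be coarser but simpler, since a single request usually touches several anchors under the same root.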

Reproduction

Test script (requires running Jac Builder)

```python
import concurrent.futures
import time
import uuid

import requests

BASE = "http://localhost:8000"

# Register a throwaway user and get a token
user = f"racetest_{uuid.uuid4().hex[:6]}"
r = requests.post(f"{BASE}/user/register",
                  json={"username": user, "password": "TestPass123!"}, timeout=15)
r.raise_for_status()
token = r.json()["data"]["token"]
headers = {"Authorization": f"Bearer {token}"}

def walker(name, data):
    """POST to a walker endpoint and return the decoded JSON response."""
    return requests.post(f"{BASE}/walker/{name}", json=data, headers=headers, timeout=15).json()

def report(resp):
    """Return the first report of a walker response, or None if there is none."""
    reports = (resp.get("data") or {}).get("reports") or [None]
    return reports[0]

# Setup: fetch (or create) the user profile, then create one project
me = report(walker("me", {"display_name_hint": user}))
original_uid = me["user"]["user_id"]
cr = report(walker("project_ops", {"action": "create", "name": "test", "template_id": "preview-template"}))
pid = cr["project"]["id"]

# Start AI chat
walker("ai_chat", {"action": "start", "project_id": pid, "message": "explain this project"})

# Fire concurrent requests while the AI chat is processing
for round_no in range(5):
    time.sleep(1)
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
        futures = [
            pool.submit(walker, "git_ops", {"action": "status", "project_id": pid}),
            pool.submit(walker, "me", {}),
            pool.submit(walker, "version_ops", {"action": "list", "project_id": pid}),
        ]
        concurrent.futures.wait(futures)

    me_now = report(walker("me", {}))
    current_uid = me_now["user"]["user_id"]
    if current_uid != original_uid:
        print(f"Round {round_no}: PROFILE LOST! Was {original_uid}, now {current_uid}")
    else:
        print(f"Round {round_no}: OK")
```

Environment

Co-Investigated-By: Claude Opus 4.6 (1M context) noreply@anthropic.com
