diff --git a/.husky/pre-commit b/.husky/pre-commit index 224658c..301f95c 100755 --- a/.husky/pre-commit +++ b/.husky/pre-commit @@ -13,6 +13,17 @@ # npx fallow dupes --save-baseline=.fallow/dupes-baseline.json BASE=$(git symbolic-ref --short refs/remotes/origin/HEAD 2>/dev/null) BASE=${BASE:-origin/main} +# Git sets GIT_INDEX_FILE/GIT_DIR before invoking pre-commit hooks. Fallow's +# `git worktree add` for base-ref scanning performs a checkout that writes +# to GIT_INDEX_FILE if it is set — which corrupts the main worktree's index +# by replacing it with the base ref's tree (silently deleting files the +# feature branch added that don't exist on the base ref, then committing +# those deletions). Reproduced on this repo's worktrees during the PR #18 +# review-response work. Unset both vars so any nested git invocation runs +# against the worktree's own default index. (Same fix lives at +# atomicmemory-benchmarks `.husky/pre-commit` commit `327326a`.) +unset GIT_INDEX_FILE +unset GIT_DIR npx fallow audit \ --health-baseline=.fallow/health-baseline.json \ --dupes-baseline=.fallow/dupes-baseline.json \ diff --git a/docker/litellm/.env.example b/docker/litellm/.env.example new file mode 100644 index 0000000..09b7148 --- /dev/null +++ b/docker/litellm/.env.example @@ -0,0 +1,48 @@ +# LiteLLM proxy environment template. +# +# Copy to docker/litellm/.env (gitignored) before starting the proxy: +# cp docker/litellm/.env.example docker/litellm/.env +# +# Only fill in the providers you actually need. Unset keys mean the +# corresponding model alias will return 401 at call time — startup is +# unaffected. + +# --- Proxy auth (required) ------------------------------------------------- +# Master key clients send as `Authorization: Bearer `. Generate any +# random opaque string for non-loopback deployments. 
+LITELLM_MASTER_KEY=sk-litellm-master + +# --- Anthropic ------------------------------------------------------------- +ANTHROPIC_API_KEY= + +# --- OpenAI ---------------------------------------------------------------- +OPENAI_API_KEY= + +# --- Microsoft Foundry / Azure AI Projects -------------------------------- +# Foundry exposes an OpenAI-compatible endpoint at: +# ${FOUNDRY_API_BASE}/openai/v1/chat/completions +# (Project Inference API; NOT the legacy Azure deployments path that requires +# api-version). LiteLLM routes via the `openai/` provider with a custom +# api_base, so we set FOUNDRY_API_BASE_OPENAI to that full path. +# +# Mirrors PROJECT_ENDPOINT used in atomicmemory-benchmarks +# (data/exp-cr-mini/foundry-client.ts). The proxy needs a static API key — +# if your Foundry deployment is Entra-only (no key), keep using the direct +# foundry-client.ts path and route only non-Foundry models through this proxy. +FOUNDRY_API_BASE= +FOUNDRY_API_BASE_OPENAI= # set to ${FOUNDRY_API_BASE}/openai/v1 +FOUNDRY_API_KEY= +FOUNDRY_API_VERSION=2024-12-01-preview # currently unused by foundry-gpt-5-chat (kept for future azure/ entries) + +# --- AWS Bedrock ----------------------------------------------------------- +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= +AWS_REGION_NAME=us-east-1 + +# --- Google Gemini --------------------------------------------------------- +# Use Gemini 2.5 family (gemini-2-5-flash, gemini-2-5-pro). The 1.5 series +# is unavailable on most current API keys; 2.0 Flash is no longer offered to +# new accounts. NOTE: 2.5 generates reasoning tokens that count against +# max_tokens — clients should send max_tokens >= 500 (Flash) or >= 1500 (Pro) +# or responses will be truncated to empty content. 
+GEMINI_API_KEY= diff --git a/docker/litellm/README.md b/docker/litellm/README.md new file mode 100644 index 0000000..52b8c00 --- /dev/null +++ b/docker/litellm/README.md @@ -0,0 +1,80 @@ +# LiteLLM unified gateway + +Run a [LiteLLM](https://github.com/BerriAI/litellm) proxy locally to route AtomicMemory's LLM calls to Anthropic, OpenAI, Microsoft Foundry / Azure, AWS Bedrock, or Google Gemini through a single OpenAI-compatible endpoint. Provider swap is config-only — no code changes in `atomicmemory-core`. + +## Why this exists + +`atomicmemory-core` already supports any OpenAI-compatible endpoint via: + +``` +LLM_PROVIDER=openai-compatible +LLM_API_URL= +LLM_API_KEY= +LLM_MODEL= +``` + +(see `src/services/llm.ts` -> `OpenAICompatibleLLM`). The LiteLLM proxy *is* an OpenAI-compatible endpoint, so wiring it up is purely an infra/config change. No new provider lane in `llm.ts`. + +## Quick start + +```bash +# 1. Set provider credentials (fill in the ones you need) +cp docker/litellm/.env.example docker/litellm/.env +$EDITOR docker/litellm/.env + +# 2. Start the proxy on http://localhost:4000 +docker compose -f docker/litellm/docker-compose.litellm.yml up -d + +# 3. Sanity-check it's up +curl -s http://localhost:4000/health/liveliness +# -> {"status":"alive",...} + +# 4. List configured models +curl -s http://localhost:4000/v1/models \ + -H "Authorization: Bearer $LITELLM_MASTER_KEY" +``` + +Point AtomicMemory at it (your `.env`): + +``` +LLM_PROVIDER=openai-compatible +LLM_API_URL=http://localhost:4000 +LLM_API_KEY=sk-litellm-master # value of LITELLM_MASTER_KEY in docker/litellm/.env +LLM_MODEL=anthropic-haiku-4-5 # or any model_name from litellm-config.yaml +``` + +Restart the core dev server to pick up the new env. The proxy and core can run side-by-side because they're on different ports (4000 vs 3050). + +## Switching providers at runtime + +Two options: + +1. 
**Env-only.** Change `LLM_MODEL` to a different `model_name` from `litellm-config.yaml` and restart core. +2. **Per-request.** Use `config_override.llm_model` in the ingest/search request body (see core's per-request override pattern). The proxy is stateless w.r.t. AtomicMemory, so swapping models per request just changes which `model_list` entry the proxy resolves. + +## Configured providers + +| `model_name` (LLM_MODEL value) | Upstream | Required env | +|---|---|---| +| `anthropic-haiku-4-5` | Anthropic | `ANTHROPIC_API_KEY` | +| `anthropic-sonnet-4-6` | Anthropic | `ANTHROPIC_API_KEY` | +| `openai-gpt-5-chat` | OpenAI | `OPENAI_API_KEY` | +| `openai-gpt-4o-mini` | OpenAI | `OPENAI_API_KEY` | +| `foundry-gpt-5-chat` | Microsoft Foundry / Azure AI Projects | `FOUNDRY_API_BASE_OPENAI`, `FOUNDRY_API_KEY` | +| `bedrock-claude-sonnet` | AWS Bedrock | `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION_NAME` | +| `gemini-2-5-pro` / `gemini-2-5-flash` | Google Gemini direct API | `GEMINI_API_KEY` | + +To add a model, append a `model_list` entry to `litellm-config.yaml` and restart the proxy. No core code change required. + +## Limitations / known caveats + +- **Cost telemetry.** Core's `cost-telemetry.ts` estimates cost from the model name and the OpenAI-compatible `usage` block returned by the proxy. Per-provider rates aren't perfectly mirrored across providers behind LiteLLM yet — Bedrock especially does its own per-deployment pricing. Treat per-call cost estimates as upper-bound when routed through the proxy. LiteLLM does emit an `x-litellm-response-cost` HTTP header; wiring core to read it is future work. +- **Streaming.** The proxy supports streaming, but core's LLM call sites don't currently stream — irrelevant today. +- **Foundry + Entra ID auth.** LiteLLM's `azure/` provider requires a static API key. The existing `foundry-client.ts` in `atomicmemory-benchmarks` uses `DefaultAzureCredential`. 
If your Foundry deployment is Entra-only with no static key, keep using `foundry-client.ts` directly and route only the other providers through LiteLLM. +- **Model name format.** Aliases here use kebab-case (`anthropic-haiku-4-5`). If a downstream tool expects the upstream model id verbatim, use the alias for routing and let LiteLLM translate. + +## Where this fits + +- `litellm-config.yaml` — model routing table (provider keys via `os.environ/...`). +- `docker-compose.litellm.yml` — sidecar service definition (port 4000, healthcheck, env wiring). +- `.env.example` — credential template for the host environment. diff --git a/docker/litellm/docker-compose.litellm.yml b/docker/litellm/docker-compose.litellm.yml new file mode 100644 index 0000000..40619bc --- /dev/null +++ b/docker/litellm/docker-compose.litellm.yml @@ -0,0 +1,65 @@ +# LiteLLM proxy as a sidecar service for AtomicMemory. +# +# Runs at http://localhost:4000 by default — distinct from the core dev +# server (3050) and the docker-compose.yml app service (also 3050) so the +# two can run side-by-side. Provider credentials are pulled from the host +# environment (or a .env in this directory) and resolved by the proxy at +# request time per `os.environ/...` references in litellm-config.yaml. +# +# Up: docker compose -f docker/litellm/docker-compose.litellm.yml up -d +# Logs: docker compose -f docker/litellm/docker-compose.litellm.yml logs -f litellm +# Down: docker compose -f docker/litellm/docker-compose.litellm.yml down +# +# After the container is healthy, point AtomicMemory at it: +# LLM_PROVIDER=openai-compatible +# LLM_API_URL=http://localhost:4000 +# LLM_API_KEY=$LITELLM_MASTER_KEY +# LLM_MODEL=anthropic-haiku-4-5 # or any model_name from litellm-config.yaml + +# Pin the compose project name so this stack never collides with another +# `litellm/` directory's compose project. 
Without this, docker compose +# derives the project name from the parent directory (`litellm`) which is a +# very common name and will silently hijack siblings' containers. +name: atomicmemory-litellm + +services: + litellm: + # Pinned image tag. `main-stable` is BerriAI's rolling stable channel. + # Bump explicitly when validating a new version against AtomicMemory. + image: ghcr.io/berriai/litellm:main-stable + container_name: atomicmemory-litellm + restart: unless-stopped + ports: + - "${LITELLM_PORT:-4000}:4000" + # Mount the config read-only so accidental writes from inside the + # container can't drift it. + volumes: + - ./litellm-config.yaml:/app/config.yaml:ro + command: ["--config", "/app/config.yaml", "--port", "4000", "--num_workers", "1"] + environment: + # Proxy auth — set in your shell or the .env file alongside this compose. + LITELLM_MASTER_KEY: ${LITELLM_MASTER_KEY:-sk-litellm-master} + # Provider credentials. Anything unset just makes the corresponding + # model alias 401 at call time; the proxy still starts. + ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY:-} + OPENAI_API_KEY: ${OPENAI_API_KEY:-} + FOUNDRY_API_BASE: ${FOUNDRY_API_BASE:-} + FOUNDRY_API_BASE_OPENAI: ${FOUNDRY_API_BASE_OPENAI:-} + FOUNDRY_API_KEY: ${FOUNDRY_API_KEY:-} + FOUNDRY_API_VERSION: ${FOUNDRY_API_VERSION:-2024-12-01-preview} + AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-} + AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-} + AWS_REGION_NAME: ${AWS_REGION_NAME:-us-east-1} + GEMINI_API_KEY: ${GEMINI_API_KEY:-} + healthcheck: + # /health/liveliness is the LiteLLM-blessed unauthenticated probe. 
+ test: ["CMD", "wget", "-qO-", "http://localhost:4000/health/liveliness"] + interval: 15s + timeout: 5s + retries: 3 + start_period: 30s + logging: + driver: json-file + options: + max-size: "10m" + max-file: "3" diff --git a/docker/litellm/litellm-config.yaml b/docker/litellm/litellm-config.yaml new file mode 100644 index 0000000..6b374dc --- /dev/null +++ b/docker/litellm/litellm-config.yaml @@ -0,0 +1,131 @@ +# LiteLLM proxy configuration for AtomicMemory. +# +# Why this exists +# --------------- +# AtomicMemory's LLM lane already supports any OpenAI-compatible endpoint via +# `LLM_PROVIDER=openai-compatible` + `LLM_API_URL` + `LLM_API_KEY` +# (see src/services/llm.ts -> OpenAICompatibleLLM). Pointing that lane at a +# LiteLLM proxy gives us a single, config-driven seam to swap LLM providers +# (Anthropic, OpenAI, Microsoft Foundry / Azure, AWS Bedrock, Google Gemini) +# without code changes in core. +# +# How it's used +# ------------- +# 1. Start the proxy: docker compose -f docker/litellm/docker-compose.litellm.yml up -d +# 2. In core .env, set: LLM_PROVIDER=openai-compatible +# LLM_API_URL=http://localhost:4000 +# LLM_API_KEY=sk-litellm-master # the master_key below +# LLM_MODEL=anthropic-haiku-4-5 # any model_name from the model_list +# 3. Restart the core dev server (do NOT bounce a running multirun's server). +# +# Provider credentials live in the host environment, NOT in this file. The +# proxy resolves `os.environ/VAR_NAME` at request time, so the same config.yaml +# works in dev (one credential set) and in CI/prod (a different set) without +# editing it. Missing credentials surface as 401 from the *target provider* +# only when that model is actually called — startup does not block. +# +# Model naming convention +# ----------------------- +# `model_name` is the alias clients use (LLM_MODEL on the AtomicMemory side +# OR `model` in the OpenAI request body). `litellm_params.model` is the +# upstream provider+model identifier LiteLLM understands. 
We name aliases +# `-` so dashboard reads stay legible. + +model_list: + # --------------------------------------------------------------------------- + # Anthropic — direct API. Default backbone for AtomicMemory today. + # Requires: ANTHROPIC_API_KEY + # --------------------------------------------------------------------------- + - model_name: anthropic-haiku-4-5 + litellm_params: + model: anthropic/claude-haiku-4-5 + api_key: os.environ/ANTHROPIC_API_KEY + + - model_name: anthropic-sonnet-4-6 + litellm_params: + model: anthropic/claude-sonnet-4-6 + api_key: os.environ/ANTHROPIC_API_KEY + + # --------------------------------------------------------------------------- + # OpenAI — direct API. + # Requires: OPENAI_API_KEY + # --------------------------------------------------------------------------- + - model_name: openai-gpt-5-chat + litellm_params: + model: openai/gpt-5-chat + api_key: os.environ/OPENAI_API_KEY + + - model_name: openai-gpt-4o-mini + litellm_params: + model: openai/gpt-4o-mini + api_key: os.environ/OPENAI_API_KEY + + # --------------------------------------------------------------------------- + # Microsoft Foundry / Azure AI Projects. + # The Foundry sprint already provisioned a deployment; reuse its endpoint. + # Requires: + # FOUNDRY_API_BASE = same value as PROJECT_ENDPOINT (https://.services.ai.azure.com/api/projects/) + # FOUNDRY_API_KEY = Azure AI Projects key. If using `az login` / Entra + # identity instead of a key, leave unset and use the + # Foundry-direct path in data/exp-cr-mini/foundry-client.ts. + # LiteLLM proxy needs a static key for `azure/`. + # FOUNDRY_API_VERSION (optional, defaults to 2024-12-01-preview) + # --------------------------------------------------------------------------- + # Foundry exposes an OpenAI-compatible path at `${FOUNDRY_API_BASE}/openai/v1` + # (the project-endpoint Responses-and-Chat API), NOT the legacy Azure + # `/openai/deployments//chat/completions?api-version=X` path. 
So we + # route via LiteLLM's `openai/` provider with a custom api_base — same + # pattern as any OpenAI-compatible endpoint. No api-version needed. + - model_name: foundry-gpt-5-chat + litellm_params: + model: openai/gpt-5-chat + api_base: os.environ/FOUNDRY_API_BASE_OPENAI + api_key: os.environ/FOUNDRY_API_KEY + + # --------------------------------------------------------------------------- + # AWS Bedrock — Claude Sonnet via Bedrock. Placeholder; no creds yet. + # Requires: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION_NAME + # --------------------------------------------------------------------------- + - model_name: bedrock-claude-sonnet + litellm_params: + model: bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0 + aws_access_key_id: os.environ/AWS_ACCESS_KEY_ID + aws_secret_access_key: os.environ/AWS_SECRET_ACCESS_KEY + aws_region_name: os.environ/AWS_REGION_NAME + + # --------------------------------------------------------------------------- + # Google Gemini — direct API (not Vertex). Placeholder; no creds yet. + # Requires: GEMINI_API_KEY (or set GOOGLE_API_KEY -- LiteLLM accepts either) + # --------------------------------------------------------------------------- + # Gemini 1.5 series isn't available on most current API keys; use 2.5 family. + - model_name: gemini-2-5-pro + litellm_params: + model: gemini/gemini-2.5-pro + api_key: os.environ/GEMINI_API_KEY + + - model_name: gemini-2-5-flash + litellm_params: + model: gemini/gemini-2.5-flash + api_key: os.environ/GEMINI_API_KEY + +litellm_settings: + # Drop unsupported per-provider params (e.g. seed for providers that ignore + # it) instead of erroring. Keeps the AtomicMemory LLM call shape stable + # across providers — a key reason we're using the gateway. + drop_params: true + # Per-call timeout. AtomicMemory's AUDN path can produce long prompts; 90s + # bounds wall-time without truncating legit slow responses. 
+ request_timeout: 90 + # Surface upstream cost telemetry on /v1/chat/completions response, so + # cost-telemetry.ts can keep accounting per-provider when we route through + # the proxy. (LiteLLM emits `x-litellm-response-cost` header + `usage` block.) + set_verbose: false + +general_settings: + # Proxy-level auth. Clients (atomicmemory-core, harness) must send this as + # the `Authorization: Bearer `; the proxy then injects the + # provider-specific creds resolved from `os.environ/...` above. + # + # The default value is a developer placeholder. Override it in your + # docker-compose env (LITELLM_MASTER_KEY=...) for any non-loopback use. + master_key: os.environ/LITELLM_MASTER_KEY diff --git a/src/app/runtime-container.ts b/src/app/runtime-container.ts index 8609c89..29efe5e 100644 --- a/src/app/runtime-container.ts +++ b/src/app/runtime-container.ts @@ -24,6 +24,10 @@ import { LinkRepository } from '../db/link-repository.js'; import { MemoryRepository } from '../db/memory-repository.js'; import { EntityRepository } from '../db/repository-entities.js'; import { LessonRepository } from '../db/repository-lessons.js'; +import { TllRepository } from '../db/repository-tll.js'; +import { FirstMentionRepository } from '../db/repository-first-mentions.js'; +import { FirstMentionService } from '../services/first-mention-service.js'; +import { llm } from '../services/llm.js'; import type { CoreStores } from '../db/stores.js'; import { PgMemoryStore } from '../db/pg-memory-store.js'; import { PgEpisodeStore } from '../db/pg-episode-store.js'; @@ -202,6 +206,36 @@ export function createCoreRuntime(deps: CoreRuntimeDeps): CoreRuntime { pool, }; + // Phase 4 TLL — per-entity event chain for EO/MSR/TR queries. + // Append on memory store, traverse on retrieval. + const tllRepository = entities ? new TllRepository(pool) : null; + + // First-mention events — chronological topic-introduction list. Caller + // (e.g. 
an external harness) drives extraction explicitly via the + // POST /v1/memories/first-mentions/extract route, supplying its own + // turn-id-to-memory-id mapping (the ingest pipeline does not retain + // turn structure). The chatFn adapter wraps the configured LLM + // singleton; per-call cost is logged inside `llm.chat`. + const firstMentionRepository = new FirstMentionRepository(pool); + const firstMentionService = new FirstMentionService( + firstMentionRepository, + async (system, user, maxTokens) => { + const text = await llm.chat( + [ + { role: 'system', content: system }, + { role: 'user', content: user }, + ], + { maxTokens }, + ); + // Token usage is intentionally NOT returned here: `LLMProvider.chat` + // emits per-call cost telemetry via `writeCostEvent` internally + // (see `src/services/llm.ts`). Surfacing zeros at this seam invited + // the bug the prior reviewer caught — readers would treat them as + // real counts. Drop the field instead until usage is plumbed. + return { text }; + }, + ); + const service = new MemoryService( memory, claims, @@ -210,6 +244,8 @@ export function createCoreRuntime(deps: CoreRuntimeDeps): CoreRuntime { undefined, runtimeConfig, stores, + tllRepository ?? undefined, + firstMentionService, ); return { diff --git a/src/db/__tests__/repository-first-mentions.test.ts b/src/db/__tests__/repository-first-mentions.test.ts new file mode 100644 index 0000000..b57b413 --- /dev/null +++ b/src/db/__tests__/repository-first-mentions.test.ts @@ -0,0 +1,120 @@ +/** + * Integration tests for FirstMentionRepository. + * + * Validates the storage idempotency contract that pairs with review #7: + * the `(user_id, memory_id)` UNIQUE constraint silently drops duplicate + * inserts, so a re-run of `extractAndStore` for the same conversation + * never produces extra rows. 
The service layer's post-sorted index + * `positionInConversation` makes the read-back deterministic across + * re-runs even when the LLM's turn_id assignment drifts; this test + * exercises the DB half of that guarantee. + * + * Requires DATABASE_URL in .env.test. + */ + +import { afterAll, beforeAll, beforeEach, describe, expect, it } from 'vitest'; +import { pool } from '../pool.js'; +import { FirstMentionRepository, type FirstMentionEvent } from '../repository-first-mentions.js'; +import { MemoryRepository } from '../memory-repository.js'; +import { setupTestSchema, unitVector } from './test-fixtures.js'; + +const TEST_USER = 'first-mentions-repo-test-user'; + +describe('FirstMentionRepository', () => { + const fmRepo = new FirstMentionRepository(pool); + const memoryRepo = new MemoryRepository(pool); + + beforeAll(async () => { + await setupTestSchema(pool); + }); + + beforeEach(async () => { + await pool.query( + 'DELETE FROM first_mention_events WHERE user_id = $1', + [TEST_USER], + ); + await memoryRepo.deleteAll(); + }); + + afterAll(async () => { + await pool.end(); + }); + + async function makeMemory(content: string, seed: number): Promise { + return memoryRepo.storeMemory({ + userId: TEST_USER, + content, + embedding: unitVector(seed), + importance: 0.5, + sourceSite: 'first-mention-test', + }); + } + + async function countRows(): Promise { + const r = await pool.query<{ c: string }>( + `SELECT COUNT(*)::text AS c FROM first_mention_events WHERE user_id = $1`, + [TEST_USER], + ); + return Number.parseInt(r.rows[0].c, 10); + } + + it('store() is idempotent on (user_id, memory_id) across re-runs (review #7)', async () => { + const memA = await makeMemory('A topic memory', 1); + const memB = await makeMemory('B topic memory', 2); + + // Run 1: turn_ids 5/12, position 0/1. 
+ const run1: FirstMentionEvent[] = [ + { topic: 'A', turnId: 5, memoryId: memA, anchorDate: null, positionInConversation: 0 }, + { topic: 'B', turnId: 12, memoryId: memB, anchorDate: null, positionInConversation: 1 }, + ]; + await fmRepo.store(TEST_USER, 'beam', run1); + + // Run 2: same logical topics but the LLM drifted A's turn_id 5 -> 6. + // The post-sorted index keeps positionInConversation = 0/1 so the + // INSERT shape is identical to the row already on disk; ON CONFLICT + // (user_id, memory_id) DO NOTHING drops the duplicate cleanly. + const run2: FirstMentionEvent[] = [ + { topic: 'A', turnId: 6, memoryId: memA, anchorDate: null, positionInConversation: 0 }, + { topic: 'B', turnId: 12, memoryId: memB, anchorDate: null, positionInConversation: 1 }, + ]; + await fmRepo.store(TEST_USER, 'beam', run2); + + expect(await countRows()).toBe(2); + const list = await fmRepo.list(TEST_USER); + expect(list).toHaveLength(2); + expect(list.map((e) => e.topic)).toEqual(['A', 'B']); + expect(list.map((e) => e.positionInConversation)).toEqual([0, 1]); + // The first-write wins per ON CONFLICT DO NOTHING — original turn_id + // for A (5) is what survives, not the drifted 6. + const a = list.find((e) => e.topic === 'A'); + expect(a?.turnId).toBe(5); + }); + + it('store() does no work when the events array is empty', async () => { + await fmRepo.store(TEST_USER, 'beam', []); + expect(await countRows()).toBe(0); + }); + + it('list() returns events ordered by position_in_conversation ASC', async () => { + const m1 = await makeMemory('m1', 11); + const m2 = await makeMemory('m2', 12); + const m3 = await makeMemory('m3', 13); + + // Insert deliberately out of position order. 
+ await fmRepo.store(TEST_USER, 'beam', [ + { topic: 'middle', turnId: 5, memoryId: m2, anchorDate: null, positionInConversation: 1 }, + { topic: 'late', turnId: 9, memoryId: m3, anchorDate: null, positionInConversation: 2 }, + { topic: 'early', turnId: 2, memoryId: m1, anchorDate: null, positionInConversation: 0 }, + ]); + + const list = await fmRepo.list(TEST_USER); + expect(list.map((e) => e.topic)).toEqual(['early', 'middle', 'late']); + expect(list.map((e) => e.positionInConversation)).toEqual([0, 1, 2]); + }); + + it('getByMemoryId() returns null when no event exists for the memory', async () => { + const memA = await makeMemory('A', 21); + const result = await fmRepo.getByMemoryId(TEST_USER, memA); + expect(result).toBeNull(); + }); +}); diff --git a/src/db/__tests__/repository-tll.test.ts b/src/db/__tests__/repository-tll.test.ts new file mode 100644 index 0000000..76a7678 --- /dev/null +++ b/src/db/__tests__/repository-tll.test.ts @@ -0,0 +1,265 @@ +/** + * Integration tests for TllRepository — the per-entity Temporal Linkage List. + * + * Validates: + * - append() idempotency on (user_id, entity_id, memory_id) + * - append() predecessor wiring + position_in_chain bookkeeping + * - chain() ordering by position_in_chain ASC + * - chainsFor() dedup + observation_date ordering, empty-input shortcut + * - chainEventsForEntities() enriched join, drops empty entities, + * preserves position order, empty-input shortcut + * + * Requires DATABASE_URL in .env.test. Runs against the live test schema. 
+ */ + +import { afterAll, beforeAll, beforeEach, describe, expect, it } from 'vitest'; +import { pool } from '../pool.js'; +import { TllRepository } from '../repository-tll.js'; +import { EntityRepository } from '../repository-entities.js'; +import { MemoryRepository } from '../memory-repository.js'; +import { setupTestSchema, unitVector } from './test-fixtures.js'; + +const TEST_USER = 'tll-repo-test-user'; + +interface SeededEntity { id: string; } +interface SeededMemory { id: string; date: Date; } + +describe('TllRepository', () => { + const tllRepo = new TllRepository(pool); + const entityRepo = new EntityRepository(pool); + const memoryRepo = new MemoryRepository(pool); + + beforeAll(async () => { + await setupTestSchema(pool); + }); + + beforeEach(async () => { + await pool.query('DELETE FROM temporal_linkage_list WHERE user_id = $1', [TEST_USER]); + await entityRepo.deleteAll(); + await memoryRepo.deleteAll(); + }); + + afterAll(async () => { + await pool.end(); + }); + + /** Resolve a fresh entity for the test user with a deterministic seed. */ + async function makeEntity(name: string, seed: number): Promise { + const id = await entityRepo.resolveEntity({ + userId: TEST_USER, + name, + entityType: 'tool', + embedding: unitVector(seed), + }); + return { id }; + } + + /** Store a memory and return its id along with its observation_date. */ + async function makeMemory(content: string, seed: number, date: Date): Promise { + const id = await memoryRepo.storeMemory({ + userId: TEST_USER, + content, + embedding: unitVector(seed), + importance: 0.6, + sourceSite: 'test', + }); + return { id, date }; + } + + /** Count rows in TLL for a (user, entity, memory) triple. 
*/ + async function countRows(entityId: string, memoryId: string): Promise { + const r = await pool.query<{ c: string }>( + `SELECT COUNT(*)::text AS c FROM temporal_linkage_list + WHERE user_id = $1 AND entity_id = $2 AND memory_id = $3`, + [TEST_USER, entityId, memoryId], + ); + return Number.parseInt(r.rows[0].c, 10); + } + + describe('append()', () => { + it('is idempotent on (user_id, entity_id, memory_id)', async () => { + const ent = await makeEntity('Postgres', 11); + const mem = await makeMemory('User uses Postgres', 12, new Date('2026-01-10')); + + await tllRepo.append(TEST_USER, mem.id, [ent.id], mem.date); + await tllRepo.append(TEST_USER, mem.id, [ent.id], mem.date); + + expect(await countRows(ent.id, mem.id)).toBe(1); + }); + + it('returns early without writing when entityIds is empty', async () => { + const mem = await makeMemory('No-entity memory', 14, new Date('2026-01-10')); + await tllRepo.append(TEST_USER, mem.id, [], mem.date); + + const r = await pool.query<{ c: string }>( + `SELECT COUNT(*)::text AS c FROM temporal_linkage_list WHERE user_id = $1`, + [TEST_USER], + ); + expect(Number.parseInt(r.rows[0].c, 10)).toBe(0); + }); + + it('wires predecessor + position_in_chain across two appends', async () => { + const ent = await makeEntity('React', 21); + const memA = await makeMemory('first', 22, new Date('2026-01-01')); + const memB = await makeMemory('second', 23, new Date('2026-02-01')); + + await tllRepo.append(TEST_USER, memA.id, [ent.id], memA.date); + await tllRepo.append(TEST_USER, memB.id, [ent.id], memB.date); + + const events = await tllRepo.chain(TEST_USER, ent.id); + expect(events).toHaveLength(2); + expect(events[0].memoryId).toBe(memA.id); + expect(events[0].positionInChain).toBe(0); + expect(events[0].predecessorMemoryId).toBeNull(); + expect(events[1].memoryId).toBe(memB.id); + expect(events[1].positionInChain).toBe(1); + expect(events[1].predecessorMemoryId).toBe(memA.id); + }); + + it('deduplicates duplicate entityIds within a 
single call', async () => { + const ent = await makeEntity('Vue', 31); + const mem = await makeMemory('dup', 32, new Date('2026-01-15')); + + await tllRepo.append(TEST_USER, mem.id, [ent.id, ent.id, ent.id], mem.date); + + expect(await countRows(ent.id, mem.id)).toBe(1); + }); + + it('serializes concurrent appends to the same chain without duplicate positions', async () => { + const ent = await makeEntity('Concurrent', 121); + const mA = await makeMemory('concA', 122, new Date('2026-05-01')); + const mB = await makeMemory('concB', 123, new Date('2026-05-02')); + const mC = await makeMemory('concC', 124, new Date('2026-05-03')); + + await Promise.all([ + tllRepo.append(TEST_USER, mA.id, [ent.id], mA.date), + tllRepo.append(TEST_USER, mB.id, [ent.id], mB.date), + tllRepo.append(TEST_USER, mC.id, [ent.id], mC.date), + ]); + + const events = await tllRepo.chain(TEST_USER, ent.id); + expect(events).toHaveLength(3); + expect(events.map((e) => e.positionInChain)).toEqual([0, 1, 2]); + expect(new Set(events.map((e) => e.memoryId)).size).toBe(3); + expect(events[0].predecessorMemoryId).toBeNull(); + expect(events[1].predecessorMemoryId).toBe(events[0].memoryId); + expect(events[2].predecessorMemoryId).toBe(events[1].memoryId); + }); + }); + + describe('chain()', () => { + it('returns events ordered by position_in_chain ASC', async () => { + const ent = await makeEntity('Docker', 41); + const m1 = await makeMemory('one', 42, new Date('2026-03-01')); + const m2 = await makeMemory('two', 43, new Date('2026-03-15')); + const m3 = await makeMemory('three', 44, new Date('2026-04-01')); + + await tllRepo.append(TEST_USER, m1.id, [ent.id], m1.date); + await tllRepo.append(TEST_USER, m2.id, [ent.id], m2.date); + await tllRepo.append(TEST_USER, m3.id, [ent.id], m3.date); + + const events = await tllRepo.chain(TEST_USER, ent.id); + const positions = events.map((e) => e.positionInChain); + expect(positions).toEqual([0, 1, 2]); + expect(events.map((e) => e.memoryId)).toEqual([m1.id, 
m2.id, m3.id]); + }); + + it('returns [] for an entity with no events', async () => { + const ent = await makeEntity('Lonely', 51); + const events = await tllRepo.chain(TEST_USER, ent.id); + expect(events).toEqual([]); + }); + }); + + describe('chainsFor()', () => { + it('returns [] without querying when entityIds is empty', async () => { + const ids = await tllRepo.chainsFor(TEST_USER, []); + expect(ids).toEqual([]); + }); + + it('returns deduped memory_ids ordered by observation_date ASC', async () => { + const e1 = await makeEntity('TypeScript', 61); + const e2 = await makeEntity('Node.js', 62); + const earliest = await makeMemory('earliest', 63, new Date('2026-01-01')); + const middle = await makeMemory('middle', 64, new Date('2026-02-01')); + const latest = await makeMemory('latest', 65, new Date('2026-03-01')); + + await tllRepo.append(TEST_USER, latest.id, [e1.id, e2.id], latest.date); + await tllRepo.append(TEST_USER, earliest.id, [e1.id], earliest.date); + await tllRepo.append(TEST_USER, middle.id, [e2.id], middle.date); + + const ids = await tllRepo.chainsFor(TEST_USER, [e1.id, e2.id]); + + expect(ids).toEqual([earliest.id, middle.id, latest.id]); + expect(new Set(ids).size).toBe(ids.length); + }); + + it('isolates results by user', async () => { + const e1 = await makeEntity('Rust', 71); + const m1 = await makeMemory('mine', 72, new Date('2026-01-10')); + await tllRepo.append(TEST_USER, m1.id, [e1.id], m1.date); + + const ids = await tllRepo.chainsFor('different-user', [e1.id]); + expect(ids).toEqual([]); + }); + }); + + describe('chainEventsForEntities()', () => { + it('returns [] without querying when entityIds is empty', async () => { + const result = await tllRepo.chainEventsForEntities(TEST_USER, []); + expect(result).toEqual([]); + }); + + it('returns enriched events grouped by entity, position-ordered', async () => { + const ent = await makeEntity('Kafka', 81); + const m1 = await makeMemory('first event', 82, new Date('2026-01-01')); + const m2 = 
await makeMemory('second event', 83, new Date('2026-02-01')); + + await tllRepo.append(TEST_USER, m1.id, [ent.id], m1.date); + await tllRepo.append(TEST_USER, m2.id, [ent.id], m2.date); + + const result = await tllRepo.chainEventsForEntities(TEST_USER, [ent.id]); + + expect(result).toHaveLength(1); + expect(result[0].entityId).toBe(ent.id); + expect(result[0].events).toHaveLength(2); + expect(result[0].events[0].positionInChain).toBe(0); + expect(result[0].events[0].memoryId).toBe(m1.id); + expect(result[0].events[0].content).toBe('first event'); + expect(result[0].events[0].predecessorMemoryId).toBeNull(); + expect(result[0].events[1].positionInChain).toBe(1); + expect(result[0].events[1].memoryId).toBe(m2.id); + expect(result[0].events[1].content).toBe('second event'); + expect(result[0].events[1].predecessorMemoryId).toBe(m1.id); + }); + + it('drops entities that have no events', async () => { + const populated = await makeEntity('Populated', 91); + const empty = await makeEntity('Empty', 92); + const mem = await makeMemory('only one', 93, new Date('2026-01-05')); + + await tllRepo.append(TEST_USER, mem.id, [populated.id], mem.date); + + const result = await tllRepo.chainEventsForEntities( + TEST_USER, [populated.id, empty.id], + ); + expect(result).toHaveLength(1); + expect(result[0].entityId).toBe(populated.id); + }); + + it('excludes events whose memory is soft-deleted', async () => { + const ent = await makeEntity('Soft Delete', 101); + const live = await makeMemory('live', 102, new Date('2026-01-01')); + const dead = await makeMemory('dead', 103, new Date('2026-02-01')); + + await tllRepo.append(TEST_USER, live.id, [ent.id], live.date); + await tllRepo.append(TEST_USER, dead.id, [ent.id], dead.date); + await memoryRepo.softDeleteMemory(TEST_USER, dead.id); + + const result = await tllRepo.chainEventsForEntities(TEST_USER, [ent.id]); + + expect(result).toHaveLength(1); + expect(result[0].events.map((e) => e.memoryId)).toEqual([live.id]); + }); + }); +}); 
diff --git a/src/db/repository-first-mentions.ts b/src/db/repository-first-mentions.ts new file mode 100644 index 0000000..c83f202 --- /dev/null +++ b/src/db/repository-first-mentions.ts @@ -0,0 +1,119 @@ +/** + * Repository for first-mention events — per-user chronological list of + * "the first time topic X was introduced in conversation." + * + * Distinct from atomic facts (claims) and memories (chunks). The grain + * matches BEAM event-ordering rubrics: which aspects did the user bring + * up, and in what order. + * + * Entries are produced post-ingest by FirstMentionService via a single + * LLM scan of the full conversation. Storage is idempotent on + * (user_id, memory_id) so re-running the extractor does not duplicate. + */ + +import pg from 'pg'; + +export interface FirstMentionEvent { + topic: string; + turnId: number; + memoryId: string; + anchorDate: Date | null; + positionInConversation: number; +} + +export class FirstMentionRepository { + constructor(private pool: pg.Pool) {} + + /** + * Idempotent batch INSERT. Conflicts on (user_id, memory_id) are + * silently dropped, so re-running the extractor for the same + * conversation does not produce duplicate rows. + */ + async store( + userId: string, + sourceSite: string, + events: FirstMentionEvent[], + ): Promise<void> { + if (events.length === 0) return; + + const values: string[] = []; + const params: unknown[] = []; + let p = 1; + for (const ev of events) { + values.push( + `($${p}, $${p + 1}, $${p + 2}, $${p + 3}, $${p + 4}, $${p + 5}, $${p + 6})`, + ); + params.push( + userId, + ev.topic, + ev.turnId, + ev.memoryId, + ev.anchorDate, + ev.positionInConversation, + sourceSite, + ); + p += 7; + } + + await this.pool.query( + `INSERT INTO first_mention_events + (user_id, topic, turn_id, memory_id, anchor_date, + position_in_conversation, source_site) + VALUES ${values.join(', ')} + ON CONFLICT (user_id, memory_id) DO NOTHING`, + params, + ); + } + + /** + * Look up a single first-mention event by memory_id. 
+ * Returns null if no event is associated with the memory. + */ + async getByMemoryId( + userId: string, + memoryId: string, + ): Promise<FirstMentionEvent | null> { + const result = await this.pool.query( + `SELECT topic, turn_id, memory_id, anchor_date, position_in_conversation + FROM first_mention_events + WHERE user_id = $1 AND memory_id = $2 + LIMIT 1`, + [userId, memoryId], + ); + if (result.rows.length === 0) return null; + const row = result.rows[0]; + return { + topic: row.topic, + turnId: row.turn_id, + memoryId: row.memory_id, + anchorDate: row.anchor_date, + positionInConversation: row.position_in_conversation, + }; + } + + /** + * List all first-mention events for a user, ordered by + * position_in_conversation ASC. Used by EO/MSR retrieval to surface + * the chronological topic-introduction list to the answer generator. + */ + async list( + userId: string, + limit: number = 100, + ): Promise<FirstMentionEvent[]> { + const result = await this.pool.query( + `SELECT topic, turn_id, memory_id, anchor_date, position_in_conversation + FROM first_mention_events + WHERE user_id = $1 + ORDER BY position_in_conversation ASC + LIMIT $2`, + [userId, limit], + ); + return result.rows.map((row) => ({ + topic: row.topic, + turnId: row.turn_id, + memoryId: row.memory_id, + anchorDate: row.anchor_date, + positionInConversation: row.position_in_conversation, + })); + } +} diff --git a/src/db/repository-tll.ts b/src/db/repository-tll.ts new file mode 100644 index 0000000..daf73a9 --- /dev/null +++ b/src/db/repository-tll.ts @@ -0,0 +1,200 @@ +/** + * Repository for the Temporal Linkage List (TLL) — per-entity sparse graph + * of event nodes connected by predecessor/successor edges. + * + * Purpose: maintain "what happened in what order, per entity" without + * paying the full graph-DB cost. Each new memory referencing an entity + * appends an event node; the predecessor pointer lets us traverse the + * chain backward at query time for EO/MSR/TR questions. 
+ */ + +import pg from 'pg'; + +export interface TLLEvent { + memoryId: string; + predecessorMemoryId: string | null; + observationDate: Date; + positionInChain: number; +} + +// Stable namespace for pg_advisory_xact_lock keying. Keeps TLL appends from +// colliding with unrelated advisory-lock callers in the same process. +const TLL_ADVISORY_LOCK_NAMESPACE = 0x544c4c00; // "TLL\0" + +export class TllRepository { + constructor(private pool: pg.Pool) {} + + /** + * Append an event node to each entity's chain. Idempotent on + * (user_id, entity_id, memory_id). Predecessor is the most-recent + * existing event for the entity; position is len(chain). + * + * Race-safety: each (user_id, entity_id) append runs inside a transaction + * guarded by `pg_advisory_xact_lock`. Concurrent appends targeting the + * same chain serialize on the lock, then compute the next position from + * committed rows via INSERT...SELECT. The + * `(user_id, entity_id, position_in_chain)` unique index is the + * defense-in-depth backstop that fails loudly if any caller bypasses the + * lock path. + */ + async append( + userId: string, + memoryId: string, + entityIds: string[], + observationDate: Date, + ): Promise<void> { + if (entityIds.length === 0) return; + const uniqueEntities = [...new Set(entityIds)]; + + // Per-entity transactions keep the advisory-lock scope narrow and let + // independent chains proceed in parallel. + for (const entityId of uniqueEntities) { + await this.appendOne(userId, memoryId, entityId, observationDate); + } + } + + /** + * Append one (entity, memory) row under an advisory lock keyed on the + * chain. The INSERT...SELECT computes predecessor + position inline from + * the latest committed row, so the read-then-write window the previous + * implementation exposed cannot reorder concurrent appends. 
+ */ + private async appendOne( + userId: string, + memoryId: string, + entityId: string, + observationDate: Date, + ): Promise<void> { + const client = await this.pool.connect(); + try { + await client.query('BEGIN'); + await client.query('SELECT pg_advisory_xact_lock($1, hashtext($2))', [ + TLL_ADVISORY_LOCK_NAMESPACE, + `${userId}:${entityId}`, + ]); + await client.query( + `INSERT INTO temporal_linkage_list + (user_id, entity_id, memory_id, predecessor_memory_id, + observation_date, position_in_chain) + SELECT + $1, $2, $3, + (SELECT memory_id FROM temporal_linkage_list + WHERE user_id = $1 AND entity_id = $2 + ORDER BY position_in_chain DESC LIMIT 1), + $4, + COALESCE( + (SELECT MAX(position_in_chain) FROM temporal_linkage_list + WHERE user_id = $1 AND entity_id = $2), + -1 + ) + 1 + ON CONFLICT (user_id, entity_id, memory_id) DO NOTHING`, + [userId, entityId, memoryId, observationDate], + ); + await client.query('COMMIT'); + } catch (err) { + await client.query('ROLLBACK').catch((rollbackErr) => + console.error('[tll] rollback failed:', rollbackErr instanceof Error ? rollbackErr.message : rollbackErr), + ); + throw err; + } finally { + client.release(); + } + } + + /** + * Get the full event chain for an entity, ordered chronologically. + * Used for "list events in order" (EO) and "how did X evolve" (TR/MSR). + */ + async chain(userId: string, entityId: string): Promise<TLLEvent[]> { + const result = await this.pool.query( + `SELECT memory_id, predecessor_memory_id, observation_date, position_in_chain + FROM temporal_linkage_list + WHERE user_id = $1 AND entity_id = $2 + ORDER BY position_in_chain ASC`, + [userId, entityId], + ); + return result.rows.map((row) => ({ + memoryId: row.memory_id, + predecessorMemoryId: row.predecessor_memory_id, + observationDate: row.observation_date, + positionInChain: row.position_in_chain, + })); + } + + /** + * Bulk: get chains for multiple entities. Returns memory_ids in order + * across all entity chains, deduplicated. 
Used as a retrieval signal + * for queries that span multiple entities (MSR). + */ + async chainsFor(userId: string, entityIds: string[]): Promise<string[]> { + if (entityIds.length === 0) return []; + const result = await this.pool.query( + `SELECT DISTINCT memory_id, observation_date + FROM temporal_linkage_list + WHERE user_id = $1 AND entity_id = ANY($2::uuid[]) + ORDER BY observation_date ASC`, + [userId, [...new Set(entityIds)]], + ); + return result.rows.map((row) => row.memory_id); + } + + /** + * Bulk-retrieve enriched event chains: per-entity ordered list of events + * joined with memory content. Used by the event-chains HTTP endpoint and + * by EO-shaped read paths that need content alongside chain position. + * + * Returns one entry per entity; entities with no events are dropped. + * Within an entity, events are ordered by position_in_chain ASC. + */ + async chainEventsForEntities( + userId: string, + entityIds: string[], + ): Promise<Array<{ + entityId: string; + events: Array<{ memoryId: string; content: string; observationDate: Date; positionInChain: number; predecessorMemoryId: string | null }>; + }>> { + if (entityIds.length === 0) return []; + const unique = [...new Set(entityIds)]; + const result = await this.pool.query( + `SELECT t.entity_id, + t.memory_id, + t.predecessor_memory_id, + t.observation_date, + t.position_in_chain, + m.content + FROM temporal_linkage_list t + JOIN memories m ON m.id = t.memory_id + WHERE t.user_id = $1 + AND t.entity_id = ANY($2::uuid[]) + AND m.deleted_at IS NULL + AND m.status = 'active' + ORDER BY t.entity_id, t.position_in_chain ASC`, + [userId, unique], + ); + const grouped = new Map<string, Array<{ memoryId: string; content: string; observationDate: Date; positionInChain: number; predecessorMemoryId: string | null }>>(); + for (const row of result.rows) { + const list = grouped.get(row.entity_id) ?? 
[]; + list.push({ + memoryId: row.memory_id, + content: row.content, + observationDate: row.observation_date, + positionInChain: row.position_in_chain, + predecessorMemoryId: row.predecessor_memory_id, + }); + grouped.set(row.entity_id, list); + } + return [...grouped.entries()].map(([entityId, events]) => ({ entityId, events })); + } +} diff --git a/src/db/schema.sql b/src/db/schema.sql index b5332bd..2482246 100644 --- a/src/db/schema.sql +++ b/src/db/schema.sql @@ -323,6 +323,88 @@ CREATE TABLE IF NOT EXISTS memory_entities ( CREATE INDEX IF NOT EXISTS idx_memory_entities_entity ON memory_entities (entity_id); +-- Phase 4: Temporal Linkage List (TLL). +-- Per-entity sparse graph of event nodes with predecessor/successor edges. +-- Karpathy-minimal: append on ingest, traverse on EO/MSR/TR queries. +-- Targets the abilities Mem0 explicitly admits their architecture doesn't +-- crack at 10M (temporal reasoning, event ordering, multi-session reasoning). +-- The unique architectural primitive nobody has shipped publicly. +CREATE TABLE IF NOT EXISTS temporal_linkage_list ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id TEXT NOT NULL, + entity_id UUID NOT NULL REFERENCES entities(id) ON DELETE CASCADE, + memory_id UUID NOT NULL REFERENCES memories(id) ON DELETE CASCADE, + predecessor_memory_id UUID DEFAULT NULL REFERENCES memories(id) ON DELETE CASCADE, + observation_date TIMESTAMPTZ NOT NULL, + position_in_chain INTEGER NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE (user_id, entity_id, memory_id) +); + +CREATE INDEX IF NOT EXISTS idx_tll_entity_chain + ON temporal_linkage_list (user_id, entity_id, position_in_chain); +CREATE INDEX IF NOT EXISTS idx_tll_memory + ON temporal_linkage_list (memory_id); + +-- Defense-in-depth: unique (chain, position) so any future code path that +-- bypasses the advisory-lock append fails at the DB layer instead of +-- silently producing duplicate positions. Idempotent for fresh and +-- existing schemas. 
+CREATE UNIQUE INDEX IF NOT EXISTS idx_tll_chain_position_unique + ON temporal_linkage_list (user_id, entity_id, position_in_chain); + +-- Align predecessor FK with memory FK (CASCADE) so a hard-deleted memory +-- removes the dependent chain node instead of leaving a half-broken +-- predecessor pointer that breaks backward chain traversal. Idempotent: +-- re-applying the constraint overwrites any prior ON DELETE SET NULL +-- definition. Required for existing databases since the table-level +-- CREATE TABLE IF NOT EXISTS above does not update column constraints. +DO $$ +BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE table_name = 'temporal_linkage_list' + AND constraint_name = 'temporal_linkage_list_predecessor_memory_id_fkey' + ) THEN + ALTER TABLE temporal_linkage_list + DROP CONSTRAINT temporal_linkage_list_predecessor_memory_id_fkey; + END IF; + ALTER TABLE temporal_linkage_list + ADD CONSTRAINT temporal_linkage_list_predecessor_memory_id_fkey + FOREIGN KEY (predecessor_memory_id) REFERENCES memories(id) + ON DELETE CASCADE; +END$$; + +-- ===================================================================== +-- First-mention events (chronological topic-introduction list) +-- ===================================================================== +-- Per-user list of "the first time topic X was introduced in conversation." +-- Distinct from facts (which are atomic claims) and memories (which are +-- ingested chunks). The grain matches event-ordering rubrics: +-- "in what order did the user bring up these aspects." +-- +-- Generated post-ingest by FirstMentionService via a single LLM call that +-- scans the full conversation and outputs a JSON array of first-mention +-- events. Idempotent on (user_id, memory_id) so re-running doesn't duplicate. 
+CREATE TABLE IF NOT EXISTS first_mention_events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id TEXT NOT NULL, + topic TEXT NOT NULL, + turn_id INTEGER NOT NULL, + memory_id UUID NOT NULL REFERENCES memories(id) ON DELETE CASCADE, + anchor_date TIMESTAMPTZ DEFAULT NULL, + position_in_conversation INTEGER NOT NULL, + source_site TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE (user_id, memory_id) +); + +CREATE INDEX IF NOT EXISTS idx_first_mention_user_position + ON first_mention_events (user_id, position_in_conversation); + +CREATE INDEX IF NOT EXISTS idx_first_mention_user_topic + ON first_mention_events USING GIN (to_tsvector('english', topic)); + -- Entity relations: typed, directed edges between entities with temporal validity CREATE TABLE IF NOT EXISTS entity_relations ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), diff --git a/src/routes/__tests__/event-chains-and-first-mentions.test.ts b/src/routes/__tests__/event-chains-and-first-mentions.test.ts new file mode 100644 index 0000000..980093b --- /dev/null +++ b/src/routes/__tests__/event-chains-and-first-mentions.test.ts @@ -0,0 +1,320 @@ +/** + * HTTP-level tests for the two PR #18 read endpoints (review #5): + * - GET /v1/memories/event-chains + * - POST /v1/memories/first-mentions/extract + * + * Covers the schema-validation 400 paths (no DB hit needed for the + * cases that are rejected before the handler runs) and one happy-path + * end-to-end shape assertion per endpoint. The happy-path tests seed + * the test DB and assert the response matches the response schema. + * + * Mirrors the route-test pattern from `src/__tests__/route-validation.test.ts`: + * an Express app is built with `createMemoryRouter`, a real + * `MemoryService` (wired against a real Postgres test DB plus mocked + * embeddings + a stub LLM `chatFn`), and `fetch` is used to drive the + * registered routes. + * + * Requires DATABASE_URL in .env.test. 
+ */ + +import { describe, it, expect, beforeAll, afterAll, vi } from 'vitest'; + +// Mock embedText so the test process never hits an embedding provider +// (CI uses a placeholder OPENAI_API_KEY). Returns a deterministic zero +// vector matching the configured embedding dimensions, mirroring the +// mock pattern in route-validation.test.ts. +vi.mock('../../services/embedding.js', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + embedText: vi.fn(async () => { + const { config: cfg } = await import('../../config.js'); + return new Array(cfg.embeddingDimensions).fill(0); + }), + }; +}); + +import express from 'express'; +import { pool } from '../../db/pool.js'; +import { MemoryRepository } from '../../db/memory-repository.js'; +import { ClaimRepository } from '../../db/claim-repository.js'; +import { EntityRepository } from '../../db/repository-entities.js'; +import { TllRepository } from '../../db/repository-tll.js'; +import { FirstMentionRepository } from '../../db/repository-first-mentions.js'; +import { FirstMentionService } from '../../services/first-mention-service.js'; +import { MemoryService } from '../../services/memory-service.js'; +import { createMemoryRouter } from '../memories.js'; +import { setupTestSchema, unitVector } from '../../db/__tests__/test-fixtures.js'; +import { + EventChainsResponseSchema, + FirstMentionsExtractResponseSchema, +} from '../../schemas/responses.js'; + +const TEST_USER = 'event-chains-route-test-user'; +const VALID_UUID = '00000000-0000-0000-0000-000000000001'; +const INVALID_UUID = 'not-a-uuid'; + +let server: ReturnType; +let baseUrl: string; +const app = express(); +app.use(express.json()); + +/** + * Stub LLM chatFn returning a static JSON array shaped like a valid + * first-mention extraction. Used for the happy-path test on + * /first-mentions/extract — the schema validation cases never reach + * the LLM call. 
+ */ +function makeStubChatFn(json: string) { + return vi.fn(async () => ({ text: json })); +} + +beforeAll(async () => { + await setupTestSchema(pool); + + const repo = new MemoryRepository(pool); + const claimRepo = new ClaimRepository(pool); + const entityRepo = new EntityRepository(pool); + const tllRepo = new TllRepository(pool); + const fmRepo = new FirstMentionRepository(pool); + const stubChat = makeStubChatFn(JSON.stringify([ + { topic: 'sample topic alpha', turn_id: 1, session_id: 1, anchor_date: null }, + { topic: 'sample topic beta', turn_id: 3, session_id: 1, anchor_date: null }, + ])); + const fmService = new FirstMentionService(fmRepo, stubChat); + const service = new MemoryService( + repo, + claimRepo, + entityRepo, + undefined, + undefined, + undefined, + undefined, + tllRepo, + fmService, + ); + app.use('/memories', createMemoryRouter(service)); + + await new Promise((resolve) => { + server = app.listen(0, () => { + const addr = server.address(); + const port = typeof addr === 'object' && addr ? 
addr.port : 0; + baseUrl = `http://localhost:${port}`; + resolve(); + }); + }); +}); + +afterAll(async () => { + await new Promise((resolve) => server.close(() => resolve())); + await pool.end(); +}); + +// --------------------------------------------------------------------------- +// GET /v1/memories/event-chains — schema validation +// --------------------------------------------------------------------------- + +describe('GET /memories/event-chains — schema validation', () => { + it('returns 400 when user_id is missing', async () => { + const res = await fetch(`${baseUrl}/memories/event-chains?entity_ids=${VALID_UUID}`); + expect(res.status).toBe(400); + const body = (await res.json()) as { error: string }; + expect(body.error).toMatch(/user_id/i); + }); + + it('returns 400 when entity_ids is missing', async () => { + const res = await fetch(`${baseUrl}/memories/event-chains?user_id=${TEST_USER}`); + expect(res.status).toBe(400); + const body = (await res.json()) as { error: string }; + expect(body.error).toMatch(/entity_ids/i); + }); + + it('returns 400 when entity_ids contains an invalid UUID', async () => { + const res = await fetch( + `${baseUrl}/memories/event-chains?user_id=${TEST_USER}&entity_ids=${VALID_UUID},${INVALID_UUID}`, + ); + expect(res.status).toBe(400); + const body = (await res.json()) as { error: string }; + expect(body.error).toMatch(/valid UUIDs/i); + }); + + it('returns 400 when entity_ids exceeds the 100-entry cap (review #6)', async () => { + // Build 101 distinct UUIDs to trip the anti-amplification cap. 
+ const ids = Array.from({ length: 101 }, (_, i) => { + const hex = (i + 1).toString(16).padStart(12, '0'); + return `00000000-0000-0000-0000-${hex}`; + }); + const res = await fetch( + `${baseUrl}/memories/event-chains?user_id=${TEST_USER}&entity_ids=${ids.join(',')}`, + ); + expect(res.status).toBe(400); + const body = (await res.json()) as { error: string }; + expect(body.error).toMatch(/at most 100/i); + }); + + it('returns 400 when entity_ids is present but contains only empty tokens', async () => { + const res = await fetch( + `${baseUrl}/memories/event-chains?user_id=${TEST_USER}&entity_ids=,,,`, + ); + expect(res.status).toBe(400); + const body = (await res.json()) as { error: string }; + expect(body.error).toMatch(/non-empty/i); + }); +}); + +// --------------------------------------------------------------------------- +// GET /v1/memories/event-chains — happy path +// --------------------------------------------------------------------------- + +describe('GET /memories/event-chains — happy path', () => { + it('returns the seeded chain in the EventChainsResponseSchema shape', async () => { + const repo = new MemoryRepository(pool); + const entityRepo = new EntityRepository(pool); + const tllRepo = new TllRepository(pool); + const userId = `${TEST_USER}-happy`; + + // Clean slate for this user. 
+ await pool.query('DELETE FROM temporal_linkage_list WHERE user_id = $1', [userId]); + + const memId = await repo.storeMemory({ + userId, + content: 'Started using Postgres on the project.', + embedding: unitVector(101), + importance: 0.7, + sourceSite: 'event-chains-test', + }); + const entityId = await entityRepo.resolveEntity({ + userId, + name: 'Postgres', + entityType: 'tool', + embedding: unitVector(102), + }); + await tllRepo.append(userId, memId, [entityId], new Date('2026-01-15T00:00:00Z')); + + const res = await fetch( + `${baseUrl}/memories/event-chains?user_id=${userId}&entity_ids=${entityId}`, + ); + expect(res.status).toBe(200); + const body = await res.json(); + const parsed = EventChainsResponseSchema.parse(body); + expect(parsed.chains).toHaveLength(1); + expect(parsed.chains[0].entity_id).toBe(entityId); + expect(parsed.chains[0].events).toHaveLength(1); + const ev = parsed.chains[0].events[0]; + expect(ev.memory_id).toBe(memId); + expect(ev.position_in_chain).toBe(0); + expect(ev.predecessor_memory_id).toBeNull(); + expect(typeof ev.observation_date).toBe('string'); + }); +}); + +// --------------------------------------------------------------------------- +// POST /v1/memories/first-mentions/extract — schema validation +// --------------------------------------------------------------------------- + +describe('POST /memories/first-mentions/extract — schema validation', () => { + async function postExpecting400(body: Record): Promise<{ error: string }> { + const res = await fetch(`${baseUrl}/memories/first-mentions/extract`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + }); + expect(res.status).toBe(400); + return (await res.json()) as { error: string }; + } + + it('returns 400 when user_id is missing', async () => { + const { error } = await postExpecting400({ + conversation_text: 'hi', + source_site: 'beam', + memory_ids_by_turn_id: { '1': VALID_UUID }, + }); + 
expect(error).toMatch(/user_id/i); + }); + + it('returns 400 when conversation_text is empty', async () => { + const { error } = await postExpecting400({ + user_id: TEST_USER, + conversation_text: '', + source_site: 'beam', + memory_ids_by_turn_id: { '1': VALID_UUID }, + }); + // Zod min(1) message — match loosely on the field name. + expect(error).toMatch(/conversation_text/i); + }); + + it('returns 400 when conversation_text exceeds the max length cap', async () => { + // The schema cap is 100_000 characters. Build a string just over it. + const oversized = 'x'.repeat(100_001); + const { error } = await postExpecting400({ + user_id: TEST_USER, + conversation_text: oversized, + source_site: 'beam', + memory_ids_by_turn_id: { '1': VALID_UUID }, + }); + expect(error).toMatch(/conversation_text/i); + }); + + it('returns 400 when memory_ids_by_turn_id is missing entirely', async () => { + const { error } = await postExpecting400({ + user_id: TEST_USER, + conversation_text: 'hi', + source_site: 'beam', + }); + expect(error).toMatch(/memory_ids_by_turn_id/i); + }); + + it('returns 400 when source_site is missing', async () => { + const { error } = await postExpecting400({ + user_id: TEST_USER, + conversation_text: 'hi', + memory_ids_by_turn_id: { '1': VALID_UUID }, + }); + expect(error).toMatch(/source_site/i); + }); +}); + +// --------------------------------------------------------------------------- +// POST /v1/memories/first-mentions/extract — happy path +// --------------------------------------------------------------------------- + +async function seedMemoryFor(userId: string, content: string, seed: number): Promise { + const repo = new MemoryRepository(pool); + return repo.storeMemory({ + userId, + content, + embedding: unitVector(seed), + importance: 0.6, + sourceSite: 'first-mentions-test', + }); +} + +describe('POST /memories/first-mentions/extract — happy path', () => { + it('returns the stub-extracted events in the FirstMentionsExtractResponseSchema 
shape', async () => { + const userId = `${TEST_USER}-fm-happy`; + const mem1 = await seedMemoryFor(userId, 'turn 1 memory', 201); + const mem3 = await seedMemoryFor(userId, 'turn 3 memory', 202); + + const res = await fetch(`${baseUrl}/memories/first-mentions/extract`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + user_id: userId, + conversation_text: 'turn 1: started X. turn 3: switched to Y.', + source_site: 'first-mentions-test', + memory_ids_by_turn_id: { '1': mem1, '3': mem3 }, + }), + }); + expect(res.status).toBe(200); + const body = await res.json(); + const parsed = FirstMentionsExtractResponseSchema.parse(body); + expect(parsed.events).toHaveLength(2); + // Topics come from the stub chatFn; positions are post-sorted (review #7). + expect(parsed.events.map((e) => e.topic)).toEqual([ + 'sample topic alpha', + 'sample topic beta', + ]); + expect(parsed.events.map((e) => e.position_in_conversation)).toEqual([0, 1]); + expect(parsed.events.map((e) => e.memory_id)).toEqual([mem1, mem3]); + }); +}); diff --git a/src/routes/memories.ts b/src/routes/memories.ts index ece2a62..2c0fb54 100644 --- a/src/routes/memories.ts +++ b/src/routes/memories.ts @@ -56,6 +56,8 @@ import { ResetSourceBodySchema, LessonReportBodySchema, UserIdQuerySchema, + EventChainsQuerySchema, + FirstMentionsExtractBodySchema, UserIdLimitQuerySchema, ListQuerySchema, MemoryByIdQuerySchema, @@ -142,6 +144,8 @@ export function createMemoryRouter( registerStatsRoute(router, service); registerHealthRoute(router, configRouteAdapter); registerConfigRoute(router, configRouteAdapter); + registerEventChainsRoute(router, service); + registerFirstMentionsExtractRoute(router, service); registerConsolidateRoute(router, service); registerDecayRoute(router, service); registerCapRoute(router, service); @@ -441,6 +445,80 @@ function registerConfigRoute(router: Router, configRouteAdapter: RuntimeConfigRo }); } +/** + * GET 
/v1/memories/event-chains?user_id=X&entity_ids=uuid1,uuid2 + * + * Returns per-entity chronological event chains from the Temporal Linkage + * List. Entities with no events are dropped from the result. Within an + * entity, events are ordered by position_in_chain ASC. Used by EO-shaped + * read paths that need content alongside chain position. Returns + * `{ chains: [] }` if TLL is disabled in the runtime config. + */ +function registerEventChainsRoute(router: Router, service: MemoryService): void { + router.get('/event-chains', validateQuery(EventChainsQuerySchema), async (req: Request, res: Response) => { + try { + const q = req.query as unknown as { userId: string; entityIds: string[] }; + const chains = await service.getEventChains(q.userId, q.entityIds); + res.json({ + chains: chains.map(c => ({ + entity_id: c.entityId, + events: c.events.map(e => ({ + memory_id: e.memoryId, + content: e.content, + observation_date: e.observationDate.toISOString(), + position_in_chain: e.positionInChain, + predecessor_memory_id: e.predecessorMemoryId, + })), + })), + }); + } catch (err) { + handleRouteError(res, 'GET /v1/memories/event-chains', err); + } + }); +} + +/** + * POST /v1/memories/first-mentions/extract + * + * Extract first-mention events from a conversation transcript and persist + * them. Caller supplies the turn-id-to-memory-id mapping. Returns the + * parsed events. Best-effort: if FirstMentionService is not wired or the + * underlying LLM call fails, returns `{ events: [] }` without throwing. 
+ */ +function registerFirstMentionsExtractRoute(router: Router, service: MemoryService): void { + router.post( + '/first-mentions/extract', + validateBody(FirstMentionsExtractBodySchema), + async (req: Request, res: Response) => { + try { + const b = req.body as unknown as { + userId: string; + conversationText: string; + sourceSite: string; + memoryIdsByTurnId: Map; + }; + const events = await service.extractFirstMentions( + b.userId, + b.conversationText, + b.sourceSite, + b.memoryIdsByTurnId, + ); + res.json({ + events: events.map(e => ({ + topic: e.topic, + turn_id: e.turnId, + memory_id: e.memoryId, + anchor_date: e.anchorDate ? e.anchorDate.toISOString() : null, + position_in_conversation: e.positionInConversation, + })), + }); + } catch (err) { + handleRouteError(res, 'POST /v1/memories/first-mentions/extract', err); + } + }, + ); +} + function registerConsolidateRoute(router: Router, service: MemoryService): void { router.post('/consolidate', validateBody(ConsolidateBodySchema), async (req: Request, res: Response) => { try { diff --git a/src/routes/response-schema-map.ts b/src/routes/response-schema-map.ts index f4983d9..4e16827 100644 --- a/src/routes/response-schema-map.ts +++ b/src/routes/response-schema-map.ts @@ -21,6 +21,8 @@ export const MEMORY_RESPONSE_SCHEMAS: ResponseSchemaMap = { 'get /stats': R.StatsResponseSchema, 'get /health': R.HealthResponseSchema, 'put /config': R.ConfigUpdateResponseSchema, + 'get /event-chains': R.EventChainsResponseSchema, + 'post /first-mentions/extract': R.FirstMentionsExtractResponseSchema, 'post /consolidate': R.ConsolidateResponseSchema, 'post /decay': R.DecayResponseSchema, 'get /cap': R.CapResponseSchema, diff --git a/src/schemas/memories.ts b/src/schemas/memories.ts index 042b6cb..3e18c8f 100644 --- a/src/schemas/memories.ts +++ b/src/schemas/memories.ts @@ -534,6 +534,76 @@ export const MemoryByIdQuerySchema = z export type MemoryByIdQuery = z.infer; +/** + * Anti-amplification cap for `GET 
/v1/memories/event-chains`. The endpoint
+ * fans out per entity (one chain hydration each); without an upper bound a
+ * single request can pull tens of thousands of rows.
+ */
+const MAX_ENTITY_IDS_PER_REQUEST = 100;
+
+/**
+ * Query for `GET /v1/memories/event-chains`. Accepts a comma-separated list
+ * of entity UUIDs and returns the per-entity ordered event chain.
+ * `entity_ids` is parsed as comma-separated, trimmed, deduplicated; empty
+ * tokens skipped. At least one valid UUID required, capped at
+ * MAX_ENTITY_IDS_PER_REQUEST entries.
+ */
+export const EventChainsQuerySchema = z
+  .object({
+    user_id: RequiredQueryString,
+    entity_ids: RequiredQueryString,
+  })
+  .transform(q => {
+    const ids = [...new Set(
+      q.entity_ids
+        .split(',')
+        .map(s => s.trim())
+        .filter(s => s.length > 0),
+    )];
+    return { userId: q.user_id, entityIds: ids };
+  })
+  .refine(q => q.entityIds.length > 0, {
+    message: 'entity_ids must contain at least one non-empty value',
+  })
+  .refine(q => q.entityIds.length <= MAX_ENTITY_IDS_PER_REQUEST, {
+    message: `entity_ids must contain at most ${MAX_ENTITY_IDS_PER_REQUEST} values`,
+  })
+  .refine(q => q.entityIds.every(id => UUID_REGEX.test(id)), {
+    message: 'entity_ids entries must be valid UUIDs',
+  });
+
+export type EventChainsQuery = z.infer<typeof EventChainsQuerySchema>;
+
+/**
+ * Body for `POST /v1/memories/first-mentions/extract`. Accepts the full
+ * conversation transcript plus a turn-id-to-memory-id mapping (the harness
+ * provides this — the in-core ingest pipeline does not retain turn
+ * structure). The service runs a single LLM call to identify chronological
+ * topic-introduction events and persists them. 
+ */
+export const FirstMentionsExtractBodySchema = z
+  .object({
+    user_id: z.string().min(1),
+    conversation_text: z.string().min(1).max(MAX_CONVERSATION_LENGTH),
+    source_site: z.string().min(1),
+    memory_ids_by_turn_id: z.record(z.string(), z.string()),
+  })
+  .transform(b => {
+    const map = new Map<number, string>();
+    for (const [k, v] of Object.entries(b.memory_ids_by_turn_id)) {
+      const n = Number(k);
+      if (Number.isFinite(n) && Number.isInteger(n)) map.set(n, v);
+    }
+    return {
+      userId: b.user_id,
+      conversationText: b.conversation_text,
+      sourceSite: b.source_site,
+      memoryIdsByTurnId: map,
+    };
+  });
+
+export type FirstMentionsExtractBody = z.infer<typeof FirstMentionsExtractBodySchema>;
+
 // ---------------------------------------------------------------------------
 // Path params
 // ---------------------------------------------------------------------------
diff --git a/src/schemas/responses.ts b/src/schemas/responses.ts
index 0613a71..869c11d 100644
--- a/src/schemas/responses.ts
+++ b/src/schemas/responses.ts
@@ -243,6 +243,29 @@ export const ConsolidateResponseSchema = z.union([
   ConsolidateExecuteResponseSchema,
 ]).openapi({ description: 'Consolidation result — scan or execute.' });
 
+export const FirstMentionsExtractResponseSchema = z.object({
+  events: z.array(z.object({
+    topic: z.string(),
+    turn_id: z.number(),
+    memory_id: z.string(),
+    anchor_date: z.string().nullable(),
+    position_in_conversation: z.number(),
+  })),
+}).openapi({ description: 'Extracted first-mention events for a conversation.' });
+
+export const EventChainsResponseSchema = z.object({
+  chains: z.array(z.object({
+    entity_id: z.string(),
+    events: z.array(z.object({
+      memory_id: z.string(),
+      content: z.string(),
+      observation_date: z.string(),
+      position_in_chain: z.number(),
+      predecessor_memory_id: z.string().nullable(),
+    })),
+  })),
+}).openapi({ description: 'Per-entity chronological event chains from the Temporal Linkage List.' 
}); + export const DecayResponseSchema = z.object({ memories_evaluated: z.number(), candidates_for_archival: z.array(DecayCandidateSchema), diff --git a/src/services/__tests__/cross-workspace-coupling-fence.test.ts b/src/services/__tests__/cross-workspace-coupling-fence.test.ts index 121995b..d85bda1 100644 --- a/src/services/__tests__/cross-workspace-coupling-fence.test.ts +++ b/src/services/__tests__/cross-workspace-coupling-fence.test.ts @@ -97,6 +97,8 @@ function makeDeps(entityStore: any): MemoryServiceDeps { entity: entityStore, } as any, observationService: null, + tllRepository: null, + firstMentionService: null, uriResolver: {} as any, }; } diff --git a/src/services/__tests__/first-mention-service.test.ts b/src/services/__tests__/first-mention-service.test.ts new file mode 100644 index 0000000..8a9bc33 --- /dev/null +++ b/src/services/__tests__/first-mention-service.test.ts @@ -0,0 +1,266 @@ +/** + * Unit tests for FirstMentionService. + * Validates LLM-output parsing, salvage path for truncated arrays, + * mapping turn_id -> memory_id, schema filtering, and storage interaction. + * The repository and chatFn are both mocked — no DB or network access. 
+ */ + +import { beforeEach, describe, it, expect, vi } from 'vitest'; +import { + FirstMentionRepository, + type FirstMentionEvent, +} from '../../db/repository-first-mentions.js'; +import { FirstMentionService } from '../first-mention-service.js'; + +interface ChatResult { + text: string; +} + +function makeRepo() { + const store = vi.fn().mockResolvedValue(undefined); + const getByMemoryId = vi.fn().mockResolvedValue(null); + const list = vi.fn().mockResolvedValue([]); + const repo = { store, getByMemoryId, list } as unknown as FirstMentionRepository; + return { repo, store, getByMemoryId, list }; +} + +function chatReturning(text: string) { + return vi.fn( + async (): Promise => ({ text }), + ); +} + +describe('FirstMentionService.extractAndStore', () => { + beforeEach(() => { + vi.restoreAllMocks(); + }); + + it('happy path: stores and returns events from valid JSON array', async () => { + const json = JSON.stringify([ + { topic: 'auth setup', turn_id: 2, session_id: 1, anchor_date: '2026-01-15T00:00:00Z' }, + { topic: 'redis cache', turn_id: 5, session_id: 1, anchor_date: null }, + ]); + const { repo, store } = makeRepo(); + const chatFn = chatReturning(json); + const svc = new FirstMentionService(repo, chatFn); + + const memoryIds = new Map([ + [2, 'mem-aaaa'], + [5, 'mem-bbbb'], + ]); + + const result = await svc.extractAndStore('user-1', 'conv text', 'beam', memoryIds); + + expect(result).toHaveLength(2); + // positionInConversation is the 0-based index in the post-sorted + // output, not the turn_id (review #7 idempotency fix). 
+ expect(result[0]).toMatchObject({ + topic: 'auth setup', + turnId: 2, + memoryId: 'mem-aaaa', + positionInConversation: 0, + }); + expect(result[1]).toMatchObject({ + topic: 'redis cache', + turnId: 5, + memoryId: 'mem-bbbb', + positionInConversation: 1, + }); + expect(result[0].anchorDate).toBeInstanceOf(Date); + expect(result[1].anchorDate).toBeNull(); + expect(store).toHaveBeenCalledWith('user-1', 'beam', result); + }); +}); + +describe('FirstMentionService salvage and error paths', () => { + it('salvage path: recovers events from truncated JSON', async () => { + const truncated = + '[\n' + + ' {"topic": "first topic", "turn_id": 1, "session_id": 1, "anchor_date": null},\n' + + ' {"topic": "second topic", "turn_id": 3, "session_id": 1, "anchor_date": null}'; + const { repo, store } = makeRepo(); + const chatFn = chatReturning(truncated); + const svc = new FirstMentionService(repo, chatFn); + + const memoryIds = new Map([ + [1, 'mem-1'], + [3, 'mem-3'], + ]); + + const result = await svc.extractAndStore('u', 'conv', 'beam', memoryIds); + expect(result.map((e) => e.topic)).toEqual(['first topic', 'second topic']); + expect(store).toHaveBeenCalledTimes(1); + }); + + it('garbage path: returns [] and skips repo.store when no array found', async () => { + const { repo, store } = makeRepo(); + const chatFn = chatReturning('I cannot help with that.'); + const svc = new FirstMentionService(repo, chatFn); + + const result = await svc.extractAndStore('u', 'conv', 'beam', new Map()); + expect(result).toEqual([]); + expect(store).not.toHaveBeenCalled(); + }); + + it('non-array JSON: returns [] without storing', async () => { + const { repo, store } = makeRepo(); + const chatFn = chatReturning('{"topic": "not an array"}'); + const svc = new FirstMentionService(repo, chatFn); + + const result = await svc.extractAndStore('u', 'conv', 'beam', new Map()); + expect(result).toEqual([]); + expect(store).not.toHaveBeenCalled(); + }); + + it('chatFn throws: returns [] without 
throwing', async () => { + const { repo, store } = makeRepo(); + const chatFn = vi.fn(async () => { + throw new Error('upstream timeout'); + }); + const svc = new FirstMentionService(repo, chatFn); + + const result = await svc.extractAndStore('u', 'conv', 'beam', new Map()); + expect(result).toEqual([]); + expect(store).not.toHaveBeenCalled(); + expect(chatFn).toHaveBeenCalledTimes(1); + }); +}); + +describe('FirstMentionService mapping and filtering', () => { + it('drops events whose turn_id has no memoryId mapping', async () => { + const json = JSON.stringify([ + { topic: 'mapped topic', turn_id: 2, session_id: 1, anchor_date: null }, + { topic: 'orphan topic', turn_id: 99, session_id: 1, anchor_date: null }, + ]); + const { repo, store } = makeRepo(); + const chatFn = chatReturning(json); + const svc = new FirstMentionService(repo, chatFn); + + const memoryIds = new Map([[2, 'mem-2']]); + const result = await svc.extractAndStore('u', 'conv', 'beam', memoryIds); + + expect(result).toHaveLength(1); + expect(result[0].turnId).toBe(2); + expect(store).toHaveBeenCalledWith('u', 'beam', result); + }); + + it('drops entries missing required fields (topic / turn_id)', async () => { + const json = JSON.stringify([ + { topic: 'good', turn_id: 1, session_id: 1, anchor_date: null }, + { turn_id: 2, session_id: 1 }, + { topic: 'no turn id', session_id: 1 }, + { topic: 42, turn_id: 3 }, + null, + 'string entry', + ]); + const { repo, store } = makeRepo(); + const chatFn = chatReturning(json); + const svc = new FirstMentionService(repo, chatFn); + + const memoryIds = new Map([ + [1, 'mem-1'], + [2, 'mem-2'], + [3, 'mem-3'], + ]); + const result = await svc.extractAndStore('u', 'conv', 'beam', memoryIds); + + expect(result).toHaveLength(1); + expect(result[0]).toMatchObject({ topic: 'good', turnId: 1 }); + expect(store).toHaveBeenCalledTimes(1); + }); + + it('parses anchor_date strings and tolerates invalid dates as null', async () => { + const json = JSON.stringify([ + { 
topic: 'a', turn_id: 1, session_id: 1, anchor_date: '2026-03-10T12:00:00Z' }, + { topic: 'b', turn_id: 2, session_id: 1, anchor_date: 'not-a-date' }, + { topic: 'c', turn_id: 3, session_id: 1, anchor_date: null }, + ]); + const { repo } = makeRepo(); + const chatFn = chatReturning(json); + const svc = new FirstMentionService(repo, chatFn); + + const memoryIds = new Map([ + [1, 'mem-1'], + [2, 'mem-2'], + [3, 'mem-3'], + ]); + const result: FirstMentionEvent[] = await svc.extractAndStore( + 'u', + 'conv', + 'beam', + memoryIds, + ); + + expect(result).toHaveLength(3); + expect(result[0].anchorDate).toBeInstanceOf(Date); + expect(result[1].anchorDate).toBeNull(); + expect(result[2].anchorDate).toBeNull(); + }); + + it('sorts results by turnId ascending and assigns 0-based positionInConversation', async () => { + const json = JSON.stringify([ + { topic: 'late', turn_id: 9, session_id: 1, anchor_date: null }, + { topic: 'early', turn_id: 2, session_id: 1, anchor_date: null }, + { topic: 'middle', turn_id: 5, session_id: 1, anchor_date: null }, + ]); + const { repo } = makeRepo(); + const chatFn = chatReturning(json); + const svc = new FirstMentionService(repo, chatFn); + + const memoryIds = new Map([ + [2, 'mem-2'], + [5, 'mem-5'], + [9, 'mem-9'], + ]); + const result = await svc.extractAndStore('u', 'conv', 'beam', memoryIds); + + expect(result.map((e) => e.topic)).toEqual(['early', 'middle', 'late']); + expect(result.map((e) => e.turnId)).toEqual([2, 5, 9]); + // positionInConversation is the post-sorted index, not turn_id — + // see the JSDoc on `mapToEvents` for the idempotency rationale + // (review #7). 
+ expect(result.map((e) => e.positionInConversation)).toEqual([0, 1, 2]); + }); + + it('produces stable positionInConversation across re-runs even when LLM turn_id drifts', async () => { + // Idempotency guarantee for review #7: when the same conversation + // is extracted twice with slightly different LLM-emitted turn_ids + // for the same logical topics, both runs must produce + // positionInConversation = 0, 1, ... in turn-order. The + // `(user_id, memory_id)` UNIQUE on first_mention_events would + // otherwise let stale-position rows survive and read paths would + // see different orderings for the same data depending on which + // run wrote first. + const memoryIds = new Map([ + [5, 'mem-A'], + [6, 'mem-A'], + [12, 'mem-B'], + ]); + + const { repo: repo1 } = makeRepo(); + const svc1 = new FirstMentionService( + repo1, + chatReturning(JSON.stringify([ + { topic: 'A', turn_id: 5, session_id: 1, anchor_date: null }, + { topic: 'B', turn_id: 12, session_id: 1, anchor_date: null }, + ])), + ); + const r1 = await svc1.extractAndStore('u', 'conv', 'beam', memoryIds); + + const { repo: repo2 } = makeRepo(); + const svc2 = new FirstMentionService( + repo2, + chatReturning(JSON.stringify([ + // Same logical topics, but the LLM drifted A from turn 5 -> 6. 
+ { topic: 'A', turn_id: 6, session_id: 1, anchor_date: null }, + { topic: 'B', turn_id: 12, session_id: 1, anchor_date: null }, + ])), + ); + const r2 = await svc2.extractAndStore('u', 'conv', 'beam', memoryIds); + + expect(r1.map((e) => e.positionInConversation)).toEqual([0, 1]); + expect(r2.map((e) => e.positionInConversation)).toEqual([0, 1]); + expect(r1.map((e) => e.topic)).toEqual(['A', 'B']); + expect(r2.map((e) => e.topic)).toEqual(['A', 'B']); + }); +}); diff --git a/src/services/__tests__/tll-retrieval.test.ts b/src/services/__tests__/tll-retrieval.test.ts new file mode 100644 index 0000000..f44a71f --- /dev/null +++ b/src/services/__tests__/tll-retrieval.test.ts @@ -0,0 +1,230 @@ +/** + * Unit tests for TLL retrieval signal helpers. + * + * Covers: + * - shouldUseTLL query classification (positive + negative cases) + * - entitiesForMemories SQL shape and empty-input shortcut + * - expandViaTLL composition (entities lookup -> chain expansion), + * empty-input shortcut, and the 10-id slice cap on initial inputs. + * + * No DB required: pg.Pool and TllRepository are mocked via vi.fn(). + */ + +import { describe, it, expect, vi } from 'vitest'; +import type pg from 'pg'; +import { + shouldUseTLL, + entitiesForMemories, + expandViaTLL, + TLL_ENTITY_LOOKUP_SEED_LIMIT, +} from '../tll-retrieval.js'; +import type { TllRepository } from '../../db/repository-tll.js'; + +interface QueryRow { + entity_id: string; +} + +interface QueryResult { + rows: QueryRow[]; +} + +/** Build a minimal pg.Pool stub whose query() returns the supplied rows. */ +function makePool(rows: QueryRow[]): { + pool: pg.Pool; + query: ReturnType; +} { + const query = vi.fn<(sql: string, params?: unknown[]) => Promise>() + .mockResolvedValue({ rows }); + const pool = { query } as unknown as pg.Pool; + return { pool, query }; +} + +/** Build a minimal TllRepository stub exposing only chainsFor(). 
*/ +function makeTllRepo(chainResult: string[]): { + repo: TllRepository; + chainsFor: ReturnType; +} { + const chainsFor = vi.fn< + (userId: string, entityIds: string[]) => Promise + >().mockResolvedValue(chainResult); + const repo = { chainsFor } as unknown as TllRepository; + return { repo, chainsFor }; +} + +describe('shouldUseTLL', () => { + // Canonical EO/MSR/TR question shapes — each fires either via two + // ordering terms or via a single structural sequence phrase. The + // pre-tightened regex over-fired on single-token ordering hits like + // "what is my first name" / "the model used before GPT-4"; the + // updated gate trades a few rare single-word matches for sharper + // precision on these canonical shapes. + const positiveQueries = [ + // SEQUENCE_PATTERNS hits + 'in what order did I bring up X', + 'in chronological order list the events', + 'when did the user move to Berlin', + 'since when has X been deprecated', + 'how preferences shifted over time', + 'show the evolution of the project', + 'what is the history of this codebase', + 'build me a timeline of changes', + 'when the topic was brought up', + 'what did the user originally say', + 'what did they initially mention', + 'show progression of editor choice', + 'how did the architecture evolve', + 'how have my preferences shifted', + // Two ordering terms (co-occurrence) + 'first this then that', + 'what aspects did I discuss before vs after the launch', + 'first the migration, then the rollback', + 'what came earlier and what came later', + ]; + + it.each(positiveQueries)('returns true for ordering query: %s', (q) => { + expect(shouldUseTLL(q)).toBe(true); + }); + + it('matches case-insensitively', () => { + expect(shouldUseTLL('What Is The HISTORY OF this?')).toBe(true); + expect(shouldUseTLL('TIMELINE OF events please')).toBe(true); + }); + + const negativeQueries = [ + // Non-temporal shapes (existing coverage) + 'what is X', + 'list all the entities', + 'explain why this is a tool', + 'who 
is the current owner', + 'tell me about the project', + 'summarize the discussion', + // False-positive shapes that the prior loose regex incorrectly + // matched (review #9). These must stay false under the new gate. + 'what is my first name', + 'the model used before GPT-4', + 'track my spending', + 'we then moved on to lunch', + 'what is the next step', + 'is this the previous version', + 'what came last in the queue', + ]; + + it.each(negativeQueries)('returns false for non-temporal query: %s', (q) => { + expect(shouldUseTLL(q)).toBe(false); + }); +}); + +describe('entitiesForMemories', () => { + it('returns [] without touching the pool when memoryIds is empty', async () => { + const { pool, query } = makePool([]); + const result = await entitiesForMemories(pool, []); + expect(result).toEqual([]); + expect(query).not.toHaveBeenCalled(); + }); + + it('issues a DISTINCT query against memory_entities and maps entity_ids', async () => { + const { pool, query } = makePool([ + { entity_id: 'e-1' }, + { entity_id: 'e-2' }, + ]); + + const result = await entitiesForMemories(pool, ['m-1', 'm-2']); + + expect(result).toEqual(['e-1', 'e-2']); + expect(query).toHaveBeenCalledTimes(1); + const [sql, params] = query.mock.calls[0]; + expect(sql).toMatch(/SELECT\s+DISTINCT\s+entity_id/i); + expect(sql).toMatch(/FROM\s+memory_entities/i); + expect(sql).toMatch(/memory_id\s*=\s*ANY\(\$1::uuid\[\]\)/i); + expect(params).toEqual([['m-1', 'm-2']]); + }); + + it('returns an empty array when no rows match', async () => { + const { pool } = makePool([]); + const result = await entitiesForMemories(pool, ['m-1']); + expect(result).toEqual([]); + }); +}); + +describe('expandViaTLL', () => { + const USER = 'u-1'; + + it('returns [] without any work when initialMemoryIds is empty', async () => { + const { pool, query } = makePool([]); + const { repo, chainsFor } = makeTllRepo([]); + + const result = await expandViaTLL(USER, [], repo, pool); + + expect(result).toEqual([]); + 
expect(query).not.toHaveBeenCalled(); + expect(chainsFor).not.toHaveBeenCalled(); + }); + + it('returns [] and skips chainsFor when no entities are found', async () => { + const { pool, query } = makePool([]); + const { repo, chainsFor } = makeTllRepo([]); + + const result = await expandViaTLL(USER, ['m-1'], repo, pool); + + expect(result).toEqual([]); + expect(query).toHaveBeenCalledTimes(1); + expect(chainsFor).not.toHaveBeenCalled(); + }); + + it('looks up entities first then expands via chainsFor', async () => { + const callOrder: string[] = []; + const { pool, query } = makePool([{ entity_id: 'e-1' }, { entity_id: 'e-2' }]); + query.mockImplementationOnce(async () => { + callOrder.push('entitiesForMemories'); + return { rows: [{ entity_id: 'e-1' }, { entity_id: 'e-2' }] }; + }); + const { repo, chainsFor } = makeTllRepo(['mem-a', 'mem-b']); + chainsFor.mockImplementationOnce(async () => { + callOrder.push('chainsFor'); + return ['mem-a', 'mem-b']; + }); + + const result = await expandViaTLL(USER, ['m-1', 'm-2'], repo, pool); + + expect(result).toEqual(['mem-a', 'mem-b']); + expect(callOrder).toEqual(['entitiesForMemories', 'chainsFor']); + expect(chainsFor).toHaveBeenCalledWith(USER, ['e-1', 'e-2']); + }); + + it('slices initialMemoryIds to TLL_ENTITY_LOOKUP_SEED_LIMIT before entity lookup', async () => { + const { pool, query } = makePool([{ entity_id: 'e-1' }]); + const { repo } = makeTllRepo(['mem-x']); + + const inputIds = Array.from( + { length: TLL_ENTITY_LOOKUP_SEED_LIMIT * 2 + 5 }, + (_, i) => `m-${i}`, + ); + await expandViaTLL(USER, inputIds, repo, pool); + + expect(query).toHaveBeenCalledTimes(1); + const params = query.mock.calls[0][1] as unknown[]; + const passedIds = params[0] as string[]; + expect(passedIds).toHaveLength(TLL_ENTITY_LOOKUP_SEED_LIMIT); + expect(passedIds).toEqual(inputIds.slice(0, TLL_ENTITY_LOOKUP_SEED_LIMIT)); + }); + + it('does not slice when initialMemoryIds length is <= TLL_ENTITY_LOOKUP_SEED_LIMIT', async () => { + const { 
pool, query } = makePool([{ entity_id: 'e-1' }]); + const { repo } = makeTllRepo(['mem-x']); + + const inputIds = ['m-1', 'm-2', 'm-3']; + await expandViaTLL(USER, inputIds, repo, pool); + + const params = query.mock.calls[0][1] as unknown[]; + expect(params[0]).toEqual(inputIds); + }); + + it('forwards the userId to chainsFor verbatim', async () => { + const { pool } = makePool([{ entity_id: 'e-7' }]); + const { repo, chainsFor } = makeTllRepo([]); + + await expandViaTLL('user-tenant-42', ['m-1'], repo, pool); + + expect(chainsFor).toHaveBeenCalledWith('user-tenant-42', ['e-7']); + }); +}); diff --git a/src/services/first-mention-service.ts b/src/services/first-mention-service.ts new file mode 100644 index 0000000..8cfbde6 --- /dev/null +++ b/src/services/first-mention-service.ts @@ -0,0 +1,244 @@ +/** + * First-mention extraction service — produces a chronological list of + * topic-introduction events from a conversation transcript. + * + * One LLM call scans the full transcript and outputs a JSON array of + * `{topic, turn_id, session_id, anchor_date}` records. The service + * maps those onto core's stricter `FirstMentionEvent` shape (joining + * turn_id to memory_id via a caller-supplied map) and persists via + * `FirstMentionRepository.store`. + * + * Best-effort: extraction failures are logged to stderr and produce an + * empty array. Storage errors are propagated (no silent swallow). + * + * Prompts and salvage parser were ported verbatim from the BEAM harness + * (`atomicmemory-benchmarks/data/exp-stage7-beam-dryrun/lib.ts`) so the + * core implementation matches the validated extraction behaviour. + */ + +import { + FirstMentionRepository, + type FirstMentionEvent, +} from '../db/repository-first-mentions.js'; + +/** + * Minimal chat-call shape for first-mention extraction. 
+ * + * Token usage is intentionally not threaded here: cost telemetry for the + * underlying LLM call is already emitted from `LLMProvider.chat` (see + * `src/services/llm.ts` -> `writeCostEvent`). Reading per-call usage at + * this layer would require widening the LLMProvider.chat return type to + * include usage and plumbing it through every adapter; until something in + * the FirstMentionService path actually consumes it, the extra surface + * area would only invite hardcoded zeros that mislead downstream readers. + */ +interface ChatResult { + text: string; +} + +type ChatFn = ( + system: string, + user: string, + maxTokens: number, +) => Promise; + +/** Loose shape produced by the LLM; mapped onto `FirstMentionEvent`. */ +interface RawFirstMention { + topic: string; + turn_id: number; + session_id?: number; + anchor_date?: string | null; +} + +const FIRST_MENTIONS_MAX_TOKENS = 8000; + +const FIRST_MENTIONS_SYSTEM = + 'You scan a conversation chronologically and identify FIRST-MENTION events: ' + + 'the moment when a NEW major topic is introduced for the first time. ' + + 'A "topic" is a specific aspect, feature, tech decision, problem, milestone, ' + + 'or planning item — NOT a sub-aspect of a previously-introduced topic and ' + + 'NOT a generic concept. Use SPECIFIC TECHNICAL PHRASES from the conversation ' + + 'verbatim (version numbers, library names, config flags, named operations). ' + + 'Order strictly by turn appearance. Output ONLY a JSON array.'; + +function buildFirstMentionsUser(turnsText: string): string { + return ( + `Conversation turns (in chronological order, with turn_id and session_id markers):\n\n` + + turnsText + + `\n\nWalk through the turns sequentially. For each turn that introduces ` + + `a NEW major topic (not discussed in any earlier turn), record one event:\n` + + ` {"topic": "", "turn_id": , ` + + `"session_id": , "anchor_date": ""}\n\n` + + `Output ONLY a valid JSON array (no markdown fences, no preamble). 
` + + `Aim for 15-30 first-mentions across the whole conversation, covering ` + + `distinct major aspects spanning all sessions. ` + + `Each topic phrase MUST be SHORT (5-12 words). Use verbatim tech tokens ` + + `(version numbers, library names) inside the short phrase. ` + + `Skip generic chatter, sub-aspects of already-listed topics, and assistant ` + + `explanatory content. Close the JSON array with ] before any other output.` + ); +} + +/** + * Salvage parser: if the model truncated mid-array (no closing ]), + * find the last complete object and synthesize a closing bracket. + */ +function salvageJsonArray(text: string): string | null { + const start = text.indexOf('['); + if (start < 0) return null; + const lastBrace = text.lastIndexOf('}'); + if (lastBrace < start) return null; + return text.slice(start, lastBrace + 1) + ']'; +} + +function isRawFirstMention(value: unknown): value is RawFirstMention { + if (typeof value !== 'object' || value === null) return false; + const v = value as Record; + return typeof v.topic === 'string' && typeof v.turn_id === 'number'; +} + +function parseAnchorDate(raw: unknown): Date | null { + if (typeof raw !== 'string' || raw.length === 0) return null; + const ms = Date.parse(raw); + if (Number.isNaN(ms)) return null; + return new Date(ms); +} + +/** Extract a JSON array from raw LLM text, salvaging truncated output. 
*/ +function extractJsonArray(text: string): unknown[] | null { + const trimmed = text.trim(); + const start = trimmed.indexOf('['); + if (start < 0) { + console.error( + `[first-mention-llm-failed] no opening [ in response (text len=${trimmed.length})`, + ); + return null; + } + const end = trimmed.lastIndexOf(']'); + let jsonSlice: string; + if (end <= start) { + const salvaged = salvageJsonArray(trimmed); + if (!salvaged) return null; + console.warn( + `[first-mention-llm-salvaged] response truncated; salvaged ${salvaged.length} chars`, + ); + jsonSlice = salvaged; + } else { + jsonSlice = trimmed.slice(start, end + 1); + } + const parsed: unknown = JSON.parse(jsonSlice); + if (!Array.isArray(parsed)) { + console.error( + `[first-mention-llm-failed] parsed JSON is not an array (type=${typeof parsed})`, + ); + return null; + } + return parsed; +} + +export class FirstMentionService { + constructor( + private repo: FirstMentionRepository, + private chatFn: ChatFn, + ) {} + + /** + * Extract first-mention events from a conversation transcript and + * persist them. Returns the parsed events (post-mapping). Best-effort: + * if the LLM call fails or returns unparseable output the method + * logs to stderr and returns `[]` without throwing. + * + * `memoryIdsByTurnId` provides the mapping the LLM cannot produce — + * any event whose `turn_id` is not present in the map is dropped. + */ + async extractAndStore( + userId: string, + conversationText: string, + sourceSite: string, + memoryIdsByTurnId: Map, + ): Promise { + const raw = await this.invokeLlm(conversationText); + if (raw === null) return []; + const events = this.mapToEvents(raw, memoryIdsByTurnId); + if (events.length > 0) { + await this.repo.store(userId, sourceSite, events); + } + return events; + } + + /** + * Run the extraction LLM call; return parsed array or null on failure. 
+ * + * Fail-open by design: a flaky upstream LLM call should not crash the + * caller's `extractAndStore` invocation; the EO read path treats a + * missing first-mention list as "no signal" rather than a hard + * failure. The deliberate fallback is a structured + * `[first-mention-llm-failed]` log line plus null return so the + * failure is observable. + */ + private async invokeLlm(conversationText: string): Promise { + let rawText = ''; + try { + const res = await this.chatFn( + FIRST_MENTIONS_SYSTEM, + buildFirstMentionsUser(conversationText), + FIRST_MENTIONS_MAX_TOKENS, + ); + rawText = res.text; + return extractJsonArray(rawText); + } catch (err) { + const msg = err instanceof Error ? err.message.slice(0, 200) : String(err); + console.error(`[first-mention-llm-failed] ${msg}`); + if (rawText) { + console.error( + `[first-mention-llm-failed] raw response (first 500 chars): ${rawText.slice(0, 500).replace(/\n/g, ' ')}`, + ); + } + return null; + } + } + + /** + * Map LLM output records onto core's `FirstMentionEvent` shape. + * + * `positionInConversation` is the 0-based index in the FINAL turn-id- + * sorted output, NOT `turn_id` itself. This makes the mapping stable + * across re-runs even when the LLM emits a slightly different + * `turn_id` for the same logical topic on a second pass — the + * `(user_id, memory_id)` UNIQUE constraint in `first_mention_events` + * skips the duplicate insert, but if `position_in_conversation` + * tracked `turn_id` directly, an unrelated drift in the second run's + * turn assignment would mean callers reading the row see whichever + * turn_id was first written. Encoding position by post-sort index + * eliminates that variability — read-side ordering is now a stable + * sequence (0, 1, 2, ...) regardless of which LLM run produced it. 
+ */ + private mapToEvents( + raw: unknown[], + memoryIdsByTurnId: Map, + ): FirstMentionEvent[] { + const filtered = raw.filter(isRawFirstMention); + const candidates: Array> = []; + for (const m of filtered) { + const memoryId = memoryIdsByTurnId.get(m.turn_id); + if (!memoryId) continue; + candidates.push({ + topic: m.topic, + turnId: m.turn_id, + memoryId, + anchorDate: parseAnchorDate(m.anchor_date), + }); + } + candidates.sort((a, b) => a.turnId - b.turnId); + const events: FirstMentionEvent[] = candidates.map((c, index) => ({ + ...c, + positionInConversation: index, + })); + if (events.length === 0 && filtered.length > 0) { + console.warn( + `[first-mention-mapping] ${filtered.length} parsed entries had no matching memory_id`, + ); + } + return events; + } +} diff --git a/src/services/memory-search.ts b/src/services/memory-search.ts index 6015777..661bfd7 100644 --- a/src/services/memory-search.ts +++ b/src/services/memory-search.ts @@ -23,6 +23,7 @@ import { resolveRelevanceGate, type RelevanceFilterDecision, } from './relevance-policy.js'; +import { shouldUseTLL, expandViaTLL, TLL_ENTITY_LOOKUP_SEED_LIMIT } from './tll-retrieval.js'; import type { AgentScope, WorkspaceContext } from '../db/repository-types.js'; import type { MemoryServiceDeps, RetrievalOptions, RetrievalResult } from './memory-service-types.js'; @@ -109,9 +110,106 @@ async function executeSearchStep( skipReranking: retrievalOptions?.skipReranking, runtimeConfig: deps.config, }); + // TLL augmentation runs AFTER the pipeline (which already applied its own + // relevance/similarity filtering). Chain-membership is a different + // retrieval signal than semantic similarity, so the augmented rows ride + // around the post-pipeline relevance gate (see appendChainAugmentation + // callsite in performSearch). Pipeline-filtered rows stay first. return { memories: pipelineResult.filtered, activeTrace: pipelineResult.trace }; } +/** + * Outcome of a TLL expansion attempt. 
`failed` flips to true only when + * an error is caught — empty `memories` with `failed: false` means the + * gate ran cleanly but produced no augmentation (no entities, all chain + * memories already in the candidate set, etc). + * + * Threaded back through `appendTllAugmentation` so the retrieval trace + * can record `tll_expansion_failed` instead of silently swallowing the + * exception. + */ +interface TllExpansionResult { + memories: SearchResult[]; + failed: boolean; + errorMessage?: string; +} + +/** + * TLL retrieval signal. For ordering/temporal/multi-session queries, expand + * the candidate set by traversing entity event chains. Returns hydrated + * SearchResult rows tagged with `retrieval_signal: 'tll-chain'` so the + * relevance gate can recognize them as non-similarity-scored augmentations. + * + * Fail-open by design: chain expansion errors never block primary + * retrieval. The deliberate fallback is to log the error with a + * structured `[tll-expansion-failed]` prefix and surface a + * `tll_expansion_failed` event on the retrieval trace so the failure is + * observable rather than lost. 
+ */ +async function maybeExpandViaTLL( + deps: MemoryServiceDeps, + userId: string, + query: string, + memories: SearchResult[], + effectiveLimit: number, +): Promise { + if (!deps.tllRepository || memories.length === 0 || !shouldUseTLL(query)) { + return { memories: [], failed: false }; + } + try { + const initialIds = memories.slice(0, TLL_ENTITY_LOOKUP_SEED_LIMIT).map((m) => m.id); + const chainIds = await expandViaTLL(userId, initialIds, deps.tllRepository, deps.stores.pool); + const knownIds = new Set(memories.map((m) => m.id)); + const newIds = chainIds.filter((id) => !knownIds.has(id)).slice(0, effectiveLimit); + if (newIds.length === 0) return { memories: [], failed: false }; + const hydrated = await hydrateChainMemories(deps, userId, newIds); + console.log(`[tll-retrieval] expanded ${newIds.length} chain memories for ordering query`); + return { memories: hydrated, failed: false }; + } catch (err) { + // Fail-open: TLL is augmentation, never block primary retrieval. + // Structured prefix `[tll-expansion-failed]` lets log scrapers + // pick this up as a distinct failure class. The retrieval-trace + // event is added by the caller (see `appendTllAugmentation`). + const errorMessage = err instanceof Error ? err.message : String(err); + console.error('[tll-expansion-failed]', errorMessage); + return { memories: [], failed: true, errorMessage }; + } +} + +/** + * Direct SQL hydration into SearchResult shape — bypasses store + * abstraction since this is a deterministic chain-traversal augmentation, + * not a similarity search. Rows are tagged with `retrieval_signal: + * 'tll-chain'` and carry `similarity: null` so the relevance gate can + * skip them rather than score them against semantic similarity. 
+ */ +async function hydrateChainMemories( + deps: MemoryServiceDeps, + userId: string, + newIds: string[], +): Promise { + const hydratedRes = await deps.stores.pool.query<{ id: string; content: string; created_at: Date; importance: number; namespace: string | null }>( + `SELECT id, content, created_at, importance, namespace + FROM memories + WHERE user_id = $1 AND id = ANY($2::uuid[]) + AND deleted_at IS NULL AND status = 'active'`, + [userId, newIds], + ); + return hydratedRes.rows.map((r) => ({ + id: r.id, + content: r.content, + similarity: null, + retrieval_signal: 'tll-chain', + created_at: r.created_at, + importance: Number(r.importance), + namespace: r.namespace, + tags: [], + keywords: [], + workspace_id: null, + agent_id: null, + } as unknown as SearchResult)); +} + /** Filter workspace-scoped, stale composites, and consensus-violating memories. */ async function postProcessResults( deps: MemoryServiceDeps, @@ -302,7 +400,47 @@ export async function performSearch( const filteredMemories = await postProcessResults( deps, rawMemories, activeTrace, userId, query, asOf, sourceSite, retrievalOptions, ); - return assembleResponse(deps, filteredMemories, query, userId, activeTrace, retrievalOptions, asOf, sourceSite, lessonCheck); + const augmented = await appendTllAugmentation(deps, userId, query, filteredMemories, effectiveLimit, activeTrace); + return assembleResponse(deps, augmented, query, userId, activeTrace, retrievalOptions, asOf, sourceSite, lessonCheck); +} + +/** + * Append TLL chain-membership augmentations after the relevance gate. The + * augmented rows ride around the similarity threshold because chain + * membership is a structurally different signal — they have no + * meaningful similarity score against the query. 
+ */ +async function appendTllAugmentation( + deps: MemoryServiceDeps, + userId: string, + query: string, + postProcessed: PostProcessedSearch, + effectiveLimit: number, + activeTrace: TraceCollector, +): Promise { + const result = await maybeExpandViaTLL( + deps, + userId, + query, + postProcessed.memories, + effectiveLimit, + ); + // Surface the fail-open path on the retrieval trace as a distinct + // event so the failure is observable in trace artifacts even when no + // augmentation rows are produced. Pairs with the structured + // `[tll-expansion-failed]` log line emitted by `maybeExpandViaTLL`. + if (result.failed) { + activeTrace.event('tll_expansion_failed', { errorMessage: result.errorMessage }); + } + if (result.memories.length === 0) return postProcessed; + activeTrace.stage('tll-augmentation', [...postProcessed.memories, ...result.memories], { + addedCount: result.memories.length, + addedIds: result.memories.map((m) => m.id), + }); + return { + ...postProcessed, + memories: [...postProcessed.memories, ...result.memories], + }; } /** diff --git a/src/services/memory-service-types.ts b/src/services/memory-service-types.ts index a070a63..0d916df 100644 --- a/src/services/memory-service-types.ts +++ b/src/services/memory-service-types.ts @@ -256,6 +256,10 @@ export interface MemoryServiceDeps { /** Domain-facing store interfaces (Phase 5). */ stores: import('../db/stores.js').CoreStores; observationService: import('./observation-service.js').ObservationService | null; + /** Phase 4 TLL — per-entity event chains for EO/MSR/TR retrieval. */ + tllRepository: import('../db/repository-tll.js').TllRepository | null; + /** First-mention events — chronological topic-introduction list. 
*/ + firstMentionService: import('./first-mention-service.js').FirstMentionService | null; uriResolver: import('./atomicmem-uri.js').URIResolver; } diff --git a/src/services/memory-service.ts b/src/services/memory-service.ts index 7ae92ca..e0e6ed3 100644 --- a/src/services/memory-service.ts +++ b/src/services/memory-service.ts @@ -10,6 +10,8 @@ import { ClaimRepository } from '../db/claim-repository.js'; import { EntityRepository } from '../db/repository-entities.js'; import { LessonRepository } from '../db/repository-lessons.js'; import { ObservationService } from './observation-service.js'; +import type { FirstMentionService } from './first-mention-service.js'; +import type { FirstMentionEvent } from '../db/repository-first-mentions.js'; import { URIResolver } from './atomicmem-uri.js'; import type { CoreStores } from '../db/stores.js'; import { type TierAssignment } from './tiered-loading.js'; @@ -40,6 +42,8 @@ export class MemoryService { observationService?: ObservationService, runtimeConfig?: MemoryServiceDeps['config'], stores?: CoreStores, + tllRepository?: import('../db/repository-tll.js').TllRepository, + firstMentionService?: FirstMentionService, ) { const resolvedEntities = entities ?? null; const resolvedLessons = lessons ?? null; @@ -57,6 +61,8 @@ export class MemoryService { pool: typeof repo.getPool === 'function' ? repo.getPool() : ({} as never), }, observationService: observationService ?? null, + tllRepository: tllRepository ?? null, + firstMentionService: firstMentionService ?? 
null, uriResolver: new URIResolver(repo, claims), }; } @@ -202,6 +208,54 @@ export class MemoryService { async reportLesson(userId: string, pattern: string, sourceMemoryIds: string[], severity?: 'low' | 'medium' | 'high' | 'critical') { return crud.reportLesson(this.deps, userId, pattern, sourceMemoryIds, severity); } async deactivateLesson(userId: string, lessonId: string) { return crud.deactivateLesson(this.deps, userId, lessonId); } + // --- First-mention events (chronological topic-introduction list) --- + + /** + * Extract first-mention events from a conversation transcript and persist + * them to `first_mention_events`. Caller supplies the turn-id-to-memory-id + * mapping (the ingest pipeline does not retain turn structure, so the + * caller knows the mapping). Returns the parsed events. Best-effort: if + * the underlying LLM call fails or the service is not wired, returns `[]` + * without throwing. + */ + async extractFirstMentions( + userId: string, + conversationText: string, + sourceSite: string, + memoryIdsByTurnId: Map, + ): Promise { + const svc = this.deps.firstMentionService; + if (!svc) return []; + return svc.extractAndStore(userId, conversationText, sourceSite, memoryIdsByTurnId); + } + + // --- Event chains (TLL read API) --- + + /** + * Retrieve per-entity chronological event chains from the Temporal Linkage + * List. Used by `GET /v1/memories/event-chains` and by EO-shaped read paths + * that need content alongside chain position. Returns one entry per entity + * with an ordered list of events (memoryId, content, observationDate, + * positionInChain). Entities without events are dropped from the result. + */ + async getEventChains( + userId: string, + entityIds: string[], + ): Promise; + }>> { + const tll = this.deps.tllRepository; + if (!tll) return []; + return tll.chainEventsForEntities(userId, entityIds); + } + // NOTE: Do NOT add multi-query search. 
Tested and caused 0-retrieval failures // due to embedding API rate limits with 4x calls per query. } diff --git a/src/services/memory-storage.ts b/src/services/memory-storage.ts index 6e37c4d..0858ab5 100644 --- a/src/services/memory-storage.ts +++ b/src/services/memory-storage.ts @@ -55,7 +55,7 @@ export async function storeCanonicalFact( if (!lineage?.memoryId) return { outcome: 'skipped', memoryId: null }; const memoryId = lineage.memoryId; if (deps.config.entityGraphEnabled && deps.stores.entity) { - await resolveAndLinkEntities(deps, userId, memoryId, fact.entities, fact.relations, embedding); + await resolveAndLinkEntities(deps, userId, memoryId, fact.entities, fact.relations, embedding, logicalTimestamp); if (!claimSlot) { const persistedSlot = await derivePersistedClaimSlot(deps, userId, memoryId); if (persistedSlot) { @@ -208,12 +208,76 @@ async function resolveAndLinkEntities( entities: ExtractedEntity[], relations: ExtractedRelation[], factEmbedding: number[], + logicalTimestamp?: Date, ): Promise { if (!deps.stores.entity || entities.length === 0) return; const entityEmbeddings = await embedTexts(entities.map((e) => e.name)); const nameToEntityId = await resolveEntities(deps, userId, memoryId, entities, entityEmbeddings); await storeRelations(deps, userId, memoryId, relations, nameToEntityId); + + await maybeAppendTll(deps, userId, memoryId, nameToEntityId, logicalTimestamp); +} + +/** + * Phase 4 — TLL append: per-entity event-chain extension. Each new memory + * referencing an entity becomes the new tip of that entity's chain. Used at + * retrieval-time for EO/MSR/TR queries. Best-effort, fire-and-forget to + * keep ingest hot path fast. + * + * The chain orders by observation_date ASC, so we use the memory's + * observed_at (the conversation timestamp) instead of new Date() (the + * ingest-arrival timestamp). Otherwise out-of-order or backfilled + * conversations would chain by ingest time, breaking event ordering. 
+ */ +async function maybeAppendTll( + deps: MemoryServiceDeps, + userId: string, + memoryId: string, + nameToEntityId: Map, + logicalTimestamp?: Date, +): Promise { + if (!deps.tllRepository) return; + const entityIds = [...nameToEntityId.values()]; + if (entityIds.length === 0) return; + + const observationDate = await resolveTllObservationDate(deps, userId, memoryId, logicalTimestamp); + // Fire-and-forget: TLL append is best-effort augmentation of the + // ingest hot path. A failure here should NOT block ingest, which is + // why we deliberately drop the promise. The `[tll-append-failed]` + // structured prefix is what observability tooling greps for; pair + // with `tll_expansion_failed` on the read path. + deps.tllRepository + .append(userId, memoryId, entityIds, observationDate) + .catch((err) => + console.error( + '[tll-append-failed]', + err instanceof Error ? err.message : String(err), + ), + ); +} + +/** + * Resolve the observation_date used for TLL chain ordering. Prefers the + * caller-supplied logical timestamp (the canonical conversation time); + * falls back to the memory row's observed_at column when the caller didn't + * supply one. Last-resort fallback to new Date() only fires if the row + * lookup fails — never silently drop the append. + */ +async function resolveTllObservationDate( + deps: MemoryServiceDeps, + userId: string, + memoryId: string, + logicalTimestamp: Date | undefined, +): Promise { + if (logicalTimestamp) return logicalTimestamp; + try { + const memory = await deps.stores.memory.getMemory(memoryId, userId); + if (memory?.observed_at) return memory.observed_at; + } catch (err) { + console.error('[tll] failed to resolve observed_at:', err instanceof Error ? err.message : err); + } + return new Date(); } /** Resolve each extracted entity and link it to the memory. 
*/ diff --git a/src/services/tll-retrieval.ts b/src/services/tll-retrieval.ts new file mode 100644 index 0000000..9a7a013 --- /dev/null +++ b/src/services/tll-retrieval.ts @@ -0,0 +1,121 @@ +/** + * Phase 4b — TLL retrieval signal. + * + * For event-ordering / temporal-reasoning / multi-session-reasoning queries, + * expand the candidate set by traversing entity TLL chains. This is the + * unique architectural primitive: nobody else ships per-entity event-chain + * traversal at retrieval time. + * + * The mechanism: + * 1. From the initial retrieval candidates, find which entities they link. + * 2. For each entity, fetch its full TLL chain (event sequence). + * 3. Merge chain memory_ids into the candidate pool. + * 4. Boost score for memories that appear in TLL chains for the query's + * entities — they're explicitly part of the chronological story. + * + * Skipped for non-ordering queries (factual lookups don't need chains). + */ + +import type pg from 'pg'; +import type { TllRepository } from '../db/repository-tll.js'; + +/** + * Cap on the number of top similarity-ranked candidates that seed TLL + * entity-lookup. Bounds the per-query fan-out for the + * `memory_entities` join — a higher value increases recall on + * sprawling queries but inflates the worst-case row scan when an + * entity dictionary is dense. 10 matches the production search seed + * count in `memory-search.ts` so both call sites move together. + */ +export const TLL_ENTITY_LOOKUP_SEED_LIMIT = 10; + +/** + * Single-token ordering signals. Matched in isolation these are too + * weak to gate TLL — "what is my FIRST name", "the model used BEFORE + * GPT-4", "we then moved on" all contain one of these but are not + * EO/MSR/TR queries. We require either two of them to co-occur, or + * one of the structural sequence patterns below, before firing. + */ +const ORDERING_TERMS_RE = + /\b(first|last|before|after|then|later|earlier|previous|next|prior)\b/gi; + +/** + * Structural sequence patterns. 
Each one is a phrase whose presence + * unambiguously indicates an ordering / temporal-reasoning question. + * Single-pattern hit is enough to gate TLL. + * + * Curated to keep precision high: "track my spending" and "what is my + * first name" must not match any pattern here. Add new patterns + * conservatively — a leak here will silently re-introduce the + * false-positive class this fix addresses. + */ +const SEQUENCE_PATTERNS: readonly RegExp[] = [ + /\bin (what |the )?(chronological |reverse )?order\b/i, + /\b(when|after) did\b/i, + /\bsince when\b/i, + /\bover time\b/i, + /\bevolution of\b/i, + /\b(history|timeline) of\b/i, + /\bbrought up\b/i, + /\b(originally|initially)\b/i, + /\bprogression of\b/i, + /\bhow .{1,80}(evolved?|shifted?|changed)\b/i, + /\bwhat .{1,80}(originally|initially)\b/i, +]; + +/** + * Returns true if the query has the shape of an event-ordering / temporal + * question and should trigger TLL chain expansion. The gate is + * intentionally conservative: TLL augmentation is augmentation, not the + * primary retrieval path, so over-firing was producing irrelevant chain + * memories on plain-fact queries that happened to contain "first", + * "before", "track", etc. + * + * Two ordering terms co-occurring (e.g. "what did I discuss BEFORE and + * AFTER X") is a strong-enough signal on its own; one structural + * sequence phrase (e.g. "in what order", "evolution of", "since when") + * is also strong enough. Single ordering term + nothing else is not. + */ +export function shouldUseTLL(query: string): boolean { + const orderingMatches = (query.match(ORDERING_TERMS_RE) ?? []).length; + if (orderingMatches >= 2) return true; + return SEQUENCE_PATTERNS.some((re) => re.test(query)); +} + +/** + * Get the entity_ids linked to a set of memory_ids via memory_entities. + * Used to find which entities to chain-traverse from initial retrieval. + * Exported for direct unit testing of SQL shape; the production caller + * is `expandViaTLL` below. 
+ */ +export async function entitiesForMemories( + pool: pg.Pool, + memoryIds: string[], +): Promise { + if (memoryIds.length === 0) return []; + const result = await pool.query( + `SELECT DISTINCT entity_id FROM memory_entities WHERE memory_id = ANY($1::uuid[])`, + [memoryIds], + ); + return result.rows.map((r) => r.entity_id); +} + +/** + * Expand candidate set via TLL chain traversal for the entities most + * relevant to the query (derived from initial retrieval). + * Returns: array of memory_ids in chronological chain order, deduplicated. + */ +export async function expandViaTLL( + userId: string, + initialMemoryIds: string[], + tllRepository: TllRepository, + pool: pg.Pool, +): Promise { + if (initialMemoryIds.length === 0) return []; + const entityIds = await entitiesForMemories( + pool, + initialMemoryIds.slice(0, TLL_ENTITY_LOOKUP_SEED_LIMIT), + ); + if (entityIds.length === 0) return []; + return tllRepository.chainsFor(userId, entityIds); +}