Conduit is a MongoDB Change Data Capture (CDC) control plane + data plane system built in Go.
⚠️ Development Status: This project is under active development. APIs, configuration formats, and architecture decisions may change without prior notice. Use with caution in production environments.
Conduit manages MongoDB collections and enables CDC (Change Data Capture) to external systems like HTTP endpoints, AWS EventBridge, or Meilisearch.
The project supports two operation modes:
- DynamoDB-compatible mode: the collection defines `primary_key` and an optional `sort_key`
- MongoDB-native mode: the collection defines no key schema and uses default MongoDB behavior (`_id`)
```
┌───────────────┐      ┌──────────────┐      ┌───────────────┐
│  API Server   │─────▶│   MongoDB    │─────▶│    Worker     │
│  (Control)    │      │  (Storage)   │      │    (Data)     │
│  Port: 8080   │      │ Port: 27017  │      │               │
└───────────────┘      └──────────────┘      └───────────────┘
                              │                      │
                              │ Change Streams       │ Redis State
                              ▼                      ▼
                       ┌──────────────┐       ┌─────────────┐
                       │   Watcher    │       │    Redis    │
                       │   Manager    │       │ Port: 6379  │
                       └──────────────┘       └─────────────┘
```
- Dual Compatibility: DynamoDB-compatible key schema or pure MongoDB mode
- DynamoDB-aligned Design: logical key semantics, stream records, TTL fields
- Per-collection Streaming: Enable/disable CDC per collection configuration
- Watcher Manager: Centralized lifecycle management with no goroutine leaks
- Resume Tokens: Per-table resume positions stored in Redis
- Idempotency: Duplicate event prevention with TTL-based keys
- Retry with Backoff: Exponential backoff (1s → 5m), max 5 retries
- Dead Letter Queue: Failed events after max retries
- Pluggable Destinations: HTTP endpoints, EventBridge, Meilisearch, custom sinks
| Component | Package | Description |
|---|---|---|
| API | `cmd/api` | REST control plane for collection management |
| Worker | `cmd/worker` | CDC data plane with watchers |
| Watcher Manager | `internal/watcher` | Centralized watcher lifecycle + Pub/Sub |
| Dispatcher | `internal/dispatch` | Event routing to destinations |
| Retry Processor | `internal/retry` | Backoff and DLQ handling |
| Redis Client | `internal/redis` | State store + Pub/Sub operations |
| Mongo Client | `internal/mongo` | Database + change streams + replica set |
| Collections Store | `internal/tables` | Configuration management (`config.collections`) |
```
cdc:resume:<tableName>            # Resume token per table
cdc:retry:<tableName>             # Retry queue (sorted set)
cdc:dlq:<tableName>               # Dead letter queue (list)
cdc:processed:<table>:<type>:<ts> # Idempotency key (TTL: 24h)
cdc:config-change                 # Pub/Sub channel for table changes
```

Pattern examples:

```
cdc:processed:users:*        # all events for the users table
cdc:processed:users:INSERT:* # only INSERT events for users
cdc:processed:*:REMOVE:*     # all REMOVE events across tables
```
Note: Redis is used for state management (resume tokens, idempotency, retry, pub/sub). Events are sent to HTTP endpoints configured per table.
- Go 1.25+
- MongoDB 4.0+ (replica set required for change streams)
- Redis 6.0+
- Docker Compose (optional, for local development)
```bash
git clone <repository>
cd conduit
make init
```

Option A: Full Stack (Recommended)
Starts MongoDB, Redis, API, and Worker with a single command:
```bash
make docker-up-full
```

Option B: Dependencies Only
Starts only MongoDB and Redis (run API/Worker locally):
```bash
make docker-up
```

Services:
| Service | URL | Description |
|---|---|---|
| API | http://localhost:8080 | REST control plane |
| Worker | - | CDC data plane (background) |
| MongoDB | mongodb://localhost:27017 | Database |
| Redis | redis://localhost:6379 | State store |
| Mongo Express | http://localhost:8081 | MongoDB UI (admin/admin) |
Note: The API automatically initializes the MongoDB replica set on startup (required for change streams).
The application will not start without these variables:
```bash
# Copy example and edit
cp .env.example .env
```

Required configuration:
```bash
# MongoDB (REQUIRED)
MONGODB_URI=mongodb://localhost:27017
MONGODB_DATABASE=conduit

# Redis - Use URI/DSN format (REQUIRED)
# Format: redis://[username[:password]@]host[:port][/db_number]
REDIS_URI=redis://localhost:6379

# Optional:
PORT=8080
```

Redis URI Examples:
```bash
# Simple (no auth)
REDIS_URI=redis://localhost:6379

# With password
REDIS_URI=redis://:mypassword@localhost:6379

# With username and password
REDIS_URI=redis://myuser:mypassword@localhost:6379

# With database number
REDIS_URI=redis://:password@localhost:6379/1

# TLS connection
REDIS_URI=rediss://:password@localhost:6380
```

If you started only the dependencies with `make docker-up`, run the API and Worker locally:
```bash
# Terminal 1: Start API server
make run-api

# Terminal 2: Start Worker
make run-worker
```

The API will automatically initialize the MongoDB replica set on first start.
```bash
curl -X POST http://localhost:8080/api/collections \
  -H "Content-Type: application/json" \
  -d '{
    "collection_name": "users",
    "stream_enabled": true,
    "old_image": true,
    "ttl_attribute": "expiresAt",
    "deletion_protection": true,
    "destinations": [
      {
        "type": "http",
        "endpoint": "http://localhost:3000/events",
        "event_types": ["INSERT", "MODIFY", "REMOVE"]
      }
    ]
  }'
```

| Method | Endpoint | Description |
|---|---|---|
| GET | `/api/collections` | List all collections |
| POST | `/api/collections` | Create collection |
| PUT | `/api/collections/:name` | Update collection |
| DELETE | `/api/collections/:name` | Delete collection |
| GET | `/health` | Health check |
```json
{
  "collection_name": "users",
  "primary_key": "id",
  "sort_key": "email",
  "stream_enabled": true,
  "old_image": true,
  "ttl_attribute": "expiresAt",
  "deletion_protection": true,
  "destinations": [
    {
      "type": "http",
      "endpoint": "http://localhost:3000/events",
      "event_types": ["INSERT", "MODIFY", "REMOVE"]
    }
  ]
}
```

Collection Configuration:
| Field | Type | Default | Description |
|---|---|---|---|
| `collection_name` | string | - | Name of the MongoDB collection |
| `primary_key` | string | - | Optional partition key field name |
| `sort_key` | string | - | Optional sort key field name |
| `stream_enabled` | bool | `false` | Enable CDC streaming for this collection |
| `old_image` | bool | `false` | Include old document state in change events |
| `ttl_attribute` | string | - | Field name for TTL expiration |
| `deletion_protection` | bool | `true` | Prevent accidental deletion |
| `destinations` | array | `[]` | List of event destinations |
Key schema rules:

- If `sort_key` is defined, `primary_key` is required
- `primary_key` and `sort_key` can use any user-defined field names
- If both are omitted, the collection runs in MongoDB-native mode
Destination Configuration:
| Field | Type | Required | Description |
|---|---|---|---|
| `type` | string | Yes | Destination type: `http`, `eventbridge`, `meilisearch` |
| `event_types` | []string | No | Events to send: `INSERT`, `MODIFY`, `REMOVE` (default: all) |
| `filter_criteria` | object | No | Per-destination filtering on `old_image` / `new_image` |
HTTP-specific fields:
| Field | Type | Required | Description |
|---|---|---|---|
| `endpoint` | string | Yes | HTTP endpoint URL |
| `bearer_token` | string | No | Optional bearer token for authentication |
EventBridge-specific fields:
| Field | Type | Required | Description |
|---|---|---|---|
| `region` | string | Yes | AWS region (e.g. `us-east-1`) |
| `event_bus_name` | string | Yes | EventBridge event bus name |
| `source` | string | No | Event source (default: `conduit-mongodb`) |
Meilisearch-specific fields:
| Field | Type | Required | Description |
|---|---|---|---|
| `endpoint` | string | Yes | Meilisearch host (e.g. `http://localhost:7700`) |
| `bearer_token` | string | No | Meilisearch API key |
| `index_name` | string | No | Target index (default: `collection_name`) |
HTTP Request:

- Method: `POST`
- Headers: `Content-Type: application/json`, `Authorization: Bearer <token>` (if configured)
- Body: `StreamRecord` JSON (contains `tableName`, `recordType`, `newImage`, `oldImage`, `timestamp`)
Create collection with streaming (HTTP destination):
```bash
curl -X POST http://localhost:8080/api/collections \
  -H "Content-Type: application/json" \
  -d '{
    "collection_name": "orders",
    "stream_enabled": true,
    "old_image": true,
    "ttl_attribute": "expiresAt",
    "deletion_protection": true,
    "destinations": [
      {
        "type": "http",
        "endpoint": "http://localhost:3000/events",
        "event_types": ["INSERT", "MODIFY", "REMOVE"]
      }
    ]
  }'
```

Create collection with bearer token authentication:
```bash
curl -X POST http://localhost:8080/api/collections \
  -H "Content-Type: application/json" \
  -d '{
    "collection_name": "orders",
    "stream_enabled": true,
    "old_image": true,
    "ttl_attribute": "expiresAt",
    "deletion_protection": true,
    "destinations": [
      {
        "type": "http",
        "endpoint": "http://localhost:3000/events",
        "bearer_token": "my-secret-token",
        "event_types": ["INSERT", "MODIFY", "REMOVE"]
      }
    ]
  }'
```

Create collection with multiple destinations:
```bash
curl -X POST http://localhost:8080/api/collections \
  -H "Content-Type: application/json" \
  -d '{
    "collection_name": "orders",
    "stream_enabled": true,
    "old_image": true,
    "ttl_attribute": "expiresAt",
    "deletion_protection": true,
    "destinations": [
      {
        "type": "http",
        "endpoint": "http://localhost:3000/events",
        "event_types": ["INSERT", "REMOVE"]
      },
      {
        "type": "http",
        "endpoint": "http://localhost:3001/audit",
        "event_types": ["MODIFY"]
      }
    ]
  }'
```

Update collection:
```bash
curl -X PUT http://localhost:8080/api/collections/orders \
  -H "Content-Type: application/json" \
  -d '{
    "collection_name": "orders",
    "stream_enabled": true,
    "old_image": false,
    "ttl_attribute": "expiresAt",
    "deletion_protection": true,
    "destinations": [
      {
        "type": "http",
        "endpoint": "http://localhost:3000/events",
        "event_types": ["INSERT", "REMOVE"]
      }
    ]
  }'
```

Disable deletion protection and delete collection:
```bash
# First, disable deletion protection
curl -X PUT http://localhost:8080/api/collections/orders \
  -H "Content-Type: application/json" \
  -d '{
    "collection_name": "orders",
    "stream_enabled": true,
    "deletion_protection": false,
    "destinations": [
      {
        "type": "http",
        "endpoint": "http://localhost:3000/events",
        "event_types": ["INSERT", "MODIFY", "REMOVE"]
      }
    ]
  }'

# Then delete the collection
curl -X DELETE "http://localhost:8080/api/collections/orders"
```

Delete collection with protection enabled (fails):

```bash
curl -X DELETE "http://localhost:8080/api/collections/orders"
# Returns: 403 Forbidden - "Deletion protection is enabled. Disable it before deleting the collection."
```

List collections:
```bash
curl http://localhost:8080/api/collections | jq .
```

| Variable | Required | Default | Description |
|---|---|---|---|
| `MONGODB_URI` | Yes | - | MongoDB connection string |
| `MONGODB_DATABASE` | Yes | - | Database name |
| `REDIS_URI` | Yes | - | Redis URI/DSN for state management |
| `PORT` | No | `8080` | API server port |
Note: HTTP endpoints are configured per-table via the API, not via environment variables.
```
redis://[username[:password]@]host[:port][/db_number]
```
| Example | Description |
|---|---|
| `redis://localhost:6379` | No authentication |
| `redis://:password@localhost:6379` | Password only |
| `redis://user:pass@localhost:6379` | Username + password |
| `redis://:pass@localhost:6379/1` | With database number |
| `rediss://:pass@localhost:6380` | TLS connection |
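For illustration, the URI format above can be decomposed with only the Go standard library. The real worker presumably delegates parsing to its Redis client library; `parseRedisURI` here is a hypothetical helper, not the project's API:

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parseRedisURI splits a redis:// or rediss:// URI into its parts, mirroring
// the redis://[username[:password]@]host[:port][/db_number] format above.
func parseRedisURI(raw string) (host, user, pass, db string, tls bool, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return
	}
	if u.Scheme != "redis" && u.Scheme != "rediss" {
		err = fmt.Errorf("invalid redis URL scheme: %s", u.Scheme)
		return
	}
	tls = u.Scheme == "rediss" // rediss:// means TLS
	host = u.Host
	if u.User != nil {
		user = u.User.Username()
		pass, _ = u.User.Password()
	}
	db = strings.TrimPrefix(u.Path, "/") // e.g. "1" for .../1
	return
}

func main() {
	host, _, pass, db, tls, err := parseRedisURI("redis://:secret@localhost:6379/1")
	fmt.Println(host, pass, db, tls, err) // localhost:6379 secret 1 false <nil>
}
```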
If a required variable is not set, the application will exit with a fatal error:
```
FATAL: Required environment variable MONGODB_URI is not set
```
Conduit uses DynamoDB terminology internally, but physical key field names are user-defined.
Modes:
- DynamoDB-compatible mode: configure `primary_key` and an optional `sort_key`
- MongoDB-native mode: no key schema; default MongoDB `_id` workflows apply
Default DynamoDB-aligned names (optional):
| Concept | Field Name |
|---|---|
| Primary Key | pk |
| Sort Key | sk |
| Table | table |
| Item | item |
| Stream | stream |
| TTL Field | expiresAt |
| New Image | newImage |
| Old Image | oldImage |
DynamoDB-compatible mode:
```json
{
  "collection_name": "users",
  "primary_key": "id",
  "sort_key": "email"
}
```

Item example:
```json
{
  "id": "USER#1",
  "email": "EMAIL#test@gmail.com"
}
```

MongoDB-native mode:
```json
{
  "collection_name": "logs"
}
```

Streaming is explicitly enabled per collection:
| `stream_enabled` | Worker Behavior |
|---|---|
| `false` | ❌ No watcher created, collection ignored |
| `true` | ✅ Watcher created, events processed |
- Initial Load: Fetch all `stream_enabled=true` collections from `config.collections`
- Start Watchers: One watcher per enabled collection
- Sync Loop:
  - Polling: Diff with `config.collections` every 15 min (fallback)
  - Push notifications: Immediate sync via Redis Pub/Sub when tables change
- Resume Tokens: Updated after each successful batch
- Graceful Stop: Context cancellation, no data loss
| Attempt | Delay | Action |
|---|---|---|
| 1 | 1s | First retry |
| 2 | 2s | Exponential backoff |
| 3 | 4s | Exponential backoff |
| 4 | 8s | Exponential backoff |
| 5 | 16s | Final retry |
| 6+ | - | → DLQ |
Max Retries: 5
Backoff: Exponential (capped at 5 minutes)
After 5 failures: Event sent to Dead Letter Queue
```bash
# Unit tests
make test

# Integration tests (requires MongoDB + Redis)
make test-integration

# With coverage
make test-coverage

# Watch mode (requires entr)
make test-watch
```

All critical paths covered:
- ✅ CDC event processing
- ✅ Retry with exponential backoff
- ✅ Idempotency checks
- ✅ Watcher lifecycle (start/stop)
- ✅ Resume token management
- ✅ DLQ handling
Development:
```bash
make help       # Show all commands
make init       # Initialize project
make build      # Build all packages
make run-api    # Run API server (local)
make run-worker # Run Worker (local)
make fmt        # Format code
make lint       # Run linter
make clean      # Clean build artifacts
```

Docker:
```bash
make docker-up      # Start MongoDB + Redis only
make docker-up-full # Start full stack (API + Worker + MongoDB + Redis)
make docker-build   # Build Docker images
make docker-down    # Stop all containers
make docker-logs    # View logs in real-time
make docker-status  # Show container status
make docker-clean   # Clean containers and volumes
```

Testing:
```bash
make test             # Run unit tests
make test-integration # Run integration tests
make test-coverage    # Run tests with coverage report
```

```
conduit/
├── cmd/
│   ├── api/              # Control plane API (Gin)
│   └── worker/           # Data plane CDC worker
├── internal/
│   ├── dispatch/         # Event dispatcher + destinations
│   ├── mongo/            # MongoDB client wrapper
│   ├── redis/            # Redis client wrapper
│   ├── retry/            # Retry processor with backoff
│   ├── streams/          # Change stream watcher
│   ├── tables/           # Collection configuration store
│   └── watcher/          # Watcher manager + lifecycle
├── examples/
│   ├── create_table.sh   # Example: Create collection
│   └── monitor_queues.sh # Example: Monitor queues
├── compose.yaml          # Docker Compose config
├── Makefile
├── .env.example          # Environment variables template
├── go.mod
├── AGENTS.md             # Development guidelines
└── README.md
```
- Active watchers count
- Retry queue length per collection
- DLQ length per collection
- Events processed per second
- Last error time per watcher
```bash
# Using the example script
./examples/monitor_queues.sh

# Or manually with the Redis CLI (for retry and DLQ queues)
redis-cli KEYS "cdc:retry:*"
redis-cli KEYS "cdc:dlq:*"
```

Note: Events are sent to HTTP endpoints configured per collection. Monitor your HTTP service logs for incoming events.
| Guarantee | Implementation |
|---|---|
| No event loss | Retry queue + DLQ |
| No duplicates | Idempotency keys (24h TTL) |
| No goroutine leaks | Proper watcher lifecycle |
| Per-collection resume | Individual tokens in Redis |
| Graceful shutdown | Context cancellation + timeout |
```
FATAL: Required environment variable MONGODB_URI is not set
```
Solution: Set all required variables before running:
```bash
export MONGODB_URI=mongodb://localhost:27017
export MONGODB_DATABASE=conduit
export REDIS_URI=redis://localhost:6379
```

New collections should be detected within ~1 second via Redis Pub/Sub. If not:
- Check Redis is running: `redis-cli ping` should return `PONG`
- Check worker logs for the Pub/Sub subscription: `docker logs conduit-mongodb-worker-1 | grep "config-change"`
- Verify the API is publishing: `docker logs conduit-mongodb-api-1 | grep "config change"`
- Fallback: Sync runs every 30s if Pub/Sub fails
```
FATAL: Failed to create worker: parse redis URI: invalid redis URL scheme:
```

Solution: Ensure the URI starts with `redis://` or `rediss://`:

```bash
# Correct:
REDIS_URI=redis://localhost:6379

# Wrong:
REDIS_URI=localhost:6379
```

The API automatically initializes the replica set on startup. If you're running MongoDB locally:
```bash
# Check replica set status
mongosh --eval "rs.status()"
```

If using Docker, the replica set is initialized automatically by the API on first start.
```bash
# Check Redis is running
redis-cli -h localhost -p 6379 ping
# Should return: PONG
```

- Verify `stream_enabled: true` in the collection config
- Check Redis connection
- Review worker logs for errors
- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Commit changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
GNU General Public License v3.0 - see LICENSE for details.