Commit a52ee59

Refactor: Implement reactive in-memory caching for interactive DLQ resubmit (#7)
* Initial reactive in-memory cache implementation
* Fixed DLQ handling console issues
* Added progress indication and cancellation support
* Reactive cache article
1 parent dd12da3 commit a52ee59

18 files changed

Lines changed: 2008 additions & 53 deletions

docs/articles/reactive-cache.md

Lines changed: 261 additions & 0 deletions
@@ -0,0 +1,261 @@
# Reactive In-Memory Caching for Interactive DLQ Resubmission

## The Problem

The `resubmit-dlq --interactive` command lets operators inspect dead-letter queue (DLQ) messages grouped by category (subject + dead-letter reason) and selectively resubmit them. The original implementation had a sequential, blocking flow that created serious usability problems at scale:

```
1. Peek ALL messages from DLQ        ← blocks until complete
2. Categorize them in memory         ← user sees nothing during this
3. Display category table            ← only now does the user see anything
4. User selects categories
5. Re-fetch ALL messages from DLQ    ← double the work
6. Filter and resubmit matching ones
```

### What went wrong

**Blind waiting.** A DLQ with 50,000 messages takes minutes to peek through in batches of 100. The user stares at a blank terminal the entire time, with no indication of progress or even whether the tool is working.

**Unbounded scanning.** In high-traffic systems, new messages dead-letter continuously. The "peek all" loop may never terminate because new messages keep appearing beyond the last sequence number.

**Double fetch.** After the user picks categories, the tool re-fetches every message from scratch to find the ones matching the selection. This doubles the total Service Bus API calls and wall-clock time.

**No cycle protection.** If a resubmitted message immediately dead-letters again (common with poison messages), nothing prevents the user from resubmitting it over and over.

## The Solution: Reactive In-Memory Cache

The refactored implementation replaces the sequential pipeline with a reactive, streaming architecture built on [DynamicData](https://github.com/reactivemarbles/DynamicData), a library that adds reactive collection capabilities to .NET's `System.Reactive`.

The new flow:

```
1. Start background peek → cache messages as they arrive
2. Every second, rebuild category snapshot from cache
3. Render live-updating table (Console.Clear + redraw)
4. User can press 'x' to stop scanning at any point
5. Take point-in-time snapshot from cache
6. Resubmit only the snapshot (no re-fetch)
```

The key insight: **decouple data ingestion from user interaction**. Messages stream into a cache on a background thread. The UI subscribes to the cache's change notifications and redraws periodically. The user's selection operates on a snapshot of whatever has been cached so far, not on a separate fetch.

## How DynamicData Works (and Why We Need It)

[DynamicData](https://github.com/reactivemarbles/DynamicData) extends Rx.NET with *reactive collections*. Its core primitive is `SourceCache<TObject, TKey>`, a thread-safe, observable dictionary. When items are added, updated, or removed, the cache emits an `IChangeSet<TObject, TKey>` through its `Connect()` observable.

This is fundamentally different from a plain `Dictionary` or `ConcurrentDictionary`:

| Feature | `ConcurrentDictionary` | DynamicData `SourceCache` |
|---|---|---|
| Thread-safe mutations | Yes | Yes |
| Notify on changes | No | Yes, via `IObservable<IChangeSet>` |
| Composable queries | No | Yes (filter, group, transform, sort) |
| Batch mutations | No | Yes, via `Edit()` |
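
To make the last two rows concrete, here is a minimal sketch of change notification and batched edits. It is not taken from the tool: the `Item` record and `SourceCacheSketch` class are hypothetical names introduced for illustration, and only the `DynamicData` package is assumed.

```csharp
using System;
using System.Collections.Generic;
using DynamicData;

// Hypothetical item type for illustration; the tool caches ServiceBusReceivedMessage instead.
public sealed record Item(long SequenceNumber, string Subject);

public static class SourceCacheSketch
{
    // Returns the size of every changeset observed, plus the final item count.
    public static (IReadOnlyList<int> ChangeSetSizes, int Count) Run()
    {
        using var cache = new SourceCache<Item, long>(item => item.SequenceNumber);
        var sizes = new List<int>();

        // Connect() publishes an IChangeSet for every committed mutation.
        using var subscription = cache.Connect().Subscribe(changes => sizes.Add(changes.Count));

        // Edit() batches several mutations into a single changeset.
        cache.Edit(updater =>
        {
            updater.AddOrUpdate(new Item(1, "orders"));
            updater.AddOrUpdate(new Item(2, "orders"));
            updater.AddOrUpdate(new Item(3, "billing"));
        });

        // Same key as an existing item: recorded as an update, not a new entry.
        cache.AddOrUpdate(new Item(3, "billing-retry"));

        return (sizes, cache.Count);
    }
}
```

Calling `SourceCacheSketch.Run()` should report two changesets (three changes for the batched edit, then one for the update) and a final count of three items, which is the behavior the feed relies on when it adds a whole peek batch at once.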

For our use case, the critical capability is `Connect()`: it returns an `IObservable<IChangeSet<TObject, TKey>>` that fires whenever the cache contents change. We pipe this through Rx operators to build a derived stream:

```csharp
var categoryStream = cache.Connect()                       // fires on every mutation
    .Sample(TimeSpan.FromSeconds(1))                       // emit at most once per second
    .Select(_ => BuildCategorySnapshot(cache))             // rebuild categories from current state
    .StartWith(new DlqCategorySnapshot([], 0, false));     // initial empty snapshot
```
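
The article does not show `BuildCategorySnapshot` itself. As a rough idea of what "rebuild categories from current state" involves, the sketch below groups messages by subject and dead-letter reason with LINQ. The `DeadLetter` and `Category` records and the `CategorySketch` class are hypothetical stand-ins, not the tool's actual types.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the cached message and the category row.
public sealed record DeadLetter(long SequenceNumber, string Subject, string DeadLetterReason);
public sealed record Category(string Subject, string DeadLetterReason, int Count);

public static class CategorySketch
{
    // Groups messages by (subject, dead-letter reason), largest category first.
    public static IReadOnlyList<Category> Build(IEnumerable<DeadLetter> messages) =>
        messages
            .GroupBy(m => (m.Subject, m.DeadLetterReason))
            .Select(g => new Category(g.Key.Subject, g.Key.DeadLetterReason, g.Count()))
            .OrderByDescending(c => c.Count)
            .ToList();
}
```

Because the grouping runs over the whole cache on each tick, the cost per refresh grows with the cache size; sampling once per second keeps that cost bounded regardless of how fast batches arrive.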

### Why `.Sample()` and not `.Throttle()`

This distinction caused an actual bug during development. In Rx.NET:

- **`Throttle(1s)`** = *debounce*. Only emits after 1 second of **silence** (no upstream events). When batches arrive every ~100ms back-to-back, the throttle never fires until the entire feed completes.
- **`Sample(1s)`** = *periodic sampling*. Emits the most recent upstream value every 1 second, regardless of how fast events arrive.

With `Throttle`, the UI would freeze during scanning and only update once the whole scan had finished (12,000 messages in the case that exposed the bug). `Sample` gives steady 1-second UI refreshes.
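
The difference can be reproduced deterministically with virtual time. The sketch below is an illustration rather than code from the tool: it assumes the `System.Reactive` and `Microsoft.Reactive.Testing` packages, and the 100 ms feed is a made-up stand-in for the peek loop.

```csharp
using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class SampleVsThrottleSketch
{
    // Returns how many values each operator has emitted after 4.95 s of virtual time.
    public static (int Throttled, int Sampled) Run()
    {
        var scheduler = new TestScheduler();

        // Hypothetical feed: one batch notification every 100 ms for 5 s.
        var batches = Observable.Interval(TimeSpan.FromMilliseconds(100), scheduler).Take(50);

        var throttled = 0;
        var sampled = 0;
        using var s1 = batches.Throttle(TimeSpan.FromSeconds(1), scheduler).Subscribe(_ => throttled++);
        using var s2 = batches.Sample(TimeSpan.FromSeconds(1), scheduler).Subscribe(_ => sampled++);

        // Stop just before the feed completes at 5 s.
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(4950).Ticks);

        return (throttled, sampled);
    }
}
```

Under these assumptions `Run()` should return zero throttled emissions and four sampled ones: the debounce timer is reset by every batch, while the sampler fires at each one-second tick.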

## Architecture

### Layer Separation

The implementation follows the project's vertical slice architecture with clear layer boundaries:

```
Application Layer (business logic)
├── Common/ServiceBus/Reactive/
│   ├── ReactiveMessageCache<TMessage, TKey>   ← generic, reusable
│   └── ResubmitTracker                        ← cycle prevention
│
├── DeadLetters/Common/
│   └── MessageResubmitHelper                  ← extracted shared logic
│
└── DeadLetters/ResubmitDlq/
    ├── DlqResubmitSession                     ← session object (cache + stream + tracker)
    ├── StreamDlqCategories                    ← command: start scanning, return session
    └── ResubmitFromCache                      ← command: resubmit from snapshot

CLI Layer (presentation)
└── DeadLetters/ResubmitDlq/
    └── ResubmitDlqCommandHandler              ← two-phase interactive flow
```

### The Session Object

`DlqResubmitSession` is the central coordination point. It owns:

- **`Cache`**: a `ReactiveMessageCache<ServiceBusReceivedMessage, long>` keyed by `SequenceNumber`. `SequenceNumber` is unique within a queue/subscription (unlike `MessageId`, which can be duplicated).
- **`CategoryStream`**: an `IObservable<DlqCategorySnapshot>` that emits category breakdowns every second.
- **`ResubmitTracker`**: tracks which `MessageId` values have been resubmitted, preventing cycles.
- **`ScanCompletion`**: a `TaskCompletionSource` that signals when the background feed finishes.
- **`TotalDlqCount`**: the total DLQ message count from runtime properties (best-effort, for progress display).
- **`StopScanning()`**: cancels the background feed via a `CancellationTokenSource`.

The session is `IDisposable`; disposing it cleans up the cache and the `CancellationTokenSource`.

### Background Feed

`StreamDlqCategoriesCommandHandler.FeedCacheAsync` runs on a background thread:

1. **Fetch total count** (best-effort): queries `ServiceBusAdministrationClient` for the DLQ message count. This enables the "Peeked 200 from 12000" progress display. If the admin call fails (permissions, etc.), scanning continues without the total.

2. **Peek in batches**: uses `PeekMessagesAsync` (non-destructive) with sequence-number pagination. Each batch is filtered through the `ResubmitTracker` to exclude already-resubmitted messages, then added to the cache.

3. **Signal completion**: in the `finally` block, marks the cache complete and sets `ScanCompletion`. This fires regardless of success, cancellation, or error.
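
Steps 2 and 3 can be sketched without a Service Bus connection by abstracting the peek call. Everything below is illustrative: `PeekedMessage`, `FeedSketch`, and the delegate parameters are hypothetical, with the `peek` delegate standing in for `ServiceBusReceiver.PeekMessagesAsync`; the actual `FeedCacheAsync` may be structured differently.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a peeked message; the real code uses ServiceBusReceivedMessage.
public sealed record PeekedMessage(long SequenceNumber, string MessageId);

public static class FeedSketch
{
    // peek(fromSequenceNumber, maxMessages, token) stands in for PeekMessagesAsync.
    public static async Task<int> FeedAsync(
        Func<long, int, CancellationToken, Task<IReadOnlyList<PeekedMessage>>> peek,
        Func<string, bool> wasResubmitted,
        Action<IReadOnlyList<PeekedMessage>> addToCache,
        Action markComplete,
        CancellationToken token)
    {
        var peeked = 0;
        long nextSequence = 0;
        try
        {
            while (!token.IsCancellationRequested)
            {
                var batch = await peek(nextSequence, 100, token);
                if (batch.Count == 0) break;              // reached the end of the DLQ

                peeked += batch.Count;
                nextSequence = batch[^1].SequenceNumber + 1; // paginate past the last message seen

                // Drop messages that were already resubmitted in this session.
                var fresh = batch.Where(m => !wasResubmitted(m.MessageId)).ToList();
                if (fresh.Count > 0) addToCache(fresh);
            }
        }
        catch (OperationCanceledException)
        {
            // A stop request is treated as a normal end of scanning in this sketch.
        }
        finally
        {
            markComplete();                               // runs on success, cancellation, or error
        }
        return peeked;
    }
}
```

Keeping the completion signal in `finally` is what lets the CLI await the scan unconditionally after it requests a stop.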

The feed respects a *linked* `CancellationTokenSource` combining the global cancellation token (Ctrl+C) with the session's scan token (press 'x'):

```csharp
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
    cancellationToken, session.ScanCancellationToken);
await FeedCacheAsync(..., linkedCts.Token);
```

### Cache-Based Resubmit

`ResubmitFromCacheCommandHandler` takes a pre-built message snapshot and resubmits by matching sequence numbers:

1. Build a `HashSet<long>` of target sequence numbers from the snapshot.
2. Call `ReceiveMessagesAsync` (destructive read) in batches against the DLQ.
3. For each received message:
   - **Match** (sequence number in set): create a new `ServiceBusMessage` preserving all properties, send it to the target, complete the original, and mark it as resubmitted in the tracker.
   - **No match**: abandon it, which releases the lock so the message stays in the DLQ for other consumers.
4. Exit when all target sequence numbers are processed or after 3 consecutive empty batches.

This eliminates the double-fetch problem. The snapshot was built during the peek phase; the resubmit phase only does the destructive receive-and-forward.

## The Two-Phase CLI Flow

The CLI handler splits the interactive experience into two distinct phases:

### Phase 1: Scanning (live updates, no user input)

```csharp
using (session.CategoryStream.Subscribe(snapshot =>
{
    Console.Clear();
    RenderScanningView(snapshot, entityDescription, session.TotalDlqCount);
}))
{
    var scanTask = session.ScanCompletion.Task;
    var keyTask = Task.Run(() => WaitForStopKey(session.ScanCancellationToken));

    await Task.WhenAny(scanTask, keyTask);
    session.StopScanning();
    await scanTask;
}
```

During this phase:

- The reactive subscription redraws the screen every second with the latest category table.
- A separate thread polls `Console.KeyAvailable` for the 'x' key.
- No `ReadLine()` is called, so `Console.Clear()` is safe (no input buffer to disrupt).
- The phase ends when either the feed completes naturally or the user presses 'x'.
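
`WaitForStopKey` is not shown in the article. One plausible shape for it, under the assumption that it polls and returns once the key is pressed or the scan token is cancelled, is sketched below; the `StopKeySketch` class name, the `bool` return value, and the 50 ms polling interval are all assumptions.

```csharp
using System;
using System.Threading;

public static class StopKeySketch
{
    // Returns true if the user pressed 'x', false if the token was cancelled first.
    // Assumes an interactive console: Console.KeyAvailable throws when input is redirected.
    public static bool WaitForStopKey(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            if (Console.KeyAvailable)
            {
                // intercept: true keeps the key from being echoed into the redrawn screen.
                var key = Console.ReadKey(intercept: true);
                if (key.Key == ConsoleKey.X) return true;
            }
            else
            {
                Thread.Sleep(50); // poll instead of blocking so cancellation is noticed
            }
        }
        return false;
    }
}
```

Polling rather than blocking on `Console.ReadKey()` matters here: when the feed finishes on its own, `StopScanning()` cancels the token and the key-watching task can exit instead of waiting for a key press that never comes.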

The display shows:

- When no categories yet: `Scanning DLQ for queue 'orders'... Peeked 200 from 12000`
- When categories exist: the category table + `Scanning... Peeked 3400 from 12000`
- Always: `Press 'x' to stop scanning and select categories`

### Phase 2: Selection (static display, user input)

After Phase 1, the subscription is disposed (no more screen redraws). The handler:

1. Takes a final snapshot from the cache via `BuildCategorySnapshot()`.
2. Clears the screen and renders the final category table.
3. Prompts for selection. This `ReadLine()` is now safe because nothing will redraw the screen.
4. Parses the selection, takes a message snapshot for the chosen categories, and resubmits.

This separation solves the original UX bugs:

- No "empty state" confusion: the scanning indicator appears immediately.
- No repeated tables: `Console.Clear()` before each render gives in-place updates.
- No lost prompt: the prompt only appears after scanning stops, and nothing can push it off-screen.

## Cycle Prevention

The `ResubmitTracker` wraps a `HashSet<string>` of `MessageId` values behind a lock, so it can be used safely from the feed and the resubmit path at the same time. It prevents infinite resubmit loops:

1. **During cache feeding**: messages whose `MessageId` is in the tracker are filtered out before entering the cache. If a message dead-letters again after resubmission, it gets a new `SequenceNumber` but keeps its `MessageId`, so the tracker catches it.

2. **During resubmission**: after successfully sending a message to the target, its `MessageId` is added to the tracker.

3. **During snapshot filtering**: `SnapshotForCategories()` excludes tracked messages, so even if a resubmitted message somehow re-enters the cache, it won't appear in the selection.

This is why the tracker is keyed by `MessageId` rather than `SequenceNumber`: only the message ID survives a resubmit cycle.

## Non-Interactive Confirmation

A smaller but important UX fix: the non-interactive `resubmit-dlq` path (without `-i`) now asks for confirmation before proceeding:

```
Resubmitting DLQ messages for queue 'orders'...
Are you sure you want to resubmit all dead letter messages? (y/N):
```

This prevents accidental bulk resubmission when the user forgets the `--dry-run` flag.

## Key Design Decisions

| Decision | Rationale |
|---|---|
| Cache key = `SequenceNumber` (long) | Unique within a queue/subscription. `MessageId` can duplicate across messages. |
| Tracker key = `MessageId` (string) | Persists across resubmit cycles: a resubmitted message gets a new `SequenceNumber` but keeps its `MessageId`. |
| `.Sample(1s)` on category stream | Periodic updates regardless of activity. `.Throttle` (debounce) would freeze the UI during rapid ingestion. |
| Peek (non-destructive) for cache | Messages stay in the DLQ during scanning. Destructive receive only happens during the actual resubmit. |
| Snapshot-based resubmit | No re-categorization or re-fetch. The exact messages from the cache at selection time. |
| Best-effort total count | The admin API call to get the total DLQ count can fail (permissions, transient errors). Scanning works without it; the progress display just shows "N messages found so far" instead of "Peeked N from M". |
| `TaskCompletionSource` for scan completion | The reactive stream's `Sample` operator may not emit after the last changeset. A dedicated signal ensures the CLI always knows when scanning finishes. |
| Linked `CancellationTokenSource` | Combines Ctrl+C (global) and press-x (user-initiated) cancellation into a single token for the feed loop. The linked CTS is disposed inside the `Task.Run` lambda after the feed completes. |

## Test Coverage

The change ships with unit tests for the Application layer and integration tests against the Service Bus emulator:

**Unit tests** (Application layer):
- `ReactiveMessageCacheShould`: empty snapshots, add/update, deduplication, changeset emission, point-in-time snapshots.
- `ResubmitTrackerShould`: single/batch tracking, false negatives, thread safety under concurrent access.
- `DlqResubmitSessionShould`: category filtering, resubmit exclusion, before-time filtering, empty results.
- `StreamDlqCategoriesHandlerShould`: session creation, cache population, category grouping, completion marking, resubmit exclusion during feed.
- `ResubmitFromCacheCommandHandlerShould`: matching/abandoning messages, tracker integration, empty snapshots, property preservation, progress reporting.

**Integration tests** (against Service Bus emulator):
- `StreamDlqCategoriesIntegrationShould`: full end-to-end cache population, multi-category grouping, stream emission, empty DLQ handling.
- `ResubmitFromCacheIntegrationShould`: full resubmit flow, category-filtered resubmit, tracker verification, message property preservation, empty snapshot handling.

## File Summary

### New files

| File | Purpose |
|---|---|
| `Common/ServiceBus/Reactive/ReactiveMessageCache.cs` | Generic DynamicData `SourceCache` wrapper |
| `Common/ServiceBus/Reactive/ResubmitTracker.cs` | Thread-safe MessageId cycle tracker |
| `DeadLetters/Common/MessageResubmitHelper.cs` | Extracted message cloning logic |
| `DeadLetters/ResubmitDlq/DlqResubmitSession.cs` | Session: cache + stream + tracker + signals |
| `DeadLetters/ResubmitDlq/StreamDlqCategories.cs` | Command to start scanning and return session |
| `DeadLetters/ResubmitDlq/ResubmitFromCache.cs` | Command to resubmit from cached snapshot |

### Modified files

| File | Change |
|---|---|
| `ResubmitDlqMessagesCommandHandler.cs` | Use `MessageResubmitHelper` instead of private method |
| `ResubmitDlqCommandHandler.cs` (CLI) | Two-phase interactive flow, scanning view, stop-key, confirmation prompt |
| `ServiceBusToolset.Application.csproj` | Added `DynamicData` 9.0.4 |
| `ServiceBusToolset.Application.Tests.csproj` | Added `Microsoft.Reactive.Testing` 6.1.0 |
Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
using DynamicData;

namespace ServiceBusToolset.Application.Common.ServiceBus.Reactive;

// Thin wrapper around a DynamicData SourceCache, keyed by the supplied selector.
public sealed class ReactiveMessageCache<TMessage, TKey>(Func<TMessage, TKey> keySelector) : IDisposable
    where TMessage : notnull
    where TKey : notnull
{
    private readonly SourceCache<TMessage, TKey> _cache = new(keySelector);
    private bool _disposed;

    // Set once the producer has finished feeding the cache.
    public bool IsComplete { get; private set; }

    public int Count => _cache.Count;

    public IObservable<int> CountChanged => _cache.CountChanged;

    // Adds or replaces all items inside a single Edit() call,
    // so subscribers see one changeset per batch.
    public void AddOrUpdate(IEnumerable<TMessage> items)
    {
        _cache.Edit(updater =>
        {
            foreach (var item in items)
            {
                updater.AddOrUpdate(item);
            }
        });
    }

    public void MarkComplete()
    {
        IsComplete = true;
    }

    public IObservable<IChangeSet<TMessage, TKey>> Connect() => _cache.Connect();

    // Point-in-time copy of the current cache contents.
    public IReadOnlyList<TMessage> Snapshot() => _cache.Items.ToList();

    // Returns the cached item, or default when the key is not present.
    public TMessage? Lookup(TKey key)
    {
        var optional = _cache.Lookup(key);
        return optional.HasValue ? optional.Value : default;
    }

    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _cache.Dispose();
    }
}
Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
namespace ServiceBusToolset.Application.Common.ServiceBus.Reactive;

// Records the MessageIds that have been resubmitted; all access is guarded by a lock.
public sealed class ResubmitTracker
{
    private readonly HashSet<string> _resubmittedIds = [];
    private readonly object _lock = new();

    public void MarkResubmitted(string messageId)
    {
        lock (_lock)
        {
            _resubmittedIds.Add(messageId);
        }
    }

    // Marks a whole batch while holding the lock once.
    public void MarkResubmitted(IEnumerable<string> messageIds)
    {
        lock (_lock)
        {
            foreach (var id in messageIds)
            {
                _resubmittedIds.Add(id);
            }
        }
    }

    public bool WasResubmitted(string messageId)
    {
        lock (_lock)
        {
            return _resubmittedIds.Contains(messageId);
        }
    }
}
Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
using Azure.Messaging.ServiceBus;

namespace ServiceBusToolset.Application.DeadLetters.Common;

public static class MessageResubmitHelper
{
    // Builds a new outgoing message that copies the body, the listed
    // broker properties, and the application properties of the original.
    public static ServiceBusMessage CreateResubmitMessage(ServiceBusReceivedMessage original)
    {
        var message = new ServiceBusMessage(original.Body)
        {
            ContentType = original.ContentType,
            Subject = original.Subject,
            MessageId = original.MessageId, // kept so the ResubmitTracker can recognize the message later
            CorrelationId = original.CorrelationId,
            To = original.To,
            ReplyTo = original.ReplyTo,
            ReplyToSessionId = original.ReplyToSessionId,
            SessionId = original.SessionId,
            PartitionKey = original.PartitionKey,
            TransactionPartitionKey = original.TransactionPartitionKey,
            TimeToLive = original.TimeToLive
        };

        foreach (var prop in original.ApplicationProperties)
        {
            message.ApplicationProperties[prop.Key] = prop.Value;
        }

        return message;
    }
}
