Skip to content

Commit e8b3ba8

Browse files
committed
Implemented support for dumping the summary of the dead letter messages
1 parent 13ec6a3 commit e8b3ba8

5 files changed

Lines changed: 289 additions & 5 deletions

File tree

ServiceBusToolset/Commands/PurgeDlqCommand.cs

Lines changed: 254 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ public class PurgeDlqCommand(IServiceBusClientFactory clientFactory, IConsoleOut
1111
private static readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(5);
1212
private const int EmptyBatchThreshold = 3;
1313

14+
private record DlqCategory(string Label, string DeadLetterReason, int Count);
15+
1416
public async Task<int> ExecuteAsync(PurgeDlqOptions options, CancellationToken cancellationToken = default)
1517
{
1618
var validationError = options.Validate();
@@ -34,6 +36,14 @@ public async Task<int> ExecuteAsync(PurgeDlqOptions options, CancellationToken c
3436
cancellationToken);
3537
}
3638

39+
if (options.Interactive)
40+
{
41+
return await ExecuteInteractivePurgeAsync(client,
42+
options,
43+
entityDescription,
44+
cancellationToken);
45+
}
46+
3747
return await ExecutePurgeAsync(client,
3848
options,
3949
entityDescription,
@@ -70,7 +80,6 @@ private async Task<int> ExecuteDryRunAsync(
7080
{
7181
return await ExecuteFilteredDryRunAsync(client,
7282
options,
73-
entityDescription,
7483
cancellationToken);
7584
}
7685

@@ -103,7 +112,6 @@ private async Task<int> ExecuteFastDryRunAsync(
103112
private async Task<int> ExecuteFilteredDryRunAsync(
104113
ServiceBusClient client,
105114
PurgeDlqOptions options,
106-
string entityDescription,
107115
CancellationToken cancellationToken)
108116
{
109117
Output.Verbose("Using slow count due to --before filter", options.Verbose);
@@ -257,4 +265,248 @@ private async Task<int> ExecuteFilteredPurgeAsync(
257265
Output.Success($"Purged {totalDeleted} messages from DLQ for {entityDescription} (skipped {totalSkipped} newer messages)");
258266
return 0;
259267
}
268+
269+
private async Task<int> ExecuteInteractivePurgeAsync(
270+
ServiceBusClient client,
271+
PurgeDlqOptions options,
272+
string entityDescription,
273+
CancellationToken cancellationToken)
274+
{
275+
Output.Info($"Analyzing DLQ for {entityDescription}...");
276+
277+
// Step 1: Peek all messages and build category dictionary
278+
var categories = await BuildCategoryListAsync(client, options, cancellationToken);
279+
280+
if (categories.Count == 0)
281+
{
282+
Output.Info("No messages found in DLQ.");
283+
return 0;
284+
}
285+
286+
// Step 2: Display category table
287+
DisplayCategoryTable(categories);
288+
289+
// Step 3: Get user selection
290+
Output.Info("");
291+
Console.Write("Select categories to purge (comma-separated numbers, 'all', or 'q' to quit): ");
292+
var input = Output.ReadLine();
293+
294+
var selectedIndices = ParseSelection(input, categories.Count);
295+
if (selectedIndices == null)
296+
{
297+
Output.Info("Operation cancelled.");
298+
return 0;
299+
}
300+
301+
if (selectedIndices.Count == 0)
302+
{
303+
Output.Warning("No valid categories selected.");
304+
return 0;
305+
}
306+
307+
// Step 4: Build set of selected category keys
308+
var selectedCategories = new HashSet<(string Label, string Reason)>();
309+
var totalToPurge = 0;
310+
foreach(var cat in selectedIndices.Select(idx => categories[idx]))
311+
{
312+
selectedCategories.Add((cat.Label, cat.DeadLetterReason));
313+
totalToPurge += cat.Count;
314+
}
315+
316+
Output.Info($"Purging {totalToPurge} messages from {selectedIndices.Count} categories...");
317+
318+
// Step 5: Receive messages and complete only those matching selected categories
319+
var totalDeleted = await PurgeByCategoriesAsync(client, options, selectedCategories, cancellationToken);
320+
321+
Console.WriteLine();
322+
Output.Success($"Purged {totalDeleted} messages from DLQ for {entityDescription}.");
323+
return 0;
324+
}
325+
326+
private async Task<List<DlqCategory>> BuildCategoryListAsync(
327+
ServiceBusClient client,
328+
PurgeDlqOptions options,
329+
CancellationToken cancellationToken)
330+
{
331+
await using var receiver = CreateDlqReceiver(client,
332+
options.Queue,
333+
options.Topic,
334+
options.Subscription,
335+
ServiceBusReceiveMode.PeekLock);
336+
337+
var categoryCounts = new Dictionary<(string Label, string Reason), int>();
338+
var totalPeeked = 0;
339+
long? fromSequenceNumber = null;
340+
341+
while (!cancellationToken.IsCancellationRequested)
342+
{
343+
IReadOnlyList<ServiceBusReceivedMessage> messages;
344+
345+
if (fromSequenceNumber.HasValue)
346+
{
347+
messages = await receiver.PeekMessagesAsync(MaxBatchSize, fromSequenceNumber.Value, cancellationToken);
348+
}
349+
else
350+
{
351+
messages = await receiver.PeekMessagesAsync(MaxBatchSize, cancellationToken: cancellationToken);
352+
}
353+
354+
if (messages.Count == 0)
355+
{
356+
break;
357+
}
358+
359+
foreach (var msg in messages)
360+
{
361+
var label = msg.Subject ?? "(none)";
362+
var reason = msg.DeadLetterReason ?? "(none)";
363+
var key = (label, reason);
364+
365+
var count = categoryCounts.GetValueOrDefault(key, 0);
366+
categoryCounts[key] = count + 1;
367+
}
368+
369+
totalPeeked += messages.Count;
370+
fromSequenceNumber = messages[^1].SequenceNumber + 1;
371+
372+
Output.Progress($"Peeked {totalPeeked} messages...");
373+
}
374+
375+
Console.WriteLine();
376+
377+
return categoryCounts
378+
.OrderByDescending(kvp => kvp.Value)
379+
.Select(kvp => new DlqCategory(kvp.Key.Label, kvp.Key.Reason, kvp.Value))
380+
.ToList();
381+
}
382+
383+
private void DisplayCategoryTable(IReadOnlyCollection<DlqCategory> categories)
384+
{
385+
Output.Info("");
386+
Output.Info("Dead Letter Summary:");
387+
388+
var headers = new[] { "#", "Label", "DeadLetterReason", "Count" };
389+
var rows = categories.Select((cat, index) => new[]
390+
{
391+
(index + 1).ToString(),
392+
cat.Label.ReplaceLineEndings(" "),
393+
cat.DeadLetterReason.ReplaceLineEndings(" "),
394+
cat.Count.ToString()
395+
});
396+
397+
Output.Table(headers, rows);
398+
399+
var total = categories.Sum(c => c.Count);
400+
Output.Info($"Total: {total} messages");
401+
}
402+
403+
private static List<int>? ParseSelection(string? input, int maxIndex)
404+
{
405+
if (string.IsNullOrWhiteSpace(input))
406+
{
407+
return null;
408+
}
409+
410+
var trimmed = input.Trim().ToLowerInvariant();
411+
412+
if (trimmed == "q" || trimmed == "quit")
413+
{
414+
return null;
415+
}
416+
417+
if (trimmed == "all" || trimmed == "a")
418+
{
419+
return Enumerable.Range(0, maxIndex).ToList();
420+
}
421+
422+
var indices = new List<int>();
423+
var parts = input.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
424+
425+
foreach (var part in parts)
426+
{
427+
// Handle ranges like "1-5"
428+
if (part.Contains('-'))
429+
{
430+
var rangeParts = part.Split('-', 2);
431+
if (rangeParts.Length == 2 &&
432+
int.TryParse(rangeParts[0], out var start) &&
433+
int.TryParse(rangeParts[1], out var end))
434+
{
435+
for (var i = start; i <= end; i++)
436+
{
437+
var idx = i - 1; // Convert to 0-based
438+
if (idx >= 0 && idx < maxIndex && !indices.Contains(idx))
439+
{
440+
indices.Add(idx);
441+
}
442+
}
443+
}
444+
}
445+
else if (int.TryParse(part, out var num))
446+
{
447+
var idx = num - 1; // Convert to 0-based
448+
if (idx >= 0 && idx < maxIndex && !indices.Contains(idx))
449+
{
450+
indices.Add(idx);
451+
}
452+
}
453+
}
454+
455+
return indices;
456+
}
457+
458+
private async Task<int> PurgeByCategoriesAsync(
459+
ServiceBusClient client,
460+
PurgeDlqOptions options,
461+
HashSet<(string Label, string Reason)> selectedCategories,
462+
CancellationToken cancellationToken)
463+
{
464+
await using var receiver = CreateDlqReceiver(client,
465+
options.Queue,
466+
options.Topic,
467+
options.Subscription,
468+
ServiceBusReceiveMode.PeekLock);
469+
470+
var totalDeleted = 0;
471+
var totalSkipped = 0;
472+
var emptyBatches = 0;
473+
474+
while (!cancellationToken.IsCancellationRequested && emptyBatches < EmptyBatchThreshold)
475+
{
476+
var messages = await receiver.ReceiveMessagesAsync(MaxBatchSize,
477+
MaxWaitTime,
478+
cancellationToken);
479+
480+
if (messages.Count == 0)
481+
{
482+
emptyBatches++;
483+
Output.Verbose($"Empty batch {emptyBatches}/{EmptyBatchThreshold}", options.Verbose);
484+
continue;
485+
}
486+
487+
emptyBatches = 0;
488+
489+
foreach (var message in messages)
490+
{
491+
var label = message.Subject ?? "(none)";
492+
var reason = message.DeadLetterReason ?? "(none)";
493+
var key = (label, reason);
494+
495+
if (selectedCategories.Contains(key))
496+
{
497+
await receiver.CompleteMessageAsync(message, cancellationToken);
498+
totalDeleted++;
499+
}
500+
else
501+
{
502+
await receiver.AbandonMessageAsync(message, cancellationToken: cancellationToken);
503+
totalSkipped++;
504+
}
505+
}
506+
507+
Output.Progress($"Purged {totalDeleted} messages (skipped {totalSkipped})...");
508+
}
509+
510+
return totalDeleted;
511+
}
260512
}

ServiceBusToolset/Options/PurgeDlqOptions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ public class PurgeDlqOptions
4141
HelpText = "Enable verbose output")]
4242
public bool Verbose { get; set; }
4343

44+
[Option('i',
45+
"interactive",
46+
Default = false,
47+
HelpText = "Interactive mode: show categories and select which to purge")]
48+
public bool Interactive { get; set; }
49+
4450
public bool IsQueueMode => !string.IsNullOrEmpty(Queue);
4551
public bool IsSubscriptionMode => !string.IsNullOrEmpty(Topic) && !string.IsNullOrEmpty(Subscription);
4652

ServiceBusToolset/ServiceBusToolset.csproj

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11-
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.18.2"/>
12-
<PackageReference Include="Azure.Identity" Version="1.13.2"/>
13-
<PackageReference Include="CommandLineParser" Version="2.9.1"/>
11+
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.18.2" />
12+
<PackageReference Include="Azure.Identity" Version="1.13.2" />
13+
<PackageReference Include="CommandLineParser" Version="2.9.1" />
14+
<PackageReference Include="Spectre.Console" Version="0.54.0" />
1415
</ItemGroup>
1516

1617
</Project>

ServiceBusToolset/Services/ConsoleOutput.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using Spectre.Console;
2+
13
namespace ServiceBusToolset.Services;
24

35
public class ConsoleOutput : IConsoleOutput
@@ -44,4 +46,25 @@ public void Progress(string message)
4446
Console.Write($"\r{message}");
4547
Console.ResetColor();
4648
}
49+
50+
public void Table(IEnumerable<string> headers, IEnumerable<string[]> rows)
51+
{
52+
var table = new Table();
53+
table.Border(TableBorder.Rounded);
54+
table.Expand();
55+
56+
foreach (var header in headers)
57+
{
58+
table.AddColumn(new TableColumn(header));
59+
}
60+
61+
foreach (var row in rows)
62+
{
63+
table.AddRow(row.Select(Markup.Escape).ToArray());
64+
}
65+
66+
AnsiConsole.Write(table);
67+
}
68+
69+
public string? ReadLine() => Console.ReadLine();
4770
}

ServiceBusToolset/Services/IConsoleOutput.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,6 @@ public interface IConsoleOutput
88
void Error(string message);
99
void Verbose(string message, bool isVerbose);
1010
void Progress(string message);
11+
void Table(IEnumerable<string> headers, IEnumerable<string[]> rows);
12+
string? ReadLine();
1113
}

0 commit comments

Comments
 (0)