|
1 | 1 | using Azure.Identity; |
2 | 2 | using Azure.Messaging.ServiceBus; |
| 3 | +using ServiceBusToolset.Models; |
3 | 4 | using ServiceBusToolset.Options; |
4 | 5 | using ServiceBusToolset.Services; |
5 | 6 |
|
6 | 7 | namespace ServiceBusToolset.Commands; |
7 | 8 |
|
8 | | -public class PurgeDlqCommand(IServiceBusClientFactory clientFactory, IConsoleOutput output) : BaseCommand<PurgeDlqOptions>(clientFactory, output), ICommand<PurgeDlqOptions> |
| 9 | +public class PurgeDlqCommand( |
| 10 | + IServiceBusClientFactory clientFactory, |
| 11 | + IConsoleOutput output, |
| 12 | + IDlqCategoryAnalyzer categoryAnalyzer) : BaseCommand<PurgeDlqOptions>(clientFactory, output), ICommand<PurgeDlqOptions> |
9 | 13 | { |
10 | 14 | private const int MaxBatchSize = 100; |
11 | 15 | private static readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(5); |
12 | 16 | private const int EmptyBatchThreshold = 3; |
13 | 17 |
|
14 | | - private record DlqCategory(string Label, string DeadLetterReason, int Count); |
15 | | - |
16 | 18 | public async Task<int> ExecuteAsync(PurgeDlqOptions options, CancellationToken cancellationToken = default) |
17 | 19 | { |
18 | 20 | var validationError = options.Validate(); |
@@ -275,7 +277,13 @@ private async Task<int> ExecuteInteractivePurgeAsync( |
275 | 277 | Output.Info($"Analyzing DLQ for {entityDescription}..."); |
276 | 278 |
|
277 | 279 | // Step 1: Peek all messages and build category dictionary |
278 | | - var categories = await BuildCategoryListAsync(client, options, cancellationToken); |
| 280 | + var categories = await categoryAnalyzer.AnalyzeCategoriesAsync( |
| 281 | + client, |
| 282 | + options.Queue, |
| 283 | + options.Topic, |
| 284 | + options.Subscription, |
| 285 | + Output, |
| 286 | + cancellationToken); |
279 | 287 |
|
280 | 288 | if (categories.Count == 0) |
281 | 289 | { |
@@ -323,63 +331,6 @@ private async Task<int> ExecuteInteractivePurgeAsync( |
323 | 331 | return 0; |
324 | 332 | } |
325 | 333 |
|
326 | | - private async Task<List<DlqCategory>> BuildCategoryListAsync( |
327 | | - ServiceBusClient client, |
328 | | - PurgeDlqOptions options, |
329 | | - CancellationToken cancellationToken) |
330 | | - { |
331 | | - await using var receiver = CreateDlqReceiver(client, |
332 | | - options.Queue, |
333 | | - options.Topic, |
334 | | - options.Subscription, |
335 | | - ServiceBusReceiveMode.PeekLock); |
336 | | - |
337 | | - var categoryCounts = new Dictionary<(string Label, string Reason), int>(); |
338 | | - var totalPeeked = 0; |
339 | | - long? fromSequenceNumber = null; |
340 | | - |
341 | | - while (!cancellationToken.IsCancellationRequested) |
342 | | - { |
343 | | - IReadOnlyList<ServiceBusReceivedMessage> messages; |
344 | | - |
345 | | - if (fromSequenceNumber.HasValue) |
346 | | - { |
347 | | - messages = await receiver.PeekMessagesAsync(MaxBatchSize, fromSequenceNumber.Value, cancellationToken); |
348 | | - } |
349 | | - else |
350 | | - { |
351 | | - messages = await receiver.PeekMessagesAsync(MaxBatchSize, cancellationToken: cancellationToken); |
352 | | - } |
353 | | - |
354 | | - if (messages.Count == 0) |
355 | | - { |
356 | | - break; |
357 | | - } |
358 | | - |
359 | | - foreach (var msg in messages) |
360 | | - { |
361 | | - var label = msg.Subject ?? "(none)"; |
362 | | - var reason = msg.DeadLetterReason ?? "(none)"; |
363 | | - var key = (label, reason); |
364 | | - |
365 | | - var count = categoryCounts.GetValueOrDefault(key, 0); |
366 | | - categoryCounts[key] = count + 1; |
367 | | - } |
368 | | - |
369 | | - totalPeeked += messages.Count; |
370 | | - fromSequenceNumber = messages[^1].SequenceNumber + 1; |
371 | | - |
372 | | - Output.Progress($"Peeked {totalPeeked} messages..."); |
373 | | - } |
374 | | - |
375 | | - Console.WriteLine(); |
376 | | - |
377 | | - return categoryCounts |
378 | | - .OrderByDescending(kvp => kvp.Value) |
379 | | - .Select(kvp => new DlqCategory(kvp.Key.Label, kvp.Key.Reason, kvp.Value)) |
380 | | - .ToList(); |
381 | | - } |
382 | | - |
383 | 334 | private void DisplayCategoryTable(IReadOnlyCollection<DlqCategory> categories) |
384 | 335 | { |
385 | 336 | Output.Info(""); |
|
0 commit comments