-
Notifications
You must be signed in to change notification settings - Fork 0
Publish application layer as a NuGet #41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
49d3ee2
6da25b2
3d82639
e8f8aee
98434b8
488bf05
415bdbd
32c47eb
01df065
435d8bf
eee32fb
2bce3ef
ba326e9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| using Ardalis.Result; | ||
| using Mediator; | ||
| using ServiceBusToolset.Application.DeadLetters.DiagnoseDlq.Models; | ||
|
|
||
| namespace ServiceBusToolset.Application.DeadLetters.DiagnoseDlq; | ||
|
|
||
| public sealed record DiagnoseBatchCommand(string AppInsightsResourceId, | ||
| IReadOnlyList<OperationInfo> Operations) : ICommand<Result<IReadOnlyList<DiagnosticResult>>>; | ||
|
|
||
| public sealed record OperationInfo(string OperationId, | ||
| DateTimeOffset EnqueuedTime, | ||
| string? MessageId, | ||
| string? Subject, | ||
| string? DeadLetterReason); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| using Ardalis.Result; | ||
| using Mediator; | ||
| using ServiceBusToolset.Application.DeadLetters.DiagnoseDlq.Common.AppInsights; | ||
| using ServiceBusToolset.Application.DeadLetters.DiagnoseDlq.Models; | ||
|
|
||
| namespace ServiceBusToolset.Application.DeadLetters.DiagnoseDlq; | ||
|
|
||
| public sealed class DiagnoseBatchCommandHandler(IAppInsightsService appInsightsService) | ||
| : ICommandHandler<DiagnoseBatchCommand, Result<IReadOnlyList<DiagnosticResult>>> | ||
| { | ||
| public async ValueTask<Result<IReadOnlyList<DiagnosticResult>>> Handle( | ||
| DiagnoseBatchCommand command, | ||
| CancellationToken cancellationToken) | ||
| { | ||
| if (command.Operations.Count == 0) | ||
| { | ||
| return Result.Success<IReadOnlyList<DiagnosticResult>>([]); | ||
| } | ||
|
|
||
| appInsightsService.Initialize(command.AppInsightsResourceId); | ||
|
|
||
| var operations = command.Operations | ||
| .Select(op => (op.OperationId, op.EnqueuedTime)) | ||
| .ToList(); | ||
|
|
||
| var diagnosticResults = await appInsightsService.DiagnoseBatchAsync(operations, | ||
| null, | ||
| cancellationToken); | ||
|
|
||
| var results = new List<DiagnosticResult>(); | ||
| // Deduplicate by OperationId — take the first occurrence if duplicates exist | ||
| var operationsById = command.Operations | ||
| .DistinctBy(op => op.OperationId) | ||
| .ToDictionary(op => op.OperationId); | ||
|
|
||
| foreach (var (operationId, result) in diagnosticResults) | ||
| { | ||
| if (operationsById.TryGetValue(operationId, out var operation)) | ||
| { | ||
| result.MessageId = operation.MessageId; | ||
| result.Subject = operation.Subject; | ||
| result.DeadLetterReason = operation.DeadLetterReason; | ||
| results.Add(result); | ||
| } | ||
| } | ||
|
|
||
| return Result.Success<IReadOnlyList<DiagnosticResult>>(results); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| using Ardalis.Result; | ||
| using Mediator; | ||
| using ServiceBusToolset.Application.Common.ServiceBus.Models; | ||
|
|
||
| namespace ServiceBusToolset.Application.DeadLetters.PeekDlq; | ||
|
|
||
| public sealed record PeekDlqBatchCommand(string FullyQualifiedNamespace, | ||
| EntityTarget Target, | ||
| int BatchSize = 500, | ||
| long? FromSequenceNumber = null, | ||
| long? KnownDeadLetterCount = null) : ICommand<Result<PeekDlqBatchResult>>; |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,146 @@ | ||||||||||||||||||
| using Ardalis.Result; | ||||||||||||||||||
| using Azure.Messaging.ServiceBus; | ||||||||||||||||||
| using Mediator; | ||||||||||||||||||
| using ServiceBusToolset.Application.Common.ServiceBus.Abstractions; | ||||||||||||||||||
| using ServiceBusToolset.Application.Common.ServiceBus.Helpers; | ||||||||||||||||||
| using ServiceBusToolset.Application.Common.ServiceBus.Models; | ||||||||||||||||||
| using ServiceBusToolset.Application.DeadLetters.DiagnoseDlq.Common; | ||||||||||||||||||
|
|
||||||||||||||||||
| namespace ServiceBusToolset.Application.DeadLetters.PeekDlq; | ||||||||||||||||||
|
|
||||||||||||||||||
| public sealed class PeekDlqBatchCommandHandler(IServiceBusClientFactory clientFactory) | ||||||||||||||||||
| : ICommandHandler<PeekDlqBatchCommand, Result<PeekDlqBatchResult>> | ||||||||||||||||||
| { | ||||||||||||||||||
| private const int PeekSubBatchSize = 100; | ||||||||||||||||||
| private const int EmptyBatchThreshold = 3; | ||||||||||||||||||
|
|
||||||||||||||||||
| public async ValueTask<Result<PeekDlqBatchResult>> Handle( | ||||||||||||||||||
| PeekDlqBatchCommand command, | ||||||||||||||||||
| CancellationToken cancellationToken) | ||||||||||||||||||
| { | ||||||||||||||||||
| await using var client = clientFactory.CreateClient(command.FullyQualifiedNamespace); | ||||||||||||||||||
|
|
||||||||||||||||||
| // Get total DLQ count — use known count if provided, otherwise query admin API | ||||||||||||||||||
| var totalDeadLetterCount = command.KnownDeadLetterCount | ||||||||||||||||||
| ?? await GetDeadLetterCountAsync(clientFactory, command.FullyQualifiedNamespace, command.Target); | ||||||||||||||||||
|
|
||||||||||||||||||
| // Peek messages in sub-batches until we reach BatchSize or run out | ||||||||||||||||||
| await using var receiver = ReceiverFactory.CreateDlqReceiver(client, command.Target); | ||||||||||||||||||
|
|
||||||||||||||||||
| List<ServiceBusReceivedMessage> allMessages = []; | ||||||||||||||||||
| var emptyBatches = 0; | ||||||||||||||||||
| var isFirstPeek = true; | ||||||||||||||||||
| var highestSequenceNumber = command.FromSequenceNumber ?? -1; | ||||||||||||||||||
|
|
||||||||||||||||||
| while (allMessages.Count < command.BatchSize && | ||||||||||||||||||
| emptyBatches < EmptyBatchThreshold && | ||||||||||||||||||
| !cancellationToken.IsCancellationRequested) | ||||||||||||||||||
| { | ||||||||||||||||||
| var remaining = command.BatchSize - allMessages.Count; | ||||||||||||||||||
| var subBatchSize = Math.Min(PeekSubBatchSize, remaining); | ||||||||||||||||||
|
|
||||||||||||||||||
| IReadOnlyList<ServiceBusReceivedMessage> batch; | ||||||||||||||||||
| if (isFirstPeek && command.FromSequenceNumber.HasValue) | ||||||||||||||||||
| { | ||||||||||||||||||
| batch = await receiver.PeekMessagesAsync(subBatchSize, command.FromSequenceNumber.Value + 1, cancellationToken); | ||||||||||||||||||
| isFirstPeek = false; | ||||||||||||||||||
| } | ||||||||||||||||||
| else | ||||||||||||||||||
| { | ||||||||||||||||||
| batch = await receiver.PeekMessagesAsync(subBatchSize, cancellationToken:cancellationToken); | ||||||||||||||||||
| isFirstPeek = false; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| if (batch.Count == 0) | ||||||||||||||||||
| { | ||||||||||||||||||
| emptyBatches++; | ||||||||||||||||||
| continue; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Detect wrap-around: if the batch contains messages with sequence numbers | ||||||||||||||||||
| // we've already passed, the receiver has looped back to the beginning | ||||||||||||||||||
| if (batch[0].SequenceNumber <= highestSequenceNumber) | ||||||||||||||||||
| { | ||||||||||||||||||
| break; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| emptyBatches = 0; | ||||||||||||||||||
| allMessages.AddRange(batch); | ||||||||||||||||||
| highestSequenceNumber = batch[^1].SequenceNumber; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| if (allMessages.Count == 0) | ||||||||||||||||||
| { | ||||||||||||||||||
| return Result.Success(new PeekDlqBatchResult([], | ||||||||||||||||||
| 0, | ||||||||||||||||||
| 0, | ||||||||||||||||||
| command.FromSequenceNumber, | ||||||||||||||||||
| false, | ||||||||||||||||||
| totalDeadLetterCount)); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Extract operation IDs | ||||||||||||||||||
| List<PeekedMessage> messages = []; | ||||||||||||||||||
| var skipped = 0; | ||||||||||||||||||
| HashSet<string> seenOperationIds = []; | ||||||||||||||||||
|
|
||||||||||||||||||
| foreach (var message in allMessages) | ||||||||||||||||||
| { | ||||||||||||||||||
| var operationId = MessageDiagnostics.ExtractOperationId(message); | ||||||||||||||||||
| if (string.IsNullOrEmpty(operationId)) | ||||||||||||||||||
| { | ||||||||||||||||||
| skipped++; | ||||||||||||||||||
| continue; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| if (seenOperationIds.Add(operationId)) | ||||||||||||||||||
| { | ||||||||||||||||||
| messages.Add(new PeekedMessage(message.MessageId, | ||||||||||||||||||
| message.Subject, | ||||||||||||||||||
| operationId, | ||||||||||||||||||
| message.EnqueuedTime, | ||||||||||||||||||
| message.DeadLetterReason)); | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| var lastSequenceNumber = allMessages[^1].SequenceNumber; | ||||||||||||||||||
| // We have more messages if we filled the batch (didn't run out early) | ||||||||||||||||||
| var hasMore = allMessages.Count >= command.BatchSize && emptyBatches < EmptyBatchThreshold; | ||||||||||||||||||
|
Comment on lines
+107
to
+109
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Line 109 ignores the resolved count from Line 25. If the first page returns exactly 🧭 Suggested change- var hasMore = allMessages.Count >= command.BatchSize && emptyBatches < EmptyBatchThreshold;
+ var hasMore = command.FromSequenceNumber is null && totalDeadLetterCount > 0
+ ? allMessages.Count < totalDeadLetterCount
+ : allMessages.Count >= command.BatchSize && emptyBatches < EmptyBatchThreshold;📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||
|
|
||||||||||||||||||
| return Result.Success(new PeekDlqBatchResult(messages, | ||||||||||||||||||
| allMessages.Count, | ||||||||||||||||||
| skipped, | ||||||||||||||||||
| lastSequenceNumber, | ||||||||||||||||||
| hasMore, | ||||||||||||||||||
| totalDeadLetterCount)); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| private static async Task<long> GetDeadLetterCountAsync( | ||||||||||||||||||
| IServiceBusClientFactory clientFactory, | ||||||||||||||||||
| string fullyQualifiedNamespace, | ||||||||||||||||||
| EntityTarget target) | ||||||||||||||||||
| { | ||||||||||||||||||
| try | ||||||||||||||||||
| { | ||||||||||||||||||
| var adminClient = clientFactory.CreateAdministrationClient(fullyQualifiedNamespace); | ||||||||||||||||||
|
|
||||||||||||||||||
| if (target.IsQueueMode) | ||||||||||||||||||
| { | ||||||||||||||||||
| var props = await adminClient.GetQueueRuntimePropertiesAsync(target.Queue!); | ||||||||||||||||||
| return props.Value.DeadLetterMessageCount; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| var subProps = await adminClient.GetSubscriptionRuntimePropertiesAsync(target.Topic!, target.Subscription!); | ||||||||||||||||||
| return subProps.Value.DeadLetterMessageCount; | ||||||||||||||||||
| } | ||||||||||||||||||
| catch (Azure.RequestFailedException) | ||||||||||||||||||
| { | ||||||||||||||||||
| // Admin API may not be available in all environments (e.g., emulator, unit tests) | ||||||||||||||||||
| return 0; | ||||||||||||||||||
| } | ||||||||||||||||||
| catch (OperationCanceledException) | ||||||||||||||||||
| { | ||||||||||||||||||
| return 0; | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
Comment on lines
+126
to
+153
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing The admin API methods accept a 🛠️ Proposed fix to propagate cancellation token private static async Task<long> GetDeadLetterCountAsync(
IServiceBusClientFactory clientFactory,
string fullyQualifiedNamespace,
- EntityTarget target)
+ EntityTarget target,
+ CancellationToken cancellationToken)
{
try
{
var adminClient = clientFactory.CreateAdministrationClient(fullyQualifiedNamespace);
if (target.IsQueueMode)
{
- var props = await adminClient.GetQueueRuntimePropertiesAsync(target.Queue!);
+ var props = await adminClient.GetQueueRuntimePropertiesAsync(target.Queue!, cancellationToken);
return props.Value.DeadLetterMessageCount;
}
- var subProps = await adminClient.GetSubscriptionRuntimePropertiesAsync(target.Topic!, target.Subscription!);
+ var subProps = await adminClient.GetSubscriptionRuntimePropertiesAsync(target.Topic!, target.Subscription!, cancellationToken);
return subProps.Value.DeadLetterMessageCount;
}And update the call site at line 25: var totalDeadLetterCount = command.KnownDeadLetterCount
- ?? await GetDeadLetterCountAsync(clientFactory, command.FullyQualifiedNamespace, command.Target);
+ ?? await GetDeadLetterCountAsync(clientFactory, command.FullyQualifiedNamespace, command.Target, cancellationToken);🤖 Prompt for AI Agents |
||||||||||||||||||
| } | ||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| namespace ServiceBusToolset.Application.DeadLetters.PeekDlq; | ||
|
|
||
| public sealed record PeekDlqBatchResult(IReadOnlyList<PeekedMessage> Messages, | ||
| int PeekedInBatch, | ||
| int SkippedNoOperationId, | ||
| long? LastSequenceNumber, | ||
| bool HasMoreMessages, | ||
| long TotalDeadLetterCount); | ||
|
|
||
| public sealed record PeekedMessage(string? MessageId, | ||
| string? Subject, | ||
| string OperationId, | ||
| DateTimeOffset EnqueuedTime, | ||
| string? DeadLetterReason); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't treat cancellation as normal exhaustion.
The
IsCancellationRequestedguard at Line 38 only exits the loop, so a canceled request falls through to the success returns at Lines 73-80 or Lines 111-116 with empty/partial data. That makes cancellation indistinguishable from “no more messages”.🛠️ Suggested change
Also applies to: 73-80, 111-116
🤖 Prompt for AI Agents