Skip to content
8 changes: 7 additions & 1 deletion .github/workflows/alpha-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,16 @@ jobs:
echo "version=$ALPHA_VERSION" >> $GITHUB_OUTPUT
echo "Generated version: $ALPHA_VERSION"

- name: Pack
- name: Pack CLI
env:
VERSION: ${{ steps.version.outputs.version }}
run: dotnet pack src/ServiceBusToolset.CLI -c Release --no-build -o ./artifacts -p:Version="$VERSION"

- name: Pack Application
env:
VERSION: ${{ steps.version.outputs.version }}
run: dotnet pack src/ServiceBusToolset.Application -c Release --no-build -o ./artifacts -p:Version="$VERSION"

- name: Push to NuGet
env:
NUGET_API_KEY: ${{ secrets.NUGET_API_KEY }}
Expand All @@ -133,3 +138,4 @@ jobs:
exit 0
fi
dotnet nuget push ./artifacts/*.nupkg --api-key "$NUGET_API_KEY" --source https://api.nuget.org/v3/index.json --skip-duplicate
dotnet nuget push ./artifacts/*.snupkg --api-key "$NUGET_API_KEY" --source https://api.nuget.org/v3/index.json --skip-duplicate
8 changes: 7 additions & 1 deletion .github/workflows/stable-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,16 @@ jobs:
echo "version=$VERSION" >> "$GITHUB_OUTPUT"
echo "Releasing version: $VERSION"

- name: Pack
- name: Pack CLI
env:
VERSION: ${{ steps.version.outputs.version }}
run: dotnet pack src/ServiceBusToolset.CLI -c Release --no-build -o ./artifacts -p:Version="$VERSION"

- name: Pack Application
env:
VERSION: ${{ steps.version.outputs.version }}
run: dotnet pack src/ServiceBusToolset.Application -c Release --no-build -o ./artifacts -p:Version="$VERSION"

- name: Generate SBOM
env:
VERSION: ${{ steps.version.outputs.version }}
Expand All @@ -91,6 +96,7 @@ jobs:
exit 1
fi
dotnet nuget push ./artifacts/*.nupkg --api-key "$NUGET_API_KEY" --source https://api.nuget.org/v3/index.json --skip-duplicate
dotnet nuget push ./artifacts/*.snupkg --api-key "$NUGET_API_KEY" --source https://api.nuget.org/v3/index.json --skip-duplicate

- name: Upload package to GitHub Release
uses: softprops/action-gh-release@153bb8e04406b158c6c84fc1615b65b24149a1fe # v2.6.1
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using Ardalis.Result;
using Mediator;
using ServiceBusToolset.Application.DeadLetters.DiagnoseDlq.Models;

namespace ServiceBusToolset.Application.DeadLetters.DiagnoseDlq;

public sealed record DiagnoseBatchCommand(string AppInsightsResourceId,
IReadOnlyList<OperationInfo> Operations) : ICommand<Result<IReadOnlyList<DiagnosticResult>>>;

public sealed record OperationInfo(string OperationId,
DateTimeOffset EnqueuedTime,
string? MessageId,
string? Subject,
string? DeadLetterReason);
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using Ardalis.Result;
using Mediator;
using ServiceBusToolset.Application.DeadLetters.DiagnoseDlq.Common.AppInsights;
using ServiceBusToolset.Application.DeadLetters.DiagnoseDlq.Models;

namespace ServiceBusToolset.Application.DeadLetters.DiagnoseDlq;

public sealed class DiagnoseBatchCommandHandler(IAppInsightsService appInsightsService)
: ICommandHandler<DiagnoseBatchCommand, Result<IReadOnlyList<DiagnosticResult>>>
{
public async ValueTask<Result<IReadOnlyList<DiagnosticResult>>> Handle(
DiagnoseBatchCommand command,
CancellationToken cancellationToken)
{
if (command.Operations.Count == 0)
{
return Result.Success<IReadOnlyList<DiagnosticResult>>([]);
}

appInsightsService.Initialize(command.AppInsightsResourceId);

var operations = command.Operations
.Select(op => (op.OperationId, op.EnqueuedTime))
.ToList();

var diagnosticResults = await appInsightsService.DiagnoseBatchAsync(operations,
null,
cancellationToken);

var results = new List<DiagnosticResult>();
// Deduplicate by OperationId — take the first occurrence if duplicates exist
var operationsById = command.Operations
.DistinctBy(op => op.OperationId)
.ToDictionary(op => op.OperationId);

foreach (var (operationId, result) in diagnosticResults)
{
if (operationsById.TryGetValue(operationId, out var operation))
{
result.MessageId = operation.MessageId;
result.Subject = operation.Subject;
result.DeadLetterReason = operation.DeadLetterReason;
results.Add(result);
}
}

return Result.Success<IReadOnlyList<DiagnosticResult>>(results);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Ardalis.Result;
using Mediator;
using ServiceBusToolset.Application.Common.ServiceBus.Models;

namespace ServiceBusToolset.Application.DeadLetters.PeekDlq;

public sealed record PeekDlqBatchCommand(string FullyQualifiedNamespace,
EntityTarget Target,
int BatchSize = 500,
long? FromSequenceNumber = null,
long? KnownDeadLetterCount = null) : ICommand<Result<PeekDlqBatchResult>>;
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
using Ardalis.Result;
using Azure;
using Azure.Messaging.ServiceBus;
using Mediator;
using ServiceBusToolset.Application.Common.ServiceBus.Abstractions;
using ServiceBusToolset.Application.Common.ServiceBus.Helpers;
using ServiceBusToolset.Application.Common.ServiceBus.Models;
using ServiceBusToolset.Application.DeadLetters.DiagnoseDlq.Common;

namespace ServiceBusToolset.Application.DeadLetters.PeekDlq;

public sealed class PeekDlqBatchCommandHandler(IServiceBusClientFactory clientFactory)
: ICommandHandler<PeekDlqBatchCommand, Result<PeekDlqBatchResult>>
{
private const int PeekSubBatchSize = 100;
private const int EmptyBatchThreshold = 3;

public async ValueTask<Result<PeekDlqBatchResult>> Handle(
PeekDlqBatchCommand command,
CancellationToken cancellationToken)
{
await using var client = clientFactory.CreateClient(command.FullyQualifiedNamespace);

// Get total DLQ count — use known count if provided, otherwise query admin API
var totalDeadLetterCount = command.KnownDeadLetterCount
?? await GetDeadLetterCountAsync(clientFactory, command.FullyQualifiedNamespace, command.Target);

// Peek messages in sub-batches until we reach BatchSize or run out
await using var receiver = ReceiverFactory.CreateDlqReceiver(client, command.Target);

List<ServiceBusReceivedMessage> allMessages = [];
var emptyBatches = 0;
var isFirstPeek = true;
var highestSequenceNumber = command.FromSequenceNumber ?? -1;

while (allMessages.Count < command.BatchSize &&
emptyBatches < EmptyBatchThreshold &&
!cancellationToken.IsCancellationRequested)
Comment on lines +36 to +38
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't treat cancellation as normal exhaustion.

The IsCancellationRequested guard at Line 38 only exits the loop, so a canceled request falls through to the success returns at Lines 73-80 or Lines 111-116 with empty/partial data. That makes cancellation indistinguishable from “no more messages”.

🛠️ Suggested change
-        while (allMessages.Count < command.BatchSize &&
-               emptyBatches < EmptyBatchThreshold &&
-               !cancellationToken.IsCancellationRequested)
+        while (allMessages.Count < command.BatchSize &&
+               emptyBatches < EmptyBatchThreshold)
         {
+            cancellationToken.ThrowIfCancellationRequested();
             var remaining = command.BatchSize - allMessages.Count;
             var subBatchSize = Math.Min(PeekSubBatchSize, remaining);

Also applies to: 73-80, 111-116

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@src/ServiceBusToolset.Application/DeadLetters/PeekDlq/PeekDlqBatchCommandHandler.cs`
around lines 36 - 38, The loop in PeekDlqBatchCommandHandler.Handle currently
treats cancellation as normal exhaustion because it only breaks the loop on
cancellation; update the handler so that after the loop you check
cancellationToken.IsCancellationRequested and, if set, short-circuit with an
appropriate canceled response (e.g., throw OperationCanceledException or return
a Result/Response indicating cancellation) instead of falling through to the
success return paths that return empty/partial data (the success-returning
blocks currently used around the "empty/partial" returns); ensure both success
return sites that return empty results are guarded by this cancellation check so
cancellation is distinguishable from "no more messages."

{
var remaining = command.BatchSize - allMessages.Count;
var subBatchSize = Math.Min(PeekSubBatchSize, remaining);

IReadOnlyList<ServiceBusReceivedMessage> batch;
if (isFirstPeek && command.FromSequenceNumber.HasValue)
{
batch = await receiver.PeekMessagesAsync(subBatchSize, command.FromSequenceNumber.Value + 1, cancellationToken);
isFirstPeek = false;
}
else
{
batch = await receiver.PeekMessagesAsync(subBatchSize, cancellationToken:cancellationToken);
isFirstPeek = false;
}

if (batch.Count == 0)
{
emptyBatches++;
continue;
}

// Detect wrap-around: if the batch contains messages with sequence numbers
// we've already passed, the receiver has looped back to the beginning
if (batch[0].SequenceNumber <= highestSequenceNumber)
{
break;
}

emptyBatches = 0;
allMessages.AddRange(batch);
highestSequenceNumber = batch[^1].SequenceNumber;
}

if (allMessages.Count == 0)
{
return Result.Success(new PeekDlqBatchResult([],
0,
0,
command.FromSequenceNumber,
false,
totalDeadLetterCount));
}

// Extract operation IDs
List<PeekedMessage> messages = [];
var skipped = 0;
HashSet<string> seenOperationIds = [];

foreach (var message in allMessages)
{
var operationId = MessageDiagnostics.ExtractOperationId(message);
if (string.IsNullOrEmpty(operationId))
{
skipped++;
continue;
}

if (seenOperationIds.Add(operationId))
{
messages.Add(new PeekedMessage(message.MessageId,
message.Subject,
operationId,
message.EnqueuedTime,
message.DeadLetterReason));
}
}

var lastSequenceNumber = allMessages[^1].SequenceNumber;
// We have more messages if we filled the batch (didn't run out early)
var hasMore = allMessages.Count >= command.BatchSize && emptyBatches < EmptyBatchThreshold;
Comment on lines +107 to +109
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

HasMoreMessages can be a false positive on a full first page.

Line 109 ignores the resolved count from Line 25. If the first page returns exactly BatchSize messages and totalDeadLetterCount is the same value, this still reports HasMoreMessages = true, which sends the caller to a guaranteed-empty next page.

🧭 Suggested change
-        var hasMore = allMessages.Count >= command.BatchSize && emptyBatches < EmptyBatchThreshold;
+        var hasMore = command.FromSequenceNumber is null && totalDeadLetterCount > 0
+            ? allMessages.Count < totalDeadLetterCount
+            : allMessages.Count >= command.BatchSize && emptyBatches < EmptyBatchThreshold;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
var lastSequenceNumber = allMessages[^1].SequenceNumber;
// We have more messages if we filled the batch (didn't run out early)
var hasMore = allMessages.Count >= command.BatchSize && emptyBatches < EmptyBatchThreshold;
var lastSequenceNumber = allMessages[^1].SequenceNumber;
// We have more messages if we filled the batch (didn't run out early)
var hasMore = command.FromSequenceNumber is null && totalDeadLetterCount > 0
? allMessages.Count < totalDeadLetterCount
: allMessages.Count >= command.BatchSize && emptyBatches < EmptyBatchThreshold;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@src/ServiceBusToolset.Application/DeadLetters/PeekDlq/PeekDlqBatchCommandHandler.cs`
around lines 107 - 109, The hasMore calculation in PeekDlqBatchCommandHandler.cs
(using allMessages, command.BatchSize and emptyBatches) can return true on a
full first page even when there are no more dead letters; change hasMore to use
the resolved total dead-letter count (the variable assigned at Line 25, e.g.,
totalDeadLetterCount/resolvedCount) together with the current offset/skip to
determine if more rows exist—i.e., compute hasMore by checking
totalDeadLetterCount > (currentOffset + allMessages.Count) and still require
emptyBatches < EmptyBatchThreshold, instead of relying solely on
allMessages.Count >= command.BatchSize.


return Result.Success(new PeekDlqBatchResult(messages,
allMessages.Count,
skipped,
lastSequenceNumber,
hasMore,
totalDeadLetterCount));
}

private static async Task<long> GetDeadLetterCountAsync(
IServiceBusClientFactory clientFactory,
string fullyQualifiedNamespace,
EntityTarget target)
{
try
{
var adminClient = clientFactory.CreateAdministrationClient(fullyQualifiedNamespace);

if (target.IsQueueMode)
{
var props = await adminClient.GetQueueRuntimePropertiesAsync(target.Queue!);
return props.Value.DeadLetterMessageCount;
}

var subProps = await adminClient.GetSubscriptionRuntimePropertiesAsync(target.Topic!, target.Subscription!);
return subProps.Value.DeadLetterMessageCount;
}
catch (RequestFailedException)
{
// Admin API may not be available in all environments (e.g., emulator, unit tests)
return 0;
}
catch (OperationCanceledException)
{
return 0;
}
}
Comment on lines +126 to +153
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Missing CancellationToken propagation to admin API calls.

The admin API methods accept a CancellationToken, but this method neither accepts one nor passes it to the underlying calls. While the OperationCanceledException catch handles post-hoc cancellation, the calls themselves will run to completion even if the parent operation is cancelled.

🛠️ Proposed fix to propagate cancellation token
 private static async Task<long> GetDeadLetterCountAsync(
     IServiceBusClientFactory clientFactory,
     string fullyQualifiedNamespace,
-    EntityTarget target)
+    EntityTarget target,
+    CancellationToken cancellationToken)
 {
     try
     {
         var adminClient = clientFactory.CreateAdministrationClient(fullyQualifiedNamespace);

         if (target.IsQueueMode)
         {
-            var props = await adminClient.GetQueueRuntimePropertiesAsync(target.Queue!);
+            var props = await adminClient.GetQueueRuntimePropertiesAsync(target.Queue!, cancellationToken);
             return props.Value.DeadLetterMessageCount;
         }

-        var subProps = await adminClient.GetSubscriptionRuntimePropertiesAsync(target.Topic!, target.Subscription!);
+        var subProps = await adminClient.GetSubscriptionRuntimePropertiesAsync(target.Topic!, target.Subscription!, cancellationToken);
         return subProps.Value.DeadLetterMessageCount;
     }

And update the call site at line 25:

 var totalDeadLetterCount = command.KnownDeadLetterCount
-                           ?? await GetDeadLetterCountAsync(clientFactory, command.FullyQualifiedNamespace, command.Target);
+                           ?? await GetDeadLetterCountAsync(clientFactory, command.FullyQualifiedNamespace, command.Target, cancellationToken);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@src/ServiceBusToolset.Application/DeadLetters/PeekDlq/PeekDlqBatchCommandHandler.cs`
around lines 118 - 145, Update GetDeadLetterCountAsync to accept and propagate a
CancellationToken and pass it into the Azure admin API calls: modify the method
signature of GetDeadLetterCountAsync(IServiceBusClientFactory clientFactory,
string fullyQualifiedNamespace, EntityTarget target, CancellationToken
cancellationToken), and then call
adminClient.GetQueueRuntimePropertiesAsync(target.Queue!, cancellationToken) and
adminClient.GetSubscriptionRuntimePropertiesAsync(target.Topic!,
target.Subscription!, cancellationToken); also remove the
OperationCanceledException catch or keep it as needed, and update all call sites
to supply the appropriate CancellationToken when invoking
GetDeadLetterCountAsync.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace ServiceBusToolset.Application.DeadLetters.PeekDlq;

public sealed record PeekDlqBatchResult(IReadOnlyList<PeekedMessage> Messages,
int PeekedInBatch,
int SkippedNoOperationId,
long? LastSequenceNumber,
bool HasMoreMessages,
long TotalDeadLetterCount);

public sealed record PeekedMessage(string? MessageId,
string? Subject,
string OperationId,
DateTimeOffset EnqueuedTime,
string? DeadLetterReason);
Original file line number Diff line number Diff line change
@@ -1,5 +1,31 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<!-- NuGet Package Metadata -->
<PackageId>ServiceBusToolset.Application</PackageId>
<Authors>Gagik Kyurkchyan</Authors>
<Description>Application layer for Azure Service Bus diagnostics: dead letter queue analysis, Application Insights correlation, and message inspection.</Description>
<PackageTags>azure;service-bus;dead-letter-queue;diagnostics;application-insights</PackageTags>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageProjectUrl>https://github.com/kyurkchyan/ServiceBusToolset</PackageProjectUrl>
<RepositoryUrl>https://github.com/kyurkchyan/ServiceBusToolset.git</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<PackageReadmeFile>README.md</PackageReadmeFile>

<!-- Version - Overridden by CI/CD -->
<Version>1.0.0</Version>

<!-- Source Link -->
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
</PropertyGroup>

<ItemGroup>
<None Include="..\..\README.md" Pack="true" PackagePath="\"/>
</ItemGroup>

<ItemGroup>
<PackageReference Include="Mediator.Abstractions"/>
<PackageReference Include="Mediator.SourceGenerator">
Expand All @@ -13,6 +39,7 @@
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions"/>
<PackageReference Include="System.Reactive"/>
<PackageReference Include="DynamicData"/>
<PackageReference Include="Microsoft.SourceLink.GitHub" PrivateAssets="All"/>
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using Azure;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using NSubstitute;
using NSubstitute.ExceptionExtensions;
using ServiceBusToolset.Application.Common.ServiceBus.Abstractions;

namespace ServiceBusToolset.Application.Tests.Common.Mocks;
Expand Down Expand Up @@ -34,6 +36,12 @@ private MockServiceBusClientFactory()
Factory.CreateClient(Arg.Any<string>()).Returns(Client);
Factory.CreateAdministrationClient(Arg.Any<string>()).Returns(AdminClient);

// Admin API is unavailable in tests (mirrors emulator behavior)
AdminClient.GetQueueRuntimePropertiesAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
.ThrowsAsync(new RequestFailedException("Admin API not available"));
AdminClient.GetSubscriptionRuntimePropertiesAsync(Arg.Any<string>(), Arg.Any<string>(), Arg.Any<CancellationToken>())
.ThrowsAsync(new RequestFailedException("Admin API not available"));

// Setup client dispose
Client.DisposeAsync().Returns(ValueTask.CompletedTask);
}
Expand Down Expand Up @@ -86,19 +94,14 @@ private void ConfigureReceiver()
_peekCallCount = 0;
_receiveCallCount = 0;

// Setup PeekMessagesAsync - returns all messages on first call, empty on subsequent
// Setup PeekMessagesAsync - returns messages in pages, respecting maxMessages per call
Receiver.PeekMessagesAsync(Arg.Any<int>(), Arg.Any<long?>(), Arg.Any<CancellationToken>())
.Returns(callInfo =>
{
var maxMessages = callInfo.ArgAt<int>(0);
_peekCallCount++;
if (_peekCallCount == 1)
{
var batch = _messagesToReturn.Take(maxMessages).ToList();
return Task.FromResult<IReadOnlyList<ServiceBusReceivedMessage>>(batch);
}

return Task.FromResult<IReadOnlyList<ServiceBusReceivedMessage>>([]);
var batch = _messagesToReturn.Skip(_peekCallCount).Take(maxMessages).ToList();
_peekCallCount += batch.Count;
return Task.FromResult<IReadOnlyList<ServiceBusReceivedMessage>>(batch);
});

// Setup ReceiveMessagesAsync - returns all messages on first call, empty on subsequent
Expand Down
Loading
Loading