From b3bf36d6f120574c716ca4762f8ca14a7a95edb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Vieira?= Date: Fri, 23 May 2025 14:39:52 -0700 Subject: [PATCH 1/2] fix NPE when exception message is null --- core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java b/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java index 11c7f0a9..8a50ae0f 100644 --- a/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java +++ b/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java @@ -291,7 +291,7 @@ void flush(BufferNextMessage bundler) { // Old senders in other artifacts may be using this less precise way of indicating they've been closed // out-of-band. - if (t instanceof IllegalStateException && t.getMessage().equals("closed")) { + if (t instanceof IllegalStateException && "closed".equals(t.getMessage())) { throw (IllegalStateException) t; } } From 9f2dbcdfc19a13fb4a9a64b0a3ef83d4fc25b7b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Vieira?= Date: Fri, 23 May 2025 15:37:09 -0700 Subject: [PATCH 2/2] adding tests --- .../java/zipkin2/reporter/FakeSender.java | 12 +++-- .../reporter/internal/AsyncReporterTest.java | 49 ++++++++++++++++--- 2 files changed, 50 insertions(+), 11 deletions(-) diff --git a/core/src/test/java/zipkin2/reporter/FakeSender.java b/core/src/test/java/zipkin2/reporter/FakeSender.java index 293fd725..6c585721 100644 --- a/core/src/test/java/zipkin2/reporter/FakeSender.java +++ b/core/src/test/java/zipkin2/reporter/FakeSender.java @@ -52,17 +52,21 @@ public FakeSender messageMaxBytes(int messageMaxBytes) { return messageMaxBytes; } - /** close is typically called from a different thread */ - volatile boolean closeCalled; + // allow us to simulate an exception + volatile RuntimeException exceptionToThrow; @Override public void send(List encodedSpans) { - if (closeCalled) throw new ClosedSenderException(); + if (exceptionToThrow != null) throw exceptionToThrow; List decoded = encodedSpans.stream().map(decoder::decodeOne).collect(Collectors.toList()); onSpans.accept(decoded); } @Override public void close() { - closeCalled = true; + exceptionToThrow = new ClosedSenderException(); + } + + public void throwException(RuntimeException e) { + exceptionToThrow = e; } @Override public String toString() { diff --git a/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java b/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java index ee6a16d7..c172027c 100644 --- a/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java +++ b/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java @@ -8,6 +8,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -114,7 +115,7 @@ void queuedMaxSpans_dropsWhenOverqueuing(int queuedMaxBytes) { reporter.report(span); // dropped the one that queued more than allowed count reporter.flush(); reporter.close(); - + assertThat(sentSpans.get()).isEqualTo(1); } @@ -131,7 +132,7 @@ void report_incrementsMetrics(int queuedMaxBytes) { reporter.report(span); reporter.flush(); reporter.close(); - + assertThat(metrics.spans()).isEqualTo(2); assertThat(metrics.spanBytes()).isEqualTo(SpanBytesEncoder.JSON_V2.encode(span).length * 2); } @@ -154,8 +155,7 @@ void report_incrementsSpansDropped(int queuedMaxBytes) { assertThat(metrics.spans()).isEqualTo(2); assertThat(metrics.spansDropped()).isEqualTo(1); } - - + @ParameterizedTest(name = "queuedMaxBytes={0}") @ValueSource(ints = { 0, 1000000 }) void report_incrementsSpansDroppedOversizing(int queuedMaxBytes) { @@ -321,7 +321,7 @@ void messageTimeout_disabled(int queuedMaxBytes) throws InterruptedException { // check name is pretty assertThat(threadName.take()) .isEqualTo("AsyncReporter{FakeSender}"); - + reporter.close(); } @@ -342,7 +342,7 @@ void messageTimeout_disabled(int queuedMaxBytes) throws InterruptedException { BoundedAsyncReporter impl = (BoundedAsyncReporter) reporter; assertThat(impl.close.await(3, TimeUnit.MILLISECONDS)) .isTrue(); - + reporter.close(); } @@ -396,7 +396,7 @@ void messageTimeout_disabled(int queuedMaxBytes) throws InterruptedException { assertThat(metrics.messagesDropped()).isEqualTo(1); assertThat(metrics.messagesDroppedByCause().keySet().iterator().next()) .isEqualTo(ClosedSenderException.class); - + reporter.close(); } @@ -511,6 +511,41 @@ void quitsBlockingWhenOverTimeout(int queuedMaxBytes) throws InterruptedExceptio } } + @Test void flush_incrementsMetricsAndThrowsWhenIllegalStateExceptionWithMessage() { + AsyncReporter reporter = AsyncReporter.newBuilder(sleepingSender) + .metrics(metrics) + .messageTimeout(0, TimeUnit.MILLISECONDS) + .build(SpanBytesEncoder.JSON_V2); + + reporter.report(span); + + sleepingSender.throwException(new IllegalStateException("closed")); + try { + reporter.flush(); + failBecauseExceptionWasNotThrown(IllegalStateException.class); + } catch (IllegalStateException e) { + assertThat(metrics.spansDropped()).isEqualTo(1); + assertThat(metrics.messagesDropped()).isEqualTo(1); + } finally { + reporter.close(); + } + } + + @Test void flush_incrementsMetricsAndDontThrowsWhenCancellationException() { + AsyncReporter reporter = AsyncReporter.newBuilder(sleepingSender) + .metrics(metrics) + .messageTimeout(0, TimeUnit.MILLISECONDS) + .build(SpanBytesEncoder.JSON_V2); + + reporter.report(span); + + sleepingSender.throwException(new CancellationException()); + reporter.flush(); + assertThat(metrics.spansDropped()).isEqualTo(1); + assertThat(metrics.messagesDropped()).isEqualTo(1); + reporter.close(); + } + @Test void build_threadFactory() { Thread thread = new Thread(); AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create())