From c2884e5af560d06fba10d22db34d5224959f1e82 Mon Sep 17 00:00:00 2001 From: mtlyzhang Date: Mon, 13 Apr 2026 17:07:57 +0800 Subject: [PATCH] [ISSUE #10256] Fix shared mutable batchAckIndexList causing toStoreBits update failure in PopBufferMergeService When enablePopBatchAck and appendAckAsync are both enabled, scan() passes a class-level shared batchAckIndexList to the async putBatchAckToStore(), then immediately clears the list in the finally block. The async callback captures the same (now-empty) list reference, causing toStoreBits to never be updated. This prevents checkpoint wrappers from being removed from the unbounded commitOffsets queue, leading to memory leak and potential OOM. Fix: pass a defensive copy new ArrayList<>(indexList) so the async callback captures an independent snapshot of the index data. Made-with: Cursor --- .../processor/PopBufferMergeService.java | 2 +- .../processor/PopBufferMergeServiceTest.java | 47 +++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java index 5373eaea333..c1011cb332c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java @@ -306,7 +306,7 @@ private void scan() { } } if (indexList.size() > 0) { - putBatchAckToStore(pointWrapper, indexList, count); + putBatchAckToStore(pointWrapper, new ArrayList<>(indexList), count); } } finally { indexList.clear(); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java index 6cbbd9cfd92..a46b909d819 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java @@ -234,4 +234,51 @@ public void testPutAckToStore() throws Exception { method.invoke(popBufferMergeService, pointWrapper, msgIndex, count); verify(escapeBridge, times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); } + + @Test + public void testBatchAckAsyncShouldUpdateToStoreBitsAfterFix() throws Exception { + PopCheckPoint point = new PopCheckPoint(); + point.setStartOffset(100L); + point.setCId(defaultGroup); + point.setTopic(defaultTopic); + point.setQueueId(1); + point.setNum((byte) 2); + long popTime = System.currentTimeMillis() - 20000; + point.setPopTime(popTime); + point.setInvisibleTime(30000); + point.setBrokerName("testBroker"); + + PopBufferMergeService.PopCheckPointWrapper pointWrapper = + popBufferMergeService.new PopCheckPointWrapper(0, 0, point, 102); + pointWrapper.getBits().set(3); + pointWrapper.setCkStored(true); + popBufferMergeService.buffer.put(pointWrapper.getMergeKey(), pointWrapper); + + when(brokerConfig.isEnablePopBatchAck()).thenReturn(true); + when(brokerConfig.isAppendAckAsync()).thenReturn(true); + when(brokerConfig.getPopCkStayBufferTimeOut()).thenReturn(3000); + + java.util.concurrent.CompletableFuture future = + new java.util.concurrent.CompletableFuture<>(); + EscapeBridge escapeBridge = mock(EscapeBridge.class); + when(brokerController.getEscapeBridge()).thenReturn(escapeBridge); + when(escapeBridge.asyncPutMessageToSpecificQueue(any())).thenReturn(future); + + BrokerMetricsManager brokerMetricsManager = mock(BrokerMetricsManager.class); + PopMetricsManager popMetricsManager = mock(PopMetricsManager.class); + when(brokerMetricsManager.getPopMetricsManager()).thenReturn(popMetricsManager); + when(brokerController.getBrokerMetricsManager()).thenReturn(brokerMetricsManager); + + Method scanMethod = PopBufferMergeService.class.getDeclaredMethod("scan"); + scanMethod.setAccessible(true); + scanMethod.invoke(popBufferMergeService); + + PutMessageResult putResult = mock(PutMessageResult.class); + when(putResult.getPutMessageStatus()).thenReturn(PutMessageStatus.PUT_OK); + future.complete(putResult); + + assertEquals( + "After fix, async callback should correctly update toStoreBits to 3 (binary 11)", + 3, pointWrapper.getToStoreBits().get()); + } }