Skip to content

Commit c2884e5

Browse files
author
mtlyzhang
committed
[ISSUE #10256] Fix shared mutable batchAckIndexList causing toStoreBits update failure in PopBufferMergeService
When enablePopBatchAck and appendAckAsync are both enabled, scan() passes a class-level shared batchAckIndexList to the async putBatchAckToStore(), then immediately clears the list in the finally block. The async callback captures the same (now-empty) list reference, causing toStoreBits to never be updated. This prevents checkpoint wrappers from being removed from the unbounded commitOffsets queue, leading to memory leak and potential OOM. Fix: pass a defensive copy new ArrayList<>(indexList) so the async callback captures an independent snapshot of the index data. Made-with: Cursor
1 parent 9879968 commit c2884e5

File tree

2 files changed

+48
-1
lines changed

2 files changed

+48
-1
lines changed

broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ private void scan() {
306306
}
307307
}
308308
if (indexList.size() > 0) {
309-
putBatchAckToStore(pointWrapper, indexList, count);
309+
putBatchAckToStore(pointWrapper, new ArrayList<>(indexList), count);
310310
}
311311
} finally {
312312
indexList.clear();

broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,4 +234,51 @@ public void testPutAckToStore() throws Exception {
234234
method.invoke(popBufferMergeService, pointWrapper, msgIndex, count);
235235
verify(escapeBridge, times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class));
236236
}
237+
238+
@Test
239+
public void testBatchAckAsyncShouldUpdateToStoreBitsAfterFix() throws Exception {
240+
PopCheckPoint point = new PopCheckPoint();
241+
point.setStartOffset(100L);
242+
point.setCId(defaultGroup);
243+
point.setTopic(defaultTopic);
244+
point.setQueueId(1);
245+
point.setNum((byte) 2);
246+
long popTime = System.currentTimeMillis() - 20000;
247+
point.setPopTime(popTime);
248+
point.setInvisibleTime(30000);
249+
point.setBrokerName("testBroker");
250+
251+
PopBufferMergeService.PopCheckPointWrapper pointWrapper =
252+
popBufferMergeService.new PopCheckPointWrapper(0, 0, point, 102);
253+
pointWrapper.getBits().set(3);
254+
pointWrapper.setCkStored(true);
255+
popBufferMergeService.buffer.put(pointWrapper.getMergeKey(), pointWrapper);
256+
257+
when(brokerConfig.isEnablePopBatchAck()).thenReturn(true);
258+
when(brokerConfig.isAppendAckAsync()).thenReturn(true);
259+
when(brokerConfig.getPopCkStayBufferTimeOut()).thenReturn(3000);
260+
261+
java.util.concurrent.CompletableFuture<PutMessageResult> future =
262+
new java.util.concurrent.CompletableFuture<>();
263+
EscapeBridge escapeBridge = mock(EscapeBridge.class);
264+
when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
265+
when(escapeBridge.asyncPutMessageToSpecificQueue(any())).thenReturn(future);
266+
267+
BrokerMetricsManager brokerMetricsManager = mock(BrokerMetricsManager.class);
268+
PopMetricsManager popMetricsManager = mock(PopMetricsManager.class);
269+
when(brokerMetricsManager.getPopMetricsManager()).thenReturn(popMetricsManager);
270+
when(brokerController.getBrokerMetricsManager()).thenReturn(brokerMetricsManager);
271+
272+
Method scanMethod = PopBufferMergeService.class.getDeclaredMethod("scan");
273+
scanMethod.setAccessible(true);
274+
scanMethod.invoke(popBufferMergeService);
275+
276+
PutMessageResult putResult = mock(PutMessageResult.class);
277+
when(putResult.getPutMessageStatus()).thenReturn(PutMessageStatus.PUT_OK);
278+
future.complete(putResult);
279+
280+
assertEquals(
281+
"After fix, async callback should correctly update toStoreBits to 3 (binary 11)",
282+
3, pointWrapper.getToStoreBits().get());
283+
}
237284
}

0 commit comments

Comments
 (0)