Skip to content

Commit 9a5c1eb

Browse files
author
mengjiaying
committed
[ISSUE #transaction-deadloop] Fix infinite loop in transaction message check when putBackHalfMsgQueue fails
Problem: When putBackHalfMsgQueue() fails during a transaction message check, the code executes `continue` without advancing the offset, causing an infinite loop that blocks all transaction message processing for up to 60 seconds. Solution: Add a bounded retry mechanism (PUT_BACK_RETRY_TIMES = 3 attempts in total) with a linearly increasing backoff (100 ms after the first failure, 200 ms after the second). Once the attempts are exhausted, skip the message and advance the offset so that processing continues, and log a detailed error for troubleshooting. Impact: Before - up to 60 s of blocking per failed message with the CPU spinning. After - about 300 ms per failed message, then processing continues.
1 parent 38b71b8 commit 9a5c1eb

File tree

2 files changed

+91
-1
lines changed

2 files changed

+91
-1
lines changed

broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
6767

6868
private static final int SLEEP_WHILE_NO_OP = 1000;
6969

70+
private static final int PUT_BACK_RETRY_TIMES = 3;
71+
7072
private final ConcurrentHashMap<Integer, MessageQueueOpContext> deleteContext = new ConcurrentHashMap<>();
7173

7274
private ServiceThread transactionalOpBatchService;
@@ -298,9 +300,30 @@ public void check(long transactionTimeout, int transactionCheckMax,
298300

299301
if (isNeedCheck) {
300302

301-
if (!putBackHalfMsgQueue(msgExt, i)) {
303+
int retryTimes = 0;
304+
boolean putBackSuccess = false;
305+
while (retryTimes < PUT_BACK_RETRY_TIMES) {
306+
putBackSuccess = putBackHalfMsgQueue(msgExt, i);
307+
if (putBackSuccess) {
308+
break;
309+
}
310+
retryTimes++;
311+
if (retryTimes < PUT_BACK_RETRY_TIMES) {
312+
try {
313+
Thread.sleep(100L * retryTimes);
314+
} catch (InterruptedException ignored) {
315+
}
316+
}
317+
}
318+
319+
if (!putBackSuccess) {
320+
log.error("PutBackToHalfQueue failed after {} retries, skip this message. topic={}, queueId={}, offset={}, msgId={}",
321+
PUT_BACK_RETRY_TIMES, msgExt.getTopic(), msgExt.getQueueId(), i, msgExt.getMsgId());
322+
newOffset = i + 1;
323+
i++;
302324
continue;
303325
}
326+
304327
putInQueueCount++;
305328
log.info("Check transaction. real_topic={},uniqKey={},offset={},commitLogOffset={}",
306329
msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC),

broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImplTest.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,73 @@ public void testOpen() {
179179
assertThat(isOpen).isTrue();
180180
}
181181

182+
@Test
183+
public void testCheck_putBackFailedShouldNotInfiniteLoop() {
184+
// This test verifies that when putBackHalfMsgQueue fails, the check method should not enter an infinite loop
185+
// The check should retry 3 times and then skip the message to continue processing subsequent messages
186+
187+
when(bridge.fetchMessageQueues(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC)).thenReturn(createMessageQueueSet(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC));
188+
// Create a message that needs to be checked (old enough)
189+
when(bridge.getHalfMessage(0, 0, 1)).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC, 5, "hello", 1));
190+
when(bridge.getHalfMessage(0, 1, 1)).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC, 6, "hello2", 0));
191+
when(bridge.getOpMessage(anyInt(), anyLong(), anyInt())).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, "5", 0));
192+
when(bridge.getBrokerController()).thenReturn(this.brokerController);
193+
when(bridge.renewHalfMessageInner(any(MessageExtBrokerInner.class))).thenReturn(createMessageBrokerInner());
194+
// Simulate putBack failure - return PUT_FAILED status
195+
when(bridge.putMessageReturnResult(any(MessageExtBrokerInner.class)))
196+
.thenReturn(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
197+
// Mock fetchConsumeOffset to return valid offset
198+
when(bridge.fetchConsumeOffset(any(MessageQueue.class))).thenReturn(0L);
199+
200+
long timeOut = this.brokerController.getBrokerConfig().getTransactionTimeOut();
201+
final int checkMax = this.brokerController.getBrokerConfig().getTransactionCheckMax();
202+
203+
// This should complete without getting stuck in an infinite loop
204+
long startTime = System.currentTimeMillis();
205+
queueTransactionMsgService.check(timeOut, checkMax, listener);
206+
long elapsedTime = System.currentTimeMillis() - startTime;
207+
208+
// The check should complete quickly (within a few seconds), not run for MAX_PROCESS_TIME_LIMIT (60s)
209+
assertThat(elapsedTime).isLessThan(5000L);
210+
// Verify that putMessageReturnResult was called 3 times (retry limit)
211+
verify(bridge, org.mockito.Mockito.times(3)).putMessageReturnResult(any(MessageExtBrokerInner.class));
212+
}
213+
214+
@Test
215+
public void testCheck_putBackSucceedsAfterRetry() {
216+
// This test verifies that if putBackHalfMsgQueue succeeds after retry, the check continues normally
217+
218+
when(bridge.fetchMessageQueues(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC)).thenReturn(createMessageQueueSet(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC));
219+
when(bridge.getHalfMessage(0, 0, 1)).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC, 5, "hello", 1));
220+
when(bridge.getHalfMessage(0, 1, 1)).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC, 6, "hello2", 0));
221+
when(bridge.getOpMessage(anyInt(), anyLong(), anyInt())).thenReturn(createPullResult(TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC, 1, "5", 0));
222+
when(bridge.getBrokerController()).thenReturn(this.brokerController);
223+
when(bridge.renewHalfMessageInner(any(MessageExtBrokerInner.class))).thenReturn(createMessageBrokerInner());
224+
when(bridge.fetchConsumeOffset(any(MessageQueue.class))).thenReturn(0L);
225+
226+
// First call fails, second call succeeds
227+
org.mockito.Mockito.when(bridge.putMessageReturnResult(any(MessageExtBrokerInner.class)))
228+
.thenReturn(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null))
229+
.thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
230+
231+
long timeOut = this.brokerController.getBrokerConfig().getTransactionTimeOut();
232+
final int checkMax = this.brokerController.getBrokerConfig().getTransactionCheckMax();
233+
234+
final AtomicInteger checkMessage = new AtomicInteger(0);
235+
doAnswer(new Answer() {
236+
@Override
237+
public Object answer(InvocationOnMock invocation) {
238+
checkMessage.addAndGet(1);
239+
return checkMessage;
240+
}
241+
}).when(listener).resolveHalfMsg(any(MessageExt.class));
242+
243+
queueTransactionMsgService.check(timeOut, checkMax, listener);
244+
245+
// resolveHalfMsg should be called once since putBack succeeded on retry
246+
assertThat(checkMessage.get()).isEqualTo(1);
247+
}
248+
182249
private PullResult createDiscardPullResult(String topic, long queueOffset, String body, int size) {
183250
PullResult result = createPullResult(topic, queueOffset, body, size);
184251
List<MessageExt> msgs = result.getMsgFoundList();

0 commit comments

Comments
 (0)