Skip to content

Commit 3b78ab0

Browse files
authored
[ISSUE #9632] Fix Pop Long-polling Not Awakened for V1 Retry Messages (#9828)
1 parent 7a1a950 commit 3b78ab0

5 files changed

Lines changed: 61 additions & 13 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,22 @@
2727
import java.util.concurrent.ConcurrentSkipListSet;
2828
import java.util.concurrent.TimeUnit;
2929
import java.util.concurrent.atomic.AtomicLong;
30+
import org.apache.commons.lang3.StringUtils;
3031
import org.apache.rocketmq.broker.BrokerController;
3132
import org.apache.rocketmq.common.KeyBuilder;
33+
import org.apache.rocketmq.common.MixAll;
3234
import org.apache.rocketmq.common.PopAckConstants;
3335
import org.apache.rocketmq.common.ServiceThread;
3436
import org.apache.rocketmq.common.constant.LoggerName;
37+
import org.apache.rocketmq.common.message.MessageConst;
3538
import org.apache.rocketmq.logging.org.slf4j.Logger;
3639
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3740
import org.apache.rocketmq.remoting.CommandCallback;
41+
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
3842
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
3943
import org.apache.rocketmq.remoting.netty.RequestTask;
44+
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
4045
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
41-
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
4246
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
4347
import org.apache.rocketmq.store.ConsumeQueueExt;
4448
import org.apache.rocketmq.store.MessageFilter;
@@ -167,13 +171,31 @@ public void notifyMessageArrivingWithRetryTopic(final String topic, final int qu
167171

168172
public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId, long offset,
169173
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
170-
String notifyTopic;
171-
if (KeyBuilder.isPopRetryTopicV2(topic)) {
172-
notifyTopic = KeyBuilder.parseNormalTopic(topic);
174+
if (NamespaceUtil.isRetryTopic(topic)) {
175+
notifyMessageArrivingFromRetry(topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
173176
} else {
174-
notifyTopic = topic;
177+
notifyMessageArriving(topic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
178+
}
179+
}
180+
181+
private void notifyMessageArrivingFromRetry(String topic, int queueId, Long tagsCode, long msgStoreTime, byte[] filterBitMap,
182+
Map<String, String> properties) {
183+
String prefix = MixAll.RETRY_GROUP_TOPIC_PREFIX;
184+
String originGroup = properties.get(MessageConst.PROPERTY_ORIGIN_GROUP);
185+
// In the case of pop consumption, there is no long polling hanging on the retry topic, so the wake-up is skipped.
186+
if (StringUtils.isBlank(originGroup)) {
187+
return;
188+
}
189+
// %RETRY%GROUP is used for pull mode, so the wake-up is skipped.
190+
int originTopicStartIndex = prefix.length() + originGroup.length() + 1;
191+
if (topic.length() <= originTopicStartIndex) {
192+
return;
193+
}
194+
String originTopic = topic.substring(originTopicStartIndex);
195+
if (queueId >= 0) {
196+
notifyMessageArriving(originTopic, -1, originGroup, true, tagsCode, msgStoreTime, filterBitMap, properties);
175197
}
176-
notifyMessageArriving(notifyTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
198+
notifyMessageArriving(originTopic, queueId, originGroup, true, tagsCode, msgStoreTime, filterBitMap, properties);
177199
}
178200

179201
public void notifyMessageArriving(final String topic, final int queueId, long offset,

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,7 @@ public boolean reviveRetry(PopConsumerRecord record, MessageExt messageExt) {
624624
msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) {
625625
msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(record.getPopTime()));
626626
}
627+
msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP, record.getGroupId());
627628
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
628629

629630
PutMessageResult putMessageResult =

broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,16 @@
3131
import org.apache.commons.lang3.StringUtils;
3232
import org.apache.commons.lang3.tuple.Triple;
3333
import org.apache.rocketmq.broker.BrokerController;
34-
3534
import org.apache.rocketmq.client.consumer.PullResult;
3635
import org.apache.rocketmq.client.consumer.PullStatus;
3736
import org.apache.rocketmq.common.KeyBuilder;
3837
import org.apache.rocketmq.common.MixAll;
39-
import org.apache.rocketmq.common.UtilAll;
4038
import org.apache.rocketmq.common.Pair;
4139
import org.apache.rocketmq.common.PopAckConstants;
4240
import org.apache.rocketmq.common.ServiceThread;
4341
import org.apache.rocketmq.common.TopicConfig;
4442
import org.apache.rocketmq.common.TopicFilterType;
43+
import org.apache.rocketmq.common.UtilAll;
4544
import org.apache.rocketmq.common.constant.LoggerName;
4645
import org.apache.rocketmq.common.message.MessageAccessor;
4746
import org.apache.rocketmq.common.message.MessageConst;
@@ -128,6 +127,7 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
128127
if (messageExt.getReconsumeTimes() == 0 || msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) {
129128
msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(popCheckPoint.getPopTime()));
130129
}
130+
msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP, popCheckPoint.getCId());
131131
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
132132
addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId());
133133
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);

broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,18 @@
2020
import com.github.benmanes.caffeine.cache.Caffeine;
2121
import io.netty.channel.Channel;
2222
import io.netty.channel.ChannelHandlerContext;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.ConcurrentSkipListSet;
27+
import java.util.concurrent.ExecutorService;
2328
import java.util.concurrent.TimeUnit;
2429
import org.apache.commons.lang3.reflect.FieldUtils;
2530
import org.apache.rocketmq.broker.BrokerController;
2631
import org.apache.rocketmq.common.BrokerConfig;
2732
import org.apache.rocketmq.common.KeyBuilder;
33+
import org.apache.rocketmq.common.MixAll;
34+
import org.apache.rocketmq.common.message.MessageConst;
2835
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
2936
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
3037
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -36,11 +43,6 @@
3643
import org.mockito.Mock;
3744
import org.mockito.junit.MockitoJUnitRunner;
3845

39-
import java.util.Map;
40-
import java.util.concurrent.ConcurrentHashMap;
41-
import java.util.concurrent.ConcurrentSkipListSet;
42-
import java.util.concurrent.ExecutorService;
43-
4446
import static org.junit.Assert.assertEquals;
4547
import static org.junit.Assert.assertFalse;
4648
import static org.junit.Assert.assertTrue;
@@ -87,6 +89,27 @@ public void testNotifyMessageArrivingWithRetryTopic() {
8789
verify(popLongPollingService, times(1)).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, -1L, null, 0L, null, null);
8890
}
8991

92+
@Test
93+
public void testNotifyMessageArrivingFromRetry() {
94+
int queueId = -1;
95+
String group = "group";
96+
String pullRetryTopic = MixAll.getRetryTopic(group);
97+
String popRetryTopicV1 = KeyBuilder.buildPopRetryTopic(defaultTopic, group, false);
98+
String popRetryTopicV2 = KeyBuilder.buildPopRetryTopic(defaultTopic, group, true);
99+
100+
Map<String, String> properties = new HashMap<>();
101+
properties.putIfAbsent(MessageConst.PROPERTY_ORIGIN_GROUP, group);
102+
// pull retry
103+
popLongPollingService.notifyMessageArrivingWithRetryTopic(pullRetryTopic, queueId, queueId, -1L, 0L, null, properties);
104+
verify(popLongPollingService, times(0)).notifyMessageArriving(defaultTopic, queueId, group, true, -1L, 0L, null, properties, null);
105+
// pop retry v1
106+
popLongPollingService.notifyMessageArrivingWithRetryTopic(popRetryTopicV1, queueId, queueId, -1L, 0L, null, properties);
107+
verify(popLongPollingService, times(1)).notifyMessageArriving(defaultTopic, queueId, group, true, -1L, 0L, null, properties, null);
108+
// pop retry v2
109+
popLongPollingService.notifyMessageArrivingWithRetryTopic(popRetryTopicV2, queueId, queueId, -1L, 0L, null, properties);
110+
verify(popLongPollingService, times(2)).notifyMessageArriving(defaultTopic, queueId, group, true, -1L, 0L, null, properties, null);
111+
}
112+
90113
@Test
91114
public void testNotifyMessageArriving() {
92115
int queueId = 0;

common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class MessageConst {
2424
public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
2525
public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
2626
public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
27+
public static final String PROPERTY_ORIGIN_GROUP = "ORIGIN_GROUP";
2728
public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
2829
public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
2930
public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
@@ -113,6 +114,7 @@ public class MessageConst {
113114
STRING_HASH_SET.add(PROPERTY_WAIT_STORE_MSG_OK);
114115
STRING_HASH_SET.add(PROPERTY_DELAY_TIME_LEVEL);
115116
STRING_HASH_SET.add(PROPERTY_RETRY_TOPIC);
117+
STRING_HASH_SET.add(PROPERTY_ORIGIN_GROUP);
116118
STRING_HASH_SET.add(PROPERTY_REAL_TOPIC);
117119
STRING_HASH_SET.add(PROPERTY_REAL_QUEUE_ID);
118120
STRING_HASH_SET.add(PROPERTY_TRANSACTION_PREPARED);

0 commit comments

Comments
 (0)