Skip to content

Commit e78e2a7

Browse files
author
RongtongJin
committed
Merge remote-tracking branch 'origin/develop' into dev-1111
Change-Id: I21cc78101a31e80bcf9a98165c267f1568444f1b
2 parents db0cb96 + 2017630 commit e78e2a7

18 files changed

Lines changed: 253 additions & 71 deletions

File tree

BUILDING

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ Build Instructions for Apache RocketMQ
44

55
(1) Prerequisites
66

7-
JDK 1.7+ is required in order to compile and run RocketMQ.
7+
JDK 1.8+ is required in order to compile and run RocketMQ.
88

99
RocketMQ utilizes Maven as a distribution management and packaging tool. Version 3.0.3 or later is required.
1010
Maven installation and configuration instructions can be found here:

broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -775,29 +775,33 @@ private void stopCheckSyncStateSet() {
775775
}
776776

777777
private void scanAvailableControllerAddresses() {
778-
if (controllerAddresses == null) {
779-
LOGGER.warn("scanAvailableControllerAddresses addresses of controller is null!");
780-
return;
781-
}
778+
try {
779+
if (controllerAddresses == null) {
780+
LOGGER.warn("scanAvailableControllerAddresses addresses of controller is null!");
781+
return;
782+
}
782783

783-
for (String address : availableControllerAddresses.keySet()) {
784-
if (!controllerAddresses.contains(address)) {
785-
LOGGER.warn("scanAvailableControllerAddresses remove invalid address {}", address);
786-
availableControllerAddresses.remove(address);
784+
for (String address : availableControllerAddresses.keySet()) {
785+
if (!controllerAddresses.contains(address)) {
786+
LOGGER.warn("scanAvailableControllerAddresses remove invalid address {}", address);
787+
availableControllerAddresses.remove(address);
788+
}
787789
}
788-
}
789790

790-
for (String address : controllerAddresses) {
791-
scanExecutor.submit(() -> {
792-
if (brokerOuterAPI.checkAddressReachable(address)) {
793-
availableControllerAddresses.putIfAbsent(address, true);
794-
} else {
795-
Boolean value = availableControllerAddresses.remove(address);
796-
if (value != null) {
797-
LOGGER.warn("scanAvailableControllerAddresses remove unconnected address {}", address);
791+
for (String address : controllerAddresses) {
792+
scanExecutor.submit(() -> {
793+
if (brokerOuterAPI.checkAddressReachable(address)) {
794+
availableControllerAddresses.putIfAbsent(address, true);
795+
} else {
796+
Boolean value = availableControllerAddresses.remove(address);
797+
if (value != null) {
798+
LOGGER.warn("scanAvailableControllerAddresses remove unconnected address {}", address);
799+
}
798800
}
799-
}
800-
});
801+
});
802+
}
803+
} catch (final Throwable t) {
804+
LOGGER.error("scanAvailableControllerAddresses unexpected exception", t);
801805
}
802806
}
803807

broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,22 @@
2727
import java.util.concurrent.ConcurrentSkipListSet;
2828
import java.util.concurrent.TimeUnit;
2929
import java.util.concurrent.atomic.AtomicLong;
30+
import org.apache.commons.lang3.StringUtils;
3031
import org.apache.rocketmq.broker.BrokerController;
3132
import org.apache.rocketmq.common.KeyBuilder;
33+
import org.apache.rocketmq.common.MixAll;
3234
import org.apache.rocketmq.common.PopAckConstants;
3335
import org.apache.rocketmq.common.ServiceThread;
3436
import org.apache.rocketmq.common.constant.LoggerName;
37+
import org.apache.rocketmq.common.message.MessageConst;
3538
import org.apache.rocketmq.logging.org.slf4j.Logger;
3639
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3740
import org.apache.rocketmq.remoting.CommandCallback;
41+
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
3842
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
3943
import org.apache.rocketmq.remoting.netty.RequestTask;
44+
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
4045
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
41-
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
4246
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
4347
import org.apache.rocketmq.store.ConsumeQueueExt;
4448
import org.apache.rocketmq.store.MessageFilter;
@@ -167,13 +171,31 @@ public void notifyMessageArrivingWithRetryTopic(final String topic, final int qu
167171

168172
public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId, long offset,
169173
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
170-
String notifyTopic;
171-
if (KeyBuilder.isPopRetryTopicV2(topic)) {
172-
notifyTopic = KeyBuilder.parseNormalTopic(topic);
174+
if (NamespaceUtil.isRetryTopic(topic)) {
175+
notifyMessageArrivingFromRetry(topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
173176
} else {
174-
notifyTopic = topic;
177+
notifyMessageArriving(topic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
178+
}
179+
}
180+
181+
private void notifyMessageArrivingFromRetry(String topic, int queueId, Long tagsCode, long msgStoreTime, byte[] filterBitMap,
182+
Map<String, String> properties) {
183+
String prefix = MixAll.RETRY_GROUP_TOPIC_PREFIX;
184+
String originGroup = properties.get(MessageConst.PROPERTY_ORIGIN_GROUP);
185+
// In the case of pop consumption, there is no long polling hanging on the retry topic, so the wake-up is skipped.
186+
if (StringUtils.isBlank(originGroup)) {
187+
return;
188+
}
189+
// %RETRY%GROUP is used for pull mode, so the wake-up is skipped.
190+
int originTopicStartIndex = prefix.length() + originGroup.length() + 1;
191+
if (topic.length() <= originTopicStartIndex) {
192+
return;
193+
}
194+
String originTopic = topic.substring(originTopicStartIndex);
195+
if (queueId >= 0) {
196+
notifyMessageArriving(originTopic, -1, originGroup, true, tagsCode, msgStoreTime, filterBitMap, properties);
175197
}
176-
notifyMessageArriving(notifyTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
198+
notifyMessageArriving(originTopic, queueId, originGroup, true, tagsCode, msgStoreTime, filterBitMap, properties);
177199
}
178200

179201
public void notifyMessageArriving(final String topic, final int queueId, long offset,

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,7 @@ public boolean reviveRetry(PopConsumerRecord record, MessageExt messageExt) {
624624
msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) {
625625
msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(record.getPopTime()));
626626
}
627+
msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP, record.getGroupId());
627628
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
628629

629630
PutMessageResult putMessageResult =

broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,16 @@
3131
import org.apache.commons.lang3.StringUtils;
3232
import org.apache.commons.lang3.tuple.Triple;
3333
import org.apache.rocketmq.broker.BrokerController;
34-
3534
import org.apache.rocketmq.client.consumer.PullResult;
3635
import org.apache.rocketmq.client.consumer.PullStatus;
3736
import org.apache.rocketmq.common.KeyBuilder;
3837
import org.apache.rocketmq.common.MixAll;
39-
import org.apache.rocketmq.common.UtilAll;
4038
import org.apache.rocketmq.common.Pair;
4139
import org.apache.rocketmq.common.PopAckConstants;
4240
import org.apache.rocketmq.common.ServiceThread;
4341
import org.apache.rocketmq.common.TopicConfig;
4442
import org.apache.rocketmq.common.TopicFilterType;
43+
import org.apache.rocketmq.common.UtilAll;
4544
import org.apache.rocketmq.common.constant.LoggerName;
4645
import org.apache.rocketmq.common.message.MessageAccessor;
4746
import org.apache.rocketmq.common.message.MessageConst;
@@ -128,6 +127,7 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
128127
if (messageExt.getReconsumeTimes() == 0 || msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) {
129128
msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(popCheckPoint.getPopTime()));
130129
}
130+
msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP, popCheckPoint.getCId());
131131
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
132132
addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId());
133133
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);

broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,18 @@
2020
import com.github.benmanes.caffeine.cache.Caffeine;
2121
import io.netty.channel.Channel;
2222
import io.netty.channel.ChannelHandlerContext;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.ConcurrentSkipListSet;
27+
import java.util.concurrent.ExecutorService;
2328
import java.util.concurrent.TimeUnit;
2429
import org.apache.commons.lang3.reflect.FieldUtils;
2530
import org.apache.rocketmq.broker.BrokerController;
2631
import org.apache.rocketmq.common.BrokerConfig;
2732
import org.apache.rocketmq.common.KeyBuilder;
33+
import org.apache.rocketmq.common.MixAll;
34+
import org.apache.rocketmq.common.message.MessageConst;
2835
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
2936
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
3037
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -36,11 +43,6 @@
3643
import org.mockito.Mock;
3744
import org.mockito.junit.MockitoJUnitRunner;
3845

39-
import java.util.Map;
40-
import java.util.concurrent.ConcurrentHashMap;
41-
import java.util.concurrent.ConcurrentSkipListSet;
42-
import java.util.concurrent.ExecutorService;
43-
4446
import static org.junit.Assert.assertEquals;
4547
import static org.junit.Assert.assertFalse;
4648
import static org.junit.Assert.assertTrue;
@@ -87,6 +89,27 @@ public void testNotifyMessageArrivingWithRetryTopic() {
8789
verify(popLongPollingService, times(1)).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, -1L, null, 0L, null, null);
8890
}
8991

92+
@Test
93+
public void testNotifyMessageArrivingFromRetry() {
94+
int queueId = -1;
95+
String group = "group";
96+
String pullRetryTopic = MixAll.getRetryTopic(group);
97+
String popRetryTopicV1 = KeyBuilder.buildPopRetryTopic(defaultTopic, group, false);
98+
String popRetryTopicV2 = KeyBuilder.buildPopRetryTopic(defaultTopic, group, true);
99+
100+
Map<String, String> properties = new HashMap<>();
101+
properties.putIfAbsent(MessageConst.PROPERTY_ORIGIN_GROUP, group);
102+
// pull retry
103+
popLongPollingService.notifyMessageArrivingWithRetryTopic(pullRetryTopic, queueId, queueId, -1L, 0L, null, properties);
104+
verify(popLongPollingService, times(0)).notifyMessageArriving(defaultTopic, queueId, group, true, -1L, 0L, null, properties, null);
105+
// pop retry v1
106+
popLongPollingService.notifyMessageArrivingWithRetryTopic(popRetryTopicV1, queueId, queueId, -1L, 0L, null, properties);
107+
verify(popLongPollingService, times(1)).notifyMessageArriving(defaultTopic, queueId, group, true, -1L, 0L, null, properties, null);
108+
// pop retry v2
109+
popLongPollingService.notifyMessageArrivingWithRetryTopic(popRetryTopicV2, queueId, queueId, -1L, 0L, null, properties);
110+
verify(popLongPollingService, times(2)).notifyMessageArriving(defaultTopic, queueId, group, true, -1L, 0L, null, properties, null);
111+
}
112+
90113
@Test
91114
public void testNotifyMessageArriving() {
92115
int queueId = 0;

common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class MessageConst {
2424
public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
2525
public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
2626
public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
27+
public static final String PROPERTY_ORIGIN_GROUP = "ORIGIN_GROUP";
2728
public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
2829
public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
2930
public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
@@ -113,6 +114,7 @@ public class MessageConst {
113114
STRING_HASH_SET.add(PROPERTY_WAIT_STORE_MSG_OK);
114115
STRING_HASH_SET.add(PROPERTY_DELAY_TIME_LEVEL);
115116
STRING_HASH_SET.add(PROPERTY_RETRY_TOPIC);
117+
STRING_HASH_SET.add(PROPERTY_ORIGIN_GROUP);
116118
STRING_HASH_SET.add(PROPERTY_REAL_TOPIC);
117119
STRING_HASH_SET.add(PROPERTY_REAL_QUEUE_ID);
118120
STRING_HASH_SET.add(PROPERTY_TRANSACTION_PREPARED);

store/src/main/java/org/apache/rocketmq/store/RunningFlags.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,15 @@ public boolean isWriteable() {
8585
return false;
8686
}
8787

88+
public boolean isStoreWriteable() {
89+
if ((this.flagBits & NOT_WRITEABLE_BIT) == 0) {
90+
return true;
91+
}
92+
93+
return false;
94+
}
95+
96+
8897
//for consume queue, just ignore the DISK_FULL_BIT
8998
public boolean isCQWriteable() {
9099
if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | WRITE_INDEX_FILE_ERROR_BIT | LOGIC_DISK_FULL_BIT)) == 0) {
@@ -94,7 +103,7 @@ public boolean isCQWriteable() {
94103
return false;
95104
}
96105

97-
public boolean getAndMakeNotWriteable() {
106+
public boolean getAndMakeStoreNotWriteable() {
98107
boolean result = this.isWriteable();
99108
if (result) {
100109
this.flagBits |= NOT_WRITEABLE_BIT;

store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,7 @@ public boolean getAndMakeNotWriteable() {
592592
if (runningFlags == null) {
593593
return false;
594594
}
595-
return runningFlags.getAndMakeNotWriteable();
595+
return runningFlags.getAndMakeStoreNotWriteable();
596596
}
597597

598598
public boolean isWriteable() {

store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ public class TimerWheel {
4343
public final int slotsTotal;
4444
public final int precisionMs;
4545
private final String fileName;
46-
private MappedByteBuffer mappedByteBuffer;
46+
private final MappedByteBuffer mappedByteBuffer;
47+
private final RandomAccessFile randomAccessFile;
48+
private final FileChannel fileChannel;
4749
private final ByteBuffer byteBuffer;
4850
private final ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<ByteBuffer>() {
4951
@Override
@@ -69,7 +71,6 @@ public TimerWheel(String fileName, int slotsTotal, int precisionMs, long snapOff
6971
File file = new File(finalFileName);
7072
UtilAll.ensureDirOK(file.getParent());
7173

72-
RandomAccessFile randomAccessFile = null;
7374
try {
7475
randomAccessFile = new RandomAccessFile(finalFileName, "rw");
7576
if (file.exists() && randomAccessFile.length() != 0 &&
@@ -79,8 +80,13 @@ public TimerWheel(String fileName, int slotsTotal, int precisionMs, long snapOff
7980
}
8081
randomAccessFile.setLength(wheelLength);
8182
if (snapOffset < 0) {
82-
mappedByteBuffer = randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, wheelLength);
83+
fileChannel = randomAccessFile.getChannel();
84+
mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, wheelLength);
8385
assert wheelLength == mappedByteBuffer.remaining();
86+
} else {
87+
fileChannel = null;
88+
mappedByteBuffer = null;
89+
randomAccessFile.close();
8490
}
8591
this.byteBuffer = ByteBuffer.allocateDirect(wheelLength);
8692
this.byteBuffer.put(Files.readAllBytes(file.toPath()));
@@ -90,10 +96,6 @@ public TimerWheel(String fileName, int slotsTotal, int precisionMs, long snapOff
9096
} catch (IOException e) {
9197
log.error("map file " + finalFileName + " Failed. ", e);
9298
throw e;
93-
} finally {
94-
if (randomAccessFile != null) {
95-
randomAccessFile.close();
96-
}
9799
}
98100
}
99101

@@ -114,6 +116,12 @@ public void shutdown(boolean flush) {
114116
UtilAll.cleanBuffer(this.mappedByteBuffer);
115117
UtilAll.cleanBuffer(this.byteBuffer);
116118
localBuffer.remove();
119+
120+
try {
121+
this.fileChannel.close();
122+
} catch (Throwable t) {
123+
log.error("Shutdown error in timer wheel", t);
124+
}
117125
}
118126

119127
public void flush() {

0 commit comments

Comments
 (0)