Skip to content

Commit 32d68e7

Browse files
authored
[ISSUE #9581] Optimize the resource bloat of pollingMap and topicCidMap in LMQ scenarios (#9579)
* Optimize the resource bloat of pollingMap and topicCidMap in LMQ scenarios * Fix PopLongPollingServiceTest some tests can not pass
1 parent 2f89371 commit 32d68e7

5 files changed

Lines changed: 64 additions & 49 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java

Lines changed: 36 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
package org.apache.rocketmq.broker.longpolling;
1919

20-
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
20+
import com.github.benmanes.caffeine.cache.Cache;
21+
import com.github.benmanes.caffeine.cache.Caffeine;
2122
import io.netty.channel.ChannelHandlerContext;
2223
import java.util.ArrayList;
2324
import java.util.Iterator;
2425
import java.util.Map;
2526
import java.util.concurrent.ConcurrentHashMap;
2627
import java.util.concurrent.ConcurrentSkipListSet;
28+
import java.util.concurrent.TimeUnit;
2729
import java.util.concurrent.atomic.AtomicLong;
2830
import org.apache.rocketmq.broker.BrokerController;
2931
import org.apache.rocketmq.common.KeyBuilder;
@@ -52,21 +54,27 @@ public class PopLongPollingService extends ServiceThread {
5254
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
5355
private final BrokerController brokerController;
5456
private final NettyRequestProcessor processor;
55-
private final ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap;
56-
private final ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> pollingMap;
57+
private final Cache<String, ConcurrentHashMap<String, Byte>> topicCidMap;
58+
private final Cache<String, ConcurrentSkipListSet<PopRequest>> pollingMap;
5759
private long lastCleanTime = 0;
5860

5961
private final AtomicLong totalPollingNum = new AtomicLong(0);
6062
private final boolean notifyLast;
6163

62-
public PopLongPollingService(BrokerController brokerController, NettyRequestProcessor processor, boolean notifyLast) {
64+
public PopLongPollingService(BrokerController brokerController, NettyRequestProcessor processor,
65+
boolean notifyLast) {
6366
this.brokerController = brokerController;
6467
this.processor = processor;
6568
// 100000 topic default, 100000 lru topic + cid + qid
66-
this.topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>()
67-
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize() * 2L).build();
68-
this.pollingMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentSkipListSet<PopRequest>>()
69-
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
69+
this.topicCidMap = Caffeine.newBuilder()
70+
.maximumSize(this.brokerController.getBrokerConfig().getPopPollingMapSize() * 2L)
71+
.expireAfterAccess(this.brokerController.getBrokerConfig().getPopPollingMapExpireTimeSeconds(), TimeUnit.SECONDS)
72+
.build();
73+
74+
this.pollingMap = Caffeine.newBuilder()
75+
.maximumSize(this.brokerController.getBrokerConfig().getPopPollingMapSize())
76+
.expireAfterAccess(this.brokerController.getBrokerConfig().getPopPollingMapExpireTimeSeconds(), TimeUnit.SECONDS)
77+
.build();
7078
this.notifyLast = notifyLast;
7179
}
7280

@@ -85,11 +93,11 @@ public void run() {
8593
try {
8694
this.waitForRunning(20);
8795
i++;
88-
if (pollingMap.isEmpty()) {
96+
if (pollingMap.estimatedSize() == 0) {
8997
continue;
9098
}
9199
long tmpTotalPollingNum = 0;
92-
for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : pollingMap.entrySet()) {
100+
for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : pollingMap.asMap().entrySet()) {
93101
String key = entry.getKey();
94102
ConcurrentSkipListSet<PopRequest> popQ = entry.getValue();
95103
if (popQ == null) {
@@ -126,7 +134,7 @@ public void run() {
126134

127135
if (i >= 100) {
128136
POP_LOGGER.info("pollingMapSize={},tmpTotalSize={},atomicTotalSize={},diffSize={}",
129-
pollingMap.size(), tmpTotalPollingNum, totalPollingNum.get(),
137+
pollingMap.estimatedSize(), tmpTotalPollingNum, totalPollingNum.get(),
130138
Math.abs(totalPollingNum.get() - tmpTotalPollingNum));
131139
totalPollingNum.set(tmpTotalPollingNum);
132140
i = 0;
@@ -142,7 +150,7 @@ public void run() {
142150
}
143151
// clean all;
144152
try {
145-
for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : pollingMap.entrySet()) {
153+
for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : pollingMap.asMap().entrySet()) {
146154
ConcurrentSkipListSet<PopRequest> popQ = entry.getValue();
147155
PopRequest first;
148156
while ((first = popQ.pollFirst()) != null) {
@@ -170,7 +178,7 @@ public void notifyMessageArrivingWithRetryTopic(final String topic, final int qu
170178

171179
public void notifyMessageArriving(final String topic, final int queueId, long offset,
172180
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
173-
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic);
181+
ConcurrentHashMap<String, Byte> cids = topicCidMap.getIfPresent(topic);
174182
if (cids == null) {
175183
return;
176184
}
@@ -196,7 +204,7 @@ public boolean notifyMessageArriving(final String topic, final int queueId, fina
196204

197205
public boolean notifyMessageArriving(final String topic, final int queueId, final String cid, boolean force,
198206
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties, CommandCallback callback) {
199-
ConcurrentSkipListSet<PopRequest> remotingCommands = pollingMap.get(KeyBuilder.buildPollingKey(topic, cid, queueId));
207+
ConcurrentSkipListSet<PopRequest> remotingCommands = pollingMap.getIfPresent(KeyBuilder.buildPollingKey(topic, cid, queueId));
200208
if (remotingCommands == null || remotingCommands.isEmpty()) {
201209
return false;
202210
}
@@ -286,14 +294,7 @@ public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand re
286294
if (requestHeader.getPollTime() <= 0 || this.isStopped()) {
287295
return NOT_POLLING;
288296
}
289-
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(requestHeader.getTopic());
290-
if (cids == null) {
291-
cids = new ConcurrentHashMap<>();
292-
ConcurrentHashMap<String, Byte> old = topicCidMap.putIfAbsent(requestHeader.getTopic(), cids);
293-
if (old != null) {
294-
cids = old;
295-
}
296-
}
297+
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(requestHeader.getTopic(), key -> new ConcurrentHashMap<>());
297298
cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
298299
long expired = requestHeader.getBornTime() + requestHeader.getPollTime();
299300
final PopRequest request = new PopRequest(remotingCommand, ctx, expired, subscriptionData, messageFilter);
@@ -311,21 +312,13 @@ public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand re
311312
}
312313
String key = KeyBuilder.buildPollingKey(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
313314
requestHeader.getQueueId());
314-
ConcurrentSkipListSet<PopRequest> queue = pollingMap.get(key);
315-
if (queue == null) {
316-
queue = new ConcurrentSkipListSet<>(PopRequest.COMPARATOR);
317-
ConcurrentSkipListSet<PopRequest> old = pollingMap.putIfAbsent(key, queue);
318-
if (old != null) {
319-
queue = old;
320-
}
321-
} else {
322-
// check size
323-
int size = queue.size();
324-
if (size > brokerController.getBrokerConfig().getPopPollingSize()) {
325-
POP_LOGGER.info("polling {}, result POLLING_FULL, singleSize:{}", remotingCommand, size);
326-
return POLLING_FULL;
327-
}
315+
ConcurrentSkipListSet<PopRequest> queue = pollingMap.get(key, k -> new ConcurrentSkipListSet<>(PopRequest.COMPARATOR));
316+
int size = queue.size();
317+
if (size > brokerController.getBrokerConfig().getPopPollingSize()) {
318+
POP_LOGGER.info("polling {}, result POLLING_FULL, singleSize:{}", remotingCommand, size);
319+
return POLLING_FULL;
328320
}
321+
329322
if (queue.add(request)) {
330323
remotingCommand.setSuspended(true);
331324
totalPollingNum.incrementAndGet();
@@ -339,14 +332,18 @@ public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand re
339332
}
340333
}
341334

342-
public ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
335+
public Cache<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
343336
return pollingMap;
344337
}
345338

339+
public Cache<String, ConcurrentHashMap<String, Byte>> getTopicCidMap() {
340+
return topicCidMap;
341+
}
342+
346343
private void cleanUnusedResource() {
347344
try {
348345
{
349-
Iterator<Map.Entry<String, ConcurrentHashMap<String, Byte>>> topicCidMapIter = topicCidMap.entrySet().iterator();
346+
Iterator<Map.Entry<String, ConcurrentHashMap<String, Byte>>> topicCidMapIter = topicCidMap.asMap().entrySet().iterator();
350347
while (topicCidMapIter.hasNext()) {
351348
Map.Entry<String, ConcurrentHashMap<String, Byte>> entry = topicCidMapIter.next();
352349
String topic = entry.getKey();
@@ -368,7 +365,7 @@ private void cleanUnusedResource() {
368365
}
369366

370367
{
371-
Iterator<Map.Entry<String, ConcurrentSkipListSet<PopRequest>>> pollingMapIter = pollingMap.entrySet().iterator();
368+
Iterator<Map.Entry<String, ConcurrentSkipListSet<PopRequest>>> pollingMapIter = pollingMap.asMap().entrySet().iterator();
372369
while (pollingMapIter.hasNext()) {
373370
Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry = pollingMapIter.next();
374371
if (entry.getKey() == null) {

broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
106106
return response;
107107
}
108108
String key = KeyBuilder.buildPollingKey(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId());
109-
ConcurrentSkipListSet<PopRequest> queue = this.brokerController.getPopMessageProcessor().getPollingMap().get(key);
109+
ConcurrentSkipListSet<PopRequest> queue = this.brokerController.getPopMessageProcessor().getPollingMap().getIfPresent(key);
110110
if (queue != null) {
111111
responseHeader.setPollingNum(queue.size());
112112
} else {

broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package org.apache.rocketmq.broker.processor;
1818

1919
import com.alibaba.fastjson.JSON;
20-
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
20+
import com.github.benmanes.caffeine.cache.Cache;
2121
import io.netty.channel.Channel;
2222
import io.netty.channel.ChannelFutureListener;
2323
import io.netty.channel.ChannelHandlerContext;
@@ -173,7 +173,7 @@ public boolean rejectRequest() {
173173
return false;
174174
}
175175

176-
public ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
176+
public Cache<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
177177
return popLongPollingService.getPollingMap();
178178
}
179179

broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
*/
1717
package org.apache.rocketmq.broker.longpolling;
1818

19-
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
19+
import com.github.benmanes.caffeine.cache.Cache;
20+
import com.github.benmanes.caffeine.cache.Caffeine;
2021
import io.netty.channel.Channel;
2122
import io.netty.channel.ChannelHandlerContext;
23+
import java.util.concurrent.TimeUnit;
2224
import org.apache.commons.lang3.reflect.FieldUtils;
2325
import org.apache.rocketmq.broker.BrokerController;
2426
import org.apache.rocketmq.common.BrokerConfig;
@@ -102,14 +104,18 @@ public void testNotifyMessageArriving() {
102104
public void testNotifyMessageArrivingValidRequest() throws Exception {
103105
String cid = "CID_1";
104106
int queueId = 0;
105-
ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>()
106-
.maximumWeightedCapacity(10).build();
107+
Cache<String, ConcurrentHashMap<String, Byte>> topicCidMap = Caffeine.newBuilder()
108+
.maximumSize(10)
109+
.expireAfterAccess(300, TimeUnit.SECONDS)
110+
.build();
107111
ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>();
108112
cids.put(cid, (byte) 1);
109113
topicCidMap.put(defaultTopic, cids);
110114
popLongPollingService = new PopLongPollingService(brokerController, processor, true);
111-
ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> pollingMap =
112-
new ConcurrentLinkedHashMap.Builder<String, ConcurrentSkipListSet<PopRequest>>().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
115+
Cache<String, ConcurrentSkipListSet<PopRequest>> pollingMap = Caffeine.newBuilder()
116+
.maximumSize(10)
117+
.expireAfterAccess(300, TimeUnit.SECONDS)
118+
.build();
113119
Channel channel = mock(Channel.class);
114120
when(channel.isActive()).thenReturn(true);
115121
PopRequest popRequest = mock(PopRequest.class);
@@ -195,8 +201,10 @@ public void testPollingServicePollingTimeout() throws IllegalAccessException {
195201
when(requestHeader.getPollTime()).thenReturn(1000L);
196202
when(requestHeader.getTopic()).thenReturn(defaultTopic);
197203
when(requestHeader.getConsumerGroup()).thenReturn("defaultGroup");
198-
ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>()
199-
.maximumWeightedCapacity(10).build();
204+
Cache<String, ConcurrentHashMap<String, Byte>> topicCidMap = Caffeine.newBuilder()
205+
.maximumSize(10)
206+
.expireAfterAccess(300, TimeUnit.SECONDS)
207+
.build();
200208
ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>();
201209
cids.put(cid, (byte) 1);
202210
topicCidMap.put(defaultTopic, cids);

common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,8 @@ public class BrokerConfig extends BrokerIdentity {
214214

215215
private int popPollingSize = 1024;
216216
private int popPollingMapSize = 100000;
217+
218+
private int popPollingMapExpireTimeSeconds = 60 * 10;
217219
// 20w cost 200M heap memory.
218220
private long maxPopPollingSize = 100000;
219221
private int reviveQueueNum = 8;
@@ -533,6 +535,14 @@ public void setPopPollingMapSize(int popPollingMapSize) {
533535
this.popPollingMapSize = popPollingMapSize;
534536
}
535537

538+
public int getPopPollingMapExpireTimeSeconds() {
539+
return popPollingMapExpireTimeSeconds;
540+
}
541+
542+
public void setPopPollingMapExpireTimeSeconds(int popPollingMapExpireTimeSeconds) {
543+
this.popPollingMapExpireTimeSeconds = popPollingMapExpireTimeSeconds;
544+
}
545+
536546
public long getReviveScanTime() {
537547
return reviveScanTime;
538548
}

0 commit comments

Comments
 (0)