From 9d557b5f25183853eb2667ee0b878dd02767dcb8 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Tue, 29 Jul 2025 15:58:22 +0800 Subject: [PATCH 1/2] Optimize the resource bloat of pollingMap and topicCidMap in LMQ scenarios --- .../longpolling/PopLongPollingService.java | 75 +++++++++---------- .../processor/PollingInfoProcessor.java | 2 +- .../broker/processor/PopMessageProcessor.java | 4 +- .../apache/rocketmq/common/BrokerConfig.java | 10 +++ 4 files changed, 49 insertions(+), 42 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java index e87a8e803fd..71520b82195 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java @@ -17,13 +17,15 @@ package org.apache.rocketmq.broker.longpolling; -import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import io.netty.channel.ChannelHandlerContext; import java.util.ArrayList; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.KeyBuilder; @@ -52,21 +54,27 @@ public class PopLongPollingService extends ServiceThread { LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); private final BrokerController brokerController; private final NettyRequestProcessor processor; - private final ConcurrentLinkedHashMap> topicCidMap; - private final ConcurrentLinkedHashMap> pollingMap; + private final Cache> topicCidMap; + private final Cache> pollingMap; private long lastCleanTime = 0; private final AtomicLong totalPollingNum = new AtomicLong(0); private final boolean notifyLast; - public PopLongPollingService(BrokerController brokerController, NettyRequestProcessor processor, boolean notifyLast) { + public PopLongPollingService(BrokerController brokerController, NettyRequestProcessor processor, + boolean notifyLast) { this.brokerController = brokerController; this.processor = processor; // 100000 topic default, 100000 lru topic + cid + qid - this.topicCidMap = new ConcurrentLinkedHashMap.Builder>() - .maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize() * 2L).build(); - this.pollingMap = new ConcurrentLinkedHashMap.Builder>() - .maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build(); + this.topicCidMap = Caffeine.newBuilder() + .maximumSize(this.brokerController.getBrokerConfig().getPopPollingMapSize() * 2L) + .expireAfterAccess(this.brokerController.getBrokerConfig().getPopPollingMapExpireTimeSeconds(), TimeUnit.SECONDS) + .build(); + + this.pollingMap = Caffeine.newBuilder() + .maximumSize(this.brokerController.getBrokerConfig().getPopPollingMapSize()) + .expireAfterAccess(this.brokerController.getBrokerConfig().getPopPollingMapExpireTimeSeconds(), TimeUnit.SECONDS) + .build(); this.notifyLast = notifyLast; } @@ -85,11 +93,11 @@ public void run() { try { this.waitForRunning(20); i++; - if (pollingMap.isEmpty()) { + if (pollingMap.estimatedSize() == 0) { continue; } long tmpTotalPollingNum = 0; - for (Map.Entry> entry : pollingMap.entrySet()) { + for (Map.Entry> entry : pollingMap.asMap().entrySet()) { String key = entry.getKey(); ConcurrentSkipListSet popQ = entry.getValue(); if (popQ == null) { @@ -126,7 +134,7 @@ public void run() { if (i >= 100) { POP_LOGGER.info("pollingMapSize={},tmpTotalSize={},atomicTotalSize={},diffSize={}", - pollingMap.size(), tmpTotalPollingNum, totalPollingNum.get(), + pollingMap.estimatedSize(), tmpTotalPollingNum, totalPollingNum.get(), Math.abs(totalPollingNum.get() - tmpTotalPollingNum)); totalPollingNum.set(tmpTotalPollingNum); i = 0; @@ -142,7 +150,7 @@ public void run() { } // clean all; try { - for (Map.Entry> entry : pollingMap.entrySet()) { + for (Map.Entry> entry : pollingMap.asMap().entrySet()) { ConcurrentSkipListSet popQ = entry.getValue(); PopRequest first; while ((first = popQ.pollFirst()) != null) { @@ -170,7 +178,7 @@ public void notifyMessageArrivingWithRetryTopic(final String topic, final int qu public void notifyMessageArriving(final String topic, final int queueId, long offset, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map properties) { - ConcurrentHashMap cids = topicCidMap.get(topic); + ConcurrentHashMap cids = topicCidMap.getIfPresent(topic); if (cids == null) { return; } @@ -196,7 +204,7 @@ public boolean notifyMessageArriving(final String topic, final int queueId, fina public boolean notifyMessageArriving(final String topic, final int queueId, final String cid, boolean force, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map properties, CommandCallback callback) { - ConcurrentSkipListSet remotingCommands = pollingMap.get(KeyBuilder.buildPollingKey(topic, cid, queueId)); + ConcurrentSkipListSet remotingCommands = pollingMap.getIfPresent(KeyBuilder.buildPollingKey(topic, cid, queueId)); if (remotingCommands == null || remotingCommands.isEmpty()) { return false; } @@ -286,14 +294,7 @@ public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand re if (requestHeader.getPollTime() <= 0 || this.isStopped()) { return NOT_POLLING; } - ConcurrentHashMap cids = topicCidMap.get(requestHeader.getTopic()); - if (cids == null) { - cids = new ConcurrentHashMap<>(); - ConcurrentHashMap old = topicCidMap.putIfAbsent(requestHeader.getTopic(), cids); - if (old != null) { - cids = old; - } - } + ConcurrentHashMap cids = topicCidMap.get(requestHeader.getTopic(), key -> new ConcurrentHashMap<>()); cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE); long expired = requestHeader.getBornTime() + requestHeader.getPollTime(); final PopRequest request = new PopRequest(remotingCommand, ctx, expired, subscriptionData, messageFilter); @@ -311,21 +312,13 @@ public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand re } String key = KeyBuilder.buildPollingKey(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId()); - ConcurrentSkipListSet queue = pollingMap.get(key); - if (queue == null) { - queue = new ConcurrentSkipListSet<>(PopRequest.COMPARATOR); - ConcurrentSkipListSet old = pollingMap.putIfAbsent(key, queue); - if (old != null) { - queue = old; - } - } else { - // check size - int size = queue.size(); - if (size > brokerController.getBrokerConfig().getPopPollingSize()) { - POP_LOGGER.info("polling {}, result POLLING_FULL, singleSize:{}", remotingCommand, size); - return POLLING_FULL; - } + ConcurrentSkipListSet queue = pollingMap.get(key, k -> new ConcurrentSkipListSet<>(PopRequest.COMPARATOR)); + int size = queue.size(); + if (size > brokerController.getBrokerConfig().getPopPollingSize()) { + POP_LOGGER.info("polling {}, result POLLING_FULL, singleSize:{}", remotingCommand, size); + return POLLING_FULL; } + if (queue.add(request)) { remotingCommand.setSuspended(true); totalPollingNum.incrementAndGet(); @@ -339,14 +332,18 @@ public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand re } } - public ConcurrentLinkedHashMap> getPollingMap() { + public Cache> getPollingMap() { return pollingMap; } + public Cache> getTopicCidMap() { + return topicCidMap; + } + private void cleanUnusedResource() { try { { - Iterator>> topicCidMapIter = topicCidMap.entrySet().iterator(); + Iterator>> topicCidMapIter = topicCidMap.asMap().entrySet().iterator(); while (topicCidMapIter.hasNext()) { Map.Entry> entry = topicCidMapIter.next(); String topic = entry.getKey(); @@ -368,7 +365,7 @@ private void cleanUnusedResource() { } { - Iterator>> pollingMapIter = pollingMap.entrySet().iterator(); + Iterator>> pollingMapIter = pollingMap.asMap().entrySet().iterator(); while (pollingMapIter.hasNext()) { Map.Entry> entry = pollingMapIter.next(); if (entry.getKey() == null) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java index f7baac144e6..c114f4d4c3d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java @@ -106,7 +106,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re return response; } String key = KeyBuilder.buildPollingKey(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId()); - ConcurrentSkipListSet queue = this.brokerController.getPopMessageProcessor().getPollingMap().get(key); + ConcurrentSkipListSet queue = this.brokerController.getPopMessageProcessor().getPollingMap().getIfPresent(key); if (queue != null) { responseHeader.setPollingNum(queue.size()); } else { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index d73acc84df6..0d44e3a043f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -17,7 +17,7 @@ package org.apache.rocketmq.broker.processor; import com.alibaba.fastjson.JSON; -import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; +import com.github.benmanes.caffeine.cache.Cache; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -173,7 +173,7 @@ public boolean rejectRequest() { return false; } - public ConcurrentLinkedHashMap> getPollingMap() { + public Cache> getPollingMap() { return popLongPollingService.getPollingMap(); } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 77f49554a5c..19d9e274dc3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -214,6 +214,8 @@ public class BrokerConfig extends BrokerIdentity { private int popPollingSize = 1024; private int popPollingMapSize = 100000; + + private int popPollingMapExpireTimeSeconds = 60 * 10; // 20w cost 200M heap memory. private long maxPopPollingSize = 100000; private int reviveQueueNum = 8; @@ -524,6 +526,14 @@ public void setPopPollingMapSize(int popPollingMapSize) { this.popPollingMapSize = popPollingMapSize; } + public int getPopPollingMapExpireTimeSeconds() { + return popPollingMapExpireTimeSeconds; + } + + public void setPopPollingMapExpireTimeSeconds(int popPollingMapExpireTimeSeconds) { + this.popPollingMapExpireTimeSeconds = popPollingMapExpireTimeSeconds; + } + public long getReviveScanTime() { return reviveScanTime; } From bd8074e6fdf97de3003fec224ca7d68ff976fd59 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Tue, 29 Jul 2025 20:04:34 +0800 Subject: [PATCH 2/2] Fix PopLongPollingServiceTest some tests can not pass --- .../PopLongPollingServiceTest.java | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java index 003bf09842a..3547687a6de 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java @@ -16,9 +16,11 @@ */ package org.apache.rocketmq.broker.longpolling; -import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; @@ -102,14 +104,18 @@ public void testNotifyMessageArriving() { public void testNotifyMessageArrivingValidRequest() throws Exception { String cid = "CID_1"; int queueId = 0; - ConcurrentLinkedHashMap> topicCidMap = new ConcurrentLinkedHashMap.Builder>() - .maximumWeightedCapacity(10).build(); + Cache> topicCidMap = Caffeine.newBuilder() + .maximumSize(10) + .expireAfterAccess(300, TimeUnit.SECONDS) + .build(); ConcurrentHashMap cids = new ConcurrentHashMap<>(); cids.put(cid, (byte) 1); topicCidMap.put(defaultTopic, cids); popLongPollingService = new PopLongPollingService(brokerController, processor, true); - ConcurrentLinkedHashMap> pollingMap = - new ConcurrentLinkedHashMap.Builder>().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build(); + Cache> pollingMap = Caffeine.newBuilder() + .maximumSize(10) + .expireAfterAccess(300, TimeUnit.SECONDS) + .build(); Channel channel = mock(Channel.class); when(channel.isActive()).thenReturn(true); PopRequest popRequest = mock(PopRequest.class); @@ -195,8 +201,10 @@ public void testPollingServicePollingTimeout() throws IllegalAccessException { when(requestHeader.getPollTime()).thenReturn(1000L); when(requestHeader.getTopic()).thenReturn(defaultTopic); when(requestHeader.getConsumerGroup()).thenReturn("defaultGroup"); - ConcurrentLinkedHashMap> topicCidMap = new ConcurrentLinkedHashMap.Builder>() - .maximumWeightedCapacity(10).build(); + Cache> topicCidMap = Caffeine.newBuilder() + .maximumSize(10) + .expireAfterAccess(300, TimeUnit.SECONDS) + .build(); ConcurrentHashMap cids = new ConcurrentHashMap<>(); cids.put(cid, (byte) 1); topicCidMap.put(defaultTopic, cids);