Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package org.apache.rocketmq.broker.longpolling;

import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.KeyBuilder;
Expand Down Expand Up @@ -52,21 +54,27 @@ public class PopLongPollingService extends ServiceThread {
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final BrokerController brokerController;
private final NettyRequestProcessor processor;
private final ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap;
private final ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> pollingMap;
private final Cache<String, ConcurrentHashMap<String, Byte>> topicCidMap;
private final Cache<String, ConcurrentSkipListSet<PopRequest>> pollingMap;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

private long lastCleanTime = 0;

private final AtomicLong totalPollingNum = new AtomicLong(0);
private final boolean notifyLast;

public PopLongPollingService(BrokerController brokerController, NettyRequestProcessor processor, boolean notifyLast) {
public PopLongPollingService(BrokerController brokerController, NettyRequestProcessor processor,
boolean notifyLast) {
this.brokerController = brokerController;
this.processor = processor;
// 100000 topic default, 100000 lru topic + cid + qid
this.topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>()
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize() * 2L).build();
this.pollingMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentSkipListSet<PopRequest>>()
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
this.topicCidMap = Caffeine.newBuilder()
.maximumSize(this.brokerController.getBrokerConfig().getPopPollingMapSize() * 2L)
.expireAfterAccess(this.brokerController.getBrokerConfig().getPopPollingMapExpireTimeSeconds(), TimeUnit.SECONDS)
.build();

this.pollingMap = Caffeine.newBuilder()
.maximumSize(this.brokerController.getBrokerConfig().getPopPollingMapSize())
.expireAfterAccess(this.brokerController.getBrokerConfig().getPopPollingMapExpireTimeSeconds(), TimeUnit.SECONDS)
.build();
this.notifyLast = notifyLast;
}

Expand All @@ -85,11 +93,11 @@ public void run() {
try {
this.waitForRunning(20);
i++;
if (pollingMap.isEmpty()) {
if (pollingMap.estimatedSize() == 0) {
continue;
}
long tmpTotalPollingNum = 0;
for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : pollingMap.entrySet()) {
for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : pollingMap.asMap().entrySet()) {
String key = entry.getKey();
ConcurrentSkipListSet<PopRequest> popQ = entry.getValue();
if (popQ == null) {
Expand Down Expand Up @@ -126,7 +134,7 @@ public void run() {

if (i >= 100) {
POP_LOGGER.info("pollingMapSize={},tmpTotalSize={},atomicTotalSize={},diffSize={}",
pollingMap.size(), tmpTotalPollingNum, totalPollingNum.get(),
pollingMap.estimatedSize(), tmpTotalPollingNum, totalPollingNum.get(),
Math.abs(totalPollingNum.get() - tmpTotalPollingNum));
totalPollingNum.set(tmpTotalPollingNum);
i = 0;
Expand All @@ -142,7 +150,7 @@ public void run() {
}
// clean all;
try {
for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : pollingMap.entrySet()) {
for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : pollingMap.asMap().entrySet()) {
ConcurrentSkipListSet<PopRequest> popQ = entry.getValue();
PopRequest first;
while ((first = popQ.pollFirst()) != null) {
Expand Down Expand Up @@ -170,7 +178,7 @@ public void notifyMessageArrivingWithRetryTopic(final String topic, final int qu

public void notifyMessageArriving(final String topic, final int queueId, long offset,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic);
ConcurrentHashMap<String, Byte> cids = topicCidMap.getIfPresent(topic);
if (cids == null) {
return;
}
Expand All @@ -196,7 +204,7 @@ public boolean notifyMessageArriving(final String topic, final int queueId, fina

public boolean notifyMessageArriving(final String topic, final int queueId, final String cid, boolean force,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties, CommandCallback callback) {
ConcurrentSkipListSet<PopRequest> remotingCommands = pollingMap.get(KeyBuilder.buildPollingKey(topic, cid, queueId));
ConcurrentSkipListSet<PopRequest> remotingCommands = pollingMap.getIfPresent(KeyBuilder.buildPollingKey(topic, cid, queueId));
if (remotingCommands == null || remotingCommands.isEmpty()) {
return false;
}
Expand Down Expand Up @@ -286,14 +294,7 @@ public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand re
if (requestHeader.getPollTime() <= 0 || this.isStopped()) {
return NOT_POLLING;
}
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(requestHeader.getTopic());
if (cids == null) {
cids = new ConcurrentHashMap<>();
ConcurrentHashMap<String, Byte> old = topicCidMap.putIfAbsent(requestHeader.getTopic(), cids);
if (old != null) {
cids = old;
}
}
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(requestHeader.getTopic(), key -> new ConcurrentHashMap<>());
cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
long expired = requestHeader.getBornTime() + requestHeader.getPollTime();
final PopRequest request = new PopRequest(remotingCommand, ctx, expired, subscriptionData, messageFilter);
Expand All @@ -311,21 +312,13 @@ public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand re
}
String key = KeyBuilder.buildPollingKey(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
requestHeader.getQueueId());
ConcurrentSkipListSet<PopRequest> queue = pollingMap.get(key);
if (queue == null) {
queue = new ConcurrentSkipListSet<>(PopRequest.COMPARATOR);
ConcurrentSkipListSet<PopRequest> old = pollingMap.putIfAbsent(key, queue);
if (old != null) {
queue = old;
}
} else {
// check size
int size = queue.size();
if (size > brokerController.getBrokerConfig().getPopPollingSize()) {
POP_LOGGER.info("polling {}, result POLLING_FULL, singleSize:{}", remotingCommand, size);
return POLLING_FULL;
}
ConcurrentSkipListSet<PopRequest> queue = pollingMap.get(key, k -> new ConcurrentSkipListSet<>(PopRequest.COMPARATOR));
int size = queue.size();
if (size > brokerController.getBrokerConfig().getPopPollingSize()) {
POP_LOGGER.info("polling {}, result POLLING_FULL, singleSize:{}", remotingCommand, size);
return POLLING_FULL;
}

if (queue.add(request)) {
remotingCommand.setSuspended(true);
totalPollingNum.incrementAndGet();
Expand All @@ -339,14 +332,18 @@ public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand re
}
}

public ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
public Cache<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
return pollingMap;
}

public Cache<String, ConcurrentHashMap<String, Byte>> getTopicCidMap() {
return topicCidMap;
}

private void cleanUnusedResource() {
try {
{
Iterator<Map.Entry<String, ConcurrentHashMap<String, Byte>>> topicCidMapIter = topicCidMap.entrySet().iterator();
Iterator<Map.Entry<String, ConcurrentHashMap<String, Byte>>> topicCidMapIter = topicCidMap.asMap().entrySet().iterator();
while (topicCidMapIter.hasNext()) {
Map.Entry<String, ConcurrentHashMap<String, Byte>> entry = topicCidMapIter.next();
String topic = entry.getKey();
Expand All @@ -368,7 +365,7 @@ private void cleanUnusedResource() {
}

{
Iterator<Map.Entry<String, ConcurrentSkipListSet<PopRequest>>> pollingMapIter = pollingMap.entrySet().iterator();
Iterator<Map.Entry<String, ConcurrentSkipListSet<PopRequest>>> pollingMapIter = pollingMap.asMap().entrySet().iterator();
while (pollingMapIter.hasNext()) {
Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry = pollingMapIter.next();
if (entry.getKey() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
return response;
}
String key = KeyBuilder.buildPollingKey(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId());
ConcurrentSkipListSet<PopRequest> queue = this.brokerController.getPopMessageProcessor().getPollingMap().get(key);
ConcurrentSkipListSet<PopRequest> queue = this.brokerController.getPopMessageProcessor().getPollingMap().getIfPresent(key);
if (queue != null) {
responseHeader.setPollingNum(queue.size());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.rocketmq.broker.processor;

import com.alibaba.fastjson.JSON;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.github.benmanes.caffeine.cache.Cache;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -173,7 +173,7 @@ public boolean rejectRequest() {
return false;
}

public ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
public Cache<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
return popLongPollingService.getPollingMap();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
*/
package org.apache.rocketmq.broker.longpolling;

import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
Expand Down Expand Up @@ -102,14 +104,18 @@ public void testNotifyMessageArriving() {
public void testNotifyMessageArrivingValidRequest() throws Exception {
String cid = "CID_1";
int queueId = 0;
ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>()
.maximumWeightedCapacity(10).build();
Cache<String, ConcurrentHashMap<String, Byte>> topicCidMap = Caffeine.newBuilder()
.maximumSize(10)
.expireAfterAccess(300, TimeUnit.SECONDS)
.build();
ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>();
cids.put(cid, (byte) 1);
topicCidMap.put(defaultTopic, cids);
popLongPollingService = new PopLongPollingService(brokerController, processor, true);
ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> pollingMap =
new ConcurrentLinkedHashMap.Builder<String, ConcurrentSkipListSet<PopRequest>>().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
Cache<String, ConcurrentSkipListSet<PopRequest>> pollingMap = Caffeine.newBuilder()
.maximumSize(10)
.expireAfterAccess(300, TimeUnit.SECONDS)
.build();
Channel channel = mock(Channel.class);
when(channel.isActive()).thenReturn(true);
PopRequest popRequest = mock(PopRequest.class);
Expand Down Expand Up @@ -195,8 +201,10 @@ public void testPollingServicePollingTimeout() throws IllegalAccessException {
when(requestHeader.getPollTime()).thenReturn(1000L);
when(requestHeader.getTopic()).thenReturn(defaultTopic);
when(requestHeader.getConsumerGroup()).thenReturn("defaultGroup");
ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>()
.maximumWeightedCapacity(10).build();
Cache<String, ConcurrentHashMap<String, Byte>> topicCidMap = Caffeine.newBuilder()
.maximumSize(10)
.expireAfterAccess(300, TimeUnit.SECONDS)
.build();
ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>();
cids.put(cid, (byte) 1);
topicCidMap.put(defaultTopic, cids);
Expand Down
10 changes: 10 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ public class BrokerConfig extends BrokerIdentity {

private int popPollingSize = 1024;
private int popPollingMapSize = 100000;

private int popPollingMapExpireTimeSeconds = 60 * 10;
// 20w cost 200M heap memory.
private long maxPopPollingSize = 100000;
private int reviveQueueNum = 8;
Expand Down Expand Up @@ -524,6 +526,14 @@ public void setPopPollingMapSize(int popPollingMapSize) {
this.popPollingMapSize = popPollingMapSize;
}

public int getPopPollingMapExpireTimeSeconds() {
return popPollingMapExpireTimeSeconds;
}

public void setPopPollingMapExpireTimeSeconds(int popPollingMapExpireTimeSeconds) {
this.popPollingMapExpireTimeSeconds = popPollingMapExpireTimeSeconds;
}

public long getReviveScanTime() {
return reviveScanTime;
}
Expand Down
Loading