Skip to content

Commit 5519490

Browse files
committed
rebase onto develop
Change-Id: I193c1ba9a22d604729bbb7909477e0e4d7ad2a28
1 parent 04d7fd4 commit 5519490

File tree

23 files changed

+114
-95
lines changed

23 files changed

+114
-95
lines changed

broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.broker.config.v1;
1818

19-
import com.alibaba.fastjson.JSON;
19+
import com.alibaba.fastjson2.JSON;
2020
import com.alibaba.fastjson2.JSONWriter;
2121
import java.nio.file.Path;
2222
import java.nio.file.Paths;

broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public void deleteLmq(String parentTopic, String lmqName) {
183183
String topicAtGroup = lmqName + TOPIC_GROUP_SEPARATOR + group;
184184
brokerController.getConsumerOffsetManager().getOffsetTable().remove(topicAtGroup);
185185
brokerController.getConsumerOffsetManager().removeConsumerOffset(topicAtGroup); // no iteration
186-
brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().getTable().remove(topicAtGroup);
186+
brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().remove(lmqName, group);
187187
});
188188
brokerController.getMessageStore().deleteTopics(Sets.newHashSet(lmqName));
189189
boolean sharding = brokerName.equals(liteSharding.shardingByLmqName(parentTopic, lmqName));

broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.commons.collections.CollectionUtils;
2424
import org.apache.rocketmq.broker.BrokerController;
2525
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
26-
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
26+
import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
2727
import org.apache.rocketmq.common.ServiceThread;
2828
import org.apache.rocketmq.common.constant.LoggerName;
2929
import org.apache.rocketmq.common.entity.ClientGroup;
@@ -498,7 +498,7 @@ public void onUnregister(String clientId, String group, String lmqName, boolean
498498
LOGGER.info("unregister and reset offset. {}, {}, {}, {}", group, clientId, lmqName, consumerOffset);
499499
if (consumerOffset > 0) {
500500
consumerOffsetManager.assignResetOffset(lmqName, group, 0, 0);
501-
consumerOrderInfoManager.getTable().remove(lmqName + "@" + group); // may cause race condition.
501+
consumerOrderInfoManager.remove(lmqName, group);
502502
}
503503
}
504504
}

broker/src/main/java/org/apache/rocketmq/broker/offset/MemoryConsumerOrderInfoManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.rocketmq.broker.offset;
1919

2020
import org.apache.rocketmq.broker.BrokerController;
21+
import org.apache.rocketmq.broker.pop.orderly.QueueLevelConsumerManager;
2122

2223
/**
2324
* Memory-based Consumer Order Information Manager for Lite Topics
@@ -30,7 +31,7 @@
3031
* <p>
3132
* We may make structural adjustments and optimizations to reduce overhead and memory footprint.
3233
*/
33-
public class MemoryConsumerOrderInfoManager extends ConsumerOrderInfoManager {
34+
public class MemoryConsumerOrderInfoManager extends QueueLevelConsumerManager {
3435

3536
public MemoryConsumerOrderInfoManager(BrokerController brokerController) {
3637
super(brokerController);

broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManager.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,20 @@ void update(String attemptId, boolean isRetry, String topic, String group, int q
6868
*/
6969
boolean checkBlock(String attemptId, String topic, String group, int queueId, long invisibleTime);
7070

71+
/**
72+
* Remove the specified topic and group
73+
* Usually called during topic deletion
74+
*
75+
* @param topic Topic name
76+
* @param group Consumer group name
77+
*/
78+
void remove(String topic, String group);
79+
80+
/**
81+
* Get order info count
82+
*/
83+
int getOrderInfoCount();
84+
7185
/**
7286
* Commit message and calculate next consumption offset
7387
* Called when consumer ACKs messages
@@ -137,6 +151,7 @@ void updateNextVisibleTime(String topic, String group, int queueId, long queueOf
137151
* Get available message result
138152
* Used to retrieve messages from cache
139153
*/
140-
CompletableFuture<GetMessageResult> getAvailableMessageResult(String attemptId, long popTime, long invisibleTime, String groupId,
154+
CompletableFuture<GetMessageResult> getAvailableMessageResult(String attemptId, long popTime, long invisibleTime,
155+
String groupId,
141156
String topicId, int queueId, int batchSize, StringBuilder orderCountInfoBuilder);
142157
}

broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,16 @@ public void clearBlock(String topic, String group, int queueId) {
179179
});
180180
}
181181

182+
@Override
183+
public void remove(String topic, String group) {
184+
table.remove(buildKey(topic, group));
185+
}
186+
187+
@Override
188+
public int getOrderInfoCount() {
189+
return table.size();
190+
}
191+
182192
@Override
183193
public OrderedConsumptionLevel getOrderedConsumptionLevel() {
184194
return OrderedConsumptionLevel.QUEUE;
@@ -383,7 +393,7 @@ public CompletableFuture<GetMessageResult> getAvailableMessageResult(String atte
383393
}
384394

385395
@VisibleForTesting
386-
QueueLevelConsumerOrderInfoLockManager getConsumerOrderInfoLockManager() {
396+
protected QueueLevelConsumerOrderInfoLockManager getConsumerOrderInfoLockManager() {
387397
return queueLevelConsumerOrderInfoLockManager;
388398
}
389399

broker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ protected RemotingCommand getBrokerLiteInfo(ChannelHandlerContext ctx,
105105
body.setMaxLmqNum(brokerController.getMessageStoreConfig().getMaxLmqConsumeQueueNum());
106106
body.setCurrentLmqNum(brokerController.getMessageStore().getQueueStore().getLmqNum());
107107
body.setLiteSubscriptionCount(brokerController.getLiteSubscriptionRegistry().getActiveSubscriptionNum());
108-
body.setOrderInfoCount(brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().getTable().size());
108+
body.setOrderInfoCount(brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().getOrderInfoCount());
109109
body.setCqTableSize(brokerController.getMessageStore().getQueueStore().getConsumeQueueTable().size());
110110
body.setOffsetTableSize(brokerController.getConsumerOffsetManager().getOffsetTable().size());
111111
body.setEventMapSize(brokerController.getLiteEventDispatcher().getEventMapSize());

broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
import org.apache.rocketmq.broker.longpolling.PollingResult;
2626
import org.apache.rocketmq.broker.longpolling.PopLiteLongPollingService;
2727
import org.apache.rocketmq.broker.metrics.LiteConsumerLagCalculator;
28-
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
2928
import org.apache.rocketmq.broker.offset.MemoryConsumerOrderInfoManager;
3029
import org.apache.rocketmq.broker.pop.PopConsumerLockService;
30+
import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
3131
import org.apache.rocketmq.common.KeyBuilder;
3232
import org.apache.rocketmq.common.MixAll;
3333
import org.apache.rocketmq.common.Pair;
@@ -341,7 +341,7 @@ public Pair<StringBuilder, GetMessageResult> handleGetMessageResult(GetMessageRe
341341
StringBuilder orderCountInfo = new StringBuilder();
342342
if (GetMessageStatus.FOUND.equals(result.getStatus()) && !result.getMessageQueueOffset().isEmpty()) {
343343
consumerOrderInfoManager.update(attemptId, false, lmqName, group, 0,
344-
popTime, invisibleTime, result.getMessageQueueOffset(), orderCountInfo);
344+
popTime, invisibleTime, result.getMessageQueueOffset(), orderCountInfo, null);
345345
recordPopLiteMetrics(result, parentTopic, group);
346346
orderCountInfo = transformOrderCountInfo(orderCountInfo, result.getMessageCount());
347347
}

broker/src/test/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManagerTest.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
import org.apache.rocketmq.broker.BrokerController;
2626
import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
27-
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
27+
import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
2828
import org.apache.rocketmq.broker.processor.PopLiteMessageProcessor;
2929
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
3030
import org.apache.rocketmq.broker.topic.TopicConfigManager;
@@ -86,8 +86,6 @@ public class AbstractLiteLifecycleManagerTest {
8686
private final TopicConfig topicConfig = new TopicConfig(PARENT_TOPIC, 1, 1);
8787
private final SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
8888
private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>();
89-
private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumerOrderInfoManager.OrderInfo>>
90-
orderInfoTable = new ConcurrentHashMap<>();
9189

9290
@Before
9391
public void setUp() {
@@ -115,7 +113,7 @@ public void setUp() {
115113
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(groupTable);
116114

117115
when(consumerOffsetManager.getOffsetTable()).thenReturn(offsetTable);
118-
when(consumerOrderInfoManager.getTable()).thenReturn(orderInfoTable);
116+
when(consumerOffsetManager.getPullOffsetTable()).thenReturn(offsetTable);
119117

120118
TestLiteLifecycleManager testObject = new TestLiteLifecycleManager(brokerController, liteSharding);
121119
lifecycleManager = Mockito.spy(testObject);
@@ -127,7 +125,6 @@ public void reset() {
127125
topicConfig.getAttributes().clear();
128126
groupConfig.getAttributes().clear();
129127
offsetTable.clear();
130-
orderInfoTable.clear();
131128
}
132129

133130
@Test
@@ -201,29 +198,24 @@ public void testDeleteLmq() {
201198
String removeKey = EXIST_LMQ_NAME + TOPIC_GROUP_SEPARATOR + GROUP;
202199
offsetTable.put(otherKey, new ConcurrentHashMap<>());
203200
offsetTable.put(removeKey, new ConcurrentHashMap<>());
204-
orderInfoTable.put(otherKey, new ConcurrentHashMap<>());
205-
orderInfoTable.put(removeKey, new ConcurrentHashMap<>());
206201

207202
// sharding to this broker
208203
when(liteSharding.shardingByLmqName(PARENT_TOPIC, EXIST_LMQ_NAME)).thenReturn(brokerConfig.getBrokerName());
209204
lifecycleManager.deleteLmq(PARENT_TOPIC, EXIST_LMQ_NAME);
210205

211206
Assert.assertTrue(offsetTable.containsKey(otherKey));
212207
Assert.assertFalse(offsetTable.containsKey(removeKey));
213-
Assert.assertTrue(orderInfoTable.containsKey(otherKey));
214-
Assert.assertFalse(orderInfoTable.containsKey(removeKey));
215208
verify(consumerOffsetManager).removeConsumerOffset(removeKey);
216209
verify(messageStore).deleteTopics(Collections.singleton(EXIST_LMQ_NAME));
217210
verify(liteSubscriptionRegistry).cleanSubscription(EXIST_LMQ_NAME, false);
211+
verify(consumerOrderInfoManager, times(1)).remove(EXIST_LMQ_NAME, GROUP);
218212

219213
// not sharding to this broker
220214
when(liteSharding.shardingByLmqName(PARENT_TOPIC, EXIST_LMQ_NAME)).thenReturn("otherBrokerName");
221215
lifecycleManager.deleteLmq(PARENT_TOPIC, EXIST_LMQ_NAME);
222216

223217
Assert.assertTrue(offsetTable.containsKey(otherKey));
224218
Assert.assertFalse(offsetTable.containsKey(removeKey));
225-
Assert.assertTrue(orderInfoTable.containsKey(otherKey));
226-
Assert.assertFalse(orderInfoTable.containsKey(removeKey));
227219
verify(consumerOffsetManager, times(2)).removeConsumerOffset(removeKey);
228220
verify(messageStore, times(2)).deleteTopics(Collections.singleton(EXIST_LMQ_NAME));
229221
verify(liteSubscriptionRegistry, times(2)).cleanSubscription(EXIST_LMQ_NAME, false);

broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.rocketmq.broker.BrokerController;
2323
import org.apache.rocketmq.broker.longpolling.PopLiteLongPollingService;
2424
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
25-
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
25+
import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
2626
import org.apache.rocketmq.broker.processor.PopLiteMessageProcessor;
2727
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
2828
import org.apache.rocketmq.common.BrokerConfig;

0 commit comments

Comments
 (0)