Skip to content

Commit af5ebb1

Browse files
qianye1001RongtongJin
authored andcommitted
[ISSUE #9152] Broker getConsumeStats supports inputting multiple topics (#9153)
* [ISSUE #9152] The getConsumeStats supports inputting multiple topics
1 parent 13af9a0 commit af5ebb1

6 files changed

Lines changed: 213 additions & 27 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1947,37 +1947,21 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
19471947
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
19481948
try {
19491949
final GetConsumeStatsRequestHeader requestHeader = request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
1950-
ConsumeStats consumeStats = new ConsumeStats();
1950+
List<String> topicListProvided = requestHeader.fetchTopicList();
1951+
String topicProvided = requestHeader.getTopic();
1952+
String group = requestHeader.getConsumerGroup();
19511953

1952-
Set<String> topics = new HashSet<>();
1953-
if (UtilAll.isBlank(requestHeader.getTopic())) {
1954-
topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup());
1955-
} else {
1956-
topics.add(requestHeader.getTopic());
1957-
}
1954+
ConsumeStats consumeStats = new ConsumeStats();
1955+
Set<String> topicsForCollecting = getTopicsForCollectingConsumeStats(topicListProvided, topicProvided, group);
19581956

1959-
for (String topic : topics) {
1957+
for (String topic : topicsForCollecting) {
19601958
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
19611959
if (null == topicConfig) {
19621960
LOGGER.warn("AdminBrokerProcessor#getConsumeStats: topic config does not exist, topic={}", topic);
19631961
continue;
19641962
}
19651963

19661964
TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
1967-
1968-
{
1969-
SubscriptionData findSubscriptionData =
1970-
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
1971-
1972-
if (null == findSubscriptionData
1973-
&& this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) {
1974-
LOGGER.warn(
1975-
"AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, "
1976-
+ "topic={}, consumer group={}", topic, requestHeader.getConsumerGroup());
1977-
continue;
1978-
}
1979-
}
1980-
19811965
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
19821966
MessageQueue mq = new MessageQueue();
19831967
mq.setTopic(topic);
@@ -2038,6 +2022,38 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
20382022
return response;
20392023
}
20402024

2025+
private Set<String> getTopicsForCollectingConsumeStats(List<String> topicListProvided, String topicProvided,
2026+
String group) {
2027+
Set<String> topicsForCollecting = new HashSet<>();
2028+
if (!topicListProvided.isEmpty()) {
2029+
// if topic list is provided, only collect the topics in the list
2030+
// and ignore subscription check
2031+
topicsForCollecting.addAll(topicListProvided);
2032+
} else {
2033+
// In order to be compatible with the old logic,
2034+
// even if the topic has been provided here, the subscription will be checked.
2035+
if (UtilAll.isBlank(topicProvided)) {
2036+
topicsForCollecting.addAll(
2037+
this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group));
2038+
} else {
2039+
topicsForCollecting.add(topicProvided);
2040+
}
2041+
int subscriptionCount = this.brokerController.getConsumerManager().findSubscriptionDataCount(group);
2042+
Iterator<String> iterator = topicsForCollecting.iterator();
2043+
while (iterator.hasNext()) {
2044+
String topic = iterator.next();
2045+
SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic);
2046+
if (findSubscriptionData == null && subscriptionCount > 0) {
2047+
LOGGER.warn(
2048+
"AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, topic={}, consumer group={}",
2049+
topic, group);
2050+
iterator.remove();
2051+
}
2052+
}
2053+
}
2054+
return topicsForCollecting;
2055+
}
2056+
20412057
private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) {
20422058
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
20432059

client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1748,16 +1748,27 @@ public TopicStatsTable getTopicStatsInfo(final String addr, final String topic,
17481748
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis)
17491749
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
17501750
MQBrokerException {
1751-
return getConsumeStats(addr, consumerGroup, null, timeoutMillis);
1751+
return getConsumeStats(addr, consumerGroup, null, null, timeoutMillis);
1752+
}
1753+
1754+
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final List<String> topicList,
1755+
final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException {
1756+
return getConsumeStats(addr, consumerGroup, null, topicList, timeoutMillis);
17521757
}
17531758

17541759
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic,
1755-
final long timeoutMillis)
1760+
final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException {
1761+
return getConsumeStats(addr, consumerGroup, topic, null, timeoutMillis);
1762+
}
1763+
1764+
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic,
1765+
final List<String> topicList, final long timeoutMillis)
17561766
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
17571767
MQBrokerException {
17581768
GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
17591769
requestHeader.setConsumerGroup(consumerGroup);
17601770
requestHeader.setTopic(topic);
1771+
requestHeader.updateTopicList(topicList);
17611772

17621773
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader);
17631774

remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@
2121
import org.apache.commons.lang3.StringUtils;
2222
import org.apache.rocketmq.common.action.Action;
2323
import org.apache.rocketmq.common.action.RocketMQAction;
24+
import org.apache.rocketmq.common.resource.ResourceType;
2425
import org.apache.rocketmq.remoting.CommandCustomHeader;
2526
import org.apache.rocketmq.remoting.annotation.CFNotNull;
2627
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
2728
import org.apache.rocketmq.remoting.protocol.RequestCode;
2829

29-
@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, action = Action.GET)
30+
@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, resource = ResourceType.CLUSTER, action = Action.GET)
3031
public class ExportRocksDBConfigToJsonRequestHeader implements CommandCustomHeader {
3132
private static final String CONFIG_TYPE_SEPARATOR = ";";
3233

remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,62 @@
1717
package org.apache.rocketmq.remoting.protocol.header;
1818

1919
import com.google.common.base.MoreObjects;
20+
import java.util.Arrays;
21+
import java.util.Collections;
22+
import java.util.List;
23+
import org.apache.commons.lang3.StringUtils;
2024
import org.apache.rocketmq.common.action.Action;
2125
import org.apache.rocketmq.common.action.RocketMQAction;
2226
import org.apache.rocketmq.common.resource.ResourceType;
2327
import org.apache.rocketmq.common.resource.RocketMQResource;
2428
import org.apache.rocketmq.remoting.annotation.CFNotNull;
2529
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
26-
import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;
2730
import org.apache.rocketmq.remoting.protocol.RequestCode;
31+
import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;
2832

2933
@RocketMQAction(value = RequestCode.GET_CONSUME_STATS, action = Action.GET)
3034
public class GetConsumeStatsRequestHeader extends TopicRequestHeader {
35+
private static final String TOPIC_NAME_SEPARATOR = ";";
36+
3137
@CFNotNull
3238
@RocketMQResource(ResourceType.GROUP)
3339
private String consumerGroup;
40+
3441
@RocketMQResource(ResourceType.TOPIC)
3542
private String topic;
3643

44+
// if topicList is provided, topic will be ignored
45+
@RocketMQResource(value = ResourceType.TOPIC, splitter = TOPIC_NAME_SEPARATOR)
46+
private String topicList;
47+
3748
@Override
3849
public void checkFields() throws RemotingCommandException {
3950
}
4051

52+
public List<String> fetchTopicList() {
53+
if (StringUtils.isBlank(topicList)) {
54+
return Collections.emptyList();
55+
}
56+
return Arrays.asList(StringUtils.split(topicList, TOPIC_NAME_SEPARATOR));
57+
}
58+
59+
public void updateTopicList(List<String> topicList) {
60+
if (topicList == null || topicList.isEmpty()) {
61+
return;
62+
}
63+
StringBuilder sb = new StringBuilder();
64+
topicList.forEach(topic -> sb.append(topic).append(TOPIC_NAME_SEPARATOR));
65+
this.setTopicList(sb.toString());
66+
}
67+
68+
public String getTopicList() {
69+
return topicList;
70+
}
71+
72+
public void setTopicList(String topicList) {
73+
this.topicList = topicList;
74+
}
75+
4176
public String getConsumerGroup() {
4277
return consumerGroup;
4378
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.remoting.protocol.header;
18+
19+
import java.util.ArrayList;
20+
import java.util.Arrays;
21+
import java.util.Collections;
22+
import java.util.List;
23+
import org.junit.Before;
24+
import org.junit.Test;
25+
26+
import static org.junit.Assert.assertEquals;
27+
import static org.junit.Assert.assertNull;
28+
29+
public class GetConsumeStatsRequestHeaderTest {
30+
31+
private GetConsumeStatsRequestHeader header;
32+
33+
@Before
34+
public void setUp() {
35+
header = new GetConsumeStatsRequestHeader();
36+
}
37+
38+
@Test
39+
public void updateTopicList_NullTopicList_DoesNotUpdate() {
40+
header.updateTopicList(null);
41+
assertNull(header.getTopicList());
42+
}
43+
44+
@Test
45+
public void updateTopicList_EmptyTopicList_SetsEmptyString() {
46+
header.updateTopicList(Collections.emptyList());
47+
assertNull(header.getTopicList());
48+
}
49+
50+
@Test
51+
public void updateTopicList_SingleTopic_SetsSingleTopicString() {
52+
List<String> topicList = Collections.singletonList("TopicA");
53+
header.updateTopicList(topicList);
54+
assertEquals("TopicA;", header.getTopicList());
55+
}
56+
57+
@Test
58+
public void updateTopicList_MultipleTopics_SetsMultipleTopicsString() {
59+
List<String> topicList = Arrays.asList("TopicA", "TopicB", "TopicC");
60+
header.updateTopicList(topicList);
61+
assertEquals("TopicA;TopicB;TopicC;", header.getTopicList());
62+
}
63+
64+
@Test
65+
public void updateTopicList_RepeatedTopics_SetsRepeatedTopicsString() {
66+
List<String> topicList = Arrays.asList("TopicA", "TopicA", "TopicB");
67+
header.updateTopicList(topicList);
68+
assertEquals("TopicA;TopicA;TopicB;", header.getTopicList());
69+
}
70+
71+
@Test
72+
public void fetchTopicList_NullTopicList_ReturnsEmptyList() {
73+
header.setTopicList(null);
74+
List<String> topicList = header.fetchTopicList();
75+
assertEquals(Collections.emptyList(), topicList);
76+
77+
header.updateTopicList(new ArrayList<>());
78+
topicList = header.fetchTopicList();
79+
assertEquals(Collections.emptyList(), topicList);
80+
}
81+
82+
@Test
83+
public void fetchTopicList_EmptyTopicList_ReturnsEmptyList() {
84+
header.setTopicList("");
85+
List<String> topicList = header.fetchTopicList();
86+
assertEquals(Collections.emptyList(), topicList);
87+
}
88+
89+
@Test
90+
public void fetchTopicList_BlankTopicList_ReturnsEmptyList() {
91+
header.setTopicList(" ");
92+
List<String> topicList = header.fetchTopicList();
93+
assertEquals(Collections.emptyList(), topicList);
94+
}
95+
96+
@Test
97+
public void fetchTopicList_SingleTopic_ReturnsSingleTopicList() {
98+
header.setTopicList("TopicA");
99+
List<String> topicList = header.fetchTopicList();
100+
assertEquals(Collections.singletonList("TopicA"), topicList);
101+
}
102+
103+
@Test
104+
public void fetchTopicList_MultipleTopics_ReturnsTopicList() {
105+
header.setTopicList("TopicA;TopicB;TopicC");
106+
List<String> topicList = header.fetchTopicList();
107+
assertEquals(Arrays.asList("TopicA", "TopicB", "TopicC"), topicList);
108+
}
109+
110+
@Test
111+
public void fetchTopicList_TopicListEndsWithSeparator_ReturnsTopicList() {
112+
header.setTopicList("TopicA;TopicB;");
113+
List<String> topicList = header.fetchTopicList();
114+
assertEquals(Arrays.asList("TopicA", "TopicB"), topicList);
115+
}
116+
117+
@Test
118+
public void fetchTopicList_TopicListStartsWithSeparator_ReturnsTopicList() {
119+
header.setTopicList(";TopicA;TopicB");
120+
List<String> topicList = header.fetchTopicList();
121+
assertEquals(Arrays.asList("TopicA", "TopicB"), topicList);
122+
}
123+
}

tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ public void testMessageTrackDetail() throws InterruptedException, RemotingExcept
456456
connection.setConnectionSet(connections);
457457
when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(connection);
458458
ConsumeStats consumeStats = new ConsumeStats();
459-
when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), isNull(), anyLong())).thenReturn(consumeStats);
459+
when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), (String) isNull(), anyLong())).thenReturn(consumeStats);
460460
List<MessageTrack> broadcastMessageTracks = defaultMQAdminExt.messageTrackDetail(messageExt);
461461
assertThat(broadcastMessageTracks.size()).isEqualTo(2);
462462
assertThat(broadcastMessageTracks.get(0).getTrackType()).isEqualTo(TrackType.CONSUME_BROADCASTING);

0 commit comments

Comments
 (0)