Skip to content

Commit 438077e

Browse files
authored
Limit group max length to 120. (#9563)
* limit group length to 120 for max length for pop retry topic is 255. * Add unit test for validating group. * Fix unit test for validating gRPC group, limit length to 120
1 parent 40e3aa9 commit 438077e

7 files changed

Lines changed: 108 additions & 55 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
472472
return response;
473473
}
474474

475-
TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(requestHeader.getTopic());
475+
TopicValidator.ValidateResult result = TopicValidator.validateTopic(requestHeader.getTopic());
476476
if (!result.isValid()) {
477477
response.setCode(ResponseCode.INVALID_PARAMETER);
478478
response.setRemark(result.getRemark());

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext
556556

557557
long executionTime;
558558
try {
559-
TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic);
559+
TopicValidator.ValidateResult result = TopicValidator.validateTopic(topic);
560560
if (!result.isValid()) {
561561
response.setCode(ResponseCode.INVALID_PARAMETER);
562562
response.setRemark(result.getRemark());
@@ -646,7 +646,7 @@ private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerCont
646646
// Valid topics
647647
for (TopicConfig topicConfig : topicConfigList) {
648648
String topic = topicConfig.getTopicName();
649-
TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic);
649+
TopicValidator.ValidateResult result = TopicValidator.validateTopic(topic);
650650
if (!result.isValid()) {
651651
response.setCode(ResponseCode.INVALID_PARAMETER);
652652
response.setRemark(result.getRemark());
@@ -716,7 +716,7 @@ private synchronized RemotingCommand updateAndCreateStaticTopic(ChannelHandlerCo
716716

717717
String topic = requestHeader.getTopic();
718718

719-
TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic);
719+
TopicValidator.ValidateResult result = TopicValidator.validateTopic(topic);
720720
if (!result.isValid()) {
721721
response.setCode(ResponseCode.INVALID_PARAMETER);
722722
response.setRemark(result.getRemark());
@@ -1532,14 +1532,23 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c
15321532
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
15331533

15341534
SubscriptionGroupConfig config = RemotingSerializable.decode(request.getBody(), SubscriptionGroupConfig.class);
1535-
if (config != null) {
1535+
if (null != config) {
1536+
TopicValidator.ValidateResult result = TopicValidator.validateGroup(config.getGroupName());
1537+
if (!result.isValid()) {
1538+
response.setCode(ResponseCode.INVALID_PARAMETER);
1539+
response.setRemark(result.getRemark());
1540+
return response;
1541+
}
1542+
15361543
this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(config);
15371544
}
15381545

15391546
response.setCode(ResponseCode.SUCCESS);
15401547
response.setRemark(null);
15411548
long executionTime = System.currentTimeMillis() - startTime;
1542-
LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms", config.getGroupName(), executionTime);
1549+
if (null != config) {
1550+
LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms", config.getGroupName(), executionTime);
1551+
}
15431552
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
15441553
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
15451554
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
@@ -1551,20 +1560,26 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c
15511560

15521561
private RemotingCommand updateAndCreateSubscriptionGroupList(ChannelHandlerContext ctx, RemotingCommand request) {
15531562
final long startTime = System.nanoTime();
1563+
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
15541564

15551565
final SubscriptionGroupList subscriptionGroupList = SubscriptionGroupList.decode(request.getBody(), SubscriptionGroupList.class);
15561566
final List<SubscriptionGroupConfig> groupConfigList = subscriptionGroupList.getGroupConfigList();
15571567

15581568
final StringBuilder builder = new StringBuilder();
15591569
for (SubscriptionGroupConfig config : groupConfigList) {
1570+
TopicValidator.ValidateResult result = TopicValidator.validateGroup(config.getGroupName());
1571+
if (!result.isValid()) {
1572+
response.setCode(ResponseCode.INVALID_PARAMETER);
1573+
response.setRemark(result.getRemark());
1574+
return response;
1575+
}
15601576
builder.append(config.getGroupName()).append(";");
15611577
}
15621578
final String groupNames = builder.toString();
15631579
LOGGER.info("AdminBrokerProcessor#updateAndCreateSubscriptionGroupList: groupNames: {}, called by {}",
15641580
groupNames,
15651581
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
15661582

1567-
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
15681583
try {
15691584
this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfigList(groupConfigList);
15701585
response.setCode(ResponseCode.SUCCESS);
@@ -2058,13 +2073,13 @@ private Long searchOffsetByTimestamp(String topic, int queueId, long timestamp)
20582073
/**
20592074
* Reset consumer offset.
20602075
*
2061-
* @param topic Required, not null.
2062-
* @param group Required, not null.
2063-
* @param queueId if target queue ID is negative, all message queues will be reset; otherwise, only the target queue
2064-
* would get reset.
2076+
* @param topic Required, not null.
2077+
* @param group Required, not null.
2078+
* @param queueId if target queue ID is negative, all message queues will be reset; otherwise, only the target queue
2079+
* would get reset.
20652080
* @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being; otherwise,
2066-
* binary search is performed to locate target offset.
2067-
* @param offset Target offset to reset to if target queue ID is properly provided.
2081+
* binary search is performed to locate target offset.
2082+
* @param offset Target offset to reset to if target queue ID is properly provided.
20682083
* @return Affected queues and their new offset
20692084
*/
20702085
private RemotingCommand resetOffsetInner(String topic, String group, int queueId, long timestamp, Long offset) {

broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
package org.apache.rocketmq.broker.subscription;
1818

1919
import com.google.common.collect.ImmutableMap;
20+
import com.google.common.collect.ImmutableSortedMap;
21+
import com.google.common.collect.Maps;
2022
import java.util.HashMap;
21-
import java.util.Iterator;
2223
import java.util.List;
2324
import java.util.Map;
2425
import java.util.Map.Entry;
@@ -27,15 +28,11 @@
2728
import java.util.concurrent.ConcurrentHashMap;
2829
import java.util.concurrent.ConcurrentMap;
2930
import java.util.stream.Collectors;
30-
31-
import com.google.common.collect.ImmutableSortedMap;
32-
import com.google.common.collect.Maps;
3331
import org.apache.commons.collections.CollectionUtils;
3432
import org.apache.commons.collections.MapUtils;
3533
import org.apache.commons.lang3.StringUtils;
3634
import org.apache.rocketmq.broker.BrokerController;
3735
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
38-
import org.apache.rocketmq.client.Validators;
3936
import org.apache.rocketmq.common.ConfigManager;
4037
import org.apache.rocketmq.common.MixAll;
4138
import org.apache.rocketmq.common.SubscriptionGroupAttributes;
@@ -48,6 +45,7 @@
4845
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
4946
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
5047

48+
@SuppressWarnings("Duplicates")
5149
public class SubscriptionGroupManager extends ConfigManager {
5250
protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
5351

@@ -191,10 +189,6 @@ public void updateForbidden(String group, String topic, int forbiddenIndex, bool
191189

192190
/**
193191
* set the bit value to 1 at the specific index (from 0)
194-
*
195-
* @param group
196-
* @param topic
197-
* @param forbiddenIndex from 0
198192
*/
199193
public void setForbidden(String group, String topic, int forbiddenIndex) {
200194
int topicForbidden = getForbidden(group, topic);
@@ -204,10 +198,6 @@ public void setForbidden(String group, String topic, int forbiddenIndex) {
204198

205199
/**
206200
* clear the bit value to 0 at the specific index (from 0)
207-
*
208-
* @param group
209-
* @param topic
210-
* @param forbiddenIndex from 0
211201
*/
212202
public void clearForbidden(String group, String topic, int forbiddenIndex) {
213203
int topicForbidden = getForbidden(group, topic);
@@ -270,7 +260,8 @@ public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
270260
if (null == subscriptionGroupConfig) {
271261
if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup()
272262
|| MixAll.isSysConsumerGroupAndEnableCreate(group, brokerController.getBrokerConfig().isEnableCreateSysGroup())) {
273-
if (group.length() > Validators.CHARACTER_MAX_LENGTH || TopicValidator.isTopicOrGroupIllegal(group)) {
263+
TopicValidator.ValidateResult result = TopicValidator.validateGroup(group);
264+
if (!result.isValid()) {
274265
return null;
275266
}
276267
subscriptionGroupConfig = new SubscriptionGroupConfig();
@@ -319,9 +310,7 @@ public String encode(final boolean prettyFormat) {
319310
}
320311

321312
private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
322-
Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();
323-
while (it.hasNext()) {
324-
Entry<String, SubscriptionGroupConfig> next = it.next();
313+
for (Entry<String, SubscriptionGroupConfig> next : sgm.getSubscriptionGroupTable().entrySet()) {
325314
log.info("load exist subscription group, {}", next.getValue().toString());
326315
}
327316
}

client/src/main/java/org/apache/rocketmq/client/Validators.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@
3838
public class Validators {
3939
public static final int CHARACTER_MAX_LENGTH = 255;
4040
public static final int TOPIC_MAX_LENGTH = 127;
41+
/*
42+
* Group name max length is 120, for it will be used to make up retry and DLQ topic,
43+
* like pull retry: %RETRY%group_topic and pop retry: %RETRY%group_topic.
44+
*/
45+
public static final int GROUP_MAX_LENGTH = 120;
4146

4247
/**
4348
* Validate group
@@ -47,11 +52,10 @@ public static void checkGroup(String group) throws MQClientException {
4752
throw new MQClientException("the specified group is blank", null);
4853
}
4954

50-
if (group.length() > CHARACTER_MAX_LENGTH) {
51-
throw new MQClientException("the specified group is longer than group max length 255.", null);
55+
if (group.length() > GROUP_MAX_LENGTH) {
56+
throw new MQClientException(String.format("the specified group[%s] is longer than group max length: %s.", group, GROUP_MAX_LENGTH), null);
5257
}
5358

54-
5559
if (isTopicOrGroupIllegal(group)) {
5660
throw new MQClientException(String.format(
5761
"the specified group[%s] contains illegal characters, allowing only %s", group,

common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,17 @@ public class TopicValidator {
3939

4040
public static final boolean[] VALID_CHAR_BIT_MAP = new boolean[128];
4141
private static final int TOPIC_MAX_LENGTH = 127;
42+
/*
43+
* Group name max length is 120, for it will be used to make up retry and DLQ topic,
44+
* like pull retry: %RETRY%group_topic and pop retry: %RETRY%group_topic.
45+
*/
46+
private static final int GROUP_MAX_LENGTH = 120;
4247
private static final int RETRY_OR_DLQ_TOPIC_MAX_LENGTH = 255;
4348

4449
private static final Set<String> SYSTEM_TOPIC_SET = new HashSet<>();
4550

4651
/**
47-
* Topics'set which client can not send msg!
52+
* Topic set which client can not send msg!
4853
*/
4954
private static final Set<String> NOT_ALLOWED_SEND_TOPIC_SET = new HashSet<>();
5055

@@ -93,44 +98,65 @@ public class TopicValidator {
9398
public static boolean isTopicOrGroupIllegal(String str) {
9499
int strLen = str.length();
95100
int len = VALID_CHAR_BIT_MAP.length;
96-
boolean[] bitMap = VALID_CHAR_BIT_MAP;
97101
for (int i = 0; i < strLen; i++) {
98102
char ch = str.charAt(i);
99-
if (ch >= len || !bitMap[ch]) {
103+
if (ch >= len || !VALID_CHAR_BIT_MAP[ch]) {
100104
return true;
101105
}
102106
}
103107
return false;
104108
}
105109

106-
public static ValidateTopicResult validateTopic(String topic) {
110+
public static ValidateResult validateTopic(String topic) {
107111

108112
if (UtilAll.isBlank(topic)) {
109-
return new ValidateTopicResult(false, "The specified topic is blank.");
113+
return new ValidateResult(false, "The specified topic is blank.");
110114
}
111115

112116
if (isTopicOrGroupIllegal(topic)) {
113-
return new ValidateTopicResult(false, "The specified topic contains illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$");
117+
String falseRemark = "The specified topic: " + topic + ", contains illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$";
118+
return new ValidateResult(false, falseRemark);
114119
}
115120

116121
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
117122
if (topic.length() > RETRY_OR_DLQ_TOPIC_MAX_LENGTH) {
118-
return new ValidateTopicResult(false, "The specified topic is longer than topic max length.");
123+
String falseRemark = "The specified topic is DLQ or Retry topic: " + topic + ", and it's longer than topic max length: " + RETRY_OR_DLQ_TOPIC_MAX_LENGTH;
124+
return new ValidateResult(false, falseRemark);
119125
}
120126
} else {
121127
if (topic.length() > TOPIC_MAX_LENGTH) {
122-
return new ValidateTopicResult(false, "The specified topic is longer than topic max length.");
128+
String falseRemark = "The specified topic: " + topic + ", is longer than topic max length: " + TOPIC_MAX_LENGTH;
129+
return new ValidateResult(false, falseRemark);
123130
}
124131
}
125132

126-
return new ValidateTopicResult(true, "");
133+
return new ValidateResult(true, "");
134+
}
135+
136+
public static ValidateResult validateGroup(String group) {
137+
138+
if (UtilAll.isBlank(group)) {
139+
return new ValidateResult(false, "The specified group is blank.");
140+
}
141+
142+
if (isTopicOrGroupIllegal(group)) {
143+
String falseRemark = "The specified group: " + group + ", contains illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$";
144+
return new ValidateResult(false, falseRemark);
145+
}
146+
147+
if (group.length() > GROUP_MAX_LENGTH) {
148+
String falseRemark = "The specified group: " + group + ", is longer than group max length: " + GROUP_MAX_LENGTH;
149+
return new ValidateResult(false, falseRemark);
150+
}
151+
152+
return new ValidateResult(true, "");
127153
}
128154

129-
public static class ValidateTopicResult {
155+
public static class ValidateResult {
130156
private final boolean valid;
131157
private final String remark;
132158

133-
public ValidateTopicResult(boolean valid, String remark) {
159+
public ValidateResult(boolean valid, String remark) {
134160
this.valid = valid;
135161
this.remark = remark;
136162
}

common/src/test/java/org/apache/rocketmq/common/topic/TopicValidatorTest.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,34 +24,26 @@ public class TopicValidatorTest {
2424

2525
@Test
2626
public void testTopicValidator_NotPass() {
27-
TopicValidator.ValidateTopicResult res = TopicValidator.validateTopic("");
27+
TopicValidator.ValidateResult res = TopicValidator.validateTopic("");
2828
assertThat(res.isValid()).isFalse();
2929
assertThat(res.getRemark()).contains("The specified topic is blank");
3030

3131
res = TopicValidator.validateTopic("../TopicTest");
3232
assertThat(res.isValid()).isFalse();
33-
assertThat(res.getRemark()).contains("The specified topic contains illegal characters");
3433

3534
res = TopicValidator.validateTopic(generateString(128));
3635
assertThat(res.isValid()).isFalse();
37-
assertThat(res.getRemark()).contains("The specified topic is longer than topic max length.");
38-
39-
res = TopicValidator.validateTopic(generateString2(128));
40-
assertThat(res.isValid()).isFalse();
41-
assertThat(res.getRemark()).contains("The specified topic is longer than topic max length.");
4236

4337
res = TopicValidator.validateTopic(generateRetryTopic(256));
4438
assertThat(res.isValid()).isFalse();
45-
assertThat(res.getRemark()).contains("The specified topic is longer than topic max length.");
4639

4740
res = TopicValidator.validateTopic(generateDlqTopic(256));
4841
assertThat(res.isValid()).isFalse();
49-
assertThat(res.getRemark()).contains("The specified topic is longer than topic max length.");
5042
}
5143

5244
@Test
5345
public void testTopicValidator_Pass() {
54-
TopicValidator.ValidateTopicResult res = TopicValidator.validateTopic("TestTopic");
46+
TopicValidator.ValidateResult res = TopicValidator.validateTopic("TestTopic");
5547
assertThat(res.isValid()).isTrue();
5648
assertThat(res.getRemark()).isEmpty();
5749

@@ -68,6 +60,33 @@ public void testTopicValidator_Pass() {
6860
assertThat(res.getRemark()).isEmpty();
6961
}
7062

63+
@Test
64+
public void testGroupValidator_Pass() {
65+
TopicValidator.ValidateResult res = TopicValidator.validateGroup("TestGroup");
66+
assertThat(res.isValid()).isTrue();
67+
assertThat(res.getRemark()).isEmpty();
68+
69+
res = TopicValidator.validateGroup(generateString2(120));
70+
assertThat(res.isValid()).isTrue();
71+
assertThat(res.getRemark()).isEmpty();
72+
}
73+
74+
@Test
75+
public void testGroupValidator__NotPass() {
76+
TopicValidator.ValidateResult res = TopicValidator.validateGroup("");
77+
assertThat(res.isValid()).isFalse();
78+
assertThat(res.getRemark()).contains("The specified group is blank");
79+
80+
res = TopicValidator.validateGroup("../GroupTest");
81+
assertThat(res.isValid()).isFalse();
82+
83+
res = TopicValidator.validateGroup(generateString(120));
84+
assertThat(res.isValid()).isFalse();
85+
86+
res = TopicValidator.validateGroup(generateString2(121));
87+
assertThat(res.isValid()).isFalse();
88+
}
89+
7190
@Test
7291
public void testAddSystemTopic() {
7392
String topic = "SYSTEM_TOPIC_TEST";

proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessingActivityTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public void testValidateConsumer() {
6464
assertThrows(GrpcProxyException.class, () -> messingActivity.validateConsumerGroup(Resource.newBuilder().setName(MixAll.CID_SYS_RMQ_TRANS).build()));
6565
assertThrows(GrpcProxyException.class, () -> messingActivity.validateConsumerGroup(Resource.newBuilder().setName("@").build()));
6666
assertThrows(GrpcProxyException.class, () -> messingActivity.validateConsumerGroup(Resource.newBuilder().setName(createString(256)).build()));
67-
messingActivity.validateConsumerGroup(Resource.newBuilder().setName(createString(255)).build());
67+
messingActivity.validateConsumerGroup(Resource.newBuilder().setName(createString(120)).build());
6868
}
6969

7070
private static String createString(int len) {

0 commit comments

Comments
 (0)