Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public boolean isCgNeedColdDataFlowCtr(String consumerGroup) {
if (!this.messageStoreConfig.isColdDataFlowControlEnable()) {
return false;
}
if (MixAll.isSysConsumerGroupForNoColdReadLimit(consumerGroup)) {
if (MixAll.isSysConsumerGroupPullMessage(consumerGroup)) {
return false;
}
AccAndTimeStamp accAndTimeStamp = cgColdThresholdMapRuntime.get(consumerGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,19 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
this.brokerController.getConsumerFilterManager());
}

if (brokerController.getBrokerConfig().isRejectPullConsumerEnable()) {
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
if (null == consumerGroupInfo || ConsumeType.CONSUME_ACTIVELY == consumerGroupInfo.getConsumeType()) {
if ((null == consumerGroupInfo || null == consumerGroupInfo.findChannel(channel))
&& !MixAll.isSysConsumerGroupPullMessage(requestHeader.getConsumerGroup())) {
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's group info not exist, or the pull consumer is rejected by server." + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return response;
}
}
}

final MessageStore messageStore = brokerController.getMessageStore();
if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) {
DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ public void disableConsume(final String groupName) {
public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
SubscriptionGroupConfig subscriptionGroupConfig = getSubscriptionGroupConfig(group);
if (null == subscriptionGroupConfig) {
if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup()
|| MixAll.isSysConsumerGroupAndEnableCreate(group, brokerController.getBrokerConfig().isEnableCreateSysGroup())) {
if (group.length() > Validators.CHARACTER_MAX_LENGTH || TopicValidator.isTopicOrGroupIllegal(group)) {
return null;
}
Expand Down
10 changes: 10 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,8 @@ public class BrokerConfig extends BrokerIdentity {

private boolean recallMessageEnable = false;

private boolean enableCreateSysGroup = true;

public String getConfigBlackList() {
return configBlackList;
}
Expand Down Expand Up @@ -2006,4 +2008,12 @@ public boolean isRecallMessageEnable() {
public void setRecallMessageEnable(boolean recallMessageEnable) {
this.recallMessageEnable = recallMessageEnable;
}

public boolean isEnableCreateSysGroup() {
return enableCreateSysGroup;
}

public void setEnableCreateSysGroup(boolean enableCreateSysGroup) {
this.enableCreateSysGroup = enableCreateSysGroup;
}
}
6 changes: 5 additions & 1 deletion common/src/main/java/org/apache/rocketmq/common/MixAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ public static boolean isSysConsumerGroup(final String consumerGroup) {
return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX);
}

public static boolean isSysConsumerGroupAndEnableCreate(final String consumerGroup, final boolean isEnableCreateSysGroup) {
return isEnableCreateSysGroup && isSysConsumerGroup(consumerGroup);
}

public static boolean isPredefinedGroup(final String consumerGroup) {
return PREDEFINE_GROUP_SET.contains(consumerGroup);
}
Expand Down Expand Up @@ -530,7 +534,7 @@ public static String dealFilePath(String aclFilePath) {
return path.normalize().toString();
}

public static boolean isSysConsumerGroupForNoColdReadLimit(String consumerGroup) {
public static boolean isSysConsumerGroupPullMessage(String consumerGroup) {
if (DEFAULT_CONSUMER_GROUP.equals(consumerGroup)
|| TOOLS_CONSUMER_GROUP.equals(consumerGroup)
|| SCHEDULE_CONSUMER_GROUP.equals(consumerGroup)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ public GetMessageResult getMessage(final String group, final String topic, final
continue;
}

if (messageStoreConfig.isColdDataFlowControlEnable() && !MixAll.isSysConsumerGroupForNoColdReadLimit(group) && !selectResult.isInCache()) {
if (messageStoreConfig.isColdDataFlowControlEnable() && !MixAll.isSysConsumerGroupPullMessage(group) && !selectResult.isInCache()) {
getResult.setColdDataSum(getResult.getColdDataSum() + sizePy);
}

Expand Down
Loading