diff --git a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java index 2e249304056..5b8b2fb9cec 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java @@ -187,7 +187,7 @@ public boolean isCgNeedColdDataFlowCtr(String consumerGroup) { if (!this.messageStoreConfig.isColdDataFlowControlEnable()) { return false; } - if (MixAll.isSysConsumerGroupForNoColdReadLimit(consumerGroup)) { + if (MixAll.isSysConsumerGroupPullMessage(consumerGroup)) { return false; } AccAndTimeStamp accAndTimeStamp = cgColdThresholdMapRuntime.get(consumerGroup); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 5b11bc2fef4..8a3a91e8ca8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -489,6 +489,19 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re this.brokerController.getConsumerFilterManager()); } + if (brokerController.getBrokerConfig().isRejectPullConsumerEnable()) { + ConsumerGroupInfo consumerGroupInfo = + this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()); + if (null == consumerGroupInfo || ConsumeType.CONSUME_ACTIVELY == consumerGroupInfo.getConsumeType()) { + if ((null == consumerGroupInfo || null == consumerGroupInfo.findChannel(channel)) + && !MixAll.isSysConsumerGroupPullMessage(requestHeader.getConsumerGroup())) { + response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST); + response.setRemark("the consumer's group info not exist, or the pull consumer is rejected by server." + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); + return response; + } + } + } + final MessageStore messageStore = brokerController.getMessageStore(); if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) { DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index d85342e1a18..f3e669fb3ea 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -260,7 +260,8 @@ public void disableConsume(final String groupName) { public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) { SubscriptionGroupConfig subscriptionGroupConfig = getSubscriptionGroupConfig(group); if (null == subscriptionGroupConfig) { - if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) { + if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() + || MixAll.isSysConsumerGroupAndEnableCreate(group, brokerController.getBrokerConfig().isEnableCreateSysGroup())) { if (group.length() > Validators.CHARACTER_MAX_LENGTH || TopicValidator.isTopicOrGroupIllegal(group)) { return null; } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index dd345449351..d1c6f328ade 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -455,6 +455,8 @@ public class BrokerConfig extends BrokerIdentity { private boolean recallMessageEnable = false; + private boolean enableCreateSysGroup = true; + public String getConfigBlackList() { return configBlackList; } @@ -2006,4 +2008,12 @@ public boolean isRecallMessageEnable() { public void setRecallMessageEnable(boolean recallMessageEnable) { this.recallMessageEnable = recallMessageEnable; } + + public boolean isEnableCreateSysGroup() { + return enableCreateSysGroup; + } + + public void setEnableCreateSysGroup(boolean enableCreateSysGroup) { + this.enableCreateSysGroup = enableCreateSysGroup; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index c05a1d19262..aca9bd4ed7b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -178,6 +178,10 @@ public static boolean isSysConsumerGroup(final String consumerGroup) { return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX); } + public static boolean isSysConsumerGroupAndEnableCreate(final String consumerGroup, final boolean isEnableCreateSysGroup) { + return isEnableCreateSysGroup && isSysConsumerGroup(consumerGroup); + } + public static boolean isPredefinedGroup(final String consumerGroup) { return PREDEFINE_GROUP_SET.contains(consumerGroup); } @@ -530,7 +534,7 @@ public static String dealFilePath(String aclFilePath) { return path.normalize().toString(); } - public static boolean isSysConsumerGroupForNoColdReadLimit(String consumerGroup) { + public static boolean isSysConsumerGroupPullMessage(String consumerGroup) { if (DEFAULT_CONSUMER_GROUP.equals(consumerGroup) || TOOLS_CONSUMER_GROUP.equals(consumerGroup) || SCHEDULE_CONSUMER_GROUP.equals(consumerGroup) diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index d6134683861..74244f0544c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -916,7 +916,7 @@ public GetMessageResult getMessage(final String group, final String topic, final continue; } - if (messageStoreConfig.isColdDataFlowControlEnable() && !MixAll.isSysConsumerGroupForNoColdReadLimit(group) && !selectResult.isInCache()) { + if (messageStoreConfig.isColdDataFlowControlEnable() && !MixAll.isSysConsumerGroupPullMessage(group) && !selectResult.isInCache()) { getResult.setColdDataSum(getResult.getColdDataSum() + sizePy); }