From 03239c793f3a35a30abdcfcab82b452dc0bda5bf Mon Sep 17 00:00:00 2001 From: wanghuaiyuan Date: Tue, 11 Mar 2025 16:42:33 +0800 Subject: [PATCH 1/3] fix: Response codes do not match --- .../rocketmq/broker/processor/AdminBrokerProcessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 4ff4bed814d..19dba15afb5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1884,8 +1884,8 @@ private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, return response; } - response.setCode(ResponseCode.CONSUMER_NOT_ONLINE); - response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] not online"); + response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); + response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] not exist"); return response; } From fc2e86ebc160c2d3c874a93f6cf1842319a80180 Mon Sep 17 00:00:00 2001 From: wanghuaiyuan Date: Wed, 12 Mar 2025 17:03:29 +0800 Subject: [PATCH 2/3] fix: Response codes do not match --- .../broker/processor/AdminBrokerProcessor.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 19dba15afb5..d903042c89e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1855,6 +1855,12 @@ private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, final GetConsumerConnectionListRequestHeader requestHeader = (GetConsumerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class); + if (!this.brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(requestHeader.getConsumerGroup())) { + response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); + response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] not exist"); + return response; + } + ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()); if (consumerGroupInfo != null) { @@ -1884,8 +1890,8 @@ private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, return response; } - response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); - response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] not exist"); + response.setCode(ResponseCode.CONSUMER_NOT_ONLINE); + response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] not online"); return response; } From d19169f86ba125ae3c346f7b0ea08244c6555941 Mon Sep 17 00:00:00 2001 From: wanghuaiyuan Date: Wed, 12 Mar 2025 17:38:40 +0800 Subject: [PATCH 3/3] fix test --- .../rocketmq/broker/processor/AdminBrokerProcessorTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index 959b147d9d3..b7eb1ca332d 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -38,6 +38,7 @@ import org.apache.rocketmq.broker.schedule.ScheduleMessageService; import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager; import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager; +import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; @@ -681,7 +682,10 @@ public void testGetConsumerConnectionList() throws RemotingCommandException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_CONNECTION_LIST, null); request.addExtField("consumerGroup", "GID-group-test"); consumerManager = mock(ConsumerManager.class); + SubscriptionGroupManager subscriptionGroupManager = mock(SubscriptionGroupManager.class); when(brokerController.getConsumerManager()).thenReturn(consumerManager); + when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager); + when(subscriptionGroupManager.containsSubscriptionGroup(anyString())).thenReturn(true); ConsumerGroupInfo consumerGroupInfo = new ConsumerGroupInfo("GID-group-test", ConsumeType.CONSUME_ACTIVELY, MessageModel.CLUSTERING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); when(consumerManager.getConsumerGroupInfo(anyString())).thenReturn(consumerGroupInfo); RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);