From e715f6fbdef426c228660d0123593f282098ef0d Mon Sep 17 00:00:00 2001 From: yx9o Date: Sun, 1 Jun 2025 22:41:47 +0800 Subject: [PATCH 1/2] [ISSUE #9443] Fix add value error of topicGroupTable in ConsumerManager --- .../broker/client/ConsumerManager.java | 2 +- .../broker/client/ConsumerManagerTest.java | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index 5aec8e577f8..341bbb5dad1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -292,7 +292,7 @@ public boolean registerConsumerWithoutSub(final String group, final ClientChanne Set prev = this.topicGroupTable.putIfAbsent(subscriptionData.getTopic(), tmp); groups = prev != null ? prev : tmp; } - groups.add(subscriptionData.getTopic()); + groups.add(group); } boolean updateChannelRst = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java index 2afd071b5ec..43c389d0762 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java @@ -210,4 +210,24 @@ public void removeExpireConsumerGroupInfo() { assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC)).isNull(); assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC + "_1")).isNotNull(); } + + @Test + public void testRegisterConsumerWithoutSub() { + ConsumerGroupInfo groupInfo = new ConsumerGroupInfo(GROUP, ConsumeType.CONSUME_PASSIVELY, + MessageModel.CLUSTERING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + SubscriptionData subscriptionData = new SubscriptionData(TOPIC, "*"); + groupInfo.getSubscriptionTable().put(TOPIC, subscriptionData); + consumerManager.getConsumerTable().put(GROUP, groupInfo); + + consumerManager.registerConsumerWithoutSub(GROUP, + clientChannelInfo, + ConsumeType.CONSUME_PASSIVELY, + MessageModel.CLUSTERING, + ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, + true); + + Set actual = consumerManager.queryTopicConsumeByWho(TOPIC); + assertThat(actual).contains(GROUP); + assertThat(actual).doesNotContain(TOPIC); + } } From 9c2f2cea25700c904e17a09e1a146434453b0480 Mon Sep 17 00:00:00 2001 From: yx9o Date: Mon, 2 Jun 2025 09:05:09 +0800 Subject: [PATCH 2/2] Update test --- .../rocketmq/broker/client/ConsumerManagerTest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java index 43c389d0762..1b8293159d3 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java @@ -37,6 +37,7 @@ import java.util.HashSet; import java.util.Set; +import static org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType.CONSUME_PASSIVELY; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -115,7 +116,7 @@ public void registerConsumerTest() { final Set subList = new HashSet<>(); SubscriptionData subscriptionData = new SubscriptionData(TOPIC, "*"); subList.add(subscriptionData); - consumerManager.registerConsumer(GROUP, clientChannelInfo, ConsumeType.CONSUME_PASSIVELY, + consumerManager.registerConsumer(GROUP, clientChannelInfo, CONSUME_PASSIVELY, MessageModel.BROADCASTING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, subList, true); verify(consumerManager, never()).callConsumerIdsChangeListener(eq(ConsumerGroupEvent.CHANGE), any(), any()); assertThat(consumerManager.getConsumerTable().get(GROUP)).isNotNull(); @@ -195,7 +196,7 @@ private void register() { final Set subList = new HashSet<>(); SubscriptionData subscriptionData = new SubscriptionData(TOPIC, "*"); subList.add(subscriptionData); - consumerManager.registerConsumer(GROUP, clientChannelInfo, ConsumeType.CONSUME_PASSIVELY, + consumerManager.registerConsumer(GROUP, clientChannelInfo, CONSUME_PASSIVELY, MessageModel.BROADCASTING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, subList, true); } @@ -213,7 +214,7 @@ public void removeExpireConsumerGroupInfo() { @Test public void testRegisterConsumerWithoutSub() { - ConsumerGroupInfo groupInfo = new ConsumerGroupInfo(GROUP, ConsumeType.CONSUME_PASSIVELY, + ConsumerGroupInfo groupInfo = new ConsumerGroupInfo(GROUP, CONSUME_PASSIVELY, MessageModel.CLUSTERING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); SubscriptionData subscriptionData = new SubscriptionData(TOPIC, "*"); groupInfo.getSubscriptionTable().put(TOPIC, subscriptionData); @@ -221,7 +222,7 @@ public void testRegisterConsumerWithoutSub() { consumerManager.registerConsumerWithoutSub(GROUP, clientChannelInfo, - ConsumeType.CONSUME_PASSIVELY, + CONSUME_PASSIVELY, MessageModel.CLUSTERING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, true);