Skip to content

Commit 921664f

Browse files
authored
[ISSUE #9443] Fix add wrong value to topicGroupTable in ConsumerManager (#9444)
1 parent cd6bec6 commit 921664f

2 files changed

Lines changed: 24 additions & 3 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ public boolean registerConsumerWithoutSub(final String group, final ClientChanne
292292
Set<String> prev = this.topicGroupTable.putIfAbsent(subscriptionData.getTopic(), tmp);
293293
groups = prev != null ? prev : tmp;
294294
}
295-
groups.add(subscriptionData.getTopic());
295+
groups.add(group);
296296
}
297297

298298
boolean updateChannelRst = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);

broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.HashSet;
3838
import java.util.Set;
3939

40+
import static org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType.CONSUME_PASSIVELY;
4041
import static org.assertj.core.api.Assertions.assertThat;
4142
import static org.junit.Assert.assertEquals;
4243
import static org.junit.Assert.assertFalse;
@@ -115,7 +116,7 @@ public void registerConsumerTest() {
115116
final Set<SubscriptionData> subList = new HashSet<>();
116117
SubscriptionData subscriptionData = new SubscriptionData(TOPIC, "*");
117118
subList.add(subscriptionData);
118-
consumerManager.registerConsumer(GROUP, clientChannelInfo, ConsumeType.CONSUME_PASSIVELY,
119+
consumerManager.registerConsumer(GROUP, clientChannelInfo, CONSUME_PASSIVELY,
119120
MessageModel.BROADCASTING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, subList, true);
120121
verify(consumerManager, never()).callConsumerIdsChangeListener(eq(ConsumerGroupEvent.CHANGE), any(), any());
121122
assertThat(consumerManager.getConsumerTable().get(GROUP)).isNotNull();
@@ -195,7 +196,7 @@ private void register() {
195196
final Set<SubscriptionData> subList = new HashSet<>();
196197
SubscriptionData subscriptionData = new SubscriptionData(TOPIC, "*");
197198
subList.add(subscriptionData);
198-
consumerManager.registerConsumer(GROUP, clientChannelInfo, ConsumeType.CONSUME_PASSIVELY,
199+
consumerManager.registerConsumer(GROUP, clientChannelInfo, CONSUME_PASSIVELY,
199200
MessageModel.BROADCASTING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, subList, true);
200201
}
201202

@@ -210,4 +211,24 @@ public void removeExpireConsumerGroupInfo() {
210211
assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC)).isNull();
211212
assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC + "_1")).isNotNull();
212213
}
214+
215+
@Test
216+
public void testRegisterConsumerWithoutSub() {
217+
ConsumerGroupInfo groupInfo = new ConsumerGroupInfo(GROUP, CONSUME_PASSIVELY,
218+
MessageModel.CLUSTERING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
219+
SubscriptionData subscriptionData = new SubscriptionData(TOPIC, "*");
220+
groupInfo.getSubscriptionTable().put(TOPIC, subscriptionData);
221+
consumerManager.getConsumerTable().put(GROUP, groupInfo);
222+
223+
consumerManager.registerConsumerWithoutSub(GROUP,
224+
clientChannelInfo,
225+
CONSUME_PASSIVELY,
226+
MessageModel.CLUSTERING,
227+
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET,
228+
true);
229+
230+
Set<String> actual = consumerManager.queryTopicConsumeByWho(TOPIC);
231+
assertThat(actual).contains(GROUP);
232+
assertThat(actual).doesNotContain(TOPIC);
233+
}
213234
}

0 commit comments

Comments
 (0)