Skip to content

Commit 9f23894

Browse files
authored
[ISSUE #9980] Skip invalid records when the group is absent in Pop (#9981)
Signed-off-by: terrance.lzm <terrance.lzm@alibaba-inc.com>
1 parent 6ab57ad commit 9f23894

3 files changed

Lines changed: 39 additions & 1 deletion

File tree

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,15 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime,
501501
PopConsumerRecord ackRecord = new PopConsumerRecord(
502502
popTime, groupId, topicId, queueId, 0, invisibleTime, offset, null);
503503

504-
this.popConsumerStore.writeRecords(Collections.singletonList(ckRecord));
504+
// No need to generate new records when the group does not exist,
505+
// because these retry messages will not be consumed by anyone.
506+
if (brokerConfig.isPopReviveSkipIfGroupAbsent() &&
507+
!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId)) {
508+
log.info("PopConsumerService change invisibility skip, time={}, " +
509+
"groupId={}, topicId={}, queueId={}, offset={}", popTime, groupId, topicId, queueId, offset);
510+
} else {
511+
this.popConsumerStore.writeRecords(Collections.singletonList(ckRecord));
512+
}
505513

506514
if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache != null) {
507515
if (popConsumerCache.deleteRecords(Collections.singletonList(ackRecord)).isEmpty()) {
@@ -519,6 +527,13 @@ public CompletableFuture<Triple<MessageExt, String, Boolean>> getMessageAsync(Po
519527
}
520528

521529
public CompletableFuture<Boolean> revive(PopConsumerRecord record) {
530+
531+
if (brokerConfig.isPopReviveSkipIfGroupAbsent() &&
532+
!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(record.getGroupId())) {
533+
log.info("PopConsumerService skip revive message, record={}", record);
534+
return CompletableFuture.completedFuture(true);
535+
}
536+
522537
return this.getMessageAsync(record)
523538
.thenCompose(result -> {
524539
if (result == null) {

broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,18 @@ public void ackAsyncTest() {
324324
consumerService.shutdown();
325325
}
326326

327+
@Test
328+
public void reviveSkipIfGroupAbsent() {
329+
String groupName = "PopGroupAbsent";
330+
brokerController.getBrokerConfig().setPopReviveSkipIfGroupAbsent(true);
331+
PopConsumerRecord record = Mockito.mock(PopConsumerRecord.class);
332+
Mockito.when(record.getGroupId()).thenReturn(groupName);
333+
Mockito.when(brokerController.getSubscriptionGroupManager()
334+
.containsSubscriptionGroup(groupName)).thenReturn(false);
335+
CompletableFuture<Boolean> result = consumerService.revive(record);
336+
Assert.assertTrue(result.join());
337+
}
338+
327339
@Test
328340
public void reviveRetryTest() {
329341
Mockito.when(brokerController.getTopicConfigManager().selectTopicConfig(topicId)).thenReturn(null);
@@ -393,6 +405,8 @@ public void reviveRetryTest() {
393405
@Test
394406
public void reviveBackoffRetryTest() {
395407
Mockito.when(brokerController.getEscapeBridge()).thenReturn(Mockito.mock(EscapeBridge.class));
408+
Mockito.when(brokerController.getSubscriptionGroupManager()
409+
.containsSubscriptionGroup(anyString())).thenReturn(true);
396410
PopConsumerService consumerServiceSpy = Mockito.spy(consumerService);
397411

398412
consumerService.getPopConsumerStore().start();

common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ public class BrokerConfig extends BrokerIdentity {
251251
private int popReviveMaxReturnSizePerRead = 16 * 1024;
252252
private int popReviveConcurrency = 32;
253253
private int popReviveMaxAttemptTimes = 16;
254+
private boolean popReviveSkipIfGroupAbsent = true;
254255
// each message queue will have a corresponding retry queue
255256
private boolean useSeparateRetryQueue = false;
256257
private boolean realTimeNotifyConsumerChange = true;
@@ -699,6 +700,14 @@ public void setPopReviveMaxAttemptTimes(int popReviveMaxAttemptTimes) {
699700
this.popReviveMaxAttemptTimes = popReviveMaxAttemptTimes;
700701
}
701702

703+
public boolean isPopReviveSkipIfGroupAbsent() {
704+
return popReviveSkipIfGroupAbsent;
705+
}
706+
707+
public void setPopReviveSkipIfGroupAbsent(boolean popReviveSkipIfGroupAbsent) {
708+
this.popReviveSkipIfGroupAbsent = popReviveSkipIfGroupAbsent;
709+
}
710+
702711
public boolean isTraceOn() {
703712
return traceOn;
704713
}

0 commit comments

Comments
 (0)