From 9baa23c09d3674b50f7c0fb092e797990f823f36 Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Fri, 14 Mar 2025 19:49:16 +0800 Subject: [PATCH] [ISSUE #9246] Support init offset mode in PopConsumerService --- .../broker/pop/PopConsumerContext.java | 9 ++++++- .../broker/pop/PopConsumerService.java | 27 ++++++++++++------- .../broker/processor/PopMessageProcessor.java | 5 ++-- .../broker/pop/PopConsumerContextTest.java | 3 ++- .../broker/pop/PopConsumerServiceTest.java | 9 ++++--- 5 files changed, 36 insertions(+), 17 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerContext.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerContext.java index 09bc4e6b47c..0ad8bacab1c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerContext.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerContext.java @@ -35,6 +35,8 @@ public class PopConsumerContext { private final boolean fifo; + private final int initMode; + private final String attemptId; private final AtomicLong restCount; @@ -50,13 +52,14 @@ public class PopConsumerContext { private List popConsumerRecordList; public PopConsumerContext(String clientHost, - long popTime, long invisibleTime, String groupId, boolean fifo, String attemptId) { + long popTime, long invisibleTime, String groupId, boolean fifo, int initMode, String attemptId) { this.clientHost = clientHost; this.popTime = popTime; this.invisibleTime = invisibleTime; this.groupId = groupId; this.fifo = fifo; + this.initMode = initMode; this.attemptId = attemptId; this.restCount = new AtomicLong(0); this.startOffsetInfo = new StringBuilder(); @@ -120,6 +123,10 @@ public boolean isFifo() { return fifo; } + public int getInitMode() { + return initMode; + } + public long getInvisibleTime() { return invisibleTime; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java index 1f0125412a7..dde13a5ed73 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -44,6 +44,7 @@ import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.constant.ConsumeInitMode; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.message.MessageAccessor; @@ -197,7 +198,18 @@ public PopConsumerContext addGetMessageResult(PopConsumerContext context, GetMes return context; } - public Long getPopOffset(String groupId, String topicId, int queueId) { + public long getPopOffset(String groupId, String topicId, int queueId, int initMode) { + long offset = this.brokerController.getConsumerOffsetManager().queryPullOffset(groupId, topicId, queueId); + if (offset < 0L) { + try { + offset = this.brokerController.getPopMessageProcessor() + .getInitOffset(topicId, groupId, queueId, initMode, true); + log.info("PopConsumerService init offset, groupId={}, topicId={}, queueId={}, init={}, offset={}", + groupId, topicId, queueId, ConsumeInitMode.MIN == initMode ? "min" : "max", offset); + } catch (ConsumeQueueException e) { + throw new RuntimeException(e); + } + } Long resetOffset = this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topicId, groupId, queueId); if (resetOffset != null) { @@ -206,7 +218,7 @@ public Long getPopOffset(String groupId, String topicId, int queueId) { this.brokerController.getConsumerOffsetManager() .commitOffset("ResetPopOffset", groupId, topicId, queueId, resetOffset); } - return resetOffset; + return resetOffset != null ? resetOffset : offset; } public CompletableFuture getMessageAsync(String clientHost, @@ -215,9 +227,6 @@ public CompletableFuture getMessageAsync(String clientHost, log.debug("PopConsumerService getMessageAsync, groupId={}, topicId={}, queueId={}, offset={}, batchSize={}, filter={}", groupId, topicId, offset, queueId, batchSize, filter != null); - Long resetOffset = this.getPopOffset(groupId, topicId, queueId); - final long currentOffset = resetOffset != null ? resetOffset : offset; - CompletableFuture getMessageFuture = brokerController.getMessageStore().getMessageAsync(groupId, topicId, queueId, offset, batchSize, filter); @@ -240,7 +249,7 @@ public CompletableFuture getMessageAsync(String clientHost, log.warn("PopConsumerService getMessageAsync, initial offset because store is no correct, " + "groupId={}, topicId={}, queueId={}, batchSize={}, offset={}->{}", - groupId, topicId, queueId, batchSize, currentOffset, result.getNextBeginOffset()); + groupId, topicId, queueId, batchSize, offset, result.getNextBeginOffset()); return brokerController.getMessageStore().getMessageAsync( groupId, topicId, queueId, result.getNextBeginOffset(), batchSize, filter); @@ -299,7 +308,7 @@ protected CompletableFuture getMessageAsync(CompletableFutur result.addRestCount(this.getPendingFilterCount(groupId, topicId, queueId)); return CompletableFuture.completedFuture(result); } else { - long consumeOffset = brokerController.getConsumerOffsetManager().queryPullOffset(groupId, topicId, queueId); + final long consumeOffset = this.getPopOffset(groupId, topicId, queueId, result.getInitMode()); return getMessageAsync(clientHost, groupId, topicId, queueId, consumeOffset, remain, filter) .thenApply(getMessageResult -> addGetMessageResult( result, getMessageResult, topicId, queueId, retryType, consumeOffset)); @@ -308,11 +317,11 @@ protected CompletableFuture getMessageAsync(CompletableFutur } public CompletableFuture popAsync(String clientHost, long popTime, long invisibleTime, - String groupId, String topicId, int queueId, int batchSize, boolean fifo, String attemptId, + String groupId, String topicId, int queueId, int batchSize, boolean fifo, String attemptId, int initMode, MessageFilter filter) { PopConsumerContext popConsumerContext = - new PopConsumerContext(clientHost, popTime, invisibleTime, groupId, fifo, attemptId); + new PopConsumerContext(clientHost, popTime, invisibleTime, groupId, fifo, initMode, attemptId); TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topicId); if (topicConfig == null || !consumerLockService.tryLock(groupId, topicId)) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index b84afe21943..dd8314b7e0d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -383,7 +383,8 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC CompletableFuture popAsyncFuture = brokerController.getPopConsumerService().popAsync( RemotingHelper.parseChannelRemoteAddr(channel), beginTimeMills, requestHeader.getInvisibleTime(), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), - requestHeader.getMaxMsgNums(), requestHeader.isOrder(), requestHeader.getAttemptId(), messageFilter); + requestHeader.getMaxMsgNums(), requestHeader.isOrder(), + requestHeader.getAttemptId(), requestHeader.getInitMode(), messageFilter); popAsyncFuture.thenApply(result -> { if (result.isFound()) { @@ -888,7 +889,7 @@ private long getPopOffset(String topic, String group, int queueId, int initMode, } } - private long getInitOffset(String topic, String group, int queueId, int initMode, boolean init) + public long getInitOffset(String topic, String group, int queueId, int initMode, boolean init) throws ConsumeQueueException { long offset; if (ConsumeInitMode.MIN == initMode || topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerContextTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerContextTest.java index 554933eabc4..6f009423f9d 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerContextTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerContextTest.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.broker.pop; +import org.apache.rocketmq.common.constant.ConsumeInitMode; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; @@ -29,7 +30,7 @@ public class PopConsumerContextTest { public void consumerContextTest() { long popTime = System.currentTimeMillis(); PopConsumerContext context = new PopConsumerContext("127.0.0.1:6789", - popTime, 20_000, "GroupId", true, "attemptId"); + popTime, 20_000, "GroupId", true, ConsumeInitMode.MIN, "attemptId"); Assert.assertFalse(context.isFound()); Assert.assertEquals("127.0.0.1:6789", context.getClientHost()); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java index 2b930d5852c..7fb619f7400 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java @@ -41,6 +41,7 @@ import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.constant.ConsumeInitMode; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -190,7 +191,7 @@ public void recodeRetryMessageTest() throws Exception { @Test public void addGetMessageResultTest() { PopConsumerContext context = new PopConsumerContext( - clientHost, System.currentTimeMillis(), 20000, groupId, false, attemptId); + clientHost, System.currentTimeMillis(), 20000, groupId, false, ConsumeInitMode.MIN, attemptId); GetMessageResult result = new GetMessageResult(); result.setStatus(GetMessageStatus.FOUND); result.getMessageQueueOffset().add(100L); @@ -231,7 +232,7 @@ public void getMessageAsyncTest() throws Exception { // fifo block PopConsumerContext context = new PopConsumerContext( - clientHost, System.currentTimeMillis(), 20000, groupId, false, attemptId); + clientHost, System.currentTimeMillis(), 20000, groupId, false, ConsumeInitMode.MIN, attemptId); consumerService.setFifoBlocked(context, groupId, topicId, queueId, Collections.singletonList(100L)); Mockito.when(brokerController.getConsumerOrderInfoManager() .checkBlock(anyString(), anyString(), anyString(), anyInt(), anyLong())).thenReturn(true); @@ -257,7 +258,7 @@ public void getMessageAsyncTest() throws Exception { // fifo block test context = new PopConsumerContext( - clientHost, System.currentTimeMillis(), 20000, groupId, true, attemptId); + clientHost, System.currentTimeMillis(), 20000, groupId, true, ConsumeInitMode.MIN, attemptId); future = CompletableFuture.completedFuture(context); Assert.assertEquals(0L, consumerService.getMessageAsync(future, clientHost, groupId, topicId, queueId, 10, null, PopConsumerRecord.RetryType.NORMAL_TOPIC).join().getRestCount()); @@ -306,7 +307,7 @@ public void popAsyncTest() { // pop broker consumerServiceSpy.popAsync(clientHost, System.currentTimeMillis(), - 20000, groupId, topicId, -1, 10, false, attemptId, null).join(); + 20000, groupId, topicId, -1, 10, false, attemptId, ConsumeInitMode.MIN, null).join(); } @Test