Skip to content

Commit 054f910

Browse files
authored
[ISSUE #9246] Support init offset mode in PopConsumerService (#9247)
1 parent 622c807 commit 054f910

File tree

5 files changed

+36
-17
lines changed

5 files changed

+36
-17
lines changed

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerContext.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public class PopConsumerContext {
3535

3636
private final boolean fifo;
3737

38+
private final int initMode;
39+
3840
private final String attemptId;
3941

4042
private final AtomicLong restCount;
@@ -50,13 +52,14 @@ public class PopConsumerContext {
5052
private List<PopConsumerRecord> popConsumerRecordList;
5153

5254
public PopConsumerContext(String clientHost,
53-
long popTime, long invisibleTime, String groupId, boolean fifo, String attemptId) {
55+
long popTime, long invisibleTime, String groupId, boolean fifo, int initMode, String attemptId) {
5456

5557
this.clientHost = clientHost;
5658
this.popTime = popTime;
5759
this.invisibleTime = invisibleTime;
5860
this.groupId = groupId;
5961
this.fifo = fifo;
62+
this.initMode = initMode;
6063
this.attemptId = attemptId;
6164
this.restCount = new AtomicLong(0);
6265
this.startOffsetInfo = new StringBuilder();
@@ -120,6 +123,10 @@ public boolean isFifo() {
120123
return fifo;
121124
}
122125

126+
public int getInitMode() {
127+
return initMode;
128+
}
129+
123130
public long getInvisibleTime() {
124131
return invisibleTime;
125132
}

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.rocketmq.common.ServiceThread;
4545
import org.apache.rocketmq.common.TopicConfig;
4646
import org.apache.rocketmq.common.TopicFilterType;
47+
import org.apache.rocketmq.common.constant.ConsumeInitMode;
4748
import org.apache.rocketmq.common.constant.LoggerName;
4849
import org.apache.rocketmq.common.constant.PermName;
4950
import org.apache.rocketmq.common.message.MessageAccessor;
@@ -197,7 +198,18 @@ public PopConsumerContext addGetMessageResult(PopConsumerContext context, GetMes
197198
return context;
198199
}
199200

200-
public Long getPopOffset(String groupId, String topicId, int queueId) {
201+
public long getPopOffset(String groupId, String topicId, int queueId, int initMode) {
202+
long offset = this.brokerController.getConsumerOffsetManager().queryPullOffset(groupId, topicId, queueId);
203+
if (offset < 0L) {
204+
try {
205+
offset = this.brokerController.getPopMessageProcessor()
206+
.getInitOffset(topicId, groupId, queueId, initMode, true);
207+
log.info("PopConsumerService init offset, groupId={}, topicId={}, queueId={}, init={}, offset={}",
208+
groupId, topicId, queueId, ConsumeInitMode.MIN == initMode ? "min" : "max", offset);
209+
} catch (ConsumeQueueException e) {
210+
throw new RuntimeException(e);
211+
}
212+
}
201213
Long resetOffset =
202214
this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topicId, groupId, queueId);
203215
if (resetOffset != null) {
@@ -206,7 +218,7 @@ public Long getPopOffset(String groupId, String topicId, int queueId) {
206218
this.brokerController.getConsumerOffsetManager()
207219
.commitOffset("ResetPopOffset", groupId, topicId, queueId, resetOffset);
208220
}
209-
return resetOffset;
221+
return resetOffset != null ? resetOffset : offset;
210222
}
211223

212224
public CompletableFuture<GetMessageResult> getMessageAsync(String clientHost,
@@ -215,9 +227,6 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String clientHost,
215227
log.debug("PopConsumerService getMessageAsync, groupId={}, topicId={}, queueId={}, offset={}, batchSize={}, filter={}",
216228
groupId, topicId, offset, queueId, batchSize, filter != null);
217229

218-
Long resetOffset = this.getPopOffset(groupId, topicId, queueId);
219-
final long currentOffset = resetOffset != null ? resetOffset : offset;
220-
221230
CompletableFuture<GetMessageResult> getMessageFuture =
222231
brokerController.getMessageStore().getMessageAsync(groupId, topicId, queueId, offset, batchSize, filter);
223232

@@ -240,7 +249,7 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String clientHost,
240249

241250
log.warn("PopConsumerService getMessageAsync, initial offset because store is no correct, " +
242251
"groupId={}, topicId={}, queueId={}, batchSize={}, offset={}->{}",
243-
groupId, topicId, queueId, batchSize, currentOffset, result.getNextBeginOffset());
252+
groupId, topicId, queueId, batchSize, offset, result.getNextBeginOffset());
244253

245254
return brokerController.getMessageStore().getMessageAsync(
246255
groupId, topicId, queueId, result.getNextBeginOffset(), batchSize, filter);
@@ -299,7 +308,7 @@ protected CompletableFuture<PopConsumerContext> getMessageAsync(CompletableFutur
299308
result.addRestCount(this.getPendingFilterCount(groupId, topicId, queueId));
300309
return CompletableFuture.completedFuture(result);
301310
} else {
302-
long consumeOffset = brokerController.getConsumerOffsetManager().queryPullOffset(groupId, topicId, queueId);
311+
final long consumeOffset = this.getPopOffset(groupId, topicId, queueId, result.getInitMode());
303312
return getMessageAsync(clientHost, groupId, topicId, queueId, consumeOffset, remain, filter)
304313
.thenApply(getMessageResult -> addGetMessageResult(
305314
result, getMessageResult, topicId, queueId, retryType, consumeOffset));
@@ -308,11 +317,11 @@ protected CompletableFuture<PopConsumerContext> getMessageAsync(CompletableFutur
308317
}
309318

310319
public CompletableFuture<PopConsumerContext> popAsync(String clientHost, long popTime, long invisibleTime,
311-
String groupId, String topicId, int queueId, int batchSize, boolean fifo, String attemptId,
320+
String groupId, String topicId, int queueId, int batchSize, boolean fifo, String attemptId, int initMode,
312321
MessageFilter filter) {
313322

314323
PopConsumerContext popConsumerContext =
315-
new PopConsumerContext(clientHost, popTime, invisibleTime, groupId, fifo, attemptId);
324+
new PopConsumerContext(clientHost, popTime, invisibleTime, groupId, fifo, initMode, attemptId);
316325

317326
TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topicId);
318327
if (topicConfig == null || !consumerLockService.tryLock(groupId, topicId)) {

broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,8 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
383383
CompletableFuture<PopConsumerContext> popAsyncFuture = brokerController.getPopConsumerService().popAsync(
384384
RemotingHelper.parseChannelRemoteAddr(channel), beginTimeMills, requestHeader.getInvisibleTime(),
385385
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(),
386-
requestHeader.getMaxMsgNums(), requestHeader.isOrder(), requestHeader.getAttemptId(), messageFilter);
386+
requestHeader.getMaxMsgNums(), requestHeader.isOrder(),
387+
requestHeader.getAttemptId(), requestHeader.getInitMode(), messageFilter);
387388

388389
popAsyncFuture.thenApply(result -> {
389390
if (result.isFound()) {
@@ -888,7 +889,7 @@ private long getPopOffset(String topic, String group, int queueId, int initMode,
888889
}
889890
}
890891

891-
private long getInitOffset(String topic, String group, int queueId, int initMode, boolean init)
892+
public long getInitOffset(String topic, String group, int queueId, int initMode, boolean init)
892893
throws ConsumeQueueException {
893894
long offset;
894895
if (ConsumeInitMode.MIN == initMode || topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerContextTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.broker.pop;
1818

19+
import org.apache.rocketmq.common.constant.ConsumeInitMode;
1920
import org.apache.rocketmq.store.GetMessageResult;
2021
import org.apache.rocketmq.store.GetMessageStatus;
2122
import org.apache.rocketmq.store.SelectMappedBufferResult;
@@ -29,7 +30,7 @@ public class PopConsumerContextTest {
2930
public void consumerContextTest() {
3031
long popTime = System.currentTimeMillis();
3132
PopConsumerContext context = new PopConsumerContext("127.0.0.1:6789",
32-
popTime, 20_000, "GroupId", true, "attemptId");
33+
popTime, 20_000, "GroupId", true, ConsumeInitMode.MIN, "attemptId");
3334

3435
Assert.assertFalse(context.isFound());
3536
Assert.assertEquals("127.0.0.1:6789", context.getClientHost());

broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.rocketmq.common.BrokerConfig;
4242
import org.apache.rocketmq.common.KeyBuilder;
4343
import org.apache.rocketmq.common.TopicConfig;
44+
import org.apache.rocketmq.common.constant.ConsumeInitMode;
4445
import org.apache.rocketmq.common.constant.PermName;
4546
import org.apache.rocketmq.common.message.MessageDecoder;
4647
import org.apache.rocketmq.common.message.MessageExt;
@@ -190,7 +191,7 @@ public void recodeRetryMessageTest() throws Exception {
190191
@Test
191192
public void addGetMessageResultTest() {
192193
PopConsumerContext context = new PopConsumerContext(
193-
clientHost, System.currentTimeMillis(), 20000, groupId, false, attemptId);
194+
clientHost, System.currentTimeMillis(), 20000, groupId, false, ConsumeInitMode.MIN, attemptId);
194195
GetMessageResult result = new GetMessageResult();
195196
result.setStatus(GetMessageStatus.FOUND);
196197
result.getMessageQueueOffset().add(100L);
@@ -231,7 +232,7 @@ public void getMessageAsyncTest() throws Exception {
231232

232233
// fifo block
233234
PopConsumerContext context = new PopConsumerContext(
234-
clientHost, System.currentTimeMillis(), 20000, groupId, false, attemptId);
235+
clientHost, System.currentTimeMillis(), 20000, groupId, false, ConsumeInitMode.MIN, attemptId);
235236
consumerService.setFifoBlocked(context, groupId, topicId, queueId, Collections.singletonList(100L));
236237
Mockito.when(brokerController.getConsumerOrderInfoManager()
237238
.checkBlock(anyString(), anyString(), anyString(), anyInt(), anyLong())).thenReturn(true);
@@ -257,7 +258,7 @@ public void getMessageAsyncTest() throws Exception {
257258

258259
// fifo block test
259260
context = new PopConsumerContext(
260-
clientHost, System.currentTimeMillis(), 20000, groupId, true, attemptId);
261+
clientHost, System.currentTimeMillis(), 20000, groupId, true, ConsumeInitMode.MIN, attemptId);
261262
future = CompletableFuture.completedFuture(context);
262263
Assert.assertEquals(0L, consumerService.getMessageAsync(future, clientHost, groupId, topicId, queueId,
263264
10, null, PopConsumerRecord.RetryType.NORMAL_TOPIC).join().getRestCount());
@@ -306,7 +307,7 @@ public void popAsyncTest() {
306307

307308
// pop broker
308309
consumerServiceSpy.popAsync(clientHost, System.currentTimeMillis(),
309-
20000, groupId, topicId, -1, 10, false, attemptId, null).join();
310+
20000, groupId, topicId, -1, 10, false, attemptId, ConsumeInitMode.MIN, null).join();
310311
}
311312

312313
@Test

0 commit comments

Comments
 (0)