Skip to content

Commit edaffe6

Browse files
authored
[ISSUE #9695] Not use pull offset when use pop orderly consume (#9696)
1 parent 7967edf commit edaffe6

1 file changed

Lines changed: 9 additions & 3 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,14 @@ public PopConsumerContext handleGetMessageResult(PopConsumerContext context, Get
199199
return context;
200200
}
201201

202-
public long getPopOffset(String groupId, String topicId, int queueId, int initMode) {
203-
long offset = this.brokerController.getConsumerOffsetManager().queryPullOffset(groupId, topicId, queueId);
202+
public long getPopOffset(String groupId, String topicId, int queueId, int initMode, boolean fifo) {
203+
204+
// For FIFO messages, the pull offset is not used.
205+
// This preserves compatibility when switching from pull consumer to pop consumer.
206+
long offset = fifo ?
207+
this.brokerController.getConsumerOffsetManager().queryOffset(groupId, topicId, queueId) :
208+
this.brokerController.getConsumerOffsetManager().queryPullOffset(groupId, topicId, queueId);
209+
204210
if (offset < 0L) {
205211
try {
206212
offset = this.brokerController.getPopMessageProcessor()
@@ -309,7 +315,7 @@ protected CompletableFuture<PopConsumerContext> getMessageAsync(CompletableFutur
309315
result.addRestCount(this.getPendingFilterCount(groupId, topicId, queueId));
310316
return CompletableFuture.completedFuture(result);
311317
} else {
312-
final long consumeOffset = this.getPopOffset(groupId, topicId, queueId, result.getInitMode());
318+
final long consumeOffset = this.getPopOffset(groupId, topicId, queueId, result.getInitMode(), result.isFifo());
313319
return getMessageAsync(clientHost, groupId, topicId, queueId, consumeOffset, remain, filter)
314320
.thenApply(getMessageResult -> handleGetMessageResult(
315321
result, getMessageResult, topicId, queueId, retryType, consumeOffset));

0 commit comments

Comments
 (0)