diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java index dde13a5ed73..1138ff4afe9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -162,17 +162,15 @@ public GetMessageResult recodeRetryMessage(GetMessageResult getMessageResult, return result; } - public PopConsumerContext addGetMessageResult(PopConsumerContext context, GetMessageResult result, + public PopConsumerContext handleGetMessageResult(PopConsumerContext context, GetMessageResult result, String topicId, int queueId, PopConsumerRecord.RetryType retryType, long offset) { - if (result.getStatus() == GetMessageStatus.FOUND && !result.getMessageQueueOffset().isEmpty()) { + if (GetMessageStatus.FOUND.equals(result.getStatus()) && !result.getMessageQueueOffset().isEmpty()) { if (context.isFifo()) { this.setFifoBlocked(context, context.getGroupId(), topicId, queueId, result.getMessageQueueOffset()); } - - // build request header here + // build response header here context.addGetMessageResult(result, topicId, queueId, retryType, offset); - if (brokerConfig.isPopConsumerKVServiceLog()) { log.info("PopConsumerService pop, time={}, invisible={}, " + "groupId={}, topic={}, queueId={}, offset={}, attemptId={}", @@ -181,20 +179,23 @@ public PopConsumerContext addGetMessageResult(PopConsumerContext context, GetMes } } - if (!context.isFifo() && result.getNextBeginOffset() > OFFSET_NOT_EXIST) { + long commitOffset = offset; + if (context.isFifo()) { + if (!GetMessageStatus.FOUND.equals(result.getStatus())) { + commitOffset = result.getNextBeginOffset(); + } + } else { this.brokerController.getConsumerOffsetManager().commitPullOffset( context.getClientHost(), context.getGroupId(), topicId, queueId, result.getNextBeginOffset()); - long commitOffset = result.getStatus() == GetMessageStatus.FOUND ? offset : result.getNextBeginOffset(); if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache != null) { long minOffset = popConsumerCache.getMinOffsetInCache(context.getGroupId(), topicId, queueId); if (minOffset != OFFSET_NOT_EXIST) { commitOffset = minOffset; } } - this.brokerController.getConsumerOffsetManager().commitOffset( - context.getClientHost(), context.getGroupId(), topicId, queueId, commitOffset); } - + this.brokerController.getConsumerOffsetManager().commitOffset( + context.getClientHost(), context.getGroupId(), topicId, queueId, commitOffset); return context; } @@ -310,7 +311,7 @@ protected CompletableFuture getMessageAsync(CompletableFutur } else { final long consumeOffset = this.getPopOffset(groupId, topicId, queueId, result.getInitMode()); return getMessageAsync(clientHost, groupId, topicId, queueId, consumeOffset, remain, filter) - .thenApply(getMessageResult -> addGetMessageResult( + .thenApply(getMessageResult -> handleGetMessageResult( result, getMessageResult, topicId, queueId, retryType, consumeOffset)); } }); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index dd8314b7e0d..d73acc84df6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -680,7 +680,9 @@ private CompletableFuture popMsgFromQueue(String topic, String attemptId, CompletableFuture future = new CompletableFuture<>(); if (!queueLockManager.tryLock(lockKey)) { try { - restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum; + if (!requestHeader.isOrder()) { + restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum; + } future.complete(restNum); } catch (ConsumeQueueException e) { future.completeExceptionally(e); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java index 7fb619f7400..9c23a8625eb 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java @@ -195,7 +195,7 @@ public void addGetMessageResultTest() { GetMessageResult result = new GetMessageResult(); result.setStatus(GetMessageStatus.FOUND); result.getMessageQueueOffset().add(100L); - consumerService.addGetMessageResult( + consumerService.handleGetMessageResult( context, result, topicId, queueId, PopConsumerRecord.RetryType.NORMAL_TOPIC, 100); Assert.assertEquals(1, context.getGetMessageResultList().size()); } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index fc6bc4213a3..360523be852 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -875,7 +875,7 @@ public GetMessageResult getMessage(final String group, final String topic, final boolean isInMem = estimateInMemByCommitOffset(offsetPy, maxOffsetPy); - if ((cqUnit.getQueueOffset() - offset) * consumeQueue.getUnitSize() > maxFilterMessageSize) { + if ((cqUnit.getQueueOffset() - offset) * consumeQueue.getUnitSize() >= maxFilterMessageSize) { break; }