Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,17 +162,15 @@ public GetMessageResult recodeRetryMessage(GetMessageResult getMessageResult,
return result;
}

public PopConsumerContext addGetMessageResult(PopConsumerContext context, GetMessageResult result,
public PopConsumerContext handleGetMessageResult(PopConsumerContext context, GetMessageResult result,
String topicId, int queueId, PopConsumerRecord.RetryType retryType, long offset) {

if (result.getStatus() == GetMessageStatus.FOUND && !result.getMessageQueueOffset().isEmpty()) {
if (GetMessageStatus.FOUND.equals(result.getStatus()) && !result.getMessageQueueOffset().isEmpty()) {
if (context.isFifo()) {
this.setFifoBlocked(context, context.getGroupId(), topicId, queueId, result.getMessageQueueOffset());
}

// build request header here
// build response header here
context.addGetMessageResult(result, topicId, queueId, retryType, offset);

if (brokerConfig.isPopConsumerKVServiceLog()) {
log.info("PopConsumerService pop, time={}, invisible={}, " +
"groupId={}, topic={}, queueId={}, offset={}, attemptId={}",
Expand All @@ -181,20 +179,23 @@ public PopConsumerContext addGetMessageResult(PopConsumerContext context, GetMes
}
}

if (!context.isFifo() && result.getNextBeginOffset() > OFFSET_NOT_EXIST) {
long commitOffset = offset;
if (context.isFifo()) {
if (!GetMessageStatus.FOUND.equals(result.getStatus())) {
commitOffset = result.getNextBeginOffset();
}
} else {
this.brokerController.getConsumerOffsetManager().commitPullOffset(
context.getClientHost(), context.getGroupId(), topicId, queueId, result.getNextBeginOffset());
long commitOffset = result.getStatus() == GetMessageStatus.FOUND ? offset : result.getNextBeginOffset();
if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache != null) {
long minOffset = popConsumerCache.getMinOffsetInCache(context.getGroupId(), topicId, queueId);
if (minOffset != OFFSET_NOT_EXIST) {
commitOffset = minOffset;
}
}
this.brokerController.getConsumerOffsetManager().commitOffset(
context.getClientHost(), context.getGroupId(), topicId, queueId, commitOffset);
}

this.brokerController.getConsumerOffsetManager().commitOffset(
context.getClientHost(), context.getGroupId(), topicId, queueId, commitOffset);
return context;
}

Expand Down Expand Up @@ -310,7 +311,7 @@ protected CompletableFuture<PopConsumerContext> getMessageAsync(CompletableFutur
} else {
final long consumeOffset = this.getPopOffset(groupId, topicId, queueId, result.getInitMode());
return getMessageAsync(clientHost, groupId, topicId, queueId, consumeOffset, remain, filter)
.thenApply(getMessageResult -> addGetMessageResult(
.thenApply(getMessageResult -> handleGetMessageResult(
result, getMessageResult, topicId, queueId, retryType, consumeOffset));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,9 @@ private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId,
CompletableFuture<Long> future = new CompletableFuture<>();
if (!queueLockManager.tryLock(lockKey)) {
try {
restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
if (!requestHeader.isOrder()) {
restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
}
future.complete(restNum);
} catch (ConsumeQueueException e) {
future.completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public void addGetMessageResultTest() {
GetMessageResult result = new GetMessageResult();
result.setStatus(GetMessageStatus.FOUND);
result.getMessageQueueOffset().add(100L);
consumerService.addGetMessageResult(
consumerService.handleGetMessageResult(
context, result, topicId, queueId, PopConsumerRecord.RetryType.NORMAL_TOPIC, 100);
Assert.assertEquals(1, context.getGetMessageResultList().size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ public GetMessageResult getMessage(final String group, final String topic, final

boolean isInMem = estimateInMemByCommitOffset(offsetPy, maxOffsetPy);

if ((cqUnit.getQueueOffset() - offset) * consumeQueue.getUnitSize() > maxFilterMessageSize) {
if ((cqUnit.getQueueOffset() - offset) * consumeQueue.getUnitSize() >= maxFilterMessageSize) {
break;
}

Expand Down
Loading