Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class PopConsumerContext {

private final boolean fifo;

private final int initMode;

private final String attemptId;

private final AtomicLong restCount;
Expand All @@ -50,13 +52,14 @@ public class PopConsumerContext {
private List<PopConsumerRecord> popConsumerRecordList;

public PopConsumerContext(String clientHost,
long popTime, long invisibleTime, String groupId, boolean fifo, String attemptId) {
long popTime, long invisibleTime, String groupId, boolean fifo, int initMode, String attemptId) {

this.clientHost = clientHost;
this.popTime = popTime;
this.invisibleTime = invisibleTime;
this.groupId = groupId;
this.fifo = fifo;
this.initMode = initMode;
this.attemptId = attemptId;
this.restCount = new AtomicLong(0);
this.startOffsetInfo = new StringBuilder();
Expand Down Expand Up @@ -120,6 +123,10 @@ public boolean isFifo() {
return fifo;
}

public int getInitMode() {
return initMode;
}

public long getInvisibleTime() {
return invisibleTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageAccessor;
Expand Down Expand Up @@ -197,7 +198,18 @@ public PopConsumerContext addGetMessageResult(PopConsumerContext context, GetMes
return context;
}

public Long getPopOffset(String groupId, String topicId, int queueId) {
public long getPopOffset(String groupId, String topicId, int queueId, int initMode) {
long offset = this.brokerController.getConsumerOffsetManager().queryPullOffset(groupId, topicId, queueId);
if (offset < 0L) {
try {
offset = this.brokerController.getPopMessageProcessor()
.getInitOffset(topicId, groupId, queueId, initMode, true);
log.info("PopConsumerService init offset, groupId={}, topicId={}, queueId={}, init={}, offset={}",
groupId, topicId, queueId, ConsumeInitMode.MIN == initMode ? "min" : "max", offset);
} catch (ConsumeQueueException e) {
throw new RuntimeException(e);
}
}
Long resetOffset =
this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topicId, groupId, queueId);
if (resetOffset != null) {
Expand All @@ -206,7 +218,7 @@ public Long getPopOffset(String groupId, String topicId, int queueId) {
this.brokerController.getConsumerOffsetManager()
.commitOffset("ResetPopOffset", groupId, topicId, queueId, resetOffset);
}
return resetOffset;
return resetOffset != null ? resetOffset : offset;
}

public CompletableFuture<GetMessageResult> getMessageAsync(String clientHost,
Expand All @@ -215,9 +227,6 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String clientHost,
log.debug("PopConsumerService getMessageAsync, groupId={}, topicId={}, queueId={}, offset={}, batchSize={}, filter={}",
groupId, topicId, offset, queueId, batchSize, filter != null);

Long resetOffset = this.getPopOffset(groupId, topicId, queueId);
final long currentOffset = resetOffset != null ? resetOffset : offset;

CompletableFuture<GetMessageResult> getMessageFuture =
brokerController.getMessageStore().getMessageAsync(groupId, topicId, queueId, offset, batchSize, filter);

Expand All @@ -240,7 +249,7 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String clientHost,

log.warn("PopConsumerService getMessageAsync, initial offset because store is no correct, " +
"groupId={}, topicId={}, queueId={}, batchSize={}, offset={}->{}",
groupId, topicId, queueId, batchSize, currentOffset, result.getNextBeginOffset());
groupId, topicId, queueId, batchSize, offset, result.getNextBeginOffset());

return brokerController.getMessageStore().getMessageAsync(
groupId, topicId, queueId, result.getNextBeginOffset(), batchSize, filter);
Expand Down Expand Up @@ -299,7 +308,7 @@ protected CompletableFuture<PopConsumerContext> getMessageAsync(CompletableFutur
result.addRestCount(this.getPendingFilterCount(groupId, topicId, queueId));
return CompletableFuture.completedFuture(result);
} else {
long consumeOffset = brokerController.getConsumerOffsetManager().queryPullOffset(groupId, topicId, queueId);
final long consumeOffset = this.getPopOffset(groupId, topicId, queueId, result.getInitMode());
return getMessageAsync(clientHost, groupId, topicId, queueId, consumeOffset, remain, filter)
.thenApply(getMessageResult -> addGetMessageResult(
result, getMessageResult, topicId, queueId, retryType, consumeOffset));
Expand All @@ -308,11 +317,11 @@ protected CompletableFuture<PopConsumerContext> getMessageAsync(CompletableFutur
}

public CompletableFuture<PopConsumerContext> popAsync(String clientHost, long popTime, long invisibleTime,
String groupId, String topicId, int queueId, int batchSize, boolean fifo, String attemptId,
String groupId, String topicId, int queueId, int batchSize, boolean fifo, String attemptId, int initMode,
MessageFilter filter) {

PopConsumerContext popConsumerContext =
new PopConsumerContext(clientHost, popTime, invisibleTime, groupId, fifo, attemptId);
new PopConsumerContext(clientHost, popTime, invisibleTime, groupId, fifo, initMode, attemptId);

TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topicId);
if (topicConfig == null || !consumerLockService.tryLock(groupId, topicId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,8 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
CompletableFuture<PopConsumerContext> popAsyncFuture = brokerController.getPopConsumerService().popAsync(
RemotingHelper.parseChannelRemoteAddr(channel), beginTimeMills, requestHeader.getInvisibleTime(),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getMaxMsgNums(), requestHeader.isOrder(), requestHeader.getAttemptId(), messageFilter);
requestHeader.getMaxMsgNums(), requestHeader.isOrder(),
requestHeader.getAttemptId(), requestHeader.getInitMode(), messageFilter);

popAsyncFuture.thenApply(result -> {
if (result.isFound()) {
Expand Down Expand Up @@ -888,7 +889,7 @@ private long getPopOffset(String topic, String group, int queueId, int initMode,
}
}

private long getInitOffset(String topic, String group, int queueId, int initMode, boolean init)
public long getInitOffset(String topic, String group, int queueId, int initMode, boolean init)
throws ConsumeQueueException {
long offset;
if (ConsumeInitMode.MIN == initMode || topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker.pop;

import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
Expand All @@ -29,7 +30,7 @@ public class PopConsumerContextTest {
public void consumerContextTest() {
long popTime = System.currentTimeMillis();
PopConsumerContext context = new PopConsumerContext("127.0.0.1:6789",
popTime, 20_000, "GroupId", true, "attemptId");
popTime, 20_000, "GroupId", true, ConsumeInitMode.MIN, "attemptId");

Assert.assertFalse(context.isFound());
Assert.assertEquals("127.0.0.1:6789", context.getClientHost());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
Expand Down Expand Up @@ -190,7 +191,7 @@ public void recodeRetryMessageTest() throws Exception {
@Test
public void addGetMessageResultTest() {
PopConsumerContext context = new PopConsumerContext(
clientHost, System.currentTimeMillis(), 20000, groupId, false, attemptId);
clientHost, System.currentTimeMillis(), 20000, groupId, false, ConsumeInitMode.MIN, attemptId);
GetMessageResult result = new GetMessageResult();
result.setStatus(GetMessageStatus.FOUND);
result.getMessageQueueOffset().add(100L);
Expand Down Expand Up @@ -231,7 +232,7 @@ public void getMessageAsyncTest() throws Exception {

// fifo block
PopConsumerContext context = new PopConsumerContext(
clientHost, System.currentTimeMillis(), 20000, groupId, false, attemptId);
clientHost, System.currentTimeMillis(), 20000, groupId, false, ConsumeInitMode.MIN, attemptId);
consumerService.setFifoBlocked(context, groupId, topicId, queueId, Collections.singletonList(100L));
Mockito.when(brokerController.getConsumerOrderInfoManager()
.checkBlock(anyString(), anyString(), anyString(), anyInt(), anyLong())).thenReturn(true);
Expand All @@ -257,7 +258,7 @@ public void getMessageAsyncTest() throws Exception {

// fifo block test
context = new PopConsumerContext(
clientHost, System.currentTimeMillis(), 20000, groupId, true, attemptId);
clientHost, System.currentTimeMillis(), 20000, groupId, true, ConsumeInitMode.MIN, attemptId);
future = CompletableFuture.completedFuture(context);
Assert.assertEquals(0L, consumerService.getMessageAsync(future, clientHost, groupId, topicId, queueId,
10, null, PopConsumerRecord.RetryType.NORMAL_TOPIC).join().getRestCount());
Expand Down Expand Up @@ -306,7 +307,7 @@ public void popAsyncTest() {

// pop broker
consumerServiceSpy.popAsync(clientHost, System.currentTimeMillis(),
20000, groupId, topicId, -1, 10, false, attemptId, null).join();
20000, groupId, topicId, -1, 10, false, attemptId, ConsumeInitMode.MIN, null).join();
}

@Test
Expand Down
Loading