Skip to content

Commit 103b7bc

Browse files
authored
[ISSUE #10229] Fix server-side reset offset does not take effect for FIFO (orderly) pop consumers (#10230)
1 parent 35c88e3 commit 103b7bc

1 file changed

Lines changed: 23 additions & 18 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,23 @@
1919
import com.alibaba.fastjson2.JSON;
2020
import com.google.common.annotations.VisibleForTesting;
2121
import com.google.common.base.Stopwatch;
22+
import java.nio.ByteBuffer;
23+
import java.nio.file.Paths;
24+
import java.util.ArrayList;
25+
import java.util.Collections;
26+
import java.util.HashMap;
27+
import java.util.List;
28+
import java.util.Objects;
29+
import java.util.Queue;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.CompletionException;
32+
import java.util.concurrent.ConcurrentHashMap;
33+
import java.util.concurrent.ConcurrentMap;
34+
import java.util.concurrent.LinkedBlockingQueue;
35+
import java.util.concurrent.Semaphore;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicBoolean;
38+
import java.util.concurrent.atomic.AtomicLong;
2239
import org.apache.commons.lang3.StringUtils;
2340
import org.apache.commons.lang3.tuple.Triple;
2441
import org.apache.rocketmq.broker.BrokerController;
@@ -52,24 +69,6 @@
5269
import org.slf4j.Logger;
5370
import org.slf4j.LoggerFactory;
5471

55-
import java.nio.ByteBuffer;
56-
import java.nio.file.Paths;
57-
import java.util.ArrayList;
58-
import java.util.Collections;
59-
import java.util.HashMap;
60-
import java.util.List;
61-
import java.util.Objects;
62-
import java.util.Queue;
63-
import java.util.concurrent.CompletableFuture;
64-
import java.util.concurrent.CompletionException;
65-
import java.util.concurrent.ConcurrentHashMap;
66-
import java.util.concurrent.ConcurrentMap;
67-
import java.util.concurrent.LinkedBlockingQueue;
68-
import java.util.concurrent.Semaphore;
69-
import java.util.concurrent.TimeUnit;
70-
import java.util.concurrent.atomic.AtomicBoolean;
71-
import java.util.concurrent.atomic.AtomicLong;
72-
7372
public class PopConsumerService extends ServiceThread {
7473

7574
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -287,6 +286,12 @@ public void setFifoBlocked(PopConsumerContext context,
287286
}
288287

289288
public boolean isFifoBlocked(PopConsumerContext context, String groupId, String topicId, int queueId) {
289+
// If server-side reset offset is enabled, and there is a reset offset,
290+
// then return false to make sure that the reset offset takes effect.
291+
if (brokerController.getBrokerConfig().isUseServerSideResetOffset() &&
292+
this.brokerController.getConsumerOffsetManager().hasOffsetReset(topicId, groupId, queueId)) {
293+
return false;
294+
}
290295
return brokerController.getConsumerOrderInfoManager().checkBlock(
291296
context.getAttemptId(), topicId, groupId, queueId, context.getInvisibleTime());
292297
}

0 commit comments

Comments
 (0)