Skip to content

Commit e6a587a

Browse files
authored
[ISSUE #9626] Prevent premature offset commit before consumer record flush (#9627)
* [ISSUE #9626] Prevent premature offset commit before consumer record flush
1 parent f40a69f commit e6a587a

2 files changed

Lines changed: 54 additions & 63 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerCache.java

Lines changed: 48 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,11 @@
2020
import java.util.Iterator;
2121
import java.util.List;
2222
import java.util.Map;
23-
import java.util.TreeMap;
2423
import java.util.concurrent.ConcurrentHashMap;
2524
import java.util.concurrent.ConcurrentMap;
25+
import java.util.concurrent.ConcurrentSkipListMap;
2626
import java.util.concurrent.TimeUnit;
2727
import java.util.concurrent.atomic.AtomicInteger;
28-
import java.util.concurrent.locks.Lock;
29-
import java.util.concurrent.locks.ReentrantLock;
3028
import java.util.function.Consumer;
3129
import org.apache.rocketmq.broker.BrokerController;
3230
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
@@ -128,33 +126,33 @@ public int cleanupRecords(Consumer<PopConsumerRecord> consumer) {
128126
records.getGroupId(), records.getTopicId());
129127

130128
if (timeout) {
131-
List<PopConsumerRecord> removeExpiredRecords =
132-
records.removeExpiredRecords(Long.MAX_VALUE);
133-
if (removeExpiredRecords != null) {
134-
consumerRecordStore.writeRecords(removeExpiredRecords);
129+
records.stageExpiredRecords(Long.MAX_VALUE);
130+
List<PopConsumerRecord> writeConsumerRecords =
131+
new ArrayList<>(records.getRemoveTreeMap().values());
132+
if (!writeConsumerRecords.isEmpty()) {
133+
consumerRecordStore.writeRecords(writeConsumerRecords);
135134
}
135+
records.clearStagedRecords();
136136
log.info("PopConsumerOffline, so clean expire records, groupId={}, topic={}, queueId={}, records={}",
137-
records.getGroupId(), records.getTopicId(), records.getQueueId(),
138-
removeExpiredRecords != null ? removeExpiredRecords.size() : 0);
137+
records.getGroupId(), records.getTopicId(), records.getQueueId(), records.getInFlightRecordCount());
139138
iterator.remove();
140139
continue;
141140
}
142141

143142
long currentTime = System.currentTimeMillis();
143+
records.stageExpiredRecords(currentTime);
144144
List<PopConsumerRecord> writeConsumerRecords = new ArrayList<>();
145-
List<PopConsumerRecord> consumerRecords = records.removeExpiredRecords(currentTime);
146-
if (consumerRecords != null) {
147-
consumerRecords.forEach(consumerRecord -> {
148-
if (consumerRecord.getVisibilityTimeout() <= currentTime) {
149-
consumer.accept(consumerRecord);
150-
} else {
151-
writeConsumerRecords.add(consumerRecord);
152-
}
153-
});
154-
}
145+
records.getRemoveTreeMap().values().forEach(record -> {
146+
if (record.getVisibilityTimeout() <= currentTime) {
147+
consumer.accept(record);
148+
} else {
149+
writeConsumerRecords.add(record);
150+
}
151+
});
155152

156153
// write to store and handle it later
157154
consumerRecordStore.writeRecords(writeConsumerRecords);
155+
records.clearStagedRecords();
158156

159157
// commit min offset in buffer to offset store
160158
long offset = records.getMinOffsetInBuffer();
@@ -209,72 +207,64 @@ public void run() {
209207

210208
protected static class ConsumerRecords {
211209

212-
private final Lock lock;
213210
private final String groupId;
214211
private final String topicId;
215212
private final int queueId;
216213
private final BrokerConfig brokerConfig;
217-
private final TreeMap<Long /* offset */, PopConsumerRecord> recordTreeMap;
214+
private final ConcurrentSkipListMap<Long /* offset */, PopConsumerRecord> removeTreeMap;
215+
private final ConcurrentSkipListMap<Long /* offset */, PopConsumerRecord> recordTreeMap;
218216

219217
public ConsumerRecords(BrokerConfig brokerConfig, String groupId, String topicId, int queueId) {
220218
this.groupId = groupId;
221219
this.topicId = topicId;
222220
this.queueId = queueId;
223-
this.lock = new ReentrantLock();
224221
this.brokerConfig = brokerConfig;
225-
this.recordTreeMap = new TreeMap<>();
222+
this.removeTreeMap = new ConcurrentSkipListMap<>();
223+
this.recordTreeMap = new ConcurrentSkipListMap<>();
226224
}
227225

228226
public void write(PopConsumerRecord record) {
229-
lock.lock();
230-
try {
231-
recordTreeMap.put(record.getOffset(), record);
232-
} finally {
233-
lock.unlock();
234-
}
227+
recordTreeMap.put(record.getOffset(), record);
235228
}
236229

237230
public boolean delete(PopConsumerRecord record) {
238-
PopConsumerRecord popConsumerRecord;
239-
lock.lock();
240-
try {
241-
popConsumerRecord = recordTreeMap.remove(record.getOffset());
242-
} finally {
243-
lock.unlock();
244-
}
245-
return popConsumerRecord != null;
231+
return recordTreeMap.remove(record.getOffset()) != null;
246232
}
247233

248234
public long getMinOffsetInBuffer() {
249-
Map.Entry<Long, PopConsumerRecord> entry = recordTreeMap.firstEntry();
235+
Map.Entry<Long, PopConsumerRecord> entry = removeTreeMap.firstEntry();
236+
if (entry != null) {
237+
return entry.getKey();
238+
}
239+
entry = recordTreeMap.firstEntry();
250240
return entry != null ? entry.getKey() : OFFSET_NOT_EXIST;
251241
}
252242

253243
public int getInFlightRecordCount() {
254-
return recordTreeMap.size();
244+
return removeTreeMap.size() + recordTreeMap.size();
255245
}
256246

257-
public List<PopConsumerRecord> removeExpiredRecords(long currentTime) {
258-
List<PopConsumerRecord> result = null;
259-
lock.lock();
260-
try {
261-
Iterator<Map.Entry<Long, PopConsumerRecord>> iterator = recordTreeMap.entrySet().iterator();
262-
while (iterator.hasNext()) {
263-
Map.Entry<Long, PopConsumerRecord> entry = iterator.next();
264-
// org.apache.rocketmq.broker.processor.PopBufferMergeService.scan
265-
if (entry.getValue().getVisibilityTimeout() <= currentTime ||
266-
entry.getValue().getPopTime() + brokerConfig.getPopCkStayBufferTime() <= currentTime) {
267-
if (result == null) {
268-
result = new ArrayList<>();
269-
}
270-
result.add(entry.getValue());
271-
iterator.remove();
272-
}
247+
public void stageExpiredRecords(long currentTime) {
248+
Iterator<Map.Entry<Long, PopConsumerRecord>>
249+
iterator = recordTreeMap.entrySet().iterator();
250+
251+
// refer: org.apache.rocketmq.broker.processor.PopBufferMergeService.scan
252+
while (iterator.hasNext()) {
253+
Map.Entry<Long, PopConsumerRecord> entry = iterator.next();
254+
if (entry.getValue().getVisibilityTimeout() <= currentTime ||
255+
entry.getValue().getPopTime() + brokerConfig.getPopCkStayBufferTime() <= currentTime) {
256+
removeTreeMap.put(entry.getKey(), entry.getValue());
257+
iterator.remove();
273258
}
274-
} finally {
275-
lock.unlock();
276259
}
277-
return result;
260+
}
261+
262+
public void clearStagedRecords() {
263+
removeTreeMap.clear();
264+
}
265+
266+
public ConcurrentSkipListMap<Long, PopConsumerRecord> getRemoveTreeMap() {
267+
return removeTreeMap;
278268
}
279269

280270
public String getGroupId() {
@@ -292,7 +282,6 @@ public int getQueueId() {
292282
@Override
293283
public String toString() {
294284
return "ConsumerRecords{" +
295-
"lock=" + lock +
296285
", topicId=" + topicId +
297286
", groupId=" + groupId +
298287
", queueId=" + queueId +

broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,12 @@ public void consumerRecordsTest() {
6161
Assert.assertEquals(3, consumerRecords.getInFlightRecordCount());
6262

6363
long bufferTimeout = brokerConfig.getPopCkStayBufferTime();
64-
Assert.assertEquals(1, consumerRecords.removeExpiredRecords(bufferTimeout + 2).size());
65-
Assert.assertNull(consumerRecords.removeExpiredRecords(bufferTimeout + 2));
66-
Assert.assertEquals(2, consumerRecords.removeExpiredRecords(bufferTimeout + 4).size());
67-
Assert.assertNull(consumerRecords.removeExpiredRecords(bufferTimeout + 4));
64+
consumerRecords.stageExpiredRecords(bufferTimeout + 2);
65+
Assert.assertEquals(1, consumerRecords.getRemoveTreeMap().size());
66+
consumerRecords.clearStagedRecords();
67+
consumerRecords.stageExpiredRecords(bufferTimeout + 4);
68+
Assert.assertEquals(2, consumerRecords.getRemoveTreeMap().size());
69+
consumerRecords.clearStagedRecords();
6870
}
6971

7072
@Test

0 commit comments

Comments
 (0)