Skip to content

Commit 26bfbe6

Browse files
lizhiminsRongtongJin
authored andcommitted
[ISSUE #9371] Delete ConsumeQueue index before CommitLog in tiered storage (#9372)
1 parent a867938 commit 26bfbe6

File tree

3 files changed

+7
-10
lines changed

3 files changed

+7
-10
lines changed

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,17 +124,17 @@ public List<FileSegment> getFileSegmentList() {
124124

125125
public long getMinOffset() {
126126
List<FileSegment> list = this.fileSegmentTable;
127-
return list.isEmpty() ? GET_FILE_SIZE_ERROR : list.get(0).getBaseOffset();
127+
return list.isEmpty() ? 0L : list.get(0).getBaseOffset();
128128
}
129129

130130
public long getCommitOffset() {
131131
List<FileSegment> list = this.fileSegmentTable;
132-
return list.isEmpty() ? GET_FILE_SIZE_ERROR : list.get(list.size() - 1).getCommitOffset();
132+
return list.isEmpty() ? 0L : list.get(list.size() - 1).getCommitOffset();
133133
}
134134

135135
public long getAppendOffset() {
136136
List<FileSegment> list = this.fileSegmentTable;
137-
return list.isEmpty() ? GET_FILE_SIZE_ERROR : list.get(list.size() - 1).getAppendOffset();
137+
return list.isEmpty() ? 0L : list.get(list.size() - 1).getAppendOffset();
138138
}
139139

140140
public long getMinTimestamp() {

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void destroyExpiredFile(long expireTimestamp) {
7777
super.destroyExpiredFile(expireTimestamp);
7878
long afterOffset = this.getMinOffset();
7979

80-
if (beforeOffset != afterOffset) {
80+
if (beforeOffset != afterOffset && afterOffset > 0) {
8181
log.info("CommitLog min cq offset reset, filePath={}, offset={}, expireTimestamp={}, change={}-{}",
8282
filePath, firstOffset.get(), expireTimestamp, beforeOffset, afterOffset);
8383
firstOffset.set(GET_OFFSET_ERROR);

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.concurrent.ConcurrentHashMap;
2626
import java.util.concurrent.ConcurrentMap;
2727
import java.util.concurrent.Semaphore;
28-
import java.util.concurrent.atomic.AtomicLong;
2928
import java.util.concurrent.locks.Lock;
3029
import java.util.concurrent.locks.ReentrantLock;
3130
import org.apache.commons.lang3.StringUtils;
@@ -59,7 +58,6 @@ public class FlatMessageFile implements FlatFileInterface {
5958
protected final MetadataStore metadataStore;
6059
protected final FlatCommitLogFile commitLog;
6160
protected final FlatConsumeQueueFile consumeQueue;
62-
protected final AtomicLong lastDestroyTime;
6361

6462
protected final ConcurrentMap<String, CompletableFuture<?>> inFlightRequestMap;
6563

@@ -77,7 +75,6 @@ public FlatMessageFile(FlatFileFactory fileFactory, String filePath) {
7775
this.metadataStore = fileFactory.getMetadataStore();
7876
this.commitLog = fileFactory.createFlatFileForCommitLog(filePath);
7977
this.consumeQueue = fileFactory.createFlatFileForConsumeQueue(filePath);
80-
this.lastDestroyTime = new AtomicLong();
8178
this.inFlightRequestMap = new ConcurrentHashMap<>();
8279
}
8380

@@ -388,8 +385,8 @@ public void shutdown() {
388385
closed = true;
389386
fileLock.lock();
390387
try {
391-
commitLog.shutdown();
392388
consumeQueue.shutdown();
389+
commitLog.shutdown();
393390
} finally {
394391
fileLock.unlock();
395392
}
@@ -399,8 +396,8 @@ public void shutdown() {
399396
public void destroyExpiredFile(long timestamp) {
400397
fileLock.lock();
401398
try {
402-
commitLog.destroyExpiredFile(timestamp);
403399
consumeQueue.destroyExpiredFile(timestamp);
400+
commitLog.destroyExpiredFile(timestamp);
404401
} finally {
405402
fileLock.unlock();
406403
}
@@ -410,8 +407,8 @@ public void destroy() {
410407
this.shutdown();
411408
fileLock.lock();
412409
try {
413-
commitLog.destroyExpiredFile(Long.MAX_VALUE);
414410
consumeQueue.destroyExpiredFile(Long.MAX_VALUE);
411+
commitLog.destroyExpiredFile(Long.MAX_VALUE);
415412
if (queueMetadata != null) {
416413
metadataStore.deleteQueue(queueMetadata.getQueue());
417414
}

0 commit comments

Comments
 (0)