Skip to content

Commit 49be182

Browse files
lizhiminsRongtongJin
authored andcommitted
[ISSUE #9379] Fix timeStoreTable delete logic in IndexService (#9384)
* [ISSUE #9379] Fix timeStoreTable delete logic in IndexService * [ISSUE #9379] Fix delete logic from TimeStoreTable in IndexService
1 parent 15eb188 commit 49be182

3 files changed

Lines changed: 50 additions & 9 deletions

File tree

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ public void destroyExpiredFile(long expireTimestamp) {
253253
FileSegment fileSegment = fileSegmentTable.get(0);
254254

255255
if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE &&
256-
fileSegment.getMaxTimestamp() > expireTimestamp) {
256+
fileSegment.getMaxTimestamp() >= expireTimestamp) {
257257
log.debug("FileSegment has not expired, filePath={}, fileType={}, " +
258258
"offset={}, expireTimestamp={}, maxTimestamp={}", filePath, fileType,
259259
fileSegment.getBaseOffset(), expireTimestamp, fileSegment.getMaxTimestamp());

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -254,12 +254,12 @@ public CompletableFuture<List<IndexItem>> queryAsync(
254254
.whenComplete((v, t) -> {
255255
// Try to return the query results as much as possible here
256256
// rather than directly throwing exceptions
257-
if (result.isEmpty() && t != null) {
258-
future.completeExceptionally(t);
259-
} else {
260-
List<IndexItem> resultList = new ArrayList<>(result.values());
261-
future.complete(resultList.subList(0, Math.min(resultList.size(), maxCount)));
257+
if (t != null) {
258+
log.error("IndexStoreService#queryAsync, topicId={}, key={}, maxCount={}, timestamp={}-{}",
259+
topic, key, maxCount, beginTime, endTime, t);
262260
}
261+
List<IndexItem> resultList = new ArrayList<>(result.values());
262+
future.complete(resultList.subList(0, Math.min(resultList.size(), maxCount)));
263263
});
264264
} catch (Exception e) {
265265
log.error("IndexStoreService#queryAsync, topicId={}, key={}, maxCount={}, timestamp={}-{}",
@@ -344,10 +344,15 @@ public void destroyExpiredFile(long expireTimestamp) {
344344
// delete file in time store table
345345
readWriteLock.writeLock().lock();
346346
try {
347-
timeStoreTable.entrySet().removeIf(entry ->
348-
entry.getKey() < expireTimestamp &&
349-
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()));
350347
flatAppendFile.destroyExpiredFile(expireTimestamp);
348+
timeStoreTable.entrySet().removeIf(entry ->
349+
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()) &&
350+
entry.getKey() < flatAppendFile.getMinTimestamp());
351+
int tableSize = (int) timeStoreTable.entrySet().stream()
352+
.filter(entry -> IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()))
353+
.count();
354+
log.info("IndexStoreService delete file, timestamp={}, remote={}, table={}, all={}",
355+
expireTimestamp, flatAppendFile.getFileSegmentList().size(), tableSize, timeStoreTable.size());
351356
} finally {
352357
readWriteLock.writeLock().unlock();
353358
}

tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,18 @@
3333
import java.util.concurrent.TimeUnit;
3434
import java.util.concurrent.atomic.AtomicBoolean;
3535
import java.util.concurrent.atomic.AtomicInteger;
36+
import org.apache.commons.lang3.reflect.FieldUtils;
3637
import org.apache.rocketmq.common.ThreadFactoryImpl;
3738
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
3839
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
3940
import org.apache.rocketmq.tieredstore.common.AppendResult;
41+
import org.apache.rocketmq.tieredstore.file.FlatAppendFile;
4042
import org.apache.rocketmq.tieredstore.file.FlatFileFactory;
4143
import org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore;
4244
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
4345
import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
4446
import org.apache.rocketmq.tieredstore.util.MessageStoreUtilTest;
47+
import org.awaitility.Awaitility;
4548
import org.junit.After;
4649
import org.junit.Assert;
4750
import org.junit.Before;
@@ -206,6 +209,39 @@ public void runServiceTest() throws InterruptedException {
206209
});
207210
}
208211

212+
@Test
213+
public void deleteFileTest() throws InterruptedException, IllegalAccessException {
214+
indexService = new IndexStoreService(fileAllocator, filePath);
215+
indexService.start();
216+
217+
for (int i = 0; i < 2 * 20; i++) {
218+
AppendResult result = indexService.putKey(
219+
TOPIC_NAME, TOPIC_ID, QUEUE_ID, Collections.singleton(String.valueOf(i)),
220+
i * 100L, MESSAGE_SIZE, System.currentTimeMillis());
221+
Assert.assertEquals(AppendResult.SUCCESS, result);
222+
TimeUnit.MILLISECONDS.sleep(1);
223+
}
224+
225+
indexService.wakeup();
226+
Awaitility.await().until(() -> {
227+
int tableSize = (int) indexService.getTimeStoreTable().entrySet().stream()
228+
.filter(entry -> IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()))
229+
.count();
230+
return tableSize == 2;
231+
});
232+
233+
long timestamp = indexService.getTimeStoreTable().firstEntry().getValue().getEndTimestamp();
234+
FlatAppendFile flatAppendFile = (FlatAppendFile)
235+
FieldUtils.readField(indexService, "flatAppendFile", true);
236+
237+
indexService.destroyExpiredFile(timestamp);
238+
Assert.assertEquals(2, flatAppendFile.getFileSegmentList().size());
239+
Assert.assertEquals(3, indexService.getTimeStoreTable().size());
240+
indexService.destroyExpiredFile(timestamp + 1);
241+
Assert.assertEquals(1, flatAppendFile.getFileSegmentList().size());
242+
Assert.assertEquals(2, indexService.getTimeStoreTable().size());
243+
}
244+
209245
@Test
210246
public void restartServiceTest() throws InterruptedException {
211247
indexService = new IndexStoreService(fileAllocator, filePath);

0 commit comments

Comments
 (0)