Skip to content

Commit e60b67a

Browse files
RongtongJinRongtongJin
andauthored
[ISSUE #9838] IndexStoreService use forceShutdown when disk is not writable (#9839)
* Update store and tieredstore implementation Change-Id: I29c931148110c4238ce9a256ab2bf69d7b6139a8 Co-developed-by: Cursor <noreply@cursor.com> * Fix UT NPE Change-Id: I699ec538118166743308af9e78da50f2aa1b56b3 --------- Co-authored-by: RongtongJin <user@example.com>
1 parent 3b78ab0 commit e60b67a

5 files changed

Lines changed: 30 additions & 4 deletions

File tree

store/src/main/java/org/apache/rocketmq/store/RunningFlags.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,15 @@ public boolean isWriteable() {
8585
return false;
8686
}
8787

88+
public boolean isStoreWriteable() {
89+
if ((this.flagBits & NOT_WRITEABLE_BIT) == 0) {
90+
return true;
91+
}
92+
93+
return false;
94+
}
95+
96+
8897
//for consume queue, just ignore the DISK_FULL_BIT
8998
public boolean isCQWriteable() {
9099
if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | WRITE_INDEX_FILE_ERROR_BIT | LOGIC_DISK_FULL_BIT)) == 0) {
@@ -94,7 +103,7 @@ public boolean isCQWriteable() {
94103
return false;
95104
}
96105

97-
public boolean getAndMakeNotWriteable() {
106+
public boolean getAndMakeStoreNotWriteable() {
98107
boolean result = this.isWriteable();
99108
if (result) {
100109
this.flagBits |= NOT_WRITEABLE_BIT;

store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,7 @@ public boolean getAndMakeNotWriteable() {
592592
if (runningFlags == null) {
593593
return false;
594594
}
595-
return runningFlags.getAndMakeNotWriteable();
595+
return runningFlags.getAndMakeStoreNotWriteable();
596596
}
597597

598598
public boolean isWriteable() {

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, int q
338338
.thenApply(time -> {
339339
Attributes latencyAttributes = TieredStoreMetricsManager.newAttributesBuilder()
340340
.put(TieredStoreMetricsConstant.LABEL_OPERATION,
341-
TieredStoreMetricsConstant.OPERATION_API_GET_TIME_BY_OFFSET)
341+
TieredStoreMetricsConstant.OPERATION_API_GET_TIME_BY_OFFSET)
342342
.put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
343343
.build();
344344
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);
@@ -465,8 +465,13 @@ public synchronized void shutdown() {
465465
dispatcher.shutdown();
466466
}
467467
if (indexService != null) {
468-
indexService.shutdown();
468+
if (defaultStore.getRunningFlags() != null && defaultStore.getRunningFlags().isStoreWriteable()) {
469+
indexService.shutdown();
470+
} else {
471+
indexService.forceShutdown();
472+
}
469473
}
474+
470475
if (flatFileStore != null) {
471476
flatFileStore.shutdown();
472477
}

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,13 @@ default void forceUpload() {
6060
*/
6161
void shutdown();
6262

63+
/**
64+
* Force shutdown the index service.
65+
*/
66+
default void forceShutdown() {
67+
shutdown();
68+
};
69+
6370
/**
6471
* Destroys the index service and releases all resources.
6572
*/

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,11 @@ public void shutdown() {
414414
}
415415
}
416416

417+
@Override
418+
public void forceShutdown() {
419+
super.shutdown();
420+
}
421+
417422
@Override
418423
public void run() {
419424
while (!this.isStopped()) {

0 commit comments

Comments
 (0)