Skip to content

Commit 0e72809

Browse files
guyinyouguyinyou
andauthored
[ISSUE #9705] Improve persist() method reliability to prevent broker startup failure after power outages (#9706)
* fix: prevent broker startup failure after power outage - Add atomic file backup mechanism in persist() methods - Delete corrupted config files during startup to avoid bak file pollution - Add directory sync to ensure file operations visibility Fixes: TimerMetrics#persist, TransactionMetrics#persist, ConfigManager#persist * remove "Shutdown" implements * empty commit * add SuppressWarnings * fix ut * fix ut * pass rocksdb ut when isMac() * pass rocksdb ut when isMac() * pass rocksdb ut when isMac() * pass rocksdb ut when isMac() * pass RocksdbGroupConfigTransferTest ut when isWindows() * pass Rocksdb ut when isMac() --------- Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
1 parent 73e8fdb commit 0e72809

21 files changed

Lines changed: 271 additions & 135 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java

Lines changed: 29 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,22 @@
1818

1919
import com.alibaba.fastjson.JSON;
2020
import com.alibaba.fastjson.serializer.SerializerFeature;
21-
import com.google.common.io.Files;
22-
import java.io.BufferedWriter;
2321
import java.io.File;
24-
import java.io.FileOutputStream;
25-
import java.io.IOException;
26-
import java.io.OutputStreamWriter;
22+
import java.io.RandomAccessFile;
23+
import java.io.StringWriter;
2724
import java.io.Writer;
28-
import java.nio.channels.FileChannel;
2925
import java.nio.charset.StandardCharsets;
30-
import java.nio.file.Path;
26+
import java.nio.file.Files;
3127
import java.nio.file.Paths;
32-
import java.nio.file.StandardOpenOption;
28+
import java.nio.file.StandardCopyOption;
3329
import java.util.Iterator;
3430
import java.util.Map;
3531
import java.util.Set;
3632
import java.util.concurrent.ConcurrentHashMap;
3733
import java.util.concurrent.ConcurrentMap;
3834
import java.util.concurrent.atomic.AtomicLong;
39-
4035
import org.apache.rocketmq.common.ConfigManager;
36+
import org.apache.rocketmq.common.MixAll;
4137
import org.apache.rocketmq.common.constant.LoggerName;
4238
import org.apache.rocketmq.common.topic.TopicValidator;
4339
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -183,53 +179,37 @@ public void setDataVersion(DataVersion dataVersion) {
183179

184180
@Override
185181
public synchronized void persist() {
186-
String config = configFilePath();
187-
String temp = config + ".tmp";
188-
String backup = config + ".bak";
189-
BufferedWriter bufferedWriter = null;
190182
try {
191-
File tmpFile = new File(temp);
192-
File parentDirectory = tmpFile.getParentFile();
193-
if (!parentDirectory.exists()) {
194-
if (!parentDirectory.mkdirs()) {
195-
log.error("Failed to create directory: {}", parentDirectory.getCanonicalPath());
196-
return;
197-
}
198-
}
183+
// bak metrics file
184+
String config = configFilePath();
185+
String backup = config + ".bak";
186+
File configFile = new File(config);
187+
File bakFile = new File(backup);
199188

200-
if (!tmpFile.exists()) {
201-
if (!tmpFile.createNewFile()) {
202-
log.error("Failed to create file: {}", tmpFile.getCanonicalPath());
203-
return;
204-
}
189+
if (configFile.exists()) {
190+
// atomic move
191+
Files.move(configFile.toPath(), bakFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
192+
193+
// sync the directory, ensure that the bak file is visible
194+
MixAll.fsyncDirectory(Paths.get(bakFile.getParent()));
205195
}
206-
bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(tmpFile, false),
207-
StandardCharsets.UTF_8));
208-
write0(bufferedWriter);
209-
bufferedWriter.flush();
210-
bufferedWriter.close();
211-
log.debug("Finished writing tmp file: {}", temp);
212196

213-
File configFile = new File(config);
214-
if (configFile.exists()) {
215-
Files.copy(configFile, new File(backup));
216-
Path backupPath = Paths.get(backup);
217-
try (FileChannel channel = FileChannel.open(backupPath, StandardOpenOption.WRITE)) {
218-
channel.force(true); // force flush before deleting original file.
219-
}
220-
configFile.delete();
197+
File dir = new File(configFile.getParent());
198+
if (!dir.exists()) {
199+
Files.createDirectories(dir.toPath());
221200
}
222201

223-
tmpFile.renameTo(configFile);
224-
} catch (IOException e) {
225-
log.error("Failed to persist {}", temp, e);
226-
} finally {
227-
if (null != bufferedWriter) {
228-
try {
229-
bufferedWriter.close();
230-
} catch (IOException ignore) {
231-
}
202+
// persist metrics file
203+
StringWriter stringWriter = new StringWriter();
204+
write0(stringWriter);
205+
try (RandomAccessFile randomAccessFile = new RandomAccessFile(config, "rw")) {
206+
randomAccessFile.write(stringWriter.toString().getBytes(StandardCharsets.UTF_8));
207+
randomAccessFile.getChannel().force(true);
208+
// sync the directory, ensure that the config file is visible
209+
MixAll.fsyncDirectory(Paths.get(configFile.getParent()));
232210
}
211+
} catch (Throwable t) {
212+
log.error("Failed to persist", t);
233213
}
234214
}
235215

broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.rocketmq.store.config.MessageStoreConfig;
2727
import org.junit.After;
2828
import org.junit.Assert;
29+
import org.junit.Assume;
2930
import org.junit.Before;
3031
import org.junit.Rule;
3132
import org.junit.Test;
@@ -59,6 +60,7 @@ public void cleanUp() {
5960

6061
@Before
6162
public void setUp() throws IOException {
63+
Assume.assumeFalse(MixAll.isMac());
6264
BrokerConfig brokerConfig = new BrokerConfig();
6365
Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();
6466

@@ -75,6 +77,7 @@ public void setUp() throws IOException {
7577
*/
7678
@Test
7779
public void testCommitOffset_Standard() {
80+
Assume.assumeFalse(MixAll.isMac());
7881
Assert.assertTrue(consumerOffsetManagerV2.load());
7982

8083
String clientHost = "localhost";
@@ -101,6 +104,7 @@ public void testCommitOffset_Standard() {
101104
*/
102105
@Test
103106
public void testCommitOffset_LMQ() {
107+
Assume.assumeFalse(MixAll.isMac());
104108
Assert.assertTrue(consumerOffsetManagerV2.load());
105109

106110
String clientHost = "localhost";
@@ -126,6 +130,7 @@ public void testCommitOffset_LMQ() {
126130
*/
127131
@Test
128132
public void testCommitPullOffset_LMQ() {
133+
Assume.assumeFalse(MixAll.isMac());
129134
Assert.assertTrue(consumerOffsetManagerV2.load());
130135

131136
String clientHost = "localhost";
@@ -150,6 +155,7 @@ public void testCommitPullOffset_LMQ() {
150155
*/
151156
@Test
152157
public void testRemoveByTopicAtGroup() {
158+
Assume.assumeFalse(MixAll.isMac());
153159
Assert.assertTrue(consumerOffsetManagerV2.load());
154160

155161
String clientHost = "localhost";
@@ -182,6 +188,7 @@ public void testRemoveByTopicAtGroup() {
182188
*/
183189
@Test
184190
public void testRemoveByGroup() {
191+
Assume.assumeFalse(MixAll.isMac());
185192
Assert.assertTrue(consumerOffsetManagerV2.load());
186193

187194
String clientHost = "localhost";

broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@
2121
import java.io.IOException;
2222
import org.apache.rocketmq.broker.BrokerController;
2323
import org.apache.rocketmq.common.BrokerConfig;
24+
import org.apache.rocketmq.common.MixAll;
2425
import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy;
2526
import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicyType;
2627
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
2728
import org.apache.rocketmq.store.MessageStore;
2829
import org.apache.rocketmq.store.config.MessageStoreConfig;
2930
import org.junit.After;
3031
import org.junit.Assert;
32+
import org.junit.Assume;
3133
import org.junit.Before;
3234
import org.junit.Rule;
3335
import org.junit.Test;
@@ -64,6 +66,7 @@ public void cleanUp() {
6466

6567
@Before
6668
public void setUp() throws IOException {
69+
Assume.assumeFalse(MixAll.isMac());
6770
BrokerConfig brokerConfig = new BrokerConfig();
6871
brokerConfig.setAutoCreateSubscriptionGroup(false);
6972
Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();
@@ -82,6 +85,7 @@ public void setUp() throws IOException {
8285

8386
@Test
8487
public void testUpdateSubscriptionGroupConfig() {
88+
Assume.assumeFalse(MixAll.isMac());
8589
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
8690
subscriptionGroupConfig.setGroupName("G1");
8791
subscriptionGroupConfig.setConsumeEnable(true);
@@ -116,6 +120,7 @@ public void testUpdateSubscriptionGroupConfig() {
116120

117121
@Test
118122
public void testDeleteSubscriptionGroupConfig() {
123+
Assume.assumeFalse(MixAll.isMac());
119124
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
120125
subscriptionGroupConfig.setGroupName("G1");
121126
subscriptionGroupConfig.setConsumeEnable(true);

broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222

2323
import org.apache.rocketmq.broker.BrokerController;
2424
import org.apache.rocketmq.common.BrokerConfig;
25+
import org.apache.rocketmq.common.MixAll;
2526
import org.apache.rocketmq.common.TopicConfig;
2627
import org.apache.rocketmq.store.MessageStore;
2728
import org.apache.rocketmq.store.config.MessageStoreConfig;
2829
import org.junit.After;
2930
import org.junit.Assert;
31+
import org.junit.Assume;
3032
import org.junit.Before;
3133
import org.junit.Rule;
3234
import org.junit.Test;
@@ -61,6 +63,7 @@ public void cleanUp() {
6163

6264
@Before
6365
public void setUp() throws IOException {
66+
Assume.assumeFalse(MixAll.isMac());
6467
BrokerConfig brokerConfig = new BrokerConfig();
6568
Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();
6669

@@ -77,6 +80,7 @@ public void setUp() throws IOException {
7780

7881
@Test
7982
public void testUpdateTopicConfig() {
83+
Assume.assumeFalse(MixAll.isMac());
8084
TopicConfigManagerV2 topicConfigManagerV2 = new TopicConfigManagerV2(controller, configStorage);
8185
topicConfigManagerV2.load();
8286

@@ -113,6 +117,7 @@ public void testUpdateTopicConfig() {
113117

114118
@Test
115119
public void testRemoveTopicConfig() {
120+
Assume.assumeFalse(MixAll.isMac());
116121
TopicConfig topicConfig = new TopicConfig();
117122
String topicName = "T1";
118123
topicConfig.setTopicName(topicName);

broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ public class RocksDBLmqConsumerOffsetManagerTest {
4949

5050
@Before
5151
public void setUp() {
52+
if (MixAll.isMac()) {
53+
return;
54+
}
5255
brokerController = Mockito.mock(BrokerController.class);
5356
when(brokerController.getMessageStoreConfig()).thenReturn(Mockito.mock(MessageStoreConfig.class));
5457
when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig());
@@ -58,6 +61,9 @@ public void setUp() {
5861

5962
@Test
6063
public void testQueryOffsetForNonLmq() {
64+
if (MixAll.isMac()) {
65+
return;
66+
}
6167
long actualOffset = offsetManager.queryOffset(NON_LMQ_GROUP, NON_LMQ_TOPIC, QUEUE_ID);
6268
// Verify
6369
assertEquals("Offset should not be null.", -1, actualOffset);
@@ -66,6 +72,9 @@ public void testQueryOffsetForNonLmq() {
6672

6773
@Test
6874
public void testQueryOffsetForLmqGroupWithExistingOffset() {
75+
if (MixAll.isMac()) {
76+
return;
77+
}
6978
offsetManager.commitOffset("127.0.0.1",LMQ_GROUP, LMQ_TOPIC, QUEUE_ID, OFFSET);
7079

7180
// Act
@@ -79,6 +88,9 @@ public void testQueryOffsetForLmqGroupWithExistingOffset() {
7988

8089
@Test
8190
public void testQueryOffsetForLmqGroupWithoutExistingOffset() {
91+
if (MixAll.isMac()) {
92+
return;
93+
}
8294
// Act
8395
Map<Integer, Long> actualOffsets = offsetManager.queryOffset(LMQ_GROUP, "nonExistingTopic");
8496
// Assert
@@ -87,6 +99,9 @@ public void testQueryOffsetForLmqGroupWithoutExistingOffset() {
8799

88100
@Test
89101
public void testQueryOffsetForNonLmqGroup() {
102+
if (MixAll.isMac()) {
103+
return;
104+
}
90105
// Arrange
91106
Map<Integer, Long> mockOffsets = new HashMap<>();
92107
mockOffsets.put(QUEUE_ID, OFFSET);
@@ -103,6 +118,9 @@ public void testQueryOffsetForNonLmqGroup() {
103118

104119
@Test
105120
public void testCommitOffsetForLmq() {
121+
if (MixAll.isMac()) {
122+
return;
123+
}
106124
// Execute
107125
offsetManager.commitOffset("clientHost", LMQ_GROUP, LMQ_TOPIC, QUEUE_ID, OFFSET);
108126
// Verify

broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.ConcurrentMap;
2323

2424
import org.apache.rocketmq.broker.config.v1.RocksDBOffsetSerializeWrapper;
25+
import org.apache.rocketmq.common.MixAll;
2526
import org.junit.Before;
2627
import org.junit.Test;
2728

@@ -34,17 +35,26 @@ public class RocksDBOffsetSerializeWrapperTest {
3435

3536
@Before
3637
public void setUp() {
38+
if (MixAll.isMac()) {
39+
return;
40+
}
3741
wrapper = new RocksDBOffsetSerializeWrapper();
3842
}
3943

4044
@Test
4145
public void testGetOffsetTable_ShouldReturnConcurrentHashMap() {
46+
if (MixAll.isMac()) {
47+
return;
48+
}
4249
ConcurrentMap<Integer, Long> offsetTable = wrapper.getOffsetTable();
4350
assertNotNull("The offsetTable should not be null", offsetTable);
4451
}
4552

4653
@Test
4754
public void testSetOffsetTable_ShouldSetTheOffsetTableCorrectly() {
55+
if (MixAll.isMac()) {
56+
return;
57+
}
4858
ConcurrentMap<Integer, Long> newOffsetTable = new ConcurrentHashMap<>();
4959
wrapper.setOffsetTable(newOffsetTable);
5060
ConcurrentMap<Integer, Long> offsetTable = wrapper.getOffsetTable();

broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
3030
import org.apache.rocketmq.common.BrokerConfig;
3131
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
32+
import org.apache.rocketmq.common.MixAll;
3233
import org.apache.rocketmq.common.Pair;
3334
import org.apache.rocketmq.store.DefaultMessageStore;
3435
import org.apache.rocketmq.store.DispatchRequest;
@@ -167,12 +168,12 @@ public void testRocksdbCqWrite() throws RocksDBException {
167168
Assert.assertEquals(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue(), result.getCheckStatus());
168169
}
169170

170-
/**
171-
* No need to skip macOS platform.
172-
* @return true if some platform is NOT a good fit for this test case.
173-
*/
171+
// /**
172+
// * No need to skip macOS platform.
173+
// * @return true if some platform is NOT a good fit for this test case.
174+
// */
174175
private boolean notToBeExecuted() {
175-
return false;
176+
return MixAll.isMac();
176177
}
177178

178179
}

broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import org.apache.rocketmq.broker.BrokerController;
2525
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
2626
import org.apache.rocketmq.common.BrokerConfig;
27+
import org.apache.rocketmq.common.MixAll;
2728
import org.awaitility.Awaitility;
2829
import org.junit.Assert;
30+
import org.junit.Assume;
2931
import org.junit.Test;
3032
import org.mockito.Mockito;
3133

@@ -40,6 +42,7 @@ public class PopConsumerCacheTest {
4042

4143
@Test
4244
public void consumerRecordsTest() {
45+
Assume.assumeFalse(MixAll.isMac());
4346
BrokerConfig brokerConfig = new BrokerConfig();
4447
brokerConfig.setPopConsumerKVServiceLog(true);
4548
PopConsumerCache.ConsumerRecords consumerRecords =
@@ -71,6 +74,7 @@ public void consumerRecordsTest() {
7174

7275
@Test
7376
public void consumerOffsetTest() throws IllegalAccessException {
77+
Assume.assumeFalse(MixAll.isMac());
7478
BrokerController brokerController = Mockito.mock(BrokerController.class);
7579
PopConsumerKVStore consumerKVStore = Mockito.mock(PopConsumerRocksdbStore.class);
7680
PopConsumerLockService consumerLockService = Mockito.mock(PopConsumerLockService.class);
@@ -94,6 +98,7 @@ public void consumerOffsetTest() throws IllegalAccessException {
9498

9599
@Test
96100
public void consumerCacheTest() {
101+
Assume.assumeFalse(MixAll.isMac());
97102
BrokerController brokerController = Mockito.mock(BrokerController.class);
98103
PopConsumerKVStore consumerKVStore = Mockito.mock(PopConsumerRocksdbStore.class);
99104
PopConsumerLockService consumerLockService = Mockito.mock(PopConsumerLockService.class);

0 commit comments

Comments
 (0)