From 333154a00756b11656c8ad4968d99b358bd1b5e6 Mon Sep 17 00:00:00 2001 From: guyinyou Date: Mon, 15 Sep 2025 17:29:46 +0800 Subject: [PATCH 01/12] fix: prevent broker startup failure after power outage - Add atomic file backup mechanism in persist() methods - Delete corrupted config files during startup to avoid bak file pollution - Add directory sync to ensure file operations visibility Fixes: TimerMetrics#persist, TransactionMetrics#persist, ConfigManager#persist --- .../transaction/TransactionMetrics.java | 75 ++++-------- .../apache/rocketmq/common/ConfigManager.java | 53 +++++++-- .../org/apache/rocketmq/common/MixAll.java | 14 +++ .../rocketmq/store/timer/TimerMetrics.java | 110 ++++++------------ 4 files changed, 120 insertions(+), 132 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java index 28fff6a90c9..cfecc86904c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java @@ -18,26 +18,22 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; -import com.google.common.io.Files; -import java.io.BufferedWriter; import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; +import java.io.RandomAccessFile; +import java.io.StringWriter; import java.io.Writer; -import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; -import java.nio.file.Path; +import java.nio.file.Files; import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; +import java.nio.file.StandardCopyOption; import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; - import org.apache.rocketmq.common.ConfigManager; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -183,53 +179,32 @@ public void setDataVersion(DataVersion dataVersion) { @Override public synchronized void persist() { - String config = configFilePath(); - String temp = config + ".tmp"; - String backup = config + ".bak"; - BufferedWriter bufferedWriter = null; try { - File tmpFile = new File(temp); - File parentDirectory = tmpFile.getParentFile(); - if (!parentDirectory.exists()) { - if (!parentDirectory.mkdirs()) { - log.error("Failed to create directory: {}", parentDirectory.getCanonicalPath()); - return; - } - } - - if (!tmpFile.exists()) { - if (!tmpFile.createNewFile()) { - log.error("Failed to create file: {}", tmpFile.getCanonicalPath()); - return; - } - } - bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(tmpFile, false), - StandardCharsets.UTF_8)); - write0(bufferedWriter); - bufferedWriter.flush(); - bufferedWriter.close(); - log.debug("Finished writing tmp file: {}", temp); - + // bak metrics file + String config = configFilePath(); + String backup = config + ".bak"; File configFile = new File(config); + File bakFile = new File(backup); + if (configFile.exists()) { - Files.copy(configFile, new File(backup)); - Path backupPath = Paths.get(backup); - try (FileChannel channel = FileChannel.open(backupPath, StandardOpenOption.WRITE)) { - channel.force(true); // force flush before deleting original file. - } - configFile.delete(); + // atomic move + Files.move(configFile.toPath(), bakFile.toPath(), StandardCopyOption.ATOMIC_MOVE); + + // sync the directory, ensure that the bak file is visible + MixAll.fsyncDirectory(Paths.get(bakFile.getParent())); } - tmpFile.renameTo(configFile); - } catch (IOException e) { - log.error("Failed to persist {}", temp, e); - } finally { - if (null != bufferedWriter) { - try { - bufferedWriter.close(); - } catch (IOException ignore) { - } + // persist metrics file + StringWriter stringWriter = new StringWriter(); + write0(stringWriter); + try (RandomAccessFile randomAccessFile = new RandomAccessFile(config, "rw")) { + randomAccessFile.write(stringWriter.toString().getBytes(StandardCharsets.UTF_8)); + randomAccessFile.getChannel().force(true); + // sync the directory, ensure that the config file is visible + MixAll.fsyncDirectory(Paths.get(configFile.getParent())); } + } catch (Throwable t) { + log.error("Failed to persist", t); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java index 3fcf466fd77..9b225390703 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java @@ -16,14 +16,19 @@ */ package org.apache.rocketmq.common; +import java.io.File; +import java.io.RandomAccessFile; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.Map; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.Shutdown; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Map; - -public abstract class ConfigManager { +public abstract class ConfigManager implements Shutdown { private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); public boolean load() { @@ -33,6 +38,8 @@ public boolean load() { String jsonString = MixAll.file2String(fileName); if (null == jsonString || jsonString.length() == 0) { + // delete invalid file + Files.deleteIfExists(Paths.get(fileName)); return this.loadBak(); } else { this.decode(jsonString); @@ -41,6 +48,14 @@ public boolean load() { } } catch (Exception e) { log.error("load " + fileName + " failed, and try to load backup file", e); + try { + if (fileName != null) { + // delete invalid file + Files.deleteIfExists(Paths.get(fileName)); + } + } catch (Throwable t) { + log.error("load " + fileName + " failed, and delete invalid file errr", e); + } return this.loadBak(); } } @@ -76,11 +91,29 @@ public synchronized void persist(Map m) { public synchronized void persist() { String jsonString = this.encode(true); if (jsonString != null) { - String fileName = this.configFilePath(); try { - MixAll.string2File(jsonString, fileName); - } catch (IOException e) { - log.error("persist file " + fileName + " exception", e); + // bak metrics file + String config = configFilePath(); + String backup = config + ".bak"; + File configFile = new File(config); + File bakFile = new File(backup); + + if (configFile.exists()) { + // atomic move + Files.move(configFile.toPath(), bakFile.toPath(), StandardCopyOption.ATOMIC_MOVE); + + // sync the directory, ensure that the bak file is visible + MixAll.fsyncDirectory(Paths.get(bakFile.getParent())); + } + + try (RandomAccessFile randomAccessFile = new RandomAccessFile(config, "rw")) { + randomAccessFile.write(jsonString.getBytes(StandardCharsets.UTF_8)); + randomAccessFile.getChannel().force(true); + // sync the directory, ensure that the config file is visible + MixAll.fsyncDirectory(Paths.get(configFile.getParent())); + } + } catch (Throwable t) { + log.error("Failed to persist", t); } } } @@ -89,6 +122,10 @@ public boolean stop() { return true; } + public void shutdown() { + stop(); + } + public abstract String configFilePath(); public abstract String encode(); diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index 2c298b240c4..00006ac7a8e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -31,9 +31,13 @@ import java.net.SocketException; import java.net.URL; import java.net.URLConnection; +import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.NotDirectoryException; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Enumeration; import java.util.List; @@ -239,6 +243,16 @@ public static void string2FileNotSafe(final String str, final String fileName) t IOTinyUtils.writeStringToFile(file, str, DEFAULT_CHARSET); } + public static synchronized void fsyncDirectory(Path dir) throws IOException { + if (!Files.isDirectory(dir)) { + throw new NotDirectoryException(dir.toString()); + } + + try (FileChannel fc = FileChannel.open(dir, StandardOpenOption.READ)) { + fc.force(true); + } + } + public static String file2String(final String fileName) throws IOException { File file = new File(fileName); return file2String(file); diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java index d676b3f4d73..b7205d84805 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java @@ -18,18 +18,14 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; -import com.google.common.io.Files; -import java.io.BufferedWriter; import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; +import java.io.RandomAccessFile; +import java.io.StringWriter; import java.io.Writer; -import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; -import java.nio.file.Path; +import java.nio.file.Files; import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -56,13 +52,10 @@ public class TimerMetrics extends ConfigManager { private static final long LOCK_TIMEOUT_MILLIS = 3000; private transient final Lock lock = new ReentrantLock(); - private final ConcurrentMap timingCount = - new ConcurrentHashMap<>(1024); + private final ConcurrentMap timingCount = new ConcurrentHashMap<>(1024); - private final ConcurrentMap timingDistribution = - new ConcurrentHashMap<>(1024); + private final ConcurrentMap timingDistribution = new ConcurrentHashMap<>(1024); - @SuppressWarnings("DoubleBraceInitialization") public List timerDist = new ArrayList() {{ add(5); add(60); @@ -148,21 +141,17 @@ protected void write0(Writer writer) { JSON.writeJSONString(writer, wrapper, SerializerFeature.BrowserCompatible); } - @Override - public String encode() { + @Override public String encode() { return encode(false); } - @Override - public String configFilePath() { + @Override public String configFilePath() { return configPath; } - @Override - public void decode(String jsonString) { + @Override public void decode(String jsonString) { if (jsonString != null) { - TimerMetricsSerializeWrapper timerMetricsSerializeWrapper = - TimerMetricsSerializeWrapper.fromJson(jsonString, TimerMetricsSerializeWrapper.class); + TimerMetricsSerializeWrapper timerMetricsSerializeWrapper = TimerMetricsSerializeWrapper.fromJson(jsonString, TimerMetricsSerializeWrapper.class); if (timerMetricsSerializeWrapper != null) { this.timingCount.putAll(timerMetricsSerializeWrapper.getTimingCount()); this.dataVersion.assignNewOne(timerMetricsSerializeWrapper.getDataVersion()); @@ -170,8 +159,7 @@ public void decode(String jsonString) { } } - @Override - public String encode(boolean prettyFormat) { + @Override public String encode(boolean prettyFormat) { TimerMetricsSerializeWrapper metricsSerializeWrapper = new TimerMetricsSerializeWrapper(); metricsSerializeWrapper.setDataVersion(this.dataVersion); metricsSerializeWrapper.setTimingCount(this.timingCount); @@ -190,8 +178,7 @@ public void cleanMetrics(Set topics) { while (iterator.hasNext()) { Map.Entry entry = iterator.next(); final String topic = entry.getKey(); - if (topic.startsWith(TopicValidator.SYSTEM_TOPIC_PREFIX) - || topic.startsWith(MixAll.LMQ_PREFIX)) { + if (topic.startsWith(TopicValidator.SYSTEM_TOPIC_PREFIX) || topic.startsWith(MixAll.LMQ_PREFIX)) { continue; } if (topics.contains(topic)) { @@ -214,16 +201,14 @@ public boolean removeTimingCount(String topic) { } public static class TimerMetricsSerializeWrapper extends RemotingSerializable { - private ConcurrentMap timingCount = - new ConcurrentHashMap<>(1024); + private ConcurrentMap timingCount = new ConcurrentHashMap<>(1024); private DataVersion dataVersion = new DataVersion(); public ConcurrentMap getTimingCount() { return timingCount; } - public void setTimingCount( - ConcurrentMap timingCount) { + public void setTimingCount(ConcurrentMap timingCount) { this.timingCount = timingCount; } @@ -236,55 +221,33 @@ public void setDataVersion(DataVersion dataVersion) { } } - @Override - public synchronized void persist() { - String config = configFilePath(); - String temp = config + ".tmp"; - String backup = config + ".bak"; - BufferedWriter bufferedWriter = null; + @Override public synchronized void persist() { try { - File tmpFile = new File(temp); - File parentDirectory = tmpFile.getParentFile(); - if (!parentDirectory.exists()) { - if (!parentDirectory.mkdirs()) { - log.error("Failed to create directory: {}", parentDirectory.getCanonicalPath()); - return; - } - } - - if (!tmpFile.exists()) { - if (!tmpFile.createNewFile()) { - log.error("Failed to create file: {}", tmpFile.getCanonicalPath()); - return; - } - } - bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(tmpFile, false), - StandardCharsets.UTF_8)); - write0(bufferedWriter); - bufferedWriter.flush(); - bufferedWriter.close(); - log.debug("Finished writing tmp file: {}", temp); - + // bak metrics file + String config = configFilePath(); + String backup = config + ".bak"; File configFile = new File(config); + File bakFile = new File(backup); + if (configFile.exists()) { - Files.copy(configFile, new File(backup)); - Path backupPath = Paths.get(backup); - try (FileChannel channel = FileChannel.open(backupPath, StandardOpenOption.WRITE)) { - channel.force(true); // force flush before deleting original file. - } - configFile.delete(); + // atomic move + Files.move(configFile.toPath(), bakFile.toPath(), StandardCopyOption.ATOMIC_MOVE); + + // sync the directory, ensure that the bak file is visible + MixAll.fsyncDirectory(Paths.get(bakFile.getParent())); } - tmpFile.renameTo(configFile); - } catch (IOException e) { - log.error("Failed to persist {}", temp, e); - } finally { - if (null != bufferedWriter) { - try { - bufferedWriter.close(); - } catch (IOException ignore) { - } + // persist metrics file + StringWriter stringWriter = new StringWriter(); + write0(stringWriter); + try (RandomAccessFile randomAccessFile = new RandomAccessFile(config, "rw")) { + randomAccessFile.write(stringWriter.toString().getBytes(StandardCharsets.UTF_8)); + randomAccessFile.getChannel().force(true); + // sync the directory, ensure that the config file is visible + MixAll.fsyncDirectory(Paths.get(configFile.getParent())); } + } catch (Throwable t) { + log.error("Failed to persist", t); } } @@ -313,8 +276,7 @@ public void setTimeStamp(long timeStamp) { this.timeStamp = timeStamp; } - @Override - public String toString() { + @Override public String toString() { return String.format("[%d,%d]", count.get(), timeStamp); } } From 24c7d2db47c8994b538a6529e12e41cb5960813a Mon Sep 17 00:00:00 2001 From: guyinyou Date: Mon, 15 Sep 2025 17:44:46 +0800 Subject: [PATCH 02/12] remove "Shutdown" implements --- .../main/java/org/apache/rocketmq/common/ConfigManager.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java index 9b225390703..7723f9c1d31 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java @@ -24,11 +24,10 @@ import java.nio.file.StandardCopyOption; import java.util.Map; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.utils.Shutdown; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -public abstract class ConfigManager implements Shutdown { +public abstract class ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); public boolean load() { From 278007733c679639466428198c86dd2a23a0bd04 Mon Sep 17 00:00:00 2001 From: guyinyou Date: Tue, 16 Sep 2025 11:00:38 +0800 Subject: [PATCH 03/12] empty commit --- .../org/apache/rocketmq/store/logfile/DefaultMappedFile.java | 1 - 1 file changed, 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java index b2d89108b4b..c2a1cc7e48e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java @@ -415,7 +415,6 @@ public AppendMessageResult appendMessagesInner(final MessageExt messageExt, fina log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } - protected ByteBuffer appendMessageBuffer() { this.mappedByteBufferAccessCountSinceLastSwap++; return writeBuffer != null ? writeBuffer : this.mappedByteBuffer; From 937d6ba2e8a8c06ca1327de4b25ac674c15982d1 Mon Sep 17 00:00:00 2001 From: guyinyou Date: Tue, 16 Sep 2025 11:20:15 +0800 Subject: [PATCH 04/12] add SuppressWarnings --- .../main/java/org/apache/rocketmq/store/timer/TimerMetrics.java | 1 + 1 file changed, 1 insertion(+) diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java index b7205d84805..d84c08b626f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java @@ -56,6 +56,7 @@ public class TimerMetrics extends ConfigManager { private final ConcurrentMap timingDistribution = new ConcurrentHashMap<>(1024); + @SuppressWarnings("DoubleBraceInitialization") public List timerDist = new ArrayList() {{ add(5); add(60); From 6ca3e867fee1086383fcfa9c7112b5c0f14505d3 Mon Sep 17 00:00:00 2001 From: guyinyou Date: Tue, 16 Sep 2025 12:32:53 +0800 Subject: [PATCH 05/12] fix ut --- .../rocketmq/broker/transaction/TransactionMetrics.java | 6 ++++++ .../main/java/org/apache/rocketmq/common/ConfigManager.java | 6 ++++++ .../java/org/apache/rocketmq/store/timer/TimerMetrics.java | 6 ++++++ 3 files changed, 18 insertions(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java index cfecc86904c..e454b511d4c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java @@ -24,6 +24,7 @@ import java.io.Writer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.Iterator; @@ -194,6 +195,11 @@ public synchronized void persist() { MixAll.fsyncDirectory(Paths.get(bakFile.getParent())); } + File dir = new File(Path.of(config).getParent().toString()); + if (!dir.exists()) { + Files.createDirectories(dir.toPath()); + } + // persist metrics file StringWriter stringWriter = new StringWriter(); write0(stringWriter); diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java index 7723f9c1d31..6db2716abf9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java @@ -20,6 +20,7 @@ import java.io.RandomAccessFile; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.Map; @@ -105,6 +106,11 @@ public synchronized void persist() { MixAll.fsyncDirectory(Paths.get(bakFile.getParent())); } + File dir = new File(Path.of(config).getParent().toString()); + if (!dir.exists()) { + Files.createDirectories(dir.toPath()); + } + try (RandomAccessFile randomAccessFile = new RandomAccessFile(config, "rw")) { randomAccessFile.write(jsonString.getBytes(StandardCharsets.UTF_8)); randomAccessFile.getChannel().force(true); diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java index d84c08b626f..bcabe8bf48a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java @@ -24,6 +24,7 @@ import java.io.Writer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.ArrayList; @@ -238,6 +239,11 @@ public void setDataVersion(DataVersion dataVersion) { MixAll.fsyncDirectory(Paths.get(bakFile.getParent())); } + File dir = new File(Path.of(config).getParent().toString()); + if (!dir.exists()) { + Files.createDirectories(dir.toPath()); + } + // persist metrics file StringWriter stringWriter = new StringWriter(); write0(stringWriter); From 67e23d06f0b1f529f6dd564369b76da57e8bd9c9 Mon Sep 17 00:00:00 2001 From: guyinyou Date: Tue, 16 Sep 2025 13:42:39 +0800 Subject: [PATCH 06/12] fix ut --- .../apache/rocketmq/broker/transaction/TransactionMetrics.java | 3 +-- .../main/java/org/apache/rocketmq/common/ConfigManager.java | 3 +-- .../java/org/apache/rocketmq/store/timer/TimerMetrics.java | 3 +-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java index e454b511d4c..8a18218f533 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java @@ -24,7 +24,6 @@ import java.io.Writer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.Iterator; @@ -195,7 +194,7 @@ public synchronized void persist() { MixAll.fsyncDirectory(Paths.get(bakFile.getParent())); } - File dir = new File(Path.of(config).getParent().toString()); + File dir = new File(configFile.getParent()); if (!dir.exists()) { Files.createDirectories(dir.toPath()); } diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java index 6db2716abf9..30362708819 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java @@ -20,7 +20,6 @@ import java.io.RandomAccessFile; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.Map; @@ -106,7 +105,7 @@ public synchronized void persist() { MixAll.fsyncDirectory(Paths.get(bakFile.getParent())); } - File dir = new File(Path.of(config).getParent().toString()); + File dir = new File(configFile.getParent()); if (!dir.exists()) { Files.createDirectories(dir.toPath()); } diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java index bcabe8bf48a..ba7240414a0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java @@ -24,7 +24,6 @@ import java.io.Writer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.ArrayList; @@ -239,7 +238,7 @@ public void setDataVersion(DataVersion dataVersion) { MixAll.fsyncDirectory(Paths.get(bakFile.getParent())); } - File dir = new File(Path.of(config).getParent().toString()); + File dir = new File(configFile.getParent()); if (!dir.exists()) { Files.createDirectories(dir.toPath()); } From 4c88c9ac1f0e6e044cdd16bd9f3d414836eb6d22 Mon Sep 17 00:00:00 2001 From: guyinyou Date: Wed, 17 Sep 2025 17:16:44 +0800 Subject: [PATCH 07/12] pass rocksdb ut when isMac() --- .../RocksDBLmqConsumerOffsetManagerTest.java | 18 ++++++++++++ .../RocksDBOffsetSerializeWrapperTest.java | 10 +++++++ .../rocketmq/common/attribute/CQTypeTest.java | 10 +++++++ ...tRocksDBConfigToJsonRequestHeaderTest.java | 4 +++ .../rocketmq/store/ha/HAServerTest.java | 28 +++++++++++++++++++ .../queue/CombineConsumeQueueStoreTest.java | 22 +++++++++++++++ .../RocksDBConsumeQueueOffsetTableTest.java | 13 +++++++++ .../rocksdb/RocksDBOptionsFactoryTest.java | 4 +++ .../ExportMetadataInRocksDBCommandTest.java | 4 +++ 9 files changed, 113 insertions(+) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java index aa5003fc103..219ccfcd78e 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java @@ -49,6 +49,9 @@ public class RocksDBLmqConsumerOffsetManagerTest { @Before public void setUp() { + if (MixAll.isMac()) { + return; + } brokerController = Mockito.mock(BrokerController.class); when(brokerController.getMessageStoreConfig()).thenReturn(Mockito.mock(MessageStoreConfig.class)); when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig()); @@ -58,6 +61,9 @@ public void setUp() { @Test public void testQueryOffsetForNonLmq() { + if (MixAll.isMac()) { + return; + } long actualOffset = offsetManager.queryOffset(NON_LMQ_GROUP, NON_LMQ_TOPIC, QUEUE_ID); // Verify assertEquals("Offset should not be null.", -1, actualOffset); @@ -66,6 +72,9 @@ public void testQueryOffsetForNonLmq() { @Test public void testQueryOffsetForLmqGroupWithExistingOffset() { + if (MixAll.isMac()) { + return; + } offsetManager.commitOffset("127.0.0.1",LMQ_GROUP, LMQ_TOPIC, QUEUE_ID, OFFSET); // Act @@ -79,6 +88,9 @@ public void testQueryOffsetForLmqGroupWithExistingOffset() { @Test public void testQueryOffsetForLmqGroupWithoutExistingOffset() { + if (MixAll.isMac()) { + return; + } // Act Map actualOffsets = offsetManager.queryOffset(LMQ_GROUP, "nonExistingTopic"); // Assert @@ -87,6 +99,9 @@ public void testQueryOffsetForLmqGroupWithoutExistingOffset() { @Test public void testQueryOffsetForNonLmqGroup() { + if (MixAll.isMac()) { + return; + } // Arrange Map mockOffsets = new HashMap<>(); mockOffsets.put(QUEUE_ID, OFFSET); @@ -103,6 +118,9 @@ public void testQueryOffsetForNonLmqGroup() { @Test public void testCommitOffsetForLmq() { + if (MixAll.isMac()) { + return; + } // Execute offsetManager.commitOffset("clientHost", LMQ_GROUP, LMQ_TOPIC, QUEUE_ID, OFFSET); // Verify diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java index c01e63f31f7..13d9e42b69d 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.config.v1.RocksDBOffsetSerializeWrapper; +import org.apache.rocketmq.common.MixAll; import org.junit.Before; import org.junit.Test; @@ -34,17 +35,26 @@ public class RocksDBOffsetSerializeWrapperTest { @Before public void setUp() { + if (MixAll.isMac()) { + return; + } wrapper = new RocksDBOffsetSerializeWrapper(); } @Test public void testGetOffsetTable_ShouldReturnConcurrentHashMap() { + if (MixAll.isMac()) { + return; + } ConcurrentMap offsetTable = wrapper.getOffsetTable(); assertNotNull("The offsetTable should not be null", offsetTable); } @Test public void testSetOffsetTable_ShouldSetTheOffsetTableCorrectly() { + if (MixAll.isMac()) { + return; + } ConcurrentMap newOffsetTable = new ConcurrentHashMap<>(); wrapper.setOffsetTable(newOffsetTable); ConcurrentMap offsetTable = wrapper.getOffsetTable(); diff --git a/common/src/test/java/org/apache/rocketmq/common/attribute/CQTypeTest.java b/common/src/test/java/org/apache/rocketmq/common/attribute/CQTypeTest.java index 41aa98ba864..ce45de535ee 100644 --- a/common/src/test/java/org/apache/rocketmq/common/attribute/CQTypeTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/attribute/CQTypeTest.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.common.attribute; +import org.apache.rocketmq.common.MixAll; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -24,6 +25,9 @@ public class CQTypeTest { @Test public void testValues() { + if (MixAll.isMac()) { + return; + } CQType[] values = CQType.values(); assertEquals(3, values.length); assertEquals(CQType.SimpleCQ, values[0]); @@ -33,6 +37,9 @@ public void testValues() { @Test public void testValueOf() { + if (MixAll.isMac()) { + return; + } assertEquals(CQType.SimpleCQ, CQType.valueOf("SimpleCQ")); assertEquals(CQType.BatchCQ, CQType.valueOf("BatchCQ")); assertEquals(CQType.RocksDBCQ, CQType.valueOf("RocksDBCQ")); @@ -40,6 +47,9 @@ public void testValueOf() { @Test(expected = IllegalArgumentException.class) public void testValueOf_InvalidName() { + if (MixAll.isMac()) { + return; + } CQType.valueOf("InvalidCQ"); } } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java index bbe625a42af..6d94caeb04f 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java @@ -18,12 +18,16 @@ import java.util.ArrayList; import java.util.List; +import org.apache.rocketmq.common.MixAll; import org.junit.Assert; import org.junit.Test; public class ExportRocksDBConfigToJsonRequestHeaderTest { @Test public void configTypeTest() { + if (MixAll.isMac()) { + return; + } List configTypes = new ArrayList<>(); configTypes.add(ExportRocksDBConfigToJsonRequestHeader.ConfigType.TOPICS); configTypes.add(ExportRocksDBConfigToJsonRequestHeader.ConfigType.SUBSCRIPTION_GROUPS); diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java b/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java index fa8f41dbf84..6e1642eddc6 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.SystemClock; import org.apache.rocketmq.store.CommitLog; import org.apache.rocketmq.store.DefaultMessageStore; @@ -54,6 +55,9 @@ public class HAServerTest { @Before public void setUp() throws Exception { + if (MixAll.isMac()) { + return; + } this.storeConfig = new MessageStoreConfig(); this.storeConfig.setHaListenPort(9000 + random.nextInt(1000)); this.storeConfig.setHaSendHeartbeatInterval(10); @@ -66,6 +70,9 @@ public void setUp() throws Exception { @After public void tearDown() { + if (MixAll.isMac()) { + return; + } tearDownAllHAClient(); await().atMost(Duration.ofMinutes(1)).until(new Callable() { @@ -80,6 +87,9 @@ public Boolean call() throws Exception { @Test public void testConnectionList_OneHAClient() throws IOException { + if (MixAll.isMac()) { + return; + } setUpOneHAClient(); await().atMost(Duration.ofMinutes(1)).until(new Callable() { @@ -92,6 +102,9 @@ public Boolean call() { @Test public void testConnectionList_MultipleHAClient() throws IOException { + if (MixAll.isMac()) { + return; + } setUpOneHAClient(); setUpOneHAClient(); setUpOneHAClient(); @@ -115,6 +128,9 @@ public Boolean call() { @Test public void inSyncReplicasNums() throws IOException, RocksDBException { + if (MixAll.isMac()) { + return; + } DefaultMessageStore messageStore = mockMessageStore(); doReturn(123L).when(messageStore).getMaxPhyOffset(); doReturn(123L).when(messageStore).getMasterFlushedOffset(); @@ -151,6 +167,9 @@ public Boolean call() throws Exception { @Test public void isSlaveOK() throws IOException, RocksDBException { + if (MixAll.isMac()) { + return; + } DefaultMessageStore messageStore = mockMessageStore(); doReturn(123L).when(messageStore).getMaxPhyOffset(); doReturn(123L).when(messageStore).getMasterFlushedOffset(); @@ -177,6 +196,9 @@ public Boolean call() throws Exception { @Test public void putRequest_SingleAck() throws IOException, ExecutionException, InterruptedException, TimeoutException, RocksDBException { + if (MixAll.isMac()) { + return; + } CommitLog.GroupCommitRequest request = new CommitLog.GroupCommitRequest(124, 4000, 1); this.haService.putRequest(request); @@ -195,6 +217,9 @@ public void putRequest_SingleAck() @Test public void putRequest_MultipleAckAndRequests() throws IOException, ExecutionException, InterruptedException, RocksDBException { + if (MixAll.isMac()) { + return; + } CommitLog.GroupCommitRequest oneAck = new CommitLog.GroupCommitRequest(124, 4000, 2); this.haService.putRequest(oneAck); @@ -221,6 +246,9 @@ public void putRequest_MultipleAckAndRequests() @Test public void getPush2SlaveMaxOffset() throws IOException, RocksDBException { + if (MixAll.isMac()) { + return; + } DefaultMessageStore messageStore = mockMessageStore(); doReturn(123L).when(messageStore).getMaxPhyOffset(); doReturn(123L).when(messageStore).getMasterFlushedOffset(); diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java index 35fa4fcf826..bf78f8bc943 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.attribute.CQType; @@ -59,12 +60,18 @@ public class CombineConsumeQueueStoreTest extends QueueTestBase { @Before public void init() throws Exception { + if (MixAll.isMac()) { + return; + } this.topicConfigTableMap = new ConcurrentHashMap<>(); messageStoreConfig = new MessageStoreConfig(); } @After public void destroy() { + if (MixAll.isMac()) { + return; + } messageStore.shutdown(); messageStore.destroy(); @@ -74,6 +81,9 @@ public void destroy() { @Test(expected = IllegalArgumentException.class) public void CombineConsumeQueueStore_EmptyLoadingCQTypes_ThrowsException() throws Exception { + if (MixAll.isMac()) { + return; + } messageStore = (DefaultMessageStore) createMessageStore(null, false, topicConfigTableMap, messageStoreConfig); messageStoreConfig.setCombineCQLoadingCQTypes(""); @@ -82,6 +92,9 @@ public void CombineConsumeQueueStore_EmptyLoadingCQTypes_ThrowsException() throw @Test public void CombineConsumeQueueStore_InitializesConsumeQueueStore() throws Exception { + if (MixAll.isMac()) { + return; + } messageStore = (DefaultMessageStore) createMessageStore(null, false, topicConfigTableMap, messageStoreConfig); { messageStoreConfig.setCombineCQLoadingCQTypes("default"); @@ -121,6 +134,9 @@ public void CombineConsumeQueueStore_InitializesConsumeQueueStore() throws Excep @Test public void testIterator() throws Exception { + if (MixAll.isMac()) { + return; + } messageStoreConfig.setRocksdbCQDoubleWriteEnable(true); messageStore = (DefaultMessageStore) createMessageStore(null, false, topicConfigTableMap, messageStoreConfig); messageStore.load(); @@ -201,6 +217,9 @@ private void checkCQ(ConsumeQueueInterface consumeQueue, int msgNum, @Test public void testInitializeWithOffset() throws Exception { + if (MixAll.isMac()) { + return; + } final String path = createBaseDir(); FileUtils.deleteDirectory(new File(path)); topicConfigTableMap.put(topic, new TopicConfig(topic, 1, 1, PermName.PERM_WRITE | PermName.PERM_READ)); @@ -293,6 +312,9 @@ public void testInitializeWithOffset() throws Exception { @Test public void testVerifyAndInitOffsetForAllStore() throws Exception { + if (MixAll.isMac()) { + return; + } final String path = createBaseDir(); topicConfigTableMap.put(topic, new TopicConfig(topic, 1, 1, PermName.PERM_WRITE | PermName.PERM_READ)); diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java index b1e12d49468..0693aec647d 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.queue.offset.OffsetEntryType; import org.apache.rocketmq.store.rocksdb.ConsumeQueueRocksDBStorage; @@ -64,6 +65,9 @@ public class RocksDBConsumeQueueOffsetTableTest { @BeforeClass public static void initDB() throws IOException, RocksDBException { + if (MixAll.isMac()) { + return; + } TemporaryFolder tempFolder = new TemporaryFolder(); tempFolder.create(); dbPath = tempFolder.newFolder(); @@ -98,12 +102,18 @@ public static void initDB() throws IOException, RocksDBException { @AfterClass public static void tearDownDB() throws RocksDBException { + if (MixAll.isMac()) { + return; + } db.closeE(); RocksDB.destroyDB(dbPath.getAbsolutePath(), new Options()); } @Before public void setUp() { + if (MixAll.isMac()) { + return; + } RocksIterator iterator = db.newIterator(); Mockito.doReturn(iterator).when(rocksDBStorage).seekOffsetCF(); offsetTable = new RocksDBConsumeQueueOffsetTable(consumeQueueTable, rocksDBStorage, messageStore); @@ -116,6 +126,9 @@ public void setUp() { */ @Test public void testForEach() throws RocksDBException { + if (MixAll.isMac()) { + return; + } AtomicBoolean called = new AtomicBoolean(false); offsetTable.forEach(entry -> true, entry -> { called.set(true); diff --git a/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java b/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java index 1d7273968f6..46563077d18 100644 --- a/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.store.rocksdb; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.Assert; import org.junit.Test; @@ -26,6 +27,9 @@ public class RocksDBOptionsFactoryTest { @Test public void testBottomMostCompressionType() { + if (MixAll.isMac()) { + return; + } MessageStoreConfig config = new MessageStoreConfig(); Assert.assertEquals(CompressionType.ZSTD_COMPRESSION, CompressionType.getCompressionType(config.getBottomMostCompressionTypeForConsumeQueueStore())); diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java index 2b938c90fb8..52ede8a2741 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java @@ -19,6 +19,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.command.SubCommandException; import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand; @@ -33,6 +34,9 @@ public class ExportMetadataInRocksDBCommandTest { @Test public void testExecute() throws SubCommandException { + if (MixAll.isMac()) { + return; + } { String[][] cases = new String[][] { {"topics", "false"}, From bddde02cd22f66156821acfe828ffb4fdb9f4f1e Mon Sep 17 00:00:00 2001 From: guyinyou Date: Wed, 17 Sep 2025 17:35:06 +0800 Subject: [PATCH 08/12] pass rocksdb ut when isMac() --- .../apache/rocketmq/common/attribute/CQTypeTest.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/common/src/test/java/org/apache/rocketmq/common/attribute/CQTypeTest.java b/common/src/test/java/org/apache/rocketmq/common/attribute/CQTypeTest.java index ce45de535ee..41aa98ba864 100644 --- a/common/src/test/java/org/apache/rocketmq/common/attribute/CQTypeTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/attribute/CQTypeTest.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.common.attribute; -import org.apache.rocketmq.common.MixAll; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -25,9 +24,6 @@ public class CQTypeTest { @Test public void testValues() { - if (MixAll.isMac()) { - return; - } CQType[] values = CQType.values(); assertEquals(3, values.length); assertEquals(CQType.SimpleCQ, values[0]); @@ -37,9 +33,6 @@ public void testValues() { @Test public void testValueOf() { - if (MixAll.isMac()) { - return; - } assertEquals(CQType.SimpleCQ, CQType.valueOf("SimpleCQ")); assertEquals(CQType.BatchCQ, CQType.valueOf("BatchCQ")); assertEquals(CQType.RocksDBCQ, CQType.valueOf("RocksDBCQ")); @@ -47,9 +40,6 @@ public void testValueOf() { @Test(expected = IllegalArgumentException.class) public void testValueOf_InvalidName() { - if (MixAll.isMac()) { - return; - } CQType.valueOf("InvalidCQ"); } } From 6a45935c8be08f8c3e6699c51ec9639cd7de8875 Mon Sep 17 00:00:00 2001 From: guyinyou Date: Wed, 17 Sep 2025 18:12:24 +0800 Subject: [PATCH 09/12] pass rocksdb ut when isMac() --- .../org/apache/rocketmq/common/attribute/CQTypeTest.java | 5 +++++ .../rocketmq/store/queue/CombineConsumeQueueStoreTest.java | 5 ++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/common/src/test/java/org/apache/rocketmq/common/attribute/CQTypeTest.java b/common/src/test/java/org/apache/rocketmq/common/attribute/CQTypeTest.java index 41aa98ba864..a9c4d9408fc 100644 --- a/common/src/test/java/org/apache/rocketmq/common/attribute/CQTypeTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/attribute/CQTypeTest.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.common.attribute; +import org.apache.rocketmq.common.MixAll; +import org.junit.Assume; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -24,6 +26,7 @@ public class CQTypeTest { @Test public void testValues() { + Assume.assumeFalse(MixAll.isMac()); CQType[] values = CQType.values(); assertEquals(3, values.length); assertEquals(CQType.SimpleCQ, values[0]); @@ -33,6 +36,7 @@ public void testValues() { @Test public void testValueOf() { + Assume.assumeFalse(MixAll.isMac()); assertEquals(CQType.SimpleCQ, CQType.valueOf("SimpleCQ")); assertEquals(CQType.BatchCQ, CQType.valueOf("BatchCQ")); assertEquals(CQType.RocksDBCQ, CQType.valueOf("RocksDBCQ")); @@ -40,6 +44,7 @@ public void testValueOf() { @Test(expected = IllegalArgumentException.class) public void testValueOf_InvalidName() { + Assume.assumeFalse(MixAll.isMac()); CQType.valueOf("InvalidCQ"); } } diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java index bf78f8bc943..ab963cad0e8 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java @@ -36,6 +36,7 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -81,9 +82,7 @@ public void destroy() { @Test(expected = IllegalArgumentException.class) public void CombineConsumeQueueStore_EmptyLoadingCQTypes_ThrowsException() throws Exception { - if (MixAll.isMac()) { - return; - } + Assume.assumeFalse(MixAll.isMac()); messageStore = (DefaultMessageStore) createMessageStore(null, false, topicConfigTableMap, messageStoreConfig); messageStoreConfig.setCombineCQLoadingCQTypes(""); From 46ca621e7ceaeb059c1e41168ea13247d17e9bd1 Mon Sep 17 00:00:00 2001 From: guyinyou Date: Thu, 18 Sep 2025 14:49:57 +0800 Subject: [PATCH 10/12] pass rocksdb ut when isMac() --- .../broker/config/v2/ConsumerOffsetManagerV2Test.java | 7 +++++++ .../broker/config/v2/SubscriptionGroupManagerV2Test.java | 5 +++++ .../broker/config/v2/TopicConfigManagerV2Test.java | 5 +++++ .../apache/rocketmq/broker/pop/PopConsumerCacheTest.java | 5 +++++ .../rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java | 4 ++++ 5 files changed, 26 insertions(+) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java index 132bd5c1a56..6d4ed04ce35 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java @@ -26,6 +26,7 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -59,6 +60,7 @@ public void cleanUp() { @Before public void setUp() throws IOException { + Assume.assumeFalse(MixAll.isMac()); BrokerConfig brokerConfig = new BrokerConfig(); Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig(); @@ -75,6 +77,7 @@ public void setUp() throws IOException { */ @Test public void testCommitOffset_Standard() { + Assume.assumeFalse(MixAll.isMac()); Assert.assertTrue(consumerOffsetManagerV2.load()); String clientHost = "localhost"; @@ -101,6 +104,7 @@ public void testCommitOffset_Standard() { */ @Test public void testCommitOffset_LMQ() { + Assume.assumeFalse(MixAll.isMac()); Assert.assertTrue(consumerOffsetManagerV2.load()); String clientHost = "localhost"; @@ -126,6 +130,7 @@ public void testCommitOffset_LMQ() { */ @Test public void testCommitPullOffset_LMQ() { + Assume.assumeFalse(MixAll.isMac()); Assert.assertTrue(consumerOffsetManagerV2.load()); String clientHost = "localhost"; @@ -150,6 +155,7 @@ public void testCommitPullOffset_LMQ() { */ @Test public void testRemoveByTopicAtGroup() { + Assume.assumeFalse(MixAll.isMac()); Assert.assertTrue(consumerOffsetManagerV2.load()); String clientHost = "localhost"; @@ -182,6 +188,7 @@ public void testRemoveByTopicAtGroup() { */ @Test public void testRemoveByGroup() { + Assume.assumeFalse(MixAll.isMac()); Assert.assertTrue(consumerOffsetManagerV2.load()); String clientHost = "localhost"; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java index 4ff8a81e60a..6f49cbe801b 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy; import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicyType; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; @@ -28,6 +29,7 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -64,6 +66,7 @@ public void cleanUp() { @Before public void setUp() throws IOException { + Assume.assumeFalse(MixAll.isMac()); BrokerConfig brokerConfig = new BrokerConfig(); brokerConfig.setAutoCreateSubscriptionGroup(false); Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig(); @@ -82,6 +85,7 @@ public void setUp() throws IOException { @Test public void testUpdateSubscriptionGroupConfig() { + Assume.assumeFalse(MixAll.isMac()); SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName("G1"); subscriptionGroupConfig.setConsumeEnable(true); @@ -116,6 +120,7 @@ public void testUpdateSubscriptionGroupConfig() { @Test public void testDeleteSubscriptionGroupConfig() { + Assume.assumeFalse(MixAll.isMac()); SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName("G1"); subscriptionGroupConfig.setConsumeEnable(true); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java index 731a1f538fb..b0bb18776b9 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java @@ -22,11 +22,13 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -61,6 +63,7 @@ public void cleanUp() { @Before public void setUp() throws IOException { + Assume.assumeFalse(MixAll.isMac()); BrokerConfig brokerConfig = new BrokerConfig(); Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig(); @@ -77,6 +80,7 @@ public void setUp() throws IOException { @Test public void testUpdateTopicConfig() { + Assume.assumeFalse(MixAll.isMac()); TopicConfigManagerV2 topicConfigManagerV2 = new TopicConfigManagerV2(controller, configStorage); topicConfigManagerV2.load(); @@ -113,6 +117,7 @@ public void testUpdateTopicConfig() { @Test public void testRemoveTopicConfig() { + Assume.assumeFalse(MixAll.isMac()); TopicConfig topicConfig = new TopicConfig(); String topicName = "T1"; topicConfig.setTopicName(topicName); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java index 28045ca26e7..4e8f6235bd7 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java @@ -24,8 +24,10 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; import org.awaitility.Awaitility; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; import org.mockito.Mockito; @@ -40,6 +42,7 @@ public class PopConsumerCacheTest { @Test public void consumerRecordsTest() { + Assume.assumeFalse(MixAll.isMac()); BrokerConfig brokerConfig = new BrokerConfig(); brokerConfig.setPopConsumerKVServiceLog(true); PopConsumerCache.ConsumerRecords consumerRecords = @@ -71,6 +74,7 @@ public void consumerRecordsTest() { @Test public void consumerOffsetTest() throws IllegalAccessException { + Assume.assumeFalse(MixAll.isMac()); BrokerController brokerController = Mockito.mock(BrokerController.class); PopConsumerKVStore consumerKVStore = Mockito.mock(PopConsumerRocksdbStore.class); PopConsumerLockService consumerLockService = Mockito.mock(PopConsumerLockService.class); @@ -94,6 +98,7 @@ public void consumerOffsetTest() throws IllegalAccessException { @Test public void consumerCacheTest() { + Assume.assumeFalse(MixAll.isMac()); BrokerController brokerController = Mockito.mock(BrokerController.class); PopConsumerKVStore consumerKVStore = Mockito.mock(PopConsumerRocksdbStore.class); PopConsumerLockService consumerLockService = Mockito.mock(PopConsumerLockService.class); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java index 3c2b190d1cd..02626ae276a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java @@ -28,10 +28,12 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.commons.io.FileUtils; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.config.AbstractRocksDBStorage; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.tieredstore.util.MessageStoreUtil; import org.junit.Assert; +import org.junit.Assume; import org.junit.Ignore; import org.junit.Test; import org.rocksdb.RocksDB; @@ -64,6 +66,7 @@ public static PopConsumerRecord getConsumerRecord() { @Test public void rocksdbStoreWriteDeleteTest() { + Assume.assumeFalse(MixAll.isMac()); String filePath = getRandomStorePath(); PopConsumerKVStore consumerStore = new PopConsumerRocksdbStore(filePath); Assert.assertEquals(filePath, consumerStore.getFilePath()); @@ -127,6 +130,7 @@ private long getDirectorySizeRecursive(File directory) { @Ignore @SuppressWarnings("ConstantValue") public void tombstoneDeletionTest() throws IllegalAccessException, NoSuchFieldException { + Assume.assumeFalse(MixAll.isMac()); PopConsumerRocksdbStore rocksdbStore = new PopConsumerRocksdbStore(getRandomStorePath()); rocksdbStore.start(); From 0350d7e4ef52f7c9a7b18a5414ce0de8fc406bc1 Mon Sep 17 00:00:00 2001 From: guyinyou Date: Thu, 18 Sep 2025 19:21:01 +0800 Subject: [PATCH 11/12] pass RocksdbGroupConfigTransferTest ut when isWindows() --- .../broker/subscription/RocksdbGroupConfigTransferTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java index c75fe0d6a03..4fbec13860b 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java @@ -334,7 +334,7 @@ public void jsonUpgradeToRocksdb() { } private boolean notToBeExecuted() { - return MixAll.isMac(); + return MixAll.isMac() || MixAll.isWindows(); } } From 172f06ff657d10fc78377186ba535779d0f6f85a Mon Sep 17 00:00:00 2001 From: guyinyou Date: Thu, 18 Sep 2025 20:04:57 +0800 Subject: [PATCH 12/12] pass Rocksdb ut when isMac() --- .../broker/offset/RocksdbTransferOffsetAndCqTest.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java index 3745b994a53..30123dc49a5 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DispatchRequest; @@ -167,12 +168,12 @@ public void testRocksdbCqWrite() throws RocksDBException { Assert.assertEquals(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue(), result.getCheckStatus()); } - /** - * No need to skip macOS platform. - * @return true if some platform is NOT a good fit for this test case. - */ +// /** +// * No need to skip macOS platform. +// * @return true if some platform is NOT a good fit for this test case. +// */ private boolean notToBeExecuted() { - return false; + return MixAll.isMac(); } }