diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java index 90006b087e6..32c79febc6d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -294,10 +294,10 @@ public SystemConfigFileHelper() { } public Properties loadConfig() throws Exception { - InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file))); Properties properties = new Properties(); - properties.load(in); - in.close(); + try (InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)))) { + properties.load(in); + } return properties; } diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java index f909e623b21..0a057a42469 100644 --- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java +++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java @@ -405,10 +405,11 @@ public SystemConfigFileHelper() { } public Properties loadConfig() throws Exception { - InputStream in = new BufferedInputStream(new FileInputStream(file)); Properties properties = new Properties(); - properties.load(in); - in.close(); + + try (InputStream in = new BufferedInputStream(new FileInputStream(file))) { + properties.load(in); + } return properties; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RegisterBrokerBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RegisterBrokerBody.java index 99557b1d3fb..7312b702331 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RegisterBrokerBody.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RegisterBrokerBody.java @@ -54,11 +54,10 @@ public byte[] encode(boolean compress) { } long start = System.currentTimeMillis(); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, new Deflater(Deflater.BEST_COMPRESSION)); - DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion(); - ConcurrentMap topicConfigTable = cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable()); - assert topicConfigTable != null; - try { + try (DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, new Deflater(Deflater.BEST_COMPRESSION))) { + DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion(); + ConcurrentMap topicConfigTable = cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable()); + assert topicConfigTable != null; byte[] buffer = dataVersion.encode(); // write data version @@ -117,58 +116,56 @@ public static RegisterBrokerBody decode(byte[] data, boolean compressed, MQVersi return RegisterBrokerBody.decode(data, RegisterBrokerBody.class); } long start = System.currentTimeMillis(); - InflaterInputStream inflaterInputStream = new InflaterInputStream(new ByteArrayInputStream(data)); - int dataVersionLength = readInt(inflaterInputStream); - byte[] dataVersionBytes = readBytes(inflaterInputStream, dataVersionLength); - DataVersion dataVersion = DataVersion.decode(dataVersionBytes, DataVersion.class); - - RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody(); - registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion); - ConcurrentMap topicConfigTable = registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable(); - - int topicConfigNumber = readInt(inflaterInputStream); - LOGGER.debug("{} topic configs to extract", topicConfigNumber); - - for (int i = 0; i < topicConfigNumber; i++) { - int topicConfigJsonLength = readInt(inflaterInputStream); - - byte[] buffer = readBytes(inflaterInputStream, topicConfigJsonLength); - TopicConfig topicConfig = new TopicConfig(); - String topicConfigJson = new String(buffer, MixAll.DEFAULT_CHARSET); - topicConfig.decode(topicConfigJson); - topicConfigTable.put(topicConfig.getTopicName(), topicConfig); - } - - int filterServerListJsonLength = readInt(inflaterInputStream); - - byte[] filterServerListBuffer = readBytes(inflaterInputStream, filterServerListJsonLength); - String filterServerListJson = new String(filterServerListBuffer, MixAll.DEFAULT_CHARSET); - List filterServerList = new ArrayList<>(); - try { - filterServerList = JSON.parseArray(filterServerListJson, String.class); - } catch (Exception e) { - LOGGER.error("Decompressing occur Exception {}", filterServerListJson); - } - - registerBrokerBody.setFilterServerList(filterServerList); + try (InflaterInputStream inflaterInputStream = new InflaterInputStream(new ByteArrayInputStream(data))) { + int dataVersionLength = readInt(inflaterInputStream); + byte[] dataVersionBytes = readBytes(inflaterInputStream, dataVersionLength); + DataVersion dataVersion = DataVersion.decode(dataVersionBytes, DataVersion.class); + + RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody(); + registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion); + ConcurrentMap topicConfigTable = registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable(); + + int topicConfigNumber = readInt(inflaterInputStream); + LOGGER.debug("{} topic configs to extract", topicConfigNumber); + + for (int i = 0; i < topicConfigNumber; i++) { + int topicConfigJsonLength = readInt(inflaterInputStream); + byte[] buffer = readBytes(inflaterInputStream, topicConfigJsonLength); + TopicConfig topicConfig = new TopicConfig(); + String topicConfigJson = new String(buffer, MixAll.DEFAULT_CHARSET); + topicConfig.decode(topicConfigJson); + topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } - if (brokerVersion.ordinal() >= MQVersion.Version.V5_0_0.ordinal()) { - int topicQueueMappingNum = readInt(inflaterInputStream); - Map topicQueueMappingInfoMap = new ConcurrentHashMap<>(); - for (int i = 0; i < topicQueueMappingNum; i++) { - int mappingJsonLen = readInt(inflaterInputStream); - byte[] buffer = readBytes(inflaterInputStream, mappingJsonLen); - TopicQueueMappingInfo info = TopicQueueMappingInfo.decode(buffer, TopicQueueMappingInfo.class); - topicQueueMappingInfoMap.put(info.getTopic(), info); + int filterServerListJsonLength = readInt(inflaterInputStream); + byte[] filterServerListBuffer = readBytes(inflaterInputStream, filterServerListJsonLength); + String filterServerListJson = new String(filterServerListBuffer, MixAll.DEFAULT_CHARSET); + List filterServerList = new ArrayList<>(); + try { + filterServerList = JSON.parseArray(filterServerListJson, String.class); + } catch (Exception e) { + LOGGER.error("Decompressing occur Exception {}", filterServerListJson, e); + } + registerBrokerBody.setFilterServerList(filterServerList); + + if (brokerVersion.ordinal() >= MQVersion.Version.V5_0_0.ordinal()) { + int topicQueueMappingNum = readInt(inflaterInputStream); + Map topicQueueMappingInfoMap = new ConcurrentHashMap<>(); + for (int i = 0; i < topicQueueMappingNum; i++) { + int mappingJsonLen = readInt(inflaterInputStream); + byte[] buffer = readBytes(inflaterInputStream, mappingJsonLen); + TopicQueueMappingInfo info = TopicQueueMappingInfo.decode(buffer, TopicQueueMappingInfo.class); + topicQueueMappingInfoMap.put(info.getTopic(), info); + } + registerBrokerBody.getTopicConfigSerializeWrapper().setTopicQueueMappingInfoMap(topicQueueMappingInfoMap); } - registerBrokerBody.getTopicConfigSerializeWrapper().setTopicQueueMappingInfoMap(topicQueueMappingInfoMap); - } - long takeTime = System.currentTimeMillis() - start; - if (takeTime > MINIMUM_TAKE_TIME_MILLISECOND) { - LOGGER.info("Decompressing takes {}ms", takeTime); + long takeTime = System.currentTimeMillis() - start; + if (takeTime > MINIMUM_TAKE_TIME_MILLISECOND) { + LOGGER.info("Decompressing takes {}ms", takeTime); + } + return registerBrokerBody; } - return registerBrokerBody; } private static byte[] convertIntToByteArray(int n) { diff --git a/test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java b/test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java index 8aa28403120..8073e00902d 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java @@ -101,17 +101,17 @@ public void checkDuplicatedMessageInfo(boolean bPrintLog, if (bPrintLog) { String logFileNameStr = "D:" + File.separator + "checkDuplicatedMessageInfo.txt"; File logFileNameFile = new File(logFileNameStr); - OutputStream out = new FileOutputStream(logFileNameFile, true); + try (OutputStream out = new FileOutputStream(logFileNameFile, true)) { - String strToWrite; - byte[] byteToWrite; - strToWrite = strBuilder + titleString; - for (int i = 0; i < msgListSize; i++) - strToWrite += strBQueue.get(i).toString() + "\r\n"; + String strToWrite; + byte[] byteToWrite; + strToWrite = strBuilder + titleString; + for (int i = 0; i < msgListSize; i++) + strToWrite += strBQueue.get(i).toString() + "\r\n"; - byteToWrite = strToWrite.getBytes(StandardCharsets.UTF_8); - out.write(byteToWrite); - out.close(); + byteToWrite = strToWrite.getBytes(StandardCharsets.UTF_8); + out.write(byteToWrite); + } } }