Skip to content

Commit be67d61

Browse files
[ISSUE #9447] Avoiding possible resource leaks in InputStream (#9459)
1 parent d3d126e commit be67d61

4 files changed

Lines changed: 66 additions & 68 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -294,10 +294,10 @@ public SystemConfigFileHelper() {
294294
}
295295

296296
public Properties loadConfig() throws Exception {
297-
InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));
298297
Properties properties = new Properties();
299-
properties.load(in);
300-
in.close();
298+
try (InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)))) {
299+
properties.load(in);
300+
}
301301
return properties;
302302
}
303303

container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -405,10 +405,11 @@ public SystemConfigFileHelper() {
405405
}
406406

407407
public Properties loadConfig() throws Exception {
408-
InputStream in = new BufferedInputStream(new FileInputStream(file));
409408
Properties properties = new Properties();
410-
properties.load(in);
411-
in.close();
409+
410+
try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
411+
properties.load(in);
412+
}
412413
return properties;
413414
}
414415

remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RegisterBrokerBody.java

Lines changed: 50 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,10 @@ public byte[] encode(boolean compress) {
5454
}
5555
long start = System.currentTimeMillis();
5656
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
57-
DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, new Deflater(Deflater.BEST_COMPRESSION));
58-
DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion();
59-
ConcurrentMap<String, TopicConfig> topicConfigTable = cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable());
60-
assert topicConfigTable != null;
61-
try {
57+
try (DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, new Deflater(Deflater.BEST_COMPRESSION))) {
58+
DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion();
59+
ConcurrentMap<String, TopicConfig> topicConfigTable = cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable());
60+
assert topicConfigTable != null;
6261
byte[] buffer = dataVersion.encode();
6362

6463
// write data version
@@ -117,58 +116,56 @@ public static RegisterBrokerBody decode(byte[] data, boolean compressed, MQVersi
117116
return RegisterBrokerBody.decode(data, RegisterBrokerBody.class);
118117
}
119118
long start = System.currentTimeMillis();
120-
InflaterInputStream inflaterInputStream = new InflaterInputStream(new ByteArrayInputStream(data));
121-
int dataVersionLength = readInt(inflaterInputStream);
122-
byte[] dataVersionBytes = readBytes(inflaterInputStream, dataVersionLength);
123-
DataVersion dataVersion = DataVersion.decode(dataVersionBytes, DataVersion.class);
124-
125-
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
126-
registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion);
127-
ConcurrentMap<String, TopicConfig> topicConfigTable = registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable();
128-
129-
int topicConfigNumber = readInt(inflaterInputStream);
130-
LOGGER.debug("{} topic configs to extract", topicConfigNumber);
131-
132-
for (int i = 0; i < topicConfigNumber; i++) {
133-
int topicConfigJsonLength = readInt(inflaterInputStream);
134-
135-
byte[] buffer = readBytes(inflaterInputStream, topicConfigJsonLength);
136-
TopicConfig topicConfig = new TopicConfig();
137-
String topicConfigJson = new String(buffer, MixAll.DEFAULT_CHARSET);
138-
topicConfig.decode(topicConfigJson);
139-
topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
140-
}
141-
142-
int filterServerListJsonLength = readInt(inflaterInputStream);
143-
144-
byte[] filterServerListBuffer = readBytes(inflaterInputStream, filterServerListJsonLength);
145-
String filterServerListJson = new String(filterServerListBuffer, MixAll.DEFAULT_CHARSET);
146-
List<String> filterServerList = new ArrayList<>();
147-
try {
148-
filterServerList = JSON.parseArray(filterServerListJson, String.class);
149-
} catch (Exception e) {
150-
LOGGER.error("Decompressing occur Exception {}", filterServerListJson);
151-
}
152-
153-
registerBrokerBody.setFilterServerList(filterServerList);
119+
try (InflaterInputStream inflaterInputStream = new InflaterInputStream(new ByteArrayInputStream(data))) {
120+
int dataVersionLength = readInt(inflaterInputStream);
121+
byte[] dataVersionBytes = readBytes(inflaterInputStream, dataVersionLength);
122+
DataVersion dataVersion = DataVersion.decode(dataVersionBytes, DataVersion.class);
123+
124+
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
125+
registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion);
126+
ConcurrentMap<String, TopicConfig> topicConfigTable = registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable();
127+
128+
int topicConfigNumber = readInt(inflaterInputStream);
129+
LOGGER.debug("{} topic configs to extract", topicConfigNumber);
130+
131+
for (int i = 0; i < topicConfigNumber; i++) {
132+
int topicConfigJsonLength = readInt(inflaterInputStream);
133+
byte[] buffer = readBytes(inflaterInputStream, topicConfigJsonLength);
134+
TopicConfig topicConfig = new TopicConfig();
135+
String topicConfigJson = new String(buffer, MixAll.DEFAULT_CHARSET);
136+
topicConfig.decode(topicConfigJson);
137+
topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
138+
}
154139

155-
if (brokerVersion.ordinal() >= MQVersion.Version.V5_0_0.ordinal()) {
156-
int topicQueueMappingNum = readInt(inflaterInputStream);
157-
Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap = new ConcurrentHashMap<>();
158-
for (int i = 0; i < topicQueueMappingNum; i++) {
159-
int mappingJsonLen = readInt(inflaterInputStream);
160-
byte[] buffer = readBytes(inflaterInputStream, mappingJsonLen);
161-
TopicQueueMappingInfo info = TopicQueueMappingInfo.decode(buffer, TopicQueueMappingInfo.class);
162-
topicQueueMappingInfoMap.put(info.getTopic(), info);
140+
int filterServerListJsonLength = readInt(inflaterInputStream);
141+
byte[] filterServerListBuffer = readBytes(inflaterInputStream, filterServerListJsonLength);
142+
String filterServerListJson = new String(filterServerListBuffer, MixAll.DEFAULT_CHARSET);
143+
List<String> filterServerList = new ArrayList<>();
144+
try {
145+
filterServerList = JSON.parseArray(filterServerListJson, String.class);
146+
} catch (Exception e) {
147+
LOGGER.error("Decompressing occur Exception {}", filterServerListJson, e);
148+
}
149+
registerBrokerBody.setFilterServerList(filterServerList);
150+
151+
if (brokerVersion.ordinal() >= MQVersion.Version.V5_0_0.ordinal()) {
152+
int topicQueueMappingNum = readInt(inflaterInputStream);
153+
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = new ConcurrentHashMap<>();
154+
for (int i = 0; i < topicQueueMappingNum; i++) {
155+
int mappingJsonLen = readInt(inflaterInputStream);
156+
byte[] buffer = readBytes(inflaterInputStream, mappingJsonLen);
157+
TopicQueueMappingInfo info = TopicQueueMappingInfo.decode(buffer, TopicQueueMappingInfo.class);
158+
topicQueueMappingInfoMap.put(info.getTopic(), info);
159+
}
160+
registerBrokerBody.getTopicConfigSerializeWrapper().setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
163161
}
164-
registerBrokerBody.getTopicConfigSerializeWrapper().setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
165-
}
166162

167-
long takeTime = System.currentTimeMillis() - start;
168-
if (takeTime > MINIMUM_TAKE_TIME_MILLISECOND) {
169-
LOGGER.info("Decompressing takes {}ms", takeTime);
163+
long takeTime = System.currentTimeMillis() - start;
164+
if (takeTime > MINIMUM_TAKE_TIME_MILLISECOND) {
165+
LOGGER.info("Decompressing takes {}ms", takeTime);
166+
}
167+
return registerBrokerBody;
170168
}
171-
return registerBrokerBody;
172169
}
173170

174171
private static byte[] convertIntToByteArray(int n) {

test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,17 +101,17 @@ public void checkDuplicatedMessageInfo(boolean bPrintLog,
101101
if (bPrintLog) {
102102
String logFileNameStr = "D:" + File.separator + "checkDuplicatedMessageInfo.txt";
103103
File logFileNameFile = new File(logFileNameStr);
104-
OutputStream out = new FileOutputStream(logFileNameFile, true);
104+
try (OutputStream out = new FileOutputStream(logFileNameFile, true)) {
105105

106-
String strToWrite;
107-
byte[] byteToWrite;
108-
strToWrite = strBuilder + titleString;
109-
for (int i = 0; i < msgListSize; i++)
110-
strToWrite += strBQueue.get(i).toString() + "\r\n";
106+
String strToWrite;
107+
byte[] byteToWrite;
108+
strToWrite = strBuilder + titleString;
109+
for (int i = 0; i < msgListSize; i++)
110+
strToWrite += strBQueue.get(i).toString() + "\r\n";
111111

112-
byteToWrite = strToWrite.getBytes(StandardCharsets.UTF_8);
113-
out.write(byteToWrite);
114-
out.close();
112+
byteToWrite = strToWrite.getBytes(StandardCharsets.UTF_8);
113+
out.write(byteToWrite);
114+
}
115115
}
116116
}
117117

0 commit comments

Comments
 (0)