Skip to content

Commit 2fee158

Browse files
authored
[AMQ-9569][#1882] SSL handshake write timeout enforcement (#1883)
* feat(AMQ-9559)/feat(#1882): The failover test correctly reproduces the bug: it blocks for 20+ seconds proving WriteTimeoutFilter.start() does not enforce soWriteTimeout. * feat(AMQ-9559)/feat(#1882): Similar to oneway() add registerWrite/deRegisterWrite around super.start() Adding a null check in the TimeoutThread is required to avoid random NPE if the TimeoutThread kicks in after the registerWrite but before initializeStreams() is called resulting in a NPE because TcpTransport.buffOut is null.
1 parent 3b4e819 commit 2fee158

2 files changed

Lines changed: 204 additions & 3 deletions

File tree

activemq-client/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,12 @@ protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail
129129

130130
@Override
131131
public void start() throws Exception {
132-
super.start();
132+
try {
133+
registerWrite(this);
134+
super.start();
135+
} finally {
136+
deRegisterWrite(this, false, null);
137+
}
133138
}
134139

135140
@Override
@@ -157,8 +162,10 @@ public void run() {
157162
while (run && filters.hasNext()) {
158163
WriteTimeoutFilter filter = filters.next();
159164
if (filter.getWriteTimeout()<=0) continue; //no timeout set
160-
long writeStart = filter.getWriter().getWriteTimestamp();
161-
long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
165+
TimeStampStream writer = filter.getWriter();
166+
if (writer == null) continue; //stream not yet initialized
167+
long writeStart = writer.getWriteTimestamp();
168+
long delta = (writer.isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
162169
if (delta>filter.getWriteTimeout()) {
163170
WriteTimeoutFilter.deRegisterWrite(filter, true,null);
164171
}//if timeout
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.transport;
18+
19+
import java.io.IOException;
20+
import java.net.ServerSocket;
21+
import java.net.Socket;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.concurrent.atomic.AtomicReference;
28+
29+
import jakarta.jms.Connection;
30+
import jakarta.jms.JMSException;
31+
32+
import org.apache.activemq.ActiveMQConnectionFactory;
33+
import org.junit.After;
34+
import org.junit.Before;
35+
import org.junit.Test;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
39+
import static org.junit.Assert.assertNotNull;
40+
import static org.junit.Assert.fail;
41+
42+
43+
/**
44+
* Test for AMQ-9569: WriteTimeoutFilter does not timeout SSL write (handshake).
45+
*
46+
* This test demonstrates that when a client connects via SSL to a server that
47+
* accepts the TCP connection but never responds to the SSL handshake, the
48+
* WriteTimeoutFilter does NOT enforce the soWriteTimeout during transport start().
49+
*
50+
* The SSL handshake is triggered during WireFormatNegotiator.start() ->
51+
* sendWireFormat() -> TcpTransport.oneway() -> TcpBufferedOutputStream.flush(),
52+
* which calls SSLSocketImpl.startHandshake() implicitly on the first write.
53+
* Since WriteTimeoutFilter.start() does not call registerWrite(), the
54+
* TimeoutThread has nothing to monitor, and the connection blocks indefinitely.
55+
*/
56+
public class SoWriteTimeoutSslHandshakeTest {
57+
58+
private static final Logger LOG = LoggerFactory.getLogger(SoWriteTimeoutSslHandshakeTest.class);
59+
60+
private static final String KEYSTORE_TYPE = "jks";
61+
private static final String PASSWORD = "password";
62+
private static final String SERVER_KEYSTORE = "src/test/resources/org/apache/activemq/security/broker1.ks";
63+
private static final String TRUST_KEYSTORE = "src/test/resources/org/apache/activemq/security/broker1.ks";
64+
65+
/** A plain TCP ServerSocket that accepts connections but never responds (simulates unresponsive SSL peer) */
66+
private ServerSocket silentServer;
67+
private ExecutorService executor;
68+
private final AtomicBoolean serverRunning = new AtomicBoolean(true);
69+
70+
@Before
71+
public void setUp() throws Exception {
72+
// Configure SSL system properties for the client side
73+
System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
74+
System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
75+
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
76+
System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
77+
System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
78+
System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
79+
80+
// Start a plain TCP server that accepts connections but never reads/writes
81+
// This simulates a peer that is unreachable at the SSL layer
82+
silentServer = new ServerSocket(0);
83+
executor = Executors.newCachedThreadPool();
84+
executor.execute(() -> {
85+
while (serverRunning.get()) {
86+
try {
87+
Socket accepted = silentServer.accept();
88+
LOG.info("Silent server accepted connection from: {}", accepted.getRemoteSocketAddress());
89+
// Intentionally do nothing - don't read, don't write, don't close
90+
// This will cause the SSL handshake to block on the client side
91+
} catch (IOException e) {
92+
if (serverRunning.get()) {
93+
LOG.debug("Silent server accept error: {}", e.getMessage());
94+
}
95+
}
96+
}
97+
});
98+
LOG.info("Silent TCP server started on port: {}", silentServer.getLocalPort());
99+
}
100+
101+
@After
102+
public void tearDown() throws Exception {
103+
serverRunning.set(false);
104+
if (silentServer != null) {
105+
silentServer.close();
106+
}
107+
if (executor != null) {
108+
executor.shutdownNow();
109+
executor.awaitTermination(5, TimeUnit.SECONDS);
110+
}
111+
}
112+
113+
/**
114+
* This test proves the bug: WriteTimeoutFilter.start() does NOT register
115+
* the write timeout, so the SSL handshake blocks beyond the configured
116+
* soWriteTimeout.
117+
*
118+
* Expected behavior (after fix): connection attempt should fail within
119+
* roughly soWriteTimeout + TimeoutThread polling interval (~2s + ~5s = ~7s).
120+
*
121+
* Current behavior (bug): connection attempt blocks for much longer than
122+
* soWriteTimeout because WriteTimeoutFilter.start() never calls registerWrite().
123+
*
124+
* We use a generous upper bound of 15 seconds. If the write timeout worked
125+
* during start(), the connection should fail within ~7-8 seconds (2s timeout
126+
* + 5s polling interval + margin). If it takes more than 15 seconds, the
127+
* timeout is NOT being enforced during start().
128+
*/
129+
@Test
130+
public void testSslHandshakeWriteTimeoutNotEnforcedDuringStart() throws Exception {
131+
final int soWriteTimeout = 2000; // 2 second write timeout
132+
// Upper bound: soWriteTimeout + TimeoutThread sleep (5s) + margin
133+
final int expectedMaxSeconds = 15;
134+
135+
// Use ssl:// with soWriteTimeout pointing to our silent TCP server.
136+
// The failover transport ensures the connection attempt doesn't just throw
137+
// immediately but actually tries to establish the SSL connection.
138+
// maxReconnectAttempts=1 to avoid infinite reconnects.
139+
String uri = "failover:(ssl://localhost:" + silentServer.getLocalPort()
140+
+ "?soWriteTimeout=" + soWriteTimeout
141+
+ "&socket.verifyHostName=false"
142+
+ ")?maxReconnectAttempts=2"
143+
+ "&startupMaxReconnectAttempts=1"
144+
+ "&initialReconnectDelay=500";
145+
146+
LOG.info("Connecting with URI: {}", uri);
147+
148+
final CountDownLatch connectFinished = new CountDownLatch(1);
149+
final AtomicReference<Exception> connectException = new AtomicReference<>();
150+
151+
// Run connection attempt in a separate thread since it may block
152+
executor.execute(() -> {
153+
Connection connection = null;
154+
try {
155+
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
156+
connection = factory.createConnection();
157+
connection.start();
158+
LOG.info("Connection unexpectedly succeeded");
159+
} catch (JMSException e) {
160+
LOG.info("Connection failed as expected: {}", e.getMessage());
161+
connectException.set(e);
162+
} finally {
163+
connectFinished.countDown();
164+
if (connection != null) {
165+
try {
166+
connection.close();
167+
} catch (JMSException ignored) {
168+
}
169+
}
170+
}
171+
});
172+
173+
// Wait for the connection attempt to complete or timeout
174+
boolean finished = connectFinished.await(expectedMaxSeconds, TimeUnit.SECONDS);
175+
176+
if (finished) {
177+
// The connection attempt completed within the time limit.
178+
// This means the timeout WAS enforced during start() (fix is working).
179+
assertNotNull("Connection should have failed with an exception", connectException.get());
180+
LOG.info("PASS: SSL handshake was timed out correctly within {} seconds", expectedMaxSeconds);
181+
} else {
182+
// The connection attempt is still blocking after expectedMaxSeconds.
183+
// This proves the bug: WriteTimeoutFilter.start() does NOT enforce
184+
// the write timeout during SSL handshake.
185+
LOG.warn("BUG CONFIRMED: SSL handshake blocked for more than {} seconds. "
186+
+ "WriteTimeoutFilter.start() does not register the write timeout. "
187+
+ "See AMQ-9569.", expectedMaxSeconds);
188+
fail("AMQ-9569: WriteTimeoutFilter.start() did not enforce soWriteTimeout during SSL handshake. "
189+
+ "Connection blocked for more than " + expectedMaxSeconds + " seconds. "
190+
+ "Expected the write timeout (" + soWriteTimeout + "ms) to abort the blocked handshake.");
191+
}
192+
}
193+
194+
}

0 commit comments

Comments
 (0)