Skip to content

Commit 1738e0b

Browse files
committed
refactor(proxy): 重构 TLS证书管理功能
- 新增 FileCertChangeSource 类,用于监听证书文件变化 - 添加 GrpcServer 和 MultiProtocolRemotingServer 的 TLS 证书重载处理 - 修改 ProxyAndTlsProtocolNegotiator 类,支持从输入流加载 SSL 上下文 - 删除冗余测试类,优化测试用例 Signed-off-by: Async <raisinata@foxmail.com>
1 parent 6fae362 commit 1738e0b

16 files changed

Lines changed: 1217 additions & 588 deletions

File tree

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class GrpcTlsReloadHandler implements TlsCertificateManager.TlsContextReloadList
7878
@Override
7979
public void onTlsContextReload(InputStream certInputStream, InputStream keyInputStream) {
8080
try {
81-
ProxyAndTlsProtocolNegotiator.loadSslContext(null, null);
81+
ProxyAndTlsProtocolNegotiator.loadSslContext(certInputStream, keyInputStream);
8282
log.info("SSLContext reloaded for grpc server");
8383
} catch (CertificateException | IOException e) {
8484
log.error("Failed to reload SSLContext for server", e);

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
103103
public void close() {
104104
}
105105

106-
public static void loadSslContext(InputStream certInputStream, InputStream keyInputStream) throws CertificateException, IOException {
106+
public static void loadSslContext(InputStream certInputStream,
107+
InputStream keyInputStream) throws CertificateException, IOException {
107108
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
108109
SslProvider provider;
109110
if (OpenSsl.isAvailable()) {
@@ -165,8 +166,8 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
165166
}
166167
if (ha.state() == ProtocolDetectionState.DETECTED) {
167168
ctx.pipeline().addAfter(ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
168-
.addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
169-
.addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
169+
.addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
170+
.addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
170171
} else {
171172
ctx.pipeline().addAfter(ctx.name(), TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
172173
}
@@ -232,7 +233,7 @@ private void handleWithMessage(HAProxyMessage msg) {
232233
msg.tlvs().forEach(tlv -> handleHAProxyTLV(tlv, builder));
233234
}
234235
pne = InternalProtocolNegotiationEvent
235-
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
236+
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
236237
} finally {
237238
msg.release();
238239
}

proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.netty.channel.ChannelPipeline;
2121
import io.netty.channel.socket.SocketChannel;
2222
import io.netty.handler.timeout.IdleStateHandler;
23+
import java.io.InputStream;
2324
import org.apache.rocketmq.common.constant.LoggerName;
2425
import org.apache.rocketmq.logging.org.slf4j.Logger;
2526
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -61,14 +62,14 @@ public MultiProtocolRemotingServer(NettyServerConfig nettyServerConfig, ChannelE
6162
}
6263

6364
@Override
64-
public void loadSslContext() {
65+
public void loadSslContext(InputStream certInputStream, InputStream keyInputStream) {
6566
TlsMode tlsMode = TlsSystemConfig.tlsMode;
6667
log.info("Server is running in TLS {} mode", tlsMode.getName());
6768

6869
if (tlsMode != TlsMode.DISABLED) {
6970
try {
7071
sslContext = MultiProtocolTlsHelper.buildSslContext();
71-
log.info("SSLContext created for remoting server");
72+
log.info("SSLContext created for multi protocol remoting server");
7273
} catch (CertificateException | IOException e) {
7374
throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "Failed to create SSLContext for server", e);
7475
}

proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.io.File;
3030
import java.io.FileInputStream;
3131
import java.io.IOException;
32+
import java.io.InputStream;
3233
import java.nio.file.Files;
3334
import java.nio.file.Paths;
3435
import java.security.cert.CertificateException;
@@ -51,6 +52,10 @@ public class MultiProtocolTlsHelper extends TlsHelper {
5152
private static final DecryptionStrategy DECRYPTION_STRATEGY = (privateKeyEncryptPath, forClient) -> new FileInputStream(privateKeyEncryptPath);
5253

5354
public static SslContext buildSslContext() throws IOException, CertificateException {
55+
return buildSslContext(null, null);
56+
}
57+
public static SslContext buildSslContext(InputStream certInputStream,
58+
InputStream keyInputStream) throws IOException, CertificateException {
5459
TlsHelper.buildSslContext(false);
5560
SslProvider provider;
5661
if (OpenSsl.isAvailable()) {
@@ -61,19 +66,28 @@ public static SslContext buildSslContext() throws IOException, CertificateExcept
6166
log.info("Using JDK SSL provider");
6267
}
6368

64-
SslContextBuilder sslContextBuilder = null;
69+
SslContextBuilder sslContextBuilder;
6570
if (tlsTestModeEnable) {
6671
SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
6772
sslContextBuilder = SslContextBuilder
6873
.forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
69-
.sslProvider(SslProvider.OPENSSL)
74+
.sslProvider(provider)
7075
.clientAuth(ClientAuth.OPTIONAL);
7176
} else {
72-
sslContextBuilder = SslContextBuilder.forServer(
73-
!StringUtils.isBlank(tlsServerCertPath) ? Files.newInputStream(Paths.get(tlsServerCertPath)) : null,
74-
!StringUtils.isBlank(tlsServerKeyPath) ? DECRYPTION_STRATEGY.decryptPrivateKey(tlsServerKeyPath, false) : null,
75-
!StringUtils.isBlank(tlsServerKeyPassword) ? tlsServerKeyPassword : null)
76-
.sslProvider(provider);
77+
78+
boolean fromStream = certInputStream != null && keyInputStream != null;
79+
// Give priority to reading from input stream
80+
if (fromStream) {
81+
sslContextBuilder = SslContextBuilder.forServer(
82+
certInputStream, keyInputStream)
83+
.sslProvider(provider);
84+
} else {
85+
sslContextBuilder = SslContextBuilder.forServer(
86+
!StringUtils.isBlank(tlsServerCertPath) ? Files.newInputStream(Paths.get(tlsServerCertPath)) : null,
87+
!StringUtils.isBlank(tlsServerKeyPath) ? DECRYPTION_STRATEGY.decryptPrivateKey(tlsServerKeyPath, false) : null,
88+
!StringUtils.isBlank(tlsServerKeyPassword) ? tlsServerKeyPassword : null)
89+
.sslProvider(provider);
90+
}
7791

7892
if (!tlsServerAuthClient) {
7993
sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);

proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ protected class RemotingTlsReloadHandler implements TlsCertificateManager.TlsCon
203203
@Override
204204
public void onTlsContextReload(InputStream certInputStream, InputStream keyInputStream) {
205205
if (defaultRemotingServer instanceof NettyRemotingServer) {
206-
((NettyRemotingServer) defaultRemotingServer).loadSslContext();
206+
((NettyRemotingServer) defaultRemotingServer).loadSslContext(certInputStream, keyInputStream);
207207
log.info("SSLContext reloaded for remoting server");
208208
}
209209
}

proxy/src/main/java/org/apache/rocketmq/proxy/service/cert/FileCertChangeSource.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ public class FileCertChangeSource implements CertChangeSource {
3030
private ChangeListener listener;
3131

3232
public FileCertChangeSource(String certPath, String keyPath) {
33+
this(certPath, keyPath, 10 * 60 * 1000);
34+
}
35+
36+
public FileCertChangeSource(String certPath, String keyPath, int watchInterval) {
3337
try {
3438
this.watchService = new FileWatchService(
3539
new String[] {certPath, keyPath},
@@ -38,15 +42,15 @@ public FileCertChangeSource(String certPath, String keyPath) {
3842
return;
3943
}
4044
CertChangeEvent.Kind kind;
41-
if (path.equals(ConfigurationManager.getProxyConfig().getTlsCertPath())) {
45+
if (path.equals(certPath)) {
4246
kind = CertChangeEvent.Kind.SERVER_CERT;
4347
} else {
4448
kind = CertChangeEvent.Kind.SERVER_KEY;
4549
}
4650

4751
listener.onCertChanged(new CertChangeEvent(kind, CertChangeEvent.SourceType.FILE,
4852
Collections.singletonList(path)));
49-
}, 10 * 60 * 1000);
53+
}, watchInterval);
5054
} catch (Exception e) {
5155
throw new RuntimeException(e);
5256
}

0 commit comments

Comments
 (0)