Skip to content

Commit 832562f

Browse files
authored
[ISSUE #8920] Refactor SSL context loading process to support multiple protocols dynamic loading (#9483)
* feat(proxy): 添加 gRPC 和 Remoting 服务器的 TLS 证书热更新支持 - 在 GrpcServer 和 RemotingProtocolServer 中添加文件监视服务,用于监控 TLS 证书和密钥的变化 - 实现证书和密钥变更时重新加载 SSL 上下文的逻辑 - 优化 ProxyAndTlsProtocolNegotiator 中的 SSL 上下文加载过程 - 添加日志记录,方便调试和监控 TLS 相关操作 * refactor(proxy): 重构 gRPC 证书监控逻辑并添加单元测试 - 重构 GrpcServer 中的证书监控逻辑,提取到独立的 GrpcCertKeyFileWatchListener 类中 - 优化证书变更处理流程,提高代码可读性和维护性 - 新增 GrpcServerTest 类,为 gRPC服务器和证书监控添加单元测试- 测试覆盖了各种证书变更场景,包括单独变更和组合变更 - 验证了证书变更时 SSLContext 的重新加载和错误处理 Signed-off-by: Async <raisinata@foxmail.com> * refactor(proxy): 重构 gRPC 证书监控逻辑并添加单元测试 - 重构 GrpcServer 中的证书监控逻辑,提取到独立的 GrpcCertKeyFileWatchListener 类中 - 优化证书变更处理流程,提高代码可读性和维护性 - 新增 GrpcServerTest 类,为 gRPC服务器和证书监控添加单元测试- 测试覆盖了各种证书变更场景,包括单独变更和组合变更 - 验证了证书变更时 SSLContext 的重新加载和错误处理 Signed-off-by: Async <raisinata@foxmail.com> * fix: code format Signed-off-by: Async <raisinata@foxmail.com> * test: add test cases Signed-off-by: Async <raisinata@foxmail.com> * fix: code format Signed-off-by: Async <raisinata@foxmail.com> * refactor(proxy): 重构 TLS证书更新逻辑 - 移除 FileWatchService,改用 TlsCertificateManager 统一管理 TLS证书 - 实现 TlsContextReloadListener 接口,响应 TLS 证书更新 - 优化 GrpcServer 和 RemotingProtocolServer 中的 TLS 证书更新逻辑 - 新增单元测试验证 TLS 证书更新功能 Signed-off-by: Async <raisinata@foxmail.com> * test(proxy): 优化 TLS 相关测试用例 - 重构了多个测试类中的重复代码- 提高了测试的可读性和维护性 - 确保在测试中正确关闭资源 Signed-off-by: Async <raisinata@foxmail.com> * refactor(proxy): 优化代码导入结构 - 移除了不必要的导入项 - 显式导入了所有活动类,提高了代码的可读性和维护性 Signed-off-by: Async <raisinata@foxmail.com> * update * fix: no static Signed-off-by: Async <raisinata@foxmail.com> * fix: add SingletonHolder for TlsCertificateManager Signed-off-by: Async <raisinata@foxmail.com> * refactor * refactor(proxy): 重构 TLS证书管理 - 将 TlsCertificateManager 实例化移至 ProxyStartup 类 - 更新 GrpcServer 和 RemotingProtocolServer 类以使用 TlsCertificateManager - 移除冗余的 TLS 证书管理相关测试用例 - 优化 TLS 上下文重载逻辑 Signed-off-by: Async <raisinata@foxmail.com> * refactor(proxy): 优化日志信息内容 - 将 cert file changed 日志信息改为更通用的 File changed - 保持代码风格一致性,提高日志的可读性和维护性 Signed-off-by: Async <raisinata@foxmail.com> * test(proxy): 重构并增强 TlsCertificateManager 测试用例- 重新设计测试用例,使用临时文件模拟证书和密钥 - 增加对 TlsCertificateManager 各种方法的单元测试 - 涉及到的测试场景包括: - 构造函数 - 启动和关闭 - 注册和注销监听器 - 文件变更通知(证书、密钥、未知文件等) - 多个监听器的情况 - 监听器抛出异常的情况 - 增加对内部 CertKeyFileWatchListener 的测试 Signed-off-by: Async <raisinata@foxmail.com> * refactor * test(proxy): 优化 TlsCertificateManager 单元测试 -移除了未使用的 import 语句 - 替换了 import static语句,使其更加有序 - 删除了未使用的静态方法断言(verify、times、never) - 重置了 mock 对象以避免测试之间的干扰 Signed-off-by: Async <raisinata@foxmail.com> * fix format Signed-off-by: Async <raisinata@foxmail.com> * fix format Signed-off-by: Async <raisinata@foxmail.com> * fix format Signed-off-by: Async <raisinata@foxmail.com> * fix format Signed-off-by: Async <raisinata@foxmail.com> * fix format Signed-off-by: Async <raisinata@foxmail.com> * fix format Signed-off-by: Async <raisinata@foxmail.com> * fix format Signed-off-by: Async <raisinata@foxmail.com> * fix format Signed-off-by: Async <raisinata@foxmail.com> --------- Signed-off-by: Async <raisinata@foxmail.com>
1 parent d556460 commit 832562f

13 files changed

Lines changed: 561 additions & 68 deletions

File tree

proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor;
4545
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
4646
import org.apache.rocketmq.proxy.remoting.RemotingProtocolServer;
47+
import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
4748
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
4849
import org.apache.rocketmq.srvutil.ServerUtil;
4950

@@ -76,8 +77,13 @@ public static void main(String[] args) {
7677

7778
MessagingProcessor messagingProcessor = createMessagingProcessor();
7879

80+
// tls cert update
81+
TlsCertificateManager tlsCertificateManager = new TlsCertificateManager();
82+
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(tlsCertificateManager);
83+
7984
// create grpcServer
80-
GrpcServer grpcServer = GrpcServerBuilder.newBuilder(executor, ConfigurationManager.getProxyConfig().getGrpcServerPort())
85+
GrpcServer grpcServer = GrpcServerBuilder.newBuilder(executor,
86+
ConfigurationManager.getProxyConfig().getGrpcServerPort(), tlsCertificateManager)
8187
.addService(createServiceProcessor(messagingProcessor))
8288
.addService(ChannelzService.newInstance(100))
8389
.addService(ProtoReflectionService.newInstance())
@@ -86,7 +92,7 @@ public static void main(String[] args) {
8692
.build();
8793
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer);
8894

89-
RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor);
95+
RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor, tlsCertificateManager);
9096
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(remotingServer);
9197

9298
// start servers one by one.

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,17 @@
1717

1818
package org.apache.rocketmq.proxy.grpc;
1919

20-
import java.util.concurrent.TimeUnit;
20+
import com.google.common.annotations.VisibleForTesting;
2121
import io.grpc.Server;
2222
import org.apache.rocketmq.common.constant.LoggerName;
23+
import org.apache.rocketmq.common.utils.StartAndShutdown;
2324
import org.apache.rocketmq.logging.org.slf4j.Logger;
2425
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
25-
import org.apache.rocketmq.common.utils.StartAndShutdown;
26+
import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
27+
28+
import java.io.IOException;
29+
import java.security.cert.CertificateException;
30+
import java.util.concurrent.TimeUnit;
2631

2732
public class GrpcServer implements StartAndShutdown {
2833
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -33,23 +38,50 @@ public class GrpcServer implements StartAndShutdown {
3338

3439
private final TimeUnit unit;
3540

36-
protected GrpcServer(Server server, long timeout, TimeUnit unit) {
41+
private final TlsCertificateManager tlsCertificateManager;
42+
@VisibleForTesting final GrpcTlsReloadHandler tlsReloadHandler;
43+
44+
protected GrpcServer(Server server, long timeout, TimeUnit unit,
45+
TlsCertificateManager tlsCertificateManager) throws Exception {
3746
this.server = server;
3847
this.timeout = timeout;
3948
this.unit = unit;
49+
this.tlsCertificateManager = tlsCertificateManager;
50+
this.tlsReloadHandler = new GrpcTlsReloadHandler();
4051
}
4152

4253
public void start() throws Exception {
54+
// Register the TLS context reload handler
55+
tlsCertificateManager.registerReloadListener(this.tlsReloadHandler);
56+
4357
this.server.start();
4458
log.info("grpc server start successfully.");
4559
}
4660

4761
public void shutdown() {
4862
try {
63+
// Unregister the TLS context reload handler
64+
tlsCertificateManager.unregisterReloadListener(this.tlsReloadHandler);
65+
4966
this.server.shutdown().awaitTermination(timeout, unit);
67+
5068
log.info("grpc server shutdown successfully.");
5169
} catch (Exception e) {
5270
e.printStackTrace();
71+
log.error("Failed to shutdown grpc server", e);
72+
}
73+
}
74+
75+
@VisibleForTesting
76+
class GrpcTlsReloadHandler implements TlsCertificateManager.TlsContextReloadListener {
77+
@Override
78+
public void onTlsContextReload() {
79+
try {
80+
ProxyAndTlsProtocolNegotiator.loadSslContext();
81+
log.info("SslContext reloaded for grpc server");
82+
} catch (CertificateException | IOException e) {
83+
log.error("Failed to reload SslContext for server", e);
84+
}
5385
}
5486
}
55-
}
87+
}

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import java.util.concurrent.ThreadPoolExecutor;
3636
import java.util.concurrent.TimeUnit;
37+
import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
3738

3839
public class GrpcServerBuilder {
3940
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -43,11 +44,15 @@ public class GrpcServerBuilder {
4344

4445
protected TimeUnit unit = TimeUnit.SECONDS;
4546

46-
public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port) {
47-
return new GrpcServerBuilder(executor, port);
47+
protected TlsCertificateManager tlsCertificateManager;
48+
49+
public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port,
50+
TlsCertificateManager tlsCertificateManager) {
51+
return new GrpcServerBuilder(executor, port, tlsCertificateManager);
4852
}
4953

50-
protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) {
54+
protected GrpcServerBuilder(ThreadPoolExecutor executor, int port, TlsCertificateManager tlsCertificateManager) {
55+
this.tlsCertificateManager = tlsCertificateManager;
5156
serverBuilder = NettyServerBuilder.forPort(port);
5257

5358
serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator());
@@ -71,7 +76,7 @@ protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) {
7176
}
7277

7378
serverBuilder.maxInboundMessageSize(maxInboundMessageSize)
74-
.maxConnectionIdle(idleTimeMills, TimeUnit.MILLISECONDS);
79+
.maxConnectionIdle(idleTimeMills, TimeUnit.MILLISECONDS);
7580

7681
log.info("grpc server has built. port: {}, bossLoopNum: {}, workerLoopNum: {}, maxInboundMessageSize: {}",
7782
port, bossLoopNum, workerLoopNum, maxInboundMessageSize);
@@ -98,8 +103,8 @@ public GrpcServerBuilder appendInterceptor(ServerInterceptor interceptor) {
98103
return this;
99104
}
100105

101-
public GrpcServer build() {
102-
return new GrpcServer(this.serverBuilder.build(), time, unit);
106+
public GrpcServer build() throws Exception {
107+
return new GrpcServer(this.serverBuilder.build(), time, unit, tlsCertificateManager);
103108
}
104109

105110
public GrpcServerBuilder configInterceptor() {

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java

Lines changed: 50 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,23 @@
3636
import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
3737
import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyTLV;
3838
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
39+
import io.grpc.netty.shaded.io.netty.handler.ssl.OpenSsl;
3940
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
4041
import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler;
42+
import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider;
43+
4144
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
4245
import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
4346
import io.grpc.netty.shaded.io.netty.util.AsciiString;
4447
import io.grpc.netty.shaded.io.netty.util.CharsetUtil;
48+
49+
import java.io.IOException;
4550
import java.io.InputStream;
4651
import java.nio.file.Files;
4752
import java.nio.file.Paths;
53+
import java.security.cert.CertificateException;
4854
import java.util.List;
55+
4956
import org.apache.commons.collections.CollectionUtils;
5057
import org.apache.commons.lang3.StringUtils;
5158
import org.apache.rocketmq.common.constant.HAProxyConstants;
@@ -73,7 +80,13 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator
7380
private static SslContext sslContext;
7481

7582
public ProxyAndTlsProtocolNegotiator() {
76-
sslContext = loadSslContext();
83+
try {
84+
loadSslContext();
85+
log.info("SslContext created for proxy server");
86+
} catch (IOException | CertificateException e) {
87+
log.error("SslContext init error", e);
88+
throw new RuntimeException(e);
89+
}
7790
}
7891

7992
@Override
@@ -90,35 +103,36 @@ public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
90103
public void close() {
91104
}
92105

93-
private static SslContext loadSslContext() {
94-
try {
95-
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
96-
if (proxyConfig.isTlsTestModeEnable()) {
97-
SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
98-
return GrpcSslContexts.forServer(selfSignedCertificate.certificate(),
99-
selfSignedCertificate.privateKey())
100-
.trustManager(InsecureTrustManagerFactory.INSTANCE)
101-
.clientAuth(ClientAuth.NONE)
102-
.build();
103-
} else {
104-
String tlsKeyPath = ConfigurationManager.getProxyConfig().getTlsKeyPath();
105-
String tlsCertPath = ConfigurationManager.getProxyConfig().getTlsCertPath();
106-
try (InputStream serverKeyInputStream = Files.newInputStream(
107-
Paths.get(tlsKeyPath));
108-
InputStream serverCertificateStream = Files.newInputStream(
109-
Paths.get(tlsCertPath))) {
110-
SslContext res = GrpcSslContexts.forServer(serverCertificateStream,
111-
serverKeyInputStream)
112-
.trustManager(InsecureTrustManagerFactory.INSTANCE)
113-
.clientAuth(ClientAuth.NONE)
114-
.build();
115-
log.info("grpc load TLS configured OK");
116-
return res;
117-
}
106+
public static void loadSslContext() throws CertificateException, IOException {
107+
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
108+
SslProvider provider;
109+
if (OpenSsl.isAvailable()) {
110+
provider = SslProvider.OPENSSL;
111+
log.info("Using OpenSSL provider");
112+
} else {
113+
provider = SslProvider.JDK;
114+
log.info("Using JDK SSL provider");
115+
}
116+
if (proxyConfig.isTlsTestModeEnable()) {
117+
SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
118+
sslContext = GrpcSslContexts.forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
119+
.sslProvider(provider)
120+
.trustManager(InsecureTrustManagerFactory.INSTANCE)
121+
.clientAuth(ClientAuth.NONE)
122+
.build();
123+
} else {
124+
String tlsCertPath = ConfigurationManager.getProxyConfig().getTlsCertPath();
125+
String tlsKeyPath = ConfigurationManager.getProxyConfig().getTlsKeyPath();
126+
try (InputStream serverKeyInputStream = Files.newInputStream(
127+
Paths.get(tlsKeyPath));
128+
InputStream serverCertificateStream = Files.newInputStream(
129+
Paths.get(tlsCertPath))) {
130+
sslContext = GrpcSslContexts.forServer(serverCertificateStream,
131+
serverKeyInputStream)
132+
.trustManager(InsecureTrustManagerFactory.INSTANCE)
133+
.clientAuth(ClientAuth.NONE)
134+
.build();
118135
}
119-
} catch (Exception e) {
120-
log.error("grpc tls set failed. msg: {}, e:", e.getMessage(), e);
121-
throw new RuntimeException("grpc tls set failed: " + e.getMessage());
122136
}
123137
}
124138

@@ -135,15 +149,14 @@ public ProxyAndTlsProtocolHandler(GrpcHttp2ConnectionHandler grpcHandler) {
135149
@Override
136150
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
137151
try {
138-
ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol(
139-
in);
152+
ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol(in);
140153
if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
141154
return;
142155
}
143156
if (ha.state() == ProtocolDetectionState.DETECTED) {
144157
ctx.pipeline().addAfter(ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
145-
.addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
146-
.addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
158+
.addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
159+
.addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
147160
} else {
148161
ctx.pipeline().addAfter(ctx.name(), TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
149162
}
@@ -209,7 +222,7 @@ private void handleWithMessage(HAProxyMessage msg) {
209222
msg.tlvs().forEach(tlv -> handleHAProxyTLV(tlv, builder));
210223
}
211224
pne = InternalProtocolNegotiationEvent
212-
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
225+
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
213226
} finally {
214227
msg.release();
215228
}
@@ -244,9 +257,9 @@ private class TlsModeHandler extends ByteToMessageDecoder {
244257

245258
public TlsModeHandler(GrpcHttp2ConnectionHandler grpcHandler) {
246259
this.ssl = InternalProtocolNegotiators.serverTls(sslContext)
247-
.newHandler(grpcHandler);
260+
.newHandler(grpcHandler);
248261
this.plaintext = InternalProtocolNegotiators.serverPlaintext()
249-
.newHandler(grpcHandler);
262+
.newHandler(grpcHandler);
250263
}
251264

252265
@Override
@@ -258,7 +271,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
258271
} else if (TlsMode.DISABLED.equals(tlsMode)) {
259272
ctx.pipeline().addAfter(ctx.name(), null, this.plaintext);
260273
} else {
261-
// in SslHandler.isEncrypted, it need at least 5 bytes to judge is encrypted or not
274+
// in SslHandler.isEncrypted, it needs at least 5 bytes to judge is encrypted or not
262275
if (in.readableBytes() < SSL_RECORD_HEADER_LENGTH) {
263276
return;
264277
}

proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ public void loadSslContext() {
6868
if (tlsMode != TlsMode.DISABLED) {
6969
try {
7070
sslContext = MultiProtocolTlsHelper.buildSslContext();
71-
log.info("SSLContext created for server");
71+
log.info("SslContext created for multi protocol remoting server");
7272
} catch (CertificateException | IOException e) {
73-
throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "Failed to create SSLContext for server", e);
73+
throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "Failed to create SslContext for server", e);
7474
}
7575
}
7676
}

proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@ public static SslContext buildSslContext() throws IOException, CertificateExcept
6161
log.info("Using JDK SSL provider");
6262
}
6363

64-
SslContextBuilder sslContextBuilder = null;
64+
SslContextBuilder sslContextBuilder;
6565
if (tlsTestModeEnable) {
6666
SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
6767
sslContextBuilder = SslContextBuilder
6868
.forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
69-
.sslProvider(SslProvider.OPENSSL)
69+
.sslProvider(provider)
7070
.clientAuth(ClientAuth.OPTIONAL);
7171
} else {
7272
sslContextBuilder = SslContextBuilder.forServer(

proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.rocketmq.proxy.remoting.pipeline.AuthorizationPipeline;
4747
import org.apache.rocketmq.proxy.remoting.pipeline.ContextInitPipeline;
4848
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
49+
import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
4950
import org.apache.rocketmq.remoting.ChannelEventListener;
5051
import org.apache.rocketmq.remoting.InvokeCallback;
5152
import org.apache.rocketmq.remoting.RemotingServer;
@@ -88,8 +89,11 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
8889
protected final ThreadPoolExecutor topicRouteExecutor;
8990
protected final ThreadPoolExecutor defaultExecutor;
9091
protected final ScheduledExecutorService timerExecutor;
92+
protected final TlsCertificateManager tlsCertificateManager;
93+
protected final RemotingTlsReloadHandler tlsReloadHandler;
9194

92-
public RemotingProtocolServer(MessagingProcessor messagingProcessor) {
95+
96+
public RemotingProtocolServer(MessagingProcessor messagingProcessor, TlsCertificateManager tlsCertificateManager) throws Exception {
9397
this.messagingProcessor = messagingProcessor;
9498
this.remotingChannelManager = new RemotingChannelManager(this, messagingProcessor.getProxyRelayService());
9599

@@ -114,6 +118,8 @@ public RemotingProtocolServer(MessagingProcessor messagingProcessor) {
114118
System.setProperty(TlsSystemConfig.TLS_SERVER_CERTPATH, config.getTlsCertPath());
115119
TlsSystemConfig.tlsServerKeyPath = config.getTlsKeyPath();
116120
System.setProperty(TlsSystemConfig.TLS_SERVER_KEYPATH, config.getTlsKeyPath());
121+
this.tlsCertificateManager = tlsCertificateManager;
122+
this.tlsReloadHandler = new RemotingTlsReloadHandler();
117123

118124
this.clientHousekeepingService = new ClientHousekeepingService(this.clientManagerActivity);
119125

@@ -191,6 +197,16 @@ public RemotingProtocolServer(MessagingProcessor messagingProcessor) {
191197
this.registerRemotingServer(this.defaultRemotingServer);
192198
}
193199

200+
protected class RemotingTlsReloadHandler implements TlsCertificateManager.TlsContextReloadListener {
201+
@Override
202+
public void onTlsContextReload() {
203+
if (defaultRemotingServer instanceof NettyRemotingServer) {
204+
((NettyRemotingServer) defaultRemotingServer).loadSslContext();
205+
log.info("SslContext reloaded for remoting server");
206+
}
207+
}
208+
}
209+
194210
protected void registerRemotingServer(RemotingServer remotingServer) {
195211
remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageActivity, this.sendMessageExecutor);
196212
remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageActivity, this.sendMessageExecutor);
@@ -226,6 +242,9 @@ protected void registerRemotingServer(RemotingServer remotingServer) {
226242

227243
@Override
228244
public void shutdown() throws Exception {
245+
// Unregister the TLS context reload handler
246+
tlsCertificateManager.unregisterReloadListener(this.tlsReloadHandler);
247+
229248
this.defaultRemotingServer.shutdown();
230249
this.remotingChannelManager.shutdown();
231250
this.sendMessageExecutor.shutdown();
@@ -238,6 +257,9 @@ public void shutdown() throws Exception {
238257

239258
@Override
240259
public void start() throws Exception {
260+
// Register the TLS context reload handler
261+
tlsCertificateManager.registerReloadListener(this.tlsReloadHandler);
262+
241263
this.remotingChannelManager.start();
242264
this.defaultRemotingServer.start();
243265
}

0 commit comments

Comments
 (0)