Skip to content

Commit 7c4e38f

Browse files
committed
refactor(proxy): 重构 TLS证书管理
- 移除 CertChangeSource 接口和相关实现 - 使用 FileWatchService 监控证书和密钥文件变化- 简化 TlsCertificateManager 类结构和逻辑 - 更新相关测试用例 Signed-off-by: Async <raisinata@foxmail.com>
2 parents 611a38d + 6b5c4a5 commit 7c4e38f

8 files changed

Lines changed: 281 additions & 468 deletions

File tree

proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@
4444
import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor;
4545
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
4646
import org.apache.rocketmq.proxy.remoting.RemotingProtocolServer;
47-
import org.apache.rocketmq.proxy.service.cert.CertChangeSource;
48-
import org.apache.rocketmq.proxy.service.cert.FileCertChangeSource;
4947
import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
5048
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
5149
import org.apache.rocketmq.srvutil.ServerUtil;
@@ -79,10 +77,8 @@ public static void main(String[] args) {
7977

8078
MessagingProcessor messagingProcessor = createMessagingProcessor();
8179

82-
CertChangeSource fileCertChangeSource = new FileCertChangeSource(
83-
ConfigurationManager.getProxyConfig().getTlsCertPath(),
84-
ConfigurationManager.getProxyConfig().getTlsKeyPath());
85-
TlsCertificateManager tlsCertificateManager = new TlsCertificateManager(fileCertChangeSource);
80+
// tls cert update
81+
TlsCertificateManager tlsCertificateManager = new TlsCertificateManager();
8682
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(tlsCertificateManager);
8783

8884
// create grpcServer

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
2727

2828
import java.io.IOException;
29-
import java.io.InputStream;
3029
import java.security.cert.CertificateException;
3130
import java.util.concurrent.TimeUnit;
3231

@@ -55,6 +54,7 @@ protected GrpcServer(Server server, long timeout, TimeUnit unit, TlsCertificateM
5554
public void start() throws Exception {
5655
// Register the TLS context reload handler
5756
tlsCertificateManager.registerReloadListener(this.tlsReloadHandler);
57+
5858
this.server.start();
5959
log.info("grpc server start successfully.");
6060
}
@@ -76,9 +76,9 @@ public void shutdown() {
7676
@VisibleForTesting
7777
class GrpcTlsReloadHandler implements TlsCertificateManager.TlsContextReloadListener {
7878
@Override
79-
public void onTlsContextReload(InputStream certInputStream, InputStream keyInputStream) {
79+
public void onTlsContextReload() {
8080
try {
81-
ProxyAndTlsProtocolNegotiator.loadSslContext(certInputStream, keyInputStream);
81+
ProxyAndTlsProtocolNegotiator.loadSslContext();
8282
log.info("SSLContext reloaded for grpc server");
8383
} catch (CertificateException | IOException e) {
8484
log.error("Failed to reload SSLContext for server", e);

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@
3131
import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor;
3232
import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor;
3333
import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;
34-
import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
3534

3635
import java.util.concurrent.ThreadPoolExecutor;
3736
import java.util.concurrent.TimeUnit;
37+
import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
3838

3939
public class GrpcServerBuilder {
4040
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -52,7 +52,6 @@ public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port
5252

5353
protected GrpcServerBuilder(ThreadPoolExecutor executor, int port, TlsCertificateManager tlsCertificateManager) {
5454
this.tlsCertificateManager = tlsCertificateManager;
55-
5655
serverBuilder = NettyServerBuilder.forPort(port);
5756

5857
serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator());

proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
import org.apache.rocketmq.remoting.protocol.RequestCode;
6060
import org.apache.rocketmq.remoting.protocol.ResponseCode;
6161

62-
import java.io.InputStream;
6362
import java.util.concurrent.BlockingQueue;
6463
import java.util.concurrent.CompletableFuture;
6564
import java.util.concurrent.ScheduledExecutorService;
@@ -93,6 +92,7 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
9392
protected final TlsCertificateManager tlsCertificateManager;
9493
protected final RemotingTlsReloadHandler tlsReloadHandler;
9594

95+
9696
public RemotingProtocolServer(MessagingProcessor messagingProcessor, TlsCertificateManager tlsCertificateManager) throws Exception {
9797
this.messagingProcessor = messagingProcessor;
9898
this.remotingChannelManager = new RemotingChannelManager(this, messagingProcessor.getProxyRelayService());
@@ -118,6 +118,8 @@ public RemotingProtocolServer(MessagingProcessor messagingProcessor, TlsCertific
118118
System.setProperty(TlsSystemConfig.TLS_SERVER_CERTPATH, config.getTlsCertPath());
119119
TlsSystemConfig.tlsServerKeyPath = config.getTlsKeyPath();
120120
System.setProperty(TlsSystemConfig.TLS_SERVER_KEYPATH, config.getTlsKeyPath());
121+
this.tlsCertificateManager = tlsCertificateManager;
122+
this.tlsReloadHandler = new RemotingTlsReloadHandler();
121123

122124
this.clientHousekeepingService = new ClientHousekeepingService(this.clientManagerActivity);
123125

@@ -192,18 +194,14 @@ public RemotingProtocolServer(MessagingProcessor messagingProcessor, TlsCertific
192194
);
193195
this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 10, 10, TimeUnit.SECONDS);
194196

195-
this.tlsCertificateManager = tlsCertificateManager;
196-
this.tlsReloadHandler = new RemotingTlsReloadHandler();
197-
tlsCertificateManager.registerReloadListener(this.tlsReloadHandler);
198-
199197
this.registerRemotingServer(this.defaultRemotingServer);
200198
}
201199

202200
protected class RemotingTlsReloadHandler implements TlsCertificateManager.TlsContextReloadListener {
203201
@Override
204-
public void onTlsContextReload(InputStream certInputStream, InputStream keyInputStream) {
202+
public void onTlsContextReload() {
205203
if (defaultRemotingServer instanceof NettyRemotingServer) {
206-
((NettyRemotingServer) defaultRemotingServer).loadSslContext(certInputStream, keyInputStream);
204+
((NettyRemotingServer) defaultRemotingServer).loadSslContext();
207205
log.info("SSLContext reloaded for remoting server");
208206
}
209207
}
@@ -259,6 +257,9 @@ public void shutdown() throws Exception {
259257

260258
@Override
261259
public void start() throws Exception {
260+
// Register the TLS context reload handler
261+
tlsCertificateManager.registerReloadListener(this.tlsReloadHandler);
262+
262263
this.remotingChannelManager.start();
263264
this.defaultRemotingServer.start();
264265
}

proxy/src/main/java/org/apache/rocketmq/proxy/service/cert/TlsCertificateManager.java

Lines changed: 53 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,40 @@
1515
* limitations under the License.
1616
*/
1717
package org.apache.rocketmq.proxy.service.cert;
18-
19-
import java.io.InputStream;
20-
import java.nio.charset.StandardCharsets;
21-
import java.util.ArrayList;
22-
import java.util.List;
23-
import org.apache.commons.io.IOUtils;
2418
import org.apache.rocketmq.common.constant.LoggerName;
2519
import org.apache.rocketmq.common.utils.StartAndShutdown;
2620
import org.apache.rocketmq.logging.org.slf4j.Logger;
2721
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
2822
import org.apache.rocketmq.proxy.config.ConfigurationManager;
23+
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
24+
import org.apache.rocketmq.srvutil.FileWatchService;
25+
import java.util.ArrayList;
26+
import java.util.List;
2927

30-
public class TlsCertificateManager implements StartAndShutdown, CertChangeSource.ChangeListener {
28+
public class TlsCertificateManager implements StartAndShutdown {
3129
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
3230

33-
private final CertChangeSource certChangeSource;
31+
private final FileWatchService fileWatchService;
3432
private final List<TlsContextReloadListener> reloadListeners = new ArrayList<>();
3533

36-
private boolean certChanged = false;
37-
private boolean keyChanged = false;
34+
public TlsCertificateManager() {
35+
try {
36+
this.fileWatchService = new FileWatchService(
37+
new String[] {
38+
ConfigurationManager.getProxyConfig().getTlsCertPath(),
39+
ConfigurationManager.getProxyConfig().getTlsKeyPath()
40+
},
41+
new CertKeyFileWatchListener(),
42+
60 * 60 * 1000 /* 1 hour */
43+
);
44+
} catch (Exception e) {
45+
log.error("Failed to initialize TLS certificate watch service", e);
46+
throw new RuntimeException("Failed to initialize TLS certificate manager", e);
47+
}
48+
}
3849

39-
public TlsCertificateManager(CertChangeSource source) {
40-
this.certChangeSource = source;
41-
source.setListener(this);
50+
public FileWatchService getFileWatchService() {
51+
return this.fileWatchService;
4252
}
4353

4454
public void registerReloadListener(TlsContextReloadListener listener) {
@@ -53,58 +63,59 @@ public void unregisterReloadListener(TlsContextReloadListener listener) {
5363
}
5464
}
5565

66+
public List<TlsContextReloadListener> getReloadListeners() {
67+
return this.reloadListeners;
68+
}
69+
5670
@Override
5771
public void start() throws Exception {
58-
certChangeSource.start();
59-
log.info("TLS certificate manager started successfully");
72+
this.fileWatchService.start();
73+
log.info("TLS certificate manager started successfully, start watching: {} {}",
74+
ConfigurationManager.getProxyConfig().getTlsCertPath(),
75+
ConfigurationManager.getProxyConfig().getTlsKeyPath()
76+
);
6077
}
6178

6279
@Override
6380
public void shutdown() throws Exception {
64-
certChangeSource.shutdown();
81+
this.fileWatchService.shutdown();
6582
log.info("TLS certificate manager shutdown successfully");
6683
}
6784

68-
@Override
69-
public void onCertChanged(List<CertChangeEvent> events) {
70-
log.info("cert changed: {}", events);
71-
if (events.size() == 1 && events.get(0).getSourceType() == CertChangeEvent.SourceType.FILE) {
72-
CertChangeEvent event = events.get(0);
73-
if (event.getValue() == null) {
74-
log.warn("cert path is empty, ignore");
75-
return;
76-
}
77-
String path = event.getValue();
78-
if (path.equals(ConfigurationManager.getProxyConfig().getTlsCertPath())) {
85+
private class CertKeyFileWatchListener implements FileWatchService.Listener {
86+
private boolean certChanged = false;
87+
private boolean keyChanged = false;
88+
89+
@Override
90+
public void onChanged(String path) {
91+
log.info("File changed: {}", path);
92+
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
7993
certChanged = true;
80-
} else if (path.equals(ConfigurationManager.getProxyConfig().getTlsKeyPath())) {
94+
} else if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
8195
keyChanged = true;
8296
}
8397

8498
if (certChanged && keyChanged) {
85-
log.info("The certificate and private key changed, reload the SSLContext from file");
86-
for (TlsContextReloadListener listener : reloadListeners) {
87-
listener.onTlsContextReload(null, null);
88-
}
99+
log.info("The certificate and private key changed, reload the ssl context");
100+
notifyContextReload();
89101
certChanged = false;
90102
keyChanged = false;
91103
}
92-
} else if (events.size() == 2 && events.get(0).getSourceType() == CertChangeEvent.SourceType.INLINE) {
93-
if (events.get(0).getValue() == null || events.get(1).getValue() == null) {
94-
log.warn("cert or key value is empty, ignore");
95-
return;
96-
}
104+
}
105+
106+
private void notifyContextReload() {
97107
for (TlsContextReloadListener listener : reloadListeners) {
98-
listener.onTlsContextReload(
99-
IOUtils.toInputStream(events.get(0).getValue(), StandardCharsets.UTF_8),
100-
IOUtils.toInputStream(events.get(1).getValue(), StandardCharsets.UTF_8)
101-
);
108+
try {
109+
listener.onTlsContextReload();
110+
} catch (Exception e) {
111+
log.error("Failed to notify TLS context reload to listener: " + listener, e);
112+
}
102113
}
103114
}
104115
}
105116

106117
// Interface for listeners interested in TLS context reload events
107118
public interface TlsContextReloadListener {
108-
void onTlsContextReload(InputStream certInputStream, InputStream keyInputStream);
119+
void onTlsContextReload();
109120
}
110121
}

0 commit comments

Comments
 (0)