Skip to content

Commit 20486b8

Browse files
committed
feat(proxy): 添加证书变更监听功能
- 新增 CertChangeEvent、CertChangeSource 和 FileCertChangeSource 类 - 实现证书文件变更监听和事件处理机制 - 修改 GrpcServer 以支持证书变更热重载 - 更新相关测试用例 Signed-off-by: Async <raisinata@foxmail.com>
1 parent b5acb36 commit 20486b8

14 files changed

Lines changed: 687 additions & 525 deletions

File tree

proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor;
4545
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
4646
import org.apache.rocketmq.proxy.remoting.RemotingProtocolServer;
47+
import org.apache.rocketmq.proxy.service.cert.FileCertChangeSource;
4748
import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
4849
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
4950
import org.apache.rocketmq.srvutil.ServerUtil;
@@ -77,8 +78,15 @@ public static void main(String[] args) {
7778

7879
MessagingProcessor messagingProcessor = createMessagingProcessor();
7980

81+
FileCertChangeSource fileCertChangeSource = new FileCertChangeSource(
82+
ConfigurationManager.getProxyConfig().getTlsCertPath(),
83+
ConfigurationManager.getProxyConfig().getTlsKeyPath());
84+
TlsCertificateManager tlsCertificateManager = new TlsCertificateManager(fileCertChangeSource);
85+
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(tlsCertificateManager);
86+
8087
// create grpcServer
81-
GrpcServer grpcServer = GrpcServerBuilder.newBuilder(executor, ConfigurationManager.getProxyConfig().getGrpcServerPort())
88+
GrpcServer grpcServer = GrpcServerBuilder.newBuilder(executor,
89+
ConfigurationManager.getProxyConfig().getGrpcServerPort(), tlsCertificateManager)
8290
.addService(createServiceProcessor(messagingProcessor))
8391
.addService(ChannelzService.newInstance(100))
8492
.addService(ProtoReflectionService.newInstance())
@@ -87,11 +95,9 @@ public static void main(String[] args) {
8795
.build();
8896
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer);
8997

90-
RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor);
98+
RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor, tlsCertificateManager);
9199
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(remotingServer);
92100

93-
initTlsCertificateManager();
94-
95101
// start servers one by one.
96102
PROXY_START_AND_SHUTDOWN.start();
97103

@@ -125,11 +131,6 @@ protected static void initConfiguration(CommandLineArgument commandLineArgument)
125131

126132
}
127133

128-
protected static void initTlsCertificateManager() {
129-
TlsCertificateManager tlsCertManager = TlsCertificateManager.getInstance();
130-
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(tlsCertManager);
131-
}
132-
133134
protected static CommandLineArgument parseCommandLineArgument(String[] args) {
134135
CommandLine commandLine = ServerUtil.parseCmdLine("mqproxy", args,
135136
buildCommandlineOptions(), new DefaultParser());

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
2727

2828
import java.io.IOException;
29+
import java.io.InputStream;
2930
import java.security.cert.CertificateException;
3031
import java.util.concurrent.TimeUnit;
3132

@@ -38,28 +39,30 @@ public class GrpcServer implements StartAndShutdown {
3839

3940
private final TimeUnit unit;
4041

42+
private final TlsCertificateManager tlsCertificateManager;
43+
44+
@VisibleForTesting
4145
final GrpcTlsReloadHandler tlsReloadHandler;
4246

43-
protected GrpcServer(Server server, long timeout, TimeUnit unit) throws Exception {
47+
protected GrpcServer(Server server, long timeout, TimeUnit unit, TlsCertificateManager tlsCertificateManager) throws Exception {
4448
this.server = server;
4549
this.timeout = timeout;
4650
this.unit = unit;
47-
51+
this.tlsCertificateManager = tlsCertificateManager;
4852
this.tlsReloadHandler = new GrpcTlsReloadHandler();
49-
50-
// Register the TLS context reload handler
51-
TlsCertificateManager.getInstance().registerReloadListener(this.tlsReloadHandler);
5253
}
5354

5455
public void start() throws Exception {
56+
// Register the TLS context reload handler
57+
tlsCertificateManager.registerReloadListener(this.tlsReloadHandler);
5558
this.server.start();
5659
log.info("grpc server start successfully.");
5760
}
5861

5962
public void shutdown() {
6063
try {
6164
// Unregister the TLS context reload handler
62-
TlsCertificateManager.getInstance().unregisterReloadListener(this.tlsReloadHandler);
65+
tlsCertificateManager.unregisterReloadListener(this.tlsReloadHandler);
6366

6467
this.server.shutdown().awaitTermination(timeout, unit);
6568

@@ -72,9 +75,9 @@ public void shutdown() {
7275
@VisibleForTesting
7376
class GrpcTlsReloadHandler implements TlsCertificateManager.TlsContextReloadListener {
7477
@Override
75-
public void onTlsContextReload() {
78+
public void onTlsContextReload(InputStream certInputStream, InputStream keyInputStream) {
7679
try {
77-
ProxyAndTlsProtocolNegotiator.loadSslContext();
80+
ProxyAndTlsProtocolNegotiator.loadSslContext(null, null);
7881
log.info("SSLContext reloaded for grpc server");
7982
} catch (CertificateException | IOException e) {
8083
log.error("Failed to reload SSLContext for server", e);

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor;
3232
import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor;
3333
import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;
34+
import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
3435

3536
import java.util.concurrent.ThreadPoolExecutor;
3637
import java.util.concurrent.TimeUnit;
@@ -43,11 +44,15 @@ public class GrpcServerBuilder {
4344

4445
protected TimeUnit unit = TimeUnit.SECONDS;
4546

46-
public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port) {
47-
return new GrpcServerBuilder(executor, port);
47+
protected TlsCertificateManager tlsCertificateManager;
48+
49+
public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port, TlsCertificateManager tlsCertificateManager) {
50+
return new GrpcServerBuilder(executor, port, tlsCertificateManager);
4851
}
4952

50-
protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) {
53+
protected GrpcServerBuilder(ThreadPoolExecutor executor, int port, TlsCertificateManager tlsCertificateManager) {
54+
this.tlsCertificateManager = tlsCertificateManager;
55+
5156
serverBuilder = NettyServerBuilder.forPort(port);
5257

5358
serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator());
@@ -99,7 +104,7 @@ public GrpcServerBuilder appendInterceptor(ServerInterceptor interceptor) {
99104
}
100105

101106
public GrpcServer build() throws Exception {
102-
return new GrpcServer(this.serverBuilder.build(), time, unit);
107+
return new GrpcServer(this.serverBuilder.build(), time, unit, tlsCertificateManager);
103108
}
104109

105110
public GrpcServerBuilder configInterceptor() {

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator
8181

8282
public ProxyAndTlsProtocolNegotiator() {
8383
try {
84-
loadSslContext();
85-
log.info("SSLContext created for proxy server");
84+
loadSslContext(null, null);
85+
log.info("SSLContext created for proxy grpc server");
8686
} catch (IOException | CertificateException e) {
8787
log.error("SslContext init error", e);
8888
throw new RuntimeException(e);
@@ -103,7 +103,7 @@ public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
103103
public void close() {
104104
}
105105

106-
public static void loadSslContext() throws CertificateException, IOException {
106+
public static void loadSslContext(InputStream certInputStream, InputStream keyInputStream) throws CertificateException, IOException {
107107
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
108108
SslProvider provider;
109109
if (OpenSsl.isAvailable()) {
@@ -116,22 +116,31 @@ public static void loadSslContext() throws CertificateException, IOException {
116116
if (proxyConfig.isTlsTestModeEnable()) {
117117
SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
118118
sslContext = GrpcSslContexts.forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
119-
.sslProvider(provider)
119+
.sslProvider(provider)
120+
.trustManager(InsecureTrustManagerFactory.INSTANCE)
121+
.clientAuth(ClientAuth.NONE)
122+
.build();
123+
} else {
124+
boolean fromStream = certInputStream != null && keyInputStream != null;
125+
126+
if (fromStream) {
127+
sslContext = GrpcSslContexts.forServer(certInputStream, keyInputStream)
120128
.trustManager(InsecureTrustManagerFactory.INSTANCE)
121129
.clientAuth(ClientAuth.NONE)
122130
.build();
123-
} else {
124-
String tlsCertPath = ConfigurationManager.getProxyConfig().getTlsCertPath();
125-
String tlsKeyPath = ConfigurationManager.getProxyConfig().getTlsKeyPath();
126-
try (InputStream serverKeyInputStream = Files.newInputStream(
131+
} else {
132+
String tlsCertPath = ConfigurationManager.getProxyConfig().getTlsCertPath();
133+
String tlsKeyPath = ConfigurationManager.getProxyConfig().getTlsKeyPath();
134+
try (InputStream serverKeyInputStream = Files.newInputStream(
127135
Paths.get(tlsKeyPath));
128-
InputStream serverCertificateStream = Files.newInputStream(
136+
InputStream serverCertificateStream = Files.newInputStream(
129137
Paths.get(tlsCertPath))) {
130-
sslContext = GrpcSslContexts.forServer(serverCertificateStream,
131-
serverKeyInputStream)
138+
sslContext = GrpcSslContexts.forServer(serverCertificateStream,
139+
serverKeyInputStream)
132140
.trustManager(InsecureTrustManagerFactory.INSTANCE)
133141
.clientAuth(ClientAuth.NONE)
134142
.build();
143+
}
135144
}
136145
}
137146
}
@@ -150,14 +159,14 @@ public ProxyAndTlsProtocolHandler(GrpcHttp2ConnectionHandler grpcHandler) {
150159
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
151160
try {
152161
ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol(
153-
in);
162+
in);
154163
if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
155164
return;
156165
}
157166
if (ha.state() == ProtocolDetectionState.DETECTED) {
158167
ctx.pipeline().addAfter(ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
159-
.addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
160-
.addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
168+
.addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
169+
.addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
161170
} else {
162171
ctx.pipeline().addAfter(ctx.name(), TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
163172
}
@@ -223,7 +232,7 @@ private void handleWithMessage(HAProxyMessage msg) {
223232
msg.tlvs().forEach(tlv -> handleHAProxyTLV(tlv, builder));
224233
}
225234
pne = InternalProtocolNegotiationEvent
226-
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
235+
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
227236
} finally {
228237
msg.release();
229238
}
@@ -245,7 +254,7 @@ protected void handleHAProxyTLV(HAProxyTLV tlv, Attributes.Builder builder) {
245254
return;
246255
}
247256
Attributes.Key<String> key = AttributeKeys.valueOf(
248-
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
257+
HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
249258
builder.set(key, new String(valueBytes, CharsetUtil.UTF_8));
250259
}
251260

@@ -258,9 +267,9 @@ private class TlsModeHandler extends ByteToMessageDecoder {
258267

259268
public TlsModeHandler(GrpcHttp2ConnectionHandler grpcHandler) {
260269
this.ssl = InternalProtocolNegotiators.serverTls(sslContext)
261-
.newHandler(grpcHandler);
270+
.newHandler(grpcHandler);
262271
this.plaintext = InternalProtocolNegotiators.serverPlaintext()
263-
.newHandler(grpcHandler);
272+
.newHandler(grpcHandler);
264273
}
265274

266275
@Override

proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.rocketmq.remoting.protocol.RequestCode;
6060
import org.apache.rocketmq.remoting.protocol.ResponseCode;
6161

62+
import java.io.InputStream;
6263
import java.util.concurrent.BlockingQueue;
6364
import java.util.concurrent.CompletableFuture;
6465
import java.util.concurrent.ScheduledExecutorService;
@@ -89,10 +90,10 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
8990
protected final ThreadPoolExecutor topicRouteExecutor;
9091
protected final ThreadPoolExecutor defaultExecutor;
9192
protected final ScheduledExecutorService timerExecutor;
93+
protected final TlsCertificateManager tlsCertificateManager;
9294
protected final RemotingTlsReloadHandler tlsReloadHandler;
9395

94-
95-
public RemotingProtocolServer(MessagingProcessor messagingProcessor) throws Exception {
96+
public RemotingProtocolServer(MessagingProcessor messagingProcessor, TlsCertificateManager tlsCertificateManager) throws Exception {
9697
this.messagingProcessor = messagingProcessor;
9798
this.remotingChannelManager = new RemotingChannelManager(this, messagingProcessor.getProxyRelayService());
9899

@@ -191,15 +192,16 @@ public RemotingProtocolServer(MessagingProcessor messagingProcessor) throws Exce
191192
);
192193
this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 10, 10, TimeUnit.SECONDS);
193194

195+
this.tlsCertificateManager = tlsCertificateManager;
194196
this.tlsReloadHandler = new RemotingTlsReloadHandler();
195-
TlsCertificateManager.getInstance().registerReloadListener(this.tlsReloadHandler);
197+
tlsCertificateManager.registerReloadListener(this.tlsReloadHandler);
196198

197199
this.registerRemotingServer(this.defaultRemotingServer);
198200
}
199201

200202
protected class RemotingTlsReloadHandler implements TlsCertificateManager.TlsContextReloadListener {
201203
@Override
202-
public void onTlsContextReload() {
204+
public void onTlsContextReload(InputStream certInputStream, InputStream keyInputStream) {
203205
if (defaultRemotingServer instanceof NettyRemotingServer) {
204206
((NettyRemotingServer) defaultRemotingServer).loadSslContext();
205207
log.info("SSLContext reloaded for remoting server");
@@ -243,7 +245,7 @@ protected void registerRemotingServer(RemotingServer remotingServer) {
243245
@Override
244246
public void shutdown() throws Exception {
245247
// Unregister the TLS context reload handler
246-
TlsCertificateManager.getInstance().unregisterReloadListener(this.tlsReloadHandler);
248+
tlsCertificateManager.unregisterReloadListener(this.tlsReloadHandler);
247249

248250
this.defaultRemotingServer.shutdown();
249251
this.remotingChannelManager.shutdown();
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.proxy.service.cert;
18+
19+
import java.util.List;
20+
21+
public class CertChangeEvent {
22+
23+
public enum Kind {
24+
SERVER_CERT,
25+
SERVER_KEY,
26+
TRUST_CERT
27+
}
28+
29+
public enum SourceType {
30+
FILE,
31+
INLINE
32+
}
33+
34+
private final Kind kind;
35+
36+
private final SourceType sourceType;
37+
38+
private final List<String> values;
39+
40+
public CertChangeEvent(Kind kind, SourceType type, List<String> values) {
41+
this.kind = kind;
42+
this.sourceType = type;
43+
this.values = values;
44+
}
45+
46+
public Kind getKind() {
47+
return kind;
48+
}
49+
50+
public SourceType getSourceType() {
51+
return sourceType;
52+
}
53+
54+
public List<String> getValues() {
55+
return values;
56+
}
57+
58+
@Override public String toString() {
59+
return "CertChangeEvent{" +
60+
"kind=" + kind +
61+
", sourceType=" + sourceType +
62+
", values=" + values +
63+
'}';
64+
}
65+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.proxy.service.cert;
18+
19+
import org.apache.rocketmq.common.utils.StartAndShutdown;
20+
21+
public interface CertChangeSource extends StartAndShutdown {
22+
23+
void setListener(ChangeListener listener);
24+
25+
interface ChangeListener {
26+
void onCertChanged(CertChangeEvent event);
27+
}
28+
}

0 commit comments

Comments
 (0)