Skip to content

Commit 611a38d

Browse files
committed
refactor(proxy): 重构证书变更事件处理逻辑
- 修改 CertChangeEvent 类,将 values 字段改为单个 value 字段 - 更新 CertChangeSource 接口,使用事件列表替代单个事件 -调整 FileCertChangeSource 实现,适应新的事件处理方式- 修改 TlsCertificateManager 类,支持处理多个证书变更事件 - 更新相关测试用例,以适应新的事件处理逻辑 Signed-off-by: Async <raisinata@foxmail.com>
1 parent 88f18b9 commit 611a38d

7 files changed

Lines changed: 83 additions & 73 deletions

File tree

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public static void loadSslContext(InputStream certInputStream,
134134
String tlsKeyPath = ConfigurationManager.getProxyConfig().getTlsKeyPath();
135135
try (InputStream serverKeyInputStream = Files.newInputStream(
136136
Paths.get(tlsKeyPath));
137-
InputStream serverCertificateStream = Files.newInputStream(
137+
InputStream serverCertificateStream = Files.newInputStream(
138138
Paths.get(tlsCertPath))) {
139139
sslContext = GrpcSslContexts.forServer(serverCertificateStream,
140140
serverKeyInputStream)
@@ -166,8 +166,8 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
166166
}
167167
if (ha.state() == ProtocolDetectionState.DETECTED) {
168168
ctx.pipeline().addAfter(ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
169-
.addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
170-
.addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
169+
.addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
170+
.addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
171171
} else {
172172
ctx.pipeline().addAfter(ctx.name(), TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
173173
}
@@ -233,7 +233,7 @@ private void handleWithMessage(HAProxyMessage msg) {
233233
msg.tlvs().forEach(tlv -> handleHAProxyTLV(tlv, builder));
234234
}
235235
pne = InternalProtocolNegotiationEvent
236-
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
236+
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
237237
} finally {
238238
msg.release();
239239
}

proxy/src/main/java/org/apache/rocketmq/proxy/service/cert/CertChangeEvent.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
*/
1717
package org.apache.rocketmq.proxy.service.cert;
1818

19-
import java.util.List;
20-
2119
public class CertChangeEvent {
2220

2321
public enum Kind {
@@ -35,12 +33,12 @@ public enum SourceType {
3533

3634
private final SourceType sourceType;
3735

38-
private final List<String> values;
36+
private final String value;
3937

40-
public CertChangeEvent(Kind kind, SourceType type, List<String> values) {
38+
public CertChangeEvent(Kind kind, SourceType type, String value) {
4139
this.kind = kind;
4240
this.sourceType = type;
43-
this.values = values;
41+
this.value = value;
4442
}
4543

4644
public Kind getKind() {
@@ -51,15 +49,15 @@ public SourceType getSourceType() {
5149
return sourceType;
5250
}
5351

54-
public List<String> getValues() {
55-
return values;
52+
public String getValue() {
53+
return value;
5654
}
5755

5856
@Override public String toString() {
5957
return "CertChangeEvent{" +
6058
"kind=" + kind +
6159
", sourceType=" + sourceType +
62-
", values=" + values +
60+
", values=" + value +
6361
'}';
6462
}
6563
}

proxy/src/main/java/org/apache/rocketmq/proxy/service/cert/CertChangeSource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@
1616
*/
1717
package org.apache.rocketmq.proxy.service.cert;
1818

19+
import java.util.List;
1920
import org.apache.rocketmq.common.utils.StartAndShutdown;
2021

2122
public interface CertChangeSource extends StartAndShutdown {
2223

2324
void setListener(ChangeListener listener);
2425

2526
interface ChangeListener {
26-
void onCertChanged(CertChangeEvent event);
27+
void onCertChanged(List<CertChangeEvent> events);
2728
}
2829
}

proxy/src/main/java/org/apache/rocketmq/proxy/service/cert/FileCertChangeSource.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ public FileCertChangeSource(String certPath, String keyPath, int watchInterval)
4848
kind = CertChangeEvent.Kind.SERVER_KEY;
4949
}
5050

51-
listener.onCertChanged(new CertChangeEvent(kind, CertChangeEvent.SourceType.FILE,
52-
Collections.singletonList(path)));
51+
listener.onCertChanged(Collections.singletonList(new CertChangeEvent(kind, CertChangeEvent.SourceType.FILE, path)));
5352
}, watchInterval);
5453
} catch (Exception e) {
5554
throw new RuntimeException(e);

proxy/src/main/java/org/apache/rocketmq/proxy/service/cert/TlsCertificateManager.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,15 @@ public void shutdown() throws Exception {
6666
}
6767

6868
@Override
69-
public void onCertChanged(CertChangeEvent event) {
70-
log.info("cert changed: {}", event);
71-
if (event.getSourceType() == CertChangeEvent.SourceType.FILE) {
72-
if (event.getValues().isEmpty()) {
69+
public void onCertChanged(List<CertChangeEvent> events) {
70+
log.info("cert changed: {}", events);
71+
if (events.size() == 1 && events.get(0).getSourceType() == CertChangeEvent.SourceType.FILE) {
72+
CertChangeEvent event = events.get(0);
73+
if (event.getValue() == null) {
7374
log.warn("cert path is empty, ignore");
7475
return;
7576
}
76-
String path = event.getValues().get(0);
77+
String path = event.getValue();
7778
if (path.equals(ConfigurationManager.getProxyConfig().getTlsCertPath())) {
7879
certChanged = true;
7980
} else if (path.equals(ConfigurationManager.getProxyConfig().getTlsKeyPath())) {
@@ -88,15 +89,15 @@ public void onCertChanged(CertChangeEvent event) {
8889
certChanged = false;
8990
keyChanged = false;
9091
}
91-
} else if (event.getSourceType() == CertChangeEvent.SourceType.INLINE) {
92-
if (event.getValues().size() != 2) {
93-
log.warn("cert inline is invalid, ignore");
92+
} else if (events.size() == 2 && events.get(0).getSourceType() == CertChangeEvent.SourceType.INLINE) {
93+
if (events.get(0).getValue() == null || events.get(1).getValue() == null) {
94+
log.warn("cert or key value is empty, ignore");
9495
return;
9596
}
9697
for (TlsContextReloadListener listener : reloadListeners) {
9798
listener.onTlsContextReload(
98-
IOUtils.toInputStream(event.getValues().get(0), StandardCharsets.UTF_8),
99-
IOUtils.toInputStream(event.getValues().get(1), StandardCharsets.UTF_8)
99+
IOUtils.toInputStream(events.get(0).getValue(), StandardCharsets.UTF_8),
100+
IOUtils.toInputStream(events.get(1).getValue(), StandardCharsets.UTF_8)
100101
);
101102
}
102103
}

proxy/src/test/java/org/apache/rocketmq/proxy/service/cert/FileCertChangeSourceTest.java

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.io.File;
2020
import java.io.PrintWriter;
21+
import java.util.List;
2122
import java.util.concurrent.CountDownLatch;
2223
import java.util.concurrent.TimeUnit;
2324
import org.apache.rocketmq.proxy.config.ConfigurationManager;
@@ -120,43 +121,51 @@ public void testCertFileChangeTriggerListener() throws Exception {
120121
boolean called = latch.await(2, TimeUnit.SECONDS);
121122
assertThat(called).isTrue();
122123

123-
ArgumentCaptor<CertChangeEvent> eventCaptor = ArgumentCaptor.forClass(CertChangeEvent.class);
124-
verify(mockListener, times(1)).onCertChanged(eventCaptor.capture());
125-
CertChangeEvent capturedEvent = eventCaptor.getValue();
124+
// Use a List<CertChangeEvent> captor
125+
@SuppressWarnings("unchecked")
126+
ArgumentCaptor<List<CertChangeEvent>> eventListCaptor = ArgumentCaptor.forClass(List.class);
127+
verify(mockListener, times(1)).onCertChanged(eventListCaptor.capture());
128+
129+
List<CertChangeEvent> capturedEvents = eventListCaptor.getValue();
130+
assertThat(capturedEvents).hasSize(1);
131+
CertChangeEvent capturedEvent = capturedEvents.get(0);
126132
assertThat(capturedEvent.getKind()).isEqualTo(CertChangeEvent.Kind.SERVER_CERT);
127133
assertThat(capturedEvent.getSourceType()).isEqualTo(CertChangeEvent.SourceType.FILE);
128-
assertThat(capturedEvent.getValues()).contains(certFile.getAbsolutePath());
134+
assertThat(capturedEvent.getValue()).contains(certFile.getAbsolutePath());
129135
}
130136

131137
@Test
132138
public void testKeyFileChangeTriggerListener() throws Exception {
133139
fileCertChangeSourceCustomInterval.start();
134-
135140
final CountDownLatch latch = new CountDownLatch(1);
136-
137-
CertChangeSource.ChangeListener countingListener = event -> {
141+
CertChangeSource.ChangeListener countingListener = events -> {
138142
try {
139-
mockListener.onCertChanged(event);
143+
mockListener.onCertChanged(events);
140144
} finally {
141145
latch.countDown();
142146
}
143147
};
144-
145148
fileCertChangeSourceCustomInterval.setListener(countingListener);
146-
147149
modifyFile(keyFile, "Modified key content " + System.currentTimeMillis());
148-
149150
boolean called = latch.await(2, TimeUnit.SECONDS);
150151
assertThat(called).isTrue();
151152

152-
ArgumentCaptor<CertChangeEvent> eventCaptor = ArgumentCaptor.forClass(CertChangeEvent.class);
153-
verify(mockListener, times(1)).onCertChanged(eventCaptor.capture());
154-
CertChangeEvent capturedEvent = eventCaptor.getValue();
153+
// Use a List<CertChangeEvent> captor
154+
@SuppressWarnings("unchecked")
155+
ArgumentCaptor<List<CertChangeEvent>> eventListCaptor = ArgumentCaptor.forClass(List.class);
156+
verify(mockListener, times(1)).onCertChanged(eventListCaptor.capture());
157+
158+
List<CertChangeEvent> capturedEvents = eventListCaptor.getValue();
159+
assertThat(capturedEvents).hasSize(1);
160+
CertChangeEvent capturedEvent = capturedEvents.get(0);
155161
assertThat(capturedEvent.getKind()).isEqualTo(CertChangeEvent.Kind.SERVER_KEY);
156162
assertThat(capturedEvent.getSourceType()).isEqualTo(CertChangeEvent.SourceType.FILE);
157-
assertThat(capturedEvent.getValues()).contains(keyFile.getAbsolutePath());
163+
assertThat(capturedEvent.getValue()).contains(keyFile.getAbsolutePath());
158164
}
159165

166+
167+
168+
160169
@Test
161170
public void testFileChangeWithoutListener() throws Exception {
162171

@@ -169,7 +178,7 @@ public void testFileChangeWithoutListener() throws Exception {
169178

170179
Thread.sleep(500);
171180

172-
verify(mockListener, times(0)).onCertChanged(Mockito.any(CertChangeEvent.class));
181+
verify(mockListener, times(0)).onCertChanged(Mockito.anyList());
173182

174183
sourceWithoutListener.shutdown();
175184
}
@@ -204,7 +213,7 @@ public void testSetListener() throws Exception {
204213
boolean called = latch.await(2, TimeUnit.SECONDS);
205214
assertThat(called).isTrue();
206215

207-
verify(mockListener, times(1)).onCertChanged(Mockito.any(CertChangeEvent.class));
216+
verify(mockListener, times(1)).onCertChanged(Mockito.anyList());
208217

209218
source.shutdown();
210219
}

0 commit comments

Comments
 (0)