Skip to content

Commit adf2c4d

Browse files
f1amingoRongtongJin
authored andcommitted
[ISSUE #9191] Provide the ability to replace the remoting layer implementation for Proxy and Broker (#9192)
* remoting replacement for proxy and broker * add unit test * fix code style
1 parent e441f8d commit adf2c4d

13 files changed

Lines changed: 391 additions & 163 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 116 additions & 86 deletions
Large diffs are not rendered by default.

broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,18 @@
2626
import org.apache.rocketmq.common.BrokerConfig;
2727
import org.apache.rocketmq.common.UtilAll;
2828
import org.apache.rocketmq.common.future.FutureTaskExt;
29+
import org.apache.rocketmq.remoting.RPCHook;
30+
import org.apache.rocketmq.remoting.RemotingServer;
2931
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
32+
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
33+
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
3034
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
3135
import org.apache.rocketmq.remoting.netty.RequestTask;
36+
import org.apache.rocketmq.remoting.pipeline.RequestPipeline;
37+
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
3238
import org.apache.rocketmq.store.config.MessageStoreConfig;
3339
import org.junit.After;
40+
import org.junit.Assert;
3441
import org.junit.Before;
3542
import org.junit.Test;
3643

@@ -94,4 +101,43 @@ public void run() {
94101
TimeUnit.MILLISECONDS.sleep(headSlowTimeMills);
95102
assertThat(brokerController.headSlowTimeMills(queue)).isGreaterThanOrEqualTo(headSlowTimeMills);
96103
}
104+
105+
@Test
106+
public void testCustomRemotingServer() throws CloneNotSupportedException {
107+
final RemotingServer mockRemotingServer = new NettyRemotingServer(nettyServerConfig);
108+
final String mockRemotingServerName = "MOCK_REMOTING_SERVER";
109+
110+
BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, new NettyClientConfig(), messageStoreConfig);
111+
brokerController.setRemotingServerByName(mockRemotingServerName, mockRemotingServer);
112+
brokerController.initializeRemotingServer();
113+
114+
final RPCHook rpcHook = new RPCHook() {
115+
@Override
116+
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
117+
118+
}
119+
120+
@Override
121+
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
122+
123+
}
124+
};
125+
brokerController.registerServerRPCHook(rpcHook);
126+
127+
// setRequestPipelineTest
128+
final RequestPipeline requestPipeline = (ctx, request) -> {
129+
130+
};
131+
brokerController.setRequestPipeline(requestPipeline);
132+
133+
NettyRemotingAbstract tcpRemotingServer = (NettyRemotingAbstract) brokerController.getRemotingServer();
134+
Assert.assertTrue(tcpRemotingServer.getRPCHook().contains(rpcHook));
135+
136+
NettyRemotingAbstract fastRemotingServer = (NettyRemotingAbstract) brokerController.getFastRemotingServer();
137+
Assert.assertTrue(fastRemotingServer.getRPCHook().contains(rpcHook));
138+
139+
NettyRemotingAbstract mockRemotingServer1 = (NettyRemotingAbstract) brokerController.getRemotingServerByName(mockRemotingServerName);
140+
Assert.assertTrue(mockRemotingServer1.getRPCHook().contains(rpcHook));
141+
Assert.assertSame(mockRemotingServer, mockRemotingServer1);
142+
}
97143
}

client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
6060
import org.apache.rocketmq.common.MQVersion;
6161
import org.apache.rocketmq.common.MixAll;
62+
import org.apache.rocketmq.common.ObjectCreator;
6263
import org.apache.rocketmq.common.Pair;
6364
import org.apache.rocketmq.common.PlainAccessConfig;
6465
import org.apache.rocketmq.common.TopicConfig;
@@ -268,19 +269,43 @@ public class MQClientAPIImpl implements NameServerUpdateCallback, StartAndShutdo
268269
private String nameSrvAddr = null;
269270
private ClientConfig clientConfig;
270271

271-
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
272+
public MQClientAPIImpl(
273+
final NettyClientConfig nettyClientConfig,
272274
final ClientRemotingProcessor clientRemotingProcessor,
273-
RPCHook rpcHook, final ClientConfig clientConfig) {
275+
final RPCHook rpcHook,
276+
final ClientConfig clientConfig
277+
) {
274278
this(nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig, null);
275279
}
276280

277-
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
281+
public MQClientAPIImpl(
282+
final NettyClientConfig nettyClientConfig,
278283
final ClientRemotingProcessor clientRemotingProcessor,
279-
RPCHook rpcHook, final ClientConfig clientConfig, final ChannelEventListener channelEventListener) {
284+
final RPCHook rpcHook,
285+
final ClientConfig clientConfig,
286+
final ChannelEventListener channelEventListener
287+
) {
288+
this(
289+
nettyClientConfig,
290+
clientRemotingProcessor,
291+
rpcHook,
292+
clientConfig,
293+
channelEventListener,
294+
null
295+
);
296+
}
297+
298+
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
299+
final ClientRemotingProcessor clientRemotingProcessor,
300+
RPCHook rpcHook, final ClientConfig clientConfig,
301+
final ChannelEventListener channelEventListener,
302+
final ObjectCreator<RemotingClient> remotingClientCreator) {
280303
this.clientConfig = clientConfig;
281304
topAddressing = new DefaultTopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
282305
topAddressing.registerChangeCallBack(this);
283-
this.remotingClient = new NettyRemotingClient(nettyClientConfig, channelEventListener);
306+
this.remotingClient = remotingClientCreator != null
307+
? remotingClientCreator.create(nettyClientConfig, channelEventListener)
308+
: new NettyRemotingClient(nettyClientConfig, channelEventListener);
284309
this.clientRemotingProcessor = clientRemotingProcessor;
285310

286311
this.remotingClient.registerRPCHook(new NamespaceRpcHook(clientConfig));

client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.rocketmq.client.impl.admin.MqClientAdminImpl;
3838
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
3939
import org.apache.rocketmq.client.producer.SendResult;
40+
import org.apache.rocketmq.common.ObjectCreator;
4041
import org.apache.rocketmq.common.constant.LoggerName;
4142
import org.apache.rocketmq.common.message.Message;
4243
import org.apache.rocketmq.common.message.MessageBatch;
@@ -48,6 +49,7 @@
4849
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
4950
import org.apache.rocketmq.remoting.InvokeCallback;
5051
import org.apache.rocketmq.remoting.RPCHook;
52+
import org.apache.rocketmq.remoting.RemotingClient;
5153
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
5254
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
5355
import org.apache.rocketmq.remoting.netty.ResponseFuture;
@@ -97,7 +99,17 @@ public MQClientAPIExt(
9799
ClientRemotingProcessor clientRemotingProcessor,
98100
RPCHook rpcHook
99101
) {
100-
super(nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig);
102+
this(clientConfig, nettyClientConfig, clientRemotingProcessor, rpcHook, null);
103+
}
104+
105+
public MQClientAPIExt(
106+
ClientConfig clientConfig,
107+
NettyClientConfig nettyClientConfig,
108+
ClientRemotingProcessor clientRemotingProcessor,
109+
RPCHook rpcHook,
110+
ObjectCreator<RemotingClient> remotingClientCreator
111+
) {
112+
super(nettyClientConfig, clientRemotingProcessor, rpcHook, clientConfig, null, remotingClientCreator);
101113
this.clientConfig = clientConfig;
102114
this.mqClientAdmin = new MqClientAdminImpl(getRemotingClient());
103115
}

client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,22 @@
1717
package org.apache.rocketmq.client.impl.mqclient;
1818

1919
import com.google.common.base.Strings;
20+
2021
import java.time.Duration;
2122
import java.util.concurrent.ScheduledExecutorService;
2223
import java.util.concurrent.ThreadLocalRandom;
2324
import java.util.concurrent.TimeUnit;
25+
2426
import org.apache.commons.lang3.StringUtils;
2527
import org.apache.rocketmq.client.ClientConfig;
2628
import org.apache.rocketmq.client.common.NameserverAccessConfig;
2729
import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
2830
import org.apache.rocketmq.common.MixAll;
2931
import org.apache.rocketmq.common.utils.AsyncShutdownHelper;
32+
import org.apache.rocketmq.common.ObjectCreator;
3033
import org.apache.rocketmq.common.utils.StartAndShutdown;
3134
import org.apache.rocketmq.remoting.RPCHook;
35+
import org.apache.rocketmq.remoting.RemotingClient;
3236
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
3337

3438
public class MQClientAPIFactory implements StartAndShutdown {
@@ -40,16 +44,35 @@ public class MQClientAPIFactory implements StartAndShutdown {
4044
private final RPCHook rpcHook;
4145
private final ScheduledExecutorService scheduledExecutorService;
4246
private final NameserverAccessConfig nameserverAccessConfig;
47+
private final ObjectCreator<RemotingClient> remotingClientCreator;
48+
49+
public MQClientAPIFactory(
50+
NameserverAccessConfig nameserverAccessConfig,
51+
String namePrefix,
52+
int clientNum,
53+
ClientRemotingProcessor clientRemotingProcessor,
54+
RPCHook rpcHook,
55+
ScheduledExecutorService scheduledExecutorService
56+
) {
57+
this(nameserverAccessConfig, namePrefix, clientNum, clientRemotingProcessor, rpcHook, scheduledExecutorService, null);
58+
}
4359

44-
public MQClientAPIFactory(NameserverAccessConfig nameserverAccessConfig, String namePrefix, int clientNum,
60+
public MQClientAPIFactory(
61+
NameserverAccessConfig nameserverAccessConfig,
62+
String namePrefix,
63+
int clientNum,
4564
ClientRemotingProcessor clientRemotingProcessor,
46-
RPCHook rpcHook, ScheduledExecutorService scheduledExecutorService) {
65+
RPCHook rpcHook,
66+
ScheduledExecutorService scheduledExecutorService,
67+
ObjectCreator<RemotingClient> remotingClientCreator
68+
) {
4769
this.nameserverAccessConfig = nameserverAccessConfig;
4870
this.namePrefix = namePrefix;
4971
this.clientNum = clientNum;
5072
this.clientRemotingProcessor = clientRemotingProcessor;
5173
this.rpcHook = rpcHook;
5274
this.scheduledExecutorService = scheduledExecutorService;
75+
this.remotingClientCreator = remotingClientCreator;
5376

5477
this.init();
5578
}
@@ -102,9 +125,13 @@ protected MQClientAPIExt createAndStart(String instanceName) {
102125
NettyClientConfig nettyClientConfig = new NettyClientConfig();
103126
nettyClientConfig.setDisableCallbackExecutor(true);
104127

105-
MQClientAPIExt mqClientAPIExt = new MQClientAPIExt(clientConfig, nettyClientConfig,
128+
MQClientAPIExt mqClientAPIExt = new MQClientAPIExt(
129+
clientConfig,
130+
nettyClientConfig,
106131
clientRemotingProcessor,
107-
rpcHook);
132+
rpcHook,
133+
remotingClientCreator
134+
);
108135

109136
if (!mqClientAPIExt.updateNameServerAddressList()) {
110137
mqClientAPIExt.fetchNameServerAddr();

client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.rocketmq.client.producer.SendResult;
3636
import org.apache.rocketmq.client.producer.SendStatus;
3737
import org.apache.rocketmq.common.MixAll;
38+
import org.apache.rocketmq.common.ObjectCreator;
3839
import org.apache.rocketmq.common.Pair;
3940
import org.apache.rocketmq.common.PlainAccessConfig;
4041
import org.apache.rocketmq.common.TopicConfig;
@@ -59,6 +60,7 @@
5960
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
6061
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
6162
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
63+
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
6264
import org.apache.rocketmq.remoting.netty.ResponseFuture;
6365
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
6466
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
@@ -2104,6 +2106,48 @@ public void operationFail(Throwable throwable) {
21042106
done.await();
21052107
}
21062108

2109+
@Test
2110+
public void testMQClientAPIImplWithoutObjectCreator() {
2111+
MQClientAPIImpl clientAPI = new MQClientAPIImpl(
2112+
new NettyClientConfig(),
2113+
null,
2114+
null,
2115+
new ClientConfig(),
2116+
null,
2117+
null
2118+
);
2119+
RemotingClient remotingClient1 = clientAPI.getRemotingClient();
2120+
Assert.assertTrue(remotingClient1 instanceof NettyRemotingClient);
2121+
}
2122+
2123+
@Test
2124+
public void testMQClientAPIImplWithObjectCreator() {
2125+
ObjectCreator<RemotingClient> clientObjectCreator = args -> new MockRemotingClientTest((NettyClientConfig) args[0]);
2126+
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
2127+
MQClientAPIImpl clientAPI = new MQClientAPIImpl(
2128+
nettyClientConfig,
2129+
null,
2130+
null,
2131+
new ClientConfig(),
2132+
null,
2133+
clientObjectCreator
2134+
);
2135+
RemotingClient remotingClient1 = clientAPI.getRemotingClient();
2136+
Assert.assertTrue(remotingClient1 instanceof MockRemotingClientTest);
2137+
MockRemotingClientTest remotingClientTest = (MockRemotingClientTest) remotingClient1;
2138+
Assert.assertSame(remotingClientTest.getNettyClientConfig(), nettyClientConfig);
2139+
}
2140+
2141+
private static class MockRemotingClientTest extends NettyRemotingClient {
2142+
public MockRemotingClientTest(NettyClientConfig nettyClientConfig) {
2143+
super(nettyClientConfig);
2144+
}
2145+
2146+
public NettyClientConfig getNettyClientConfig() {
2147+
return nettyClientConfig;
2148+
}
2149+
}
2150+
21072151
private Properties createProperties() {
21082152
Properties result = new Properties();
21092153
result.put("key", "value");
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.common;
18+
19+
public interface ObjectCreator<T> {
20+
T create(Object... args);
21+
}

container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.rocketmq.common.AbstractBrokerRunnable;
2424
import org.apache.rocketmq.common.BrokerConfig;
2525
import org.apache.rocketmq.common.MixAll;
26+
import org.apache.rocketmq.remoting.RemotingServer;
2627
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
2728
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
2829
import org.apache.rocketmq.store.MessageStore;
@@ -43,8 +44,11 @@ public InnerBrokerController(
4344

4445
@Override
4546
protected void initializeRemotingServer() {
46-
this.remotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort());
47-
this.fastRemotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort() - 2);
47+
RemotingServer remotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort());
48+
RemotingServer fastRemotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort() - 2);
49+
50+
setRemotingServer(remotingServer);
51+
setFastRemotingServer(fastRemotingServer);
4852
}
4953

5054
@Override
@@ -119,11 +123,11 @@ public void shutdown() {
119123
scheduledFuture.cancel(true);
120124
}
121125

122-
if (this.remotingServer != null) {
126+
if (getRemotingServer() != null) {
123127
this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort());
124128
}
125129

126-
if (this.fastRemotingServer != null) {
130+
if (getFastRemotingServer() != null) {
127131
this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort() - 2);
128132
}
129133
}

0 commit comments

Comments
 (0)