Skip to content

Commit 76cee72

Browse files
authored
[ISSUE #9537] MQClientAPIFactory Implement NameServerUpdateCallback interface (#9538)
1 parent 8639dcc commit 76cee72

2 files changed

Lines changed: 161 additions & 4 deletions

File tree

client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,17 @@
1717
package org.apache.rocketmq.client.impl.mqclient;
1818

1919
import com.google.common.base.Strings;
20-
2120
import java.time.Duration;
2221
import java.util.concurrent.ScheduledExecutorService;
2322
import java.util.concurrent.ThreadLocalRandom;
2423
import java.util.concurrent.TimeUnit;
25-
2624
import org.apache.commons.lang3.StringUtils;
2725
import org.apache.rocketmq.client.ClientConfig;
2826
import org.apache.rocketmq.client.common.NameserverAccessConfig;
2927
import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
3028
import org.apache.rocketmq.common.MixAll;
31-
import org.apache.rocketmq.common.utils.AsyncShutdownHelper;
3229
import org.apache.rocketmq.common.ObjectCreator;
30+
import org.apache.rocketmq.common.utils.AsyncShutdownHelper;
3331
import org.apache.rocketmq.common.utils.StartAndShutdown;
3432
import org.apache.rocketmq.remoting.RPCHook;
3533
import org.apache.rocketmq.remoting.RemotingClient;
@@ -133,7 +131,9 @@ protected MQClientAPIExt createAndStart(String instanceName) {
133131
remotingClientCreator
134132
);
135133

136-
if (!mqClientAPIExt.updateNameServerAddressList()) {
134+
if (StringUtils.isEmpty(nameserverAccessConfig.getNamesrvDomain())) {
135+
mqClientAPIExt.updateNameServerAddressList(nameserverAccessConfig.getNamesrvAddr());
136+
} else {
137137
mqClientAPIExt.fetchNameServerAddr();
138138
this.scheduledExecutorService.scheduleAtFixedRate(
139139
mqClientAPIExt::fetchNameServerAddr,
@@ -142,7 +142,18 @@ protected MQClientAPIExt createAndStart(String instanceName) {
142142
TimeUnit.MILLISECONDS
143143
);
144144
}
145+
145146
mqClientAPIExt.start();
146147
return mqClientAPIExt;
147148
}
149+
150+
public void onNameServerAddressChange(String namesrvAddress) {
151+
for (MQClientAPIExt client : clients) {
152+
client.onNameServerAddressChange(namesrvAddress);
153+
}
154+
}
155+
156+
public MQClientAPIExt[] getClients() {
157+
return clients;
158+
}
148159
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.impl.mqclient;
19+
20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
import static org.junit.jupiter.api.Assertions.assertThrows;
22+
23+
import java.util.List;
24+
import java.util.concurrent.ScheduledExecutorService;
25+
import org.apache.rocketmq.client.common.NameserverAccessConfig;
26+
import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
27+
import org.apache.rocketmq.common.MixAll;
28+
import org.apache.rocketmq.common.utils.ThreadUtils;
29+
import org.apache.rocketmq.remoting.RPCHook;
30+
import org.junit.jupiter.api.AfterEach;
31+
import org.junit.jupiter.api.BeforeEach;
32+
import org.junit.jupiter.api.Test;
33+
34+
class MQClientAPITest {
35+
36+
private NameserverAccessConfig nameserverAccessConfig;
37+
private final ClientRemotingProcessor clientRemotingProcessor = new DoNothingClientRemotingProcessor(null);
38+
private final RPCHook rpcHook = null;
39+
private ScheduledExecutorService scheduledExecutorService;
40+
private MQClientAPIFactory mqClientAPIFactory;
41+
42+
@BeforeEach
43+
void setUp() {
44+
scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor("TestScheduledExecutorService", true);
45+
}
46+
47+
@AfterEach
48+
public void tearDown() {
49+
scheduledExecutorService.shutdownNow();
50+
}
51+
52+
@Test
53+
void testInitWithNamesrvAddr() {
54+
nameserverAccessConfig = new NameserverAccessConfig("127.0.0.1:9876", "", "");
55+
56+
mqClientAPIFactory = new MQClientAPIFactory(
57+
nameserverAccessConfig,
58+
"TestPrefix",
59+
2,
60+
clientRemotingProcessor,
61+
rpcHook,
62+
scheduledExecutorService
63+
);
64+
65+
assertEquals("127.0.0.1:9876", System.getProperty("rocketmq.namesrv.addr"));
66+
}
67+
68+
@Test
69+
void testInitWithNamesrvDomain() {
70+
nameserverAccessConfig = new NameserverAccessConfig("", "test-domain", "");
71+
72+
mqClientAPIFactory = new MQClientAPIFactory(
73+
nameserverAccessConfig,
74+
"TestPrefix",
75+
2,
76+
clientRemotingProcessor,
77+
rpcHook,
78+
scheduledExecutorService
79+
);
80+
81+
assertEquals("test-domain", System.getProperty("rocketmq.namesrv.domain"));
82+
}
83+
84+
@Test
85+
void testInitThrowsExceptionWhenBothEmpty() {
86+
nameserverAccessConfig = new NameserverAccessConfig("", "", "");
87+
88+
RuntimeException exception = assertThrows(RuntimeException.class, () -> new MQClientAPIFactory(
89+
nameserverAccessConfig,
90+
"TestPrefix",
91+
2,
92+
clientRemotingProcessor,
93+
rpcHook,
94+
scheduledExecutorService
95+
));
96+
97+
assertEquals("The configuration item NamesrvAddr is not configured", exception.getMessage());
98+
}
99+
100+
@Test
101+
void testStartCreatesClients() throws Exception {
102+
nameserverAccessConfig = new NameserverAccessConfig("127.0.0.1:9876", "", "");
103+
104+
mqClientAPIFactory = new MQClientAPIFactory(
105+
nameserverAccessConfig,
106+
"TestPrefix",
107+
2,
108+
clientRemotingProcessor,
109+
rpcHook,
110+
scheduledExecutorService
111+
);
112+
113+
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:123");
114+
115+
mqClientAPIFactory.start();
116+
117+
// Assert
118+
MQClientAPIExt client = mqClientAPIFactory.getClient();
119+
List<String> nameServerAddressList = client.getNameServerAddressList();
120+
assertEquals(1, nameServerAddressList.size());
121+
assertEquals("127.0.0.1:9876", nameServerAddressList.get(0));
122+
}
123+
124+
@Test
125+
void testOnNameServerAddressChangeUpdatesAllClients() throws Exception {
126+
nameserverAccessConfig = new NameserverAccessConfig("127.0.0.1:9876", "", "");
127+
128+
mqClientAPIFactory = new MQClientAPIFactory(
129+
nameserverAccessConfig,
130+
"TestPrefix",
131+
2,
132+
clientRemotingProcessor,
133+
rpcHook,
134+
scheduledExecutorService
135+
);
136+
mqClientAPIFactory.start();
137+
138+
// Act
139+
mqClientAPIFactory.onNameServerAddressChange("new-address0;new-address1");
140+
141+
MQClientAPIExt client = mqClientAPIFactory.getClient();
142+
List<String> nameServerAddressList = client.getNameServerAddressList();
143+
assertEquals(2, nameServerAddressList.size());
144+
assertEquals("new-address0", nameServerAddressList.get(0));
145+
}
146+
}

0 commit comments

Comments
 (0)