Skip to content

Commit 0d6a172

Browse files
committed
BrokerOuterAPI
1 parent dde925c commit 0d6a172

3 files changed

Lines changed: 149 additions & 27 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java

Lines changed: 143 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,14 @@
2323
import java.util.ArrayList;
2424
import java.util.Arrays;
2525
import java.util.List;
26+
import java.util.Map;
27+
import java.util.Objects;
28+
import java.util.Optional;
2629
import java.util.Set;
2730
import java.util.concurrent.ArrayBlockingQueue;
2831
import java.util.concurrent.CompletableFuture;
32+
import java.util.concurrent.ConcurrentHashMap;
33+
import java.util.concurrent.ConcurrentMap;
2934
import java.util.concurrent.CopyOnWriteArrayList;
3035
import java.util.concurrent.CountDownLatch;
3136
import java.util.concurrent.ExecutorService;
@@ -102,6 +107,10 @@
102107
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
103108
import org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoRequestHeader;
104109
import org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoResponseHeader;
110+
import org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupRequestHeader;
111+
import org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupResponseHeader;
112+
import org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigRequestHeader;
113+
import org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader;
105114
import org.apache.rocketmq.remoting.protocol.header.GetBrokerMemberGroupRequestHeader;
106115
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
107116
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
@@ -139,6 +148,8 @@
139148
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
140149
import org.apache.rocketmq.remoting.protocol.route.QueueData;
141150
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
151+
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
152+
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
142153
import org.apache.rocketmq.remoting.rpc.ClientMetadata;
143154
import org.apache.rocketmq.remoting.rpc.RpcClient;
144155
import org.apache.rocketmq.remoting.rpc.RpcClientImpl;
@@ -761,22 +772,78 @@ public void run0() {
761772
return changedList;
762773
}
763774

764-
public TopicConfigAndMappingSerializeWrapper getAllTopicConfig(
765-
final String addr) throws RemotingConnectException, RemotingSendRequestException,
766-
RemotingTimeoutException, InterruptedException, MQBrokerException {
767-
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
775+
public TopicConfigAndMappingSerializeWrapper getAllTopicConfig(final String addr)
776+
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
777+
InterruptedException, MQBrokerException, RemotingCommandException {
778+
779+
DataVersion topicConfigDataVersion = null;
780+
DataVersion mappingDataVersion = null;
781+
int topicSeq = 0;
782+
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
783+
Map<String, TopicQueueMappingDetail> topicQueueMappingDetailMap = new ConcurrentHashMap<>();
784+
while (true) {
785+
GetAllTopicConfigRequestHeader requestHeader = new GetAllTopicConfigRequestHeader();
786+
requestHeader.setTopicSeq(topicSeq);
787+
requestHeader.setMaxTopicNum(2000);
788+
requestHeader.setDataVersion(Optional.ofNullable(topicConfigDataVersion).
789+
map(DataVersion::toJson).orElse(StringUtils.EMPTY));
790+
791+
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, requestHeader);
792+
793+
RemotingCommand response = this.remotingClient.invokeSync(
794+
MixAll.brokerVIPChannel(true, addr), request, 3000);
795+
796+
assert response != null;
797+
if (response.getCode() == SUCCESS) {
798+
TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper =
799+
TopicConfigAndMappingSerializeWrapper.decode(response.getBody(), TopicConfigAndMappingSerializeWrapper.class);
800+
topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
801+
topicQueueMappingDetailMap.putAll(topicConfigSerializeWrapper.getTopicQueueMappingDetailMap());
802+
topicSeq += topicConfigSerializeWrapper.getTopicConfigTable().size();
803+
804+
805+
DataVersion newDataVersion = topicConfigSerializeWrapper.getDataVersion();
806+
if (topicConfigDataVersion == null) {
807+
// fill dataVersion before break the loop to compatible with old version server
808+
topicConfigDataVersion = newDataVersion;
809+
mappingDataVersion = topicConfigSerializeWrapper.getMappingDataVersion();
810+
}
768811

769-
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
770-
assert response != null;
771-
switch (response.getCode()) {
772-
case ResponseCode.SUCCESS: {
773-
return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigAndMappingSerializeWrapper.class);
812+
GetAllTopicConfigResponseHeader responseHeader =
813+
response.decodeCommandCustomHeader(GetAllTopicConfigResponseHeader.class);
814+
Integer totalTopicNum = Optional.ofNullable(responseHeader)
815+
.map(GetAllTopicConfigResponseHeader::getTotalTopicNum).orElse(null);
816+
817+
if (Objects.isNull(totalTopicNum)) { // compatible with old version server
818+
// the server side don't support totalTopicNum, all data is returned
819+
break;
820+
}
821+
822+
if (!Objects.equals(topicConfigDataVersion, newDataVersion)) {
823+
LOGGER.error("dataVersion changed, currentDataVersion: {}, newDataVersion: {}", topicConfigDataVersion, newDataVersion);
824+
topicConfigDataVersion = newDataVersion;
825+
mappingDataVersion = topicConfigSerializeWrapper.getMappingDataVersion();
826+
topicSeq = 0;
827+
topicConfigTable.clear();
828+
continue;
829+
}
830+
831+
if (topicSeq >= totalTopicNum - 1) {
832+
LOGGER.info("get all topic config, totalTopicNum: {}", totalTopicNum);
833+
break;
834+
}
835+
} else {
836+
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
774837
}
775-
default:
776-
break;
838+
777839
}
778840

779-
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
841+
TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper = new TopicConfigAndMappingSerializeWrapper();
842+
topicConfigSerializeWrapper.setDataVersion(topicConfigDataVersion);
843+
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
844+
topicConfigSerializeWrapper.setMappingDataVersion(mappingDataVersion);
845+
topicConfigSerializeWrapper.setTopicQueueMappingDetailMap(topicQueueMappingDetailMap);
846+
return topicConfigSerializeWrapper;
780847
}
781848

782849
public TimerCheckpoint getTimerCheckPoint(
@@ -849,21 +916,73 @@ public String getAllDelayOffset(
849916
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
850917
}
851918

852-
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
853-
final String addr) throws InterruptedException, RemotingTimeoutException,
854-
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
855-
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
856-
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
857-
assert response != null;
858-
switch (response.getCode()) {
859-
case ResponseCode.SUCCESS: {
860-
return SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class);
919+
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr)
920+
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
921+
RemotingConnectException, MQBrokerException, RemotingCommandException {
922+
DataVersion currentDataVersion = null;
923+
int groupSeq = 0;
924+
ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = new ConcurrentHashMap<>();
925+
ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable = new ConcurrentHashMap<>();
926+
while (true) {
927+
928+
GetAllSubscriptionGroupRequestHeader requestHeader = new GetAllSubscriptionGroupRequestHeader();
929+
requestHeader.setGroupSeq(groupSeq);
930+
requestHeader.setMaxGroupNum(2000);
931+
requestHeader.setDataVersion(Optional.ofNullable(currentDataVersion)
932+
.map(DataVersion::toJson).orElse(StringUtils.EMPTY));
933+
934+
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, requestHeader);
935+
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
936+
937+
assert response != null;
938+
if (response.getCode() == SUCCESS) {
939+
SubscriptionGroupWrapper subscriptionGroupWrapper =
940+
SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class);
941+
subscriptionGroupTable.putAll(subscriptionGroupWrapper.getSubscriptionGroupTable());
942+
forbiddenTable.putAll(subscriptionGroupWrapper.getForbiddenTable());
943+
944+
DataVersion newDataVersion = subscriptionGroupWrapper.getDataVersion();
945+
if (currentDataVersion == null) {
946+
// fill dataVersion before break the loop to compatible with old version server
947+
currentDataVersion = newDataVersion;
948+
}
949+
950+
groupSeq += subscriptionGroupWrapper.getSubscriptionGroupTable().size();
951+
952+
GetAllSubscriptionGroupResponseHeader responseHeader =
953+
response.decodeCommandCustomHeader(GetAllSubscriptionGroupResponseHeader.class);
954+
Integer totalGroupNum = Optional.ofNullable(responseHeader)
955+
.map(GetAllSubscriptionGroupResponseHeader::getTotalGroupNum).orElse(null);
956+
957+
if (Objects.isNull(totalGroupNum)) {
958+
// the server side don't support totalGroupNum, all data is returned
959+
break;
960+
}
961+
962+
if (!Objects.equals(currentDataVersion, newDataVersion)) {
963+
LOGGER.error("dataVersion changed, currentDataVersion: {}, newDataVersion: {}",
964+
currentDataVersion, newDataVersion);
965+
currentDataVersion = newDataVersion;
966+
groupSeq = 0;
967+
subscriptionGroupTable.clear();
968+
forbiddenTable.clear();
969+
continue;
970+
}
971+
972+
if (groupSeq >= totalGroupNum - 1) {
973+
LOGGER.info("get all subscription group config, totalGroupNum: {}", totalGroupNum);
974+
break;
975+
}
976+
} else {
977+
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
861978
}
862-
default:
863-
break;
864979
}
865980

866-
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
981+
SubscriptionGroupWrapper allSubscriptionGroup = new SubscriptionGroupWrapper();
982+
allSubscriptionGroup.setSubscriptionGroupTable(subscriptionGroupTable);
983+
allSubscriptionGroup.setForbiddenTable(forbiddenTable);
984+
allSubscriptionGroup.setDataVersion(currentDataVersion);
985+
return allSubscriptionGroup;
867986
}
868987

869988
public void registerRPCHook(RPCHook rpcHook) {

broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.rocketmq.broker.topic.TopicConfigManager;
3030
import org.apache.rocketmq.client.exception.MQBrokerException;
3131
import org.apache.rocketmq.common.BrokerConfig;
32+
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
3233
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
3334
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
3435
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -113,8 +114,8 @@ private MessageRequestModeSerializeWrapper createMessageRequestModeWrapper() {
113114

114115
@Test
115116
public void testSyncAtomically()
116-
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
117-
InterruptedException {
117+
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
118+
InterruptedException, RemotingCommandException {
118119
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(subscriptionGroupWrapper);
119120
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(requestModeSerializeWrapper);
120121

broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.rocketmq.client.exception.MQBrokerException;
2828
import org.apache.rocketmq.common.BrokerConfig;
2929
import org.apache.rocketmq.common.TopicConfig;
30+
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
3031
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
3132
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
3233
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -132,7 +133,8 @@ public void init() {
132133
}
133134

134135
@Test
135-
public void testSyncAll() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException, UnsupportedEncodingException {
136+
public void testSyncAll() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
137+
MQBrokerException, InterruptedException, UnsupportedEncodingException, RemotingCommandException {
136138
TopicConfig newTopicConfig = new TopicConfig("NewTopic");
137139
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(createTopicConfigWrapper(newTopicConfig));
138140
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());

0 commit comments

Comments
 (0)