Skip to content

Commit 6694aec

Browse files
author
xiaoming.lt
committed
add timeout & log
1 parent 0d6a172 commit 6694aec

2 files changed

Lines changed: 35 additions & 8 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -778,20 +778,28 @@ public TopicConfigAndMappingSerializeWrapper getAllTopicConfig(final String addr
778778

779779
DataVersion topicConfigDataVersion = null;
780780
DataVersion mappingDataVersion = null;
781+
long timeoutMills = getTimeoutMillis();
781782
int topicSeq = 0;
783+
long beginTime = System.nanoTime();
782784
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
783785
Map<String, TopicQueueMappingDetail> topicQueueMappingDetailMap = new ConcurrentHashMap<>();
784786
while (true) {
787+
long leftTime = timeoutMills - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime);
788+
if (leftTime < 0) {
789+
throw new RemotingTimeoutException("invokeSync call timeout");
790+
}
791+
785792
GetAllTopicConfigRequestHeader requestHeader = new GetAllTopicConfigRequestHeader();
786793
requestHeader.setTopicSeq(topicSeq);
787-
requestHeader.setMaxTopicNum(2000);
794+
requestHeader.setMaxTopicNum(getMaxPageSize());
788795
requestHeader.setDataVersion(Optional.ofNullable(topicConfigDataVersion).
789796
map(DataVersion::toJson).orElse(StringUtils.EMPTY));
790-
797+
LOGGER.info("getAllTopicConfig from seq {}, max {}, dataVersion {}",
798+
topicSeq, requestHeader.getMaxTopicNum(), requestHeader.getDataVersion());
791799
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, requestHeader);
792800

793801
RemotingCommand response = this.remotingClient.invokeSync(
794-
MixAll.brokerVIPChannel(true, addr), request, 3000);
802+
MixAll.brokerVIPChannel(true, addr), request, 30000);
795803

796804
assert response != null;
797805
if (response.getCode() == SUCCESS) {
@@ -919,20 +927,29 @@ public String getAllDelayOffset(
919927
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr)
920928
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
921929
RemotingConnectException, MQBrokerException, RemotingCommandException {
930+
931+
long timeoutMills = getTimeoutMillis();
922932
DataVersion currentDataVersion = null;
923933
int groupSeq = 0;
934+
long beginTime = System.nanoTime();
924935
ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = new ConcurrentHashMap<>();
925936
ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable = new ConcurrentHashMap<>();
937+
926938
while (true) {
939+
long leftTime = timeoutMills - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime);
940+
if (leftTime < 0) {
941+
throw new RemotingTimeoutException("invokeSync call timeout");
942+
}
927943

928944
GetAllSubscriptionGroupRequestHeader requestHeader = new GetAllSubscriptionGroupRequestHeader();
929945
requestHeader.setGroupSeq(groupSeq);
930-
requestHeader.setMaxGroupNum(2000);
946+
requestHeader.setMaxGroupNum(getMaxPageSize());
931947
requestHeader.setDataVersion(Optional.ofNullable(currentDataVersion)
932948
.map(DataVersion::toJson).orElse(StringUtils.EMPTY));
933-
949+
LOGGER.info("getAllSubscriptionGroup from seq {}, max {}, dataVersion {}",
950+
groupSeq, requestHeader.getMaxGroupNum(), requestHeader.getDataVersion());
934951
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, requestHeader);
935-
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
952+
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 30000);
936953

937954
assert response != null;
938955
if (response.getCode() == SUCCESS) {
@@ -1610,4 +1627,12 @@ private PullResult processPullResult(final PullResultExt pullResult, String brok
16101627
return pullResult;
16111628
}
16121629

1630+
private int getMaxPageSize() {
1631+
return 2000;
1632+
}
1633+
1634+
private long getTimeoutMillis() {
1635+
return TimeUnit.SECONDS.toMillis(60);
1636+
}
1637+
16131638
}

client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2836,7 +2836,8 @@ public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
28362836
requestHeader.setMaxGroupNum(clientConfig.getMaxPageSizeInGetMetadata());
28372837
requestHeader.setDataVersion(Optional.ofNullable(currentDataVersion)
28382838
.map(DataVersion::toJson).orElse(StringUtils.EMPTY));
2839-
2839+
log.info("getAllSubscriptionGroup from seq {}, max {}, dataVersion {}",
2840+
groupSeq, requestHeader.getMaxGroupNum(), requestHeader.getDataVersion());
28402841
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, requestHeader);
28412842
RemotingCommand response = this.remotingClient.invokeSync(
28422843
MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, leftTime);
@@ -2930,7 +2931,8 @@ public TopicConfigSerializeWrapper getAllTopicConfig(final String addr, long tim
29302931
requestHeader.setMaxTopicNum(clientConfig.getMaxPageSizeInGetMetadata());
29312932
requestHeader.setDataVersion(Optional.ofNullable(currentDataVersion).
29322933
map(DataVersion::toJson).orElse(StringUtils.EMPTY));
2933-
2934+
log.info("getAllTopicConfig from seq {}, max {}, dataVersion {}",
2935+
topicSeq, requestHeader.getMaxTopicNum(), requestHeader.getDataVersion());
29342936
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, requestHeader);
29352937

29362938
RemotingCommand response = this.remotingClient.invokeSync(

0 commit comments

Comments
 (0)