Skip to content

Commit 7cb1b1e

Browse files
ltamberxiaoming.lt
andauthored
[ISSUE #9467] Implement paged retrieval of Topic and SubscriptionGroup information (#9468)
* Implement paged retrieval of Topic and SubscriptionGroup information - Implemented pagination logic to support data retrieval by sequence number and maximum count. - Added data version checking to ensure the retrieved data is the latest. - Optimized the result structure to include total count and current page data. - Added unit tests * BrokerOuterAPI * add timeout & log --------- Co-authored-by: xiaoming.lt <xiaoming.lt@alibaba-inc.com>
1 parent be67d61 commit 7cb1b1e

22 files changed

Lines changed: 1072 additions & 117 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java

Lines changed: 168 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,14 @@
2323
import java.util.ArrayList;
2424
import java.util.Arrays;
2525
import java.util.List;
26+
import java.util.Map;
27+
import java.util.Objects;
28+
import java.util.Optional;
2629
import java.util.Set;
2730
import java.util.concurrent.ArrayBlockingQueue;
2831
import java.util.concurrent.CompletableFuture;
32+
import java.util.concurrent.ConcurrentHashMap;
33+
import java.util.concurrent.ConcurrentMap;
2934
import java.util.concurrent.CopyOnWriteArrayList;
3035
import java.util.concurrent.CountDownLatch;
3136
import java.util.concurrent.ExecutorService;
@@ -102,6 +107,10 @@
102107
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
103108
import org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoRequestHeader;
104109
import org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoResponseHeader;
110+
import org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupRequestHeader;
111+
import org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupResponseHeader;
112+
import org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigRequestHeader;
113+
import org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader;
105114
import org.apache.rocketmq.remoting.protocol.header.GetBrokerMemberGroupRequestHeader;
106115
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
107116
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
@@ -139,6 +148,8 @@
139148
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
140149
import org.apache.rocketmq.remoting.protocol.route.QueueData;
141150
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
151+
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
152+
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
142153
import org.apache.rocketmq.remoting.rpc.ClientMetadata;
143154
import org.apache.rocketmq.remoting.rpc.RpcClient;
144155
import org.apache.rocketmq.remoting.rpc.RpcClientImpl;
@@ -761,22 +772,86 @@ public void run0() {
761772
return changedList;
762773
}
763774

764-
public TopicConfigAndMappingSerializeWrapper getAllTopicConfig(
765-
final String addr) throws RemotingConnectException, RemotingSendRequestException,
766-
RemotingTimeoutException, InterruptedException, MQBrokerException {
767-
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
775+
public TopicConfigAndMappingSerializeWrapper getAllTopicConfig(final String addr)
776+
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
777+
InterruptedException, MQBrokerException, RemotingCommandException {
778+
779+
DataVersion topicConfigDataVersion = null;
780+
DataVersion mappingDataVersion = null;
781+
long timeoutMills = getTimeoutMillis();
782+
int topicSeq = 0;
783+
long beginTime = System.nanoTime();
784+
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
785+
Map<String, TopicQueueMappingDetail> topicQueueMappingDetailMap = new ConcurrentHashMap<>();
786+
while (true) {
787+
long leftTime = timeoutMills - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime);
788+
if (leftTime < 0) {
789+
throw new RemotingTimeoutException("invokeSync call timeout");
790+
}
768791

769-
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
770-
assert response != null;
771-
switch (response.getCode()) {
772-
case ResponseCode.SUCCESS: {
773-
return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigAndMappingSerializeWrapper.class);
792+
GetAllTopicConfigRequestHeader requestHeader = new GetAllTopicConfigRequestHeader();
793+
requestHeader.setTopicSeq(topicSeq);
794+
requestHeader.setMaxTopicNum(getMaxPageSize());
795+
requestHeader.setDataVersion(Optional.ofNullable(topicConfigDataVersion).
796+
map(DataVersion::toJson).orElse(StringUtils.EMPTY));
797+
LOGGER.info("getAllTopicConfig from seq {}, max {}, dataVersion {}",
798+
topicSeq, requestHeader.getMaxTopicNum(), requestHeader.getDataVersion());
799+
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, requestHeader);
800+
801+
RemotingCommand response = this.remotingClient.invokeSync(
802+
MixAll.brokerVIPChannel(true, addr), request, 30000);
803+
804+
assert response != null;
805+
if (response.getCode() == SUCCESS) {
806+
TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper =
807+
TopicConfigAndMappingSerializeWrapper.decode(response.getBody(), TopicConfigAndMappingSerializeWrapper.class);
808+
topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
809+
topicQueueMappingDetailMap.putAll(topicConfigSerializeWrapper.getTopicQueueMappingDetailMap());
810+
topicSeq += topicConfigSerializeWrapper.getTopicConfigTable().size();
811+
812+
813+
DataVersion newDataVersion = topicConfigSerializeWrapper.getDataVersion();
814+
if (topicConfigDataVersion == null) {
815+
// fill dataVersion before break the loop to compatible with old version server
816+
topicConfigDataVersion = newDataVersion;
817+
mappingDataVersion = topicConfigSerializeWrapper.getMappingDataVersion();
818+
}
819+
820+
GetAllTopicConfigResponseHeader responseHeader =
821+
response.decodeCommandCustomHeader(GetAllTopicConfigResponseHeader.class);
822+
Integer totalTopicNum = Optional.ofNullable(responseHeader)
823+
.map(GetAllTopicConfigResponseHeader::getTotalTopicNum).orElse(null);
824+
825+
if (Objects.isNull(totalTopicNum)) { // compatible with old version server
826+
// the server side don't support totalTopicNum, all data is returned
827+
break;
828+
}
829+
830+
if (!Objects.equals(topicConfigDataVersion, newDataVersion)) {
831+
LOGGER.error("dataVersion changed, currentDataVersion: {}, newDataVersion: {}", topicConfigDataVersion, newDataVersion);
832+
topicConfigDataVersion = newDataVersion;
833+
mappingDataVersion = topicConfigSerializeWrapper.getMappingDataVersion();
834+
topicSeq = 0;
835+
topicConfigTable.clear();
836+
continue;
837+
}
838+
839+
if (topicSeq >= totalTopicNum - 1) {
840+
LOGGER.info("get all topic config, totalTopicNum: {}", totalTopicNum);
841+
break;
842+
}
843+
} else {
844+
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
774845
}
775-
default:
776-
break;
846+
777847
}
778848

779-
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
849+
TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper = new TopicConfigAndMappingSerializeWrapper();
850+
topicConfigSerializeWrapper.setDataVersion(topicConfigDataVersion);
851+
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
852+
topicConfigSerializeWrapper.setMappingDataVersion(mappingDataVersion);
853+
topicConfigSerializeWrapper.setTopicQueueMappingDetailMap(topicQueueMappingDetailMap);
854+
return topicConfigSerializeWrapper;
780855
}
781856

782857
public TimerCheckpoint getTimerCheckPoint(
@@ -849,21 +924,82 @@ public String getAllDelayOffset(
849924
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
850925
}
851926

852-
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
853-
final String addr) throws InterruptedException, RemotingTimeoutException,
854-
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
855-
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
856-
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
857-
assert response != null;
858-
switch (response.getCode()) {
859-
case ResponseCode.SUCCESS: {
860-
return SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class);
927+
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr)
928+
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
929+
RemotingConnectException, MQBrokerException, RemotingCommandException {
930+
931+
long timeoutMills = getTimeoutMillis();
932+
DataVersion currentDataVersion = null;
933+
int groupSeq = 0;
934+
long beginTime = System.nanoTime();
935+
ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = new ConcurrentHashMap<>();
936+
ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable = new ConcurrentHashMap<>();
937+
938+
while (true) {
939+
long leftTime = timeoutMills - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime);
940+
if (leftTime < 0) {
941+
throw new RemotingTimeoutException("invokeSync call timeout");
942+
}
943+
944+
GetAllSubscriptionGroupRequestHeader requestHeader = new GetAllSubscriptionGroupRequestHeader();
945+
requestHeader.setGroupSeq(groupSeq);
946+
requestHeader.setMaxGroupNum(getMaxPageSize());
947+
requestHeader.setDataVersion(Optional.ofNullable(currentDataVersion)
948+
.map(DataVersion::toJson).orElse(StringUtils.EMPTY));
949+
LOGGER.info("getAllSubscriptionGroup from seq {}, max {}, dataVersion {}",
950+
groupSeq, requestHeader.getMaxGroupNum(), requestHeader.getDataVersion());
951+
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, requestHeader);
952+
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 30000);
953+
954+
assert response != null;
955+
if (response.getCode() == SUCCESS) {
956+
SubscriptionGroupWrapper subscriptionGroupWrapper =
957+
SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class);
958+
subscriptionGroupTable.putAll(subscriptionGroupWrapper.getSubscriptionGroupTable());
959+
forbiddenTable.putAll(subscriptionGroupWrapper.getForbiddenTable());
960+
961+
DataVersion newDataVersion = subscriptionGroupWrapper.getDataVersion();
962+
if (currentDataVersion == null) {
963+
// fill dataVersion before break the loop to compatible with old version server
964+
currentDataVersion = newDataVersion;
965+
}
966+
967+
groupSeq += subscriptionGroupWrapper.getSubscriptionGroupTable().size();
968+
969+
GetAllSubscriptionGroupResponseHeader responseHeader =
970+
response.decodeCommandCustomHeader(GetAllSubscriptionGroupResponseHeader.class);
971+
Integer totalGroupNum = Optional.ofNullable(responseHeader)
972+
.map(GetAllSubscriptionGroupResponseHeader::getTotalGroupNum).orElse(null);
973+
974+
if (Objects.isNull(totalGroupNum)) {
975+
// the server side don't support totalGroupNum, all data is returned
976+
break;
977+
}
978+
979+
if (!Objects.equals(currentDataVersion, newDataVersion)) {
980+
LOGGER.error("dataVersion changed, currentDataVersion: {}, newDataVersion: {}",
981+
currentDataVersion, newDataVersion);
982+
currentDataVersion = newDataVersion;
983+
groupSeq = 0;
984+
subscriptionGroupTable.clear();
985+
forbiddenTable.clear();
986+
continue;
987+
}
988+
989+
if (groupSeq >= totalGroupNum - 1) {
990+
LOGGER.info("get all subscription group config, totalGroupNum: {}", totalGroupNum);
991+
break;
992+
}
993+
} else {
994+
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
861995
}
862-
default:
863-
break;
864996
}
865997

866-
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
998+
SubscriptionGroupWrapper allSubscriptionGroup = new SubscriptionGroupWrapper();
999+
allSubscriptionGroup.setSubscriptionGroupTable(subscriptionGroupTable);
1000+
allSubscriptionGroup.setForbiddenTable(forbiddenTable);
1001+
allSubscriptionGroup.setDataVersion(currentDataVersion);
1002+
return allSubscriptionGroup;
8671003
}
8681004

8691005
public void registerRPCHook(RPCHook rpcHook) {
@@ -1491,4 +1627,12 @@ private PullResult processPullResult(final PullResultExt pullResult, String brok
14911627
return pullResult;
14921628
}
14931629

1630+
private int getMaxPageSize() {
1631+
return 2000;
1632+
}
1633+
1634+
private long getTimeoutMillis() {
1635+
return TimeUnit.SECONDS.toMillis(60);
1636+
}
1637+
14941638
}

0 commit comments

Comments
 (0)