Skip to content

Commit e0580bc

Browse files
fujian-zfjRongtongJin
authored andcommitted
[ISSUE #9206] Fix slave sync topic sub in rocksdb ha (#9207)
* typo int readme[ecosystem] * fix slave sunc topic and sub in rocksdb ha mode
1 parent 137e7bf commit e0580bc

4 files changed

Lines changed: 32 additions & 17 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ public boolean loadForbidden(BiConsumer<byte[], byte[]> biConsumer) {
7878
return true;
7979
}
8080

81-
8281
private boolean merge() {
8382
if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) {
8483
log.info("subGroup json file does not exist, so skip merge");

broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717
package org.apache.rocketmq.broker.slave;
1818

1919
import java.io.IOException;
20+
import java.util.Iterator;
21+
import java.util.Map;
2022
import java.util.concurrent.ConcurrentHashMap;
2123
import java.util.concurrent.ConcurrentMap;
2224

2325
import org.apache.commons.lang3.StringUtils;
2426
import org.apache.rocketmq.broker.BrokerController;
2527
import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
2628
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
29+
import org.apache.rocketmq.broker.topic.TopicConfigManager;
2730
import org.apache.rocketmq.common.MixAll;
2831
import org.apache.rocketmq.common.TopicConfig;
2932
import org.apache.rocketmq.common.constant.LoggerName;
@@ -77,20 +80,28 @@ private void syncTopicConfig() {
7780
try {
7881
TopicConfigAndMappingSerializeWrapper topicWrapper =
7982
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
80-
if (!this.brokerController.getTopicConfigManager().getDataVersion()
81-
.equals(topicWrapper.getDataVersion())) {
83+
TopicConfigManager topicConfigManager = this.brokerController.getTopicConfigManager();
84+
if (!topicConfigManager.getDataVersion().equals(topicWrapper.getDataVersion())) {
8285

83-
this.brokerController.getTopicConfigManager().getDataVersion()
84-
.assignNewOne(topicWrapper.getDataVersion());
86+
topicConfigManager.getDataVersion().assignNewOne(topicWrapper.getDataVersion());
8587

8688
ConcurrentMap<String, TopicConfig> newTopicConfigTable = topicWrapper.getTopicConfigTable();
89+
ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigManager.getTopicConfigTable();
90+
8791
//delete
88-
ConcurrentMap<String, TopicConfig> topicConfigTable = this.brokerController.getTopicConfigManager().getTopicConfigTable();
89-
topicConfigTable.entrySet().removeIf(item -> !newTopicConfigTable.containsKey(item.getKey()));
92+
Iterator<Map.Entry<String, TopicConfig>> iterator = topicConfigTable.entrySet().iterator();
93+
while (iterator.hasNext()) {
94+
Map.Entry<String, TopicConfig> entry = iterator.next();
95+
if (!newTopicConfigTable.containsKey(entry.getKey())) {
96+
iterator.remove();
97+
}
98+
topicConfigManager.deleteTopicConfig(entry.getKey());
99+
}
100+
90101
//update
91-
topicConfigTable.putAll(newTopicConfigTable);
102+
newTopicConfigTable.values().forEach(topicConfigManager::updateSingleTopicConfigWithoutPersist);
92103

93-
this.brokerController.getTopicConfigManager().persist();
104+
topicConfigManager.persist();
94105
}
95106
if (topicWrapper.getTopicQueueMappingDetailMap() != null
96107
&& !topicWrapper.getMappingDataVersion().equals(this.brokerController.getTopicQueueMappingManager().getDataVersion())) {
@@ -165,19 +176,24 @@ private void syncSubscriptionGroupConfig() {
165176

166177
if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
167178
.equals(subscriptionWrapper.getDataVersion())) {
168-
SubscriptionGroupManager subscriptionGroupManager =
169-
this.brokerController.getSubscriptionGroupManager();
170-
subscriptionGroupManager.getDataVersion().assignNewOne(
171-
subscriptionWrapper.getDataVersion());
179+
SubscriptionGroupManager subscriptionGroupManager = this.brokerController.getSubscriptionGroupManager();
180+
subscriptionGroupManager.getDataVersion().assignNewOne(subscriptionWrapper.getDataVersion());
172181

173182
ConcurrentMap<String, SubscriptionGroupConfig> curSubscriptionGroupTable =
174183
subscriptionGroupManager.getSubscriptionGroupTable();
175184
ConcurrentMap<String, SubscriptionGroupConfig> newSubscriptionGroupTable =
176185
subscriptionWrapper.getSubscriptionGroupTable();
177186
// delete
178-
curSubscriptionGroupTable.entrySet().removeIf(e -> !newSubscriptionGroupTable.containsKey(e.getKey()));
187+
Iterator<Map.Entry<String, SubscriptionGroupConfig>> iterator = curSubscriptionGroupTable.entrySet().iterator();
188+
while (iterator.hasNext()) {
189+
Map.Entry<String, SubscriptionGroupConfig> configEntry = iterator.next();
190+
if (!newSubscriptionGroupTable.containsKey(configEntry.getKey())) {
191+
iterator.remove();
192+
}
193+
subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey());
194+
}
179195
// update
180-
curSubscriptionGroupTable.putAll(newSubscriptionGroupTable);
196+
newSubscriptionGroupTable.values().forEach(subscriptionGroupManager::updateSubscriptionGroupConfigWithoutPersist);
181197
// persist
182198
subscriptionGroupManager.persist();
183199
LOGGER.info("Update slave Subscription Group from master, {}", masterAddrBak);

broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config)
143143
this.persist();
144144
}
145145

146-
protected void updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) {
146+
public void updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) {
147147
Map<String, String> newAttributes = request(config);
148148
Map<String, String> currentAttributes = current(config.getGroupName());
149149

broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub)
497497
}
498498
}
499499

500-
protected void updateSingleTopicConfigWithoutPersist(final TopicConfig topicConfig) {
500+
public void updateSingleTopicConfigWithoutPersist(final TopicConfig topicConfig) {
501501
checkNotNull(topicConfig, "topicConfig shouldn't be null");
502502

503503
Map<String, String> newAttributes = request(topicConfig);

0 commit comments

Comments
 (0)