Skip to content

Commit 2b5e709

Browse files
authored
[ISSUE #9777 ]feat: use data version from master while sync slave and fix delete co… (#9778)
* feat: use data version from master while sync slave and fix delete config while sync Change-Id: I42b2e7b1acc6836d3c90973801c9defba5f1325c * fix: assign new version using master while sync slave Change-Id: I7ec20607a84499fe5a6607763013c59d726aedc3 * feat: allow set dataVersion directly for topic/group config sync Change-Id: Ic845794350e8bdaa847bdd0ae4b3e40ab1ad6311 * feat: set data version directly while sync from master Change-Id: I39e78477a5223b578a4ede3e5cb76f04368d1ca3 * test: adjust slave sync test for version Change-Id: I9e835568912928ddf6e81816095ee3ed8f93afc0
1 parent 343daa9 commit 2b5e709

File tree

6 files changed

+56
-19
lines changed

6 files changed

+56
-19
lines changed

broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConfigManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,12 @@ public void updateKvDataVersion() throws Exception {
165165
JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8));
166166
}
167167

168+
public void setKvDataVersion(DataVersion dataVersion) throws Exception {
169+
this.kvDataVersion = dataVersion;
170+
this.configRocksDBStorage.put(versionCF, KV_DATA_VERSION_KEY, KV_DATA_VERSION_KEY.length,
171+
JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8));
172+
}
173+
168174
public DataVersion getKvDataVersion() {
169175
return kvDataVersion;
170176
}

broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,16 @@ public void updateDataVersion() {
250250
}
251251
}
252252

253+
@Override
254+
public void setDataVersion(DataVersion dataVersion) {
255+
try {
256+
rocksDBConfigManager.setKvDataVersion(dataVersion);
257+
} catch (Exception e) {
258+
log.error("set group config dataVersion error", e);
259+
throw new RuntimeException(e);
260+
}
261+
}
262+
253263
protected void decodeForbidden(byte[] key, byte[] body) {
254264
String forbiddenGroupName = new String(key, RocksDBConfigManager.CHARSET);
255265
JSONObject jsonObject = JSON.parseObject(new String(body, RocksDBConfigManager.CHARSET));

broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,16 @@ public void updateDataVersion() {
205205
}
206206
}
207207

208+
@Override
209+
public void setDataVersion(DataVersion dataVersion) {
210+
try {
211+
rocksDBConfigManager.setKvDataVersion(dataVersion);
212+
} catch (Exception e) {
213+
log.error("set topic config dataVersion error", e);
214+
throw new RuntimeException(e);
215+
}
216+
}
217+
208218
/**
209219
* Migrate data from separate RocksDB instances to the unified RocksDB when useSingleRocksDBForAllConfigs is
210220
* enabled.

broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,6 @@ private void syncTopicConfig() {
8383
TopicConfigManager topicConfigManager = this.brokerController.getTopicConfigManager();
8484
if (!topicConfigManager.getDataVersion().equals(topicWrapper.getDataVersion())) {
8585

86-
topicConfigManager.getDataVersion().assignNewOne(topicWrapper.getDataVersion());
87-
8886
ConcurrentMap<String, TopicConfig> newTopicConfigTable = topicWrapper.getTopicConfigTable();
8987
ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigManager.getTopicConfigTable();
9088

@@ -94,13 +92,13 @@ private void syncTopicConfig() {
9492
Map.Entry<String, TopicConfig> entry = iterator.next();
9593
if (!newTopicConfigTable.containsKey(entry.getKey())) {
9694
iterator.remove();
95+
topicConfigManager.deleteTopicConfig(entry.getKey());
9796
}
98-
topicConfigManager.deleteTopicConfig(entry.getKey());
9997
}
10098

10199
//update
102100
newTopicConfigTable.values().forEach(topicConfigManager::putTopicConfig);
103-
topicConfigManager.updateDataVersion();
101+
topicConfigManager.setDataVersion(topicWrapper.getDataVersion());
104102
topicConfigManager.persist();
105103
}
106104
if (topicWrapper.getTopicQueueMappingDetailMap() != null
@@ -177,7 +175,6 @@ private void syncSubscriptionGroupConfig() {
177175
if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
178176
.equals(subscriptionWrapper.getDataVersion())) {
179177
SubscriptionGroupManager subscriptionGroupManager = this.brokerController.getSubscriptionGroupManager();
180-
subscriptionGroupManager.getDataVersion().assignNewOne(subscriptionWrapper.getDataVersion());
181178

182179
ConcurrentMap<String, SubscriptionGroupConfig> curSubscriptionGroupTable =
183180
subscriptionGroupManager.getSubscriptionGroupTable();
@@ -189,12 +186,12 @@ private void syncSubscriptionGroupConfig() {
189186
Map.Entry<String, SubscriptionGroupConfig> configEntry = iterator.next();
190187
if (!newSubscriptionGroupTable.containsKey(configEntry.getKey())) {
191188
iterator.remove();
189+
subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey());
192190
}
193-
subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey());
194191
}
195192
// update
196193
newSubscriptionGroupTable.values().forEach(subscriptionGroupManager::putSubscriptionGroupConfig);
197-
subscriptionGroupManager.updateDataVersion();
194+
subscriptionGroupManager.setDataVersion(subscriptionWrapper.getDataVersion());
198195
// persist
199196
subscriptionGroupManager.persist();
200197
LOGGER.info("Update slave Subscription Group from master, {}", masterAddrBak);

broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,8 @@ public void updateDataVersion() {
778778
dataVersion.nextVersion(stateMachineVersion);
779779
}
780780

781-
781+
public void setDataVersion(DataVersion dataVersion) {
782+
this.dataVersion.assignNewOne(dataVersion);
783+
}
782784

783785
}

broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@
5555
import java.util.concurrent.ConcurrentHashMap;
5656

5757
import static org.mockito.ArgumentMatchers.anyString;
58+
import static org.mockito.Mockito.doNothing;
59+
import static org.mockito.Mockito.spy;
5860
import static org.mockito.Mockito.when;
5961

6062
@RunWith(MockitoJUnitRunner.class)
@@ -113,13 +115,9 @@ public void init() {
113115
when(brokerController.getMessageStore()).thenReturn(messageStore);
114116
when(brokerController.getTimerMessageStore()).thenReturn(timerMessageStore);
115117
when(brokerController.getTimerCheckpoint()).thenReturn(timerCheckpoint);
116-
when(topicConfigManager.getDataVersion()).thenReturn(new DataVersion());
117-
when(topicConfigManager.getTopicConfigTable()).thenReturn(new ConcurrentHashMap<>());
118118
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
119119
when(consumerOffsetManager.getOffsetTable()).thenReturn(new ConcurrentHashMap<>());
120120
when(consumerOffsetManager.getDataVersion()).thenReturn(new DataVersion());
121-
when(subscriptionGroupManager.getDataVersion()).thenReturn(new DataVersion());
122-
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(new ConcurrentHashMap<>());
123121
when(queryAssignmentProcessor.getMessageRequestModeManager()).thenReturn(messageRequestModeManager);
124122
when(messageRequestModeManager.getMessageRequestModeMap()).thenReturn(new ConcurrentHashMap<>());
125123
when(messageStoreConfig.isTimerWheelEnable()).thenReturn(true);
@@ -136,17 +134,31 @@ public void init() {
136134
public void testSyncAll() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
137135
MQBrokerException, InterruptedException, UnsupportedEncodingException, RemotingCommandException {
138136
TopicConfig newTopicConfig = new TopicConfig("NewTopic");
139-
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(createTopicConfigWrapper(newTopicConfig));
137+
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = createTopicConfigWrapper(newTopicConfig);
138+
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(topicConfigWrapper);
140139
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
141140
when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn("");
142-
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(createSubscriptionGroupWrapper());
141+
SubscriptionGroupWrapper subscriptionGroupWrapper = createSubscriptionGroupWrapper();
142+
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(subscriptionGroupWrapper);
143143
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper());
144144
when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper());
145+
146+
TopicConfigManager topicConfigManager = new TopicConfigManager();
147+
TopicConfigManager spiedTopicConfigManager = spy(topicConfigManager);
148+
doNothing().when(spiedTopicConfigManager).persist();
149+
SubscriptionGroupManager groupConfigManager = new SubscriptionGroupManager();
150+
SubscriptionGroupManager spiedGroupConfigManager = spy(groupConfigManager);
151+
doNothing().when(spiedGroupConfigManager).persist();
152+
when(brokerController.getTopicConfigManager()).thenReturn(spiedTopicConfigManager);
153+
when(brokerController.getSubscriptionGroupManager()).thenReturn(spiedGroupConfigManager);
154+
145155
slaveSynchronize.syncAll();
146-
Assert.assertEquals(1, this.brokerController.getTopicConfigManager().getDataVersion().getStateVersion());
147-
Assert.assertEquals(1, this.brokerController.getTopicQueueMappingManager().getDataVersion().getStateVersion());
156+
long topicVer = topicConfigWrapper.getDataVersion().getStateVersion();
157+
long groupVer = subscriptionGroupWrapper.getDataVersion().getStateVersion();
158+
Assert.assertEquals(topicVer, this.brokerController.getTopicConfigManager().getDataVersion().getStateVersion());
159+
Assert.assertEquals(topicVer, this.brokerController.getTopicQueueMappingManager().getDataVersion().getStateVersion());
148160
Assert.assertEquals(1, consumerOffsetManager.getDataVersion().getStateVersion());
149-
Assert.assertEquals(1, subscriptionGroupManager.getDataVersion().getStateVersion());
161+
Assert.assertEquals(groupVer, this.brokerController.getSubscriptionGroupManager().getDataVersion().getStateVersion());
150162
Assert.assertEquals(1, timerMetrics.getDataVersion().getStateVersion());
151163
}
152164

@@ -167,7 +179,7 @@ private TopicConfigAndMappingSerializeWrapper createTopicConfigWrapper(TopicConf
167179
wrapper.setTopicConfigTable(new ConcurrentHashMap<>());
168180
wrapper.getTopicConfigTable().put(topicConfig.getTopicName(), topicConfig);
169181
DataVersion dataVersion = new DataVersion();
170-
dataVersion.setStateVersion(1L);
182+
dataVersion.setStateVersion(5L);
171183
wrapper.setDataVersion(dataVersion);
172184
wrapper.setMappingDataVersion(dataVersion);
173185
return wrapper;
@@ -186,7 +198,7 @@ private SubscriptionGroupWrapper createSubscriptionGroupWrapper() {
186198
SubscriptionGroupWrapper wrapper = new SubscriptionGroupWrapper();
187199
wrapper.setSubscriptionGroupTable(new ConcurrentHashMap<>());
188200
DataVersion dataVersion = new DataVersion();
189-
dataVersion.setStateVersion(1L);
201+
dataVersion.setStateVersion(5L);
190202
wrapper.setDataVersion(dataVersion);
191203
return wrapper;
192204
}

0 commit comments

Comments
 (0)