|
36 | 36 | import org.apache.rocketmq.remoting.protocol.DataVersion; |
37 | 37 | import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper; |
38 | 38 | import org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper; |
| 39 | +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; |
39 | 40 | import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper; |
40 | 41 | import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper; |
41 | 42 | import org.apache.rocketmq.store.MessageStore; |
@@ -205,4 +206,72 @@ private TimerMetrics.TimerMetricsSerializeWrapper createTimerMetricsWrapper() { |
205 | 206 | wrapper.setDataVersion(dataVersion); |
206 | 207 | return wrapper; |
207 | 208 | } |
| 209 | + |
| 210 | + @Test |
| 211 | + public void testSyncTopicConfig_withDeletion() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException, UnsupportedEncodingException, RemotingCommandException { |
| 212 | + // Setup: Slave has existing topics that should be deleted |
| 213 | + TopicConfig existingTopic1 = new TopicConfig("ToBeDeleted1"); |
| 214 | + TopicConfig existingTopic2 = new TopicConfig("ToBeDeleted2"); |
| 215 | + TopicConfig existingTopic3 = new TopicConfig("ToBeKept"); |
| 216 | + |
| 217 | + when(topicConfigManager.getTopicConfigTable()).thenReturn(new ConcurrentHashMap<>()); |
| 218 | + topicConfigManager.getTopicConfigTable().put("ToBeDeleted1", existingTopic1); |
| 219 | + topicConfigManager.getTopicConfigTable().put("ToBeDeleted2", existingTopic2); |
| 220 | + topicConfigManager.getTopicConfigTable().put("ToBeKept", existingTopic3); |
| 221 | + |
| 222 | + // Master returns only some topics (not the ones to be deleted) |
| 223 | + TopicConfig newTopic = new TopicConfig("NewTopic"); |
| 224 | + TopicConfigAndMappingSerializeWrapper wrapper = createTopicConfigWrapper(newTopic); |
| 225 | + wrapper.getTopicConfigTable().put("ToBeKept", existingTopic3); |
| 226 | + |
| 227 | + when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(wrapper); |
| 228 | + when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper()); |
| 229 | + when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn(""); |
| 230 | + when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(createSubscriptionGroupWrapper()); |
| 231 | + when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper()); |
| 232 | + when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper()); |
| 233 | + when(topicConfigManager.getDataVersion()).thenReturn(new DataVersion()); |
| 234 | + |
| 235 | + slaveSynchronize.syncAll(); |
| 236 | + |
| 237 | + // Verify deleteTopicConfig was called for the removed topics |
| 238 | + org.mockito.Mockito.verify(topicConfigManager).deleteTopicConfig("ToBeDeleted1"); |
| 239 | + org.mockito.Mockito.verify(topicConfigManager).deleteTopicConfig("ToBeDeleted2"); |
| 240 | + } |
| 241 | + |
| 242 | + @Test |
| 243 | + public void testSyncSubscriptionGroupConfig_withDeletion() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException, UnsupportedEncodingException, RemotingCommandException { |
| 244 | + // Setup: Slave has existing subscription groups that should be deleted |
| 245 | + SubscriptionGroupConfig existingGroup1 = new SubscriptionGroupConfig(); |
| 246 | + existingGroup1.setGroupName("ToBeDeletedGroup1"); |
| 247 | + |
| 248 | + SubscriptionGroupConfig existingGroup2 = new SubscriptionGroupConfig(); |
| 249 | + existingGroup2.setGroupName("ToBeDeletedGroup2"); |
| 250 | + |
| 251 | + SubscriptionGroupConfig existingGroup3 = new SubscriptionGroupConfig(); |
| 252 | + existingGroup3.setGroupName("ToBeKeptGroup"); |
| 253 | + |
| 254 | + when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(new ConcurrentHashMap<>()); |
| 255 | + subscriptionGroupManager.getSubscriptionGroupTable().put("ToBeDeletedGroup1", existingGroup1); |
| 256 | + subscriptionGroupManager.getSubscriptionGroupTable().put("ToBeDeletedGroup2", existingGroup2); |
| 257 | + subscriptionGroupManager.getSubscriptionGroupTable().put("ToBeKeptGroup", existingGroup3); |
| 258 | + |
| 259 | + // Master returns only some subscription groups (not the ones to be deleted) |
| 260 | + SubscriptionGroupWrapper wrapper = createSubscriptionGroupWrapper(); |
| 261 | + wrapper.getSubscriptionGroupTable().put("ToBeKeptGroup", existingGroup3); |
| 262 | + |
| 263 | + when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(createTopicConfigWrapper(new TopicConfig("NewTopic"))); |
| 264 | + when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper()); |
| 265 | + when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn(""); |
| 266 | + when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(wrapper); |
| 267 | + when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper()); |
| 268 | + when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper()); |
| 269 | + when(subscriptionGroupManager.getDataVersion()).thenReturn(new DataVersion()); |
| 270 | + |
| 271 | + slaveSynchronize.syncAll(); |
| 272 | + |
| 273 | + // Verify deleteSubscriptionGroupConfig was called for the removed groups |
| 274 | + org.mockito.Mockito.verify(subscriptionGroupManager).deleteSubscriptionGroupConfig("ToBeDeletedGroup1"); |
| 275 | + org.mockito.Mockito.verify(subscriptionGroupManager).deleteSubscriptionGroupConfig("ToBeDeletedGroup2"); |
| 276 | + } |
208 | 277 | } |
0 commit comments