Skip to content

Commit 8fc57f1

Browse files
authored
[ISSUE #9717] Fix RaftBrokerHeartBeatManager#scanNotActiveBroker was not actually executed
1 parent 47c07a9 commit 8fc57f1

2 files changed

Lines changed: 91 additions & 2 deletions

File tree

controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/RaftBrokerHeartBeatManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public class RaftBrokerHeartBeatManager implements BrokerHeartbeatManager {
6363

6464
// resolve the scene
6565
// when controller all down and startup again, we wait for some time to avoid electing a new leader,which is not necessary
66-
private long firstReceivedHeartbeatTime = -1;
66+
private volatile long firstReceivedHeartbeatTime = -1;
6767

6868
public RaftBrokerHeartBeatManager(ControllerConfig controllerConfig) {
6969
this.scheduledService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RaftBrokerHeartbeatManager_scheduledService_"));
@@ -188,7 +188,8 @@ private void scanNotActiveBroker() {
188188
}
189189

190190
// if has not received any heartbeat from broker, we do not need to scan
191-
if (this.firstReceivedHeartbeatTime + controllerConfig.getJraftConfig().getjRaftScanWaitTimeoutMs() < System.currentTimeMillis()) {
191+
if (this.firstReceivedHeartbeatTime == -1 ||
192+
this.firstReceivedHeartbeatTime + controllerConfig.getJraftConfig().getjRaftScanWaitTimeoutMs() > System.currentTimeMillis()) {
192193
log.info("has not received any heartbeat from broker, skip scan not active broker");
193194
return;
194195
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.controller.impl;
18+
19+
import com.alibaba.fastjson.JSON;
20+
import io.netty.channel.Channel;
21+
import io.netty.channel.DefaultChannelPromise;
22+
import java.util.Collections;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
26+
import org.apache.rocketmq.common.ControllerConfig;
27+
import org.apache.rocketmq.controller.impl.heartbeat.BrokerIdentityInfo;
28+
import org.apache.rocketmq.controller.impl.heartbeat.RaftBrokerHeartBeatManager;
29+
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
30+
import org.apache.rocketmq.remoting.protocol.ResponseCode;
31+
import org.junit.Before;
32+
import org.junit.Test;
33+
import org.junit.runner.RunWith;
34+
import org.mockito.Mock;
35+
import org.mockito.junit.MockitoJUnitRunner;
36+
37+
import static org.junit.Assert.assertTrue;
38+
import static org.mockito.ArgumentMatchers.any;
39+
import static org.mockito.Mockito.when;
40+
41+
@RunWith(MockitoJUnitRunner.class)
42+
public class RaftBrokerHeartBeatManagerTest {
43+
@Mock
44+
private JRaftController jRaftController;
45+
@Mock
46+
private Channel brokerChannel;
47+
private RaftBrokerHeartBeatManager heartbeatManager;
48+
private final ControllerConfig config = new ControllerConfig();
49+
50+
@Before
51+
public void init() {
52+
when(jRaftController.isLeaderState()).thenReturn(true);
53+
config.setScanNotActiveBrokerInterval(1000);
54+
this.heartbeatManager = new RaftBrokerHeartBeatManager(config);
55+
this.heartbeatManager.setController(jRaftController);
56+
this.heartbeatManager.initialize();
57+
this.heartbeatManager.start();
58+
}
59+
60+
@Test
61+
public void testDetectBrokerAlive() throws InterruptedException {
62+
final CountDownLatch latch = new CountDownLatch(1);
63+
this.heartbeatManager.registerBrokerLifecycleListener((clusterName, brokerName, brokerId) -> {
64+
latch.countDown(); // onBrokerInactive
65+
});
66+
String clusterName = "cluster-1";
67+
String brokerName = "broker-1";
68+
String brokerAddr = "127.0.0.1:10911";
69+
long brokerId = 1L;
70+
RemotingCommand onBrokerHeartbeat = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "");
71+
RemotingCommand checkNotActiveResp1 = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "");
72+
checkNotActiveResp1.setBody(JSON.toJSONBytes(Collections.emptyList()));
73+
RemotingCommand checkNotActiveResp2 = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "");
74+
checkNotActiveResp2.setBody(JSON.toJSONBytes(Collections.singletonList(new BrokerIdentityInfo(clusterName, brokerName, brokerId))));
75+
when(jRaftController.onBrokerHeartBeat(any()))
76+
.thenReturn(CompletableFuture.completedFuture(onBrokerHeartbeat));
77+
when(jRaftController.checkNotActiveBroker(any()))
78+
.thenReturn(CompletableFuture.completedFuture(checkNotActiveResp1))
79+
.thenReturn(CompletableFuture.completedFuture(checkNotActiveResp2));
80+
DefaultChannelPromise channelPromise = new DefaultChannelPromise(brokerChannel);
81+
channelPromise.setSuccess();
82+
when(brokerChannel.close()).thenReturn(channelPromise);
83+
this.heartbeatManager.onBrokerHeartbeat(clusterName, brokerName, brokerAddr, brokerId, 3000L, brokerChannel,
84+
1, 1L, -1L, 0);
85+
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
86+
this.heartbeatManager.shutdown();
87+
}
88+
}

0 commit comments

Comments
 (0)