diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 5a9fa263e52..b8241bccb18 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -285,7 +285,7 @@ public class BrokerController { protected volatile BrokerMemberGroup brokerMemberGroup; protected EscapeBridge escapeBridge; protected List brokerAttachedPlugins = new ArrayList<>(); - protected volatile long shouldStartTime; + protected volatile long startupTime; private BrokerPreOnlineService brokerPreOnlineService; protected volatile boolean isIsolated = false; protected volatile long minBrokerIdInGroup = 0; @@ -1764,7 +1764,7 @@ protected void startBasicService() throws Exception { public void start() throws Exception { - this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart(); + this.startupTime = System.currentTimeMillis(); if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster()) { isIsolated = true; @@ -1785,8 +1785,9 @@ public void start() throws Exception { @Override public void run0() { try { - if (System.currentTimeMillis() < shouldStartTime) { - BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime); + long diff = System.currentTimeMillis() - startupTime; + if (diff >= 0 && diff < messageStoreConfig.getDisappearTimeAfterStart()) { + BrokerController.LOG.info("Register to namesrv after {}", startupTime + messageStoreConfig.getDisappearTimeAfterStart()); return; } if (isIsolated) { @@ -2533,8 +2534,8 @@ public EscapeBridge getEscapeBridge() { return escapeBridge; } - public long getShouldStartTime() { - return shouldStartTime; + public long getStartupTime() { + return startupTime; } public BrokerPreOnlineService getBrokerPreOnlineService() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java index 6749af3d750..b4ac95e3da6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java @@ -62,14 +62,14 @@ public void incrementInFlightMessageNum(String topic, String group, int queueId, } public void decrementInFlightMessageNum(String topic, String group, long popTime, int qId, int delta) { - if (popTime < this.brokerController.getShouldStartTime()) { + if (popTime < this.brokerController.getStartupTime()) { return; } decrementInFlightMessageNum(topic, group, qId, delta); } public void decrementInFlightMessageNum(PopCheckPoint checkPoint) { - if (checkPoint.getPopTime() < this.brokerController.getShouldStartTime()) { + if (checkPoint.getPopTime() < this.brokerController.getStartupTime()) { return; } decrementInFlightMessageNum(checkPoint.getTopic(), checkPoint.getCId(), checkPoint.getQueueId(), 1); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 2be41a69d63..bced7d218ef 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -630,8 +630,9 @@ public void run() { int slow = 1; while (!this.isStopped()) { try { - if (System.currentTimeMillis() < brokerController.getShouldStartTime()) { - POP_LOGGER.info("PopReviveService Ready to run after {}", brokerController.getShouldStartTime()); + long diff = System.currentTimeMillis() - brokerController.getStartupTime(); + if (diff >= 0 && diff < brokerController.getMessageStoreConfig().getDisappearTimeAfterStart()) { + POP_LOGGER.info("PopReviveService Ready to run after {}", brokerController.getStartupTime() + brokerController.getMessageStoreConfig().getDisappearTimeAfterStart()); this.waitForRunning(1000); continue; } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java index dea59fc99e6..9e5566d3a1e 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java @@ -30,7 +30,7 @@ public class PopInflightMessageCounterTest { public void testNum() { BrokerController brokerController = mock(BrokerController.class); long brokerStartTime = System.currentTimeMillis(); - when(brokerController.getShouldStartTime()).thenReturn(brokerStartTime); + when(brokerController.getStartupTime()).thenReturn(brokerStartTime); PopInflightMessageCounter counter = new PopInflightMessageCounter(brokerController); final String topic = "topic"; @@ -67,7 +67,7 @@ public void testNum() { public void testClearInFlightMessageNum() { BrokerController brokerController = mock(BrokerController.class); long brokerStartTime = System.currentTimeMillis(); - when(brokerController.getShouldStartTime()).thenReturn(brokerStartTime); + when(brokerController.getStartupTime()).thenReturn(brokerStartTime); PopInflightMessageCounter counter = new PopInflightMessageCounter(brokerController); final String topic = "topic"; diff --git a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java index 616188e52d1..d082b7d616d 100644 --- a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java +++ b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java @@ -58,7 +58,7 @@ protected void initializeScheduledTasks() { @Override public void start() throws Exception { - this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart(); + this.startupTime = System.currentTimeMillis(); if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster()) { isIsolated = true; @@ -75,8 +75,9 @@ public void start() throws Exception { @Override public void run0() { try { - if (System.currentTimeMillis() < shouldStartTime) { - BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime); + long diff = System.currentTimeMillis() - startupTime; + if (diff >= 0 && diff < messageStoreConfig.getDisappearTimeAfterStart()) { + BrokerController.LOG.info("Register to namesrv after {}", startupTime + messageStoreConfig.getDisappearTimeAfterStart()); return; } if (isIsolated) { diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index d6af7b84e79..d0d811e8299 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -152,7 +152,7 @@ public class TimerMessageStore { private volatile BrokerRole lastBrokerRole = BrokerRole.SLAVE; //the dequeue is an asynchronous process, use this flag to track if the status has changed private boolean dequeueStatusChangeFlag = false; - private long shouldStartTime; + private long startupTime; // True if current store is master or current brokerId is equal to the minimum brokerId of the replica group in slaveActingMaster mode. protected volatile boolean shouldRunningDequeue; @@ -464,7 +464,7 @@ public static boolean isMagicOK(int magic) { } public void start() { - this.shouldStartTime = storeConfig.getDisappearTimeAfterStart() + System.currentTimeMillis(); + this.startupTime = System.currentTimeMillis(); maybeMoveWriteTime(); enqueueGetService.start(); enqueuePutService.start(); @@ -1439,8 +1439,9 @@ public void run() { TimerMessageStore.LOGGER.info(this.getServiceName() + " service start"); while (!this.isStopped()) { try { - if (System.currentTimeMillis() < shouldStartTime) { - TimerMessageStore.LOGGER.info("TimerDequeueGetService ready to run after {}.", shouldStartTime); + long diff = System.currentTimeMillis() - startupTime; + if (diff >= 0 && diff < storeConfig.getDisappearTimeAfterStart()) { + TimerMessageStore.LOGGER.info("TimerDequeueGetService ready to run after {}.", startupTime + storeConfig.getDisappearTimeAfterStart()); waitForRunning(1000); continue; }