From 89b34009072f807eba773d2d8ffdb52a8d2d6f0f Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Tue, 2 Dec 2025 10:29:04 +0800 Subject: [PATCH] Remove AbstractBrokerRunnable and replace with Runnable Change-Id: I94104151c452d09cbe195a3e1126a473b662a337 --- .../rocketmq/broker/BrokerController.java | 13 +++-- .../DefaultConsumerIdsChangeListener.java | 5 +- .../broker/latency/BrokerFastFailure.java | 5 +- .../rocketmq/broker/out/BrokerOuterAPI.java | 35 ++++++-------- .../common/AbstractBrokerRunnable.java | 48 ------------------- .../rocketmq/container/BrokerContainer.java | 13 +++-- .../container/InnerBrokerController.java | 9 ++-- .../rocketmq/store/DefaultMessageStore.java | 17 ++++--- .../rocketmq/store/index/IndexService.java | 5 +- 9 files changed, 45 insertions(+), 105 deletions(-) delete mode 100644 common/src/main/java/org/apache/rocketmq/common/AbstractBrokerRunnable.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 7b1701c61a0..8d292a58679 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -97,7 +97,6 @@ import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl; import org.apache.rocketmq.broker.util.HookUtils; -import org.apache.rocketmq.common.AbstractBrokerRunnable; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerIdentity; import org.apache.rocketmq.common.MixAll; @@ -1814,9 +1813,9 @@ public void start() throws Exception { this.registerBrokerAll(true, false, true); } - scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) { + scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override - public void run0() { + public void run() { try { if (System.currentTimeMillis() < shouldStartTime) { BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime); @@ -1836,9 +1835,9 @@ public void run0() { if (this.brokerConfig.isEnableSlaveActingMaster()) { scheduleSendHeartbeat(); - scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) { + scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new Runnable() { @Override - public void run0() { + public void run() { try { BrokerController.this.syncBrokerMemberGroup(); } catch (Throwable e) { @@ -1869,9 +1868,9 @@ public void run() { } protected void scheduleSendHeartbeat() { - scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) { + scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new Runnable() { @Override - public void run0() { + public void run() { if (isIsolated) { return; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java index e0461769568..2946e03e1ae 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.common.AbstractBrokerRunnable; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -48,9 +47,9 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen public DefaultConsumerIdsChangeListener(BrokerController brokerController) { this.brokerController = brokerController; - scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(brokerController.getBrokerConfig()) { + scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override - public void run0() { + public void run() { try { notifyConsumerChange(); } catch (Exception e) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index ce8fdd88579..31bdb838a2c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.common.AbstractBrokerRunnable; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.UtilAll; @@ -80,9 +79,9 @@ public static RequestTask castRunnable(final Runnable runnable) { } public void start() { - this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.brokerController.getBrokerConfig()) { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override - public void run0() { + public void run() { if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) { cleanExpiredRequest(); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 21ba349c84c..ba4ba2ccf9e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -46,7 +46,6 @@ import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.AbstractBrokerRunnable; import org.apache.rocketmq.common.BrokerIdentity; import org.apache.rocketmq.common.LockCallback; import org.apache.rocketmq.common.MixAll; @@ -356,18 +355,14 @@ public void sendHeartbeatViaDataVersion( requestHeader.setClusterName(clusterName); for (final String namesrvAddr : nameServerAddressList) { - brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) { - - @Override - public void run0() { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader); - request.setBody(dataVersion.encode()); + brokerOuterExecutor.execute(() -> { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader); + request.setBody(dataVersion.encode()); - try { - BrokerOuterAPI.this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMillis); - } catch (Exception e) { - LOGGER.error("sendHeartbeat Exception " + namesrvAddr, e); - } + try { + BrokerOuterAPI.this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMillis); + } catch (Exception e) { + LOGGER.error("sendHeartbeat Exception " + namesrvAddr, e); } }); } @@ -389,9 +384,9 @@ public void sendHeartbeat(final String clusterName, if (nameServerAddressList != null && nameServerAddressList.size() > 0) { for (final String namesrvAddr : nameServerAddressList) { - brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) { + brokerOuterExecutor.execute(new Runnable() { @Override - public void run0() { + public void run() { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, requestHeader); try { @@ -532,9 +527,9 @@ public List registerBrokerAll( requestHeader.setBodyCrc32(bodyCrc32); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { - brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) { + brokerOuterExecutor.execute(new Runnable() { @Override - public void run0() { + public void run() { try { RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body); if (result != null) { @@ -719,9 +714,9 @@ public List needRegister( if (nameServerAddressList != null && nameServerAddressList.size() > 0) { final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { - brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) { + brokerOuterExecutor.execute(new Runnable() { @Override - public void run0() { + public void run() { try { QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); @@ -1501,9 +1496,9 @@ public void sendHeartbeatToController(final String controllerAddress, requestHeader.setHeartbeatTimeoutMills(controllerHeartBeatTimeoutMills); requestHeader.setElectionPriority(electionPriority); requestHeader.setBrokerId(brokerId); - brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) { + brokerOuterExecutor.execute(new Runnable() { @Override - public void run0() { + public void run() { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, requestHeader); try { diff --git a/common/src/main/java/org/apache/rocketmq/common/AbstractBrokerRunnable.java b/common/src/main/java/org/apache/rocketmq/common/AbstractBrokerRunnable.java deleted file mode 100644 index 34aabc5772c..00000000000 --- a/common/src/main/java/org/apache/rocketmq/common/AbstractBrokerRunnable.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.common; - -import java.io.File; -import org.apache.rocketmq.logging.org.slf4j.MDC; - -public abstract class AbstractBrokerRunnable implements Runnable { - protected final BrokerIdentity brokerIdentity; - - public AbstractBrokerRunnable(BrokerIdentity brokerIdentity) { - this.brokerIdentity = brokerIdentity; - } - - private static final String MDC_BROKER_CONTAINER_LOG_DIR = "brokerContainerLogDir"; - - /** - * real logic for running - */ - public abstract void run0(); - - @Override - public void run() { - try { - if (brokerIdentity.isInBrokerContainer()) { - MDC.put(MDC_BROKER_CONTAINER_LOG_DIR, File.separator + brokerIdentity.getCanonicalName()); - } - run0(); - } finally { - MDC.clear(); - } - } -} diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java index aa38fb6224a..e8debfe99b2 100644 --- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java +++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java @@ -22,7 +22,6 @@ import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.broker.ConfigContext; import org.apache.rocketmq.broker.out.BrokerOuterAPI; -import org.apache.rocketmq.common.AbstractBrokerRunnable; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerIdentity; import org.apache.rocketmq.common.MixAll; @@ -156,9 +155,9 @@ public boolean initialize() { this.updateNamesrvAddr(); LOG.info("Set user specified name server address: {}", this.brokerContainerConfig.getNamesrvAddr()); // also auto update namesrv if specify - this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override - public void run0() { + public void run() { try { BrokerContainer.this.updateNamesrvAddr(); } catch (Throwable e) { @@ -167,10 +166,10 @@ public void run0() { } }, 1000 * 10, this.brokerContainerConfig.getUpdateNamesrvAddrInterval(), TimeUnit.MILLISECONDS); } else if (this.brokerContainerConfig.isFetchNamesrvAddrByAddressServer()) { - this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override - public void run0() { + public void run() { try { BrokerContainer.this.brokerOuterAPI.fetchNameServerAddr(); } catch (Throwable e) { @@ -180,9 +179,9 @@ public void run0() { }, 1000 * 10, this.brokerContainerConfig.getFetchNamesrvAddrInterval(), TimeUnit.MILLISECONDS); } - this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override - public void run0() { + public void run() { try { BrokerContainer.this.brokerOuterAPI.refreshMetadata(); } catch (Exception e) { diff --git a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java index 41ce28214bd..102bd4710fb 100644 --- a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java +++ b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java @@ -21,7 +21,6 @@ import org.apache.rocketmq.auth.config.AuthConfig; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.out.BrokerOuterAPI; -import org.apache.rocketmq.common.AbstractBrokerRunnable; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.remoting.RemotingServer; @@ -82,9 +81,9 @@ public void start() throws Exception { this.registerBrokerAll(true, false, true); } - scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) { + scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override - public void run0() { + public void run() { try { if (System.currentTimeMillis() < shouldStartTime) { BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime); @@ -104,9 +103,9 @@ public void run0() { if (this.brokerConfig.isEnableSlaveActingMaster()) { scheduleSendHeartbeat(); - scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) { + scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new Runnable() { @Override - public void run0() { + public void run() { try { InnerBrokerController.this.syncBrokerMemberGroup(); } catch (Throwable e) { diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index cb5c41471a9..d440ccfb119 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -57,7 +57,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.common.AbstractBrokerRunnable; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerIdentity; @@ -1775,23 +1774,23 @@ private void createTempFile() throws IOException { private void addScheduleTask() { - this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override - public void run0() { + public void run() { DefaultMessageStore.this.cleanFilesPeriodically(); } }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); - this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override - public void run0() { + public void run() { DefaultMessageStore.this.checkSelf(); } }, 1, 10, TimeUnit.MINUTES); - this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override - public void run0() { + public void run() { if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) { try { if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) { @@ -1810,9 +1809,9 @@ public void run0() { } }, 1, 1, TimeUnit.SECONDS); - this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override - public void run0() { + public void run() { DefaultMessageStore.this.storeCheckpoint.flush(); } }, 1, 1, TimeUnit.SECONDS); diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java index 2d325ee13a4..4d358b4cedb 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.rocketmq.common.AbstractBrokerRunnable; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -339,9 +338,9 @@ public IndexFile getAndCreateLastIndexFile() { if (indexFile != null) { final IndexFile flushThisFile = prevIndexFile; - Thread flushThread = new Thread(new AbstractBrokerRunnable(defaultMessageStore.getBrokerConfig()) { + Thread flushThread = new Thread(new Runnable() { @Override - public void run0() { + public void run() { IndexService.this.flush(flushThisFile); } }, "FlushIndexFileThread");