Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.HookUtils;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.MixAll;
Expand Down Expand Up @@ -1814,9 +1813,9 @@ public void start() throws Exception {
this.registerBrokerAll(true, false, true);
}

scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run0() {
public void run() {
try {
if (System.currentTimeMillis() < shouldStartTime) {
BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
Expand All @@ -1836,9 +1835,9 @@ public void run0() {
if (this.brokerConfig.isEnableSlaveActingMaster()) {
scheduleSendHeartbeat();

scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run0() {
public void run() {
try {
BrokerController.this.syncBrokerMemberGroup();
} catch (Throwable e) {
Expand Down Expand Up @@ -1869,9 +1868,9 @@ public void run() {
}

protected void scheduleSendHeartbeat() {
scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run0() {
public void run() {
if (isIsolated) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
Expand All @@ -48,9 +47,9 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen
public DefaultConsumerIdsChangeListener(BrokerController brokerController) {
this.brokerController = brokerController;

scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(brokerController.getBrokerConfig()) {
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run0() {
public void run() {
try {
notifyConsumerChange();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
Expand Down Expand Up @@ -80,9 +79,9 @@ public static RequestTask castRunnable(final Runnable runnable) {
}

public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.brokerController.getBrokerConfig()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run0() {
public void run() {
if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
cleanExpiredRequest();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.LockCallback;
import org.apache.rocketmq.common.MixAll;
Expand Down Expand Up @@ -356,18 +355,14 @@ public void sendHeartbeatViaDataVersion(
requestHeader.setClusterName(clusterName);

for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {

@Override
public void run0() {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
request.setBody(dataVersion.encode());
brokerOuterExecutor.execute(() -> {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
request.setBody(dataVersion.encode());

try {
BrokerOuterAPI.this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMillis);
} catch (Exception e) {
LOGGER.error("sendHeartbeat Exception " + namesrvAddr, e);
}
try {
BrokerOuterAPI.this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMillis);
} catch (Exception e) {
LOGGER.error("sendHeartbeat Exception " + namesrvAddr, e);
}
});
}
Expand All @@ -389,9 +384,9 @@ public void sendHeartbeat(final String clusterName,

if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run0() {
public void run() {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, requestHeader);

try {
Expand Down Expand Up @@ -532,9 +527,9 @@ public List<RegisterBrokerResult> registerBrokerAll(
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run0() {
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
Expand Down Expand Up @@ -719,9 +714,9 @@ public List<Boolean> needRegister(
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run0() {
public void run() {
try {
QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
Expand Down Expand Up @@ -1501,9 +1496,9 @@ public void sendHeartbeatToController(final String controllerAddress,
requestHeader.setHeartbeatTimeoutMills(controllerHeartBeatTimeoutMills);
requestHeader.setElectionPriority(electionPriority);
requestHeader.setBrokerId(brokerId);
brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run0() {
public void run() {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, requestHeader);

try {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.broker.ConfigContext;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.MixAll;
Expand Down Expand Up @@ -156,9 +155,9 @@ public boolean initialize() {
this.updateNamesrvAddr();
LOG.info("Set user specified name server address: {}", this.brokerContainerConfig.getNamesrvAddr());
// also auto update namesrv if specify
this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run0() {
public void run() {
try {
BrokerContainer.this.updateNamesrvAddr();
} catch (Throwable e) {
Expand All @@ -167,10 +166,10 @@ public void run0() {
}
}, 1000 * 10, this.brokerContainerConfig.getUpdateNamesrvAddrInterval(), TimeUnit.MILLISECONDS);
} else if (this.brokerContainerConfig.isFetchNamesrvAddrByAddressServer()) {
this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run0() {
public void run() {
try {
BrokerContainer.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
Expand All @@ -180,9 +179,9 @@ public void run0() {
}, 1000 * 10, this.brokerContainerConfig.getFetchNamesrvAddrInterval(), TimeUnit.MILLISECONDS);
}

this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run0() {
public void run() {
try {
BrokerContainer.this.brokerOuterAPI.refreshMetadata();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.rocketmq.auth.config.AuthConfig;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RemotingServer;
Expand Down Expand Up @@ -82,9 +81,9 @@ public void start() throws Exception {
this.registerBrokerAll(true, false, true);
}

scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run0() {
public void run() {
try {
if (System.currentTimeMillis() < shouldStartTime) {
BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
Expand All @@ -104,9 +103,9 @@ public void run0() {
if (this.brokerConfig.isEnableSlaveActingMaster()) {
scheduleSendHeartbeat();

scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run0() {
public void run() {
try {
InnerBrokerController.this.syncBrokerMemberGroup();
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.BrokerIdentity;
Expand Down Expand Up @@ -1775,23 +1774,23 @@ private void createTempFile() throws IOException {

private void addScheduleTask() {

this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run0() {
public void run() {
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run0() {
public void run() {
DefaultMessageStore.this.checkSelf();
}
}, 1, 10, TimeUnit.MINUTES);

this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run0() {
public void run() {
if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
try {
if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
Expand All @@ -1810,9 +1809,9 @@ public void run0() {
}
}, 1, 1, TimeUnit.SECONDS);

this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run0() {
public void run() {
DefaultMessageStore.this.storeCheckpoint.flush();
}
}, 1, 1, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
Expand Down Expand Up @@ -339,9 +338,9 @@ public IndexFile getAndCreateLastIndexFile() {
if (indexFile != null) {
final IndexFile flushThisFile = prevIndexFile;

Thread flushThread = new Thread(new AbstractBrokerRunnable(defaultMessageStore.getBrokerConfig()) {
Thread flushThread = new Thread(new Runnable() {
@Override
public void run0() {
public void run() {
IndexService.this.flush(flushThisFile);
}
}, "FlushIndexFileThread");
Expand Down
Loading