Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ public class BrokerController {
private AuthenticationMetadataManager authenticationMetadataManager;
private AuthorizationMetadataManager authorizationMetadataManager;

private ConfigContext configContext;

public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
Expand All @@ -320,6 +322,14 @@ public BrokerController(
this(brokerConfig, null, null, messageStoreConfig, null);
}

public BrokerController(
final BrokerConfig brokerConfig,
final MessageStoreConfig messageStoreConfig,
final AuthConfig authConfig
) {
this(brokerConfig, null, null, messageStoreConfig, authConfig);
}

public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
Expand Down Expand Up @@ -2615,5 +2625,11 @@ public void setColdDataCgCtrService(ColdDataCgCtrService coldDataCgCtrService) {
this.coldDataCgCtrService = coldDataCgCtrService;
}

public ConfigContext getConfigContext() {
return configContext;
}

public void setConfigContext(ConfigContext configContext) {
this.configContext = configContext;
}
}
114 changes: 70 additions & 44 deletions broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,11 @@
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;

public class BrokerStartup {

public static Logger log;
public static final SystemConfigFileHelper CONFIG_FILE_HELPER = new SystemConfigFileHelper();

public static void main(String[] args) {
start(createBrokerController(args));
Expand Down Expand Up @@ -81,32 +79,55 @@ public static void shutdown(final BrokerController controller) {
}
}

public static BrokerController buildBrokerController(String[] args) throws Exception {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
final AuthConfig authConfig = new AuthConfig();
nettyServerConfig.setListenPort(10911);
messageStoreConfig.setHaListenPort(0);

public static ConfigContext parseCmdLine(String[] args) throws Exception {
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine(
"mqbroker", args, buildCommandlineOptions(options), new DefaultParser());
if (null == commandLine) {
System.exit(-1);
}

Properties properties = null;
ConfigContext configContext = null;
String filePath;
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
CONFIG_FILE_HELPER.setFile(file);
BrokerPathConfigHelper.setBrokerConfigPath(file);
properties = CONFIG_FILE_HELPER.loadConfig();
}
filePath = commandLine.getOptionValue('c');
configContext = configFileToConfigContext(filePath);
}

if (commandLine.hasOption('p') && configContext != null) {
Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, configContext.getBrokerConfig());
MixAll.printObjectProperties(console, configContext.getNettyServerConfig());
MixAll.printObjectProperties(console, configContext.getNettyClientConfig());
MixAll.printObjectProperties(console, configContext.getAuthConfig());
System.exit(0);
} else if (commandLine.hasOption('m') && configContext != null) {
Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, configContext.getBrokerConfig(), true);
MixAll.printObjectProperties(console, configContext.getNettyServerConfig(), true);
MixAll.printObjectProperties(console, configContext.getNettyClientConfig(), true);
MixAll.printObjectProperties(console, configContext.getAuthConfig(), true);
System.exit(0);
}

assert configContext != null;
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), configContext.getBrokerConfig());

return configContext;
}

public static ConfigContext configFileToConfigContext(String filePath) throws Exception {
SystemConfigFileHelper systemConfigFileHelper = new SystemConfigFileHelper();
BrokerConfig brokerConfig = new BrokerConfig();
NettyServerConfig nettyServerConfig = new NettyServerConfig();
NettyClientConfig nettyClientConfig = new NettyClientConfig();
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
AuthConfig authConfig = new AuthConfig();
Properties properties = new Properties();
if (StringUtils.isNotBlank(filePath)) {
systemConfigFileHelper.setFile(filePath);
BrokerPathConfigHelper.setBrokerConfigPath(filePath);
properties = systemConfigFileHelper.loadConfig();
}

if (properties != null) {
Expand All @@ -118,7 +139,30 @@ public static BrokerController buildBrokerController(String[] args) throws Excep
MixAll.properties2Object(properties, authConfig);
}

MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
return new ConfigContext.Builder()
.configFilePath(filePath)
.properties(properties)
.brokerConfig(brokerConfig)
.messageStoreConfig(messageStoreConfig)
.nettyServerConfig(nettyServerConfig)
.nettyClientConfig(nettyClientConfig)
.authConfig(authConfig)
.build();
}

public static BrokerController buildBrokerController(ConfigContext configContext) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

BrokerConfig brokerConfig = configContext.getBrokerConfig();
MessageStoreConfig messageStoreConfig = configContext.getMessageStoreConfig();
NettyClientConfig nettyClientConfig = configContext.getNettyClientConfig();
NettyServerConfig nettyServerConfig = configContext.getNettyServerConfig();
AuthConfig authConfig = configContext.getAuthConfig();
Properties properties = configContext.getProperties();

nettyServerConfig.setListenPort(10911);
configContext.getMessageStoreConfig().setHaListenPort(0);

if (null == brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment " +
"to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
Expand All @@ -135,16 +179,11 @@ public static BrokerController buildBrokerController(String[] args) throws Excep
}
} catch (Exception e) {
System.out.printf("The Name Server Address[%s] illegal, please set it as follows, " +
"\"127.0.0.1:9876;192.168.0.1:9876\"%n", namesrvAddr);
"\"127.0.0.1:9876;192.168.0.1:9876\"%n", namesrvAddr);
System.exit(-3);
}
}

if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}

// Set broker role according to ha config
if (!brokerConfig.isEnableControllerMode()) {
switch (messageStoreConfig.getBrokerRole()) {
Expand Down Expand Up @@ -186,22 +225,6 @@ public static BrokerController buildBrokerController(String[] args) throws Excep
System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());
}

if (commandLine.hasOption('p')) {
Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
MixAll.printObjectProperties(console, nettyClientConfig);
MixAll.printObjectProperties(console, messageStoreConfig);
System.exit(0);
} else if (commandLine.hasOption('m')) {
Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig, true);
MixAll.printObjectProperties(console, nettyServerConfig, true);
MixAll.printObjectProperties(console, nettyClientConfig, true);
MixAll.printObjectProperties(console, messageStoreConfig, true);
System.exit(0);
}

log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
MixAll.printObjectProperties(log, brokerConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
Expand All @@ -218,6 +241,8 @@ public static BrokerController buildBrokerController(String[] args) throws Excep
// Remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);

controller.setConfigContext(configContext);

return controller;
}

Expand All @@ -244,7 +269,8 @@ public void run() {

public static BrokerController createBrokerController(String[] args) {
try {
BrokerController controller = buildBrokerController(args);
ConfigContext configContext = parseCmdLine(args);
BrokerController controller = buildBrokerController(configContext);
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
Expand Down
126 changes: 126 additions & 0 deletions broker/src/main/java/org/apache/rocketmq/broker/ConfigContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker;

import java.util.Properties;
import org.apache.rocketmq.auth.config.AuthConfig;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;

public class ConfigContext {
private String configFilePath;
private Properties properties;

private BrokerConfig brokerConfig;
private NettyServerConfig nettyServerConfig;
private NettyClientConfig nettyClientConfig;
private MessageStoreConfig messageStoreConfig;
private AuthConfig authConfig;

private ConfigContext(Builder builder) {
this.configFilePath = builder.configFilePath;
this.properties = builder.properties;
this.brokerConfig = builder.brokerConfig;
this.nettyServerConfig = builder.nettyServerConfig;
this.nettyClientConfig = builder.nettyClientConfig;
this.messageStoreConfig = builder.messageStoreConfig;
this.authConfig = builder.authConfig;
}

public String getConfigFilePath() {
return configFilePath;
}

public Properties getProperties() {
return properties;
}

public BrokerConfig getBrokerConfig() {
return brokerConfig;
}

public NettyServerConfig getNettyServerConfig() {
return nettyServerConfig;
}

public NettyClientConfig getNettyClientConfig() {
return nettyClientConfig;
}

public MessageStoreConfig getMessageStoreConfig() {
return messageStoreConfig;
}

public AuthConfig getAuthConfig() {
return authConfig;
}

public static class Builder {
private String configFilePath;
private Properties properties;

private BrokerConfig brokerConfig;
private NettyServerConfig nettyServerConfig;
private NettyClientConfig nettyClientConfig;
private MessageStoreConfig messageStoreConfig;
private AuthConfig authConfig;

public Builder() {
}

public Builder configFilePath(String configFilePath) {
this.configFilePath = configFilePath;
return this;
}

public Builder properties(Properties properties) {
this.properties = properties;
return this;
}

public Builder brokerConfig(BrokerConfig brokerConfig) {
this.brokerConfig = brokerConfig;
return this;
}

public Builder nettyServerConfig(NettyServerConfig nettyServerConfig) {
this.nettyServerConfig = nettyServerConfig;
return this;
}

public Builder nettyClientConfig(NettyClientConfig nettyClientConfig) {
this.nettyClientConfig = nettyClientConfig;
return this;
}

public Builder messageStoreConfig(MessageStoreConfig messageStoreConfig) {
this.messageStoreConfig = messageStoreConfig;
return this;
}

public Builder authConfig(AuthConfig authConfig) {
this.authConfig = authConfig;
return this;
}

public ConfigContext build() {
return new ConfigContext(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.rocketmq.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.auth.config.AuthConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -140,4 +141,36 @@ public void doAfterResponse(String remoteAddr, RemotingCommand request, Remoting
Assert.assertTrue(mockRemotingServer1.getRPCHook().contains(rpcHook));
Assert.assertSame(mockRemotingServer, mockRemotingServer1);
}

@Test
public void testConfigContextMethods() throws Exception {
// Test ConfigContext setter and getter methods
BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, new NettyClientConfig(), messageStoreConfig);

// Initially, ConfigContext should be null
assertThat(brokerController.getConfigContext()).isNull();

// Create a test ConfigContext
ConfigContext configContext = new ConfigContext.Builder()
.brokerConfig(brokerConfig)
.messageStoreConfig(messageStoreConfig)
.nettyServerConfig(nettyServerConfig)
.nettyClientConfig(new NettyClientConfig())
.authConfig(new AuthConfig())
.build();

// Set the ConfigContext
brokerController.setConfigContext(configContext);

// Verify it was set correctly
assertThat(brokerController.getConfigContext()).isNotNull();
assertThat(brokerController.getConfigContext()).isSameAs(configContext);
assertThat(brokerController.getConfigContext().getBrokerConfig()).isSameAs(brokerConfig);
assertThat(brokerController.getConfigContext().getMessageStoreConfig()).isSameAs(messageStoreConfig);
assertThat(brokerController.getConfigContext().getNettyServerConfig()).isSameAs(nettyServerConfig);

// Test setting null ConfigContext
brokerController.setConfigContext(null);
assertThat(brokerController.getConfigContext()).isNull();
}
}
Loading
Loading