Skip to content

Commit 7d7eb73

Browse files
authored
[ISSUE #10003] Add gRPC maxConcurrentCallsPerConnection Configuration to Proxy (#10004)
1 parent e5d3372 commit 7d7eb73

File tree

5 files changed

+41
-20
lines changed

5 files changed

+41
-20
lines changed

common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,10 @@ public static void logThreadPoolStatus() {
142142
List<ThreadPoolStatusMonitor> monitors = threadPoolWrapper.getStatusPrinters();
143143
for (ThreadPoolStatusMonitor monitor : monitors) {
144144
double value = monitor.value(threadPoolWrapper.getThreadPoolExecutor());
145-
String nameFormatted = String.format("%-40s", threadPoolWrapper.getName());
146-
String descFormatted = String.format("%-12s", monitor.describe());
147-
waterMarkLogger.info("{}{}{}", nameFormatted, descFormatted, value);
145+
waterMarkLogger.info("\t{}\t{}\t{}", threadPoolWrapper.getName(),
146+
monitor.describe(),
147+
value);
148+
148149
if (enablePrintJstack) {
149150
if (monitor.needPrintJstack(threadPoolWrapper.getThreadPoolExecutor(), value) &&
150151
System.currentTimeMillis() - jstackTime > jstackPeriodTime) {

proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,17 @@ public class ProxyConfig implements ConfigFile {
9595
private boolean enableGrpcEpoll = false;
9696
private int grpcThreadPoolNums = 16 + PROCESSOR_NUMBER * 2;
9797
private int grpcThreadPoolQueueCapacity = 100000;
98+
99+
/**
100+
* Maximum number of concurrent gRPC calls allowed per client connection.
101+
* <p>
102+
* A single client issuing excessively high concurrent requests may skew the validation load balancing
103+
* and overload a single proxy instance (hotspot), potentially bringing it down. Limiting
104+
* {@code grpcMaxConcurrentCallsPerConnection} helps mitigate this per-connection hotspot risk.
105+
* <p>
106+
* Note: Setting this limit too low may cause send/consume failures (e.g., backpressure or rejected calls).
107+
*/
108+
private int grpcMaxConcurrentCallsPerConnection = Integer.MAX_VALUE;
98109
private String brokerConfigPath = ConfigurationManager.getProxyHome() + "/conf/broker.conf";
99110
/**
100111
* gRPC max message size
@@ -1581,4 +1592,12 @@ public int getReturnHandleGroupThreadPoolNums() {
15811592
public void setReturnHandleGroupThreadPoolNums(int returnHandleGroupThreadPoolNums) {
15821593
this.returnHandleGroupThreadPoolNums = returnHandleGroupThreadPoolNums;
15831594
}
1595+
1596+
public int getGrpcMaxConcurrentCallsPerConnection() {
1597+
return grpcMaxConcurrentCallsPerConnection;
1598+
}
1599+
1600+
public void setGrpcMaxConcurrentCallsPerConnection(int grpcMaxConcurrentCallsPerConnection) {
1601+
this.grpcMaxConcurrentCallsPerConnection = grpcMaxConcurrentCallsPerConnection;
1602+
}
15841603
}

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,16 @@
2424
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerSocketChannel;
2525
import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
2626
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
27+
import java.util.concurrent.ThreadPoolExecutor;
28+
import java.util.concurrent.TimeUnit;
2729
import org.apache.rocketmq.common.constant.LoggerName;
2830
import org.apache.rocketmq.logging.org.slf4j.Logger;
2931
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3032
import org.apache.rocketmq.proxy.config.ConfigurationManager;
33+
import org.apache.rocketmq.proxy.config.ProxyConfig;
3134
import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor;
3235
import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor;
3336
import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;
34-
35-
import java.util.concurrent.ThreadPoolExecutor;
36-
import java.util.concurrent.TimeUnit;
3737
import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
3838

3939
public class GrpcServerBuilder {
@@ -52,18 +52,20 @@ public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port
5252
}
5353

5454
protected GrpcServerBuilder(ThreadPoolExecutor executor, int port, TlsCertificateManager tlsCertificateManager) {
55+
ProxyConfig config = ConfigurationManager.getProxyConfig();
5556
this.tlsCertificateManager = tlsCertificateManager;
56-
serverBuilder = NettyServerBuilder.forPort(port);
57+
serverBuilder = NettyServerBuilder.forPort(port)
58+
.maxConcurrentCallsPerConnection(config.getGrpcMaxConcurrentCallsPerConnection());
5759

5860
serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator());
5961

6062
// build server
61-
int bossLoopNum = ConfigurationManager.getProxyConfig().getGrpcBossLoopNum();
62-
int workerLoopNum = ConfigurationManager.getProxyConfig().getGrpcWorkerLoopNum();
63-
int maxInboundMessageSize = ConfigurationManager.getProxyConfig().getGrpcMaxInboundMessageSize();
64-
long idleTimeMills = ConfigurationManager.getProxyConfig().getGrpcClientIdleTimeMills();
63+
int bossLoopNum = config.getGrpcBossLoopNum();
64+
int workerLoopNum = config.getGrpcWorkerLoopNum();
65+
int maxInboundMessageSize = config.getGrpcMaxInboundMessageSize();
66+
long idleTimeMills = config.getGrpcClientIdleTimeMills();
6567

66-
if (ConfigurationManager.getProxyConfig().isEnableGrpcEpoll()) {
68+
if (config.isEnableGrpcEpoll()) {
6769
serverBuilder.bossEventLoopGroup(new EpollEventLoopGroup(bossLoopNum))
6870
.workerEventLoopGroup(new EpollEventLoopGroup(workerLoopNum))
6971
.channelType(EpollServerSocketChannel.class)

proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919

2020
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2121
import io.netty.channel.Channel;
22+
import java.util.concurrent.BlockingQueue;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.ScheduledExecutorService;
25+
import java.util.concurrent.ThreadPoolExecutor;
26+
import java.util.concurrent.TimeUnit;
2227
import org.apache.rocketmq.auth.config.AuthConfig;
2328
import org.apache.rocketmq.common.constant.LoggerName;
2429
import org.apache.rocketmq.common.future.FutureTaskExt;
@@ -59,12 +64,6 @@
5964
import org.apache.rocketmq.remoting.protocol.RequestCode;
6065
import org.apache.rocketmq.remoting.protocol.ResponseCode;
6166

62-
import java.util.concurrent.BlockingQueue;
63-
import java.util.concurrent.CompletableFuture;
64-
import java.util.concurrent.ScheduledExecutorService;
65-
import java.util.concurrent.ThreadPoolExecutor;
66-
import java.util.concurrent.TimeUnit;
67-
6867
public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOutClient {
6968
private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
7069

@@ -194,7 +193,7 @@ public RemotingProtocolServer(MessagingProcessor messagingProcessor, TlsCertific
194193
this.timerExecutor = ThreadUtils.newSingleThreadScheduledExecutor(
195194
new ThreadFactoryBuilder().setNameFormat("RemotingServerScheduler-%d").build()
196195
);
197-
this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 10, 10, TimeUnit.SECONDS);
196+
this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 100, 100, TimeUnit.MILLISECONDS);
198197

199198
this.registerRemotingServer(this.defaultRemotingServer);
200199
}

proxy/src/main/resources/rmq.proxy.logback.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
<maxFileSize>128MB</maxFileSize>
5353
</triggeringPolicy>
5454
<encoder>
55-
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %m%n</pattern>
55+
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8}%m%n</pattern>
5656
<charset class="java.nio.charset.Charset">UTF-8</charset>
5757
</encoder>
5858
</appender>

0 commit comments

Comments
 (0)