Skip to content

Commit 757cc72

Browse files
committed
fix
Change-Id: I84ea8161077bc3b1ea54524640697ef793e7d093
1 parent ee10411 commit 757cc72

4 files changed

Lines changed: 35 additions & 13 deletions

File tree

common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,10 @@ public static void logThreadPoolStatus() {
105105
List<ThreadPoolStatusMonitor> monitors = threadPoolWrapper.getStatusPrinters();
106106
for (ThreadPoolStatusMonitor monitor : monitors) {
107107
double value = monitor.value(threadPoolWrapper.getThreadPoolExecutor());
108-
String nameFormatted = String.format("%-40s", threadPoolWrapper.getName());
109-
String descFormatted = String.format("%-12s", monitor.describe());
110-
waterMarkLogger.info("{}{}{}", nameFormatted, descFormatted, value);
108+
waterMarkLogger.info("\t{}\t{}\t{}", threadPoolWrapper.getName(),
109+
monitor.describe(),
110+
value);
111+
111112
if (enablePrintJstack) {
112113
if (monitor.needPrintJstack(threadPoolWrapper.getThreadPoolExecutor(), value) &&
113114
System.currentTimeMillis() - jstackTime > jstackPeriodTime) {

proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,17 @@ public class ProxyConfig implements ConfigFile {
9494
private boolean enableGrpcEpoll = false;
9595
private int grpcThreadPoolNums = 16 + PROCESSOR_NUMBER * 2;
9696
private int grpcThreadPoolQueueCapacity = 100000;
97+
98+
/**
99+
* Maximum number of concurrent gRPC calls allowed per client connection.
100+
* <p>
101+
* A single client issuing excessively high concurrent requests may skew the validation load balancing
102+
* and overload a single proxy instance (hotspot), potentially bringing it down. Limiting
103+
* {@code grpcMaxConcurrentCallsPerConnection} helps mitigate this per-connection hotspot risk.
104+
* <p>
105+
* Note: Setting this limit too low may cause send/consume failures (e.g., backpressure or rejected calls).
106+
*/
107+
private int grpcMaxConcurrentCallsPerConnection = Integer.MAX_VALUE;
97108
private String brokerConfigPath = ConfigurationManager.getProxyHome() + "/conf/broker.conf";
98109
/**
99110
* gRPC max message size
@@ -1572,4 +1583,12 @@ public int getReturnHandleGroupThreadPoolNums() {
15721583
public void setReturnHandleGroupThreadPoolNums(int returnHandleGroupThreadPoolNums) {
15731584
this.returnHandleGroupThreadPoolNums = returnHandleGroupThreadPoolNums;
15741585
}
1586+
1587+
public int getGrpcMaxConcurrentCallsPerConnection() {
1588+
return grpcMaxConcurrentCallsPerConnection;
1589+
}
1590+
1591+
public void setGrpcMaxConcurrentCallsPerConnection(int grpcMaxConcurrentCallsPerConnection) {
1592+
this.grpcMaxConcurrentCallsPerConnection = grpcMaxConcurrentCallsPerConnection;
1593+
}
15751594
}

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,16 @@
2424
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerSocketChannel;
2525
import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
2626
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
27+
import java.util.concurrent.ThreadPoolExecutor;
28+
import java.util.concurrent.TimeUnit;
2729
import org.apache.rocketmq.common.constant.LoggerName;
2830
import org.apache.rocketmq.logging.org.slf4j.Logger;
2931
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3032
import org.apache.rocketmq.proxy.config.ConfigurationManager;
33+
import org.apache.rocketmq.proxy.config.ProxyConfig;
3134
import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor;
3235
import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor;
3336
import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;
34-
35-
import java.util.concurrent.ThreadPoolExecutor;
36-
import java.util.concurrent.TimeUnit;
3737
import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
3838

3939
public class GrpcServerBuilder {
@@ -52,18 +52,20 @@ public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port
5252
}
5353

5454
protected GrpcServerBuilder(ThreadPoolExecutor executor, int port, TlsCertificateManager tlsCertificateManager) {
55+
ProxyConfig config = ConfigurationManager.getProxyConfig();
5556
this.tlsCertificateManager = tlsCertificateManager;
56-
serverBuilder = NettyServerBuilder.forPort(port);
57+
serverBuilder = NettyServerBuilder.forPort(port)
58+
.maxConcurrentCallsPerConnection(config.getGrpcMaxConcurrentCallsPerConnection());
5759

5860
serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator());
5961

6062
// build server
61-
int bossLoopNum = ConfigurationManager.getProxyConfig().getGrpcBossLoopNum();
62-
int workerLoopNum = ConfigurationManager.getProxyConfig().getGrpcWorkerLoopNum();
63-
int maxInboundMessageSize = ConfigurationManager.getProxyConfig().getGrpcMaxInboundMessageSize();
64-
long idleTimeMills = ConfigurationManager.getProxyConfig().getGrpcClientIdleTimeMills();
63+
int bossLoopNum = config.getGrpcBossLoopNum();
64+
int workerLoopNum = config.getGrpcWorkerLoopNum();
65+
int maxInboundMessageSize = config.getGrpcMaxInboundMessageSize();
66+
long idleTimeMills = config.getGrpcClientIdleTimeMills();
6567

66-
if (ConfigurationManager.getProxyConfig().isEnableGrpcEpoll()) {
68+
if (config.isEnableGrpcEpoll()) {
6769
serverBuilder.bossEventLoopGroup(new EpollEventLoopGroup(bossLoopNum))
6870
.workerEventLoopGroup(new EpollEventLoopGroup(workerLoopNum))
6971
.channelType(EpollServerSocketChannel.class)

proxy/src/main/resources/rmq.proxy.logback.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
<maxFileSize>128MB</maxFileSize>
5353
</triggeringPolicy>
5454
<encoder>
55-
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %m%n</pattern>
55+
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8}%m%n</pattern>
5656
<charset class="java.nio.charset.Charset">UTF-8</charset>
5757
</encoder>
5858
</appender>

0 commit comments

Comments
 (0)