Skip to content

Commit b6cc188

Browse files
ymwneumaowei-ymw
andauthored
[ISSUE #9992] Fix remoting server netty server codec thread reuse problem (#9993)
Co-authored-by: maowei.ymw <maowei.ymw@alibaba-inc.com>
1 parent 4eead13 commit b6cc188

File tree

1 file changed

+9
-9
lines changed

1 file changed

+9
-9
lines changed

remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -272,9 +272,9 @@ public void run(Timeout timeout) {
272272
*/
273273
protected ChannelPipeline configChannel(SocketChannel ch) {
274274
return ch.pipeline()
275-
.addLast(nettyServerConfig.isServerNettyWorkerGroupEnable() ? defaultEventExecutorGroup : null,
275+
.addLast(getDefaultEventExecutorGroup(),
276276
HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
277-
.addLast(nettyServerConfig.isServerNettyWorkerGroupEnable() ? defaultEventExecutorGroup : null,
277+
.addLast(getDefaultEventExecutorGroup(),
278278
encoder,
279279
new NettyDecoder(),
280280
distributionHandler,
@@ -430,7 +430,7 @@ private void printRemotingCodeDistribution() {
430430
}
431431

432432
public DefaultEventExecutorGroup getDefaultEventExecutorGroup() {
433-
return defaultEventExecutorGroup;
433+
return nettyServerConfig.isServerNettyWorkerGroupEnable() ? defaultEventExecutorGroup : null;
434434
}
435435

436436
public NettyEncoder getEncoder() {
@@ -462,11 +462,11 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> o
462462
return;
463463
}
464464
if (detectionResult.state() == ProtocolDetectionState.DETECTED) {
465-
ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
466-
.addAfter(defaultEventExecutorGroup, HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
467-
.addAfter(defaultEventExecutorGroup, HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler);
465+
ctx.pipeline().addAfter(getDefaultEventExecutorGroup(), ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
466+
.addAfter(getDefaultEventExecutorGroup(), HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
467+
.addAfter(getDefaultEventExecutorGroup(), HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler);
468468
} else {
469-
ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), TLS_MODE_HANDLER, tlsModeHandler);
469+
ctx.pipeline().addAfter(getDefaultEventExecutorGroup(), ctx.name(), TLS_MODE_HANDLER, tlsModeHandler);
470470
}
471471

472472
try {
@@ -509,8 +509,8 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
509509
case ENFORCING:
510510
if (null != sslContext) {
511511
ctx.pipeline()
512-
.addAfter(defaultEventExecutorGroup, TLS_MODE_HANDLER, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
513-
.addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
512+
.addAfter(getDefaultEventExecutorGroup(), TLS_MODE_HANDLER, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
513+
.addAfter(getDefaultEventExecutorGroup(), TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
514514
log.info("Handlers prepended to channel pipeline to establish SSL connection");
515515
} else {
516516
ctx.close();

0 commit comments

Comments
 (0)