Skip to content

Commit 82c62d3

Browse files
committed
optimize get channel remoteAddr
1 parent e4b731c commit 82c62d3

6 files changed

Lines changed: 18 additions & 16 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,15 @@ public ProducerTableInfo getProducerTable() {
8585
if (map.containsKey(group)) {
8686
map.get(group).add(new ProducerInfo(
8787
clientChannelInfo.getClientId(),
88-
clientChannelInfo.getChannel().remoteAddress().toString(),
88+
RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()),
8989
clientChannelInfo.getLanguage(),
9090
clientChannelInfo.getVersion(),
9191
clientChannelInfo.getLastUpdateTimestamp()
9292
));
9393
} else {
9494
map.put(group, new ArrayList<>(Collections.singleton(new ProducerInfo(
9595
clientChannelInfo.getClientId(),
96-
clientChannelInfo.getChannel().remoteAddress().toString(),
96+
RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()),
9797
clientChannelInfo.getLanguage(),
9898
clientChannelInfo.getVersion(),
9999
clientChannelInfo.getLastUpdateTimestamp()

broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import com.alibaba.fastjson.JSON;
2020
import io.netty.channel.Channel;
2121
import io.netty.channel.ChannelHandlerContext;
22-
import java.util.BitSet;
2322
import java.nio.charset.StandardCharsets;
23+
import java.util.BitSet;
2424
import org.apache.rocketmq.broker.BrokerController;
2525
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
2626
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
@@ -407,7 +407,7 @@ protected void ackOrderly(String topic, String consumeGroup, int qId, long ackOf
407407
if (nextOffset > -1) {
408408
if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset(topic, consumeGroup, qId)) {
409409
this.brokerController.getConsumerOffsetManager().commitOffset(
410-
channel.remoteAddress().toString(), consumeGroup, topic, qId, nextOffset);
410+
RemotingHelper.parseChannelRemoteAddr(channel), consumeGroup, topic, qId, nextOffset);
411411
}
412412
if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic, consumeGroup, qId, invisibleTime)) {
413413
this.brokerController.getPopMessageProcessor().notifyMessageArriving(topic, qId, consumeGroup);
@@ -451,7 +451,7 @@ protected void ackOrderlyNew(String topic, String consumeGroup, int qId, long ac
451451
long nextOffset = consumerOrderInfoManager.commitAndNext(topic, consumeGroup, qId, ackOffset, popTime);
452452
if (brokerController.getBrokerConfig().isPopConsumerKVServiceLog()) {
453453
POP_LOGGER.info("PopConsumerService ack orderly, time={}, topicId={}, groupId={}, queueId={}, " +
454-
"offset={}, next={}", popTime, topic, consumeGroup, qId, ackOffset, nextOffset);
454+
"offset={}, next={}", popTime, topic, consumeGroup, qId, ackOffset, nextOffset);
455455
}
456456

457457
if (nextOffset > -1L) {

broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,7 @@ private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId,
756756
// because offset in PopBuffer is not committed.
757757
POP_LOGGER.warn("Pop initial offset, because store is no correct, {}, {}->{}",
758758
lockKey, atomicOffset.get(), result.getNextBeginOffset());
759-
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic,
759+
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), requestHeader.getConsumerGroup(), topic,
760760
queueId, result.getNextBeginOffset());
761761
atomicOffset.set(result.getNextBeginOffset());
762762
return this.brokerController.getMessageStore().getMessageAsync(requestHeader.getConsumerGroup(), topic, queueId, atomicOffset.get(),
@@ -793,7 +793,7 @@ private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId,
793793
requestHeader.getConsumerGroup(),
794794
queueId, popTime, requestHeader.getInvisibleTime(), result.getMessageQueueOffset(),
795795
orderCountInfo);
796-
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
796+
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
797797
requestHeader.getConsumerGroup(), topic, queueId, finalOffset);
798798
} else {
799799
if (!appendCheckPoint(requestHeader, topic, reviveQid, queueId, finalOffset, result, popTime, this.brokerController.getBrokerConfig().getBrokerName())) {
@@ -809,7 +809,7 @@ private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId,
809809
|| GetMessageStatus.NO_MATCHED_LOGIC_QUEUE.equals(result.getStatus()))
810810
&& result.getNextBeginOffset() > -1) {
811811
if (isOrder) {
812-
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic,
812+
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), requestHeader.getConsumerGroup(), topic,
813813
queueId, result.getNextBeginOffset());
814814
} else {
815815
popBufferMergeService.addCkMock(requestHeader.getConsumerGroup(), topic, queueId, finalOffset,

remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ public static CompletableFuture<Void> convertChannelFutureToCompletableFuture(Ch
361361
if (future.isSuccess()) {
362362
completableFuture.complete(null);
363363
} else {
364-
completableFuture.completeExceptionally(new RemotingConnectException(channelFuture.channel().remoteAddress().toString(), future.cause()));
364+
completableFuture.completeExceptionally(new RemotingConnectException(RemotingHelper.parseChannelRemoteAddr(channelFuture.channel()), future.cause()));
365365
}
366366
});
367367
return completableFuture;

remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -502,9 +502,9 @@ public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingComma
502502
return invokeImpl(channel, request, timeoutMillis).thenApply(ResponseFuture::getResponseCommand)
503503
.get(timeoutMillis, TimeUnit.MILLISECONDS);
504504
} catch (ExecutionException e) {
505-
throw new RemotingSendRequestException(channel.remoteAddress().toString(), e.getCause());
505+
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e.getCause());
506506
} catch (TimeoutException e) {
507-
throw new RemotingTimeoutException(channel.remoteAddress().toString(), timeoutMillis, e.getCause());
507+
throw new RemotingTimeoutException(RemotingHelper.parseChannelRemoteAddr(channel), timeoutMillis, e.getCause());
508508
}
509509
}
510510

remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.concurrent.TimeUnit;
2222
import java.util.concurrent.atomic.AtomicBoolean;
2323
import org.apache.rocketmq.remoting.InvokeCallback;
24+
import org.apache.rocketmq.remoting.common.RemotingHelper;
2425
import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
2526
import org.apache.rocketmq.remoting.exception.RemotingException;
2627
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -45,12 +46,13 @@ public class ResponseFuture {
4546
private volatile boolean interrupted = false;
4647

4748
public ResponseFuture(Channel channel, int opaque, long timeoutMillis, InvokeCallback invokeCallback,
48-
SemaphoreReleaseOnlyOnce once) {
49+
SemaphoreReleaseOnlyOnce once) {
4950
this(channel, opaque, null, timeoutMillis, invokeCallback, once);
5051
}
5152

52-
public ResponseFuture(Channel channel, int opaque, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback,
53-
SemaphoreReleaseOnlyOnce once) {
53+
public ResponseFuture(Channel channel, int opaque, RemotingCommand request, long timeoutMillis,
54+
InvokeCallback invokeCallback,
55+
SemaphoreReleaseOnlyOnce once) {
5456
this.channel = channel;
5557
this.opaque = opaque;
5658
this.request = request;
@@ -67,9 +69,9 @@ public void executeInvokeCallback() {
6769
invokeCallback.operationSucceed(response);
6870
} else {
6971
if (!isSendRequestOK()) {
70-
invokeCallback.operationFail(new RemotingSendRequestException(channel.remoteAddress().toString(), getCause()));
72+
invokeCallback.operationFail(new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), getCause()));
7173
} else if (isTimeout()) {
72-
invokeCallback.operationFail(new RemotingTimeoutException(channel.remoteAddress().toString(), getTimeoutMillis(), getCause()));
74+
invokeCallback.operationFail(new RemotingTimeoutException(RemotingHelper.parseChannelRemoteAddr(channel), getTimeoutMillis(), getCause()));
7375
} else {
7476
invokeCallback.operationFail(new RemotingException(getRequestCommand().toString(), getCause()));
7577
}

0 commit comments

Comments
 (0)