diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java index 917fe57aa87..8e65d639f78 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java @@ -88,7 +88,10 @@ private MessageQueue selectOneMessageQueue(List messageQueueList, boolean filterResult = true; for (QueueFilter f: filter) { Preconditions.checkNotNull(f); - filterResult &= f.filter(mq); + if (!f.filter(mq)) { + filterResult = false; + break; + } } if (filterResult) { return mq; @@ -106,28 +109,44 @@ public void resetIndex() { this.sendWhichQueue.reset(); } - public MessageQueue selectOneMessageQueue(final String lastBrokerName) { - if (lastBrokerName == null) { - return selectOneMessageQueue(); - } else { - for (int i = 0; i < this.messageQueueList.size(); i++) { - MessageQueue mq = selectOneMessageQueue(); - if (!mq.getBrokerName().equals(lastBrokerName)) { - return mq; - } - } - return selectOneMessageQueue(); + public MessageQueue selectOneMessageQueue() { + if (messageQueueList == null || messageQueueList.isEmpty()) { + return null; // Avoid IndexOutOfBoundsException } + + int index = Math.abs(sendWhichQueue.incrementAndGet() % messageQueueList.size()); + return messageQueueList.get(index); } + + /** + * Selects a message queue while avoiding the given broker. + * Falls back to a random queue if all queues belong to the same broker. + */ + + public MessageQueue selectOneMessageQueue(final String lastBrokerName){ + if (messageQueueList == null || messageQueueList.isEmpty()) { + return null; + } - public MessageQueue selectOneMessageQueue() { - int index = this.sendWhichQueue.incrementAndGet(); - int pos = index % this.messageQueueList.size(); + // Try selecting a queue that is not from the lastBrokerName + for (int i = 0; i < messageQueueList.size(); i++) { + int index = Math.floorMod(sendWhichQueue.incrementAndGet(), messageQueueList.size()); + MessageQueue mq = messageQueueList.get(index); + if (!mq.getBrokerName().equals(lastBrokerName)) { + return mq; + } + } - return this.messageQueueList.get(pos); + // Fallback: If all queues belong to the same broker, return any queue + int index = Math.floorMod(sendWhichQueue.incrementAndGet(), messageQueueList.size()); + return messageQueueList.get(index); } public int getWriteQueueNumsByBroker(final String brokerName) { + if (topicRouteData == null || topicRouteData.getQueueDatas() == null) { + return -1; + } + for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) { final QueueData queueData = this.topicRouteData.getQueueDatas().get(i); if (queueData.getBrokerName().equals(brokerName)) {