Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ private MessageQueue selectOneMessageQueue(List<MessageQueue> messageQueueList,
boolean filterResult = true;
for (QueueFilter f: filter) {
Preconditions.checkNotNull(f);
filterResult &= f.filter(mq);
if (!f.filter(mq)) {
filterResult = false;
break;
}
}
if (filterResult) {
return mq;
Expand All @@ -106,28 +109,44 @@ public void resetIndex() {
this.sendWhichQueue.reset();
}

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
for (int i = 0; i < this.messageQueueList.size(); i++) {
MessageQueue mq = selectOneMessageQueue();
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
public MessageQueue selectOneMessageQueue() {
if (messageQueueList == null || messageQueueList.isEmpty()) {
return null; // Avoid IndexOutOfBoundsException
}

int index = Math.abs(sendWhichQueue.incrementAndGet() % messageQueueList.size());
return messageQueueList.get(index);
}

/**
* Selects a message queue while avoiding the given broker.
* Falls back to a random queue if all queues belong to the same broker.
*/

public MessageQueue selectOneMessageQueue(final String lastBrokerName){
if (messageQueueList == null || messageQueueList.isEmpty()) {
return null;
}

public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.incrementAndGet();
int pos = index % this.messageQueueList.size();
// Try selecting a queue that is not from the lastBrokerName
for (int i = 0; i < messageQueueList.size(); i++) {
int index = Math.floorMod(sendWhichQueue.incrementAndGet(), messageQueueList.size());
MessageQueue mq = messageQueueList.get(index);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}

return this.messageQueueList.get(pos);
// Fallback: If all queues belong to the same broker, return any queue
int index = Math.floorMod(sendWhichQueue.incrementAndGet(), messageQueueList.size());
return messageQueueList.get(index);
}

public int getWriteQueueNumsByBroker(final String brokerName) {
if (topicRouteData == null || topicRouteData.getQueueDatas() == null) {
return -1;
}

for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) {
final QueueData queueData = this.topicRouteData.getQueueDatas().get(i);
if (queueData.getBrokerName().equals(brokerName)) {
Expand Down
Loading