Skip to content

Commit 7fc5452

Browse files
authored
[ISSUE #9970] Refactor the MessageQueueSelector to support more flexible queue selection strategy (#9971)
1 parent 9f23894 commit 7fc5452

13 files changed

Lines changed: 1168 additions & 184 deletions

File tree

broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@
1717

1818
package org.apache.rocketmq.broker.subscription;
1919

20+
import java.io.IOException;
21+
import java.nio.file.Files;
22+
import java.nio.file.Path;
23+
import java.nio.file.Paths;
24+
import java.util.Comparator;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.UUID;
28+
import java.util.stream.Stream;
2029
import org.apache.rocketmq.broker.BrokerController;
2130
import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
2231
import org.apache.rocketmq.common.BrokerConfig;
@@ -34,15 +43,6 @@
3443
import org.mockito.Mockito;
3544
import org.mockito.junit.MockitoJUnitRunner;
3645

37-
import java.io.IOException;
38-
import java.nio.file.Files;
39-
import java.nio.file.Path;
40-
import java.nio.file.Paths;
41-
import java.util.Comparator;
42-
import java.util.HashMap;
43-
import java.util.Map;
44-
import java.util.UUID;
45-
4646
import static org.mockito.Mockito.when;
4747

4848
@RunWith(MockitoJUnitRunner.class)
@@ -78,24 +78,28 @@ public void destroy() {
7878
if (notToBeExecuted()) {
7979
return;
8080
}
81-
Path pathToBeDeleted = Paths.get(basePath);
82-
83-
try {
84-
Files.walk(pathToBeDeleted)
85-
.sorted(Comparator.reverseOrder())
86-
.forEach(path -> {
87-
try {
88-
Files.delete(path);
89-
} catch (IOException e) {
90-
// ignore
91-
}
92-
});
93-
} catch (IOException e) {
94-
// ignore
95-
}
81+
9682
if (rocksDBSubscriptionGroupManager != null) {
9783
rocksDBSubscriptionGroupManager.stop();
9884
}
85+
86+
Path root = Paths.get(basePath);
87+
if (Files.notExists(root)) {
88+
return;
89+
}
90+
91+
try (Stream<Path> walk = Files.walk(root)) {
92+
walk.sorted(Comparator.reverseOrder())
93+
.forEach(p -> {
94+
try {
95+
Files.deleteIfExists(p);
96+
} catch (IOException e) {
97+
// ignore
98+
}
99+
});
100+
} catch (IOException e) {
101+
// ignore
102+
}
99103
}
100104

101105

client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@
2121
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
2222
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo.QueueFilter;
2323
import org.apache.rocketmq.common.message.MessageQueue;
24+
import org.apache.rocketmq.common.utils.StartAndShutdown;
2425

25-
public class MQFaultStrategy {
26+
public class MQFaultStrategy implements StartAndShutdown {
2627
private LatencyFaultTolerance<String> latencyFaultTolerance;
2728
private volatile boolean sendLatencyFaultEnable;
2829
private volatile boolean startDetectorEnable;
@@ -130,6 +131,11 @@ public void startDetector() {
130131
this.latencyFaultTolerance.startDetector();
131132
}
132133

134+
@Override
135+
public void start() throws Exception {
136+
this.startDetector();
137+
}
138+
133139
public void shutdown() {
134140
this.latencyFaultTolerance.shutdown();
135141
}

proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java

Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,27 @@
1717
package org.apache.rocketmq.proxy.service.route;
1818

1919
import com.google.common.base.MoreObjects;
20-
import java.util.Objects;
2120
import org.apache.rocketmq.common.message.MessageQueue;
2221

23-
public class AddressableMessageQueue implements Comparable<AddressableMessageQueue> {
24-
25-
private final MessageQueue messageQueue;
22+
public class AddressableMessageQueue extends MessageQueue {
2623
private final String brokerAddr;
2724

2825
public AddressableMessageQueue(MessageQueue messageQueue, String brokerAddr) {
29-
this.messageQueue = messageQueue;
26+
super(messageQueue);
3027
this.brokerAddr = brokerAddr;
3128
}
3229

30+
public String getBrokerAddr() {
31+
return brokerAddr;
32+
}
33+
34+
public MessageQueue getMessageQueue() {
35+
return new MessageQueue(getTopic(), getBrokerName(), getQueueId());
36+
}
37+
3338
@Override
34-
public int compareTo(AddressableMessageQueue o) {
35-
return messageQueue.compareTo(o.messageQueue);
39+
public int hashCode() {
40+
return super.hashCode();
3641
}
3742

3843
@Override
@@ -43,39 +48,13 @@ public boolean equals(Object o) {
4348
if (!(o instanceof AddressableMessageQueue)) {
4449
return false;
4550
}
46-
AddressableMessageQueue queue = (AddressableMessageQueue) o;
47-
return Objects.equals(messageQueue, queue.messageQueue);
48-
}
49-
50-
@Override
51-
public int hashCode() {
52-
return messageQueue == null ? 1 : messageQueue.hashCode();
53-
}
54-
55-
public int getQueueId() {
56-
return this.messageQueue.getQueueId();
57-
}
58-
59-
public String getBrokerName() {
60-
return this.messageQueue.getBrokerName();
61-
}
62-
63-
public String getTopic() {
64-
return messageQueue.getTopic();
65-
}
66-
67-
public MessageQueue getMessageQueue() {
68-
return messageQueue;
69-
}
70-
71-
public String getBrokerAddr() {
72-
return brokerAddr;
51+
return super.equals(o);
7352
}
7453

7554
@Override
7655
public String toString() {
7756
return MoreObjects.toStringHelper(this)
78-
.add("messageQueue", messageQueue)
57+
.add("messageQueue", super.toString())
7958
.add("brokerAddr", brokerAddr)
8059
.toString();
8160
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.proxy.service.route;
19+
20+
public class DefaultMessageQueuePriorityProvider implements MessageQueuePriorityProvider<AddressableMessageQueue> {
21+
@Override
22+
public int priorityOf(AddressableMessageQueue queue) {
23+
return 0;
24+
}
25+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.proxy.service.route;
18+
19+
import java.util.List;
20+
import java.util.Objects;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
import org.apache.commons.lang3.tuple.Pair;
23+
import org.apache.rocketmq.common.message.MessageQueue;
24+
25+
@FunctionalInterface
26+
public interface MessageQueuePenalizer<Q extends MessageQueue> {
27+
28+
/**
29+
* Returns the penalty value for the given MessageQueue; lower is better.
30+
*/
31+
int penaltyOf(Q messageQueue);
32+
33+
/**
34+
* Aggregates penalties from multiple penalizers for the same MessageQueue (by summing them up).
35+
*/
36+
static <Q extends MessageQueue> int evaluatePenalty(Q messageQueue, List<MessageQueuePenalizer<Q>> penalizers) {
37+
Objects.requireNonNull(messageQueue, "messageQueue");
38+
if (penalizers == null || penalizers.isEmpty()) {
39+
return 0;
40+
}
41+
int sum = 0;
42+
for (MessageQueuePenalizer<Q> p : penalizers) {
43+
sum += p.penaltyOf(messageQueue);
44+
}
45+
return sum;
46+
}
47+
48+
/**
49+
* Selects the queue with the lowest evaluated penalty from the given queue list.
50+
*
51+
* <p>The method iterates through all queues exactly once, but starts from a rotating index
52+
* derived from {@code startIndex} (round-robin) to avoid always scanning from position 0 .</p>
53+
*
54+
* <p>For each queue, it computes a penalty via {@link #evaluatePenalty} using
55+
* the provided {@code penalizers}. The queue with the smallest penalty is selected.</p>
56+
*
57+
* <p>Short-circuit rule: if any queue has a {@code penalty<= 0}, it is returned immediately,
58+
* since no better result than 0 is expected.</p>
59+
*
60+
* @param queues candidate queues to select from
61+
* @param penalizers penalty evaluators applied to each queue
62+
* @param startIndex atomic counter used to determine the rotating start position (round-robin)
63+
* @param <Q> queue type
64+
* @return a {@code Pair} of (selected queue, penalty), or {@code null} if {@code queues} is null/empty
65+
*/
66+
static <Q extends MessageQueue> Pair<Q, Integer> selectLeastPenalty(List<Q> queues,
67+
List<MessageQueuePenalizer<Q>> penalizers, AtomicInteger startIndex) {
68+
if (queues == null || queues.isEmpty()) {
69+
return null;
70+
}
71+
Q bestQueue = null;
72+
int bestPenalty = Integer.MAX_VALUE;
73+
74+
for (int i = 0; i < queues.size(); i++) {
75+
int index = Math.floorMod(startIndex.getAndIncrement(), queues.size());
76+
Q messageQueue = queues.get(index);
77+
int penalty = evaluatePenalty(messageQueue, penalizers);
78+
79+
// Short-circuit: cannot do better than 0
80+
if (penalty <= 0) {
81+
return Pair.of(messageQueue, penalty);
82+
}
83+
84+
if (penalty < bestPenalty) {
85+
bestPenalty = penalty;
86+
bestQueue = messageQueue;
87+
}
88+
}
89+
return Pair.of(bestQueue, bestPenalty);
90+
}
91+
92+
/**
93+
* Selects a queue with the lowest computed penalty from multiple priority groups.
94+
*
95+
* <p>The input {@code queuesWithPriority} is a list of queue groups ordered by priority.
96+
* For each priority group, this method delegates to {@link #selectLeastPenalty} to pick the best queue
97+
* within that group and obtain its penalty.</p>
98+
*
99+
* <p>Short-circuit rule: if any priority group yields a queue whose {@code penalty <= 0},
100+
* that result is returned immediately.</p>
101+
*
102+
* <p>Otherwise, it returns the queue with the smallest positive penalty among all groups.
103+
* If multiple groups produce the same minimum penalty, the first encountered one wins.</p>
104+
*
105+
* @param queuesWithPriority priority-ordered groups of queues; each inner list represents one priority level
106+
* @param penalizers penalty calculators used by {@code selectLeastPenalty} to score queues
107+
* @param startIndex round-robin start index forwarded to {@code selectLeastPenalty} to reduce contention/hotspots
108+
* @param <Q> queue type
109+
* @return a {@code Pair} of (selected queue, penalty), or {@code null} if {@code queuesWithPriority} is null/empty
110+
*/
111+
static <Q extends MessageQueue> Pair<Q, Integer> selectLeastPenaltyWithPriority(List<List<Q>> queuesWithPriority,
112+
List<MessageQueuePenalizer<Q>> penalizers, AtomicInteger startIndex) {
113+
if (queuesWithPriority == null || queuesWithPriority.isEmpty()) {
114+
return null;
115+
}
116+
if (queuesWithPriority.size() == 1) {
117+
return selectLeastPenalty(queuesWithPriority.get(0), penalizers, startIndex);
118+
}
119+
Q bestQueue = null;
120+
int bestPenalty = Integer.MAX_VALUE;
121+
for (List<Q> queues : queuesWithPriority) {
122+
Pair<Q, Integer> queueAndPenalty = selectLeastPenalty(queues, penalizers, startIndex);
123+
int penalty = queueAndPenalty.getRight();
124+
if (queueAndPenalty.getRight() <= 0) {
125+
return queueAndPenalty;
126+
}
127+
if (penalty < bestPenalty) {
128+
bestPenalty = penalty;
129+
bestQueue = queueAndPenalty.getLeft();
130+
}
131+
}
132+
return Pair.of(bestQueue, bestPenalty);
133+
}
134+
}

0 commit comments

Comments
 (0)