Skip to content

Commit b12664b

Browse files
committed
Merge branch 'develop' into dev-tofastjson2
# Conflicts: # broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConfigManager.java # broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java # broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java # broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java # broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
2 parents 7980863 + 63d20eb commit b12664b

87 files changed

Lines changed: 3086 additions & 600 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

BUILDING

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ Build Instructions for Apache RocketMQ
44

55
(1) Prerequisites
66

7-
JDK 1.7+ is required in order to compile and run RocketMQ.
7+
JDK 1.8+ is required in order to compile and run RocketMQ.
88

99
RocketMQ utilizes Maven as a distribution management and packaging tool. Version 3.0.3 or later is required.
1010
Maven installation and configuration instructions can be found here:

auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,27 @@
3535
import org.apache.rocketmq.auth.config.AuthConfig;
3636
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
3737
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
38-
import org.rocksdb.RocksIterator;
38+
import org.rocksdb.RocksDB;
3939

4040
public class LocalAuthenticationMetadataProvider implements AuthenticationMetadataProvider {
4141

42+
private final static String AUTH_METADATA_COLUMN_FAMILY = new String(RocksDB.DEFAULT_COLUMN_FAMILY,
43+
StandardCharsets.UTF_8);
44+
4245
private ConfigRocksDBStorage storage;
4346

4447
private LoadingCache<String, User> userCache;
4548

49+
protected ThreadPoolExecutor cacheRefreshExecutor;
50+
4651
@Override
4752
public void initialize(AuthConfig authConfig, Supplier<?> metadataService) {
48-
this.storage = new ConfigRocksDBStorage(authConfig.getAuthConfigPath() + File.separator + "users");
53+
this.storage = ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() + File.separator + "users", false);
4954
if (!this.storage.start()) {
5055
throw new RuntimeException("Failed to load rocksdb for auth_user, please check whether it is occupied");
5156
}
5257

53-
ThreadPoolExecutor cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
58+
this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
5459
1,
5560
1,
5661
1000 * 60,
@@ -72,7 +77,7 @@ public CompletableFuture<Void> createUser(User user) {
7277
try {
7378
byte[] keyBytes = user.getUsername().getBytes(StandardCharsets.UTF_8);
7479
byte[] valueBytes = JSON.toJSONBytes(user);
75-
this.storage.put(keyBytes, keyBytes.length, valueBytes);
80+
this.storage.put(AUTH_METADATA_COLUMN_FAMILY, keyBytes, keyBytes.length, valueBytes);
7681
this.storage.flushWAL();
7782
this.userCache.invalidate(user.getUsername());
7883
} catch (Exception e) {
@@ -84,7 +89,7 @@ public CompletableFuture<Void> createUser(User user) {
8489
@Override
8590
public CompletableFuture<Void> deleteUser(String username) {
8691
try {
87-
this.storage.delete(username.getBytes(StandardCharsets.UTF_8));
92+
this.storage.delete(AUTH_METADATA_COLUMN_FAMILY, username.getBytes(StandardCharsets.UTF_8));
8893
this.storage.flushWAL();
8994
this.userCache.invalidate(username);
9095
} catch (Exception e) {
@@ -98,7 +103,7 @@ public CompletableFuture<Void> updateUser(User user) {
98103
try {
99104
byte[] keyBytes = user.getUsername().getBytes(StandardCharsets.UTF_8);
100105
byte[] valueBytes = JSON.toJSONBytes(user);
101-
this.storage.put(keyBytes, keyBytes.length, valueBytes);
106+
this.storage.put(AUTH_METADATA_COLUMN_FAMILY, keyBytes, keyBytes.length, valueBytes);
102107
this.storage.flushWAL();
103108
this.userCache.invalidate(user.getUsername());
104109
} catch (Exception e) {
@@ -119,27 +124,31 @@ public CompletableFuture<User> getUser(String username) {
119124
@Override
120125
public CompletableFuture<List<User>> listUser(String filter) {
121126
List<User> result = new ArrayList<>();
122-
try (RocksIterator iterator = this.storage.iterator()) {
123-
iterator.seekToFirst();
124-
while (iterator.isValid()) {
125-
String username = new String(iterator.key(), StandardCharsets.UTF_8);
127+
CompletableFuture<List<User>> future = new CompletableFuture<>();
128+
try {
129+
this.storage.iterate(AUTH_METADATA_COLUMN_FAMILY, (key, value) -> {
130+
String username = new String(key, StandardCharsets.UTF_8);
126131
if (StringUtils.isNotBlank(filter) && !username.contains(filter)) {
127-
iterator.next();
128-
continue;
132+
return;
129133
}
130-
User user = JSON.parseObject(new String(iterator.value(), StandardCharsets.UTF_8), User.class);
134+
User user = JSON.parseObject(new String(value, StandardCharsets.UTF_8), User.class);
131135
result.add(user);
132-
iterator.next();
133-
}
136+
});
137+
} catch (Exception e) {
138+
future.completeExceptionally(e);
134139
}
135-
return CompletableFuture.completedFuture(result);
140+
future.complete(result);
141+
return future;
136142
}
137143

138144
@Override
139145
public void shutdown() {
140146
if (this.storage != null) {
141147
this.storage.shutdown();
142148
}
149+
if (this.cacheRefreshExecutor != null) {
150+
this.cacheRefreshExecutor.shutdown();
151+
}
143152
}
144153

145154
private static class UserCacheLoader implements CacheLoader<String, User> {
@@ -154,7 +163,7 @@ public UserCacheLoader(ConfigRocksDBStorage storage) {
154163
public User load(String username) {
155164
try {
156165
byte[] keyBytes = username.getBytes(StandardCharsets.UTF_8);
157-
byte[] valueBytes = storage.get(keyBytes);
166+
byte[] valueBytes = storage.get(AUTH_METADATA_COLUMN_FAMILY, keyBytes);
158167
if (ArrayUtils.isEmpty(valueBytes)) {
159168
return EMPTY_USER;
160169
}

auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ public List<DefaultAuthorizationContext> build(ChannelHandlerContext context, Re
306306
}
307307
break;
308308
case RequestCode.UNLOCK_BATCH_MQ:
309-
UnlockBatchRequestBody unlockBatchRequestBody = LockBatchRequestBody.decode(command.getBody(), UnlockBatchRequestBody.class);
309+
UnlockBatchRequestBody unlockBatchRequestBody = UnlockBatchRequestBody.decode(command.getBody(), UnlockBatchRequestBody.class);
310310
group = Resource.ofGroup(unlockBatchRequestBody.getConsumerGroup());
311311
result.add(DefaultAuthorizationContext.of(subject, group, Action.SUB, sourceIp));
312312
if (CollectionUtils.isNotEmpty(unlockBatchRequestBody.getMqSet())) {

auth/src/main/java/org/apache/rocketmq/auth/authorization/chain/AclAuthorizationHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,11 @@ private int comparePolicyEntries(PolicyEntry o1, PolicyEntry o2) {
154154
// the decision deny has higher priority
155155
Decision d1 = o1.getDecision();
156156
Decision d2 = o2.getDecision();
157-
return d1 == Decision.DENY ? 1 : d2 == Decision.DENY ? -1 : 0;
157+
158+
if (d1 != d2) {
159+
return d1 == Decision.DENY ? -1 : 1;
160+
}
161+
return 0;
158162
}
159163

160164
private static void throwException(DefaultAuthorizationContext context, String detail) {

auth/src/main/java/org/apache/rocketmq/auth/authorization/chain/UserAuthorizationHandler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.rocketmq.auth.authentication.enums.SubjectType;
2222
import org.apache.rocketmq.auth.authentication.enums.UserStatus;
2323
import org.apache.rocketmq.auth.authentication.enums.UserType;
24-
import org.apache.rocketmq.auth.authentication.exception.AuthenticationException;
2524
import org.apache.rocketmq.auth.authentication.factory.AuthenticationFactory;
2625
import org.apache.rocketmq.auth.authentication.model.Subject;
2726
import org.apache.rocketmq.auth.authentication.model.User;
@@ -62,8 +61,8 @@ private CompletableFuture<User> getUser(Subject subject) {
6261
if (result == null) {
6362
throw new AuthorizationException("User:{} not found.", user.getUsername());
6463
}
65-
if (user.getUserStatus() == UserStatus.DISABLE) {
66-
throw new AuthenticationException("User:{} is disabled.", user.getUsername());
64+
if (result.getUserStatus() == UserStatus.DISABLE) {
65+
throw new AuthorizationException("User:{} is disabled.", result.getUsername());
6766
}
6867
return result;
6968
});

auth/src/main/java/org/apache/rocketmq/auth/authorization/manager/AuthorizationMetadataManagerImpl.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -176,19 +176,26 @@ public CompletableFuture<Void> deleteAcl(Subject subject, PolicyType policyType,
176176

177177
@Override
178178
public CompletableFuture<Acl> getAcl(Subject subject) {
179-
CompletableFuture<? extends Subject> subjectFuture;
180-
if (subject.isSubject(SubjectType.USER)) {
181-
User user = (User) subject;
182-
subjectFuture = this.getAuthenticationMetadataProvider().getUser(user.getUsername());
183-
} else {
184-
subjectFuture = CompletableFuture.completedFuture(subject);
185-
}
186-
return subjectFuture.thenCompose(sub -> {
187-
if (sub == null) {
188-
throw new AuthorizationException("The subject is not exist.");
179+
try {
180+
if (subject == null) {
181+
throw new AuthorizationException("The subject is null.");
189182
}
190-
return this.getAuthorizationMetadataProvider().getAcl(subject);
191-
});
183+
CompletableFuture<? extends Subject> subjectFuture;
184+
if (subject.isSubject(SubjectType.USER)) {
185+
User user = (User) subject;
186+
subjectFuture = this.getAuthenticationMetadataProvider().getUser(user.getUsername());
187+
} else {
188+
subjectFuture = CompletableFuture.completedFuture(subject);
189+
}
190+
return subjectFuture.thenCompose(sub -> {
191+
if (sub == null) {
192+
throw new AuthorizationException("The subject is not exist.");
193+
}
194+
return this.getAuthorizationMetadataProvider().getAcl(sub);
195+
});
196+
} catch (Exception e) {
197+
return this.handleException(e);
198+
}
192199
}
193200

194201
@Override

auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,26 @@
4040
import org.apache.rocketmq.auth.config.AuthConfig;
4141
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
4242
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
43-
import org.rocksdb.RocksIterator;
43+
import org.rocksdb.RocksDB;
4444

4545
public class LocalAuthorizationMetadataProvider implements AuthorizationMetadataProvider {
4646

47+
private final static String AUTH_METADATA_COLUMN_FAMILY = new String(RocksDB.DEFAULT_COLUMN_FAMILY,
48+
StandardCharsets.UTF_8);
49+
4750
private ConfigRocksDBStorage storage;
4851

4952
private LoadingCache<String, Acl> aclCache;
5053

54+
protected ThreadPoolExecutor cacheRefreshExecutor;
55+
5156
@Override
5257
public void initialize(AuthConfig authConfig, Supplier<?> metadataService) {
53-
this.storage = new ConfigRocksDBStorage(authConfig.getAuthConfigPath() + File.separator + "acls");
58+
this.storage = ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() + File.separator + "acls", false);
5459
if (!this.storage.start()) {
5560
throw new RuntimeException("Failed to load rocksdb for auth_acl, please check whether it is occupied.");
5661
}
57-
ThreadPoolExecutor cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
62+
this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
5863
1,
5964
1,
6065
1000 * 60,
@@ -77,7 +82,7 @@ public CompletableFuture<Void> createAcl(Acl acl) {
7782
Subject subject = acl.getSubject();
7883
byte[] keyBytes = subject.getSubjectKey().getBytes(StandardCharsets.UTF_8);
7984
byte[] valueBytes = JSON.toJSONBytes(acl);
80-
this.storage.put(keyBytes, keyBytes.length, valueBytes);
85+
this.storage.put(AUTH_METADATA_COLUMN_FAMILY, keyBytes, keyBytes.length, valueBytes);
8186
this.storage.flushWAL();
8287
this.aclCache.invalidate(subject.getSubjectKey());
8388
} catch (Exception e) {
@@ -90,7 +95,7 @@ public CompletableFuture<Void> createAcl(Acl acl) {
9095
public CompletableFuture<Void> deleteAcl(Subject subject) {
9196
try {
9297
byte[] keyBytes = subject.getSubjectKey().getBytes(StandardCharsets.UTF_8);
93-
this.storage.delete(keyBytes);
98+
this.storage.delete(AUTH_METADATA_COLUMN_FAMILY, keyBytes);
9499
this.storage.flushWAL();
95100
this.aclCache.invalidate(subject.getSubjectKey());
96101
} catch (Exception e) {
@@ -105,7 +110,7 @@ public CompletableFuture<Void> updateAcl(Acl acl) {
105110
Subject subject = acl.getSubject();
106111
byte[] keyBytes = subject.getSubjectKey().getBytes(StandardCharsets.UTF_8);
107112
byte[] valueBytes = JSON.toJSONBytes(acl);
108-
this.storage.put(keyBytes, keyBytes.length, valueBytes);
113+
this.storage.put(AUTH_METADATA_COLUMN_FAMILY, keyBytes, keyBytes.length, valueBytes);
109114
this.storage.flushWAL();
110115
this.aclCache.invalidate(subject.getSubjectKey());
111116
} catch (Exception e) {
@@ -126,20 +131,18 @@ public CompletableFuture<Acl> getAcl(Subject subject) {
126131
@Override
127132
public CompletableFuture<List<Acl>> listAcl(String subjectFilter, String resourceFilter) {
128133
List<Acl> result = new ArrayList<>();
129-
try (RocksIterator iterator = this.storage.iterator()) {
130-
iterator.seekToFirst();
131-
while (iterator.isValid()) {
132-
String subjectKey = new String(iterator.key(), StandardCharsets.UTF_8);
134+
CompletableFuture<List<Acl>> future = new CompletableFuture<>();
135+
try {
136+
this.storage.iterate(AUTH_METADATA_COLUMN_FAMILY, (key, value) -> {
137+
String subjectKey = new String(key, StandardCharsets.UTF_8);
133138
if (StringUtils.isNotBlank(subjectFilter) && !subjectKey.contains(subjectFilter)) {
134-
iterator.next();
135-
continue;
139+
return;
136140
}
137141
Subject subject = Subject.of(subjectKey);
138-
Acl acl = JSON.parseObject(new String(iterator.value(), StandardCharsets.UTF_8), Acl.class);
142+
Acl acl = JSON.parseObject(new String(value, StandardCharsets.UTF_8), Acl.class);
139143
List<Policy> policies = acl.getPolicies();
140144
if (!CollectionUtils.isNotEmpty(policies)) {
141-
iterator.next();
142-
continue;
145+
return;
143146
}
144147
Iterator<Policy> policyIterator = policies.iterator();
145148
while (policyIterator.hasNext()) {
@@ -158,17 +161,22 @@ public CompletableFuture<List<Acl>> listAcl(String subjectFilter, String resourc
158161
if (CollectionUtils.isNotEmpty(policies)) {
159162
result.add(Acl.of(subject, policies));
160163
}
161-
iterator.next();
162-
}
164+
});
165+
} catch (Exception e) {
166+
future.completeExceptionally(e);
163167
}
164-
return CompletableFuture.completedFuture(result);
168+
future.complete(result);
169+
return future;
165170
}
166171

167172
@Override
168173
public void shutdown() {
169174
if (this.storage != null) {
170175
this.storage.shutdown();
171176
}
177+
if (this.cacheRefreshExecutor != null) {
178+
this.cacheRefreshExecutor.shutdown();
179+
}
172180
}
173181

174182
private static class AclCacheLoader implements CacheLoader<String, Acl> {
@@ -185,7 +193,7 @@ public Acl load(String subjectKey) {
185193
byte[] keyBytes = subjectKey.getBytes(StandardCharsets.UTF_8);
186194
Subject subject = Subject.of(subjectKey);
187195

188-
byte[] valueBytes = this.storage.get(keyBytes);
196+
byte[] valueBytes = this.storage.get(AUTH_METADATA_COLUMN_FAMILY, keyBytes);
189197
if (ArrayUtils.isEmpty(valueBytes)) {
190198
return EMPTY_ACL;
191199
}

auth/src/main/java/org/apache/rocketmq/auth/authorization/strategy/AbstractAuthorizationStrategy.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
*/
1717
package org.apache.rocketmq.auth.authorization.strategy;
1818

19-
import java.util.ArrayList;
20-
import java.util.List;
19+
import java.util.HashSet;
20+
import java.util.Set;
2121
import java.util.function.Supplier;
2222
import org.apache.commons.lang3.StringUtils;
2323
import org.apache.rocketmq.auth.authorization.context.AuthorizationContext;
@@ -30,7 +30,7 @@
3030
public abstract class AbstractAuthorizationStrategy implements AuthorizationStrategy {
3131

3232
protected final AuthConfig authConfig;
33-
protected final List<String> authorizationWhitelist = new ArrayList<>();
33+
protected final Set<String> authorizationWhiteSet = new HashSet<>();
3434
protected final AuthorizationProvider<AuthorizationContext> authorizationProvider;
3535

3636
public AbstractAuthorizationStrategy(AuthConfig authConfig, Supplier<?> metadataService) {
@@ -42,7 +42,7 @@ public AbstractAuthorizationStrategy(AuthConfig authConfig, Supplier<?> metadata
4242
if (StringUtils.isNotBlank(authConfig.getAuthorizationWhitelist())) {
4343
String[] whitelist = StringUtils.split(authConfig.getAuthorizationWhitelist(), ",");
4444
for (String rpcCode : whitelist) {
45-
this.authorizationWhitelist.add(StringUtils.trim(rpcCode));
45+
this.authorizationWhiteSet.add(StringUtils.trim(rpcCode));
4646
}
4747
}
4848
}
@@ -57,7 +57,7 @@ public void doEvaluate(AuthorizationContext context) {
5757
if (this.authorizationProvider == null) {
5858
return;
5959
}
60-
if (this.authorizationWhitelist.contains(context.getRpcCode())) {
60+
if (this.authorizationWhiteSet.contains(context.getRpcCode())) {
6161
return;
6262
}
6363
try {

0 commit comments

Comments
 (0)