Skip to content

Commit 1e07aa5

Browse files
yx9oRongtongJin
authored andcommitted
[ISSUE# 9333] Use fastjson2 in broker module (#9334)
* [#ISSUE 9333] Use fastjson2 in broker module * Update * Fix the serialization failure caused by improper get/set naming
1 parent 50ff54c commit 1e07aa5

22 files changed

Lines changed: 715 additions & 219 deletions

broker/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ java_library(
3232
"//tieredstore",
3333
"@maven//:org_slf4j_slf4j_api",
3434
"@maven//:ch_qos_logback_logback_classic",
35-
"@maven//:com_alibaba_fastjson",
3635
"@maven//:com_alibaba_fastjson2_fastjson2",
3736
"@maven//:com_github_luben_zstd_jni",
3837
"@maven//:com_google_guava_guava",
@@ -84,7 +83,6 @@ java_library(
8483
"//remoting",
8584
"//store",
8685
"//tieredstore",
87-
"@maven//:com_alibaba_fastjson",
8886
"@maven//:com_alibaba_fastjson2_fastjson2",
8987
"@maven//:com_google_guava_guava",
9088
"@maven//:io_netty_netty_all",

broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.broker;
1818

19-
import com.alibaba.fastjson.JSON;
20-
import java.nio.charset.StandardCharsets;
21-
import java.util.function.BiConsumer;
19+
import com.alibaba.fastjson2.JSON;
2220
import org.apache.commons.lang3.StringUtils;
2321
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
2422
import org.apache.rocketmq.common.constant.LoggerName;
@@ -31,6 +29,9 @@
3129
import org.rocksdb.Statistics;
3230
import org.rocksdb.WriteBatch;
3331

32+
import java.nio.charset.StandardCharsets;
33+
import java.util.function.BiConsumer;
34+
3435
public class RocksDBConfigManager {
3536
protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
3637
public volatile boolean isStop = false;

broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,9 @@
1616
*/
1717
package org.apache.rocketmq.broker.offset;
1818

19-
import com.alibaba.fastjson.annotation.JSONField;
19+
import com.alibaba.fastjson2.annotation.JSONField;
2020
import com.google.common.annotations.VisibleForTesting;
2121
import com.google.common.base.MoreObjects;
22-
import java.util.ArrayList;
23-
import java.util.HashMap;
24-
import java.util.HashSet;
25-
import java.util.Iterator;
26-
import java.util.List;
27-
import java.util.Map;
28-
import java.util.Set;
29-
import java.util.concurrent.ConcurrentHashMap;
3022
import org.apache.rocketmq.broker.BrokerController;
3123
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
3224
import org.apache.rocketmq.common.ConfigManager;
@@ -37,6 +29,15 @@
3729
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
3830
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
3931

32+
import java.util.ArrayList;
33+
import java.util.HashMap;
34+
import java.util.HashSet;
35+
import java.util.Iterator;
36+
import java.util.List;
37+
import java.util.Map;
38+
import java.util.Set;
39+
import java.util.concurrent.ConcurrentHashMap;
40+
4041
public class ConsumerOrderInfoManager extends ConfigManager {
4142

4243
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
*/
1717
package org.apache.rocketmq.broker.pop;
1818

19-
import com.alibaba.fastjson.JSON;
20-
import com.alibaba.fastjson.JSONObject;
21-
import com.alibaba.fastjson.annotation.JSONField;
19+
import com.alibaba.fastjson2.JSON;
20+
import com.alibaba.fastjson2.annotation.JSONField;
21+
2222
import java.nio.ByteBuffer;
2323
import java.nio.charset.StandardCharsets;
2424

@@ -119,7 +119,7 @@ public byte[] getValueBytes() {
119119
}
120120

121121
public static PopConsumerRecord decode(byte[] body) {
122-
return JSONObject.parseObject(body, PopConsumerRecord.class);
122+
return JSON.parseObject(body, PopConsumerRecord.class);
123123
}
124124

125125
public long getPopTime() {

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,9 @@
1616
*/
1717
package org.apache.rocketmq.broker.pop;
1818

19-
import com.alibaba.fastjson.JSON;
19+
import com.alibaba.fastjson2.JSON;
2020
import com.google.common.annotations.VisibleForTesting;
2121
import com.google.common.base.Stopwatch;
22-
import java.nio.ByteBuffer;
23-
import java.nio.file.Paths;
24-
import java.util.ArrayList;
25-
import java.util.Collections;
26-
import java.util.HashMap;
27-
import java.util.List;
28-
import java.util.Objects;
29-
import java.util.Queue;
30-
import java.util.concurrent.CompletableFuture;
31-
import java.util.concurrent.CompletionException;
32-
import java.util.concurrent.ConcurrentHashMap;
33-
import java.util.concurrent.ConcurrentMap;
34-
import java.util.concurrent.LinkedBlockingQueue;
35-
import java.util.concurrent.TimeUnit;
36-
import java.util.concurrent.atomic.AtomicBoolean;
37-
import java.util.concurrent.atomic.AtomicLong;
3822
import org.apache.commons.lang3.StringUtils;
3923
import org.apache.commons.lang3.tuple.Triple;
4024
import org.apache.rocketmq.broker.BrokerController;
@@ -65,6 +49,23 @@
6549
import org.slf4j.Logger;
6650
import org.slf4j.LoggerFactory;
6751

52+
import java.nio.ByteBuffer;
53+
import java.nio.file.Paths;
54+
import java.util.ArrayList;
55+
import java.util.Collections;
56+
import java.util.HashMap;
57+
import java.util.List;
58+
import java.util.Objects;
59+
import java.util.Queue;
60+
import java.util.concurrent.CompletableFuture;
61+
import java.util.concurrent.CompletionException;
62+
import java.util.concurrent.ConcurrentHashMap;
63+
import java.util.concurrent.ConcurrentMap;
64+
import java.util.concurrent.LinkedBlockingQueue;
65+
import java.util.concurrent.TimeUnit;
66+
import java.util.concurrent.atomic.AtomicBoolean;
67+
import java.util.concurrent.atomic.AtomicLong;
68+
6869
public class PopConsumerService extends ServiceThread {
6970

7071
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);

broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,9 @@
1616
*/
1717
package org.apache.rocketmq.broker.processor;
1818

19-
import com.alibaba.fastjson.JSON;
19+
import com.alibaba.fastjson2.JSON;
2020
import io.netty.channel.Channel;
2121
import io.netty.channel.ChannelHandlerContext;
22-
import java.util.BitSet;
23-
import java.nio.charset.StandardCharsets;
2422
import org.apache.rocketmq.broker.BrokerController;
2523
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
2624
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
@@ -52,6 +50,9 @@
5250
import org.apache.rocketmq.store.pop.AckMsg;
5351
import org.apache.rocketmq.store.pop.BatchAckMsg;
5452

53+
import java.nio.charset.StandardCharsets;
54+
import java.util.BitSet;
55+
5556
public class AckMessageProcessor implements NettyRequestProcessor {
5657

5758
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,33 +16,12 @@
1616
*/
1717
package org.apache.rocketmq.broker.processor;
1818

19-
import com.alibaba.fastjson.JSON;
20-
import com.alibaba.fastjson.JSONObject;
19+
import com.alibaba.fastjson2.JSON;
20+
import com.alibaba.fastjson2.JSONObject;
2121
import com.google.common.collect.Sets;
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.ChannelHandlerContext;
2424
import io.opentelemetry.api.common.Attributes;
25-
import java.io.UnsupportedEncodingException;
26-
import java.net.UnknownHostException;
27-
import java.nio.charset.StandardCharsets;
28-
import java.util.ArrayList;
29-
import java.util.Arrays;
30-
import java.util.HashMap;
31-
import java.util.HashSet;
32-
import java.util.Iterator;
33-
import java.util.List;
34-
import java.util.Map;
35-
import java.util.Properties;
36-
import java.util.Set;
37-
import java.util.concurrent.CompletableFuture;
38-
import java.util.concurrent.CompletionException;
39-
import java.util.concurrent.ConcurrentHashMap;
40-
import java.util.concurrent.ConcurrentMap;
41-
import java.util.concurrent.CountDownLatch;
42-
import java.util.concurrent.ExecutorService;
43-
import java.util.concurrent.SynchronousQueue;
44-
import java.util.concurrent.ThreadPoolExecutor;
45-
import java.util.concurrent.TimeUnit;
4625
import org.apache.commons.collections.CollectionUtils;
4726
import org.apache.commons.lang3.StringUtils;
4827
import org.apache.rocketmq.acl.AccessValidator;
@@ -236,6 +215,28 @@
236215
import org.apache.rocketmq.store.timer.TimerMessageStore;
237216
import org.apache.rocketmq.store.util.LibC;
238217

218+
import java.io.UnsupportedEncodingException;
219+
import java.net.UnknownHostException;
220+
import java.nio.charset.StandardCharsets;
221+
import java.util.ArrayList;
222+
import java.util.Arrays;
223+
import java.util.HashMap;
224+
import java.util.HashSet;
225+
import java.util.Iterator;
226+
import java.util.List;
227+
import java.util.Map;
228+
import java.util.Properties;
229+
import java.util.Set;
230+
import java.util.concurrent.CompletableFuture;
231+
import java.util.concurrent.CompletionException;
232+
import java.util.concurrent.ConcurrentHashMap;
233+
import java.util.concurrent.ConcurrentMap;
234+
import java.util.concurrent.CountDownLatch;
235+
import java.util.concurrent.ExecutorService;
236+
import java.util.concurrent.SynchronousQueue;
237+
import java.util.concurrent.ThreadPoolExecutor;
238+
import java.util.concurrent.TimeUnit;
239+
239240
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS;
240241
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
241242
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
@@ -2891,7 +2892,7 @@ private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx,
28912892
} else {
28922893
ConsumerFilterData filterData = this.brokerController.getConsumerFilterManager()
28932894
.get(requestHeader.getTopic(), requestHeader.getConsumerGroup());
2894-
body.setFilterData(JSON.toJSONString(filterData, true));
2895+
body.setFilterData(JSON.toJSONString(filterData));
28952896

28962897
messageFilter = new ExpressionMessageFilter(subscriptionData, filterData,
28972898
this.brokerController.getConsumerFilterManager());

broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,9 @@
1616
*/
1717
package org.apache.rocketmq.broker.processor;
1818

19-
import com.alibaba.fastjson.JSON;
19+
import com.alibaba.fastjson2.JSON;
2020
import io.netty.channel.Channel;
2121
import io.netty.channel.ChannelHandlerContext;
22-
import java.util.concurrent.CompletableFuture;
23-
import java.util.concurrent.TimeUnit;
24-
import java.nio.charset.StandardCharsets;
2522
import org.apache.rocketmq.broker.BrokerController;
2623
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
2724
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
@@ -50,6 +47,10 @@
5047
import org.apache.rocketmq.store.pop.AckMsg;
5148
import org.apache.rocketmq.store.pop.PopCheckPoint;
5249

50+
import java.nio.charset.StandardCharsets;
51+
import java.util.concurrent.CompletableFuture;
52+
import java.util.concurrent.TimeUnit;
53+
5354
public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
5455
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
5556
private final BrokerController brokerController;

broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.broker.processor;
1818

19-
import com.alibaba.fastjson.JSON;
20-
import java.nio.charset.StandardCharsets;
21-
import java.util.ArrayList;
22-
import java.util.Iterator;
23-
import java.util.List;
24-
import java.util.Map;
25-
import java.util.concurrent.ConcurrentHashMap;
26-
import java.util.concurrent.LinkedBlockingDeque;
27-
import java.util.concurrent.atomic.AtomicInteger;
19+
import com.alibaba.fastjson2.JSON;
2820
import org.apache.rocketmq.broker.BrokerController;
2921
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
3022
import org.apache.rocketmq.common.KeyBuilder;
@@ -44,6 +36,15 @@
4436
import org.apache.rocketmq.store.pop.BatchAckMsg;
4537
import org.apache.rocketmq.store.pop.PopCheckPoint;
4638

39+
import java.nio.charset.StandardCharsets;
40+
import java.util.ArrayList;
41+
import java.util.Iterator;
42+
import java.util.List;
43+
import java.util.Map;
44+
import java.util.concurrent.ConcurrentHashMap;
45+
import java.util.concurrent.LinkedBlockingDeque;
46+
import java.util.concurrent.atomic.AtomicInteger;
47+
4748
public class PopBufferMergeService extends ServiceThread {
4849
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
4950
ConcurrentHashMap<String/*mergeKey*/, PopCheckPointWrapper>

broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,13 @@
1616
*/
1717
package org.apache.rocketmq.broker.processor;
1818

19-
import com.alibaba.fastjson.JSON;
19+
import com.alibaba.fastjson2.JSON;
2020
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
2121
import io.netty.channel.Channel;
2222
import io.netty.channel.ChannelFutureListener;
2323
import io.netty.channel.ChannelHandlerContext;
2424
import io.netty.channel.FileRegion;
2525
import io.opentelemetry.api.common.Attributes;
26-
import java.nio.ByteBuffer;
27-
import java.nio.charset.StandardCharsets;
28-
import java.util.Iterator;
29-
import java.util.List;
30-
import java.util.Map;
31-
import java.util.Map.Entry;
32-
import java.util.Objects;
33-
import java.util.Random;
34-
import java.util.concurrent.CompletableFuture;
35-
import java.util.concurrent.ConcurrentHashMap;
36-
import java.util.concurrent.ConcurrentSkipListSet;
37-
import java.util.concurrent.TimeUnit;
38-
import java.util.concurrent.atomic.AtomicBoolean;
39-
import java.util.concurrent.atomic.AtomicLong;
4026
import org.apache.rocketmq.broker.BrokerController;
4127
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
4228
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
@@ -91,6 +77,21 @@
9177
import org.apache.rocketmq.store.pop.BatchAckMsg;
9278
import org.apache.rocketmq.store.pop.PopCheckPoint;
9379

80+
import java.nio.ByteBuffer;
81+
import java.nio.charset.StandardCharsets;
82+
import java.util.Iterator;
83+
import java.util.List;
84+
import java.util.Map;
85+
import java.util.Map.Entry;
86+
import java.util.Objects;
87+
import java.util.Random;
88+
import java.util.concurrent.CompletableFuture;
89+
import java.util.concurrent.ConcurrentHashMap;
90+
import java.util.concurrent.ConcurrentSkipListSet;
91+
import java.util.concurrent.TimeUnit;
92+
import java.util.concurrent.atomic.AtomicBoolean;
93+
import java.util.concurrent.atomic.AtomicLong;
94+
9495
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
9596
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY;
9697
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;

0 commit comments

Comments
 (0)