Skip to content

Commit 2f384c9

Browse files
lizhiminsRongtongJin
authored andcommitted
Revert "[ISSUE# 9333] Use fastjson2 in broker module (#9334)" (#9387)
This reverts commit b1daa3c.
1 parent 677a0f5 commit 2f384c9

22 files changed

Lines changed: 219 additions & 715 deletions

broker/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ java_library(
3131
"//tieredstore",
3232
"@maven//:org_slf4j_slf4j_api",
3333
"@maven//:ch_qos_logback_logback_classic",
34+
"@maven//:com_alibaba_fastjson",
3435
"@maven//:com_alibaba_fastjson2_fastjson2",
3536
"@maven//:com_github_luben_zstd_jni",
3637
"@maven//:com_google_guava_guava",
@@ -81,6 +82,7 @@ java_library(
8182
"//remoting",
8283
"//store",
8384
"//tieredstore",
85+
"@maven//:com_alibaba_fastjson",
8486
"@maven//:com_alibaba_fastjson2_fastjson2",
8587
"@maven//:org_slf4j_slf4j_api",
8688
"@maven//:com_google_guava_guava",

broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
*/
1717
package org.apache.rocketmq.broker;
1818

19-
import com.alibaba.fastjson2.JSON;
19+
import com.alibaba.fastjson.JSON;
20+
import java.nio.charset.StandardCharsets;
21+
import java.util.function.BiConsumer;
2022
import org.apache.commons.lang3.StringUtils;
2123
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
2224
import org.apache.rocketmq.common.constant.LoggerName;
@@ -29,9 +31,6 @@
2931
import org.rocksdb.Statistics;
3032
import org.rocksdb.WriteBatch;
3133

32-
import java.nio.charset.StandardCharsets;
33-
import java.util.function.BiConsumer;
34-
3534
public class RocksDBConfigManager {
3635
protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
3736
public volatile boolean isStop = false;

broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,17 @@
1616
*/
1717
package org.apache.rocketmq.broker.offset;
1818

19-
import com.alibaba.fastjson2.annotation.JSONField;
19+
import com.alibaba.fastjson.annotation.JSONField;
2020
import com.google.common.annotations.VisibleForTesting;
2121
import com.google.common.base.MoreObjects;
22+
import java.util.ArrayList;
23+
import java.util.HashMap;
24+
import java.util.HashSet;
25+
import java.util.Iterator;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.Set;
29+
import java.util.concurrent.ConcurrentHashMap;
2230
import org.apache.rocketmq.broker.BrokerController;
2331
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
2432
import org.apache.rocketmq.common.ConfigManager;
@@ -29,15 +37,6 @@
2937
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
3038
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
3139

32-
import java.util.ArrayList;
33-
import java.util.HashMap;
34-
import java.util.HashSet;
35-
import java.util.Iterator;
36-
import java.util.List;
37-
import java.util.Map;
38-
import java.util.Set;
39-
import java.util.concurrent.ConcurrentHashMap;
40-
4140
public class ConsumerOrderInfoManager extends ConfigManager {
4241

4342
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
*/
1717
package org.apache.rocketmq.broker.pop;
1818

19-
import com.alibaba.fastjson2.JSON;
20-
import com.alibaba.fastjson2.annotation.JSONField;
21-
19+
import com.alibaba.fastjson.JSON;
20+
import com.alibaba.fastjson.JSONObject;
21+
import com.alibaba.fastjson.annotation.JSONField;
2222
import java.nio.ByteBuffer;
2323
import java.nio.charset.StandardCharsets;
2424

@@ -119,7 +119,7 @@ public byte[] getValueBytes() {
119119
}
120120

121121
public static PopConsumerRecord decode(byte[] body) {
122-
return JSON.parseObject(body, PopConsumerRecord.class);
122+
return JSONObject.parseObject(body, PopConsumerRecord.class);
123123
}
124124

125125
public long getPopTime() {

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,25 @@
1616
*/
1717
package org.apache.rocketmq.broker.pop;
1818

19-
import com.alibaba.fastjson2.JSON;
19+
import com.alibaba.fastjson.JSON;
2020
import com.google.common.annotations.VisibleForTesting;
2121
import com.google.common.base.Stopwatch;
22+
import java.nio.ByteBuffer;
23+
import java.nio.file.Paths;
24+
import java.util.ArrayList;
25+
import java.util.Collections;
26+
import java.util.HashMap;
27+
import java.util.List;
28+
import java.util.Objects;
29+
import java.util.Queue;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.CompletionException;
32+
import java.util.concurrent.ConcurrentHashMap;
33+
import java.util.concurrent.ConcurrentMap;
34+
import java.util.concurrent.LinkedBlockingQueue;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.atomic.AtomicBoolean;
37+
import java.util.concurrent.atomic.AtomicLong;
2238
import org.apache.commons.lang3.StringUtils;
2339
import org.apache.commons.lang3.tuple.Triple;
2440
import org.apache.rocketmq.broker.BrokerController;
@@ -49,23 +65,6 @@
4965
import org.slf4j.Logger;
5066
import org.slf4j.LoggerFactory;
5167

52-
import java.nio.ByteBuffer;
53-
import java.nio.file.Paths;
54-
import java.util.ArrayList;
55-
import java.util.Collections;
56-
import java.util.HashMap;
57-
import java.util.List;
58-
import java.util.Objects;
59-
import java.util.Queue;
60-
import java.util.concurrent.CompletableFuture;
61-
import java.util.concurrent.CompletionException;
62-
import java.util.concurrent.ConcurrentHashMap;
63-
import java.util.concurrent.ConcurrentMap;
64-
import java.util.concurrent.LinkedBlockingQueue;
65-
import java.util.concurrent.TimeUnit;
66-
import java.util.concurrent.atomic.AtomicBoolean;
67-
import java.util.concurrent.atomic.AtomicLong;
68-
6968
public class PopConsumerService extends ServiceThread {
7069

7170
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);

broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
*/
1717
package org.apache.rocketmq.broker.processor;
1818

19-
import com.alibaba.fastjson2.JSON;
19+
import com.alibaba.fastjson.JSON;
2020
import io.netty.channel.Channel;
2121
import io.netty.channel.ChannelHandlerContext;
22+
import java.util.BitSet;
23+
import java.nio.charset.StandardCharsets;
2224
import org.apache.rocketmq.broker.BrokerController;
2325
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
2426
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
@@ -50,9 +52,6 @@
5052
import org.apache.rocketmq.store.pop.AckMsg;
5153
import org.apache.rocketmq.store.pop.BatchAckMsg;
5254

53-
import java.nio.charset.StandardCharsets;
54-
import java.util.BitSet;
55-
5655
public class AckMessageProcessor implements NettyRequestProcessor {
5756

5857
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,33 @@
1616
*/
1717
package org.apache.rocketmq.broker.processor;
1818

19-
import com.alibaba.fastjson2.JSON;
20-
import com.alibaba.fastjson2.JSONObject;
19+
import com.alibaba.fastjson.JSON;
20+
import com.alibaba.fastjson.JSONObject;
2121
import com.google.common.collect.Sets;
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.ChannelHandlerContext;
2424
import io.opentelemetry.api.common.Attributes;
25+
import java.io.UnsupportedEncodingException;
26+
import java.net.UnknownHostException;
27+
import java.nio.charset.StandardCharsets;
28+
import java.util.ArrayList;
29+
import java.util.Arrays;
30+
import java.util.HashMap;
31+
import java.util.HashSet;
32+
import java.util.Iterator;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.Properties;
36+
import java.util.Set;
37+
import java.util.concurrent.CompletableFuture;
38+
import java.util.concurrent.CompletionException;
39+
import java.util.concurrent.ConcurrentHashMap;
40+
import java.util.concurrent.ConcurrentMap;
41+
import java.util.concurrent.CountDownLatch;
42+
import java.util.concurrent.ExecutorService;
43+
import java.util.concurrent.SynchronousQueue;
44+
import java.util.concurrent.ThreadPoolExecutor;
45+
import java.util.concurrent.TimeUnit;
2546
import org.apache.commons.collections.CollectionUtils;
2647
import org.apache.commons.lang3.StringUtils;
2748
import org.apache.rocketmq.auth.authentication.enums.UserType;
@@ -206,28 +227,6 @@
206227
import org.apache.rocketmq.store.timer.TimerMessageStore;
207228
import org.apache.rocketmq.store.util.LibC;
208229

209-
import java.io.UnsupportedEncodingException;
210-
import java.net.UnknownHostException;
211-
import java.nio.charset.StandardCharsets;
212-
import java.util.ArrayList;
213-
import java.util.Arrays;
214-
import java.util.HashMap;
215-
import java.util.HashSet;
216-
import java.util.Iterator;
217-
import java.util.List;
218-
import java.util.Map;
219-
import java.util.Properties;
220-
import java.util.Set;
221-
import java.util.concurrent.CompletableFuture;
222-
import java.util.concurrent.CompletionException;
223-
import java.util.concurrent.ConcurrentHashMap;
224-
import java.util.concurrent.ConcurrentMap;
225-
import java.util.concurrent.CountDownLatch;
226-
import java.util.concurrent.ExecutorService;
227-
import java.util.concurrent.SynchronousQueue;
228-
import java.util.concurrent.ThreadPoolExecutor;
229-
import java.util.concurrent.TimeUnit;
230-
231230
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS;
232231
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
233232
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
@@ -2733,7 +2732,7 @@ private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx,
27332732
} else {
27342733
ConsumerFilterData filterData = this.brokerController.getConsumerFilterManager()
27352734
.get(requestHeader.getTopic(), requestHeader.getConsumerGroup());
2736-
body.setFilterData(JSON.toJSONString(filterData));
2735+
body.setFilterData(JSON.toJSONString(filterData, true));
27372736

27382737
messageFilter = new ExpressionMessageFilter(subscriptionData, filterData,
27392738
this.brokerController.getConsumerFilterManager());

broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616
*/
1717
package org.apache.rocketmq.broker.processor;
1818

19-
import com.alibaba.fastjson2.JSON;
19+
import com.alibaba.fastjson.JSON;
2020
import io.netty.channel.Channel;
2121
import io.netty.channel.ChannelHandlerContext;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.TimeUnit;
24+
import java.nio.charset.StandardCharsets;
2225
import org.apache.rocketmq.broker.BrokerController;
2326
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
2427
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
@@ -47,10 +50,6 @@
4750
import org.apache.rocketmq.store.pop.AckMsg;
4851
import org.apache.rocketmq.store.pop.PopCheckPoint;
4952

50-
import java.nio.charset.StandardCharsets;
51-
import java.util.concurrent.CompletableFuture;
52-
import java.util.concurrent.TimeUnit;
53-
5453
public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
5554
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
5655
private final BrokerController brokerController;

broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,15 @@
1616
*/
1717
package org.apache.rocketmq.broker.processor;
1818

19-
import com.alibaba.fastjson2.JSON;
19+
import com.alibaba.fastjson.JSON;
20+
import java.nio.charset.StandardCharsets;
21+
import java.util.ArrayList;
22+
import java.util.Iterator;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.LinkedBlockingDeque;
27+
import java.util.concurrent.atomic.AtomicInteger;
2028
import org.apache.rocketmq.broker.BrokerController;
2129
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
2230
import org.apache.rocketmq.common.KeyBuilder;
@@ -36,15 +44,6 @@
3644
import org.apache.rocketmq.store.pop.BatchAckMsg;
3745
import org.apache.rocketmq.store.pop.PopCheckPoint;
3846

39-
import java.nio.charset.StandardCharsets;
40-
import java.util.ArrayList;
41-
import java.util.Iterator;
42-
import java.util.List;
43-
import java.util.Map;
44-
import java.util.concurrent.ConcurrentHashMap;
45-
import java.util.concurrent.LinkedBlockingDeque;
46-
import java.util.concurrent.atomic.AtomicInteger;
47-
4847
public class PopBufferMergeService extends ServiceThread {
4948
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
5049
ConcurrentHashMap<String/*mergeKey*/, PopCheckPointWrapper>

broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,27 @@
1616
*/
1717
package org.apache.rocketmq.broker.processor;
1818

19-
import com.alibaba.fastjson2.JSON;
19+
import com.alibaba.fastjson.JSON;
2020
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
2121
import io.netty.channel.Channel;
2222
import io.netty.channel.ChannelFutureListener;
2323
import io.netty.channel.ChannelHandlerContext;
2424
import io.netty.channel.FileRegion;
2525
import io.opentelemetry.api.common.Attributes;
26+
import java.nio.ByteBuffer;
27+
import java.nio.charset.StandardCharsets;
28+
import java.util.Iterator;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.util.Map.Entry;
32+
import java.util.Objects;
33+
import java.util.Random;
34+
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.ConcurrentHashMap;
36+
import java.util.concurrent.ConcurrentSkipListSet;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.concurrent.atomic.AtomicBoolean;
39+
import java.util.concurrent.atomic.AtomicLong;
2640
import org.apache.rocketmq.broker.BrokerController;
2741
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
2842
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
@@ -77,21 +91,6 @@
7791
import org.apache.rocketmq.store.pop.BatchAckMsg;
7892
import org.apache.rocketmq.store.pop.PopCheckPoint;
7993

80-
import java.nio.ByteBuffer;
81-
import java.nio.charset.StandardCharsets;
82-
import java.util.Iterator;
83-
import java.util.List;
84-
import java.util.Map;
85-
import java.util.Map.Entry;
86-
import java.util.Objects;
87-
import java.util.Random;
88-
import java.util.concurrent.CompletableFuture;
89-
import java.util.concurrent.ConcurrentHashMap;
90-
import java.util.concurrent.ConcurrentSkipListSet;
91-
import java.util.concurrent.TimeUnit;
92-
import java.util.concurrent.atomic.AtomicBoolean;
93-
import java.util.concurrent.atomic.AtomicLong;
94-
9594
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
9695
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY;
9796
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;

0 commit comments

Comments
 (0)