diff --git a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java index 3384d479c6e..11ed3c314cd 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java @@ -18,12 +18,6 @@ package org.apache.rocketmq.broker.subscription; import com.google.common.collect.ImmutableMap; -import java.nio.file.Paths; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.SubscriptionGroupAttributes; @@ -38,11 +32,17 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -100,6 +100,7 @@ public void updateSubscriptionGroupConfig() { subscriptionGroupConfig.setGroupName(group); Map attr = ImmutableMap.of("+test", "true"); subscriptionGroupConfig.setAttributes(attr); + SubscriptionGroupManager subscriptionGroupManager = new SubscriptionGroupManager(brokerControllerMock); subscriptionGroupManager.updateSubscriptionGroupConfig(subscriptionGroupConfig); SubscriptionGroupConfig result = subscriptionGroupManager.getSubscriptionGroupTable().get(group); assertThat(result).isNotNull(); @@ -156,14 +157,10 @@ public void testUpdateSubscriptionGroupConfigList_ValidConfigList() { groupNames.add(groupName); } + SubscriptionGroupManager subscriptionGroupManager = new SubscriptionGroupManager(brokerControllerMock); subscriptionGroupManager.updateSubscriptionGroupConfigList(configList); - // Verifying that persist() is called once - verify(subscriptionGroupManager, times(1)).persist(); - groupNames.forEach(groupName -> assertThat(subscriptionGroupManager.getSubscriptionGroupTable().get(groupName)).isNotNull()); - } - -} \ No newline at end of file +} diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index 592c247057b..f57b3f80fe4 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -17,17 +17,6 @@ package org.apache.rocketmq.client.consumer; -import java.io.ByteArrayOutputStream; -import java.lang.reflect.Field; -import java.net.InetSocketAddress; -import java.time.Duration; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.store.OffsetStore; @@ -65,6 +54,18 @@ import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; import static org.awaitility.Awaitility.await; @@ -75,6 +76,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -898,5 +900,6 @@ private void suppressUpdateTopicRouteInfoFromNameServer( ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true); factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory); doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); + doNothing().when(mQClientFactory).updateTopicRouteInfoFromNameServer(); } } diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImplTest.java index 71682fb52c0..520f4da5f2a 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImplTest.java @@ -206,7 +206,7 @@ public void assertUpdateOrCreateTopicWithError() { @Test public void assertUpdateOrCreateSubscriptionGroupWithSuccess() throws Exception { setResponseSuccess(null); - SubscriptionGroupConfig config = mock(SubscriptionGroupConfig.class); + SubscriptionGroupConfig config = new SubscriptionGroupConfig(); CompletableFuture actual = mqClientAdminImpl.updateOrCreateSubscriptionGroup(defaultBrokerAddr, config, defaultTimeout); assertNull(actual.get()); } @@ -214,7 +214,7 @@ public void assertUpdateOrCreateSubscriptionGroupWithSuccess() throws Exception @Test public void assertUpdateOrCreateSubscriptionGroupWithError() { setResponseError(); - SubscriptionGroupConfig config = mock(SubscriptionGroupConfig.class); + SubscriptionGroupConfig config = new SubscriptionGroupConfig(); CompletableFuture actual = mqClientAdminImpl.updateOrCreateSubscriptionGroup(defaultBrokerAddr, config, defaultTimeout); Throwable thrown = assertThrows(ExecutionException.class, actual::get); assertTrue(thrown.getCause() instanceof MQClientException); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java index dd7ffa757f8..a12633be1bd 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java @@ -16,11 +16,6 @@ */ package org.apache.rocketmq.client.impl.consumer; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.TreeMap; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQBrokerException; @@ -34,6 +29,12 @@ import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.TreeMap; + import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -164,6 +165,7 @@ private ProcessQueue createProcessQueue() { ProcessQueue result = new ProcessQueue(); result.setMsgAccCnt(1); result.incTryUnlockTimes(); + result.setLastPullTimestamp(10000L); return result; } diff --git a/common/BUILD.bazel b/common/BUILD.bazel index 10c5d19fbe8..8aeeb2f24fc 100644 --- a/common/BUILD.bazel +++ b/common/BUILD.bazel @@ -21,7 +21,6 @@ java_library( srcs = glob(["src/main/java/**/*.java"]), visibility = ["//visibility:public"], deps = [ - "@maven//:com_alibaba_fastjson", "@maven//:com_alibaba_fastjson2_fastjson2", "@maven//:com_github_luben_zstd_jni", "@maven//:com_google_guava_guava", @@ -55,7 +54,6 @@ java_library( ":common", "//:test_deps", "@maven//:com_google_guava_guava", - "@maven//:com_alibaba_fastjson", "@maven//:com_alibaba_fastjson2_fastjson2", "@maven//:commons_codec_commons_codec", "@maven//:io_netty_netty_all", diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java index 0bf64905a03..446e9c21185 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java @@ -16,9 +16,9 @@ */ package org.apache.rocketmq.common; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.TypeReference; -import com.alibaba.fastjson.annotation.JSONField; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.TypeReference; +import com.alibaba.fastjson2.annotation.JSONField; import java.util.HashMap; import java.util.Map; import java.util.Objects; diff --git a/common/src/main/java/org/apache/rocketmq/common/fastjson/GenericMapSuperclassDeserializer.java b/common/src/main/java/org/apache/rocketmq/common/fastjson/GenericMapSuperclassDeserializer.java index 80a1554d123..1da7420c04e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/fastjson/GenericMapSuperclassDeserializer.java +++ b/common/src/main/java/org/apache/rocketmq/common/fastjson/GenericMapSuperclassDeserializer.java @@ -17,11 +17,10 @@ package org.apache.rocketmq.common.fastjson; -import com.alibaba.fastjson.JSONException; -import com.alibaba.fastjson.parser.DefaultJSONParser; -import com.alibaba.fastjson.parser.JSONToken; -import com.alibaba.fastjson.parser.deserializer.MapDeserializer; -import com.alibaba.fastjson.parser.deserializer.ObjectDeserializer; +import com.alibaba.fastjson2.JSONException; +import com.alibaba.fastjson2.JSONReader; +import com.alibaba.fastjson2.reader.ObjectReader; + import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.Map; @@ -29,32 +28,40 @@ /** * workaround https://github.com/alibaba/fastjson/issues/3730 */ -public class GenericMapSuperclassDeserializer implements ObjectDeserializer { +public class GenericMapSuperclassDeserializer implements ObjectReader { public static final GenericMapSuperclassDeserializer INSTANCE = new GenericMapSuperclassDeserializer(); - @SuppressWarnings({"unchecked", "rawtypes"}) @Override - public T deserialze(DefaultJSONParser parser, Type type, Object fieldName) { + public Object readObject(JSONReader reader, Type type, Object fieldName, long features) { Class clz = (Class) type; Type genericSuperclass = clz.getGenericSuperclass(); Map map; try { - map = (Map) clz.newInstance(); + map = (Map) clz.getDeclaredConstructor().newInstance(); } catch (Exception e) { - throw new JSONException("unsupport type " + type, e); + throw new com.alibaba.fastjson2.JSONException("unsupport type " + type, e); } ParameterizedType parameterizedType = (ParameterizedType) genericSuperclass; Type keyType = parameterizedType.getActualTypeArguments()[0]; Type valueType = parameterizedType.getActualTypeArguments()[1]; - if (String.class == keyType) { - return (T) MapDeserializer.parseMap(parser, (Map) map, valueType, fieldName); - } else { - return (T) MapDeserializer.parseMap(parser, map, keyType, valueType, fieldName); + + if (!reader.nextIfObjectStart()) { + throw new JSONException(reader.info("expect '{', but " + reader.current())); } - } - @Override - public int getFastMatchToken() { - return JSONToken.LBRACE; + while (!reader.nextIfObjectEnd()) { + Object key; + if (keyType == String.class) { + key = reader.readFieldName(); + } else { + key = reader.getContext().getProvider().getObjectReader(keyType).readObject(reader, keyType, fieldName, features); + reader.nextIfMatch(':'); + } + + Object value = reader.getContext().getProvider().getObjectReader(valueType).readObject(reader, valueType, fieldName, features); + map.put(key, value); + reader.nextIfComma(); + } + return map; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/FastJsonSerializer.java b/common/src/main/java/org/apache/rocketmq/common/utils/FastJsonSerializer.java index 600054b40b8..943e8235662 100644 --- a/common/src/main/java/org/apache/rocketmq/common/utils/FastJsonSerializer.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/FastJsonSerializer.java @@ -16,23 +16,13 @@ */ package org.apache.rocketmq.common.utils; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.support.config.FastJsonConfig; +import com.alibaba.fastjson2.JSON; import org.apache.commons.lang3.SerializationException; /** * The object serializer based on fastJson */ public class FastJsonSerializer implements Serializer { - private FastJsonConfig fastJsonConfig = new FastJsonConfig(); - - public FastJsonConfig getFastJsonConfig() { - return this.fastJsonConfig; - } - - public void setFastJsonConfig(FastJsonConfig fastJsonConfig) { - this.fastJsonConfig = fastJsonConfig; - } @Override public byte[] serialize(T t) throws SerializationException { @@ -40,7 +30,7 @@ public byte[] serialize(T t) throws SerializationException { return new byte[0]; } else { try { - return JSON.toJSONBytes(this.fastJsonConfig.getCharset(), t, this.fastJsonConfig.getSerializeConfig(), this.fastJsonConfig.getSerializeFilters(), this.fastJsonConfig.getDateFormat(), JSON.DEFAULT_GENERATE_FEATURE, this.fastJsonConfig.getSerializerFeatures()); + return JSON.toJSONBytes(t); } catch (Exception var3) { throw new SerializationException("Could not serialize: " + var3.getMessage(), var3); } @@ -51,7 +41,7 @@ public byte[] serialize(T t) throws SerializationException { public T deserialize(byte[] bytes, Class type) throws SerializationException { if (bytes != null && bytes.length != 0) { try { - return JSON.parseObject(bytes, this.fastJsonConfig.getCharset(), type, this.fastJsonConfig.getParserConfig(), this.fastJsonConfig.getParseProcess(), JSON.DEFAULT_PARSER_FEATURE, this.fastJsonConfig.getFeatures()); + return JSON.parseObject(bytes, type); } catch (Exception var3) { throw new SerializationException("Could not deserialize: " + var3.getMessage(), var3); } diff --git a/common/src/test/java/org/apache/rocketmq/common/fastjson/GenericMapSuperclassDeserializerTest.java b/common/src/test/java/org/apache/rocketmq/common/fastjson/GenericMapSuperclassDeserializerTest.java new file mode 100644 index 00000000000..36c633f699c --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/fastjson/GenericMapSuperclassDeserializerTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.fastjson; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONException; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class GenericMapSuperclassDeserializerTest { + + public static class CustomMap extends HashMap { + private static final long serialVersionUID = 1L; + } + + public static class IntKeyMap extends HashMap { + private static final long serialVersionUID = 1L; + } + + @Test + public void testBasicDeserialization() { + JSON.registerIfAbsent(CustomMap.class, GenericMapSuperclassDeserializer.INSTANCE); + String json = "{\"key1\":\"value1\",\"key2\":42,\"key3\":true}"; + CustomMap map = JSON.parseObject(json, CustomMap.class); + + assertNotNull(map); + assertEquals(3, map.size()); + assertEquals("value1", map.get("key1")); + assertEquals(42, map.get("key2")); + assertEquals(true, map.get("key3")); + } + + @Test + public void testNestedObjects() { + JSON.registerIfAbsent(CustomMap.class, GenericMapSuperclassDeserializer.INSTANCE); + String json = "{\"simple\":\"value\",\"nested\":{\"inner\":123},\"array\":[1,2,3]}"; + CustomMap map = JSON.parseObject(json, CustomMap.class); + + assertNotNull(map); + assertEquals(3, map.size()); + assertEquals("value", map.get("simple")); + + assertTrue(map.get("nested") instanceof Map); + Map nestedMap = (Map) map.get("nested"); + assertEquals(123, nestedMap.get("inner")); + + assertTrue(map.get("array") instanceof java.util.List); + java.util.List array = (java.util.List) map.get("array"); + assertEquals(3, array.size()); + assertEquals(1, array.get(0)); + assertEquals(2, array.get(1)); + assertEquals(3, array.get(2)); + } + + @Test + public void testEmptyObject() { + JSON.registerIfAbsent(CustomMap.class, GenericMapSuperclassDeserializer.INSTANCE); + String json = "{}"; + CustomMap map = JSON.parseObject(json, CustomMap.class); + + assertNotNull(map); + assertEquals(0, map.size()); + } + + @Test + public void testNonStringKey() { + JSON.registerIfAbsent(IntKeyMap.class, GenericMapSuperclassDeserializer.INSTANCE); + String json = "{1:\"one\",2:\"two\",3:\"three\"}"; + IntKeyMap map = JSON.parseObject(json, IntKeyMap.class); + + assertNotNull(map); + assertEquals(3, map.size()); + assertEquals("one", map.get(1)); + assertEquals("two", map.get(2)); + assertEquals("three", map.get(3)); + } + + @Test(expected = JSONException.class) + public void testMalformedJson() { + JSON.registerIfAbsent(CustomMap.class, GenericMapSuperclassDeserializer.INSTANCE); + String json = "{\"key\":\"missing closing brace\""; + JSON.parseObject(json, CustomMap.class); + } +} diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/event/EventSerializerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/event/EventSerializerTest.java new file mode 100644 index 00000000000..76d6a344d4e --- /dev/null +++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/event/EventSerializerTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.controller.impl.event; + +import org.apache.commons.lang3.SerializationException; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.rocketmq.common.utils.FastJsonSerializer; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class EventSerializerTest { + + @Mock + private FastJsonSerializer serializer; + + private final EventSerializer eventSerializer = new EventSerializer(); + + @Before + public void init() throws IllegalAccessException { + FieldUtils.writeDeclaredField(eventSerializer, "serializer", serializer, true); + } + + @Test + public void testSerializeValidEventMessageShouldReturnSerializedData() { + EventMessage eventMessage = mock(EventMessage.class); + EventType eventType = EventType.APPLY_BROKER_ID_EVENT; + when(eventMessage.getEventType()).thenReturn(eventType); + when(serializer.serialize(eventMessage)).thenReturn("{\"event\":\"APPLY_BROKER_ID_EVENT\"}".getBytes()); + byte[] result = eventSerializer.serialize(eventMessage); + assertNotNull(result); + } + + @Test + public void testSerializeEventMessageWithNoEventType() { + EventMessage eventMessage = mock(EventMessage.class); + when(eventMessage.getEventType()).thenReturn(null); + assertThrows(NullPointerException.class, () -> eventSerializer.serialize(eventMessage)); + } + + @Test + public void testSerializeSerializerReturnsNullShouldReturnNull() { + EventMessage eventMessage = mock(EventMessage.class); + EventType eventType = EventType.READ_EVENT; + when(eventMessage.getEventType()).thenReturn(eventType); + when(serializer.serialize(eventMessage)).thenReturn(null); + byte[] result = eventSerializer.serialize(eventMessage); + assertNull(result); + } + + @Test + public void testSerializeSerializerThrowsException() { + EventMessage eventMessage = mock(EventMessage.class); + EventType eventType = EventType.ELECT_MASTER_EVENT; + when(eventMessage.getEventType()).thenReturn(eventType); + when(serializer.serialize(eventMessage)).thenThrow(new RuntimeException("Serialization error")); + assertThrows(RuntimeException.class, () -> eventSerializer.serialize(eventMessage)); + } + + @Test + public void testDeserializeBytesLessThanTwoReturnsNull() { + byte[] bytes = new byte[1]; + assertNull(eventSerializer.deserialize(bytes)); + } + + @Test + public void testDeserializeInvalidEventIdReturnsNull() { + assertNull(eventSerializer.deserialize(new byte[]{0, 0xF})); + } + + @Test + public void testDeserializeValidEventTypeReturnsEventMessage() throws SerializationException { + byte[] data = new byte[]{0, 0xF}; + byte[] bytes = new byte[]{0, (byte) EventType.ALTER_SYNC_STATE_SET_EVENT.getId(), data[0], data[1]}; + AlterSyncStateSetEvent alterSyncStateSetEvent = mock(AlterSyncStateSetEvent.class); + when(serializer.deserialize(any(byte[].class), eq(AlterSyncStateSetEvent.class))).thenReturn(alterSyncStateSetEvent); + EventMessage result = eventSerializer.deserialize(bytes); + assertNotNull(result); + assertTrue(result instanceof AlterSyncStateSetEvent); + } + + @Test + public void testDeserializeSerializerThrowsException() throws SerializationException { + byte[] data = new byte[]{0, 0xF}; + byte[] bytes = new byte[]{0, (byte) EventType.ALTER_SYNC_STATE_SET_EVENT.getId(), data[0], data[1]}; + when(serializer.deserialize(any(byte[].class), eq(AlterSyncStateSetEvent.class))).thenThrow(new SerializationException("Deserialization failed")); + assertThrows(SerializationException.class, () -> eventSerializer.deserialize(bytes)); + } + + @Test + public void testDeserializeValidEventTypeUnknownEventReturnsNull() throws SerializationException { + byte[] data = new byte[]{0, 0xF}; + byte[] bytes = new byte[]{0, (short) 99, data[0], data[1]}; + assertNull(eventSerializer.deserialize(bytes)); + } +} diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/event/ListEventSerializerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/event/ListEventSerializerTest.java new file mode 100644 index 00000000000..7bcf85aeee5 --- /dev/null +++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/event/ListEventSerializerTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.controller.impl.event; + +import org.apache.commons.lang3.SerializationException; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ListEventSerializerTest { + + @Mock + private Logger logger; + + @Test + public void testSerializeEmptyList() { + List events = Collections.emptyList(); + byte[] result = ListEventSerializer.serialize(events, null); + assertNotNull(result); + assertEquals(0, result.length); + } + + @Test + public void testSerializeValidEventMessage() { + EventMessage eventMessage = new ElectMasterEvent("brokerA", 0L); + List events = Collections.singletonList(eventMessage); + byte[] result = ListEventSerializer.serialize(events, null); + assertNotNull(result); + assertTrue(result.length > 0); + } + + @Test + public void testSerializeEventMessageWithNullEventType() { + EventMessage eventMessage = mock(EventMessage.class); + when(eventMessage.getEventType()).thenReturn(null); + List events = Collections.singletonList(eventMessage); + assertThrows(NullPointerException.class, () -> ListEventSerializer.serialize(events, logger)); + } + + @Test + public void testDeserializeBytesIsNull() throws SerializationException { + List result = ListEventSerializer.deserialize(null, logger); + assertNotNull(result); + assertTrue(result.isEmpty()); + } + + @Test + public void testDeserializeBytesLengthLessThanSix() throws SerializationException { + byte[] bytes = new byte[5]; + List result = ListEventSerializer.deserialize(bytes, logger); + assertNotNull(result); + assertTrue(result.isEmpty()); + } + + @Test + public void testDeserializeValidBytesWithKnownEventType() throws SerializationException { + byte[] bytes = new byte[]{0x01, 0x00, 0x06, 0x00, 0x00, 0x00}; + assertNotNull(ListEventSerializer.deserialize(bytes, logger)); + } + + @Test + public void testDeserializeException() throws SerializationException { + byte[] bytes = new byte[]{0x01, 0x00, 0x06, 0x00, 0x00, 0x00, 0x02, 0x00, 0x06, 0x00, 0x00, 0x00}; + assertThrows(ArrayIndexOutOfBoundsException.class, () -> ListEventSerializer.deserialize(bytes, logger)); + } +} diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java index 1ef6beadd3d..10ff2295751 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java @@ -17,7 +17,7 @@ package org.apache.rocketmq.namesrv.processor; -import com.alibaba.fastjson.serializer.SerializerFeature; +import com.alibaba.fastjson2.JSONWriter; import io.netty.channel.ChannelHandlerContext; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -81,9 +81,8 @@ public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, byte[] content; Boolean standardJsonOnly = Optional.ofNullable(requestHeader.getAcceptStandardJsonOnly()).orElse(false); if (request.getVersion() >= MQVersion.Version.V4_9_4.ordinal() || standardJsonOnly) { - content = topicRouteData.encode(SerializerFeature.BrowserCompatible, - SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField, - SerializerFeature.MapSortField); + content = topicRouteData.encode(JSONWriter.Feature.BrowserCompatible, + JSONWriter.Feature.MapSortField); } else { content = topicRouteData.encode(); } diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessorTest.java new file mode 100644 index 00000000000..86af432f283 --- /dev/null +++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessorTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.namesrv.processor; + +import io.netty.channel.ChannelHandlerContext; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.namesrv.NamesrvConfig; +import org.apache.rocketmq.namesrv.NamesrvController; +import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.QueueData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ClientRequestProcessorTest { + + @Mock + private NamesrvController namesrvController; + + @Mock + private RouteInfoManager routeInfoManager; + + @Mock + private NamesrvConfig namesrvConfig; + + @Mock + private ChannelHandlerContext ctx; + + private ClientRequestProcessor clientRequestProcessor; + + @Before + public void setup() throws NoSuchFieldException, IllegalAccessException { + when(namesrvController.getRouteInfoManager()).thenReturn(routeInfoManager); + when(namesrvController.getNamesrvConfig()).thenReturn(namesrvConfig); + + when(namesrvConfig.getWaitSecondsForService()).thenReturn(0); + when(namesrvConfig.isNeedWaitForService()).thenReturn(true); + + clientRequestProcessor = new ClientRequestProcessor(namesrvController); + + Field startupTimeMillisField = ClientRequestProcessor.class.getDeclaredField("startupTimeMillis"); + startupTimeMillisField.setAccessible(true); + startupTimeMillisField.set(clientRequestProcessor, System.currentTimeMillis() - 60000); + } + + @Test + public void testGetRouteInfoByTopicWithHighVersionClient() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, null); + request.setVersion(MQVersion.Version.V4_9_4.ordinal()); + + GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); + requestHeader.setTopic("TestTopic"); + + RemotingCommand spyRequest = spy(request); + doReturn(requestHeader).when(spyRequest).decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); + + TopicRouteData topicRouteData = createMockTopicRouteData(); + + when(routeInfoManager.pickupTopicRouteData("TestTopic")).thenReturn(topicRouteData); + + RemotingCommand response = clientRequestProcessor.getRouteInfoByTopic(ctx, spyRequest); + + assertEquals(ResponseCode.SUCCESS, response.getCode()); + assertNotNull(response.getBody()); + } + + @Test + public void testGetRouteInfoByTopicWithLowVersionClientAndNoStandardJsonFlag() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, null); + request.setVersion(MQVersion.Version.V4_9_3.ordinal()); + + GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); + requestHeader.setTopic("TestTopic"); + requestHeader.setAcceptStandardJsonOnly(false); + + RemotingCommand spyRequest = spy(request); + doReturn(requestHeader).when(spyRequest).decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); + + TopicRouteData topicRouteData = createMockTopicRouteData(); + + when(routeInfoManager.pickupTopicRouteData("TestTopic")).thenReturn(topicRouteData); + + RemotingCommand response = clientRequestProcessor.getRouteInfoByTopic(ctx, spyRequest); + + assertEquals(ResponseCode.SUCCESS, response.getCode()); + assertNotNull(response.getBody()); + } + + @Test + public void testGetRouteInfoByTopicWithNameServerNotReady() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, null); + + GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); + requestHeader.setTopic("TestTopic"); + + RemotingCommand spyRequest = spy(request); + doReturn(requestHeader).when(spyRequest).decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); + + when(namesrvConfig.getWaitSecondsForService()).thenReturn(60); + when(namesrvConfig.isNeedWaitForService()).thenReturn(true); + + try { + Field startupTimeMillisField = ClientRequestProcessor.class.getDeclaredField("startupTimeMillis"); + startupTimeMillisField.setAccessible(true); + startupTimeMillisField.set(clientRequestProcessor, System.currentTimeMillis()); + } catch (Exception e) { + e.printStackTrace(); + } + + RemotingCommand response = clientRequestProcessor.getRouteInfoByTopic(ctx, spyRequest); + + assertEquals(ResponseCode.SYSTEM_ERROR, response.getCode()); + assertEquals("name server not ready", response.getRemark()); + } + + @Test + public void testGetRouteInfoByTopicWithTopicNotExist() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, null); + + GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); + requestHeader.setTopic("NonExistentTopic"); + + RemotingCommand spyRequest = spy(request); + doReturn(requestHeader).when(spyRequest).decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); + + when(routeInfoManager.pickupTopicRouteData("NonExistentTopic")).thenReturn(null); + + RemotingCommand response = clientRequestProcessor.getRouteInfoByTopic(ctx, spyRequest); + + assertEquals(ResponseCode.TOPIC_NOT_EXIST, response.getCode()); + assertNotNull(response.getRemark()); + } + + private TopicRouteData createMockTopicRouteData() { + TopicRouteData result = new TopicRouteData(); + + List queueDataList = new ArrayList<>(); + QueueData queueData = new QueueData(); + queueData.setBrokerName("broker-a"); + queueData.setReadQueueNums(4); + queueData.setWriteQueueNums(4); + queueData.setPerm(6); + queueData.setTopicSysFlag(0); + queueDataList.add(queueData); + result.setQueueDatas(queueDataList); + + List brokerDataList = new ArrayList<>(); + BrokerData brokerData = new BrokerData(); + brokerData.setBrokerName("broker-a"); + brokerData.setCluster("default-cluster"); + HashMap brokerAddrs = new HashMap<>(); + brokerAddrs.put(0L, "127.0.0.1:10911"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDataList.add(brokerData); + result.setBrokerDatas(brokerDataList); + + return result; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java index 56ec34fae6a..be4e4ba7cd9 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java @@ -17,7 +17,7 @@ package org.apache.rocketmq.proxy.remoting.activity; -import com.alibaba.fastjson.serializer.SerializerFeature; +import com.alibaba.fastjson2.JSONWriter; import com.google.common.net.HostAndPort; import io.netty.channel.ChannelHandlerContext; import java.util.ArrayList; @@ -57,9 +57,7 @@ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCom byte[] content; Boolean standardJsonOnly = requestHeader.getAcceptStandardJsonOnly(); if (request.getVersion() >= MQVersion.Version.V4_9_4.ordinal() || null != standardJsonOnly && standardJsonOnly) { - content = topicRouteData.encode(SerializerFeature.BrowserCompatible, - SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField, - SerializerFeature.MapSortField); + content = topicRouteData.encode(JSONWriter.Feature.BrowserCompatible, JSONWriter.Feature.MapSortField); } else { content = topicRouteData.encode(); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivityTest.java new file mode 100644 index 00000000000..be30299ffea --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivityTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.activity; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONWriter; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.proxy.service.channel.SimpleChannel; +import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext; +import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.QueueData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class GetTopicRouteActivityTest { + + @Mock + private RequestPipeline requestPipeline; + + @Mock + private MessagingProcessor messagingProcessor; + + private GetTopicRouteActivity getTopicRouteActivity; + + private ChannelHandlerContext ctx; + + private ProxyContext context; + + @Before + public void setup() throws Exception { + getTopicRouteActivity = new GetTopicRouteActivity(requestPipeline, messagingProcessor); + + ConfigurationManager.initEnv(); + ConfigurationManager.intConfig(); + + Channel channel = new SimpleChannel(null, "0.0.0.0:0", "1.1.1.1:1"); + ctx = new SimpleChannelHandlerContext(channel); + + context = ProxyContext.create(); + } + + @Test + public void testProcessRequest0_HighVersion_SerializeWithFeatures() throws Exception { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, null); + request.setVersion(MQVersion.Version.V4_9_4.ordinal()); + + GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader(); + header.setTopic("TestTopic"); + header.setAcceptStandardJsonOnly(false); + request.writeCustomHeader(header); + + TopicRouteData topicRouteData = prepareTopicRouteData(); + + TopicRouteData spyTopicRouteData = Mockito.spy(topicRouteData); + + ProxyTopicRouteData proxyTopicRouteData = mock(ProxyTopicRouteData.class); + when(proxyTopicRouteData.buildTopicRouteData()).thenReturn(spyTopicRouteData); + when(messagingProcessor.getTopicRouteDataForProxy(any(ProxyContext.class), anyList(), any())) + .thenReturn(proxyTopicRouteData); + + RemotingCommand response = getTopicRouteActivity.processRequest0(ctx, request, context); + + assertNotNull(response); + assertEquals(ResponseCode.SUCCESS, response.getCode()); + + verify(spyTopicRouteData).encode( + JSONWriter.Feature.BrowserCompatible, + JSONWriter.Feature.MapSortField + ); + + TopicRouteData deserializedData = JSON.parseObject(response.getBody(), TopicRouteData.class); + assertEquals(topicRouteData.getOrderTopicConf(), deserializedData.getOrderTopicConf()); + assertEquals(topicRouteData.getQueueDatas().size(), deserializedData.getQueueDatas().size()); + } + + @Test + public void testProcessRequest0_LowVersion_StandardJsonOnly_SerializeWithFeatures() throws Exception { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, null); + request.setVersion(MQVersion.Version.V4_9_3.ordinal()); + + GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader(); + header.setTopic("TestTopic"); + header.setAcceptStandardJsonOnly(true); + request.writeCustomHeader(header); + + TopicRouteData topicRouteData = prepareTopicRouteData(); + + TopicRouteData spyTopicRouteData = Mockito.spy(topicRouteData); + + ProxyTopicRouteData proxyTopicRouteData = mock(ProxyTopicRouteData.class); + when(proxyTopicRouteData.buildTopicRouteData()).thenReturn(spyTopicRouteData); + when(messagingProcessor.getTopicRouteDataForProxy(any(ProxyContext.class), anyList(), any())) + .thenReturn(proxyTopicRouteData); + + RemotingCommand response = getTopicRouteActivity.processRequest0(ctx, request, context); + + assertNotNull(response); + assertEquals(ResponseCode.SUCCESS, response.getCode()); + + verify(spyTopicRouteData).encode(); + } + + private TopicRouteData prepareTopicRouteData() { + TopicRouteData result = new TopicRouteData(); + result.setOrderTopicConf("orderTopicConf"); + + List queueDatas = new ArrayList<>(); + QueueData queueData = new QueueData(); + queueData.setBrokerName("broker-a"); + queueData.setPerm(6); + queueData.setReadQueueNums(4); + queueData.setWriteQueueNums(4); + queueData.setTopicSysFlag(0); + queueDatas.add(queueData); + result.setQueueDatas(queueDatas); + + List brokerDatas = new ArrayList<>(); + BrokerData brokerData = new BrokerData(); + brokerData.setBrokerName("broker-a"); + HashMap brokerAddrs = new HashMap<>(); + brokerAddrs.put(0L, "127.0.0.1:10911"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDatas.add(brokerData); + result.setBrokerDatas(brokerDatas); + return result; + } +} diff --git a/remoting/BUILD.bazel b/remoting/BUILD.bazel index 9f806be7635..4f61295bfa9 100644 --- a/remoting/BUILD.bazel +++ b/remoting/BUILD.bazel @@ -23,6 +23,7 @@ java_library( deps = [ "//common", "@maven//:com_alibaba_fastjson", + "@maven//:com_alibaba_fastjson2_fastjson2", "@maven//:com_google_guava_guava", "@maven//:com_google_code_findbugs_jsr305", "@maven//:com_squareup_okio_okio_jvm", @@ -52,6 +53,7 @@ java_library( "//common", "//:test_deps", "@maven//:com_alibaba_fastjson", + "@maven//:com_alibaba_fastjson2_fastjson2", "@maven//:com_google_code_gson_gson", "@maven//:com_google_guava_guava", "@maven//:com_google_code_findbugs_jsr305", diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java index 139a7043d84..c4e4da14684 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java @@ -16,8 +16,9 @@ */ package org.apache.rocketmq.remoting.protocol; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.serializer.SerializerFeature; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONWriter; + import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.List; @@ -34,7 +35,10 @@ public static byte[] encode(final Object obj) { } public static String toJson(final Object obj, boolean prettyFormat) { - return JSON.toJSONString(obj, prettyFormat); + if (prettyFormat) { + return JSON.toJSONString(obj, JSONWriter.Feature.PrettyFormat); + } + return JSON.toJSONString(obj); } public static T decode(final byte[] data, Class classOfT) { @@ -74,7 +78,7 @@ public byte[] encode() { * @param features Features to apply * @return serialized data. */ - public byte[] encode(SerializerFeature...features) { + public byte[] encode(JSONWriter.Feature... features) { final String json = JSON.toJSONString(this, features); return json.getBytes(CHARSET_UTF8); } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java index 6bd80217da0..e2655fd35da 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.remoting.protocol; -import com.alibaba.fastjson.serializer.SerializerFeature; +import com.alibaba.fastjson2.JSONWriter; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.TypeAdapter; @@ -105,8 +105,7 @@ public Map getMap() { } Foo foo = new Foo(); String invalid = new String(foo.encode(), Charset.defaultCharset()); - String valid = new String(foo.encode(SerializerFeature.BrowserCompatible, SerializerFeature.QuoteFieldNames, - SerializerFeature.MapSortField), Charset.defaultCharset()); + String valid = new String(foo.encode(JSONWriter.Feature.BrowserCompatible, JSONWriter.Feature.MapSortField), Charset.defaultCharset()); Gson gson = new Gson(); final TypeAdapter strictAdapter = gson.getAdapter(JsonElement.class); diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingTest.java index 6b8a1392f5b..06d20c00b5d 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingTest.java @@ -17,15 +17,16 @@ package org.apache.rocketmq.remoting.protocol.statictopic; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; import com.google.common.collect.ImmutableList; -import java.util.Map; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.junit.Assert; import org.junit.Test; +import java.util.Map; + public class TopicQueueMappingTest { @Test @@ -59,7 +60,7 @@ public void testJsonSerialize() { Assert.assertTrue(mappingDetailMap.containsKey("currIdMap")); Assert.assertEquals(8, mappingDetailMap.size()); Assert.assertEquals(1, ((JSONObject) mappingDetailMap.get("hostedQueues")).size()); - Assert.assertEquals(1, ((JSONArray)((JSONObject) mappingDetailMap.get("hostedQueues")).get("0")).size()); + Assert.assertEquals(1, ((JSONArray) ((JSONObject) mappingDetailMap.get("hostedQueues")).get(0)).size()); } { TopicQueueMappingDetail mappingDetailFromJson = RemotingSerializable.decode(mappingDetailJson.getBytes(), TopicQueueMappingDetail.class); @@ -69,9 +70,4 @@ public void testJsonSerialize() { Assert.assertEquals(mappingDetailJson, RemotingSerializable.toJson(mappingDetailFromJson, false)); } } - - @Test - public void test() { - - } } diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java index f5967f5a303..a2ad8c5d850 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.tools.command.connection; -import java.util.HashSet; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; @@ -30,7 +29,7 @@ import org.junit.Before; import org.junit.Test; -import static org.mockito.Mockito.mock; +import java.util.HashSet; public class ConsumerConnectionSubCommandTest { private ServerResponseMocker brokerMocker; @@ -62,7 +61,7 @@ public void testExecute() throws SubCommandException { private ServerResponseMocker startOneBroker() { ConsumerConnection consumerConnection = new ConsumerConnection(); HashSet connectionSet = new HashSet<>(); - Connection connection = mock(Connection.class); + Connection connection = new Connection(); connectionSet.add(connection); consumerConnection.setConnectionSet(connectionSet); // start broker diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java index 672e4113d6c..812edde9e04 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.tools.command.connection; -import java.util.HashSet; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; @@ -30,7 +29,7 @@ import org.junit.Before; import org.junit.Test; -import static org.mockito.Mockito.mock; +import java.util.HashSet; public class ProducerConnectionSubCommandTest { @@ -64,7 +63,7 @@ public void testExecute() throws SubCommandException { private ServerResponseMocker startOneBroker() { ProducerConnection producerConnection = new ProducerConnection(); HashSet connectionSet = new HashSet<>(); - Connection connection = mock(Connection.class); + Connection connection = new Connection(); connectionSet.add(connection); producerConnection.setConnectionSet(connectionSet); diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java index 4651113c689..9f4da15fa3a 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.tools.command.consumer; -import java.util.HashSet; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; @@ -30,7 +29,7 @@ import org.junit.Before; import org.junit.Test; -import static org.mockito.Mockito.mock; +import java.util.HashSet; public class ConsumerStatusSubCommandTest { @@ -65,7 +64,7 @@ public void testExecute() throws SubCommandException { private ServerResponseMocker startOneBroker() { ConsumerConnection consumerConnection = new ConsumerConnection(); HashSet connectionSet = new HashSet<>(); - Connection connection = mock(Connection.class); + Connection connection = new Connection(); connectionSet.add(connection); consumerConnection.setConnectionSet(connectionSet); // start broker diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java index e4e5e970c6e..7ff4c42851a 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java @@ -16,9 +16,6 @@ */ package org.apache.rocketmq.tools.command.consumer; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Set; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; @@ -33,7 +30,9 @@ import org.junit.Before; import org.junit.Test; -import static org.mockito.Mockito.mock; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; public class GetConsumerConfigSubCommandTest { @@ -91,7 +90,7 @@ private ServerResponseMocker startNameServer() { private ServerResponseMocker startOneBroker() { ConsumerConnection consumerConnection = new ConsumerConnection(); HashSet connectionSet = new HashSet<>(); - Connection connection = mock(Connection.class); + Connection connection = new Connection(); connectionSet.add(connection); consumerConnection.setConnectionSet(connectionSet); // start broker diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/producer/ProducerSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/producer/ProducerSubCommandTest.java index 7039c05fbad..8f33212e5da 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/producer/ProducerSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/producer/ProducerSubCommandTest.java @@ -17,13 +17,12 @@ package org.apache.rocketmq.tools.command.producer; -import java.util.HashMap; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; -import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; -import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper; +import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.remoting.protocol.body.ProducerInfo; +import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.command.SubCommandException; import org.apache.rocketmq.tools.command.server.NameServerMocker; @@ -32,6 +31,10 @@ import org.junit.Before; import org.junit.Test; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + public class ProducerSubCommandTest { private ServerResponseMocker brokerMocker; @@ -61,21 +64,17 @@ public void testExecute() throws SubCommandException { } private ServerResponseMocker startOneBroker() { - ConsumeStats consumeStats = new ConsumeStats(); - HashMap offsetTable = new HashMap<>(); - MessageQueue messageQueue = new MessageQueue(); - messageQueue.setBrokerName("mockBrokerName"); - messageQueue.setQueueId(1); - messageQueue.setBrokerName("mockTopicName"); - - OffsetWrapper offsetWrapper = new OffsetWrapper(); - offsetWrapper.setBrokerOffset(1); - offsetWrapper.setConsumerOffset(1); - offsetWrapper.setLastTimestamp(System.currentTimeMillis()); + ProducerTableInfo producerTableInfo = new ProducerTableInfo(new HashMap<>()); + List producerInfo = new ArrayList<>(); + producerInfo.add(new ProducerInfo( + "xxxx-client-id", + "127.0.0.1:18978", + LanguageCode.JAVA, + 400, + System.currentTimeMillis() - offsetTable.put(messageQueue, offsetWrapper); - consumeStats.setOffsetTable(offsetTable); - // start broker - return ServerResponseMocker.startServer(consumeStats.encode()); + )); + producerTableInfo.getData().put("mockTopicName", producerInfo); + return ServerResponseMocker.startServer(producerTableInfo.encode()); } }