Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@
package org.apache.rocketmq.broker.subscription;

import com.google.common.collect.ImmutableMap;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SubscriptionGroupAttributes;
Expand All @@ -38,11 +32,17 @@
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import java.nio.file.Paths;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;


Expand Down Expand Up @@ -100,6 +100,7 @@ public void updateSubscriptionGroupConfig() {
subscriptionGroupConfig.setGroupName(group);
Map<String, String> attr = ImmutableMap.of("+test", "true");
subscriptionGroupConfig.setAttributes(attr);
SubscriptionGroupManager subscriptionGroupManager = new SubscriptionGroupManager(brokerControllerMock);
subscriptionGroupManager.updateSubscriptionGroupConfig(subscriptionGroupConfig);
SubscriptionGroupConfig result = subscriptionGroupManager.getSubscriptionGroupTable().get(group);
assertThat(result).isNotNull();
Expand Down Expand Up @@ -156,14 +157,10 @@ public void testUpdateSubscriptionGroupConfigList_ValidConfigList() {
groupNames.add(groupName);
}

SubscriptionGroupManager subscriptionGroupManager = new SubscriptionGroupManager(brokerControllerMock);
subscriptionGroupManager.updateSubscriptionGroupConfigList(configList);

// Verifying that persist() is called once
verify(subscriptionGroupManager, times(1)).persist();

groupNames.forEach(groupName ->
assertThat(subscriptionGroupManager.getSubscriptionGroupTable().get(groupName)).isNotNull());

}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,6 @@

package org.apache.rocketmq.client.consumer;

import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
Expand Down Expand Up @@ -65,6 +54,18 @@
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.awaitility.Awaitility.await;
Expand All @@ -75,6 +76,7 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -898,5 +900,6 @@ private void suppressUpdateTopicRouteInfoFromNameServer(
ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory);
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
doNothing().when(mQClientFactory).updateTopicRouteInfoFromNameServer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,15 @@ public void assertUpdateOrCreateTopicWithError() {
@Test
public void assertUpdateOrCreateSubscriptionGroupWithSuccess() throws Exception {
setResponseSuccess(null);
SubscriptionGroupConfig config = mock(SubscriptionGroupConfig.class);
SubscriptionGroupConfig config = new SubscriptionGroupConfig();
CompletableFuture<Void> actual = mqClientAdminImpl.updateOrCreateSubscriptionGroup(defaultBrokerAddr, config, defaultTimeout);
assertNull(actual.get());
}

@Test
public void assertUpdateOrCreateSubscriptionGroupWithError() {
setResponseError();
SubscriptionGroupConfig config = mock(SubscriptionGroupConfig.class);
SubscriptionGroupConfig config = new SubscriptionGroupConfig();
CompletableFuture<Void> actual = mqClientAdminImpl.updateOrCreateSubscriptionGroup(defaultBrokerAddr, config, defaultTimeout);
Throwable thrown = assertThrows(ExecutionException.class, actual::get);
assertTrue(thrown.getCause() instanceof MQClientException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@
*/
package org.apache.rocketmq.client.impl.consumer;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQBrokerException;
Expand All @@ -34,6 +29,12 @@
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -164,6 +165,7 @@ private ProcessQueue createProcessQueue() {
ProcessQueue result = new ProcessQueue();
result.setMsgAccCnt(1);
result.incTryUnlockTimes();
result.setLastPullTimestamp(10000L);
return result;
}

Expand Down
2 changes: 0 additions & 2 deletions common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ java_library(
srcs = glob(["src/main/java/**/*.java"]),
visibility = ["//visibility:public"],
deps = [
"@maven//:com_alibaba_fastjson",
"@maven//:com_alibaba_fastjson2_fastjson2",
"@maven//:com_github_luben_zstd_jni",
"@maven//:com_google_guava_guava",
Expand Down Expand Up @@ -55,7 +54,6 @@ java_library(
":common",
"//:test_deps",
"@maven//:com_google_guava_guava",
"@maven//:com_alibaba_fastjson",
"@maven//:com_alibaba_fastjson2_fastjson2",
"@maven//:commons_codec_commons_codec",
"@maven//:io_netty_netty_all",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
*/
package org.apache.rocketmq.common;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.alibaba.fastjson2.annotation.JSONField;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,51 @@

package org.apache.rocketmq.common.fastjson;

import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.parser.DefaultJSONParser;
import com.alibaba.fastjson.parser.JSONToken;
import com.alibaba.fastjson.parser.deserializer.MapDeserializer;
import com.alibaba.fastjson.parser.deserializer.ObjectDeserializer;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.JSONReader;
import com.alibaba.fastjson2.reader.ObjectReader;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

/**
* workaround https://github.com/alibaba/fastjson/issues/3730
*/
public class GenericMapSuperclassDeserializer implements ObjectDeserializer {
public class GenericMapSuperclassDeserializer implements ObjectReader<Object> {
public static final GenericMapSuperclassDeserializer INSTANCE = new GenericMapSuperclassDeserializer();

@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public <T> T deserialze(DefaultJSONParser parser, Type type, Object fieldName) {
public Object readObject(JSONReader reader, Type type, Object fieldName, long features) {
Class<?> clz = (Class<?>) type;
Type genericSuperclass = clz.getGenericSuperclass();
Map map;
try {
map = (Map) clz.newInstance();
map = (Map) clz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new JSONException("unsupport type " + type, e);
throw new com.alibaba.fastjson2.JSONException("unsupport type " + type, e);
}
ParameterizedType parameterizedType = (ParameterizedType) genericSuperclass;
Type keyType = parameterizedType.getActualTypeArguments()[0];
Type valueType = parameterizedType.getActualTypeArguments()[1];
if (String.class == keyType) {
return (T) MapDeserializer.parseMap(parser, (Map<String, Object>) map, valueType, fieldName);
} else {
return (T) MapDeserializer.parseMap(parser, map, keyType, valueType, fieldName);

if (!reader.nextIfObjectStart()) {
throw new JSONException(reader.info("expect '{', but " + reader.current()));
}
}

@Override
public int getFastMatchToken() {
return JSONToken.LBRACE;
while (!reader.nextIfObjectEnd()) {
Object key;
if (keyType == String.class) {
key = reader.readFieldName();
} else {
key = reader.getContext().getProvider().getObjectReader(keyType).readObject(reader, keyType, fieldName, features);
reader.nextIfMatch(':');
}

Object value = reader.getContext().getProvider().getObjectReader(valueType).readObject(reader, valueType, fieldName, features);
map.put(key, value);
reader.nextIfComma();
}
return map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,21 @@
*/
package org.apache.rocketmq.common.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.support.config.FastJsonConfig;
import com.alibaba.fastjson2.JSON;
import org.apache.commons.lang3.SerializationException;

/**
* The object serializer based on fastJson
*/
public class FastJsonSerializer implements Serializer {
private FastJsonConfig fastJsonConfig = new FastJsonConfig();

public FastJsonConfig getFastJsonConfig() {
return this.fastJsonConfig;
}

public void setFastJsonConfig(FastJsonConfig fastJsonConfig) {
this.fastJsonConfig = fastJsonConfig;
}

@Override
public <T> byte[] serialize(T t) throws SerializationException {
if (t == null) {
return new byte[0];
} else {
try {
return JSON.toJSONBytes(this.fastJsonConfig.getCharset(), t, this.fastJsonConfig.getSerializeConfig(), this.fastJsonConfig.getSerializeFilters(), this.fastJsonConfig.getDateFormat(), JSON.DEFAULT_GENERATE_FEATURE, this.fastJsonConfig.getSerializerFeatures());
return JSON.toJSONBytes(t);
} catch (Exception var3) {
throw new SerializationException("Could not serialize: " + var3.getMessage(), var3);
}
Expand All @@ -51,7 +41,7 @@ public <T> byte[] serialize(T t) throws SerializationException {
public <T> T deserialize(byte[] bytes, Class<T> type) throws SerializationException {
if (bytes != null && bytes.length != 0) {
try {
return JSON.parseObject(bytes, this.fastJsonConfig.getCharset(), type, this.fastJsonConfig.getParserConfig(), this.fastJsonConfig.getParseProcess(), JSON.DEFAULT_PARSER_FEATURE, this.fastJsonConfig.getFeatures());
return JSON.parseObject(bytes, type);
} catch (Exception var3) {
throw new SerializationException("Could not deserialize: " + var3.getMessage(), var3);
}
Expand Down
Loading
Loading