From 5c8cb0a847da3da67cf89dbe93901ddd29449a40 Mon Sep 17 00:00:00 2001 From: yx9o Date: Tue, 10 Mar 2026 00:03:59 +0800 Subject: [PATCH 1/4] [ISSUE #10159] Fix unstable tests in DefaultMQLitePullConsumerWithTraceTest --- .../trace/DefaultMQLitePullConsumerWithTraceTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java index e0573bdfb0b..942659ee9a5 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java @@ -111,6 +111,12 @@ public static void setUpEnv() { @Before public void init() throws Exception { + ConcurrentMap factoryTable = + (ConcurrentMap) FieldUtils.readDeclaredField( + MQClientManager.getInstance(), "factoryTable", true); + factoryTable.forEach((clientId, instance) -> instance.shutdown()); + factoryTable.clear(); + Field field = MQClientInstance.class.getDeclaredField("rebalanceService"); field.setAccessible(true); RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory); From 6c7e98037831deecf9152721975222c973f26a11 Mon Sep 17 00:00:00 2001 From: yx9o Date: Wed, 11 Mar 2026 23:22:02 +0800 Subject: [PATCH 2/4] Update --- ...efaultMQLitePullConsumerWithTraceTest.java | 96 +++++++++++++++---- 1 file changed, 77 insertions(+), 19 deletions(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java index 942659ee9a5..d7204b4feea 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java @@ -27,7 +27,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.reflect.FieldUtils; -import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; @@ -60,13 +59,13 @@ import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.QueueData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.Spy; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; @@ -78,14 +77,16 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class DefaultMQLitePullConsumerWithTraceTest { - @Spy - private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mQClientFactory; + private MQClientInstance mqClientInstance; + private MQClientInstance traceMqClientInstance; @Mock private MQClientAPIImpl mQClientAPIImpl; @@ -116,13 +117,40 @@ public void init() throws Exception { MQClientManager.getInstance(), "factoryTable", true); factoryTable.forEach((clientId, instance) -> instance.shutdown()); factoryTable.clear(); + mQClientFactory = null; + mqClientInstance = null; + traceMqClientInstance = null; + asyncTraceDispatcher = null; + traceProducer = null; + rebalanceImpl = null; + offsetStore = null; + litePullConsumerImpl = null; + } - Field field = MQClientInstance.class.getDeclaredField("rebalanceService"); - field.setAccessible(true); - RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory); - field = RebalanceService.class.getDeclaredField("waitInterval"); - field.setAccessible(true); - field.set(rebalanceService, 100); + @After + public void destroy() { + if (traceProducer != null) { + MQClientInstance traceClientFactory = traceProducer.getDefaultMQProducerImpl().getMqClientFactory(); + traceClientFactory.unregisterProducer(producerGroupTraceTemp); + traceClientFactory.unregisterProducer(traceProducer.getProducerGroup()); + } + + if (traceMqClientInstance != null && traceProducer != null) { + traceMqClientInstance.unregisterProducer(traceProducer.getProducerGroup()); + traceMqClientInstance.shutdown(); + } + + if (litePullConsumerImpl != null) { + if (mQClientFactory != null) { + mQClientFactory.unregisterConsumer(litePullConsumerImpl.groupName()); + mQClientFactory.shutdown(); + } + + if (mqClientInstance != null && mqClientInstance != mQClientFactory) { + mqClientInstance.unregisterConsumer(litePullConsumerImpl.groupName()); + mqClientInstance.shutdown(); + } + } } @Test @@ -132,8 +160,8 @@ public void testSubscribe_PollMessageSuccess_WithDefaultTraceTopic() throws Exce Set messageQueueSet = new HashSet<>(); messageQueueSet.add(createMessageQueue()); litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet); - litePullConsumer.setPollTimeoutMillis(20 * 1000); - List result = litePullConsumer.poll(); + List result = pollUntilFound(litePullConsumer); + assertThat(result).isNotEmpty(); assertThat(result.get(0).getTopic()).isEqualTo(topic); assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'}); } finally { @@ -148,8 +176,8 @@ public void testSubscribe_PollMessageSuccess_WithCustomizedTraceTopic() throws E Set messageQueueSet = new HashSet<>(); messageQueueSet.add(createMessageQueue()); litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet); - litePullConsumer.setPollTimeoutMillis(20 * 1000); - List result = litePullConsumer.poll(); + List result = pollUntilFound(litePullConsumer); + assertThat(result).isNotEmpty(); assertThat(result.get(0).getTopic()).isEqualTo(topic); assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'}); } finally { @@ -160,11 +188,15 @@ public void testSubscribe_PollMessageSuccess_WithCustomizedTraceTopic() throws E @Test public void testLitePullConsumerWithTraceTLS() throws Exception { DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("consumerGroup"); - consumer.setUseTLS(true); - consumer.setEnableMsgTrace(true); - consumer.start(); - AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) consumer.getTraceDispatcher(); - Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS()); + try { + consumer.setUseTLS(true); + consumer.setEnableMsgTrace(true); + consumer.start(); + AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) consumer.getTraceDispatcher(); + Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS()); + } finally { + consumer.shutdown(); + } } private DefaultLitePullConsumer createLitePullConsumerWithDefaultTraceTopic() throws Exception { @@ -198,8 +230,18 @@ private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsume litePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer); field = DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory"); field.setAccessible(true); + mqClientInstance = (MQClientInstance) field.get(litePullConsumerImpl); + mQClientFactory = spy(mqClientInstance); + mQClientFactory.getClientConfig().setDecodeReadBody(true); field.set(litePullConsumerImpl, mQClientFactory); + field = MQClientInstance.class.getDeclaredField("rebalanceService"); + field.setAccessible(true); + RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory); + field = RebalanceService.class.getDeclaredField("waitInterval"); + field.setAccessible(true); + field.set(rebalanceService, 100); + PullAPIWrapper pullAPIWrapper = litePullConsumerImpl.getPullAPIWrapper(); field = PullAPIWrapper.class.getDeclaredField("mQClientFactory"); field.setAccessible(true); @@ -207,6 +249,7 @@ private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsume Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); fieldTrace.setAccessible(true); + traceMqClientInstance = traceProducer.getDefaultMQProducerImpl().getMqClientFactory(); fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientFactory); field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); @@ -231,6 +274,8 @@ private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsume traceProducer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl()); + lenient().when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) .thenAnswer(new Answer() { @@ -258,6 +303,19 @@ public Object answer(InvocationOnMock mock) throws Throwable { } + private List pollUntilFound(DefaultLitePullConsumer litePullConsumer) throws Exception { + litePullConsumer.setPollTimeoutMillis(1000); + long deadline = System.currentTimeMillis() + 20 * 1000; + List result = Collections.emptyList(); + while (System.currentTimeMillis() < deadline) { + result = litePullConsumer.poll(); + if (!result.isEmpty()) { + return result; + } + } + return result; + } + private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, List messageExtList) throws Exception { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); From c5a53df802e136d26048e61c80124163caa45ca3 Mon Sep 17 00:00:00 2001 From: yx9o Date: Thu, 12 Mar 2026 12:44:21 +0800 Subject: [PATCH 3/4] Update --- .../client/trace/DefaultMQLitePullConsumerWithTraceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java index d7204b4feea..c4065cf8527 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java @@ -303,7 +303,7 @@ public Object answer(InvocationOnMock mock) throws Throwable { } - private List pollUntilFound(DefaultLitePullConsumer litePullConsumer) throws Exception { + private List pollUntilFound(DefaultLitePullConsumer litePullConsumer) { litePullConsumer.setPollTimeoutMillis(1000); long deadline = System.currentTimeMillis() + 20 * 1000; List result = Collections.emptyList(); From 4d4a467200d30514201553349d984f766a5bb2bc Mon Sep 17 00:00:00 2001 From: yx9o Date: Thu, 12 Mar 2026 22:48:13 +0800 Subject: [PATCH 4/4] Update --- .../test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java index 1b1abd0a101..90b02f6a56e 100644 --- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java @@ -636,6 +636,7 @@ public void testConsumeOrderly() throws Exception { } public void testSimpleConsumerSendAndRecvPriorityMessage() throws Exception { + brokerController1.getBrokerConfig().setPriorityOrderAsc(true); String topic = initTopicOnSampleTopicBroker(BROKER1_NAME, TopicMessageType.PRIORITY); String group = MQRandomUtils.getRandomConsumerGroup(); initConsumerGroup(group);