|
18 | 18 |
|
19 | 19 | import io.netty.channel.Channel; |
20 | 20 | import io.netty.channel.ChannelHandlerContext; |
21 | | -import java.lang.reflect.Field; |
22 | | -import java.util.concurrent.CompletableFuture; |
23 | | -import java.util.concurrent.ConcurrentHashMap; |
24 | 21 | import org.apache.rocketmq.broker.BrokerController; |
25 | 22 | import org.apache.rocketmq.broker.client.ClientChannelInfo; |
26 | 23 | import org.apache.rocketmq.broker.client.net.Broker2Client; |
27 | 24 | import org.apache.rocketmq.broker.failover.EscapeBridge; |
28 | 25 | import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; |
29 | | -import org.apache.rocketmq.store.stats.BrokerStatsManager; |
| 26 | +import org.apache.rocketmq.broker.metrics.PopMetricsManager; |
| 27 | +import org.apache.rocketmq.broker.topic.TopicConfigManager; |
30 | 28 | import org.apache.rocketmq.common.BrokerConfig; |
31 | 29 | import org.apache.rocketmq.common.TopicConfig; |
32 | | -import org.apache.rocketmq.broker.topic.TopicConfigManager; |
33 | 30 | import org.apache.rocketmq.common.message.MessageConst; |
34 | 31 | import org.apache.rocketmq.common.message.MessageExtBrokerInner; |
35 | 32 | import org.apache.rocketmq.remoting.exception.RemotingCommandException; |
|
41 | 38 | import org.apache.rocketmq.remoting.protocol.ResponseCode; |
42 | 39 | import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader; |
43 | 40 | import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; |
| 41 | +import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData; |
44 | 42 | import org.apache.rocketmq.store.AppendMessageResult; |
45 | 43 | import org.apache.rocketmq.store.AppendMessageStatus; |
46 | 44 | import org.apache.rocketmq.store.DefaultMessageStore; |
| 45 | +import org.apache.rocketmq.store.MessageStore; |
47 | 46 | import org.apache.rocketmq.store.PutMessageResult; |
48 | 47 | import org.apache.rocketmq.store.PutMessageStatus; |
49 | 48 | import org.apache.rocketmq.store.config.MessageStoreConfig; |
50 | 49 | import org.apache.rocketmq.store.exception.ConsumeQueueException; |
| 50 | +import org.apache.rocketmq.store.stats.BrokerStatsManager; |
51 | 51 | import org.junit.Before; |
52 | 52 | import org.junit.Test; |
53 | 53 | import org.junit.runner.RunWith; |
54 | 54 | import org.mockito.Mock; |
55 | 55 | import org.mockito.Spy; |
56 | 56 | import org.mockito.junit.MockitoJUnitRunner; |
57 | 57 |
|
| 58 | +import java.lang.reflect.Field; |
| 59 | +import java.util.concurrent.CompletableFuture; |
| 60 | +import java.util.concurrent.ConcurrentHashMap; |
| 61 | + |
| 62 | +import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData; |
58 | 63 | import static org.assertj.core.api.Assertions.assertThat; |
| 64 | +import static org.junit.Assert.assertEquals; |
| 65 | +import static org.junit.Assert.assertNotNull; |
59 | 66 | import static org.mockito.ArgumentMatchers.any; |
60 | 67 | import static org.mockito.ArgumentMatchers.anyInt; |
61 | 68 | import static org.mockito.ArgumentMatchers.anyString; |
| 69 | +import static org.mockito.Mockito.doNothing; |
62 | 70 | import static org.mockito.Mockito.mock; |
63 | 71 | import static org.mockito.Mockito.when; |
64 | 72 |
|
@@ -118,6 +126,17 @@ public void init() throws IllegalAccessException, NoSuchFieldException { |
118 | 126 | PopBufferMergeService popBufferMergeService = mock(PopBufferMergeService.class); |
119 | 127 | when(brokerController.getPopMessageProcessor()).thenReturn(popMessageProcessor); |
120 | 128 | when(popMessageProcessor.getPopBufferMergeService()).thenReturn(popBufferMergeService); |
| 129 | + |
| 130 | + ConsumerData consumerData = createConsumerData(group, topic); |
| 131 | + clientInfo = new ClientChannelInfo(channel, "127.0.0.1", LanguageCode.JAVA, 0); |
| 132 | + brokerController.getConsumerManager().registerConsumer( |
| 133 | + consumerData.getGroupName(), |
| 134 | + clientInfo, |
| 135 | + consumerData.getConsumeType(), |
| 136 | + consumerData.getMessageModel(), |
| 137 | + consumerData.getConsumeFromWhere(), |
| 138 | + consumerData.getSubscriptionDataSet(), |
| 139 | + false); |
121 | 140 |
|
122 | 141 | clientInfo = new ClientChannelInfo(channel, "127.0.0.1", LanguageCode.JAVA, 0); |
123 | 142 | changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(brokerController); |
@@ -177,4 +196,58 @@ public void testProcessRequest_NoMessage() throws RemotingCommandException, Cons |
177 | 196 | assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_MESSAGE); |
178 | 197 | assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque()); |
179 | 198 | } |
| 199 | + |
| 200 | + @Test |
| 201 | + public void testProcessRequestAsync_JsonParsing() throws Exception { |
| 202 | + Channel mockChannel = mock(Channel.class); |
| 203 | + RemotingCommand mockRequest = mock(RemotingCommand.class); |
| 204 | + BrokerController mockBrokerController = mock(BrokerController.class); |
| 205 | + TopicConfigManager mockTopicConfigManager = mock(TopicConfigManager.class); |
| 206 | + MessageStore mockMessageStore = mock(MessageStore.class); |
| 207 | + BrokerConfig mockBrokerConfig = mock(BrokerConfig.class); |
| 208 | + BrokerStatsManager mockBrokerStatsManager = mock(BrokerStatsManager.class); |
| 209 | + PopMessageProcessor mockPopMessageProcessor = mock(PopMessageProcessor.class); |
| 210 | + PopBufferMergeService mockPopBufferMergeService = mock(PopBufferMergeService.class); |
| 211 | + BrokerMetricsManager brokerMetricsManager = mock(BrokerMetricsManager.class); |
| 212 | + PopMetricsManager popMetricsManager = mock(PopMetricsManager.class); |
| 213 | + |
| 214 | + when(brokerMetricsManager.getPopMetricsManager()).thenReturn(popMetricsManager); |
| 215 | + when(mockBrokerController.getBrokerMetricsManager()).thenReturn(brokerMetricsManager); |
| 216 | + doNothing().when(popMetricsManager).incPopReviveCkPutCount(any(), any()); |
| 217 | + when(brokerMetricsManager.getPopMetricsManager()).thenReturn(popMetricsManager); |
| 218 | + when(mockBrokerController.getTopicConfigManager()).thenReturn(mockTopicConfigManager); |
| 219 | + when(mockBrokerController.getMessageStore()).thenReturn(mockMessageStore); |
| 220 | + when(mockBrokerController.getBrokerConfig()).thenReturn(mockBrokerConfig); |
| 221 | + when(mockBrokerController.getBrokerStatsManager()).thenReturn(mockBrokerStatsManager); |
| 222 | + when(mockBrokerController.getPopMessageProcessor()).thenReturn(mockPopMessageProcessor); |
| 223 | + when(mockPopMessageProcessor.getPopBufferMergeService()).thenReturn(mockPopBufferMergeService); |
| 224 | + when(mockPopBufferMergeService.addAk(anyInt(), any())).thenReturn(false); |
| 225 | + when(mockBrokerController.getEscapeBridge()).thenReturn(escapeBridge); |
| 226 | + PutMessageResult mockPutMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, null, true); |
| 227 | + when(mockBrokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(any())) |
| 228 | + .thenReturn(CompletableFuture.completedFuture(mockPutMessageResult)); |
| 229 | + |
| 230 | + TopicConfig topicConfig = new TopicConfig(); |
| 231 | + topicConfig.setReadQueueNums(4); |
| 232 | + when(mockTopicConfigManager.selectTopicConfig(anyString())).thenReturn(topicConfig); |
| 233 | + when(mockMessageStore.getMinOffsetInQueue(anyString(), anyInt())).thenReturn(0L); |
| 234 | + when(mockMessageStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(10L); |
| 235 | + when(mockBrokerConfig.isPopConsumerKVServiceEnable()).thenReturn(false); |
| 236 | + |
| 237 | + ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader(); |
| 238 | + requestHeader.setTopic("TestTopic"); |
| 239 | + requestHeader.setQueueId(1); |
| 240 | + requestHeader.setOffset(5L); |
| 241 | + requestHeader.setConsumerGroup("TestGroup"); |
| 242 | + requestHeader.setExtraInfo("0 10000 10000 0 TestBroker 1"); |
| 243 | + requestHeader.setInvisibleTime(60000L); |
| 244 | + when(mockRequest.decodeCommandCustomHeader(ChangeInvisibleTimeRequestHeader.class)).thenReturn(requestHeader); |
| 245 | + |
| 246 | + ChangeInvisibleTimeProcessor processor = new ChangeInvisibleTimeProcessor(mockBrokerController); |
| 247 | + CompletableFuture<RemotingCommand> futureResponse = processor.processRequestAsync(mockChannel, mockRequest, true); |
| 248 | + |
| 249 | + RemotingCommand response = futureResponse.get(); |
| 250 | + assertNotNull(response); |
| 251 | + assertEquals(ResponseCode.SUCCESS, response.getCode()); |
| 252 | + } |
180 | 253 | } |
0 commit comments