Skip to content

Commit 365b297

Browse files
authored
[ISSUE #7837] Fix start() and shutdown() of DefaultMessagingProcessor (#7838)
* Fix start and shutdown process of DefaultMessagingProcessor * minimal changes
1 parent 74ab3ae commit 365b297

3 files changed

Lines changed: 85 additions & 0 deletions

File tree

proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ public static DefaultMessagingProcessor createForClusterMode(RPCHook rpcHook) {
138138

139139
protected void init() {
140140
this.appendStartAndShutdown(this.serviceManager);
141+
this.appendStartAndShutdown(this.receiptHandleProcessor);
141142
this.appendShutdown(this.producerProcessorExecutor::shutdown);
142143
this.appendShutdown(this.consumerProcessorExecutor::shutdown);
143144
}

proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceMana
5151
});
5252
};
5353
this.receiptHandleManager = new DefaultReceiptHandleManager(serviceManager.getMetadataService(), serviceManager.getConsumerManager(), eventListener);
54+
this.appendStartAndShutdown(receiptHandleManager);
5455
}
5556

5657
protected ProxyContext createContext(String actionName) {
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.proxy.processor;
18+
19+
import io.netty.channel.local.LocalChannel;
20+
import org.apache.rocketmq.broker.client.ClientChannelInfo;
21+
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
22+
import org.apache.rocketmq.common.consumer.ReceiptHandle;
23+
import org.apache.rocketmq.common.message.MessageClientIDSetter;
24+
import org.apache.rocketmq.proxy.common.ContextVariable;
25+
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
26+
import org.apache.rocketmq.proxy.common.ProxyContext;
27+
import org.apache.rocketmq.proxy.config.ConfigurationManager;
28+
import org.apache.rocketmq.proxy.config.ProxyConfig;
29+
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
import org.mockito.Mockito;
33+
34+
public class ReceiptHandleProcessorTest extends BaseProcessorTest {
35+
private static final ProxyContext PROXY_CONTEXT = ProxyContext.create();
36+
private static final String CONSUMER_GROUP = "consumerGroup";
37+
private static final String TOPIC = "topic";
38+
private static final String BROKER_NAME = "broker";
39+
private static final int QUEUE_ID = 1;
40+
private static final String MESSAGE_ID = "messageId";
41+
private static final long OFFSET = 123L;
42+
private static final long INVISIBLE_TIME = 60000L;
43+
private static final int RECONSUME_TIMES = 1;
44+
private static final String MSG_ID = MessageClientIDSetter.createUniqID();
45+
private MessageReceiptHandle messageReceiptHandle;
46+
47+
private ReceiptHandleProcessor receiptHandleProcessor;
48+
49+
@Before
50+
public void before() throws Throwable {
51+
super.before();
52+
this.receiptHandleProcessor = new ReceiptHandleProcessor(this.messagingProcessor, this.serviceManager);
53+
ProxyConfig config = ConfigurationManager.getProxyConfig();
54+
String receiptHandle = ReceiptHandle.builder()
55+
.startOffset(0L)
56+
.retrieveTime(System.currentTimeMillis() - INVISIBLE_TIME + config.getRenewAheadTimeMillis() - 5)
57+
.invisibleTime(INVISIBLE_TIME)
58+
.reviveQueueId(1)
59+
.topicType(ReceiptHandle.NORMAL_TOPIC)
60+
.brokerName(BROKER_NAME)
61+
.queueId(QUEUE_ID)
62+
.offset(OFFSET)
63+
.commitLogOffset(0L)
64+
.build().encode();
65+
PROXY_CONTEXT.withVal(ContextVariable.CLIENT_ID, "channel-id");
66+
PROXY_CONTEXT.withVal(ContextVariable.CHANNEL, new LocalChannel());
67+
Mockito.doNothing().when(consumerManager).appendConsumerIdsChangeListener(Mockito.any(ConsumerIdsChangeListener.class));
68+
messageReceiptHandle = new MessageReceiptHandle(CONSUMER_GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
69+
RECONSUME_TIMES);
70+
}
71+
72+
@Test
73+
public void testStart() throws Exception {
74+
receiptHandleProcessor.start();
75+
receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, PROXY_CONTEXT.getChannel(), CONSUMER_GROUP, MSG_ID, messageReceiptHandle);
76+
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(CONSUMER_GROUP))).thenReturn(new SubscriptionGroupConfig());
77+
Mockito.when(consumerManager.findChannel(Mockito.eq(CONSUMER_GROUP), Mockito.eq(PROXY_CONTEXT.getChannel()))).thenReturn(Mockito.mock(ClientChannelInfo.class));
78+
Mockito.verify(messagingProcessor, Mockito.timeout(10000).times(1))
79+
.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
80+
Mockito.eq(CONSUMER_GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
81+
}
82+
83+
}

0 commit comments

Comments
 (0)