Skip to content

Commit 2379813

Browse files
authored
[ISSUE #9253] Make the message visible earlier when response process encounters errors (#9255)
1 parent 0e19071 commit 2379813

File tree

4 files changed

+130
-12
lines changed

4 files changed

+130
-12
lines changed

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@
3636
import org.apache.rocketmq.proxy.config.ProxyConfig;
3737
import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
3838
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
39+
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
3940
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
4041
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter;
42+
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcProxyException;
4143
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
4244
import org.apache.rocketmq.proxy.processor.QueueSelector;
4345
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
@@ -135,14 +137,22 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request,
135137
).thenAccept(popResult -> {
136138
if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) {
137139
if (PopStatus.FOUND.equals(popResult.getPopStatus())) {
140+
GrpcClientChannel clientChannel = grpcChannelManager.getChannel(ctx.getClientID());
141+
if (clientChannel == null) {
142+
GrpcProxyException e = new GrpcProxyException(Code.MESSAGE_NOT_FOUND,
143+
String.format("The client [%s] is disconnected.", ctx.getClientID()));
144+
popResult.getMsgFoundList().forEach(messageExt ->
145+
writer.processThrowableWhenWriteMessage(e, ctx, request, messageExt));
146+
throw e;
147+
}
138148
List<MessageExt> messageExtList = popResult.getMsgFoundList();
139149
for (MessageExt messageExt : messageExtList) {
140150
String receiptHandle = messageExt.getProperty(MessageConst.PROPERTY_POP_CK);
141151
if (receiptHandle != null) {
142152
MessageReceiptHandle messageReceiptHandle =
143153
new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
144154
messageExt.getQueueOffset(), messageExt.getReconsumeTimes());
145-
messagingProcessor.addReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), messageReceiptHandle);
155+
messagingProcessor.addReceiptHandle(ctx, clientChannel, group, messageExt.getMsgId(), messageReceiptHandle);
146156
}
147157
}
148158
}

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,15 @@ public void writeAndComplete(ProxyContext ctx, ReceiveMessageRequest request, Po
6464
.setStatus(ResponseBuilder.getInstance().buildStatus(Code.MESSAGE_NOT_FOUND, "no match message"))
6565
.build());
6666
} else {
67-
streamObserver.onNext(ReceiveMessageResponse.newBuilder()
68-
.setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name()))
69-
.build());
67+
try {
68+
streamObserver.onNext(ReceiveMessageResponse.newBuilder()
69+
.setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name()))
70+
.build());
71+
} catch (Throwable t) {
72+
messageFoundList.forEach(messageExt ->
73+
this.processThrowableWhenWriteMessage(t, ctx, request, messageExt));
74+
throw t;
75+
}
7076
Iterator<MessageExt> messageIterator = messageFoundList.iterator();
7177
while (messageIterator.hasNext()) {
7278
MessageExt curMessageExt = messageIterator.next();

proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,21 @@
3030
import io.grpc.stub.ServerCallStreamObserver;
3131
import io.grpc.stub.StreamObserver;
3232
import java.util.ArrayList;
33+
import java.util.Arrays;
3334
import java.util.Collections;
3435
import java.util.HashMap;
3536
import java.util.List;
3637
import java.util.concurrent.CompletableFuture;
38+
import java.util.stream.Collectors;
39+
import org.apache.rocketmq.client.consumer.AckResult;
3740
import org.apache.rocketmq.client.consumer.PopResult;
3841
import org.apache.rocketmq.client.consumer.PopStatus;
3942
import org.apache.rocketmq.common.MixAll;
4043
import org.apache.rocketmq.common.constant.PermName;
44+
import org.apache.rocketmq.common.consumer.ReceiptHandle;
45+
import org.apache.rocketmq.common.message.MessageAccessor;
46+
import org.apache.rocketmq.common.message.MessageConst;
47+
import org.apache.rocketmq.common.message.MessageExt;
4148
import org.apache.rocketmq.proxy.common.ProxyContext;
4249
import org.apache.rocketmq.proxy.config.ConfigurationManager;
4350
import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
@@ -61,6 +68,8 @@
6168
import static org.mockito.Mockito.doNothing;
6269
import static org.mockito.Mockito.mock;
6370
import static org.mockito.Mockito.when;
71+
import static org.mockito.Mockito.times;
72+
import static org.mockito.Mockito.verify;
6473

6574
public class ReceiveMessageActivityTest extends BaseActivityTest {
6675

@@ -223,6 +232,87 @@ public void testReceiveMessageIllegalInvisibleTimeTooLarge() {
223232
assertEquals(Code.ILLEGAL_INVISIBLE_TIME, getResponseCodeFromReceiveMessageResponseList(responseArgumentCaptor.getAllValues()));
224233
}
225234

235+
@Test
236+
public void testReceiveMessageAddReceiptHandle() {
237+
ConfigurationManager.getProxyConfig().setEnableProxyAutoRenew(true);
238+
StreamObserver<ReceiveMessageResponse> receiveStreamObserver = mock(ServerCallStreamObserver.class);
239+
doNothing().when(receiveStreamObserver).onNext(any());
240+
when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType());
241+
242+
MessageExt messageExt1 = new MessageExt();
243+
String msgId1 = "msgId1";
244+
String popCk1 = "0 0 60000 0 0 broker 0 0 0";
245+
messageExt1.setTopic(TOPIC);
246+
messageExt1.setMsgId(msgId1);
247+
MessageAccessor.putProperty(messageExt1, MessageConst.PROPERTY_POP_CK, popCk1);
248+
messageExt1.setBody("body1".getBytes());
249+
MessageExt messageExt2 = new MessageExt();
250+
String msgId2 = "msgId2";
251+
String popCk2 = "0 0 60000 0 0 broker 0 1 1000";
252+
messageExt2.setTopic(TOPIC);
253+
messageExt2.setMsgId(msgId2);
254+
MessageAccessor.putProperty(messageExt2, MessageConst.PROPERTY_POP_CK, popCk2);
255+
messageExt2.setBody("body2".getBytes());
256+
PopResult popResult = new PopResult(PopStatus.FOUND, Arrays.asList(messageExt1, messageExt2));
257+
when(this.messagingProcessor.popMessage(
258+
any(),
259+
any(),
260+
anyString(),
261+
anyString(),
262+
anyInt(),
263+
anyLong(),
264+
anyLong(),
265+
anyInt(),
266+
any(),
267+
anyBoolean(),
268+
any(),
269+
isNull(),
270+
anyLong())).thenReturn(CompletableFuture.completedFuture(popResult));
271+
ArgumentCaptor<String> msgIdCaptor = ArgumentCaptor.forClass(String.class);
272+
ArgumentCaptor<ReceiptHandle> receiptHandleCaptor = ArgumentCaptor.forClass(ReceiptHandle.class);
273+
when(this.messagingProcessor.changeInvisibleTime(
274+
any(),
275+
receiptHandleCaptor.capture(),
276+
msgIdCaptor.capture(),
277+
anyString(),
278+
anyString(),
279+
anyLong())).thenReturn(CompletableFuture.completedFuture(new AckResult()));
280+
281+
// normal
282+
ProxyContext ctx = createContext();
283+
this.grpcChannelManager.createChannel(ctx, ctx.getClientID());
284+
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.newBuilder()
285+
.setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
286+
.setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
287+
.setAutoRenew(true)
288+
.setFilterExpression(FilterExpression.newBuilder()
289+
.setType(FilterType.TAG)
290+
.setExpression("*")
291+
.build())
292+
.build();
293+
this.receiveMessageActivity.receiveMessage(ctx, receiveMessageRequest, receiveStreamObserver);
294+
verify(this.messagingProcessor, times(0)).changeInvisibleTime(
295+
any(),
296+
any(),
297+
anyString(),
298+
anyString(),
299+
anyString(),
300+
anyLong());
301+
302+
// abnormal
303+
this.grpcChannelManager.removeChannel(ctx.getClientID());
304+
this.receiveMessageActivity.receiveMessage(ctx, receiveMessageRequest, receiveStreamObserver);
305+
verify(this.messagingProcessor, times(2)).changeInvisibleTime(
306+
any(),
307+
any(),
308+
anyString(),
309+
anyString(),
310+
anyString(),
311+
anyLong());
312+
assertEquals(Arrays.asList(msgId1, msgId2), msgIdCaptor.getAllValues());
313+
assertEquals(Arrays.asList(popCk1, popCk2), receiptHandleCaptor.getAllValues().stream().map(ReceiptHandle::encode).collect(Collectors.toList()));
314+
}
315+
226316
@Test
227317
public void testReceiveMessage() {
228318
StreamObserver<ReceiveMessageResponse> receiveStreamObserver = mock(ServerCallStreamObserver.class);

proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriterTest.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import static org.mockito.Mockito.doAnswer;
5454
import static org.mockito.Mockito.doNothing;
5555
import static org.mockito.Mockito.doReturn;
56+
import static org.mockito.Mockito.doThrow;
5657
import static org.mockito.Mockito.mock;
5758
import static org.mockito.Mockito.times;
5859
import static org.mockito.Mockito.verify;
@@ -90,16 +91,17 @@ public void testWriteMessage() {
9091
messageExtList.add(createMessageExt(TOPIC, "tag"));
9192
messageExtList.add(createMessageExt(TOPIC, "tag"));
9293
PopResult popResult = new PopResult(PopStatus.FOUND, messageExtList);
94+
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.newBuilder()
95+
.setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
96+
.setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
97+
.setFilterExpression(FilterExpression.newBuilder()
98+
.setType(FilterType.TAG)
99+
.setExpression("*")
100+
.build())
101+
.build();
93102
writer.writeAndComplete(
94103
ProxyContext.create(),
95-
ReceiveMessageRequest.newBuilder()
96-
.setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
97-
.setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
98-
.setFilterExpression(FilterExpression.newBuilder()
99-
.setType(FilterType.TAG)
100-
.setExpression("*")
101-
.build())
102-
.build(),
104+
receiveMessageRequest,
103105
popResult
104106
);
105107

@@ -114,6 +116,16 @@ public void testWriteMessage() {
114116
assertEquals(messageExtList.get(0).getMsgId(), responseArgumentCaptor.getAllValues().get(1).getMessage().getSystemProperties().getMessageId());
115117

116118
assertEquals(messageExtList.get(1).getMsgId(), changeInvisibleTimeMsgIdCaptor.getValue());
119+
120+
// case: fail to write response status at first step
121+
doThrow(new RuntimeException()).when(streamObserver).onNext(any());
122+
writer.writeAndComplete(
123+
ProxyContext.create(),
124+
receiveMessageRequest,
125+
popResult
126+
);
127+
verify(this.messagingProcessor, times(3))
128+
.changeInvisibleTime(any(), any(), anyString(), anyString(), anyString(), anyLong());
117129
}
118130

119131
@Test

0 commit comments

Comments
 (0)