Skip to content

Commit 4771b8b

Browse files
committed
[ISSUE #9187] The request should be rejected if the queueOffset equals maxOffset when changing the invisible time (#9186)
* When changing the invisible time, the request should be rejected if the queueOffset equals maxOffset * Add UTs * Make UTs to pass * Remove unused imports
1 parent 9abdc3a commit 4771b8b

2 files changed

Lines changed: 33 additions & 4 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public CompletableFuture<RemotingCommand> processRequestAsync(final Channel chan
130130
} catch (ConsumeQueueException e) {
131131
throw new RemotingCommandException("Failed to get max consume offset", e);
132132
}
133-
if (requestHeader.getOffset() < minOffset || requestHeader.getOffset() > maxOffset) {
133+
if (requestHeader.getOffset() < minOffset || requestHeader.getOffset() >= maxOffset) {
134134
response.setCode(ResponseCode.NO_MESSAGE);
135135
return CompletableFuture.completedFuture(response);
136136
}

broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import org.apache.rocketmq.common.message.MessageConst;
3030
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
3131
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
32-
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
33-
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
3432
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
3533
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
3634
import org.apache.rocketmq.remoting.protocol.LanguageCode;
@@ -46,6 +44,7 @@
4644
import org.apache.rocketmq.store.PutMessageResult;
4745
import org.apache.rocketmq.store.PutMessageStatus;
4846
import org.apache.rocketmq.store.config.MessageStoreConfig;
47+
import org.apache.rocketmq.store.exception.ConsumeQueueException;
4948
import org.junit.Before;
5049
import org.junit.Test;
5150
import org.junit.runner.RunWith;
@@ -56,6 +55,8 @@
5655
import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
5756
import static org.assertj.core.api.Assertions.assertThat;
5857
import static org.mockito.ArgumentMatchers.any;
58+
import static org.mockito.ArgumentMatchers.anyInt;
59+
import static org.mockito.ArgumentMatchers.anyString;
5960
import static org.mockito.Mockito.mock;
6061
import static org.mockito.Mockito.when;
6162

@@ -108,7 +109,8 @@ public void init() throws IllegalAccessException, NoSuchFieldException {
108109
}
109110

110111
@Test
111-
public void testProcessRequest_Success() throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
112+
public void testProcessRequest_Success() throws RemotingCommandException, ConsumeQueueException {
113+
when(messageStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(2L);
112114
when(escapeBridge.asyncPutMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
113115
int queueId = 0;
114116
long queueOffset = 0;
@@ -133,4 +135,31 @@ public void testProcessRequest_Success() throws RemotingCommandException, Interr
133135
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
134136
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
135137
}
138+
139+
@Test
140+
public void testProcessRequest_NoMessage() throws RemotingCommandException, ConsumeQueueException {
141+
when(messageStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(2L);
142+
int queueId = 0;
143+
long queueOffset = 2;
144+
long popTime = System.currentTimeMillis() - 1_000;
145+
long invisibleTime = 30_000;
146+
int reviveQid = 0;
147+
String brokerName = "test_broker";
148+
String extraInfo = ExtraInfoUtil.buildExtraInfo(queueOffset, popTime, invisibleTime, reviveQid,
149+
topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR + queueOffset;
150+
151+
ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader();
152+
requestHeader.setTopic(topic);
153+
requestHeader.setQueueId(queueId);
154+
requestHeader.setOffset(queueOffset);
155+
requestHeader.setConsumerGroup(group);
156+
requestHeader.setExtraInfo(extraInfo);
157+
requestHeader.setInvisibleTime(invisibleTime);
158+
159+
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader);
160+
request.makeCustomHeaderToNet();
161+
RemotingCommand responseToReturn = changeInvisibleTimeProcessor.processRequest(handlerContext, request);
162+
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
163+
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
164+
}
136165
}

0 commit comments

Comments
 (0)