Skip to content

Commit e1021ec

Browse files
authored
[ISSUE #9416] Fix batch send messages have the same message id when consumed (#9417)
1 parent 46b7a98 commit e1021ec

3 files changed

Lines changed: 11 additions & 30 deletions

File tree

store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -290,15 +290,6 @@ public ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContex
290290
throw new RuntimeException("message body size exceeded");
291291
}
292292

293-
// properties from MessageExtBatch
294-
String batchPropStr = MessageDecoder.messageProperties2String(messageExtBatch.getProperties());
295-
final byte[] batchPropData = batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8);
296-
int batchPropDataLen = batchPropData.length;
297-
if (batchPropDataLen > Short.MAX_VALUE) {
298-
CommitLog.log.warn("Properties size of messageExtBatch exceeded, properties size: {}, maxSize: {}.", batchPropDataLen, Short.MAX_VALUE);
299-
throw new RuntimeException("Properties size of messageExtBatch exceeded!");
300-
}
301-
final short batchPropLen = (short) batchPropDataLen;
302293

303294
int batchSize = 0;
304295
while (messagesByteBuff.hasRemaining()) {
@@ -320,14 +311,11 @@ public ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContex
320311
short propertiesLen = messagesByteBuff.getShort();
321312
int propertiesPos = messagesByteBuff.position();
322313
messagesByteBuff.position(propertiesPos + propertiesLen);
323-
boolean needAppendLastPropertySeparator = propertiesLen > 0 && batchPropLen > 0
324-
&& messagesByteBuff.get(messagesByteBuff.position() - 1) != MessageDecoder.PROPERTY_SEPARATOR;
325314

326315
final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
327316

328317
final int topicLength = topicData.length;
329-
int totalPropLen = needAppendLastPropertySeparator ?
330-
propertiesLen + batchPropLen + 1 : propertiesLen + batchPropLen;
318+
int totalPropLen = propertiesLen;
331319

332320
// properties need to add crc32
333321
totalPropLen += crc32ReservedLength;
@@ -386,12 +374,6 @@ public ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContex
386374
if (propertiesLen > 0) {
387375
this.byteBuf.writeBytes(messagesByteBuff.array(), propertiesPos, propertiesLen);
388376
}
389-
if (batchPropLen > 0) {
390-
if (needAppendLastPropertySeparator) {
391-
this.byteBuf.writeByte((byte) MessageDecoder.PROPERTY_SEPARATOR);
392-
}
393-
this.byteBuf.writeBytes(batchPropData, 0, batchPropLen);
394-
}
395377
this.byteBuf.writerIndex(this.byteBuf.writerIndex() + crc32ReservedLength);
396378
}
397379
putMessageContext.setBatchSize(batchSize);

store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@
1717

1818
package org.apache.rocketmq.store;
1919

20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertTrue;
22+
2023
import java.io.File;
2124
import java.net.InetSocketAddress;
2225
import java.nio.ByteBuffer;
2326
import java.util.ArrayList;
2427
import java.util.HashSet;
2528
import java.util.List;
2629
import java.util.Set;
27-
2830
import java.util.concurrent.ConcurrentHashMap;
2931
import org.apache.rocketmq.common.BrokerConfig;
3032
import org.apache.rocketmq.common.UtilAll;
@@ -37,9 +39,6 @@
3739
import org.junit.Before;
3840
import org.junit.Test;
3941

40-
import static org.junit.Assert.assertEquals;
41-
import static org.junit.Assert.assertTrue;
42-
4342
public class AppendCallbackTest {
4443

4544
AppendMessageCallback callback;

store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717

1818
package org.apache.rocketmq.store;
1919

20+
import static java.util.concurrent.TimeUnit.SECONDS;
21+
import static org.apache.rocketmq.common.message.MessageDecoder.messageProperties2String;
22+
import static org.assertj.core.api.Assertions.assertThat;
23+
import static org.awaitility.Awaitility.await;
24+
import static org.junit.Assert.assertTrue;
25+
2026
import java.io.File;
2127
import java.net.InetSocketAddress;
2228
import java.nio.charset.Charset;
@@ -39,12 +45,6 @@
3945
import org.junit.Before;
4046
import org.junit.Test;
4147

42-
import static java.util.concurrent.TimeUnit.SECONDS;
43-
import static org.apache.rocketmq.common.message.MessageDecoder.messageProperties2String;
44-
import static org.assertj.core.api.Assertions.assertThat;
45-
import static org.awaitility.Awaitility.await;
46-
import static org.junit.Assert.assertTrue;
47-
4848
public class BatchPutMessageTest {
4949

5050
private MessageStore messageStore;
@@ -108,7 +108,7 @@ public void testPutMessages() throws Exception {
108108
short propertiesLength = (short) propertiesBytes.length;
109109
final byte[] topicData = msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
110110
final int topicLength = topicData.length;
111-
msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength + batchPropLen) + msgLengthArr[j - 1];
111+
msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength) + msgLengthArr[j - 1];
112112
j++;
113113
}
114114
byte[] batchMessageBody = MessageDecoder.encodeMessages(messages);

0 commit comments

Comments
 (0)