From 2f646920a2ae1f7a9dfccd494b117e6b3bf62d2d Mon Sep 17 00:00:00 2001 From: dingshuangxi888 Date: Tue, 10 Feb 2026 11:07:13 +0800 Subject: [PATCH] fix delay message type parser for grpc converter. Change-Id: I1820986a5389b2209528a34a2201afe483898b7b --- .../proxy/grpc/v2/common/GrpcConverter.java | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java index 04dab917ea4..87d20ebca1b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.attribute.TopicMessageType; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.message.MessageConst; @@ -160,21 +161,8 @@ protected SystemProperties buildSystemProperties(MessageExt messageExt) { } // message_type - String isTrans = messageExt.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); - String isTransValue = "true"; - if (isTransValue.equals(isTrans)) { - systemPropertiesBuilder.setMessageType(MessageType.TRANSACTION); - } else if (messageExt.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null - || messageExt.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null - || messageExt.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) { - systemPropertiesBuilder.setMessageType(MessageType.DELAY); - } else if (messageExt.getProperty(MessageConst.PROPERTY_SHARDING_KEY) != null) { - systemPropertiesBuilder.setMessageType(MessageType.FIFO); - } else if (messageExt.getProperty(MessageConst.PROPERTY_LITE_TOPIC) != null) { - systemPropertiesBuilder.setMessageType(MessageType.LITE); - } else { - systemPropertiesBuilder.setMessageType(MessageType.NORMAL); - } + TopicMessageType topicMessageType = TopicMessageType.parseFromMessageProperty(messageExt.getProperties()); + systemPropertiesBuilder.setMessageType(convertToGrpcMessageType(topicMessageType)); // born_timestamp (millis) long bornTimestamp = messageExt.getBornTimestamp(); @@ -271,4 +259,24 @@ public Resource buildResource(String resourceNameWithNamespace) { .setName(NamespaceUtil.withoutNamespace(resourceNameWithNamespace)) .build(); } + + protected MessageType convertToGrpcMessageType(TopicMessageType topicMessageType) { + switch (topicMessageType) { + case TRANSACTION: + return MessageType.TRANSACTION; + case DELAY: + return MessageType.DELAY; + case FIFO: + return MessageType.FIFO; + case PRIORITY: + return MessageType.PRIORITY; + case LITE: + return MessageType.LITE; + case NORMAL: + return MessageType.NORMAL; + case UNSPECIFIED: + default: + return MessageType.NORMAL; + } + } }