|
36 | 36 | import java.util.Map; |
37 | 37 | import java.util.concurrent.TimeUnit; |
38 | 38 | import org.apache.commons.lang3.StringUtils; |
| 39 | +import org.apache.rocketmq.common.attribute.TopicMessageType; |
39 | 40 | import org.apache.rocketmq.common.constant.LoggerName; |
40 | 41 | import org.apache.rocketmq.common.filter.ExpressionType; |
41 | 42 | import org.apache.rocketmq.common.message.MessageConst; |
@@ -160,21 +161,8 @@ protected SystemProperties buildSystemProperties(MessageExt messageExt) { |
160 | 161 | } |
161 | 162 |
|
162 | 163 | // message_type |
163 | | - String isTrans = messageExt.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); |
164 | | - String isTransValue = "true"; |
165 | | - if (isTransValue.equals(isTrans)) { |
166 | | - systemPropertiesBuilder.setMessageType(MessageType.TRANSACTION); |
167 | | - } else if (messageExt.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null |
168 | | - || messageExt.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null |
169 | | - || messageExt.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) { |
170 | | - systemPropertiesBuilder.setMessageType(MessageType.DELAY); |
171 | | - } else if (messageExt.getProperty(MessageConst.PROPERTY_SHARDING_KEY) != null) { |
172 | | - systemPropertiesBuilder.setMessageType(MessageType.FIFO); |
173 | | - } else if (messageExt.getProperty(MessageConst.PROPERTY_LITE_TOPIC) != null) { |
174 | | - systemPropertiesBuilder.setMessageType(MessageType.LITE); |
175 | | - } else { |
176 | | - systemPropertiesBuilder.setMessageType(MessageType.NORMAL); |
177 | | - } |
| 164 | + TopicMessageType topicMessageType = TopicMessageType.parseFromMessageProperty(messageExt.getProperties()); |
| 165 | + systemPropertiesBuilder.setMessageType(convertToGrpcMessageType(topicMessageType)); |
178 | 166 |
|
179 | 167 | // born_timestamp (millis) |
180 | 168 | long bornTimestamp = messageExt.getBornTimestamp(); |
@@ -271,4 +259,24 @@ public Resource buildResource(String resourceNameWithNamespace) { |
271 | 259 | .setName(NamespaceUtil.withoutNamespace(resourceNameWithNamespace)) |
272 | 260 | .build(); |
273 | 261 | } |
| 262 | + |
| 263 | + protected MessageType convertToGrpcMessageType(TopicMessageType topicMessageType) { |
| 264 | + switch (topicMessageType) { |
| 265 | + case TRANSACTION: |
| 266 | + return MessageType.TRANSACTION; |
| 267 | + case DELAY: |
| 268 | + return MessageType.DELAY; |
| 269 | + case FIFO: |
| 270 | + return MessageType.FIFO; |
| 271 | + case PRIORITY: |
| 272 | + return MessageType.PRIORITY; |
| 273 | + case LITE: |
| 274 | + return MessageType.LITE; |
| 275 | + case NORMAL: |
| 276 | + return MessageType.NORMAL; |
| 277 | + case UNSPECIFIED: |
| 278 | + default: |
| 279 | + return MessageType.NORMAL; |
| 280 | + } |
| 281 | + } |
274 | 282 | } |
0 commit comments