Skip to content

Commit 5be1dd7

Browse files
committed
fix
Change-Id: I9f233816eb3720505f0389369da0267c2d47beed
1 parent 7a1a950 commit 5be1dd7

2 files changed

Lines changed: 9 additions & 4 deletions

File tree

proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,13 +268,18 @@ public MessageReceiptHandle removeOne(String msgID) {
268268

269269
public void computeIfPresent(String msgID, String handle,
270270
Function<MessageReceiptHandle, CompletableFuture<MessageReceiptHandle>> function) {
271+
long timeout = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
272+
computeIfPresent(msgID, handle, function, timeout);
273+
}
274+
275+
public void computeIfPresent(String msgID, String handle,
276+
Function<MessageReceiptHandle, CompletableFuture<MessageReceiptHandle>> function, long lockTimeout) {
271277
Map<HandleKey, HandleData> handleMap = this.receiptHandleMap.get(msgID);
272278
if (handleMap == null) {
273279
return;
274280
}
275-
long timeout = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
276281
handleMap.computeIfPresent(new HandleKey(handle), (handleKey, handleData) -> {
277-
Long lockTimeMs = handleData.lock(timeout);
282+
Long lockTimeMs = handleData.lock(lockTimeout);
278283
if (lockTimeMs == null) {
279284
throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to compute failed");
280285
}

proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
4141
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
4242
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
43+
import org.apache.rocketmq.common.utils.ExceptionUtils;
4344
import org.apache.rocketmq.common.utils.StartAndShutdown;
4445
import org.apache.rocketmq.common.utils.ThreadUtils;
4546
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -53,7 +54,6 @@
5354
import org.apache.rocketmq.proxy.common.RenewEvent;
5455
import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
5556
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
56-
import org.apache.rocketmq.common.utils.ExceptionUtils;
5757
import org.apache.rocketmq.proxy.config.ConfigurationManager;
5858
import org.apache.rocketmq.proxy.config.ProxyConfig;
5959
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
@@ -172,7 +172,7 @@ protected void scheduleRenewTask() {
172172

173173
protected void renewMessage(ProxyContext context, ReceiptHandleGroupKey key, ReceiptHandleGroup group, String msgID, String handleStr) {
174174
try {
175-
group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> startRenewMessage(context, key, messageReceiptHandle));
175+
group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> startRenewMessage(context, key, messageReceiptHandle), 0);
176176
} catch (Exception e) {
177177
log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e);
178178
}

0 commit comments

Comments
 (0)