4040import org .apache .rocketmq .common .thread .ThreadPoolMonitor ;
4141import org .apache .rocketmq .common .utils .AbstractStartAndShutdown ;
4242import org .apache .rocketmq .common .utils .ConcurrentHashMapUtils ;
43+ import org .apache .rocketmq .common .utils .ExceptionUtils ;
4344import org .apache .rocketmq .common .utils .StartAndShutdown ;
4445import org .apache .rocketmq .common .utils .ThreadUtils ;
4546import org .apache .rocketmq .logging .org .slf4j .Logger ;
5354import org .apache .rocketmq .proxy .common .RenewEvent ;
5455import org .apache .rocketmq .proxy .common .RenewStrategyPolicy ;
5556import org .apache .rocketmq .proxy .common .channel .ChannelHelper ;
56- import org .apache .rocketmq .common .utils .ExceptionUtils ;
5757import org .apache .rocketmq .proxy .config .ConfigurationManager ;
5858import org .apache .rocketmq .proxy .config .ProxyConfig ;
5959import org .apache .rocketmq .proxy .service .metadata .MetadataService ;
@@ -70,6 +70,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
7070 protected final ScheduledExecutorService scheduledExecutorService =
7171 ThreadUtils .newSingleThreadScheduledExecutor (new ThreadFactoryImpl ("RenewalScheduledThread_" ));
7272 protected final ThreadPoolExecutor renewalWorkerService ;
73+ protected final ThreadPoolExecutor returnHandleGroupWorkerService ;
7374
7475 public DefaultReceiptHandleManager (MetadataService metadataService , ConsumerManager consumerManager , StateEventListener <RenewEvent > eventListener ) {
7576 this .metadataService = metadataService ;
@@ -83,6 +84,13 @@ public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerMana
8384 "RenewalWorkerThread" ,
8485 proxyConfig .getRenewThreadPoolQueueCapacity ()
8586 );
87+ this .returnHandleGroupWorkerService = ThreadPoolMonitor .createAndMonitor (
88+ proxyConfig .getReturnHandleGroupThreadPoolNums (),
89+ proxyConfig .getReturnHandleGroupThreadPoolNums () * 2 ,
90+ 1 , TimeUnit .MINUTES ,
91+ "ReturnHandleGroupWorkerThread" ,
92+ proxyConfig .getRenewThreadPoolQueueCapacity ()
93+ );
8694 consumerManager .appendConsumerIdsChangeListener (new ConsumerIdsChangeListener () {
8795 @ Override
8896 public void handle (ConsumerGroupEvent event , String group , Object ... args ) {
@@ -172,7 +180,7 @@ protected void scheduleRenewTask() {
172180
173181 protected void renewMessage (ProxyContext context , ReceiptHandleGroupKey key , ReceiptHandleGroup group , String msgID , String handleStr ) {
174182 try {
175- group .computeIfPresent (msgID , handleStr , messageReceiptHandle -> startRenewMessage (context , key , messageReceiptHandle ));
183+ group .computeIfPresent (msgID , handleStr , messageReceiptHandle -> startRenewMessage (context , key , messageReceiptHandle ), 0 );
176184 } catch (Exception e ) {
177185 log .error ("error when renew message. msgID:{}, handleStr:{}" , msgID , handleStr , e );
178186 }
@@ -237,22 +245,33 @@ protected void clearGroup(ReceiptHandleGroupKey key) {
237245 if (key == null ) {
238246 return ;
239247 }
240- ProxyConfig proxyConfig = ConfigurationManager .getProxyConfig ();
241248 ReceiptHandleGroup handleGroup = receiptHandleGroupMap .remove (key );
242- if (handleGroup == null ) {
249+ returnHandleGroupWorkerService .submit (() -> returnHandleGroup (key , handleGroup ));
250+ }
251+
252+ // There is no longer any waiting for lock, and only the locked handles will be processed immediately,
253+ // while the handles that cannot be acquired will be kept waiting for the next scheduling.
254+ private void returnHandleGroup (ReceiptHandleGroupKey key , ReceiptHandleGroup handleGroup ) {
255+ if (handleGroup == null || handleGroup .isEmpty ()) {
243256 return ;
244257 }
258+ ProxyConfig proxyConfig = ConfigurationManager .getProxyConfig ();
245259 handleGroup .scan ((msgID , handle , v ) -> {
246260 try {
247261 handleGroup .computeIfPresent (msgID , handle , messageReceiptHandle -> {
248262 CompletableFuture <AckResult > future = new CompletableFuture <>();
249263 eventListener .fireEvent (new RenewEvent (key , messageReceiptHandle , proxyConfig .getInvisibleTimeMillisWhenClear (), RenewEvent .EventType .CLEAR_GROUP , future ));
250264 return CompletableFuture .completedFuture (null );
251- });
265+ }, 0 );
252266 } catch (Exception e ) {
253267 log .error ("error when clear handle for group. key:{}" , key , e );
254268 }
255269 });
270+ // scheduleRenewTask will trigger cleanup again
271+ if (!handleGroup .isEmpty ()) {
272+ log .warn ("The handle cannot be completely cleared, the remaining quantity is {}, key:{}" , handleGroup .getHandleNum (), key );
273+ receiptHandleGroupMap .putIfAbsent (key , handleGroup );
274+ }
256275 }
257276
258277 protected void clearAllHandle () {
0 commit comments