@@ -70,6 +70,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
7070 protected final ScheduledExecutorService scheduledExecutorService =
7171 ThreadUtils .newSingleThreadScheduledExecutor (new ThreadFactoryImpl ("RenewalScheduledThread_" ));
7272 protected final ThreadPoolExecutor renewalWorkerService ;
73+ protected final ThreadPoolExecutor returnHandleGroupWorkerService ;
7374
7475 public DefaultReceiptHandleManager (MetadataService metadataService , ConsumerManager consumerManager , StateEventListener <RenewEvent > eventListener ) {
7576 this .metadataService = metadataService ;
@@ -83,6 +84,13 @@ public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerMana
8384 "RenewalWorkerThread" ,
8485 proxyConfig .getRenewThreadPoolQueueCapacity ()
8586 );
87+ this .returnHandleGroupWorkerService = ThreadPoolMonitor .createAndMonitor (
88+ proxyConfig .getReturnHandleGroupThreadPoolNums (),
89+ proxyConfig .getReturnHandleGroupThreadPoolNums () * 2 ,
90+ 1 , TimeUnit .MINUTES ,
91+ "ReturnHandleGroupWorkerThread" ,
92+ proxyConfig .getRenewThreadPoolQueueCapacity ()
93+ );
8694 consumerManager .appendConsumerIdsChangeListener (new ConsumerIdsChangeListener () {
8795 @ Override
8896 public void handle (ConsumerGroupEvent event , String group , Object ... args ) {
@@ -237,22 +245,31 @@ protected void clearGroup(ReceiptHandleGroupKey key) {
237245 if (key == null ) {
238246 return ;
239247 }
240- ProxyConfig proxyConfig = ConfigurationManager .getProxyConfig ();
241248 ReceiptHandleGroup handleGroup = receiptHandleGroupMap .remove (key );
242- if (handleGroup == null ) {
249+ returnHandleGroupWorkerService .submit (() -> returnHandleGroup (key , handleGroup , 0 , 0 ));
250+ }
251+
252+ // When trying for the first time, do not wait for the handle lock, and try to process all handles that can be locked first,
253+ // and then wait for the lock when processing HandleData for the second time.
254+ private void returnHandleGroup (ReceiptHandleGroupKey key , ReceiptHandleGroup handleGroup , long lockTimeout , int attemptTimes ) {
255+ if (handleGroup == null || handleGroup .isEmpty () || attemptTimes > 2 ) {
243256 return ;
244257 }
258+ ProxyConfig proxyConfig = ConfigurationManager .getProxyConfig ();
245259 handleGroup .scan ((msgID , handle , v ) -> {
246260 try {
247261 handleGroup .computeIfPresent (msgID , handle , messageReceiptHandle -> {
248262 CompletableFuture <AckResult > future = new CompletableFuture <>();
249263 eventListener .fireEvent (new RenewEvent (key , messageReceiptHandle , proxyConfig .getInvisibleTimeMillisWhenClear (), RenewEvent .EventType .CLEAR_GROUP , future ));
250264 return CompletableFuture .completedFuture (null );
251- });
265+ }, lockTimeout );
252266 } catch (Exception e ) {
253267 log .error ("error when clear handle for group. key:{}" , key , e );
254268 }
255269 });
270+ if (!handleGroup .isEmpty ()) {
271+ returnHandleGroupWorkerService .submit (() -> returnHandleGroup (key , handleGroup , proxyConfig .getLockTimeoutMsInHandleGroup (), attemptTimes + 1 ));
272+ }
256273 }
257274
258275 protected void clearAllHandle () {
0 commit comments