@@ -1571,6 +1571,9 @@ public String getServiceName() {
15711571 public void run () {
15721572 setState (AbstractStateService .START );
15731573 TimerMessageStore .LOGGER .info (this .getServiceName () + " service start" );
1574+ //Mark different rounds
1575+ boolean isRound = true ;
1576+ Map <String ,MessageExt > avoidDeleteLose = new HashMap <>();
15741577 while (!this .isStopped ()) {
15751578 try {
15761579 setState (AbstractStateService .WAITING );
@@ -1587,9 +1590,18 @@ public void run() {
15871590 MessageExt msgExt = getMessageByCommitOffset (tr .getOffsetPy (), tr .getSizePy ());
15881591 if (null != msgExt ) {
15891592 if (needDelete (tr .getMagic ()) && !needRoll (tr .getMagic ())) {
1593+ //Clearing is performed once in each round.
1594+ //The deletion message is received first and the common message is received once
1595+ if (!isRound ) {
1596+ isRound = true ;
1597+ for (MessageExt messageExt : avoidDeleteLose .values ()) {
1598+ addMetric (messageExt , 1 );
1599+ }
1600+ avoidDeleteLose .clear ();
1601+ }
15901602 if (msgExt .getProperty (MessageConst .PROPERTY_TIMER_DEL_UNIQKEY ) != null && tr .getDeleteList () != null ) {
1591- //Execute metric plus one for messages that fail to be deleted
1592- addMetric (msgExt , 1 );
1603+
1604+ avoidDeleteLose . put (msgExt . getProperty ( MessageConst . PROPERTY_TIMER_DEL_UNIQKEY ), msgExt );
15931605 tr .getDeleteList ().add (msgExt .getProperty (MessageConst .PROPERTY_TIMER_DEL_UNIQKEY ));
15941606 }
15951607 tr .idempotentRelease ();
@@ -1599,10 +1611,13 @@ public void run() {
15991611 if (null == uniqueKey ) {
16001612 LOGGER .warn ("No uniqueKey for msg:{}" , msgExt );
16011613 }
1614+ //Mark ready for next round
1615+ if (isRound ) {
1616+ isRound = false ;
1617+ }
16021618 if (null != uniqueKey && tr .getDeleteList () != null && tr .getDeleteList ().size () > 0
16031619 && tr .getDeleteList ().contains (buildDeleteKey (getRealTopic (msgExt ), uniqueKey ))) {
1604- //Normally, it cancels out with the +1 above
1605- addMetric (msgExt , -1 );
1620+ avoidDeleteLose .remove (uniqueKey );
16061621 doRes = true ;
16071622 tr .idempotentRelease ();
16081623 perfCounterTicks .getCounter ("dequeue_delete" ).flow (1 );
0 commit comments