6464import org .apache .rocketmq .common .message .MessageExt ;
6565import org .apache .rocketmq .common .message .MessageExtBrokerInner ;
6666import org .apache .rocketmq .common .topic .TopicValidator ;
67+ import org .apache .rocketmq .common .utils .ConcurrentHashMapUtils ;
6768import org .apache .rocketmq .logging .org .slf4j .Logger ;
6869import org .apache .rocketmq .logging .org .slf4j .LoggerFactory ;
6970import org .apache .rocketmq .remoting .CommandCallback ;
@@ -150,11 +151,11 @@ public static String genAckUniqueId(AckMsg ackMsg) {
150151
151152 public static String genBatchAckUniqueId (BatchAckMsg batchAckMsg ) {
152153 return batchAckMsg .getTopic ()
153- + PopAckConstants .SPLIT + batchAckMsg .getQueueId ()
154- + PopAckConstants .SPLIT + batchAckMsg .getAckOffsetList ().toString ()
155- + PopAckConstants .SPLIT + batchAckMsg .getConsumerGroup ()
156- + PopAckConstants .SPLIT + batchAckMsg .getPopTime ()
157- + PopAckConstants .SPLIT + PopAckConstants .BATCH_ACK_TAG ;
154+ + PopAckConstants .SPLIT + batchAckMsg .getQueueId ()
155+ + PopAckConstants .SPLIT + batchAckMsg .getAckOffsetList ().toString ()
156+ + PopAckConstants .SPLIT + batchAckMsg .getConsumerGroup ()
157+ + PopAckConstants .SPLIT + batchAckMsg .getPopTime ()
158+ + PopAckConstants .SPLIT + PopAckConstants .BATCH_ACK_TAG ;
158159 }
159160
160161 public static String genCkUniqueId (PopCheckPoint ck ) {
@@ -861,7 +862,7 @@ private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId,
861862
862863 private boolean isPopShouldStop (String topic , String group , int queueId ) {
863864 return brokerController .getBrokerConfig ().isEnablePopMessageThreshold () &&
864- brokerController .getPopInflightMessageCounter ().getGroupPopInFlightMessageNum (topic , group , queueId ) > brokerController .getBrokerConfig ().getPopInflightMessageThreshold ();
865+ brokerController .getPopInflightMessageCounter ().getGroupPopInFlightMessageNum (topic , group , queueId ) > brokerController .getBrokerConfig ().getPopInflightMessageThreshold ();
865866 }
866867
867868 private long getPopOffset (String topic , String group , int queueId , int initMode , boolean init , String lockKey ,
@@ -908,7 +909,7 @@ private long getInitOffset(String topic, String group, int queueId, int initMode
908909 }
909910 if (init ) { // whichever initMode
910911 this .brokerController .getConsumerOffsetManager ().commitOffset (
911- "getPopOffset" , group , topic , queueId , offset );
912+ "getPopOffset" , group , topic , queueId , offset );
912913 }
913914 return offset ;
914915 }
@@ -1002,12 +1003,13 @@ static class TimedLock {
10021003 private volatile long lockTime ;
10031004
10041005 public TimedLock () {
1005- this .lock = new AtomicBoolean (true );
1006+ // init lock status, false means not locked
1007+ this .lock = new AtomicBoolean (false );
10061008 this .lockTime = System .currentTimeMillis ();
10071009 }
10081010
10091011 public boolean tryLock () {
1010- boolean ret = lock .compareAndSet (true , false );
1012+ boolean ret = lock .compareAndSet (false , true );
10111013 if (ret ) {
10121014 this .lockTime = System .currentTimeMillis ();
10131015 return true ;
@@ -1017,11 +1019,11 @@ public boolean tryLock() {
10171019 }
10181020
10191021 public void unLock () {
1020- lock .set (true );
1022+ lock .set (false );
10211023 }
10221024
10231025 public boolean isLock () {
1024- return ! lock .get ();
1026+ return lock .get ();
10251027 }
10261028
10271029 public long getLockTime () {
@@ -1041,21 +1043,7 @@ public boolean tryLock(String topic, String consumerGroup, int queueId) {
10411043 }
10421044
10431045 public boolean tryLock (String key ) {
1044- TimedLock timedLock = expiredLocalCache .get (key );
1045-
1046- if (timedLock == null ) {
1047- TimedLock old = expiredLocalCache .putIfAbsent (key , new TimedLock ());
1048- if (old != null ) {
1049- return false ;
1050- } else {
1051- timedLock = expiredLocalCache .get (key );
1052- }
1053- }
1054-
1055- if (timedLock == null ) {
1056- return false ;
1057- }
1058-
1046+ TimedLock timedLock = ConcurrentHashMapUtils .computeIfAbsent (expiredLocalCache , key , k -> new TimedLock ());
10591047 return timedLock .tryLock ();
10601048 }
10611049
0 commit comments