@@ -236,8 +236,8 @@ public long getPopOffset(String groupId, String topicId, int queueId, int initMo
236236 public CompletableFuture <GetMessageResult > getMessageAsync (String clientHost ,
237237 String groupId , String topicId , int queueId , long offset , int batchSize , MessageFilter filter ) {
238238
239- log .debug ("PopConsumerService getMessageAsync, groupId={}, topicId={}, queueId={}, offset={}, batchSize={}, filter={}" ,
240- groupId , topicId , offset , queueId , batchSize , filter != null );
239+ log .debug ("PopConsumerService getMessageAsync, groupId={}, topicId={}, queueId={}, " +
240+ "offset={}, batchSize={}, filter={}" , groupId , topicId , queueId , offset , batchSize , filter != null );
241241
242242 CompletableFuture <GetMessageResult > getMessageFuture =
243243 brokerController .getMessageStore ().getMessageAsync (groupId , topicId , queueId , offset , batchSize , filter );
@@ -552,7 +552,7 @@ public CompletableFuture<Boolean> revive(PopConsumerRecord record) {
552552
553553 @ SuppressWarnings ("StatementWithEmptyBody" )
554554 public void clearCache (String groupId , String topicId , int queueId ) {
555- while (consumerLockService .tryLock (groupId , topicId )) {
555+ while (! consumerLockService .tryLock (groupId , topicId )) {
556556 }
557557 try {
558558 if (popConsumerCache != null ) {
@@ -592,7 +592,7 @@ public long revive(AtomicLong currentTime, int maxCount) {
592592 if (!result ) {
593593 if (record .getAttemptTimes () < brokerConfig .getPopReviveMaxAttemptTimes ()) {
594594 long backoffInterval = 1000L * REWRITE_INTERVALS_IN_SECONDS [
595- Math .min (REWRITE_INTERVALS_IN_SECONDS .length , record .getAttemptTimes ())];
595+ Math .min (REWRITE_INTERVALS_IN_SECONDS .length - 1 , record .getAttemptTimes ())];
596596 long nextInvisibleTime = record .getInvisibleTime () + backoffInterval ;
597597 PopConsumerRecord retryRecord = new PopConsumerRecord (System .currentTimeMillis (),
598598 record .getGroupId (), record .getTopicId (), record .getQueueId (),
@@ -760,7 +760,7 @@ public synchronized void transferToFsStore() {
760760 ck .setQueueId (record .getQueueId ());
761761 ck .setBrokerName (brokerConfig .getBrokerName ());
762762 ck .addDiff (0 );
763- ck .setRePutTimes (ck . getRePutTimes ( ));
763+ ck .setRePutTimes (String . valueOf ( record . getAttemptTimes () ));
764764 int reviveQueueId = (int ) record .getOffset () % brokerConfig .getReviveQueueNum ();
765765 MessageExtBrokerInner ckMsg =
766766 brokerController .getPopMessageProcessor ().buildCkMsg (ck , reviveQueueId );
0 commit comments