@@ -348,7 +348,7 @@ public Pair<Long, Long> getConsumerLagStats(String group, String topic, int queu
348348 brokerOffset = 0 ;
349349 }
350350
351- if (isPop ) {
351+ if (isPop && ! brokerConfig . isPopConsumerKVServiceEnable () ) {
352352 long pullOffset = popBufferMergeService .getLatestOffset (topic , group , queueId );
353353 if (pullOffset < 0 ) {
354354 pullOffset = offsetManager .queryOffset (group , topic , queueId );
@@ -401,7 +401,7 @@ public Pair<Long, Long> getInFlightMsgStats(String group, String topic, boolean
401401
402402 public Pair <Long , Long > getInFlightMsgStats (String group , String topic , int queueId , boolean isPop )
403403 throws ConsumeQueueException {
404- if (isPop ) {
404+ if (isPop && ! brokerConfig . isPopConsumerKVServiceEnable () ) {
405405 long inflight = popInflightMessageCounter .getGroupPopInFlightMessageNum (topic , group , queueId );
406406 long pullOffset = popBufferMergeService .getLatestOffset (topic , group , queueId );
407407 if (pullOffset < 0 ) {
@@ -456,14 +456,11 @@ public long getAvailableMsgCount(String group, String topic, int queueId, boolea
456456 }
457457
458458 long pullOffset ;
459- if (isPop ) {
459+ if (isPop && ! brokerConfig . isPopConsumerKVServiceEnable () ) {
460460 pullOffset = popBufferMergeService .getLatestOffset (topic , group , queueId );
461461 if (pullOffset < 0 ) {
462462 pullOffset = offsetManager .queryOffset (group , topic , queueId );
463463 }
464- if (pullOffset < 0 ) {
465- pullOffset = brokerOffset ;
466- }
467464 } else {
468465 pullOffset = offsetManager .queryPullOffset (group , topic , queueId );
469466 }
0 commit comments