@@ -162,17 +162,15 @@ public GetMessageResult recodeRetryMessage(GetMessageResult getMessageResult,
162162 return result ;
163163 }
164164
165- public PopConsumerContext addGetMessageResult (PopConsumerContext context , GetMessageResult result ,
165+ public PopConsumerContext handleGetMessageResult (PopConsumerContext context , GetMessageResult result ,
166166 String topicId , int queueId , PopConsumerRecord .RetryType retryType , long offset ) {
167167
168- if (result .getStatus () == GetMessageStatus . FOUND && !result .getMessageQueueOffset ().isEmpty ()) {
168+ if (GetMessageStatus . FOUND . equals ( result .getStatus ()) && !result .getMessageQueueOffset ().isEmpty ()) {
169169 if (context .isFifo ()) {
170170 this .setFifoBlocked (context , context .getGroupId (), topicId , queueId , result .getMessageQueueOffset ());
171171 }
172-
173- // build request header here
172+ // build response header here
174173 context .addGetMessageResult (result , topicId , queueId , retryType , offset );
175-
176174 if (brokerConfig .isPopConsumerKVServiceLog ()) {
177175 log .info ("PopConsumerService pop, time={}, invisible={}, " +
178176 "groupId={}, topic={}, queueId={}, offset={}, attemptId={}" ,
@@ -181,20 +179,23 @@ public PopConsumerContext addGetMessageResult(PopConsumerContext context, GetMes
181179 }
182180 }
183181
184- if (!context .isFifo () && result .getNextBeginOffset () > OFFSET_NOT_EXIST ) {
182+ long commitOffset = offset ;
183+ if (context .isFifo ()) {
184+ if (!GetMessageStatus .FOUND .equals (result .getStatus ())) {
185+ commitOffset = result .getNextBeginOffset ();
186+ }
187+ } else {
185188 this .brokerController .getConsumerOffsetManager ().commitPullOffset (
186189 context .getClientHost (), context .getGroupId (), topicId , queueId , result .getNextBeginOffset ());
187- long commitOffset = result .getStatus () == GetMessageStatus .FOUND ? offset : result .getNextBeginOffset ();
188190 if (brokerConfig .isEnablePopBufferMerge () && popConsumerCache != null ) {
189191 long minOffset = popConsumerCache .getMinOffsetInCache (context .getGroupId (), topicId , queueId );
190192 if (minOffset != OFFSET_NOT_EXIST ) {
191193 commitOffset = minOffset ;
192194 }
193195 }
194- this .brokerController .getConsumerOffsetManager ().commitOffset (
195- context .getClientHost (), context .getGroupId (), topicId , queueId , commitOffset );
196196 }
197-
197+ this .brokerController .getConsumerOffsetManager ().commitOffset (
198+ context .getClientHost (), context .getGroupId (), topicId , queueId , commitOffset );
198199 return context ;
199200 }
200201
@@ -310,7 +311,7 @@ protected CompletableFuture<PopConsumerContext> getMessageAsync(CompletableFutur
310311 } else {
311312 final long consumeOffset = this .getPopOffset (groupId , topicId , queueId , result .getInitMode ());
312313 return getMessageAsync (clientHost , groupId , topicId , queueId , consumeOffset , remain , filter )
313- .thenApply (getMessageResult -> addGetMessageResult (
314+ .thenApply (getMessageResult -> handleGetMessageResult (
314315 result , getMessageResult , topicId , queueId , retryType , consumeOffset ));
315316 }
316317 });
0 commit comments