4444import org .apache .rocketmq .common .ServiceThread ;
4545import org .apache .rocketmq .common .TopicConfig ;
4646import org .apache .rocketmq .common .TopicFilterType ;
47+ import org .apache .rocketmq .common .constant .ConsumeInitMode ;
4748import org .apache .rocketmq .common .constant .LoggerName ;
4849import org .apache .rocketmq .common .constant .PermName ;
4950import org .apache .rocketmq .common .message .MessageAccessor ;
@@ -197,7 +198,18 @@ public PopConsumerContext addGetMessageResult(PopConsumerContext context, GetMes
197198 return context ;
198199 }
199200
200- public Long getPopOffset (String groupId , String topicId , int queueId ) {
201+ public long getPopOffset (String groupId , String topicId , int queueId , int initMode ) {
202+ long offset = this .brokerController .getConsumerOffsetManager ().queryPullOffset (groupId , topicId , queueId );
203+ if (offset < 0L ) {
204+ try {
205+ offset = this .brokerController .getPopMessageProcessor ()
206+ .getInitOffset (topicId , groupId , queueId , initMode , true );
207+ log .info ("PopConsumerService init offset, groupId={}, topicId={}, queueId={}, init={}, offset={}" ,
208+ groupId , topicId , queueId , ConsumeInitMode .MIN == initMode ? "min" : "max" , offset );
209+ } catch (ConsumeQueueException e ) {
210+ throw new RuntimeException (e );
211+ }
212+ }
201213 Long resetOffset =
202214 this .brokerController .getConsumerOffsetManager ().queryThenEraseResetOffset (topicId , groupId , queueId );
203215 if (resetOffset != null ) {
@@ -206,7 +218,7 @@ public Long getPopOffset(String groupId, String topicId, int queueId) {
206218 this .brokerController .getConsumerOffsetManager ()
207219 .commitOffset ("ResetPopOffset" , groupId , topicId , queueId , resetOffset );
208220 }
209- return resetOffset ;
221+ return resetOffset != null ? resetOffset : offset ;
210222 }
211223
212224 public CompletableFuture <GetMessageResult > getMessageAsync (String clientHost ,
@@ -215,9 +227,6 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String clientHost,
215227 log .debug ("PopConsumerService getMessageAsync, groupId={}, topicId={}, queueId={}, offset={}, batchSize={}, filter={}" ,
216228 groupId , topicId , offset , queueId , batchSize , filter != null );
217229
218- Long resetOffset = this .getPopOffset (groupId , topicId , queueId );
219- final long currentOffset = resetOffset != null ? resetOffset : offset ;
220-
221230 CompletableFuture <GetMessageResult > getMessageFuture =
222231 brokerController .getMessageStore ().getMessageAsync (groupId , topicId , queueId , offset , batchSize , filter );
223232
@@ -240,7 +249,7 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String clientHost,
240249
241250 log .warn ("PopConsumerService getMessageAsync, initial offset because store is no correct, " +
242251 "groupId={}, topicId={}, queueId={}, batchSize={}, offset={}->{}" ,
243- groupId , topicId , queueId , batchSize , currentOffset , result .getNextBeginOffset ());
252+ groupId , topicId , queueId , batchSize , offset , result .getNextBeginOffset ());
244253
245254 return brokerController .getMessageStore ().getMessageAsync (
246255 groupId , topicId , queueId , result .getNextBeginOffset (), batchSize , filter );
@@ -299,7 +308,7 @@ protected CompletableFuture<PopConsumerContext> getMessageAsync(CompletableFutur
299308 result .addRestCount (this .getPendingFilterCount (groupId , topicId , queueId ));
300309 return CompletableFuture .completedFuture (result );
301310 } else {
302- long consumeOffset = brokerController . getConsumerOffsetManager (). queryPullOffset ( groupId , topicId , queueId );
311+ final long consumeOffset = this . getPopOffset ( groupId , topicId , queueId , result . getInitMode () );
303312 return getMessageAsync (clientHost , groupId , topicId , queueId , consumeOffset , remain , filter )
304313 .thenApply (getMessageResult -> addGetMessageResult (
305314 result , getMessageResult , topicId , queueId , retryType , consumeOffset ));
@@ -308,11 +317,11 @@ protected CompletableFuture<PopConsumerContext> getMessageAsync(CompletableFutur
308317 }
309318
310319 public CompletableFuture <PopConsumerContext > popAsync (String clientHost , long popTime , long invisibleTime ,
311- String groupId , String topicId , int queueId , int batchSize , boolean fifo , String attemptId ,
320+ String groupId , String topicId , int queueId , int batchSize , boolean fifo , String attemptId , int initMode ,
312321 MessageFilter filter ) {
313322
314323 PopConsumerContext popConsumerContext =
315- new PopConsumerContext (clientHost , popTime , invisibleTime , groupId , fifo , attemptId );
324+ new PopConsumerContext (clientHost , popTime , invisibleTime , groupId , fifo , initMode , attemptId );
316325
317326 TopicConfig topicConfig = brokerController .getTopicConfigManager ().selectTopicConfig (topicId );
318327 if (topicConfig == null || !consumerLockService .tryLock (groupId , topicId )) {
0 commit comments