1818
1919import com .google .common .annotations .VisibleForTesting ;
2020import com .google .common .collect .Lists ;
21+ import java .net .InetSocketAddress ;
22+ import java .util .AbstractMap ;
23+ import java .util .ArrayList ;
24+ import java .util .Arrays ;
25+ import java .util .Collections ;
26+ import java .util .List ;
27+ import java .util .Map ;
28+ import java .util .Objects ;
29+ import java .util .Optional ;
30+ import java .util .concurrent .BlockingQueue ;
31+ import java .util .concurrent .ConcurrentHashMap ;
32+ import java .util .concurrent .ConcurrentMap ;
33+ import java .util .concurrent .CountDownLatch ;
34+ import java .util .concurrent .ExecutorService ;
35+ import java .util .concurrent .LinkedBlockingQueue ;
36+ import java .util .concurrent .ScheduledExecutorService ;
37+ import java .util .concurrent .ScheduledFuture ;
38+ import java .util .concurrent .TimeUnit ;
39+ import java .util .concurrent .locks .Lock ;
40+ import java .util .concurrent .locks .ReentrantLock ;
41+ import java .util .function .Function ;
42+ import java .util .function .Supplier ;
43+ import java .util .stream .Collectors ;
2144import org .apache .rocketmq .auth .authentication .factory .AuthenticationFactory ;
2245import org .apache .rocketmq .auth .authentication .manager .AuthenticationMetadataManager ;
2346import org .apache .rocketmq .auth .authorization .factory .AuthorizationFactory ;
5881import org .apache .rocketmq .broker .mqtrace .SendMessageHook ;
5982import org .apache .rocketmq .broker .offset .BroadcastOffsetManager ;
6083import org .apache .rocketmq .broker .offset .ConsumerOffsetManager ;
61- import org .apache .rocketmq .broker .offset .ConsumerOrderInfoManager ;
6284import org .apache .rocketmq .broker .offset .LmqConsumerOffsetManager ;
6385import org .apache .rocketmq .broker .out .BrokerOuterAPI ;
6486import org .apache .rocketmq .broker .plugin .BrokerAttachedPlugin ;
6587import org .apache .rocketmq .broker .pop .PopConsumerService ;
88+ import org .apache .rocketmq .broker .pop .orderly .OrderedConsumptionManager ;
89+ import org .apache .rocketmq .broker .pop .orderly .QueueLevelConsumerManager ;
6690import org .apache .rocketmq .broker .processor .AckMessageProcessor ;
6791import org .apache .rocketmq .broker .processor .AdminBrokerProcessor ;
6892import org .apache .rocketmq .broker .processor .ChangeInvisibleTimeProcessor ;
156180import org .apache .rocketmq .store .timer .TimerMessageStore ;
157181import org .apache .rocketmq .store .timer .TimerMetrics ;
158182
159- import java .net .InetSocketAddress ;
160- import java .util .AbstractMap ;
161- import java .util .ArrayList ;
162- import java .util .Arrays ;
163- import java .util .Collections ;
164- import java .util .List ;
165- import java .util .Map ;
166- import java .util .Objects ;
167- import java .util .Optional ;
168- import java .util .concurrent .BlockingQueue ;
169- import java .util .concurrent .ConcurrentHashMap ;
170- import java .util .concurrent .ConcurrentMap ;
171- import java .util .concurrent .CountDownLatch ;
172- import java .util .concurrent .ExecutorService ;
173- import java .util .concurrent .LinkedBlockingQueue ;
174- import java .util .concurrent .ScheduledExecutorService ;
175- import java .util .concurrent .ScheduledFuture ;
176- import java .util .concurrent .TimeUnit ;
177- import java .util .concurrent .locks .Lock ;
178- import java .util .concurrent .locks .ReentrantLock ;
179- import java .util .function .Function ;
180- import java .util .function .Supplier ;
181- import java .util .stream .Collectors ;
182-
183183public class BrokerController {
184184 protected static final Logger LOG = LoggerFactory .getLogger (LoggerName .BROKER_LOGGER_NAME );
185185 private static final Logger LOG_PROTECTION = LoggerFactory .getLogger (LoggerName .PROTECTION_LOGGER_NAME );
@@ -195,7 +195,7 @@ public class BrokerController {
195195 protected final BroadcastOffsetManager broadcastOffsetManager ;
196196 protected final ConsumerManager consumerManager ;
197197 protected final ConsumerFilterManager consumerFilterManager ;
198- protected final ConsumerOrderInfoManager consumerOrderInfoManager ;
198+ protected final OrderedConsumptionManager orderedConsumptionManager ;
199199 protected final PopInflightMessageCounter popInflightMessageCounter ;
200200 protected final PopConsumerService popConsumerService ;
201201 protected final ProducerManager producerManager ;
@@ -387,7 +387,7 @@ public BrokerController(
387387 this .consumerManager = new ConsumerManager (this .consumerIdsChangeListener , this .brokerStatsManager , this .brokerConfig );
388388 this .producerManager = new ProducerManager (this .brokerStatsManager );
389389 this .consumerFilterManager = new ConsumerFilterManager (this );
390- this .consumerOrderInfoManager = new ConsumerOrderInfoManager (this );
390+ this .orderedConsumptionManager = new QueueLevelConsumerManager (this );
391391 this .popInflightMessageCounter = new PopInflightMessageCounter (this );
392392 this .popConsumerService = brokerConfig .isPopConsumerKVServiceInit () ? new PopConsumerService (this ) : null ;
393393 this .clientHousekeepingService = new ClientHousekeepingService (this );
@@ -676,7 +676,7 @@ public void run() {
676676 public void run () {
677677 try {
678678 BrokerController .this .consumerFilterManager .persist ();
679- BrokerController .this .consumerOrderInfoManager .persist ();
679+ BrokerController .this .orderedConsumptionManager .persist ();
680680 } catch (Throwable e ) {
681681 LOG .error (
682682 "BrokerController: failed to persist config file of consumerFilter or consumerOrderInfo" ,
@@ -832,7 +832,7 @@ public boolean initializeMetadata() {
832832 result = result && this .consumerOffsetManager .load ();
833833 result = result && this .subscriptionGroupManager .load ();
834834 result = result && this .consumerFilterManager .load ();
835- result = result && this .consumerOrderInfoManager .load ();
835+ result = result && this .orderedConsumptionManager .load ();
836836 return result ;
837837 }
838838
@@ -1336,8 +1336,8 @@ public ConsumerFilterManager getConsumerFilterManager() {
13361336 return consumerFilterManager ;
13371337 }
13381338
1339- public ConsumerOrderInfoManager getConsumerOrderInfoManager () {
1340- return consumerOrderInfoManager ;
1339+ public OrderedConsumptionManager getOrderedConsumptionManager () {
1340+ return orderedConsumptionManager ;
13411341 }
13421342
13431343 public PopInflightMessageCounter getPopInflightMessageCounter () {
@@ -1618,9 +1618,9 @@ protected void shutdownBasicService() {
16181618 this .consumerOffsetManager .stop ();
16191619 }
16201620
1621- if (this .consumerOrderInfoManager != null ) {
1622- this .consumerOrderInfoManager .persist ();
1623- this .consumerOrderInfoManager .shutdown ();
1621+ if (this .orderedConsumptionManager != null ) {
1622+ this .orderedConsumptionManager .persist ();
1623+ this .orderedConsumptionManager .shutdown ();
16241624 }
16251625
16261626 if (this .configStorage != null ) {
0 commit comments