Skip to content

Commit e20d56b

Browse files
authored
[ISSUE #9254] Refactor notifyMessageArriveInBatch in RocksDBConsumeQueueStore to adapt to CombineConsumeQueueStore (#9566)
1 parent 438077e commit e20d56b

3 files changed

Lines changed: 21 additions & 2 deletions

File tree

store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ public class DefaultMessageStore implements MessageStore {
155155
private final BrokerConfig brokerConfig;
156156

157157
private volatile boolean shutdown = true;
158-
protected boolean notifyMessageArriveInBatch = false;
158+
159+
private boolean notifyMessageArriveInBatch = false;
159160

160161
protected StoreCheckpoint storeCheckpoint;
161162
private TimerMessageStore timerMessageStore;
@@ -3011,4 +3012,13 @@ public void destroyConsumeQueueStore(boolean loadAfterDestroy) {
30113012
public MessageStoreStateMachine getStateMachine() {
30123013
return stateMachine;
30133014
}
3015+
3016+
public boolean isNotifyMessageArriveInBatch() {
3017+
return notifyMessageArriveInBatch;
3018+
}
3019+
3020+
public void setNotifyMessageArriveInBatch(boolean notifyMessageArriveInBatch) {
3021+
this.notifyMessageArriveInBatch = notifyMessageArriveInBatch;
3022+
}
3023+
30143024
}

store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ public RocksDBMessageStore(final MessageStoreConfig messageStoreConfig, final Br
3131
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig, final ConcurrentMap<String, TopicConfig> topicConfigTable) throws
3232
IOException {
3333
super(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig, topicConfigTable);
34-
notifyMessageArriveInBatch = true;
3534
}
3635

3736
@Override

store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,18 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
9393

9494
private long dispatchFromPhyOffset;
9595

96+
/**
97+
* there are two threads to notify longPolling when build cq successfully
98+
*
99+
* @see DefaultMessageStore.ReputMessageService#doReput()
100+
* @see RocksGroupCommitService#groupCommit()
101+
* <p>
102+
* RocksDB CQ is build by RocksGroupCommitService, so we do not need to notify longPolling in
103+
* ReputMessageService
104+
*/
96105
public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
97106
super(messageStore);
107+
messageStore.setNotifyMessageArriveInBatch(true);
98108

99109
this.storePath = StorePathConfigHelper.getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir());
100110
this.rocksDBStorage = new ConsumeQueueRocksDBStorage(messageStore, storePath);

0 commit comments

Comments
 (0)