1717package org .apache .rocketmq .store .timer ;
1818
1919import com .conversantmedia .util .concurrent .DisruptorBlockingQueue ;
20+ import io .opentelemetry .api .common .Attributes ;
2021import java .io .File ;
2122import java .io .IOException ;
2223import java .nio .ByteBuffer ;
4041import java .util .concurrent .TimeUnit ;
4142import java .util .concurrent .atomic .AtomicInteger ;
4243import java .util .function .Function ;
43- import io .opentelemetry .api .common .Attributes ;
4444import org .apache .commons .collections .CollectionUtils ;
4545import org .apache .commons .lang3 .math .NumberUtils ;
4646import org .apache .rocketmq .common .ServiceThread ;
6161import org .apache .rocketmq .store .DefaultMessageStore ;
6262import org .apache .rocketmq .store .MessageStore ;
6363import org .apache .rocketmq .store .PutMessageResult ;
64+ import org .apache .rocketmq .store .RunningFlags ;
6465import org .apache .rocketmq .store .SelectMappedBufferResult ;
6566import org .apache .rocketmq .store .config .BrokerRole ;
6667import org .apache .rocketmq .store .config .MessageStoreConfig ;
@@ -160,6 +161,8 @@ public class TimerMessageStore {
160161 private final BrokerStatsManager brokerStatsManager ;
161162 private Function <MessageExtBrokerInner , PutMessageResult > escapeBridgeHook ;
162163
164+ private final Object lockWhenFlush = new Object ();
165+
163166 public TimerMessageStore (final MessageStore messageStore , final MessageStoreConfig storeConfig ,
164167 TimerCheckpoint timerCheckpoint , TimerMetrics timerMetrics ,
165168 final BrokerStatsManager brokerStatsManager ) throws IOException {
@@ -172,9 +175,29 @@ public TimerMessageStore(final MessageStore messageStore, final MessageStoreConf
172175
173176 // TimerWheel contains the fixed number of slots regardless of precision.
174177 this .slotsTotal = TIMER_WHEEL_TTL_DAY * DAY_SECS ;
178+
179+ String timerWheelPath = getTimerWheelPath (storeConfig .getStorePathRootDir ());
180+ long snapOffset = -1 ;
181+ if (storeConfig .isTimerWheelSnapshotFlush ()) {
182+ snapOffset = TimerWheel .getMaxSnapshotFlag (timerWheelPath );
183+ if (snapOffset > 0 ) {
184+ // correct recover offset
185+ timerCheckpoint .setLastTimerLogFlushPos (snapOffset );
186+ LOGGER .info ("found timerWheel snapshot offset {}" , snapOffset );
187+ } else {
188+ LOGGER .info ("not found timerWheel snapshot" , snapOffset );
189+ }
190+ }
191+
192+ RunningFlags runningFlags = null ;
193+ if (storeConfig .isEnableRunningFlagsInFlush () && messageStore != null ) {
194+ runningFlags = messageStore .getRunningFlags ();
195+ }
196+
175197 this .timerWheel = new TimerWheel (
176- getTimerWheelPath (storeConfig .getStorePathRootDir ()), this .slotsTotal , precisionMs );
177- this .timerLog = new TimerLog (getTimerLogPath (storeConfig .getStorePathRootDir ()), timerLogFileSize );
198+ timerWheelPath , this .slotsTotal , precisionMs , snapOffset );
199+ this .timerLog = new TimerLog (getTimerLogPath (storeConfig .getStorePathRootDir ()), timerLogFileSize ,
200+ runningFlags , storeConfig .isWriteWithoutMmap ());
178201 this .timerMetrics = timerMetrics ;
179202 this .timerCheckpoint = timerCheckpoint ;
180203 this .lastBrokerRole = storeConfig .getBrokerRole ();
@@ -617,7 +640,6 @@ public void shutdown() {
617640 }
618641 }
619642
620-
621643 protected void maybeMoveWriteTime () {
622644 if (currWriteTimeMs < formatTimeMs (System .currentTimeMillis ())) {
623645 currWriteTimeMs = formatTimeMs (System .currentTimeMillis ());
@@ -959,7 +981,7 @@ public void checkDequeueLatch(CountDownLatch latch, long delayedTime) throws Exc
959981 LOGGER .info ("Not Running dequeue, skip checkDequeueLatch for delayedTime:{}" , delayedTime );
960982 break ;
961983 }
962-
984+
963985 if (dequeuePutQueue .size () > 0
964986 || !checkStateForGetMessages (AbstractStateService .WAITING )
965987 || !checkStateForPutMessages (AbstractStateService .WAITING )) {
@@ -1478,7 +1500,13 @@ protected void fetchAndPutTimerRequest() throws Exception {
14781500 CountDownLatch latch = new CountDownLatch (trs .size ());
14791501 for (TimerRequest req : trs ) {
14801502 req .setLatch (latch );
1481- this .putMessageToTimerWheel (req );
1503+ if (storeConfig .isTimerWheelSnapshotFlush ()) {
1504+ synchronized (lockWhenFlush ) {
1505+ this .putMessageToTimerWheel (req );
1506+ }
1507+ } else {
1508+ this .putMessageToTimerWheel (req );
1509+ }
14821510 }
14831511 checkDequeueLatch (latch , -1 );
14841512 boolean allSuccess = trs .stream ().allMatch (TimerRequest ::isSucc );
@@ -1790,7 +1818,8 @@ public boolean needDelete(int magic) {
17901818 public class TimerFlushService extends ServiceThread {
17911819 private final SimpleDateFormat sdf = new SimpleDateFormat ("MM-dd HH:mm:ss" );
17921820
1793- @ Override public String getServiceName () {
1821+ @ Override
1822+ public String getServiceName () {
17941823 String brokerIdentifier = "" ;
17951824 if (TimerMessageStore .this .messageStore instanceof DefaultMessageStore && ((DefaultMessageStore ) TimerMessageStore .this .messageStore ).getBrokerConfig ().isInBrokerContainer ()) {
17961825 brokerIdentifier = ((DefaultMessageStore ) TimerMessageStore .this .messageStore ).getBrokerConfig ().getIdentifier ();
@@ -1805,33 +1834,55 @@ private String format(long time) {
18051834 @ Override
18061835 public void run () {
18071836 TimerMessageStore .LOGGER .info (this .getServiceName () + " service start" );
1808- long start = System .currentTimeMillis ();
18091837 while (!this .isStopped ()) {
18101838 try {
1811- prepareTimerCheckPoint ();
1812- timerLog .getMappedFileQueue ().flush (0 );
1813- timerWheel .flush ();
1814- timerCheckpoint .flush ();
1815- if (System .currentTimeMillis () - start > storeConfig .getTimerProgressLogIntervalMs ()) {
1816- start = System .currentTimeMillis ();
1817- long tmpQueueOffset = currQueueOffset ;
1818- ConsumeQueueInterface cq = messageStore .getConsumeQueue (TIMER_TOPIC , 0 );
1819- long maxOffsetInQueue = cq == null ? 0 : cq .getMaxOffsetInQueue ();
1820- TimerMessageStore .LOGGER .info ("[{}]Timer progress-check commitRead:[{}] currRead:[{}] currWrite:[{}] readBehind:{} currReadOffset:{} offsetBehind:{} behindMaster:{} " +
1821- "enqPutQueue:{} deqGetQueue:{} deqPutQueue:{} allCongestNum:{} enqExpiredStoreTime:{}" ,
1822- storeConfig .getBrokerRole (),
1823- format (commitReadTimeMs ), format (currReadTimeMs ), format (currWriteTimeMs ), getDequeueBehind (),
1824- tmpQueueOffset , maxOffsetInQueue - tmpQueueOffset , timerCheckpoint .getMasterTimerQueueOffset () - tmpQueueOffset ,
1825- enqueuePutQueue .size (), dequeueGetQueue .size (), dequeuePutQueue .size (), getAllCongestNum (), format (lastEnqueueButExpiredStoreTime ));
1826- }
1827- timerMetrics .persist ();
1828- waitForRunning (storeConfig .getTimerFlushIntervalMs ());
1839+ this .flush ();
18291840 } catch (Throwable e ) {
18301841 TimerMessageStore .LOGGER .error ("Error occurred in " + getServiceName (), e );
18311842 }
1843+ try {
1844+ waitForRunning (storeConfig .getTimerFlushIntervalMs ());
1845+ } catch (Throwable e ) {
1846+ // ignore interrupt
1847+ }
18321848 }
18331849 TimerMessageStore .LOGGER .info (this .getServiceName () + " service end" );
18341850 }
1851+
1852+ long start = System .currentTimeMillis ();
1853+ long lastSnapshotTime = System .currentTimeMillis ();
1854+
1855+ public void flush () throws IOException {
1856+ if (storeConfig .isTimerWheelSnapshotFlush ()) {
1857+ synchronized (lockWhenFlush ) {
1858+ prepareTimerCheckPoint ();
1859+ timerLog .getMappedFileQueue ().flush (0 );
1860+ if (System .currentTimeMillis () - lastSnapshotTime > storeConfig .getTimerWheelSnapshotIntervalMs ()) {
1861+ lastSnapshotTime = System .currentTimeMillis ();
1862+ timerWheel .backup (timerLog .getMappedFileQueue ().getFlushedWhere ());
1863+ }
1864+ timerCheckpoint .flush ();
1865+ }
1866+ } else {
1867+ prepareTimerCheckPoint ();
1868+ timerLog .getMappedFileQueue ().flush (0 );
1869+ timerWheel .flush ();
1870+ timerCheckpoint .flush ();
1871+ }
1872+ if (System .currentTimeMillis () - start > storeConfig .getTimerProgressLogIntervalMs ()) {
1873+ start = System .currentTimeMillis ();
1874+ long tmpQueueOffset = currQueueOffset ;
1875+ ConsumeQueueInterface cq = messageStore .getConsumeQueue (TIMER_TOPIC , 0 );
1876+ long maxOffsetInQueue = cq == null ? 0 : cq .getMaxOffsetInQueue ();
1877+ TimerMessageStore .LOGGER .info ("[{}]Timer progress-check commitRead:[{}] currRead:[{}] currWrite:[{}] readBehind:{} currReadOffset:{} offsetBehind:{} behindMaster:{} " +
1878+ "enqPutQueue:{} deqGetQueue:{} deqPutQueue:{} allCongestNum:{} enqExpiredStoreTime:{}" ,
1879+ storeConfig .getBrokerRole (),
1880+ format (commitReadTimeMs ), format (currReadTimeMs ), format (currWriteTimeMs ), getDequeueBehind (),
1881+ tmpQueueOffset , maxOffsetInQueue - tmpQueueOffset , timerCheckpoint .getMasterTimerQueueOffset () - tmpQueueOffset ,
1882+ enqueuePutQueue .size (), dequeueGetQueue .size (), dequeuePutQueue .size (), getAllCongestNum (), format (lastEnqueueButExpiredStoreTime ));
1883+ }
1884+ timerMetrics .persist ();
1885+ }
18351886 }
18361887
18371888 public long getAllCongestNum () {
@@ -2023,4 +2074,8 @@ public TimerCheckpoint getTimerCheckpoint() {
20232074 public static String buildDeleteKey (String realTopic , String uniqueKey ) {
20242075 return realTopic + "+" + uniqueKey ;
20252076 }
2077+
2078+ public TimerFlushService getTimerFlushService () {
2079+ return timerFlushService ;
2080+ }
20262081}
0 commit comments