@@ -738,6 +738,7 @@ public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExc
738738 // recover by the minimum time stamp
739739 boolean checkCRCOnRecover = this .defaultMessageStore .getMessageStoreConfig ().isCheckCRCOnRecover ();
740740 boolean checkDupInfo = this .defaultMessageStore .getMessageStoreConfig ().isDuplicationEnable ();
741+ boolean checkCommitLogOffsetOnRecover = this .defaultMessageStore .getMessageStoreConfig ().isCheckCommitLogOffsetOnRecover ();
741742 int maxRecoverNum = this .defaultMessageStore .getMessageStoreConfig ().getCommitLogRecoverMaxNum ();
742743 if (maxRecoverNum <= 0 ) {
743744 maxRecoverNum = 10 ;
@@ -792,8 +793,18 @@ public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExc
792793 while (true ) {
793794 DispatchRequest dispatchRequest = this .checkMessageAndReturnSize (byteBuffer , checkCRCOnRecover , checkDupInfo );
794795 int size = dispatchRequest .getMsgSize ();
795-
796796 if (dispatchRequest .isSuccess ()) {
797+ // Check commitlog offset validity if enabled
798+ if (checkCommitLogOffsetOnRecover ) {
799+ if (dispatchRequest .getCommitLogOffset () < mappedFile .getFileFromOffset ()
800+ || dispatchRequest .getCommitLogOffset () > mappedFile .getFileFromOffset () + mappedFile .getFileSize ()) {
801+ log .warn ("found illegal commitlog offset {} in {}, current pos={}, it will be truncated." ,
802+ dispatchRequest .getCommitLogOffset (), mappedFile .getFileName (), processOffset + mappedFileOffset );
803+ log .info ("recover physics file end, {} pos={}" , mappedFile .getFileName (), byteBuffer .position ());
804+
805+ break ;
806+ }
807+ }
797808 // Normal data
798809 if (size > 0 ) {
799810 lastValidMsgPhyOffset = processOffset + mappedFileOffset ;
@@ -925,7 +936,8 @@ private boolean isMappedFileMatchedRecover(final MappedFile mappedFile,
925936 return isMappedFileMatchedRecover (phyOffset , storeTimestamp , recoverNormally );
926937 }
927938
928- private boolean isMappedFileMatchedRecover (long phyOffset , long storeTimestamp , boolean recoverNormally ) throws RocksDBException {
939+ private boolean isMappedFileMatchedRecover (long phyOffset , long storeTimestamp ,
940+ boolean recoverNormally ) throws RocksDBException {
929941 boolean result = this .defaultMessageStore .getQueueStore ().isMappedFileMatchedRecover (phyOffset , storeTimestamp , recoverNormally );
930942 if (null != this .defaultMessageStore .getTransMessageRocksDBStore () && defaultMessageStore .getMessageStoreConfig ().isTransRocksDBEnable () && !defaultMessageStore .getMessageStoreConfig ().isTransWriteOriginTransHalfEnable ()) {
931943 result = result && this .defaultMessageStore .getTransMessageRocksDBStore ().isMappedFileMatchedRecover (phyOffset );
@@ -2386,7 +2398,7 @@ public void run() {
23862398 long costTime = this .systemClock .now () - beginClockTimestamp ;
23872399 log .info ("[{}] scanFilesInPageCache-cost {} ms." , costTime > 30 * 1000 ? "NOTIFYME" : "OK" , costTime );
23882400 } catch (Throwable e ) {
2389- log .warn ("{} service has e: " , this .getServiceName () , e );
2401+ log .warn ("{} service has e: " , this .getServiceName (), e );
23902402 }
23912403 }
23922404 log .info ("{} service end" , this .getServiceName ());
0 commit comments