2121import java .util .List ;
2222import java .util .concurrent .CompletableFuture ;
2323import java .util .concurrent .CopyOnWriteArrayList ;
24+ import java .util .concurrent .TimeUnit ;
2425import java .util .concurrent .locks .ReentrantReadWriteLock ;
2526import java .util .stream .Collectors ;
2627import org .apache .rocketmq .tieredstore .common .AppendResult ;
@@ -70,16 +71,40 @@ public void recover() {
7071 this .fileSegmentTable .addAll (fileSegmentList .stream ().sorted ().collect (Collectors .toList ()));
7172 }
7273
74+ /**
75+ * Retrieves the correct file size when initializing the file segment.
76+ *
77+ * @param fileSegment The file segment to get the size for.
78+ * @return The correct length if the remote file exists,
79+ * 0 if it does not exist,
80+ * or -1 if the RPC fails.
81+ * @see <a href="https://github.com/apache/rocketmq/issues/9544">Related GitHub Issue</a>
82+ */
83+ public long getFileCorrectSize (FileSegment fileSegment ) {
84+ while (true ) {
85+ long fileSize = fileSegment .getSize ();
86+ if (fileSize != GET_FILE_SIZE_ERROR ) {
87+ log .debug ("FlatAppendFile get file correct size, filePath={} fileType={}, fileSize={}" ,
88+ fileSegment .getPath (), fileSegment .getFileType (), fileSize );
89+ return fileSize ;
90+ } else {
91+ log .warn ("FlatAppendFile get file correct size error, filePath={}, fileType={}" ,
92+ fileSegment .getPath (), fileSegment .getFileType ());
93+ try {
94+ TimeUnit .MILLISECONDS .sleep (50 );
95+ } catch (InterruptedException e ) {
96+ log .warn ("FlatAppendFile get file correct size interrupted" , e );
97+ }
98+ }
99+ }
100+ }
101+
73102 public void recoverFileSize () {
74103 if (fileSegmentTable .isEmpty () || FileSegmentType .INDEX .equals (fileType )) {
75104 return ;
76105 }
77106 FileSegment fileSegment = fileSegmentTable .get (fileSegmentTable .size () - 1 );
78- long fileSize = fileSegment .getSize ();
79- if (fileSize == GET_FILE_SIZE_ERROR ) {
80- log .warn ("FlatAppendFile get last file size error, filePath: {}" , this .filePath );
81- return ;
82- }
107+ long fileSize = this .getFileCorrectSize (fileSegment );
83108 if (fileSegment .getCommitPosition () != fileSize ) {
84109 fileSegment .initPosition (fileSize );
85110 flushFileSegmentMeta (fileSegment );
@@ -90,7 +115,7 @@ public void recoverFileSize() {
90115 public void initOffset (long offset ) {
91116 if (this .fileSegmentTable .isEmpty ()) {
92117 FileSegment fileSegment = fileSegmentFactory .createSegment (fileType , filePath , offset );
93- fileSegment .initPosition (fileSegment . getSize ( ));
118+ fileSegment .initPosition (this . getFileCorrectSize ( fileSegment ));
94119 this .flushFileSegmentMeta (fileSegment );
95120 this .fileSegmentTable .add (fileSegment );
96121 }
0 commit comments