From 6bba3068c73e597f9630ee2869c5208ddd540a05 Mon Sep 17 00:00:00 2001 From: guyinyou Date: Tue, 20 Jan 2026 18:59:16 +0800 Subject: [PATCH] Add PreprocessHandler interface in AllocateMappedFileService Change-Id: I4e81916a79f89c095ffb7b860c8ccd49e88c76ea --- .../store/AllocateMappedFileService.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index 7664e284ec8..85042fdbc97 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -45,12 +45,31 @@ public class AllocateMappedFileService extends ServiceThread { new PriorityBlockingQueue<>(); private volatile boolean hasException = false; private DefaultMessageStore messageStore; + private PreprocessHandler preprocessHandler; public AllocateMappedFileService(DefaultMessageStore messageStore) { this.messageStore = messageStore; } + /** + * Set preprocess handler for external extension + * + * @param preprocessHandler the preprocess handler + */ + public void setPreprocessHandler(PreprocessHandler preprocessHandler) { + this.preprocessHandler = preprocessHandler; + } + public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) { + // Execute preprocess logic if handler is set + final PreprocessHandler finalPreprocessHandler = this.preprocessHandler; + if (finalPreprocessHandler != null) { + try { + finalPreprocessHandler.preprocess(nextFilePath, nextNextFilePath, fileSize); + } catch (Throwable t) { + log.warn("Preprocess handler in AllocateMappedFileService execution failed", t); + } + } int canSubmitRequests = 2; if (this.messageStore.isTransientStorePoolEnable()) { if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool() @@ -230,6 +249,21 @@ private boolean mmapOperation() { return true; } + /** + * Preprocess handler interface for external extension + */ + @FunctionalInterface + public interface PreprocessHandler { + /** + * Preprocess before allocating mapped file + * + * @param nextFilePath the next file path + * @param nextNextFilePath the next next file path + * @param fileSize the file size + */ + void preprocess(String nextFilePath, String nextNextFilePath, int fileSize); + } + static class AllocateRequest implements Comparable { // Full file path private String filePath;