Skip to content

Commit 94ba5e1

Browse files
guyinyouguyinyou
andauthored
[ISSUE #10031] Add PreprocessHandler interface in AllocateMappedFileService
Change-Id: I4e81916a79f89c095ffb7b860c8ccd49e88c76ea Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
1 parent 36adf12 commit 94ba5e1

1 file changed

Lines changed: 34 additions & 0 deletions

File tree

store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,31 @@ public class AllocateMappedFileService extends ServiceThread {
4545
new PriorityBlockingQueue<>();
4646
private volatile boolean hasException = false;
4747
private DefaultMessageStore messageStore;
48+
private PreprocessHandler preprocessHandler;
4849

4950
public AllocateMappedFileService(DefaultMessageStore messageStore) {
5051
this.messageStore = messageStore;
5152
}
5253

54+
/**
55+
* Set preprocess handler for external extension
56+
*
57+
* @param preprocessHandler the preprocess handler
58+
*/
59+
public void setPreprocessHandler(PreprocessHandler preprocessHandler) {
60+
this.preprocessHandler = preprocessHandler;
61+
}
62+
5363
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
64+
// Execute preprocess logic if handler is set
65+
final PreprocessHandler finalPreprocessHandler = this.preprocessHandler;
66+
if (finalPreprocessHandler != null) {
67+
try {
68+
finalPreprocessHandler.preprocess(nextFilePath, nextNextFilePath, fileSize);
69+
} catch (Throwable t) {
70+
log.warn("Preprocess handler in AllocateMappedFileService execution failed", t);
71+
}
72+
}
5473
int canSubmitRequests = 2;
5574
if (this.messageStore.isTransientStorePoolEnable()) {
5675
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
@@ -230,6 +249,21 @@ private boolean mmapOperation() {
230249
return true;
231250
}
232251

252+
/**
253+
* Preprocess handler interface for external extension
254+
*/
255+
@FunctionalInterface
256+
public interface PreprocessHandler {
257+
/**
258+
* Preprocess before allocating mapped file
259+
*
260+
* @param nextFilePath the next file path
261+
* @param nextNextFilePath the next next file path
262+
* @param fileSize the file size
263+
*/
264+
void preprocess(String nextFilePath, String nextNextFilePath, int fileSize);
265+
}
266+
233267
static class AllocateRequest implements Comparable<AllocateRequest> {
234268
// Full file path
235269
private String filePath;

0 commit comments

Comments
 (0)