diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java index bec75fe2fb6..8ba9834702e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java @@ -546,7 +546,7 @@ private PutResultProcess deliverMessage(MessageExtBrokerInner msgInner, String m } } - public class HandlePutResultTask implements Runnable { + class HandlePutResultTask implements Runnable { private final int delayLevel; public HandlePutResultTask(int delayLevel) { @@ -558,6 +558,12 @@ public void run() { LinkedBlockingQueue pendingQueue = ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel); + // Check if the queue exists for the given level + if (pendingQueue == null) { + log.warn("No pending queue found for delay level: {}", this.delayLevel); + return; + } + PutResultProcess putResultProcess; while ((putResultProcess = pendingQueue.peek()) != null) { try { @@ -599,7 +605,7 @@ private void scheduleNextTask() { } } - public class PutResultProcess { + class PutResultProcess { private String topic; private long offset; private long physicOffset; @@ -824,7 +830,7 @@ public String toString() { } } - public enum ProcessStatus { + enum ProcessStatus { /** * In process, the processing result has not yet been returned. */