Skip to content

Commit 9f10d38

Browse files
authored
[ISSUE #9313] Add scheduled clean task. (#9314)
* add scheduled clean task. * make code compatible to lmq. * make code compatible to lmq. * make code compatible to lmq. * make code compatible to lmq.
1 parent 7ef3610 commit 9f10d38

4 files changed

Lines changed: 28 additions & 1 deletion

File tree

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,18 @@ public void run() {
691691
}
692692
}, 10, 1, TimeUnit.SECONDS);
693693

694+
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
695+
@Override
696+
public void run() {
697+
try {
698+
BrokerController.this.messageStore.getTimerMessageStore().getTimerMetrics()
699+
.cleanMetrics(BrokerController.this.topicConfigManager.getTopicConfigTable().keySet());
700+
} catch (Throwable e) {
701+
LOG.error("BrokerController: failed to clean unused timer metrics.", e);
702+
}
703+
}
704+
}, 3, 3, TimeUnit.MINUTES);
705+
694706
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
695707

696708
@Override

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -825,6 +825,7 @@ private void deleteTopicInBroker(String topic) {
825825
this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic);
826826
this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic);
827827
this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(topic));
828+
this.brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().removeTimingCount(topic);
828829
}
829830

830831
private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerContext ctx, RemotingCommand request) {

broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,8 @@ public void init() throws Exception {
259259

260260
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig(topic));
261261
brokerController.getMessageStoreConfig().setTimerWheelEnable(false);
262+
when(this.brokerController.getMessageStore().getTimerMessageStore()).thenReturn(timerMessageStore);
263+
when(this.timerMessageStore.getTimerMetrics()).thenReturn(timerMetrics);
262264
}
263265

264266
@After

store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.concurrent.locks.Lock;
3838
import java.util.concurrent.locks.ReentrantLock;
3939
import org.apache.rocketmq.common.ConfigManager;
40+
import org.apache.rocketmq.common.MixAll;
4041
import org.apache.rocketmq.common.constant.LoggerName;
4142
import org.apache.rocketmq.common.message.MessageConst;
4243
import org.apache.rocketmq.common.message.MessageExt;
@@ -184,7 +185,8 @@ public void cleanMetrics(Set<String> topics) {
184185
while (iterator.hasNext()) {
185186
Map.Entry<String, Metric> entry = iterator.next();
186187
final String topic = entry.getKey();
187-
if (topic.startsWith(TopicValidator.SYSTEM_TOPIC_PREFIX)) {
188+
if (topic.startsWith(TopicValidator.SYSTEM_TOPIC_PREFIX)
189+
|| topic.startsWith(MixAll.LMQ_PREFIX)) {
188190
continue;
189191
}
190192
if (topics.contains(topic)) {
@@ -196,6 +198,16 @@ public void cleanMetrics(Set<String> topics) {
196198
}
197199
}
198200

201+
public boolean removeTimingCount(String topic) {
202+
try {
203+
timingCount.remove(topic);
204+
} catch (Exception e) {
205+
log.error("removeTimingCount error", e);
206+
return false;
207+
}
208+
return true;
209+
}
210+
199211
public static class TimerMetricsSerializeWrapper extends RemotingSerializable {
200212
private ConcurrentMap<String, Metric> timingCount =
201213
new ConcurrentHashMap<>(1024);

0 commit comments

Comments
 (0)