Skip to content

Commit 4a15256

Browse files
authored
[ISSUE #9300] Periodic cleanup of inactive items in StatsItemSet (#9301)
1 parent c5be5c8 commit 4a15256

6 files changed

Lines changed: 111 additions & 2 deletions

File tree

common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,8 @@ public class BrokerConfig extends BrokerIdentity {
130130
private boolean accountStatsEnable = true;
131131
private boolean accountStatsPrintZeroValues = true;
132132

133+
private int maxStatsIdleTimeInMinutes = -1;
134+
133135
private boolean transferMsgByHeap = true;
134136

135137
private String regionId = MixAll.DEFAULT_TRACE_REGION_ID;
@@ -1535,6 +1537,14 @@ public void setAccountStatsPrintZeroValues(boolean accountStatsPrintZeroValues)
15351537
this.accountStatsPrintZeroValues = accountStatsPrintZeroValues;
15361538
}
15371539

1540+
public int getMaxStatsIdleTimeInMinutes() {
1541+
return maxStatsIdleTimeInMinutes;
1542+
}
1543+
1544+
public void setMaxStatsIdleTimeInMinutes(int maxStatsIdleTimeInMinutes) {
1545+
this.maxStatsIdleTimeInMinutes = maxStatsIdleTimeInMinutes;
1546+
}
1547+
15381548
public boolean isLockInStrictMode() {
15391549
return lockInStrictMode;
15401550
}

common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class MomentStatsItem {
3131
private final String statsKey;
3232
private final ScheduledExecutorService scheduledExecutorService;
3333
private final Logger log;
34+
private long lastUpdateTimestamp = System.currentTimeMillis();
3435

3536
public MomentStatsItem(String statsName, String statsKey,
3637
ScheduledExecutorService scheduledExecutorService, Logger log) {
@@ -72,4 +73,12 @@ public String getStatsKey() {
7273
public String getStatsName() {
7374
return statsName;
7475
}
76+
77+
public long getLastUpdateTimestamp() {
78+
return lastUpdateTimestamp;
79+
}
80+
81+
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
82+
this.lastUpdateTimestamp = lastUpdateTimestamp;
83+
}
7584
}

common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@
2424
import java.util.concurrent.ScheduledExecutorService;
2525
import java.util.concurrent.TimeUnit;
2626
import org.apache.rocketmq.common.UtilAll;
27+
import org.apache.rocketmq.common.constant.LoggerName;
2728
import org.apache.rocketmq.logging.org.slf4j.Logger;
29+
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
2830

2931
public class MomentStatsItemSet {
32+
private static final Logger COMMERCIAL_LOG = LoggerFactory.getLogger(LoggerName.COMMERCIAL_LOGGER_NAME);
3033
private final ConcurrentMap<String/* key */, MomentStatsItem> statsItemTable =
3134
new ConcurrentHashMap<>(128);
3235
private final String statsName;
@@ -72,6 +75,13 @@ private void printAtMinutes() {
7275
public void setValue(final String statsKey, final int value) {
7376
MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey);
7477
statsItem.getValue().set(value);
78+
statsItem.setLastUpdateTimestamp(System.currentTimeMillis());
79+
}
80+
81+
public void setValue(final String statsKey, final long value) {
82+
MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey);
83+
statsItem.getValue().set(value);
84+
statsItem.setLastUpdateTimestamp(System.currentTimeMillis());
7585
}
7686

7787
public void delValueByInfixKey(final String statsKey, String separator) {
@@ -109,4 +119,17 @@ public MomentStatsItem getAndCreateStatsItem(final String statsKey) {
109119

110120
return statsItem;
111121
}
122+
123+
public void cleanResource(int maxStatsIdleTimeInMinutes) {
124+
COMMERCIAL_LOG.info("CleanStatisticItem: kind:{}, size:{}", statsName, this.statsItemTable.size());
125+
Iterator<Entry<String, MomentStatsItem>> it = this.statsItemTable.entrySet().iterator();
126+
while (it.hasNext()) {
127+
Entry<String, MomentStatsItem> next = it.next();
128+
MomentStatsItem statsItem = next.getValue();
129+
if (System.currentTimeMillis() - statsItem.getLastUpdateTimestamp() > maxStatsIdleTimeInMinutes * 60 * 1000L) {
130+
it.remove();
131+
COMMERCIAL_LOG.info("CleanStatisticItem: removeKind:{}, removeKey:{}", statsName, statsItem.getStatsKey());
132+
}
133+
}
134+
}
112135
}

common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class StatsItem {
3838

3939
private final String statsName;
4040
private final String statsKey;
41+
private long lastUpdateTimestamp = System.currentTimeMillis();
4142
private final ScheduledExecutorService scheduledExecutorService;
4243

4344
private final Logger logger;
@@ -229,6 +230,14 @@ public String getStatsName() {
229230
public LongAdder getTimes() {
230231
return times;
231232
}
233+
234+
public long getLastUpdateTimestamp() {
235+
return lastUpdateTimestamp;
236+
}
237+
238+
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
239+
this.lastUpdateTimestamp = lastUpdateTimestamp;
240+
}
232241
}
233242

234243
class CallSnapshot {

common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@
2424
import java.util.concurrent.ScheduledExecutorService;
2525
import java.util.concurrent.TimeUnit;
2626
import org.apache.rocketmq.common.UtilAll;
27+
import org.apache.rocketmq.common.constant.LoggerName;
2728
import org.apache.rocketmq.logging.org.slf4j.Logger;
29+
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
2830

2931
public class StatsItemSet {
32+
private static final Logger COMMERCIAL_LOG = LoggerFactory.getLogger(LoggerName.COMMERCIAL_LOGGER_NAME);
3033
private final ConcurrentMap<String/* key */, StatsItem> statsItemTable =
3134
new ConcurrentHashMap<>(128);
3235

@@ -157,12 +160,14 @@ public void addValue(final String statsKey, final int incValue, final int incTim
157160
StatsItem statsItem = this.getAndCreateStatsItem(statsKey);
158161
statsItem.getValue().add(incValue);
159162
statsItem.getTimes().add(incTimes);
163+
statsItem.setLastUpdateTimestamp(System.currentTimeMillis());
160164
}
161165

162166
public void addRTValue(final String statsKey, final int incValue, final int incTimes) {
163167
StatsItem statsItem = this.getAndCreateRTStatsItem(statsKey);
164168
statsItem.getValue().add(incValue);
165169
statsItem.getTimes().add(incTimes);
170+
statsItem.setLastUpdateTimestamp(System.currentTimeMillis());
166171
}
167172

168173
public void delValue(final String statsKey) {
@@ -256,4 +261,18 @@ public StatsSnapshot getStatsDataInDay(final String statsKey) {
256261
public StatsItem getStatsItem(final String statsKey) {
257262
return this.statsItemTable.get(statsKey);
258263
}
264+
265+
266+
public void cleanResource(int maxStatsIdleTimeInMinutes) {
267+
COMMERCIAL_LOG.info("CleanStatisticItemOld: kind:{}, size:{}", statsName, this.statsItemTable.size());
268+
Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
269+
while (it.hasNext()) {
270+
Entry<String, StatsItem> next = it.next();
271+
StatsItem statsItem = next.getValue();
272+
if (System.currentTimeMillis() - statsItem.getLastUpdateTimestamp() > maxStatsIdleTimeInMinutes * 60 * 1000L) {
273+
it.remove();
274+
COMMERCIAL_LOG.info("CleanStatisticItemOld: removeKind:{}, removeKey:{}", statsName, statsItem.getStatsKey());
275+
}
276+
}
277+
}
259278
}

store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import java.util.HashMap;
2020
import java.util.concurrent.ScheduledExecutorService;
21+
import java.util.concurrent.TimeUnit;
22+
2123
import org.apache.commons.lang3.tuple.Pair;
2224
import org.apache.rocketmq.common.BrokerConfig;
2325
import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -121,6 +123,8 @@ public class BrokerStatsManager {
121123
public static final String CHANNEL_ACTIVITY_IDLE = "IDLE";
122124
public static final String CHANNEL_ACTIVITY_EXCEPTION = "EXCEPTION";
123125
public static final String CHANNEL_ACTIVITY_CLOSE = "CLOSE";
126+
private static final String[] NEED_CLEAN_STATS_SET =
127+
new String[] {TOPIC_PUT_NUMS, TOPIC_PUT_SIZE, GROUP_GET_NUMS, GROUP_GET_SIZE, SNDBCK_PUT_NUMS, GROUP_GET_LATENCY};
124128

125129
/**
126130
* read disk follow stats
@@ -134,6 +138,7 @@ public class BrokerStatsManager {
134138
private ScheduledExecutorService scheduledExecutorService;
135139
private ScheduledExecutorService commercialExecutor;
136140
private ScheduledExecutorService accountExecutor;
141+
private ScheduledExecutorService cleanResourceExecutor;
137142

138143
private final HashMap<String, StatsItemSet> statsTable = new HashMap<>();
139144
private final String clusterName;
@@ -277,6 +282,12 @@ public boolean online(StatisticsItem item) {
277282
return false;
278283
}
279284
});
285+
cleanResourceExecutor.scheduleWithFixedDelay(new Runnable() {
286+
@Override
287+
public void run() {
288+
cleanAllResource();
289+
}
290+
}, 10, 10, TimeUnit.MINUTES);
280291
}
281292

282293
private void initScheduleService() {
@@ -286,6 +297,8 @@ private void initScheduleService() {
286297
ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CommercialStatsThread", true, brokerConfig));
287298
this.accountExecutor =
288299
ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("AccountStatsThread", true, brokerConfig));
300+
this.cleanResourceExecutor =
301+
ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanStatsResourceThread", true, brokerConfig));
289302
}
290303

291304
public MomentStatsItemSet getMomentStatsItemSetFallSize() {
@@ -318,6 +331,7 @@ public void start() {
318331
public void shutdown() {
319332
this.scheduledExecutorService.shutdown();
320333
this.commercialExecutor.shutdown();
334+
this.cleanResourceExecutor.shutdown();
321335
}
322336

323337
public StatsItem getStatsItem(final String statsName, final String statsKey) {
@@ -589,13 +603,13 @@ public double tpsGroupGetNums(final String group, final String topic) {
589603
public void recordDiskFallBehindTime(final String group, final String topic, final int queueId,
590604
final long fallBehind) {
591605
final String statsKey = buildStatsKey(queueId, topic, group);
592-
this.momentStatsItemSetFallTime.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
606+
this.momentStatsItemSetFallTime.setValue(statsKey, fallBehind);
593607
}
594608

595609
public void recordDiskFallBehindSize(final String group, final String topic, final int queueId,
596610
final long fallBehind) {
597611
final String statsKey = buildStatsKey(queueId, topic, group);
598-
this.momentStatsItemSetFallSize.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
612+
this.momentStatsItemSetFallSize.setValue(statsKey, fallBehind);
599613
}
600614

601615
public void incDLQStatValue(final String key, final String owner, final String group,
@@ -764,6 +778,31 @@ public interface StateGetter {
764778
boolean online(String instanceId, String group, String topic);
765779
}
766780

781+
782+
private void cleanAllResource() {
783+
try {
784+
int maxStatsIdleTimeInMinutes = brokerConfig != null ? brokerConfig.getMaxStatsIdleTimeInMinutes() : -1;
785+
if (maxStatsIdleTimeInMinutes < 0) {
786+
COMMERCIAL_LOG.info("[BrokerStatsManager#cleanAllResource] maxStatsIdleTimeInMinutes={}, no need to clean resource", maxStatsIdleTimeInMinutes);
787+
return;
788+
}
789+
if (maxStatsIdleTimeInMinutes <= 10 && maxStatsIdleTimeInMinutes >= 0) {
790+
maxStatsIdleTimeInMinutes = 30;
791+
}
792+
for (String statsKind : NEED_CLEAN_STATS_SET) {
793+
StatsItemSet statsItemSet = this.statsTable.get(statsKind);
794+
if (null == statsItemSet) {
795+
continue;
796+
}
797+
statsItemSet.cleanResource(maxStatsIdleTimeInMinutes);
798+
}
799+
momentStatsItemSetFallSize.cleanResource(maxStatsIdleTimeInMinutes);
800+
momentStatsItemSetFallTime.cleanResource(maxStatsIdleTimeInMinutes);
801+
} catch (Throwable throwable) {
802+
COMMERCIAL_LOG.error("[BrokerStatsManager#cleanAllResource] clean resource error", throwable);
803+
}
804+
}
805+
767806
public enum StatsType {
768807
SEND_SUCCESS,
769808
SEND_FAILURE,

0 commit comments

Comments
 (0)