Skip to content

Commit 8191501

Browse files
Houlong66claude
andcommitted
fix(metrics): snapshot MetricData points before export to prevent AIOOBE
The OTel SDK's NumberDataPointMarshaler.createRepeated allocates an array based on points.size() then iterates. If callback threads concurrently add data points between size() and iteration, an ArrayIndexOutOfBoundsException occurs. This adds a defensive snapshot of all data point collections at the start of export(), ensuring the delegate exporter always receives immutable point collections. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 78bdf82 commit 8191501

File tree

1 file changed

+53
-3
lines changed

1 file changed

+53
-3
lines changed

broker/src/main/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporter.java

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,21 +115,26 @@ public CompletableResultCode export(
115115
return delegate.export(metrics);
116116
}
117117

118+
// Snapshot to avoid concurrent-modification AIOOBE
119+
// in OTel SDK marshaling (see NumberDataPointMarshaler)
120+
List<MetricData> snapshotted =
121+
snapshotAllMetrics(metrics);
122+
118123
int maxBatchSize =
119124
maxBatchSizeSupplier.getAsInt();
120125

121126
int totalDataPoints = 0;
122-
for (MetricData md : metrics) {
127+
for (MetricData md : snapshotted) {
123128
totalDataPoints +=
124129
md.getData().getPoints().size();
125130
}
126131

127132
if (totalDataPoints <= maxBatchSize) {
128-
return delegate.export(metrics);
133+
return delegate.export(snapshotted);
129134
}
130135

131136
List<List<MetricData>> batches =
132-
splitIntoBatches(metrics, maxBatchSize);
137+
splitIntoBatches(snapshotted, maxBatchSize);
133138

134139
LOGGER.debug(
135140
"Splitting metrics export: "
@@ -184,6 +189,51 @@ private static void logFailedBatch(
184189
batchIndex, names);
185190
}
186191

192+
/**
193+
* Creates defensive snapshots of all MetricData by
194+
* copying their data point collections into new
195+
* ArrayLists. This prevents
196+
* {@link ArrayIndexOutOfBoundsException} in the OTel
197+
* SDK marshaling code when callback threads
198+
* concurrently modify point collections during export.
199+
*
200+
* @param metrics the original metrics collection
201+
* @return list of snapshotted MetricData
202+
*/
203+
private static List<MetricData> snapshotAllMetrics(
204+
final Collection<MetricData> metrics) {
205+
List<MetricData> result =
206+
new ArrayList<>(metrics.size());
207+
for (MetricData md : metrics) {
208+
try {
209+
result.add(snapshotMetricData(md));
210+
} catch (Exception e) {
211+
LOGGER.warn(
212+
"Failed to snapshot MetricData:"
213+
+ " {}, using original",
214+
md.getName(), e);
215+
result.add(md);
216+
}
217+
}
218+
return result;
219+
}
220+
221+
/**
222+
* Creates a snapshot of a single MetricData by copying
223+
* its points into a new ArrayList and reconstructing
224+
* the MetricData with immutable internal data.
225+
*
226+
* @param md the original MetricData
227+
* @return a new MetricData with snapshotted points
228+
*/
229+
private static MetricData snapshotMetricData(
230+
final MetricData md) {
231+
List<PointData> points =
232+
new ArrayList<>(md.getData().getPoints());
233+
return createMetricDataForType(
234+
md, md.getType(), points);
235+
}
236+
187237
/**
188238
* Splits metrics into sub-batches by data point count.
189239
* When a single MetricData has more points than the

0 commit comments

Comments
 (0)