Skip to content

Commit d99f15c

Browse files
feat: Expose used MemoryPool in ResourcesExhausted error messages
1 parent d68373e commit d99f15c

10 files changed

Lines changed: 78 additions & 50 deletions

File tree

datafusion-cli/tests/cli_integration.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -261,11 +261,11 @@ fn bind_to_settings(snapshot_name: &str) -> SettingsBindDropGuard {
261261
"Consumer(can spill: bool) consumed XB, peak XB",
262262
);
263263
settings.add_filter(
264-
r"Error: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool",
264+
r"Error: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total '.*?' pool",
265265
"Error: Failed to allocate ",
266266
);
267267
settings.add_filter(
268-
r"Resources exhausted: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool",
268+
r"Resources exhausted: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total '.*?' pool",
269269
"Resources exhausted: Failed to allocate",
270270
);
271271

datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@ exit_code: 1
1616
[CLI_VERSION]
1717
Error: Not enough memory to continue external sort. Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'.
1818
caused by
19-
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
19+
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) using 'greedy' pool as:
2020
Consumer(can spill: bool) consumed XB, peak XB,
2121
Consumer(can spill: bool) consumed XB, peak XB.
2222
Error: Failed to allocate

datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ exit_code: 1
1414
[CLI_VERSION]
1515
Error: Not enough memory to continue external sort. Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'.
1616
caused by
17-
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
17+
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) using 'greedy' pool as:
1818
Consumer(can spill: bool) consumed XB, peak XB,
1919
Consumer(can spill: bool) consumed XB, peak XB,
2020
Consumer(can spill: bool) consumed XB, peak XB.

datafusion-examples/examples/execution_monitoring/memory_pool_tracking.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -113,13 +113,14 @@ async fn automatic_usage_example() -> Result<()> {
113113
Error: Not enough memory to continue external sort. Consider increasing the memory limit config: 'datafusion.runtime.memory_limit',
114114
or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'.
115115
caused by
116-
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
116+
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) using 'greedy' pool as:
117117
ExternalSorterMerge[3]#112(can spill: false) consumed 10.0 MB, peak 10.0 MB,
118118
ExternalSorterMerge[10]#147(can spill: false) consumed 10.0 MB, peak 10.0 MB,
119119
ExternalSorter[1]#93(can spill: true) consumed 69.0 KB, peak 69.0 KB,
120120
ExternalSorter[13]#155(can spill: true) consumed 67.6 KB, peak 67.6 KB,
121121
ExternalSorter[8]#140(can spill: true) consumed 67.2 KB, peak 67.2 KB.
122-
Error: Failed to allocate additional 10.0 MB for ExternalSorterMerge[0] with 0.0 B already allocated for this reservation - 7.1 MB remain available for the total pool
122+
Error: Failed to allocate additional 10.0 MB for ExternalSorterMerge[0] with 0.0 B already allocated
123+
for this reservation - 7.1 MB remain available for the total 'greedy' pool
123124
*/
124125
}
125126
}

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 12 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -87,7 +87,7 @@ async fn group_by_none() {
8787
.with_query("select median(request_bytes) from t")
8888
.with_expected_errors(vec![
8989
"Resources exhausted: Additional allocation failed",
90-
"with top memory consumers (across reservations) as:\n AggregateStream",
90+
"with top memory consumers (across reservations) using 'greedy' pool as:\n AggregateStream",
9191
])
9292
.with_memory_limit(2_000)
9393
.run()
@@ -99,7 +99,7 @@ async fn group_by_row_hash() {
9999
TestCase::new()
100100
.with_query("select count(*) from t GROUP BY response_bytes")
101101
.with_expected_errors(vec![
102-
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream"
102+
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n GroupedHashAggregateStream"
103103
])
104104
.with_memory_limit(2_000)
105105
.run()
@@ -112,7 +112,7 @@ async fn group_by_hash() {
112112
// group by dict column
113113
.with_query("select count(*) from t GROUP BY service, host, pod, container")
114114
.with_expected_errors(vec![
115-
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream"
115+
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n GroupedHashAggregateStream"
116116
])
117117
.with_memory_limit(1_000)
118118
.run()
@@ -126,7 +126,7 @@ async fn join_by_key_multiple_partitions() {
126126
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
127127
.with_expected_errors(vec![
128128
"Resources exhausted: Additional allocation failed",
129-
"with top memory consumers (across reservations) as:\n HashJoinInput",
129+
"with top memory consumers (across reservations) using 'greedy' pool as:\n HashJoinInput",
130130
])
131131
.with_memory_limit(1_000)
132132
.with_config(config)
@@ -141,7 +141,7 @@ async fn join_by_key_single_partition() {
141141
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
142142
.with_expected_errors(vec![
143143
"Resources exhausted: Additional allocation failed",
144-
"with top memory consumers (across reservations) as:\n HashJoinInput",
144+
"with top memory consumers (across reservations) using 'greedy' pool as:\n HashJoinInput",
145145
])
146146
.with_memory_limit(1_000)
147147
.with_config(config)
@@ -154,7 +154,7 @@ async fn join_by_expression() {
154154
TestCase::new()
155155
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service != t2.service")
156156
.with_expected_errors(vec![
157-
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[0]",
157+
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n NestedLoopJoinLoad[0]",
158158
])
159159
.with_memory_limit(1_000)
160160
.run()
@@ -167,7 +167,7 @@ async fn cross_join() {
167167
.with_query("select t1.*, t2.* from t t1 CROSS JOIN t t2")
168168
.with_expected_errors(vec![
169169
"Resources exhausted: Additional allocation failed",
170-
"with top memory consumers (across reservations) as:\n CrossJoinExec",
170+
"with top memory consumers (across reservations) using 'greedy' pool as:\n CrossJoinExec",
171171
])
172172
.with_memory_limit(1_000)
173173
.run()
@@ -224,7 +224,7 @@ async fn symmetric_hash_join() {
224224
"select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
225225
)
226226
.with_expected_errors(vec![
227-
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SymmetricHashJoinStream",
227+
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n SymmetricHashJoinStream",
228228
])
229229
.with_memory_limit(1_000)
230230
.with_scenario(Scenario::AccessLogStreaming)
@@ -242,7 +242,7 @@ async fn sort_preserving_merge() {
242242
// so only a merge is needed
243243
.with_query("select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10")
244244
.with_expected_errors(vec![
245-
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SortPreservingMergeExec",
245+
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n SortPreservingMergeExec",
246246
])
247247
// provide insufficient memory to merge
248248
.with_memory_limit(partition_size / 2)
@@ -322,7 +322,7 @@ async fn sort_spill_reservation() {
322322
test.clone()
323323
.with_expected_errors(vec![
324324
"Resources exhausted: Additional allocation failed",
325-
"with top memory consumers (across reservations) as:",
325+
"with top memory consumers (across reservations) using 'greedy' pool as:",
326326
"B for ExternalSorterMerge",
327327
])
328328
.with_config(config)
@@ -353,7 +353,7 @@ async fn oom_recursive_cte() {
353353
)
354354
.with_expected_errors(vec![
355355
"Resources exhausted: Additional allocation failed",
356-
"with top memory consumers (across reservations) as:\n RecursiveQuery",
356+
"with top memory consumers (across reservations) using 'greedy' pool as:\n RecursiveQuery",
357357
])
358358
.with_memory_limit(2_000)
359359
.run()
@@ -405,7 +405,7 @@ async fn oom_with_tracked_consumer_pool() {
405405
.with_expected_errors(vec![
406406
"Failed to allocate additional",
407407
"for ParquetSink(ArrowColumnWriter)",
408-
"Additional allocation failed", "with top memory consumers (across reservations) as:\n ParquetSink(ArrowColumnWriter)"
408+
"Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n ParquetSink(ArrowColumnWriter)"
409409
])
410410
.with_memory_pool(Arc::new(
411411
TrackConsumersPool::new(

datafusion/execution/src/memory_pool/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -182,6 +182,9 @@ pub use pool::*;
182182
/// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers,
183183
/// providing better error messages on the largest memory users.
184184
pub trait MemoryPool: Send + Sync + std::fmt::Debug {
185+
/// Return pool name
186+
fn name(&self) -> &str;
187+
185188
/// Registers a new [`MemoryConsumer`]
186189
///
187190
/// Note: Subsequent calls to [`Self::grow`] must be made to reserve memory

0 commit comments

Comments
 (0)