Skip to content

Commit 897b01c

Browse files
Refactor part I
1 parent d99f15c commit 897b01c

10 files changed

Lines changed: 121 additions & 60 deletions

File tree

datafusion-cli/tests/cli_integration.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,11 +261,11 @@ fn bind_to_settings(snapshot_name: &str) -> SettingsBindDropGuard {
261261
"Consumer(can spill: bool) consumed XB, peak XB",
262262
);
263263
settings.add_filter(
264-
r"Error: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total '.*?' pool",
264+
r"Error: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total memory pool: '.*?'",
265265
"Error: Failed to allocate ",
266266
);
267267
settings.add_filter(
268-
r"Resources exhausted: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total '.*?' pool",
268+
r"Resources exhausted: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total memory pool: '.*?'",
269269
"Resources exhausted: Failed to allocate",
270270
);
271271

datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ exit_code: 1
1616
[CLI_VERSION]
1717
Error: Not enough memory to continue external sort. Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'.
1818
caused by
19-
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) using 'greedy' pool as:
19+
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
2020
Consumer(can spill: bool) consumed XB, peak XB,
2121
Consumer(can spill: bool) consumed XB, peak XB.
22-
Error: Failed to allocate
22+
Error: Failed to allocate
2323

2424
----- stderr -----

datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ exit_code: 1
1414
[CLI_VERSION]
1515
Error: Not enough memory to continue external sort. Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'.
1616
caused by
17-
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) using 'greedy' pool as:
17+
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
1818
Consumer(can spill: bool) consumed XB, peak XB,
1919
Consumer(can spill: bool) consumed XB, peak XB,
2020
Consumer(can spill: bool) consumed XB, peak XB.

datafusion-examples/examples/execution_monitoring/memory_pool_tracking.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,14 @@ async fn automatic_usage_example() -> Result<()> {
113113
Error: Not enough memory to continue external sort. Consider increasing the memory limit config: 'datafusion.runtime.memory_limit',
114114
or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'.
115115
caused by
116-
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) using 'greedy' pool as:
116+
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
117117
ExternalSorterMerge[3]#112(can spill: false) consumed 10.0 MB, peak 10.0 MB,
118118
ExternalSorterMerge[10]#147(can spill: false) consumed 10.0 MB, peak 10.0 MB,
119119
ExternalSorter[1]#93(can spill: true) consumed 69.0 KB, peak 69.0 KB,
120120
ExternalSorter[13]#155(can spill: true) consumed 67.6 KB, peak 67.6 KB,
121121
ExternalSorter[8]#140(can spill: true) consumed 67.2 KB, peak 67.2 KB.
122122
Error: Failed to allocate additional 10.0 MB for ExternalSorterMerge[0] with 0.0 B already allocated
123-
for this reservation - 7.1 MB remain available for the total 'greedy' pool
123+
for this reservation - 7.1 MB remain available for the total memory pool
124124
*/
125125
}
126126
}

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ async fn group_by_none() {
8787
.with_query("select median(request_bytes) from t")
8888
.with_expected_errors(vec![
8989
"Resources exhausted: Additional allocation failed",
90-
"with top memory consumers (across reservations) using 'greedy' pool as:\n AggregateStream",
90+
"with top memory consumers (across reservations) as:\n AggregateStream",
9191
])
9292
.with_memory_limit(2_000)
9393
.run()
@@ -99,7 +99,7 @@ async fn group_by_row_hash() {
9999
TestCase::new()
100100
.with_query("select count(*) from t GROUP BY response_bytes")
101101
.with_expected_errors(vec![
102-
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n GroupedHashAggregateStream"
102+
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream"
103103
])
104104
.with_memory_limit(2_000)
105105
.run()
@@ -112,7 +112,7 @@ async fn group_by_hash() {
112112
// group by dict column
113113
.with_query("select count(*) from t GROUP BY service, host, pod, container")
114114
.with_expected_errors(vec![
115-
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n GroupedHashAggregateStream"
115+
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream"
116116
])
117117
.with_memory_limit(1_000)
118118
.run()
@@ -126,7 +126,7 @@ async fn join_by_key_multiple_partitions() {
126126
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
127127
.with_expected_errors(vec![
128128
"Resources exhausted: Additional allocation failed",
129-
"with top memory consumers (across reservations) using 'greedy' pool as:\n HashJoinInput",
129+
"with top memory consumers (across reservations) as:\n HashJoinInput",
130130
])
131131
.with_memory_limit(1_000)
132132
.with_config(config)
@@ -141,7 +141,7 @@ async fn join_by_key_single_partition() {
141141
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
142142
.with_expected_errors(vec![
143143
"Resources exhausted: Additional allocation failed",
144-
"with top memory consumers (across reservations) using 'greedy' pool as:\n HashJoinInput",
144+
"with top memory consumers (across reservations) as:\n HashJoinInput",
145145
])
146146
.with_memory_limit(1_000)
147147
.with_config(config)
@@ -154,7 +154,7 @@ async fn join_by_expression() {
154154
TestCase::new()
155155
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service != t2.service")
156156
.with_expected_errors(vec![
157-
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n NestedLoopJoinLoad[0]",
157+
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[0]",
158158
])
159159
.with_memory_limit(1_000)
160160
.run()
@@ -167,7 +167,7 @@ async fn cross_join() {
167167
.with_query("select t1.*, t2.* from t t1 CROSS JOIN t t2")
168168
.with_expected_errors(vec![
169169
"Resources exhausted: Additional allocation failed",
170-
"with top memory consumers (across reservations) using 'greedy' pool as:\n CrossJoinExec",
170+
"with top memory consumers (across reservations) as:\n CrossJoinExec",
171171
])
172172
.with_memory_limit(1_000)
173173
.run()
@@ -224,7 +224,7 @@ async fn symmetric_hash_join() {
224224
"select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
225225
)
226226
.with_expected_errors(vec![
227-
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n SymmetricHashJoinStream",
227+
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SymmetricHashJoinStream",
228228
])
229229
.with_memory_limit(1_000)
230230
.with_scenario(Scenario::AccessLogStreaming)
@@ -242,7 +242,7 @@ async fn sort_preserving_merge() {
242242
// so only a merge is needed
243243
.with_query("select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10")
244244
.with_expected_errors(vec![
245-
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n SortPreservingMergeExec",
245+
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SortPreservingMergeExec",
246246
])
247247
// provide insufficient memory to merge
248248
.with_memory_limit(partition_size / 2)
@@ -322,7 +322,7 @@ async fn sort_spill_reservation() {
322322
test.clone()
323323
.with_expected_errors(vec![
324324
"Resources exhausted: Additional allocation failed",
325-
"with top memory consumers (across reservations) using 'greedy' pool as:",
325+
"with top memory consumers (across reservations) as:",
326326
"B for ExternalSorterMerge",
327327
])
328328
.with_config(config)
@@ -353,7 +353,7 @@ async fn oom_recursive_cte() {
353353
)
354354
.with_expected_errors(vec![
355355
"Resources exhausted: Additional allocation failed",
356-
"with top memory consumers (across reservations) using 'greedy' pool as:\n RecursiveQuery",
356+
"with top memory consumers (across reservations) as:\n RecursiveQuery",
357357
])
358358
.with_memory_limit(2_000)
359359
.run()
@@ -405,7 +405,7 @@ async fn oom_with_tracked_consumer_pool() {
405405
.with_expected_errors(vec![
406406
"Failed to allocate additional",
407407
"for ParquetSink(ArrowColumnWriter)",
408-
"Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n ParquetSink(ArrowColumnWriter)"
408+
"Additional allocation failed", "with top memory consumers (across reservations) as:\n ParquetSink(ArrowColumnWriter)"
409409
])
410410
.with_memory_pool(Arc::new(
411411
TrackConsumersPool::new(

datafusion/execution/src/memory_pool/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
//! help with allocation accounting.
2020
2121
use datafusion_common::{Result, internal_datafusion_err};
22+
use std::fmt::Display;
2223
use std::hash::{Hash, Hasher};
2324
use std::{cmp::Ordering, sync::Arc, sync::atomic};
2425

@@ -181,7 +182,7 @@ pub use pool::*;
181182
///
182183
/// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers,
183184
/// providing better error messages on the largest memory users.
184-
pub trait MemoryPool: Send + Sync + std::fmt::Debug {
185+
pub trait MemoryPool: Send + Sync + std::fmt::Debug + Display {
185186
/// Return pool name
186187
fn name(&self) -> &str;
187188

@@ -235,7 +236,7 @@ pub enum MemoryLimit {
235236
/// [`MemoryReservation`] in a [`MemoryPool`]. All allocations are registered to
236237
/// a particular `MemoryConsumer`;
237238
///
238-
/// Each `MemoryConsumer` is identifiable by a process-unique id, and is therefor not cloneable,
239+
/// Each `MemoryConsumer` is identifiable by a process-unique id, and is therefore not cloneable,
239240
/// If you want a clone of a `MemoryConsumer`, you should look into [`MemoryConsumer::clone_with_new_id`],
240241
/// but note that this `MemoryConsumer` may be treated as a separate entity based on the used pool,
241242
/// and is only guaranteed to share the name and inner properties.

0 commit comments

Comments (0)