Skip to content

Commit 10d6c19

Browse files
Fix test_tracked_consumers_pool_deregister
1 parent 88e9856 commit 10d6c19

File tree

1 file changed

+77
-64
lines changed
  • datafusion/execution/src/memory_pool

1 file changed

+77
-64
lines changed

datafusion/execution/src/memory_pool/pool.rs

Lines changed: 77 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,11 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
482482
}
483483
}
484484

485+
/// Returns a reference to the wrapped inner [`MemoryPool`].
486+
pub fn inner(&self) -> &I {
487+
&self.inner
488+
}
489+
485490
/// Returns a snapshot of all currently tracked consumers.
486491
pub fn metrics(&self) -> Vec<MemoryConsumerMetrics> {
487492
self.tracked_consumers
@@ -624,7 +629,7 @@ fn provide_top_memory_consumers_to_error_msg(
624629
#[cfg(test)]
625630
mod tests {
626631
use super::*;
627-
use insta::{Settings, allow_duplicates, assert_snapshot};
632+
use insta::{Settings, allow_duplicates, assert_snapshot, with_settings};
628633
use std::sync::Arc;
629634

630635
fn make_settings() -> Settings {
@@ -824,72 +829,80 @@ mod tests {
824829

825830
#[test]
826831
fn test_tracked_consumers_pool_deregister() {
827-
fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
828-
// Baseline: see the 2 memory consumers
829-
let setting = make_settings();
830-
let _bound = setting.bind_to_scope();
831-
let r0 = MemoryConsumer::new("r0").register(&pool);
832-
r0.grow(10);
833-
let r1_consumer = MemoryConsumer::new("r1");
834-
let r1 = r1_consumer.register(&pool);
835-
r1.grow(20);
836-
837-
let res = r0.try_grow(150);
838-
assert!(res.is_err());
839-
let error = res.unwrap_err().strip_backtrace();
840-
allow_duplicates!(assert_snapshot!(error, @r"
841-
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
842-
r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
843-
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
844-
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total memory pool: greedy(used: 30.0 B, pool_size: 100.0 B)
845-
"));
846-
847-
// Test: unregister one
848-
// only the remaining one should be listed
849-
drop(r1);
850-
let res = r0.try_grow(150);
851-
assert!(res.is_err());
852-
let error = res.unwrap_err().strip_backtrace();
853-
allow_duplicates!(assert_snapshot!(error, @r"
854-
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
855-
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
856-
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)
857-
"));
858-
859-
// Test: actual message we see is the `available is 70`. When it should be `available is 90`.
860-
// This is because the pool.shrink() does not automatically occur within the inner_pool.deregister().
861-
let res = r0.try_grow(150);
862-
assert!(res.is_err());
863-
let error = res.unwrap_err().strip_backtrace();
864-
allow_duplicates!(assert_snapshot!(error, @r"
865-
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
866-
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
867-
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)
868-
"));
869-
870-
// Test: the registration needs to free itself (or be dropped),
871-
// for the proper error message
872-
let res = r0.try_grow(150);
873-
assert!(res.is_err());
874-
let error = res.unwrap_err().strip_backtrace();
875-
allow_duplicates!(assert_snapshot!(error, @r"
876-
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
877-
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
878-
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)
879-
"));
832+
fn test_per_pool_type<P: MemoryPool + 'static>(pool: Arc<TrackConsumersPool<P>>) {
833+
// `snapshot_suffix` ties each insta snapshot to this pool's inner backend; filters
834+
// normalize inner pool `Display` so fair vs greedy share the same `@` reference text.
835+
with_settings!({
836+
snapshot_suffix => pool.inner().name().to_string(),
837+
filters => vec![
838+
(
839+
r"([^\s]+)\#\d+\(can spill: (true|false)\)",
840+
"$1#[ID](can spill: $2)",
841+
),
842+
(
843+
r"for the total memory pool: [^\n]+",
844+
"for the total memory pool: [INNER_POOL]",
845+
),
846+
],
847+
}, {
848+
let memory_pool: Arc<dyn MemoryPool> = Arc::<TrackConsumersPool<P>>::clone(&pool);
849+
let r0 = MemoryConsumer::new("r0").register(&memory_pool);
850+
r0.grow(10);
851+
let r1 = MemoryConsumer::new("r1").register(&memory_pool);
852+
r1.grow(20);
853+
854+
// Baseline: see the 2 memory consumers
855+
let error = r0.try_grow(150).unwrap_err().strip_backtrace();
856+
assert_snapshot!(error, @r"
857+
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
858+
r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
859+
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
860+
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total memory pool: [INNER_POOL]
861+
");
862+
863+
// Test: unregister one — only the remaining consumer should be listed
864+
drop(r1);
865+
let error = r0.try_grow(150).unwrap_err().strip_backtrace();
866+
assert_snapshot!(error, @r"
867+
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
868+
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
869+
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL]
870+
");
871+
872+
// Test: actual message we see is the `available is 70`. When it should be `available is 90`.
873+
// This is because the pool.shrink() does not automatically occur within the inner_pool.deregister().
874+
let error = r0.try_grow(150).unwrap_err().strip_backtrace();
875+
assert_snapshot!(error, @r"
876+
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
877+
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
878+
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL]
879+
");
880+
881+
// Test: the registration needs to free itself (or be dropped),
882+
// for the proper error message
883+
let error = r0.try_grow(150).unwrap_err().strip_backtrace();
884+
assert_snapshot!(error, @r"
885+
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
886+
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
887+
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL]
888+
");
889+
}
890+
);
880891
}
881892

882-
let tracked_spill_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
883-
FairSpillPool::new(100),
884-
NonZeroUsize::new(3).unwrap(),
885-
));
886-
test_per_pool_type(tracked_spill_pool);
893+
allow_duplicates! {
894+
let tracked_spill_pool = Arc::new(TrackConsumersPool::new(
895+
FairSpillPool::new(100),
896+
NonZeroUsize::new(3).unwrap(),
897+
));
898+
test_per_pool_type(tracked_spill_pool);
887899

888-
let tracked_greedy_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
889-
GreedyMemoryPool::new(100),
890-
NonZeroUsize::new(3).unwrap(),
891-
));
892-
test_per_pool_type(tracked_greedy_pool);
900+
let tracked_greedy_pool = Arc::new(TrackConsumersPool::new(
901+
GreedyMemoryPool::new(100),
902+
NonZeroUsize::new(3).unwrap(),
903+
));
904+
test_per_pool_type(tracked_greedy_pool);
905+
}
893906
}
894907

895908
#[test]

0 commit comments

Comments
 (0)