Skip to content

Commit 9abbd3a

Browse files
Fix test_tracked_consumers_pool_deregister
1 parent 139a405 commit 9abbd3a

File tree

1 file changed

+77
-65
lines changed
  • datafusion/execution/src/memory_pool

1 file changed

+77
-65
lines changed

datafusion/execution/src/memory_pool/pool.rs

Lines changed: 77 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,6 @@ pub struct TrackConsumersPool<I> {
423423

424424
impl<I: MemoryPool> Display for TrackConsumersPool<I> {
425425
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
426-
// let tracked_consumers = self.tracked_consumers.lock();
427426
write!(
428427
f,
429428
"{}",
@@ -482,6 +481,11 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
482481
}
483482
}
484483

484+
/// Returns a reference to the wrapped inner [`MemoryPool`].
485+
pub fn inner(&self) -> &I {
486+
&self.inner
487+
}
488+
485489
/// Returns a snapshot of all currently tracked consumers.
486490
pub fn metrics(&self) -> Vec<MemoryConsumerMetrics> {
487491
self.tracked_consumers
@@ -624,7 +628,7 @@ fn provide_top_memory_consumers_to_error_msg(
624628
#[cfg(test)]
625629
mod tests {
626630
use super::*;
627-
use insta::{Settings, allow_duplicates, assert_snapshot};
631+
use insta::{Settings, allow_duplicates, assert_snapshot, with_settings};
628632
use std::sync::Arc;
629633

630634
fn make_settings() -> Settings {
@@ -824,72 +828,80 @@ mod tests {
824828

825829
#[test]
826830
fn test_tracked_consumers_pool_deregister() {
827-
fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
828-
// Baseline: see the 2 memory consumers
829-
let setting = make_settings();
830-
let _bound = setting.bind_to_scope();
831-
let r0 = MemoryConsumer::new("r0").register(&pool);
832-
r0.grow(10);
833-
let r1_consumer = MemoryConsumer::new("r1");
834-
let r1 = r1_consumer.register(&pool);
835-
r1.grow(20);
836-
837-
let res = r0.try_grow(150);
838-
assert!(res.is_err());
839-
let error = res.unwrap_err().strip_backtrace();
840-
allow_duplicates!(assert_snapshot!(error, @r"
841-
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
842-
r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
843-
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
844-
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total memory pool: greedy(used: 30.0 B, pool_size: 100.0 B)
845-
"));
846-
847-
// Test: unregister one
848-
// only the remaining one should be listed
849-
drop(r1);
850-
let res = r0.try_grow(150);
851-
assert!(res.is_err());
852-
let error = res.unwrap_err().strip_backtrace();
853-
allow_duplicates!(assert_snapshot!(error, @r"
854-
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
855-
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
856-
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)
857-
"));
858-
859-
// Test: actual message we see is the `available is 70`. When it should be `available is 90`.
860-
// This is because the pool.shrink() does not automatically occur within the inner_pool.deregister().
861-
let res = r0.try_grow(150);
862-
assert!(res.is_err());
863-
let error = res.unwrap_err().strip_backtrace();
864-
allow_duplicates!(assert_snapshot!(error, @r"
865-
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
866-
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
867-
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)
868-
"));
869-
870-
// Test: the registration needs to free itself (or be dropped),
871-
// for the proper error message
872-
let res = r0.try_grow(150);
873-
assert!(res.is_err());
874-
let error = res.unwrap_err().strip_backtrace();
875-
allow_duplicates!(assert_snapshot!(error, @r"
876-
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
877-
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
878-
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)
879-
"));
831+
fn test_per_pool_type<P: MemoryPool + 'static>(pool: Arc<TrackConsumersPool<P>>) {
832+
// `snapshot_suffix` ties each insta snapshot to this pool's inner backend; filters
833+
// normalize inner pool `Display` so fair vs greedy share the same `@` reference text.
834+
with_settings!({
835+
snapshot_suffix => pool.inner().name().to_string(),
836+
filters => vec![
837+
(
838+
r"([^\s]+)\#\d+\(can spill: (true|false)\)",
839+
"$1#[ID](can spill: $2)",
840+
),
841+
(
842+
r"for the total memory pool: [^\n]+",
843+
"for the total memory pool: [INNER_POOL]",
844+
),
845+
],
846+
}, {
847+
let memory_pool: Arc<dyn MemoryPool> = Arc::<TrackConsumersPool<P>>::clone(&pool);
848+
let r0 = MemoryConsumer::new("r0").register(&memory_pool);
849+
r0.grow(10);
850+
let r1 = MemoryConsumer::new("r1").register(&memory_pool);
851+
r1.grow(20);
852+
853+
// Baseline: see the 2 memory consumers
854+
let error = r0.try_grow(150).unwrap_err().strip_backtrace();
855+
assert_snapshot!(error, @r"
856+
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
857+
r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
858+
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
859+
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total memory pool: [INNER_POOL]
860+
");
861+
862+
// Test: unregister one — only the remaining consumer should be listed
863+
drop(r1);
864+
let error = r0.try_grow(150).unwrap_err().strip_backtrace();
865+
assert_snapshot!(error, @r"
866+
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
867+
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
868+
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL]
869+
");
870+
871+
// Test: actual message we see is the `available is 70`. When it should be `available is 90`.
872+
// This is because the pool.shrink() does not automatically occur within the inner_pool.deregister().
873+
let error = r0.try_grow(150).unwrap_err().strip_backtrace();
874+
assert_snapshot!(error, @r"
875+
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
876+
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
877+
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL]
878+
");
879+
880+
// Test: the registration needs to free itself (or be dropped),
881+
// for the proper error message
882+
let error = r0.try_grow(150).unwrap_err().strip_backtrace();
883+
assert_snapshot!(error, @r"
884+
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
885+
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
886+
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL]
887+
");
888+
}
889+
);
880890
}
881891

882-
let tracked_spill_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
883-
FairSpillPool::new(100),
884-
NonZeroUsize::new(3).unwrap(),
885-
));
886-
test_per_pool_type(tracked_spill_pool);
892+
allow_duplicates! {
893+
let tracked_spill_pool = Arc::new(TrackConsumersPool::new(
894+
FairSpillPool::new(100),
895+
NonZeroUsize::new(3).unwrap(),
896+
));
897+
test_per_pool_type(tracked_spill_pool);
887898

888-
let tracked_greedy_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
889-
GreedyMemoryPool::new(100),
890-
NonZeroUsize::new(3).unwrap(),
891-
));
892-
test_per_pool_type(tracked_greedy_pool);
899+
let tracked_greedy_pool = Arc::new(TrackConsumersPool::new(
900+
GreedyMemoryPool::new(100),
901+
NonZeroUsize::new(3).unwrap(),
902+
));
903+
test_per_pool_type(tracked_greedy_pool);
904+
}
893905
}
894906

895907
#[test]

0 commit comments

Comments
 (0)