Skip to content

Commit 5944e8b

Browse files
authored
feat: create builder for disk manager (#16191)
* feat: create builder for disk manager Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com> * fix: apply clippy suggestions Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com> * refactor: use DiskManagerBuilder Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com> * fix: format code Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com> * fix: update format and clippy suggestions Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com> * ci: trigger build Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com> * doc: update DiskManagerMode comment Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com> * chore: deprecate DiskManagerConfig and DiskManager::try_new Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com> * refactor: propagate deprecation Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com> * refactor: propagate deprecation on wasmtest Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com> --------- Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com>
1 parent 7002a00 commit 5944e8b

8 files changed

Lines changed: 193 additions & 60 deletions

File tree

benchmarks/src/util/options.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::{num::NonZeroUsize, sync::Arc};
1919

2020
use datafusion::{
2121
execution::{
22-
disk_manager::DiskManagerConfig,
22+
disk_manager::DiskManagerBuilder,
2323
memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool},
2424
runtime_env::RuntimeEnvBuilder,
2525
},
@@ -110,7 +110,7 @@ impl CommonOpt {
110110
};
111111
rt_builder = rt_builder
112112
.with_memory_pool(pool)
113-
.with_disk_manager(DiskManagerConfig::NewOs);
113+
.with_disk_manager_builder(DiskManagerBuilder::default());
114114
}
115115
Ok(rt_builder)
116116
}

datafusion-cli/src/main.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ use datafusion::execution::memory_pool::{
2828
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
2929
};
3030
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
31-
use datafusion::execution::DiskManager;
3231
use datafusion::prelude::SessionContext;
3332
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
3433
use datafusion_cli::functions::ParquetMetadataFunc;
@@ -43,7 +42,7 @@ use datafusion_cli::{
4342
use clap::Parser;
4443
use datafusion::common::config_err;
4544
use datafusion::config::ConfigOptions;
46-
use datafusion::execution::disk_manager::DiskManagerConfig;
45+
use datafusion::execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};
4746
use mimalloc::MiMalloc;
4847

4948
#[global_allocator]
@@ -200,15 +199,10 @@ async fn main_inner() -> Result<()> {
200199

201200
// set disk limit
202201
if let Some(disk_limit) = args.disk_limit {
203-
let mut disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
204-
205-
DiskManager::set_arc_max_temp_directory_size(
206-
&mut disk_manager,
207-
disk_limit.try_into().unwrap(),
208-
)?;
209-
210-
let disk_config = DiskManagerConfig::new_existing(disk_manager);
211-
rt_builder = rt_builder.with_disk_manager(disk_config);
202+
let builder = DiskManagerBuilder::default()
203+
.with_mode(DiskManagerMode::OsTmpDirectory)
204+
.with_max_temp_directory_size(disk_limit.try_into().unwrap());
205+
rt_builder = rt_builder.with_disk_manager_builder(builder);
212206
}
213207

214208
let runtime_env = rt_builder.build_arc()?;

datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,15 @@ use arrow_schema::SchemaRef;
2525
use datafusion::datasource::MemTable;
2626
use datafusion::prelude::{SessionConfig, SessionContext};
2727
use datafusion_common::{instant::Instant, Result};
28+
use datafusion_execution::disk_manager::DiskManagerBuilder;
2829
use datafusion_execution::memory_pool::{
2930
human_readable_size, MemoryPool, UnboundedMemoryPool,
3031
};
3132
use datafusion_expr::display_schema;
3233
use datafusion_physical_plan::spill::get_record_batch_memory_size;
3334
use std::time::Duration;
3435

35-
use datafusion_execution::{
36-
disk_manager::DiskManagerConfig, memory_pool::FairSpillPool,
37-
runtime_env::RuntimeEnvBuilder,
38-
};
36+
use datafusion_execution::{memory_pool::FairSpillPool, runtime_env::RuntimeEnvBuilder};
3937
use rand::prelude::IndexedRandom;
4038
use rand::Rng;
4139
use rand::{rngs::StdRng, SeedableRng};
@@ -548,7 +546,7 @@ impl SortFuzzerTestGenerator {
548546

549547
let runtime = RuntimeEnvBuilder::new()
550548
.with_memory_pool(memory_pool)
551-
.with_disk_manager(DiskManagerConfig::NewOs)
549+
.with_disk_manager_builder(DiskManagerBuilder::default())
552550
.build_arc()?;
553551

554552
let ctx = SessionContext::new_with_config_rt(config, runtime);

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use datafusion::assert_batches_eq;
3131
use datafusion::datasource::memory::MemorySourceConfig;
3232
use datafusion::datasource::source::DataSourceExec;
3333
use datafusion::datasource::{MemTable, TableProvider};
34-
use datafusion::execution::disk_manager::DiskManagerConfig;
3534
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
3635
use datafusion::execution::session_state::SessionStateBuilder;
3736
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
@@ -41,11 +40,12 @@ use datafusion::prelude::{SessionConfig, SessionContext};
4140
use datafusion_catalog::streaming::StreamingTable;
4241
use datafusion_catalog::Session;
4342
use datafusion_common::{assert_contains, Result};
43+
use datafusion_execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};
4444
use datafusion_execution::memory_pool::{
4545
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
4646
};
4747
use datafusion_execution::runtime_env::RuntimeEnv;
48-
use datafusion_execution::{DiskManager, TaskContext};
48+
use datafusion_execution::TaskContext;
4949
use datafusion_expr::{Expr, TableType};
5050
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
5151
use datafusion_physical_optimizer::join_selection::JoinSelection;
@@ -204,7 +204,7 @@ async fn sort_merge_join_spill() {
204204
)
205205
.with_memory_limit(1_000)
206206
.with_config(config)
207-
.with_disk_manager_config(DiskManagerConfig::NewOs)
207+
.with_disk_manager_builder(DiskManagerBuilder::default())
208208
.with_scenario(Scenario::AccessLogStreaming)
209209
.run()
210210
.await
@@ -288,7 +288,7 @@ async fn sort_spill_reservation() {
288288
.with_memory_limit(mem_limit)
289289
// use a single partition so only a sort is needed
290290
.with_scenario(scenario)
291-
.with_disk_manager_config(DiskManagerConfig::NewOs)
291+
.with_disk_manager_builder(DiskManagerBuilder::default())
292292
.with_expected_plan(
293293
// It is important that this plan only has a SortExec, not
294294
// also merge, so we can ensure the sort could finish
@@ -550,9 +550,10 @@ async fn setup_context(
550550
disk_limit: u64,
551551
memory_pool_limit: usize,
552552
) -> Result<SessionContext> {
553-
let mut disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
554-
555-
DiskManager::set_arc_max_temp_directory_size(&mut disk_manager, disk_limit)?;
553+
let disk_manager = DiskManagerBuilder::default()
554+
.with_mode(DiskManagerMode::OsTmpDirectory)
555+
.with_max_temp_directory_size(disk_limit)
556+
.build()?;
556557

557558
let runtime = RuntimeEnvBuilder::new()
558559
.with_memory_pool(Arc::new(FairSpillPool::new(memory_pool_limit)))
@@ -561,7 +562,7 @@ async fn setup_context(
561562

562563
let runtime = Arc::new(RuntimeEnv {
563564
memory_pool: runtime.memory_pool.clone(),
564-
disk_manager,
565+
disk_manager: Arc::new(disk_manager),
565566
cache_manager: runtime.cache_manager.clone(),
566567
object_store_registry: runtime.object_store_registry.clone(),
567568
});
@@ -641,7 +642,7 @@ struct TestCase {
641642
scenario: Scenario,
642643
/// How should the disk manager (that allows spilling) be
643644
/// configured? Defaults to `Disabled`
644-
disk_manager_config: DiskManagerConfig,
645+
disk_manager_builder: DiskManagerBuilder,
645646
/// Expected explain plan, if non-empty
646647
expected_plan: Vec<String>,
647648
/// Is the plan expected to pass? Defaults to false
@@ -657,7 +658,8 @@ impl TestCase {
657658
config: SessionConfig::new(),
658659
memory_pool: None,
659660
scenario: Scenario::AccessLog,
660-
disk_manager_config: DiskManagerConfig::Disabled,
661+
disk_manager_builder: DiskManagerBuilder::default()
662+
.with_mode(DiskManagerMode::Disabled),
661663
expected_plan: vec![],
662664
expected_success: false,
663665
}
@@ -714,11 +716,11 @@ impl TestCase {
714716

715717
/// Specify if the disk manager should be enabled. If true,
716718
/// operators that support it can spill
717-
pub fn with_disk_manager_config(
719+
pub fn with_disk_manager_builder(
718720
mut self,
719-
disk_manager_config: DiskManagerConfig,
721+
disk_manager_builder: DiskManagerBuilder,
720722
) -> Self {
721-
self.disk_manager_config = disk_manager_config;
723+
self.disk_manager_builder = disk_manager_builder;
722724
self
723725
}
724726

@@ -737,7 +739,7 @@ impl TestCase {
737739
memory_pool,
738740
config,
739741
scenario,
740-
disk_manager_config,
742+
disk_manager_builder,
741743
expected_plan,
742744
expected_success,
743745
} = self;
@@ -746,7 +748,7 @@ impl TestCase {
746748

747749
let mut builder = RuntimeEnvBuilder::new()
748750
// disk manager setting controls the spilling
749-
.with_disk_manager(disk_manager_config)
751+
.with_disk_manager_builder(disk_manager_builder)
750752
.with_memory_limit(memory_limit, MEMORY_FRACTION);
751753

752754
if let Some(pool) = memory_pool {

0 commit comments

Comments
 (0)