Skip to content

Commit 9d59a8e

Browse files
authored
Cache available parallelism (#7620)
## Summary The idea is shamelessly taken from apache/datafusion#21084, which improved CI runtimes there significantly (and also makes a real difference in performance). Fetching the available parallelism isn't free, both because it might generate a stack trace and because it might end up doing a bunch of work; it's even called out in its [docs](https://doc.rust-lang.org/std/thread/fn.available_parallelism.html). --------- Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 2ad173d commit 9d59a8e

16 files changed

Lines changed: 53 additions & 41 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/vector-search-bench/src/prepare.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,6 @@ async fn write_shard_streaming(
162162
.open(vortex_path)
163163
.await?;
164164

165-
// This will write in parallel, using `std::thread::available_parallelism()`.
166-
// See `CompressingStrategy` for more details.
167165
flavor
168166
.create_write_options(&SESSION)
169167
.write(&mut output, stream)

clippy.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@ disallowed-types = [
1212

1313
disallowed-methods = [
1414
{ path = "itertools::Itertools::counts", reason = "It uses the default hasher which is slow for primitives. Just inline the loop for better performance.", allow-invalid = true },
15-
{ path = "std::result::Result::and", reason = "This method is a footgun, especially when working with `Result<Validity>`.", allow-invalid = true },
15+
{ path = "std::result::Result::and", reason = "This method is a footgun, especially when working with `Result<Validity>`." },
16+
{ path = "std::thread::available_parallelism", reason = "This function might do an unbounded amount of work, use `vortex_utils::parallelism::get_available_parallelism instead" },
1617
]

vortex-bench/src/polarsignals/data.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use parquet::file::properties::WriterProperties;
2525
use rand::RngExt;
2626
use rand::SeedableRng;
2727
use rand::rngs::StdRng;
28+
use vortex::utils::parallelism::get_available_parallelism;
2829

2930
use super::schema::Int64DictBuilder;
3031
use super::schema::LABELS;
@@ -146,9 +147,7 @@ pub fn generate_polarsignals_parquet(n_rows: usize, output_path: &Path) -> Resul
146147
let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), Some(props))?;
147148

148149
let batch_size = 10_000;
149-
let num_threads = std::thread::available_parallelism()
150-
.map(|n| n.get())
151-
.unwrap_or(1);
150+
let num_threads = get_available_parallelism().unwrap_or(1);
152151

153152
let batch_ranges: Vec<(usize, usize)> = (0..n_rows)
154153
.step_by(batch_size)

vortex-datafusion/src/v2/source.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,6 @@
7070
use std::any::Any;
7171
use std::fmt;
7272
use std::fmt::Formatter;
73-
use std::num::NonZero;
74-
use std::num::NonZeroUsize;
7573
use std::sync::Arc;
7674

7775
use arrow_schema::DataType;
@@ -103,7 +101,6 @@ use vortex::array::arrow::ArrowArrayExecutor;
103101
use vortex::dtype::DType;
104102
use vortex::dtype::FieldPath;
105103
use vortex::dtype::Nullability;
106-
use vortex::error::VortexExpect;
107104
use vortex::error::VortexResult;
108105
use vortex::error::vortex_bail;
109106
use vortex::expr::Expression;
@@ -117,6 +114,7 @@ use vortex::io::session::RuntimeSessionExt;
117114
use vortex::scan::DataSourceRef;
118115
use vortex::scan::ScanRequest;
119116
use vortex::session::VortexSession;
117+
use vortex_utils::parallelism::get_available_parallelism;
120118

121119
use crate::convert::exprs::DefaultExpressionConvertor;
122120
use crate::convert::exprs::ExpressionConvertor;
@@ -277,9 +275,7 @@ impl VortexDataSourceBuilder {
277275
filter: None,
278276
limit: None,
279277
ordered: false,
280-
num_partitions: std::thread::available_parallelism().unwrap_or_else(|_| {
281-
NonZero::new(1).vortex_expect("available parallelism must be non-zero")
282-
}),
278+
num_partitions: get_available_parallelism().unwrap_or(1),
283279
})
284280
}
285281
}
@@ -360,7 +356,7 @@ pub struct VortexDataSource {
360356
/// We use this as a hint for how many splits to execute concurrently in `open()`, but we
361357
/// always declare to DataFusion that we only have a single partition so that we can
362358
/// internally manage concurrency and fix the problem of partition skew.
363-
num_partitions: NonZeroUsize,
359+
num_partitions: usize,
364360
}
365361

366362
impl fmt::Debug for VortexDataSource {
@@ -428,7 +424,7 @@ impl DataSource for VortexDataSource {
428424

429425
let handle = session.handle();
430426
let stream = scan_streams
431-
.try_flatten_unordered(Some(num_partitions.get() * 2))
427+
.try_flatten_unordered(Some(num_partitions * 2))
432428
.map(move |result| {
433429
let session = session.clone();
434430
let schema = Arc::clone(&projected_schema);
@@ -437,7 +433,7 @@ impl DataSource for VortexDataSource {
437433
result.and_then(|chunk| chunk.execute_record_batch(&schema, &mut ctx))
438434
})
439435
})
440-
.buffered(num_partitions.get())
436+
.buffered(num_partitions)
441437
.map(|result| result.map_err(|e| DataFusionError::External(Box::new(e))));
442438

443439
// Apply leftover projection (expressions that couldn't be pushed into Vortex).
@@ -488,8 +484,7 @@ impl DataSource for VortexDataSource {
488484
) -> DFResult<Option<Arc<dyn DataSource>>> {
489485
// Vortex handles parallelism internally — always use a single partition.
490486
let mut this = self.clone();
491-
this.num_partitions = NonZero::new(target_partitions)
492-
.ok_or_else(|| DataFusionError::Internal("non-zero partitions".to_string()))?;
487+
this.num_partitions = target_partitions;
493488
this.ordered |= output_ordering.is_some();
494489
Ok(Some(Arc::new(this)))
495490
}

vortex-duckdb/src/datasource.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use vortex::scalar_fn::fns::pack::Pack;
5151
use vortex::scan::DataSource;
5252
use vortex::scan::ScanRequest;
5353
use vortex_utils::aliases::hash_set::HashSet;
54+
use vortex_utils::parallelism::get_available_parallelism;
5455

5556
use crate::RUNTIME;
5657
use crate::SESSION;
@@ -304,9 +305,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
304305

305306
let scan = RUNTIME.block_on(bind_data.data_source.scan(request))?;
306307

307-
let num_workers = std::thread::available_parallelism()
308-
.map(|n| n.get())
309-
.unwrap_or(1);
308+
let num_workers = get_available_parallelism().unwrap_or(1);
310309

311310
// We create an async bounded channel so that all thread-local workers can pull the next
312311
// available array chunk regardless of which partition it came from.

vortex-io/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ vortex-buffer = { workspace = true }
3939
vortex-error = { workspace = true }
4040
vortex-metrics = { workspace = true }
4141
vortex-session = { workspace = true }
42+
vortex-utils = { workspace = true }
4243

4344
[target.'cfg(unix)'.dependencies]
4445
custom-labels = { workspace = true }

vortex-io/src/runtime/pool.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::time::Duration;
99
use parking_lot::Mutex;
1010
use smol::block_on;
1111
use vortex_error::VortexExpect;
12+
use vortex_utils::parallelism::get_available_parallelism;
1213

1314
#[derive(Clone)]
1415
pub struct CurrentThreadWorkerPool {
@@ -25,10 +26,10 @@ impl CurrentThreadWorkerPool {
2526
}
2627

2728
/// Set the number of worker threads to the available system parallelism as reported by
28-
/// `std::thread::available_parallelism()` minus 1, to leave a slot open for the calling thread.
29+
/// [`get_available_parallelism()`] minus 1, to leave a slot open for the calling thread.
2930
pub fn set_workers_to_available_parallelism(&self) {
30-
let n = std::thread::available_parallelism()
31-
.map(|n| n.get().saturating_sub(1).max(1))
31+
let n = get_available_parallelism()
32+
.map(|n| n.saturating_sub(1).max(1))
3233
.unwrap_or(1);
3334
self.set_workers(n);
3435
}

vortex-layout/src/layouts/compressed.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use vortex_btrblocks::BtrBlocksCompressor;
1414
use vortex_error::VortexResult;
1515
use vortex_io::session::RuntimeSessionExt;
1616
use vortex_session::VortexSession;
17+
use vortex_utils::parallelism::get_available_parallelism;
1718

1819
use crate::LayoutRef;
1920
use crate::LayoutStrategy;
@@ -67,9 +68,7 @@ impl CompressingStrategy {
6768
child: Arc::new(child),
6869
compressor: Arc::new(compressor),
6970
stats: Stat::all().collect(),
70-
concurrency: std::thread::available_parallelism()
71-
.map(|v| v.get())
72-
.unwrap_or(1),
71+
concurrency: get_available_parallelism().unwrap_or(1),
7372
}
7473
}
7574

vortex-layout/src/layouts/zoned/writer.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use vortex_array::stats::PRUNING_STATS;
1414
use vortex_error::VortexResult;
1515
use vortex_io::session::RuntimeSessionExt;
1616
use vortex_session::VortexSession;
17+
use vortex_utils::parallelism::get_available_parallelism;
1718

1819
use crate::IntoLayout;
1920
use crate::LayoutRef;
@@ -44,9 +45,7 @@ impl Default for ZonedLayoutOptions {
4445
block_size: 8192,
4546
stats: PRUNING_STATS.into(),
4647
max_variable_length_statistics_size: 64,
47-
concurrency: std::thread::available_parallelism()
48-
.map(|v| v.get())
49-
.unwrap_or(1),
48+
concurrency: get_available_parallelism().unwrap_or(1),
5049
}
5150
}
5251
}

0 commit comments

Comments
 (0)