Skip to content

Commit 2f2bf32

Browse files
alamberenavsarogullarimartin-g
authored
[branch-53] fix: Provide more generic API for the capacity limit parsing (#20372) (#20893)
- Part of #19692 - Closes #20371 on branch-53 This PR: - Backports #20372 from @erenavsarogullari to the branch-53 line Co-authored-by: Eren Avsarogullari <eren@apache.org> Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
1 parent 5388c72 commit 2f2bf32

4 files changed

Lines changed: 180 additions & 19 deletions

File tree

benchmarks/src/util/options.rs

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,12 @@ pub struct CommonOpt {
5050

5151
/// Memory limit (e.g. '100M', '1.5G'). If not specified, run all pre-defined memory limits for given query
5252
/// if there's any, otherwise run with no memory limit.
53-
#[arg(long = "memory-limit", value_parser = parse_memory_limit)]
53+
#[arg(long = "memory-limit", value_parser = parse_capacity_limit)]
5454
pub memory_limit: Option<usize>,
5555

5656
/// The amount of memory to reserve for sort spill operations. DataFusion's default value will be used
5757
/// if not specified.
58-
#[arg(long = "sort-spill-reservation-bytes", value_parser = parse_memory_limit)]
58+
#[arg(long = "sort-spill-reservation-bytes", value_parser = parse_capacity_limit)]
5959
pub sort_spill_reservation_bytes: Option<usize>,
6060

6161
/// Activate debug mode to see more details
@@ -116,20 +116,26 @@ impl CommonOpt {
116116
}
117117
}
118118

119-
/// Parse memory limit from string to number of bytes
120-
/// e.g. '1.5G', '100M' -> 1572864
121-
fn parse_memory_limit(limit: &str) -> Result<usize, String> {
119+
/// Parse capacity limit from string to number of bytes by allowing units: K, M and G.
120+
/// Supports formats like '1.5G' -> 1610612736, '100M' -> 104857600
121+
fn parse_capacity_limit(limit: &str) -> Result<usize, String> {
122+
if limit.trim().is_empty() {
123+
return Err("Capacity limit cannot be empty".to_string());
124+
}
122125
let (number, unit) = limit.split_at(limit.len() - 1);
123126
let number: f64 = number
124127
.parse()
125-
.map_err(|_| format!("Failed to parse number from memory limit '{limit}'"))?;
128+
.map_err(|_| format!("Failed to parse number from capacity limit '{limit}'"))?;
129+
if number.is_sign_negative() || number.is_infinite() {
130+
return Err("Limit value should be positive finite number".to_string());
131+
}
126132

127133
match unit {
128134
"K" => Ok((number * 1024.0) as usize),
129135
"M" => Ok((number * 1024.0 * 1024.0) as usize),
130136
"G" => Ok((number * 1024.0 * 1024.0 * 1024.0) as usize),
131137
_ => Err(format!(
132-
"Unsupported unit '{unit}' in memory limit '{limit}'"
138+
"Unsupported unit '{unit}' in capacity limit '{limit}'. Unit must be one of: 'K', 'M', 'G'"
133139
)),
134140
}
135141
}
@@ -139,16 +145,25 @@ mod tests {
139145
use super::*;
140146

141147
#[test]
142-
fn test_parse_memory_limit_all() {
148+
fn test_parse_capacity_limit_all() {
143149
// Test valid inputs
144-
assert_eq!(parse_memory_limit("100K").unwrap(), 102400);
145-
assert_eq!(parse_memory_limit("1.5M").unwrap(), 1572864);
146-
assert_eq!(parse_memory_limit("2G").unwrap(), 2147483648);
150+
assert_eq!(parse_capacity_limit("100K").unwrap(), 102400);
151+
assert_eq!(parse_capacity_limit("1.5M").unwrap(), 1572864);
152+
assert_eq!(parse_capacity_limit("2G").unwrap(), 2147483648);
147153

148154
// Test invalid unit
149-
assert!(parse_memory_limit("500X").is_err());
155+
assert!(parse_capacity_limit("500X").is_err());
150156

151157
// Test invalid number
152-
assert!(parse_memory_limit("abcM").is_err());
158+
assert!(parse_capacity_limit("abcM").is_err());
159+
160+
// Test negative number
161+
assert!(parse_capacity_limit("-1M").is_err());
162+
163+
// Test infinite number
164+
assert!(parse_capacity_limit("infM").is_err());
165+
166+
// Test negative infinite number
167+
assert!(parse_capacity_limit("-infM").is_err());
153168
}
154169
}

datafusion/core/src/execution/context/mod.rs

Lines changed: 128 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,20 +1167,20 @@ impl SessionContext {
11671167
let mut builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env());
11681168
builder = match key {
11691169
"memory_limit" => {
1170-
let memory_limit = Self::parse_memory_limit(value)?;
1170+
let memory_limit = Self::parse_capacity_limit(variable, value)?;
11711171
builder.with_memory_limit(memory_limit, 1.0)
11721172
}
11731173
"max_temp_directory_size" => {
1174-
let directory_size = Self::parse_memory_limit(value)?;
1174+
let directory_size = Self::parse_capacity_limit(variable, value)?;
11751175
builder.with_max_temp_directory_size(directory_size as u64)
11761176
}
11771177
"temp_directory" => builder.with_temp_file_path(value),
11781178
"metadata_cache_limit" => {
1179-
let limit = Self::parse_memory_limit(value)?;
1179+
let limit = Self::parse_capacity_limit(variable, value)?;
11801180
builder.with_metadata_cache_limit(limit)
11811181
}
11821182
"list_files_cache_limit" => {
1183-
let limit = Self::parse_memory_limit(value)?;
1183+
let limit = Self::parse_capacity_limit(variable, value)?;
11841184
builder.with_object_list_cache_limit(limit)
11851185
}
11861186
"list_files_cache_ttl" => {
@@ -1252,11 +1252,23 @@ impl SessionContext {
12521252
/// (1.5 * 1024.0 * 1024.0 * 1024.0) as usize
12531253
/// );
12541254
/// ```
1255+
#[deprecated(
1256+
since = "53.0.0",
1257+
note = "please use `parse_capacity_limit` function instead."
1258+
)]
12551259
pub fn parse_memory_limit(limit: &str) -> Result<usize> {
1260+
if limit.trim().is_empty() {
1261+
return Err(plan_datafusion_err!("Empty limit value found!"));
1262+
}
12561263
let (number, unit) = limit.split_at(limit.len() - 1);
12571264
let number: f64 = number.parse().map_err(|_| {
12581265
plan_datafusion_err!("Failed to parse number from memory limit '{limit}'")
12591266
})?;
1267+
if number.is_sign_negative() || number.is_infinite() {
1268+
return Err(plan_datafusion_err!(
1269+
"Limit value should be positive finite number"
1270+
));
1271+
}
12601272

12611273
match unit {
12621274
"K" => Ok((number * 1024.0) as usize),
@@ -1266,6 +1278,51 @@ impl SessionContext {
12661278
}
12671279
}
12681280

1281+
/// Parse capacity limit from string to number of bytes by allowing units: K, M and G.
1282+
/// Supports formats like '1.5G', '100M', '512K'
1283+
///
1284+
/// # Examples
1285+
/// ```
1286+
/// use datafusion::execution::context::SessionContext;
1287+
///
1288+
/// assert_eq!(
1289+
/// SessionContext::parse_capacity_limit("datafusion.runtime.memory_limit", "1M").unwrap(),
1290+
/// 1024 * 1024
1291+
/// );
1292+
/// assert_eq!(
1293+
/// SessionContext::parse_capacity_limit("datafusion.runtime.memory_limit", "1.5G").unwrap(),
1294+
/// (1.5 * 1024.0 * 1024.0 * 1024.0) as usize
1295+
/// );
1296+
/// ```
1297+
pub fn parse_capacity_limit(config_name: &str, limit: &str) -> Result<usize> {
1298+
if limit.trim().is_empty() {
1299+
return Err(plan_datafusion_err!(
1300+
"Empty limit value found for '{config_name}'"
1301+
));
1302+
}
1303+
let (number, unit) = limit.split_at(limit.len() - 1);
1304+
let number: f64 = number.parse().map_err(|_| {
1305+
plan_datafusion_err!(
1306+
"Failed to parse number from '{config_name}', limit '{limit}'"
1307+
)
1308+
})?;
1309+
if number.is_sign_negative() || number.is_infinite() {
1310+
return Err(plan_datafusion_err!(
1311+
"Limit value should be positive finite number for '{config_name}'"
1312+
));
1313+
}
1314+
1315+
match unit {
1316+
"K" => Ok((number * 1024.0) as usize),
1317+
"M" => Ok((number * 1024.0 * 1024.0) as usize),
1318+
"G" => Ok((number * 1024.0 * 1024.0 * 1024.0) as usize),
1319+
_ => plan_err!(
1320+
"Unsupported unit '{unit}' in '{config_name}', limit '{limit}'. \
1321+
Unit must be one of: 'K', 'M', 'G'"
1322+
),
1323+
}
1324+
}
1325+
12691326
fn parse_duration(duration: &str) -> Result<Duration> {
12701327
let mut minutes = None;
12711328
let mut seconds = None;
@@ -2759,4 +2816,71 @@ mod tests {
27592816
assert!(have.is_err());
27602817
}
27612818
}
2819+
2820+
#[test]
2821+
fn test_parse_memory_limit() {
2822+
// Valid memory_limit
2823+
for (limit, want) in [
2824+
("1.5K", (1.5 * 1024.0) as usize),
2825+
("2M", (2f64 * 1024.0 * 1024.0) as usize),
2826+
("1G", (1f64 * 1024.0 * 1024.0 * 1024.0) as usize),
2827+
] {
2828+
#[expect(deprecated)]
2829+
let have = SessionContext::parse_memory_limit(limit).unwrap();
2830+
assert_eq!(want, have);
2831+
}
2832+
2833+
// Invalid memory_limit
2834+
for limit in [
2835+
"1B",
2836+
"1T",
2837+
"",
2838+
" ",
2839+
"XYZG",
2840+
"-1G",
2841+
"infG",
2842+
"-infG",
2843+
"G",
2844+
"1024B",
2845+
"invalid_size",
2846+
] {
2847+
#[expect(deprecated)]
2848+
let have = SessionContext::parse_memory_limit(limit);
2849+
assert!(have.is_err());
2850+
}
2851+
}
2852+
2853+
#[test]
2854+
fn test_parse_capacity_limit() {
2855+
const MEMORY_LIMIT: &str = "datafusion.runtime.memory_limit";
2856+
2857+
// Valid capacity_limit
2858+
for (limit, want) in [
2859+
("1.5K", (1.5 * 1024.0) as usize),
2860+
("2M", (2f64 * 1024.0 * 1024.0) as usize),
2861+
("1G", (1f64 * 1024.0 * 1024.0 * 1024.0) as usize),
2862+
] {
2863+
let have = SessionContext::parse_capacity_limit(MEMORY_LIMIT, limit).unwrap();
2864+
assert_eq!(want, have);
2865+
}
2866+
2867+
// Invalid capacity_limit
2868+
for limit in [
2869+
"1B",
2870+
"1T",
2871+
"",
2872+
" ",
2873+
"XYZG",
2874+
"-1G",
2875+
"infG",
2876+
"-infG",
2877+
"G",
2878+
"1024B",
2879+
"invalid_size",
2880+
] {
2881+
let have = SessionContext::parse_capacity_limit(MEMORY_LIMIT, limit);
2882+
assert!(have.is_err());
2883+
assert!(have.unwrap_err().to_string().contains(MEMORY_LIMIT));
2884+
}
2885+
}
27622886
}

datafusion/core/tests/sql/runtime_config.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ async fn test_memory_limit_enforcement() {
145145
}
146146

147147
#[tokio::test]
148-
async fn test_invalid_memory_limit() {
148+
async fn test_invalid_memory_limit_when_unit_is_invalid() {
149149
let ctx = SessionContext::new();
150150

151151
let result = ctx
@@ -154,7 +154,26 @@ async fn test_invalid_memory_limit() {
154154

155155
assert!(result.is_err());
156156
let error_message = result.unwrap_err().to_string();
157-
assert!(error_message.contains("Unsupported unit 'X'"));
157+
assert!(
158+
error_message
159+
.contains("Unsupported unit 'X' in 'datafusion.runtime.memory_limit'")
160+
&& error_message.contains("Unit must be one of: 'K', 'M', 'G'")
161+
);
162+
}
163+
164+
#[tokio::test]
165+
async fn test_invalid_memory_limit_when_limit_is_not_numeric() {
166+
let ctx = SessionContext::new();
167+
168+
let result = ctx
169+
.sql("SET datafusion.runtime.memory_limit = 'invalid_memory_limit'")
170+
.await;
171+
172+
assert!(result.is_err());
173+
let error_message = result.unwrap_err().to_string();
174+
assert!(error_message.contains(
175+
"Failed to parse number from 'datafusion.runtime.memory_limit', limit 'invalid_memory_limit'"
176+
));
158177
}
159178

160179
#[tokio::test]

datafusion/sqllogictest/test_files/set_variable.slt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,3 +447,6 @@ datafusion.runtime.max_temp_directory_size
447447
datafusion.runtime.memory_limit
448448
datafusion.runtime.metadata_cache_limit
449449
datafusion.runtime.temp_directory
450+
451+
statement error DataFusion error: Error during planning: Unsupported value Null
452+
SET datafusion.runtime.memory_limit = NULL

0 commit comments

Comments
 (0)