Skip to content

Commit e8cc367

Browse files
committed
feat: add CDC end-to-end slt tests and true/false config option
- Add parquet_cdc.slt with 6 end-to-end tests: write parquet files with CDC enabled/disabled, read back and verify correctness across various data types, sizes, and CDC configurations. - Allow setting use_content_defined_chunking to 'true'/'false' to enable with defaults or disable, via a specific ConfigField impl for Option<CdcOptions>. - CdcOptions uses an inherent default() method instead of the Default trait to avoid the blanket Option<F> ConfigField impl conflict.
1 parent 5dc4d6e commit e8cc367

2 files changed

Lines changed: 433 additions & 13 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 202 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -687,21 +687,141 @@ config_namespace! {
687687
}
688688
}
689689

690-
config_namespace! {
691-
/// Options for content-defined chunking (CDC) when writing parquet files.
692-
/// See [`ParquetOptions::use_content_defined_chunking`].
693-
pub struct CdcOptions {
694-
/// Minimum chunk size in bytes. The rolling hash will not trigger a split
695-
/// until this many bytes have been accumulated. Default is 256 KiB.
696-
pub min_chunk_size: usize, default = 256 * 1024
690+
/// Options for content-defined chunking (CDC) when writing parquet files.
691+
/// See [`ParquetOptions::use_content_defined_chunking`].
692+
///
693+
/// Can be enabled with default options by setting
694+
/// `use_content_defined_chunking` to `true`, or configured with sub-fields
695+
/// like `use_content_defined_chunking.min_chunk_size`.
696+
#[derive(Debug, Clone, PartialEq)]
697+
pub struct CdcOptions {
698+
/// Minimum chunk size in bytes. The rolling hash will not trigger a split
699+
/// until this many bytes have been accumulated. Default is 256 KiB.
700+
pub min_chunk_size: usize,
701+
702+
/// Maximum chunk size in bytes. A split is forced when the accumulated
703+
/// size exceeds this value. Default is 1 MiB.
704+
pub max_chunk_size: usize,
705+
706+
/// Normalization level. Increasing this improves deduplication ratio
707+
/// but increases fragmentation. Recommended range is [-3, 3], default is 0.
708+
pub norm_level: i32,
709+
}
710+
711+
// Note: `CdcOptions` intentionally does NOT implement `Default` so that the
712+
// blanket `impl<F: ConfigField + Default> ConfigField for Option<F>` does not
713+
// apply. This allows the specific `impl ConfigField for Option<CdcOptions>`
714+
// below to handle "true"/"false" for enabling/disabling CDC.
715+
// Use `CdcOptions::default()` (the inherent method) instead of `Default::default()`.
716+
impl CdcOptions {
717+
/// Returns a new `CdcOptions` with default values.
718+
pub fn default() -> Self {
719+
Self {
720+
min_chunk_size: 256 * 1024,
721+
max_chunk_size: 1024 * 1024,
722+
norm_level: 0,
723+
}
724+
}
725+
}
697726

698-
/// Maximum chunk size in bytes. A split is forced when the accumulated
699-
/// size exceeds this value. Default is 1 MiB.
700-
pub max_chunk_size: usize, default = 1024 * 1024
727+
impl ConfigField for CdcOptions {
728+
fn set(&mut self, key: &str, value: &str) -> Result<()> {
729+
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
730+
match key {
731+
"min_chunk_size" => self.min_chunk_size.set(rem, value),
732+
"max_chunk_size" => self.max_chunk_size.set(rem, value),
733+
"norm_level" => self.norm_level.set(rem, value),
734+
_ => _config_err!(
735+
"Config value \"{}\" not found on CdcOptions",
736+
key
737+
),
738+
}
739+
}
701740

702-
/// Normalization level. Increasing this improves deduplication ratio
703-
/// but increases fragmentation. Recommended range is [-3, 3], default is 0.
704-
pub norm_level: i32, default = 0
741+
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
742+
let key = format!("{key_prefix}.min_chunk_size");
743+
self.min_chunk_size.visit(v, &key, "Minimum chunk size in bytes. The rolling hash will not trigger a split until this many bytes have been accumulated. Default is 256 KiB.");
744+
let key = format!("{key_prefix}.max_chunk_size");
745+
self.max_chunk_size.visit(v, &key, "Maximum chunk size in bytes. A split is forced when the accumulated size exceeds this value. Default is 1 MiB.");
746+
let key = format!("{key_prefix}.norm_level");
747+
self.norm_level.visit(v, &key, "Normalization level. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0.");
748+
}
749+
750+
fn reset(&mut self, key: &str) -> Result<()> {
751+
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
752+
match key {
753+
"min_chunk_size" => {
754+
if rem.is_empty() {
755+
self.min_chunk_size = CdcOptions::default().min_chunk_size;
756+
Ok(())
757+
} else {
758+
self.min_chunk_size.reset(rem)
759+
}
760+
}
761+
"max_chunk_size" => {
762+
if rem.is_empty() {
763+
self.max_chunk_size = CdcOptions::default().max_chunk_size;
764+
Ok(())
765+
} else {
766+
self.max_chunk_size.reset(rem)
767+
}
768+
}
769+
"norm_level" => {
770+
if rem.is_empty() {
771+
self.norm_level = CdcOptions::default().norm_level;
772+
Ok(())
773+
} else {
774+
self.norm_level.reset(rem)
775+
}
776+
}
777+
_ => _config_err!(
778+
"Config value \"{}\" not found on CdcOptions",
779+
key
780+
),
781+
}
782+
}
783+
}
784+
785+
/// `ConfigField` for `Option<CdcOptions>` — allows setting the option to
786+
/// `"true"` (enable with defaults) or `"false"` (disable), in addition to
787+
/// setting individual sub-fields like `min_chunk_size`.
788+
impl ConfigField for Option<CdcOptions> {
789+
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
790+
match self {
791+
Some(s) => s.visit(v, key, description),
792+
None => v.none(key, description),
793+
}
794+
}
795+
796+
fn set(&mut self, key: &str, value: &str) -> Result<()> {
797+
if key.is_empty() {
798+
match value.to_ascii_lowercase().as_str() {
799+
"true" => {
800+
*self = Some(CdcOptions::default());
801+
Ok(())
802+
}
803+
"false" => {
804+
*self = None;
805+
Ok(())
806+
}
807+
_ => _config_err!(
808+
"Expected 'true' or 'false' for use_content_defined_chunking, got '{value}'"
809+
),
810+
}
811+
} else {
812+
self.get_or_insert_with(CdcOptions::default)
813+
.set(key, value)
814+
}
815+
}
816+
817+
fn reset(&mut self, key: &str) -> Result<()> {
818+
if key.is_empty() {
819+
*self = None;
820+
Ok(())
821+
} else {
822+
self.get_or_insert_with(CdcOptions::default)
823+
.reset(key)
824+
}
705825
}
706826
}
707827

@@ -3604,4 +3724,73 @@ mod tests {
36043724
"Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0"
36053725
);
36063726
}
3727+
3728+
#[cfg(feature = "parquet")]
3729+
#[test]
3730+
fn set_cdc_option_with_boolean_true() {
3731+
use crate::config::ConfigOptions;
3732+
3733+
let mut config = ConfigOptions::default();
3734+
assert!(config
3735+
.execution
3736+
.parquet
3737+
.use_content_defined_chunking
3738+
.is_none());
3739+
3740+
// Setting to "true" should enable CDC with default options
3741+
config
3742+
.set(
3743+
"datafusion.execution.parquet.use_content_defined_chunking",
3744+
"true",
3745+
)
3746+
.unwrap();
3747+
let cdc = config
3748+
.execution
3749+
.parquet
3750+
.use_content_defined_chunking
3751+
.as_ref()
3752+
.expect("CDC should be enabled");
3753+
assert_eq!(cdc.min_chunk_size, 256 * 1024);
3754+
assert_eq!(cdc.max_chunk_size, 1024 * 1024);
3755+
assert_eq!(cdc.norm_level, 0);
3756+
3757+
// Setting to "false" should disable CDC
3758+
config
3759+
.set(
3760+
"datafusion.execution.parquet.use_content_defined_chunking",
3761+
"false",
3762+
)
3763+
.unwrap();
3764+
assert!(config
3765+
.execution
3766+
.parquet
3767+
.use_content_defined_chunking
3768+
.is_none());
3769+
}
3770+
3771+
#[cfg(feature = "parquet")]
3772+
#[test]
3773+
fn set_cdc_option_with_subfields() {
3774+
use crate::config::ConfigOptions;
3775+
3776+
let mut config = ConfigOptions::default();
3777+
3778+
// Setting sub-fields should also enable CDC
3779+
config
3780+
.set(
3781+
"datafusion.execution.parquet.use_content_defined_chunking.min_chunk_size",
3782+
"1024",
3783+
)
3784+
.unwrap();
3785+
let cdc = config
3786+
.execution
3787+
.parquet
3788+
.use_content_defined_chunking
3789+
.as_ref()
3790+
.expect("CDC should be enabled");
3791+
assert_eq!(cdc.min_chunk_size, 1024);
3792+
// Other fields should be defaults
3793+
assert_eq!(cdc.max_chunk_size, 1024 * 1024);
3794+
assert_eq!(cdc.norm_level, 0);
3795+
}
36073796
}

0 commit comments

Comments
 (0)