Skip to content

Commit 91edaad

Browse files
committed
chore: update parquet CDC options and proto serialization for arrow-rs 58.1
1 parent c4b7816 commit 91edaad

12 files changed

Lines changed: 302 additions & 299 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/common/src/config.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,24 @@ config_namespace! {
687687
}
688688
}
689689

690+
config_namespace! {
691+
/// Options for content-defined chunking (CDC) when writing parquet files.
692+
/// See [`ParquetOptions::use_content_defined_chunking`].
693+
pub struct CdcOptions {
694+
/// Minimum chunk size in bytes. The rolling hash will not trigger a split
695+
/// until this many bytes have been accumulated. Default is 256 KiB.
696+
pub min_chunk_size: usize, default = 256 * 1024
697+
698+
/// Maximum chunk size in bytes. A split is forced when the accumulated
699+
/// size exceeds this value. Default is 1 MiB.
700+
pub max_chunk_size: usize, default = 1024 * 1024
701+
702+
/// Normalization level. Increasing this improves deduplication ratio
703+
/// but increases fragmentation. Recommended range is [-3, 3], default is 0.
704+
pub norm_level: i64, default = 0
705+
}
706+
}
707+
690708
config_namespace! {
691709
/// Options for reading and writing parquet files
692710
///
@@ -875,25 +893,10 @@ config_namespace! {
875893
pub maximum_buffered_record_batches_per_stream: usize, default = 2
876894

877895
/// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
878-
/// parquet files. When true, the other `cdc_*` options control the chunking
879-
/// behavior. When CDC is enabled, parallel writing is automatically disabled
880-
/// since the chunker state must persist across row groups.
881-
pub enable_content_defined_chunking: bool, default = false
882-
883-
/// (writing) Minimum chunk size in bytes for content-defined chunking.
884-
/// The rolling hash will not be updated until this size is reached for each chunk.
885-
/// Default is 256 KiB. Only used when `enable_content_defined_chunking` is true.
886-
pub cdc_min_chunk_size: usize, default = 256 * 1024
887-
888-
/// (writing) Maximum chunk size in bytes for content-defined chunking.
889-
/// The chunker will create a new chunk whenever the chunk size exceeds this value.
890-
/// Default is 1 MiB. Only used when `enable_content_defined_chunking` is true.
891-
pub cdc_max_chunk_size: usize, default = 1024 * 1024
892-
893-
/// (writing) Normalization level for content-defined chunking.
894-
/// Increasing this improves deduplication ratio but increases fragmentation.
895-
/// Recommended range is [-3, 3], default is 0. Only used when `enable_content_defined_chunking` is true.
896-
pub cdc_norm_level: i64, default = 0
896+
/// parquet files. When `Some`, CDC is enabled with the given options; when `None`
897+
/// (the default), CDC is disabled. When CDC is enabled, parallel writing is
898+
/// automatically disabled since the chunker state must persist across row groups.
899+
pub use_content_defined_chunking: Option<CdcOptions>, default = None
897900
}
898901
}
899902

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 30 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -191,10 +191,7 @@ impl ParquetOptions {
191191
bloom_filter_on_write,
192192
bloom_filter_fpp,
193193
bloom_filter_ndv,
194-
enable_content_defined_chunking,
195-
cdc_min_chunk_size,
196-
cdc_max_chunk_size,
197-
cdc_norm_level,
194+
use_content_defined_chunking,
198195

199196
// not in WriterProperties
200197
enable_page_index: _,
@@ -251,12 +248,12 @@ impl ParquetOptions {
251248
if let Some(encoding) = encoding {
252249
builder = builder.set_encoding(parse_encoding_string(encoding)?);
253250
}
254-
if *enable_content_defined_chunking {
251+
if let Some(cdc) = use_content_defined_chunking {
255252
builder = builder.set_content_defined_chunking(Some(
256253
parquet::file::properties::CdcOptions {
257-
min_chunk_size: *cdc_min_chunk_size,
258-
max_chunk_size: *cdc_max_chunk_size,
259-
norm_level: *cdc_norm_level as i32,
254+
min_chunk_size: cdc.min_chunk_size,
255+
max_chunk_size: cdc.max_chunk_size,
256+
norm_level: cdc.norm_level as i32,
260257
},
261258
));
262259
}
@@ -401,7 +398,9 @@ mod tests {
401398
use super::*;
402399
#[cfg(feature = "parquet_encryption")]
403400
use crate::config::ConfigFileEncryptionProperties;
404-
use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
401+
use crate::config::{
402+
CdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions,
403+
};
405404
use crate::parquet_config::DFParquetWriterVersion;
406405
use parquet::basic::Compression;
407406
use parquet::file::properties::{
@@ -473,10 +472,7 @@ mod tests {
473472
skip_arrow_metadata: defaults.skip_arrow_metadata,
474473
coerce_int96: None,
475474
max_predicate_cache_size: defaults.max_predicate_cache_size,
476-
enable_content_defined_chunking: defaults.enable_content_defined_chunking,
477-
cdc_min_chunk_size: defaults.cdc_min_chunk_size,
478-
cdc_max_chunk_size: defaults.cdc_max_chunk_size,
479-
cdc_norm_level: defaults.cdc_norm_level,
475+
use_content_defined_chunking: defaults.use_content_defined_chunking.clone(),
480476
}
481477
}
482478

@@ -593,21 +589,13 @@ mod tests {
593589
binary_as_string: global_options_defaults.binary_as_string,
594590
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
595591
coerce_int96: None,
596-
enable_content_defined_chunking: props
597-
.content_defined_chunking()
598-
.is_some(),
599-
cdc_min_chunk_size: props
600-
.content_defined_chunking()
601-
.map(|c| c.min_chunk_size)
602-
.unwrap_or(global_options_defaults.cdc_min_chunk_size),
603-
cdc_max_chunk_size: props
604-
.content_defined_chunking()
605-
.map(|c| c.max_chunk_size)
606-
.unwrap_or(global_options_defaults.cdc_max_chunk_size),
607-
cdc_norm_level: props
608-
.content_defined_chunking()
609-
.map(|c| c.norm_level as i64)
610-
.unwrap_or(global_options_defaults.cdc_norm_level),
592+
use_content_defined_chunking: props.content_defined_chunking().map(|c| {
593+
CdcOptions {
594+
min_chunk_size: c.min_chunk_size,
595+
max_chunk_size: c.max_chunk_size,
596+
norm_level: c.norm_level as i64,
597+
}
598+
}),
611599
},
612600
column_specific_options,
613601
key_value_metadata,
@@ -821,10 +809,11 @@ mod tests {
821809
#[test]
822810
fn test_cdc_enabled_with_custom_options() {
823811
let mut opts = TableParquetOptions::default();
824-
opts.global.enable_content_defined_chunking = true;
825-
opts.global.cdc_min_chunk_size = 128 * 1024;
826-
opts.global.cdc_max_chunk_size = 512 * 1024;
827-
opts.global.cdc_norm_level = 2;
812+
opts.global.use_content_defined_chunking = Some(CdcOptions {
813+
min_chunk_size: 128 * 1024,
814+
max_chunk_size: 512 * 1024,
815+
norm_level: 2,
816+
});
828817
opts.arrow_schema(&Arc::new(Schema::empty()));
829818

830819
let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
@@ -846,19 +835,20 @@ mod tests {
846835
#[test]
847836
fn test_cdc_round_trip_through_writer_props() {
848837
let mut opts = TableParquetOptions::default();
849-
opts.global.enable_content_defined_chunking = true;
850-
opts.global.cdc_min_chunk_size = 64 * 1024;
851-
opts.global.cdc_max_chunk_size = 2 * 1024 * 1024;
852-
opts.global.cdc_norm_level = -1;
838+
opts.global.use_content_defined_chunking = Some(CdcOptions {
839+
min_chunk_size: 64 * 1024,
840+
max_chunk_size: 2 * 1024 * 1024,
841+
norm_level: -1,
842+
});
853843
opts.arrow_schema(&Arc::new(Schema::empty()));
854844

855845
let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
856846
let recovered = session_config_from_writer_props(&props);
857847

858-
assert_eq!(recovered.global.enable_content_defined_chunking, true);
859-
assert_eq!(recovered.global.cdc_min_chunk_size, 64 * 1024);
860-
assert_eq!(recovered.global.cdc_max_chunk_size, 2 * 1024 * 1024);
861-
assert_eq!(recovered.global.cdc_norm_level, -1);
848+
let cdc = recovered.global.use_content_defined_chunking.unwrap();
849+
assert_eq!(cdc.min_chunk_size, 64 * 1024);
850+
assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024);
851+
assert_eq!(cdc.norm_level, -1);
862852
}
863853

864854
#[test]

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1373,7 +1373,7 @@ impl FileSink for ParquetSink {
13731373
// CDC requires the sequential writer: the chunker state lives in ArrowWriter
13741374
// and persists across row groups. The parallel path bypasses ArrowWriter entirely.
13751375
if !parquet_opts.global.allow_single_file_parallelism
1376-
|| parquet_opts.global.enable_content_defined_chunking
1376+
|| parquet_opts.global.use_content_defined_chunking.is_some()
13771377
{
13781378
let mut writer = self
13791379
.create_async_arrow_writer(

datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -604,19 +604,13 @@ message ParquetOptions {
604604
uint64 max_predicate_cache_size = 33;
605605
}
606606

607-
bool content_defined_chunking = 35; // default = false
608-
609-
oneof cdc_min_chunk_size_opt {
610-
uint64 cdc_min_chunk_size = 36;
611-
}
612-
613-
oneof cdc_max_chunk_size_opt {
614-
uint64 cdc_max_chunk_size = 37;
615-
}
607+
CdcOptions content_defined_chunking = 35;
608+
}
616609

617-
oneof cdc_norm_level_opt {
618-
int32 cdc_norm_level = 38;
619-
}
610+
message CdcOptions {
611+
uint64 min_chunk_size = 1;
612+
uint64 max_chunk_size = 2;
613+
int32 norm_level = 3;
620614
}
621615

622616
enum JoinSide {

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 39 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use datafusion_common::{
3939
DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
4040
arrow_datafusion_err,
4141
config::{
42-
CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
42+
CdcOptions, CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
4343
TableParquetOptions,
4444
},
4545
file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
@@ -1089,16 +1089,14 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
10891089
max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
10901090
protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
10911091
}).unwrap_or(None),
1092-
enable_content_defined_chunking: value.content_defined_chunking,
1093-
cdc_min_chunk_size: value.cdc_min_chunk_size_opt.map(|opt| match opt {
1094-
protobuf::parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(v) => v as usize,
1095-
}).unwrap_or(ParquetOptions::default().cdc_min_chunk_size),
1096-
cdc_max_chunk_size: value.cdc_max_chunk_size_opt.map(|opt| match opt {
1097-
protobuf::parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(v) => v as usize,
1098-
}).unwrap_or(ParquetOptions::default().cdc_max_chunk_size),
1099-
cdc_norm_level: value.cdc_norm_level_opt.map(|opt| match opt {
1100-
protobuf::parquet_options::CdcNormLevelOpt::CdcNormLevel(v) => v as i64,
1101-
}).unwrap_or(ParquetOptions::default().cdc_norm_level),
1092+
use_content_defined_chunking: value.content_defined_chunking.map(|cdc| {
1093+
let defaults = CdcOptions::default();
1094+
CdcOptions {
1095+
min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size },
1096+
max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size },
1097+
norm_level: cdc.norm_level as i64,
1098+
}
1099+
}),
11021100
})
11031101
}
11041102
}
@@ -1275,7 +1273,7 @@ pub(crate) fn csv_writer_options_from_proto(
12751273
#[cfg(test)]
12761274
mod tests {
12771275
use super::*;
1278-
use datafusion_common::config::{ParquetOptions, TableParquetOptions};
1276+
use datafusion_common::config::{CdcOptions, ParquetOptions, TableParquetOptions};
12791277

12801278
fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions {
12811279
let proto: crate::protobuf_common::ParquetOptions =
@@ -1294,66 +1292,65 @@ mod tests {
12941292
#[test]
12951293
fn test_parquet_options_cdc_disabled_round_trip() {
12961294
let opts = ParquetOptions::default();
1297-
assert!(!opts.enable_content_defined_chunking);
1295+
assert!(opts.use_content_defined_chunking.is_none());
12981296
let recovered = parquet_options_proto_round_trip(opts.clone());
12991297
assert_eq!(opts, recovered);
13001298
}
13011299

13021300
#[test]
13031301
fn test_parquet_options_cdc_enabled_round_trip() {
13041302
let opts = ParquetOptions {
1305-
enable_content_defined_chunking: true,
1306-
cdc_min_chunk_size: 128 * 1024,
1307-
cdc_max_chunk_size: 512 * 1024,
1308-
cdc_norm_level: 2,
1303+
use_content_defined_chunking: Some(CdcOptions {
1304+
min_chunk_size: 128 * 1024,
1305+
max_chunk_size: 512 * 1024,
1306+
norm_level: 2,
1307+
}),
13091308
..ParquetOptions::default()
13101309
};
13111310
let recovered = parquet_options_proto_round_trip(opts.clone());
1312-
assert_eq!(recovered.enable_content_defined_chunking, true);
1313-
assert_eq!(recovered.cdc_min_chunk_size, 128 * 1024);
1314-
assert_eq!(recovered.cdc_max_chunk_size, 512 * 1024);
1315-
assert_eq!(recovered.cdc_norm_level, 2);
1311+
let cdc = recovered.use_content_defined_chunking.unwrap();
1312+
assert_eq!(cdc.min_chunk_size, 128 * 1024);
1313+
assert_eq!(cdc.max_chunk_size, 512 * 1024);
1314+
assert_eq!(cdc.norm_level, 2);
13161315
}
13171316

13181317
#[test]
13191318
fn test_parquet_options_cdc_negative_norm_level_round_trip() {
13201319
let opts = ParquetOptions {
1321-
enable_content_defined_chunking: true,
1322-
cdc_norm_level: -3,
1320+
use_content_defined_chunking: Some(CdcOptions {
1321+
norm_level: -3,
1322+
..CdcOptions::default()
1323+
}),
13231324
..ParquetOptions::default()
13241325
};
13251326
let recovered = parquet_options_proto_round_trip(opts);
1326-
assert_eq!(recovered.cdc_norm_level, -3);
1327+
assert_eq!(
1328+
recovered.use_content_defined_chunking.unwrap().norm_level,
1329+
-3
1330+
);
13271331
}
13281332

13291333
#[test]
13301334
fn test_table_parquet_options_cdc_round_trip() {
13311335
let mut opts = TableParquetOptions::default();
1332-
opts.global.enable_content_defined_chunking = true;
1333-
opts.global.cdc_min_chunk_size = 64 * 1024;
1334-
opts.global.cdc_max_chunk_size = 2 * 1024 * 1024;
1335-
opts.global.cdc_norm_level = -1;
1336+
opts.global.use_content_defined_chunking = Some(CdcOptions {
1337+
min_chunk_size: 64 * 1024,
1338+
max_chunk_size: 2 * 1024 * 1024,
1339+
norm_level: -1,
1340+
});
13361341

13371342
let recovered = table_parquet_options_proto_round_trip(opts.clone());
1338-
assert_eq!(recovered.global.enable_content_defined_chunking, true);
1339-
assert_eq!(recovered.global.cdc_min_chunk_size, 64 * 1024);
1340-
assert_eq!(recovered.global.cdc_max_chunk_size, 2 * 1024 * 1024);
1341-
assert_eq!(recovered.global.cdc_norm_level, -1);
1343+
let cdc = recovered.global.use_content_defined_chunking.unwrap();
1344+
assert_eq!(cdc.min_chunk_size, 64 * 1024);
1345+
assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024);
1346+
assert_eq!(cdc.norm_level, -1);
13421347
}
13431348

13441349
#[test]
13451350
fn test_table_parquet_options_cdc_disabled_round_trip() {
13461351
let opts = TableParquetOptions::default();
1347-
assert!(!opts.global.enable_content_defined_chunking);
1352+
assert!(opts.global.use_content_defined_chunking.is_none());
13481353
let recovered = table_parquet_options_proto_round_trip(opts.clone());
1349-
assert_eq!(recovered.global.enable_content_defined_chunking, false);
1350-
assert_eq!(
1351-
recovered.global.cdc_min_chunk_size,
1352-
ParquetOptions::default().cdc_min_chunk_size
1353-
);
1354-
assert_eq!(
1355-
recovered.global.cdc_max_chunk_size,
1356-
ParquetOptions::default().cdc_max_chunk_size
1357-
);
1354+
assert!(recovered.global.use_content_defined_chunking.is_none());
13581355
}
13591356
}

0 commit comments

Comments
 (0)