Skip to content

Commit 8fb5e58

Browse files
committed
chore: update parquet CDC options and proto serialization for arrow-rs 58.1
1 parent 1c1aae5 commit 8fb5e58

12 files changed

Lines changed: 302 additions & 299 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/common/src/config.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,24 @@ config_namespace! {
687687
}
688688
}
689689

690+
// NOTE(review): this struct is not a standalone namespace in practice; it is
// held as `Option<CdcOptions>` by `ParquetOptions::use_content_defined_chunking`,
// where `None` (the default) means CDC is disabled.
config_namespace! {
691+
/// Options for content-defined chunking (CDC) when writing parquet files.
692+
/// See [`ParquetOptions::use_content_defined_chunking`].
693+
pub struct CdcOptions {
694+
// NOTE(review): the proto decoder in proto-common substitutes this field's
// default when it reads 0, so an explicit 0 does not survive a round trip.
/// Minimum chunk size in bytes. The rolling hash will not trigger a split
695+
/// until this many bytes have been accumulated. Default is 256 KiB.
696+
pub min_chunk_size: usize, default = 256 * 1024
697+
698+
// NOTE(review): same 0-means-default substitution applies here on decode.
/// Maximum chunk size in bytes. A split is forced when the accumulated
699+
/// size exceeds this value. Default is 1 MiB.
700+
pub max_chunk_size: usize, default = 1024 * 1024
701+
702+
// NOTE(review): stored as i64 here but narrowed with `as i32` when the parquet
// writer properties are built, so a value outside the i32 range would wrap
// silently instead of being rejected.
/// Normalization level. Increasing this improves deduplication ratio
703+
/// but increases fragmentation. Recommended range is [-3, 3], default is 0.
704+
pub norm_level: i64, default = 0
705+
}
706+
}
707+
690708
config_namespace! {
691709
/// Options for reading and writing parquet files
692710
///
@@ -875,25 +893,10 @@ config_namespace! {
875893
pub maximum_buffered_record_batches_per_stream: usize, default = 2
876894

877895
/// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
878-
/// parquet files. When true, the other `cdc_*` options control the chunking
879-
/// behavior. When CDC is enabled, parallel writing is automatically disabled
880-
/// since the chunker state must persist across row groups.
881-
pub enable_content_defined_chunking: bool, default = false
882-
883-
/// (writing) Minimum chunk size in bytes for content-defined chunking.
884-
/// The rolling hash will not be updated until this size is reached for each chunk.
885-
/// Default is 256 KiB. Only used when `enable_content_defined_chunking` is true.
886-
pub cdc_min_chunk_size: usize, default = 256 * 1024
887-
888-
/// (writing) Maximum chunk size in bytes for content-defined chunking.
889-
/// The chunker will create a new chunk whenever the chunk size exceeds this value.
890-
/// Default is 1 MiB. Only used when `enable_content_defined_chunking` is true.
891-
pub cdc_max_chunk_size: usize, default = 1024 * 1024
892-
893-
/// (writing) Normalization level for content-defined chunking.
894-
/// Increasing this improves deduplication ratio but increases fragmentation.
895-
/// Recommended range is [-3, 3], default is 0. Only used when `enable_content_defined_chunking` is true.
896-
pub cdc_norm_level: i64, default = 0
896+
/// parquet files. When `Some`, CDC is enabled with the given options; when `None`
897+
/// (the default), CDC is disabled. When CDC is enabled, parallel writing is
898+
/// automatically disabled since the chunker state must persist across row groups.
899+
pub use_content_defined_chunking: Option<CdcOptions>, default = None
897900
}
898901
}
899902

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 30 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -191,10 +191,7 @@ impl ParquetOptions {
191191
bloom_filter_on_write,
192192
bloom_filter_fpp,
193193
bloom_filter_ndv,
194-
enable_content_defined_chunking,
195-
cdc_min_chunk_size,
196-
cdc_max_chunk_size,
197-
cdc_norm_level,
194+
use_content_defined_chunking,
198195

199196
// not in WriterProperties
200197
enable_page_index: _,
@@ -251,12 +248,12 @@ impl ParquetOptions {
251248
if let Some(encoding) = encoding {
252249
builder = builder.set_encoding(parse_encoding_string(encoding)?);
253250
}
254-
if *enable_content_defined_chunking {
251+
if let Some(cdc) = use_content_defined_chunking {
255252
builder = builder.set_content_defined_chunking(Some(
256253
parquet::file::properties::CdcOptions {
257-
min_chunk_size: *cdc_min_chunk_size,
258-
max_chunk_size: *cdc_max_chunk_size,
259-
norm_level: *cdc_norm_level as i32,
254+
min_chunk_size: cdc.min_chunk_size,
255+
max_chunk_size: cdc.max_chunk_size,
256+
norm_level: cdc.norm_level as i32,
260257
},
261258
));
262259
}
@@ -401,7 +398,9 @@ mod tests {
401398
use super::*;
402399
#[cfg(feature = "parquet_encryption")]
403400
use crate::config::ConfigFileEncryptionProperties;
404-
use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
401+
use crate::config::{
402+
CdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions,
403+
};
405404
use crate::parquet_config::DFParquetWriterVersion;
406405
use parquet::basic::Compression;
407406
use parquet::file::properties::{
@@ -473,10 +472,7 @@ mod tests {
473472
skip_arrow_metadata: defaults.skip_arrow_metadata,
474473
coerce_int96: None,
475474
max_predicate_cache_size: defaults.max_predicate_cache_size,
476-
enable_content_defined_chunking: defaults.enable_content_defined_chunking,
477-
cdc_min_chunk_size: defaults.cdc_min_chunk_size,
478-
cdc_max_chunk_size: defaults.cdc_max_chunk_size,
479-
cdc_norm_level: defaults.cdc_norm_level,
475+
use_content_defined_chunking: defaults.use_content_defined_chunking.clone(),
480476
}
481477
}
482478

@@ -593,21 +589,13 @@ mod tests {
593589
binary_as_string: global_options_defaults.binary_as_string,
594590
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
595591
coerce_int96: None,
596-
enable_content_defined_chunking: props
597-
.content_defined_chunking()
598-
.is_some(),
599-
cdc_min_chunk_size: props
600-
.content_defined_chunking()
601-
.map(|c| c.min_chunk_size)
602-
.unwrap_or(global_options_defaults.cdc_min_chunk_size),
603-
cdc_max_chunk_size: props
604-
.content_defined_chunking()
605-
.map(|c| c.max_chunk_size)
606-
.unwrap_or(global_options_defaults.cdc_max_chunk_size),
607-
cdc_norm_level: props
608-
.content_defined_chunking()
609-
.map(|c| c.norm_level as i64)
610-
.unwrap_or(global_options_defaults.cdc_norm_level),
592+
use_content_defined_chunking: props.content_defined_chunking().map(|c| {
593+
CdcOptions {
594+
min_chunk_size: c.min_chunk_size,
595+
max_chunk_size: c.max_chunk_size,
596+
norm_level: c.norm_level as i64,
597+
}
598+
}),
611599
},
612600
column_specific_options,
613601
key_value_metadata,
@@ -821,10 +809,11 @@ mod tests {
821809
#[test]
822810
fn test_cdc_enabled_with_custom_options() {
823811
let mut opts = TableParquetOptions::default();
824-
opts.global.enable_content_defined_chunking = true;
825-
opts.global.cdc_min_chunk_size = 128 * 1024;
826-
opts.global.cdc_max_chunk_size = 512 * 1024;
827-
opts.global.cdc_norm_level = 2;
812+
opts.global.use_content_defined_chunking = Some(CdcOptions {
813+
min_chunk_size: 128 * 1024,
814+
max_chunk_size: 512 * 1024,
815+
norm_level: 2,
816+
});
828817
opts.arrow_schema(&Arc::new(Schema::empty()));
829818

830819
let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
@@ -846,19 +835,20 @@ mod tests {
846835
#[test]
847836
fn test_cdc_round_trip_through_writer_props() {
848837
let mut opts = TableParquetOptions::default();
849-
opts.global.enable_content_defined_chunking = true;
850-
opts.global.cdc_min_chunk_size = 64 * 1024;
851-
opts.global.cdc_max_chunk_size = 2 * 1024 * 1024;
852-
opts.global.cdc_norm_level = -1;
838+
opts.global.use_content_defined_chunking = Some(CdcOptions {
839+
min_chunk_size: 64 * 1024,
840+
max_chunk_size: 2 * 1024 * 1024,
841+
norm_level: -1,
842+
});
853843
opts.arrow_schema(&Arc::new(Schema::empty()));
854844

855845
let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
856846
let recovered = session_config_from_writer_props(&props);
857847

858-
assert_eq!(recovered.global.enable_content_defined_chunking, true);
859-
assert_eq!(recovered.global.cdc_min_chunk_size, 64 * 1024);
860-
assert_eq!(recovered.global.cdc_max_chunk_size, 2 * 1024 * 1024);
861-
assert_eq!(recovered.global.cdc_norm_level, -1);
848+
let cdc = recovered.global.use_content_defined_chunking.unwrap();
849+
assert_eq!(cdc.min_chunk_size, 64 * 1024);
850+
assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024);
851+
assert_eq!(cdc.norm_level, -1);
862852
}
863853

864854
#[test]

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1373,7 +1373,7 @@ impl FileSink for ParquetSink {
13731373
// CDC requires the sequential writer: the chunker state lives in ArrowWriter
13741374
// and persists across row groups. The parallel path bypasses ArrowWriter entirely.
13751375
if !parquet_opts.global.allow_single_file_parallelism
1376-
|| parquet_opts.global.enable_content_defined_chunking
1376+
|| parquet_opts.global.use_content_defined_chunking.is_some()
13771377
{
13781378
let mut writer = self
13791379
.create_async_arrow_writer(

datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -604,19 +604,13 @@ message ParquetOptions {
604604
uint64 max_predicate_cache_size = 33;
605605
}
606606

607-
bool content_defined_chunking = 35; // default = false
608-
609-
oneof cdc_min_chunk_size_opt {
610-
uint64 cdc_min_chunk_size = 36;
611-
}
612-
613-
oneof cdc_max_chunk_size_opt {
614-
uint64 cdc_max_chunk_size = 37;
615-
}
607+
CdcOptions content_defined_chunking = 35;
608+
}
616609

617-
oneof cdc_norm_level_opt {
618-
int32 cdc_norm_level = 38;
619-
}
610+
// Content-defined chunking (CDC) settings for parquet writing. Carried by
// `ParquetOptions.content_defined_chunking` (field 35); when that field is
// absent the Rust side decodes it as `None`, i.e. CDC disabled.
message CdcOptions {
611+
// Chunk-size bounds. The Rust decoder replaces a value of 0 with the
// corresponding `CdcOptions::default()` size, so 0 cannot be transmitted
// as an explicit setting.
uint64 min_chunk_size = 1;
612+
uint64 max_chunk_size = 2;
613+
// Decoded as-is (0 included) and widened to i64 on the Rust side.
int32 norm_level = 3;
620614
}
621615

622616
enum JoinSide {

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 39 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use datafusion_common::{
4040
DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
4141
arrow_datafusion_err,
4242
config::{
43-
CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
43+
CdcOptions, CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
4444
TableParquetOptions,
4545
},
4646
file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
@@ -1090,16 +1090,14 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
10901090
max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
10911091
protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
10921092
}).unwrap_or(None),
1093-
enable_content_defined_chunking: value.content_defined_chunking,
1094-
cdc_min_chunk_size: value.cdc_min_chunk_size_opt.map(|opt| match opt {
1095-
protobuf::parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(v) => v as usize,
1096-
}).unwrap_or(ParquetOptions::default().cdc_min_chunk_size),
1097-
cdc_max_chunk_size: value.cdc_max_chunk_size_opt.map(|opt| match opt {
1098-
protobuf::parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(v) => v as usize,
1099-
}).unwrap_or(ParquetOptions::default().cdc_max_chunk_size),
1100-
cdc_norm_level: value.cdc_norm_level_opt.map(|opt| match opt {
1101-
protobuf::parquet_options::CdcNormLevelOpt::CdcNormLevel(v) => v as i64,
1102-
}).unwrap_or(ParquetOptions::default().cdc_norm_level),
1093+
use_content_defined_chunking: value.content_defined_chunking.map(|cdc| {
1094+
let defaults = CdcOptions::default();
1095+
CdcOptions {
1096+
min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size },
1097+
max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size },
1098+
norm_level: cdc.norm_level as i64,
1099+
}
1100+
}),
11031101
})
11041102
}
11051103
}
@@ -1276,7 +1274,7 @@ pub(crate) fn csv_writer_options_from_proto(
12761274
#[cfg(test)]
12771275
mod tests {
12781276
use super::*;
1279-
use datafusion_common::config::{ParquetOptions, TableParquetOptions};
1277+
use datafusion_common::config::{CdcOptions, ParquetOptions, TableParquetOptions};
12801278

12811279
fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions {
12821280
let proto: crate::protobuf_common::ParquetOptions =
@@ -1295,66 +1293,65 @@ mod tests {
12951293
#[test]
12961294
fn test_parquet_options_cdc_disabled_round_trip() {
12971295
let opts = ParquetOptions::default();
1298-
assert!(!opts.enable_content_defined_chunking);
1296+
assert!(opts.use_content_defined_chunking.is_none());
12991297
let recovered = parquet_options_proto_round_trip(opts.clone());
13001298
assert_eq!(opts, recovered);
13011299
}
13021300

13031301
#[test]
13041302
fn test_parquet_options_cdc_enabled_round_trip() {
13051303
let opts = ParquetOptions {
1306-
enable_content_defined_chunking: true,
1307-
cdc_min_chunk_size: 128 * 1024,
1308-
cdc_max_chunk_size: 512 * 1024,
1309-
cdc_norm_level: 2,
1304+
use_content_defined_chunking: Some(CdcOptions {
1305+
min_chunk_size: 128 * 1024,
1306+
max_chunk_size: 512 * 1024,
1307+
norm_level: 2,
1308+
}),
13101309
..ParquetOptions::default()
13111310
};
13121311
let recovered = parquet_options_proto_round_trip(opts.clone());
1313-
assert_eq!(recovered.enable_content_defined_chunking, true);
1314-
assert_eq!(recovered.cdc_min_chunk_size, 128 * 1024);
1315-
assert_eq!(recovered.cdc_max_chunk_size, 512 * 1024);
1316-
assert_eq!(recovered.cdc_norm_level, 2);
1312+
let cdc = recovered.use_content_defined_chunking.unwrap();
1313+
assert_eq!(cdc.min_chunk_size, 128 * 1024);
1314+
assert_eq!(cdc.max_chunk_size, 512 * 1024);
1315+
assert_eq!(cdc.norm_level, 2);
13171316
}
13181317

13191318
#[test]
13201319
fn test_parquet_options_cdc_negative_norm_level_round_trip() {
13211320
let opts = ParquetOptions {
1322-
enable_content_defined_chunking: true,
1323-
cdc_norm_level: -3,
1321+
use_content_defined_chunking: Some(CdcOptions {
1322+
norm_level: -3,
1323+
..CdcOptions::default()
1324+
}),
13241325
..ParquetOptions::default()
13251326
};
13261327
let recovered = parquet_options_proto_round_trip(opts);
1327-
assert_eq!(recovered.cdc_norm_level, -3);
1328+
assert_eq!(
1329+
recovered.use_content_defined_chunking.unwrap().norm_level,
1330+
-3
1331+
);
13281332
}
13291333

13301334
#[test]
13311335
fn test_table_parquet_options_cdc_round_trip() {
13321336
let mut opts = TableParquetOptions::default();
1333-
opts.global.enable_content_defined_chunking = true;
1334-
opts.global.cdc_min_chunk_size = 64 * 1024;
1335-
opts.global.cdc_max_chunk_size = 2 * 1024 * 1024;
1336-
opts.global.cdc_norm_level = -1;
1337+
opts.global.use_content_defined_chunking = Some(CdcOptions {
1338+
min_chunk_size: 64 * 1024,
1339+
max_chunk_size: 2 * 1024 * 1024,
1340+
norm_level: -1,
1341+
});
13371342

13381343
let recovered = table_parquet_options_proto_round_trip(opts.clone());
1339-
assert_eq!(recovered.global.enable_content_defined_chunking, true);
1340-
assert_eq!(recovered.global.cdc_min_chunk_size, 64 * 1024);
1341-
assert_eq!(recovered.global.cdc_max_chunk_size, 2 * 1024 * 1024);
1342-
assert_eq!(recovered.global.cdc_norm_level, -1);
1344+
let cdc = recovered.global.use_content_defined_chunking.unwrap();
1345+
assert_eq!(cdc.min_chunk_size, 64 * 1024);
1346+
assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024);
1347+
assert_eq!(cdc.norm_level, -1);
13431348
}
13441349

13451350
#[test]
13461351
fn test_table_parquet_options_cdc_disabled_round_trip() {
13471352
let opts = TableParquetOptions::default();
1348-
assert!(!opts.global.enable_content_defined_chunking);
1353+
assert!(opts.global.use_content_defined_chunking.is_none());
13491354
let recovered = table_parquet_options_proto_round_trip(opts.clone());
1350-
assert_eq!(recovered.global.enable_content_defined_chunking, false);
1351-
assert_eq!(
1352-
recovered.global.cdc_min_chunk_size,
1353-
ParquetOptions::default().cdc_min_chunk_size
1354-
);
1355-
assert_eq!(
1356-
recovered.global.cdc_max_chunk_size,
1357-
ParquetOptions::default().cdc_max_chunk_size
1358-
);
1355+
assert!(recovered.global.use_content_defined_chunking.is_none());
13591356
}
13601357
}

0 commit comments

Comments
 (0)