Skip to content

Commit 537035b

Browse files
committed
feat: add support for parquet content defined chunking options
1 parent 4084a18 commit 537035b

13 files changed

Lines changed: 411 additions & 8 deletions

File tree

Cargo.toml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,3 +282,21 @@ incremental = false
282282
inherits = "release"
283283
debug = true
284284
strip = false
285+
286+
# Replaces the crates.io releases of the arrow-rs crates listed below (and
# `parquet`) with the apache/arrow-rs git repository. Every entry points at
# the same tag, 58.1.0-rc1.
# NOTE(review): presumably a temporary pin to pick up the content-defined
# chunking writer API before it ships in a crates.io release — confirm, and
# remove this section once a published version can be used instead.
[patch.crates-io]
287+
arrow = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" }
288+
arrow-arith = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" }
289+
arrow-array = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" }
290+
arrow-buffer = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" }
291+
arrow-cast = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" }
292+
arrow-csv = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" }
293+
arrow-data = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" }
294+
arrow-flight = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" }
295+
arrow-ipc = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" }
296+
arrow-json = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" }
297+
arrow-ord = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" }
298+
arrow-row = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" }
299+
arrow-schema = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" }
300+
arrow-select = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" }
301+
arrow-string = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" }
302+
parquet = { git = "https://github.com/apache/arrow-rs", tag = "58.1.0-rc1" }

datafusion/common/src/config.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,6 +845,7 @@ config_namespace! {
845845
/// default parquet writer setting
846846
pub bloom_filter_ndv: Option<u64>, default = None
847847

848+
848849
/// (writing) Controls whether DataFusion will attempt to speed up writing
849850
/// parquet files by serializing them in parallel. Each column
850851
/// in each row group in each output file are serialized in parallel
@@ -872,6 +873,27 @@ config_namespace! {
872873
/// writing out already in-memory data, such as from a cached
873874
/// data frame.
874875
pub maximum_buffered_record_batches_per_stream: usize, default = 2
876+
877+
/// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
878+
/// parquet files. When true, the other `cdc_*` options control the chunking
879+
/// behavior. When CDC is enabled, parallel writing is automatically disabled
880+
/// since the chunker state must persist across row groups.
881+
pub enable_content_defined_chunking: bool, default = false
882+
883+
/// (writing) Minimum chunk size in bytes for content-defined chunking.
884+
/// The rolling hash will not be updated until this size is reached for each chunk.
885+
/// Default is 256 KiB. Only used when `enable_content_defined_chunking` is true.
886+
pub cdc_min_chunk_size: usize, default = 256 * 1024
887+
888+
/// (writing) Maximum chunk size in bytes for content-defined chunking.
889+
/// The chunker will create a new chunk whenever the chunk size exceeds this value.
890+
/// Default is 1 MiB. Only used when `enable_content_defined_chunking` is true.
891+
pub cdc_max_chunk_size: usize, default = 1024 * 1024
892+
893+
/// (writing) Normalization level for content-defined chunking.
894+
/// Increasing this improves deduplication ratio but increases fragmentation.
895+
/// Recommended range is [-3, 3], default is 0. Only used when `enable_content_defined_chunking` is true.
896+
pub cdc_norm_level: i64, default = 0
875897
}
876898
}
877899

@@ -1820,6 +1842,7 @@ config_field!(usize);
18201842
config_field!(f64);
18211843
config_field!(u64);
18221844
config_field!(u32);
1845+
config_field!(i64);
18231846

18241847
impl ConfigField for u8 {
18251848
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
9595
global,
9696
column_specific_options,
9797
key_value_metadata,
98-
crypto: _,
98+
..
9999
} = table_parquet_options;
100100

101101
let mut builder = global.into_writer_properties_builder()?;
@@ -191,6 +191,10 @@ impl ParquetOptions {
191191
bloom_filter_on_write,
192192
bloom_filter_fpp,
193193
bloom_filter_ndv,
194+
enable_content_defined_chunking,
195+
cdc_min_chunk_size,
196+
cdc_max_chunk_size,
197+
cdc_norm_level,
194198

195199
// not in WriterProperties
196200
enable_page_index: _,
@@ -247,6 +251,15 @@ impl ParquetOptions {
247251
if let Some(encoding) = encoding {
248252
builder = builder.set_encoding(parse_encoding_string(encoding)?);
249253
}
254+
if *enable_content_defined_chunking {
255+
builder = builder.set_content_defined_chunking(Some(
256+
parquet::file::properties::CdcOptions {
257+
min_chunk_size: *cdc_min_chunk_size,
258+
max_chunk_size: *cdc_max_chunk_size,
259+
norm_level: *cdc_norm_level as i32,
260+
},
261+
));
262+
}
250263

251264
Ok(builder)
252265
}
@@ -460,6 +473,10 @@ mod tests {
460473
skip_arrow_metadata: defaults.skip_arrow_metadata,
461474
coerce_int96: None,
462475
max_predicate_cache_size: defaults.max_predicate_cache_size,
476+
enable_content_defined_chunking: defaults.enable_content_defined_chunking,
477+
cdc_min_chunk_size: defaults.cdc_min_chunk_size,
478+
cdc_max_chunk_size: defaults.cdc_max_chunk_size,
479+
cdc_norm_level: defaults.cdc_norm_level,
463480
}
464481
}
465482

@@ -576,6 +593,21 @@ mod tests {
576593
binary_as_string: global_options_defaults.binary_as_string,
577594
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
578595
coerce_int96: None,
596+
enable_content_defined_chunking: props
597+
.content_defined_chunking()
598+
.is_some(),
599+
cdc_min_chunk_size: props
600+
.content_defined_chunking()
601+
.map(|c| c.min_chunk_size)
602+
.unwrap_or(global_options_defaults.cdc_min_chunk_size),
603+
cdc_max_chunk_size: props
604+
.content_defined_chunking()
605+
.map(|c| c.max_chunk_size)
606+
.unwrap_or(global_options_defaults.cdc_max_chunk_size),
607+
cdc_norm_level: props
608+
.content_defined_chunking()
609+
.map(|c| c.norm_level as i64)
610+
.unwrap_or(global_options_defaults.cdc_norm_level),
579611
},
580612
column_specific_options,
581613
key_value_metadata,
@@ -786,6 +818,49 @@ mod tests {
786818
);
787819
}
788820

821+
/// Enabling content-defined chunking with non-default settings must surface
/// exactly those settings in the built writer properties.
#[test]
fn test_cdc_enabled_with_custom_options() {
    const MIN_CHUNK_BYTES: usize = 128 * 1024;
    const MAX_CHUNK_BYTES: usize = 512 * 1024;

    // Start from the defaults and override only the CDC-related knobs.
    let mut table_options = TableParquetOptions::default();
    {
        let global = &mut table_options.global;
        global.enable_content_defined_chunking = true;
        global.cdc_min_chunk_size = MIN_CHUNK_BYTES;
        global.cdc_max_chunk_size = MAX_CHUNK_BYTES;
        global.cdc_norm_level = 2;
    }
    let empty_schema = Arc::new(Schema::empty());
    table_options.arrow_schema(&empty_schema);

    let builder = WriterPropertiesBuilder::try_from(&table_options).unwrap();
    let writer_props = builder.build();
    let chunking = writer_props
        .content_defined_chunking()
        .expect("CDC should be set");

    assert_eq!(chunking.min_chunk_size, MIN_CHUNK_BYTES);
    assert_eq!(chunking.max_chunk_size, MAX_CHUNK_BYTES);
    assert_eq!(chunking.norm_level, 2);
}
836+
837+
/// With untouched default options the writer properties must not carry any
/// content-defined chunking configuration.
#[test]
fn test_cdc_disabled_by_default() {
    let mut table_options = TableParquetOptions::default();
    let empty_schema = Arc::new(Schema::empty());
    table_options.arrow_schema(&empty_schema);

    let builder = WriterPropertiesBuilder::try_from(&table_options).unwrap();
    let writer_props = builder.build();

    let chunking = writer_props.content_defined_chunking();
    assert!(chunking.is_none());
}
845+
846+
/// CDC settings written into `WriterProperties` must be recoverable through
/// `session_config_from_writer_props` without loss, including a negative
/// normalization level.
#[test]
fn test_cdc_round_trip_through_writer_props() {
    let mut opts = TableParquetOptions::default();
    opts.global.enable_content_defined_chunking = true;
    opts.global.cdc_min_chunk_size = 64 * 1024;
    opts.global.cdc_max_chunk_size = 2 * 1024 * 1024;
    opts.global.cdc_norm_level = -1;
    opts.arrow_schema(&Arc::new(Schema::empty()));

    let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
    let recovered = session_config_from_writer_props(&props);

    // `assert!` rather than `assert_eq!(.., true)`: avoids clippy's
    // `bool_assert_comparison` lint and states the intent directly.
    assert!(recovered.global.enable_content_defined_chunking);
    assert_eq!(recovered.global.cdc_min_chunk_size, 64 * 1024);
    assert_eq!(recovered.global.cdc_max_chunk_size, 2 * 1024 * 1024);
    assert_eq!(recovered.global.cdc_norm_level, -1);
}
863+
789864
#[test]
790865
fn test_bloom_filter_set_ndv_only() {
791866
// the TableParquetOptions::default, with only ndv set

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1368,7 +1368,11 @@ impl FileSink for ParquetSink {
13681368

13691369
while let Some((path, mut rx)) = file_stream_rx.recv().await {
13701370
let parquet_props = self.create_writer_props(&runtime, &path).await?;
1371-
if !parquet_opts.global.allow_single_file_parallelism {
1371+
// CDC requires the sequential writer: the chunker state lives in ArrowWriter
1372+
// and persists across row groups. The parallel path bypasses ArrowWriter entirely.
1373+
if !parquet_opts.global.allow_single_file_parallelism
1374+
|| parquet_opts.global.enable_content_defined_chunking
1375+
{
13721376
let mut writer = self
13731377
.create_async_arrow_writer(
13741378
&path,

datafusion/proto-common/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,12 @@ name = "datafusion_proto_common"
3737
[features]
3838
default = []
3939
json = ["serde", "pbjson"]
40+
parquet = ["datafusion-common/parquet", "dep:parquet"]
4041

4142
[dependencies]
4243
arrow = { workspace = true }
4344
datafusion-common = { workspace = true }
45+
parquet = { workspace = true, optional = true }
4446
pbjson = { workspace = true, optional = true }
4547
prost = { workspace = true }
4648
serde = { version = "1.0", optional = true }

datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,20 @@ message ParquetOptions {
603603
oneof max_predicate_cache_size_opt {
604604
uint64 max_predicate_cache_size = 33;
605605
}
606+
607+
bool content_defined_chunking = 35; // default = false
608+
609+
oneof cdc_min_chunk_size_opt {
610+
uint64 cdc_min_chunk_size = 36;
611+
}
612+
613+
oneof cdc_max_chunk_size_opt {
614+
uint64 cdc_max_chunk_size = 37;
615+
}
616+
617+
oneof cdc_norm_level_opt {
618+
int32 cdc_norm_level = 38;
619+
}
606620
}
607621

608622
enum JoinSide {

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 100 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1090,6 +1090,16 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
10901090
max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
10911091
protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
10921092
}).unwrap_or(None),
1093+
enable_content_defined_chunking: value.content_defined_chunking,
1094+
cdc_min_chunk_size: value.cdc_min_chunk_size_opt.map(|opt| match opt {
1095+
protobuf::parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(v) => v as usize,
1096+
}).unwrap_or(ParquetOptions::default().cdc_min_chunk_size),
1097+
cdc_max_chunk_size: value.cdc_max_chunk_size_opt.map(|opt| match opt {
1098+
protobuf::parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(v) => v as usize,
1099+
}).unwrap_or(ParquetOptions::default().cdc_max_chunk_size),
1100+
cdc_norm_level: value.cdc_norm_level_opt.map(|opt| match opt {
1101+
protobuf::parquet_options::CdcNormLevelOpt::CdcNormLevel(v) => v as i64,
1102+
}).unwrap_or(ParquetOptions::default().cdc_norm_level),
10931103
})
10941104
}
10951105
}
@@ -1152,17 +1162,17 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
11521162
column_specific_options.insert(column_name.clone(), options.try_into()?);
11531163
}
11541164
}
1155-
Ok(TableParquetOptions {
1165+
let opts = TableParquetOptions {
11561166
global: value
11571167
.global
11581168
.as_ref()
11591169
.map(|v| v.try_into())
11601170
.unwrap()
11611171
.unwrap(),
11621172
column_specific_options,
1163-
key_value_metadata: Default::default(),
1164-
crypto: Default::default(),
1165-
})
1173+
..Default::default()
1174+
};
1175+
Ok(opts)
11661176
}
11671177
}
11681178

@@ -1262,3 +1272,89 @@ pub(crate) fn csv_writer_options_from_proto(
12621272
.with_null(writer_options.null_value.clone())
12631273
.with_double_quote(writer_options.double_quote))
12641274
}
1275+
1276+
// Protobuf round-trip tests for the content-defined chunking (CDC) fields of
// `ParquetOptions` / `TableParquetOptions`: each test converts options to the
// protobuf message and back, then checks that the CDC settings survived.
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::config::{ParquetOptions, TableParquetOptions};

    /// Converts `opts` to its protobuf message and back again.
    fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions {
        let proto: crate::protobuf_common::ParquetOptions =
            (&opts).try_into().expect("to_proto");
        ParquetOptions::try_from(&proto).expect("from_proto")
    }

    /// Converts table-level `opts` to its protobuf message and back again.
    fn table_parquet_options_proto_round_trip(
        opts: TableParquetOptions,
    ) -> TableParquetOptions {
        let proto: crate::protobuf_common::TableParquetOptions =
            (&opts).try_into().expect("to_proto");
        TableParquetOptions::try_from(&proto).expect("from_proto")
    }

    /// Default options (CDC off) must round-trip to an equal value.
    #[test]
    fn test_parquet_options_cdc_disabled_round_trip() {
        let opts = ParquetOptions::default();
        assert!(!opts.enable_content_defined_chunking);
        // The clone is required here: `opts` is compared after the round trip.
        let recovered = parquet_options_proto_round_trip(opts.clone());
        assert_eq!(opts, recovered);
    }

    /// Custom CDC settings must survive the round trip unchanged.
    #[test]
    fn test_parquet_options_cdc_enabled_round_trip() {
        let opts = ParquetOptions {
            enable_content_defined_chunking: true,
            cdc_min_chunk_size: 128 * 1024,
            cdc_max_chunk_size: 512 * 1024,
            cdc_norm_level: 2,
            ..ParquetOptions::default()
        };
        let recovered = parquet_options_proto_round_trip(opts);
        assert!(recovered.enable_content_defined_chunking);
        assert_eq!(recovered.cdc_min_chunk_size, 128 * 1024);
        assert_eq!(recovered.cdc_max_chunk_size, 512 * 1024);
        assert_eq!(recovered.cdc_norm_level, 2);
    }

    /// A negative normalization level must keep its sign through the
    /// `i64` -> protobuf `int32` -> `i64` conversion.
    #[test]
    fn test_parquet_options_cdc_negative_norm_level_round_trip() {
        let opts = ParquetOptions {
            enable_content_defined_chunking: true,
            cdc_norm_level: -3,
            ..ParquetOptions::default()
        };
        let recovered = parquet_options_proto_round_trip(opts);
        assert_eq!(recovered.cdc_norm_level, -3);
    }

    /// CDC settings nested under `TableParquetOptions::global` must survive
    /// the table-level round trip.
    #[test]
    fn test_table_parquet_options_cdc_round_trip() {
        let mut opts = TableParquetOptions::default();
        opts.global.enable_content_defined_chunking = true;
        opts.global.cdc_min_chunk_size = 64 * 1024;
        opts.global.cdc_max_chunk_size = 2 * 1024 * 1024;
        opts.global.cdc_norm_level = -1;

        let recovered = table_parquet_options_proto_round_trip(opts);
        assert!(recovered.global.enable_content_defined_chunking);
        assert_eq!(recovered.global.cdc_min_chunk_size, 64 * 1024);
        assert_eq!(recovered.global.cdc_max_chunk_size, 2 * 1024 * 1024);
        assert_eq!(recovered.global.cdc_norm_level, -1);
    }

    /// With CDC disabled, every CDC field must come back as its default,
    /// including the normalization level.
    #[test]
    fn test_table_parquet_options_cdc_disabled_round_trip() {
        let opts = TableParquetOptions::default();
        assert!(!opts.global.enable_content_defined_chunking);

        let recovered = table_parquet_options_proto_round_trip(opts);
        let defaults = ParquetOptions::default();
        assert!(!recovered.global.enable_content_defined_chunking);
        assert_eq!(
            recovered.global.cdc_min_chunk_size,
            defaults.cdc_min_chunk_size
        );
        assert_eq!(
            recovered.global.cdc_max_chunk_size,
            defaults.cdc_max_chunk_size
        );
        assert_eq!(recovered.global.cdc_norm_level, defaults.cdc_norm_level);
    }
}

0 commit comments

Comments
 (0)