Commit fa5e9d0

adriangb and claude committed
refactor: address remaining PR #22000 review feedback
Follow-ups to the cherry-picked refactor that landed the file_index keying:

* Reject `TABLESAMPLE` without an explicit method instead of silently treating it as `SYSTEM` (#22000 (comment)). PostgreSQL requires an explicit method and Spark defaults to block-level; picking one here in core would commit to semantics callers may not want. Added an slt case to lock in the new error.
* Rephrase the `SamplePushdown` planning error from "TABLESAMPLE is not supported for this source" to "TABLESAMPLE could not be pushed down", since the failure may originate at any node along the passthrough chain, not just the leaf source (#22000 (comment)). Updated the slt assertion to match.
* Dedupe the SYSTEM-mode adaptive split comments in the parquet opener; the outer block now covers determinism and the inner block covers the row-group-vs-row split math without overlap (#22000 (comment)).
* Update the `select.md` and `relation_planner/table_sample.rs` REPEATABLE wording to reflect that sampling now keys on the execution `partition_index`, not the on-disk file path (#22000 (comment) and #discussion_r3187445171).
* Replace the opener-level "REPEATABLE ignores file name" test with a "sampling keys on partition_index" test that verifies same partition_index → same selection regardless of file name, and different partition_index → uncorrelated samples. Added `with_partition_index` to the test builder.
* Refresh the `run_examples-7` snapshot to match the new seed mix (the per-row-group hash now folds in the optional REPEATABLE seed alongside `file_index`; deterministic, but a different draw).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent ec48974 commit fa5e9d0

6 files changed

Lines changed: 64 additions & 46 deletions
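The "new seed mix" the last bullet describes — a per-row-group hash that folds the optional REPEATABLE seed in alongside a stable per-file index — can be sketched as a standalone illustration. This is a hypothetical model using std's `DefaultHasher`, not DataFusion's actual sampler or hash function; `keep_row_group` and its parameters are invented names for the idea only.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical sketch: decide whether a row group survives SYSTEM
/// sampling by hashing the optional REPEATABLE seed together with the
/// execution partition_index and the row-group index -- never the
/// on-disk path -- so the draw is deterministic across machines.
fn keep_row_group(
    seed: Option<u64>,
    partition_index: usize,
    row_group_index: usize,
    fraction: f64,
) -> bool {
    let mut hasher = DefaultHasher::new();
    seed.hash(&mut hasher);
    partition_index.hash(&mut hasher);
    row_group_index.hash(&mut hasher);
    // Map the 64-bit hash onto [0, 1) and compare to the sample fraction.
    let draw = (hasher.finish() >> 11) as f64 / (1u64 << 53) as f64;
    draw < fraction
}

fn main() {
    // Same seed + partition_index => same selection, whatever the file name.
    let p0a: Vec<bool> = (0..8).map(|rg| keep_row_group(Some(42), 0, rg, 0.5)).collect();
    let p0b: Vec<bool> = (0..8).map(|rg| keep_row_group(Some(42), 0, rg, 0.5)).collect();
    assert_eq!(p0a, p0b);
    // A different partition_index feeds different bytes to the hash,
    // decorrelating the draws across files in a parallel scan.
    let p1: Vec<bool> = (0..8).map(|rg| keep_row_group(Some(42), 1, rg, 0.5)).collect();
    println!("p0 = {p0a:?}\np1 = {p1:?}");
}
```

The key design point mirrored here is that every input to the hash is stable across environments, which is why changing a file's object-store path cannot change the sample.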

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 5 additions & 3 deletions
@@ -306,8 +306,10 @@ async fn run_examples(ctx: &SessionContext) -> Result<()> {
     // through. Routed against the parquet-backed copy of the table so
     // the `SamplePushdown` rule can absorb the sample into the scan.
     // `REPEATABLE(42)` makes the rows deterministic across runs and
-    // across machines (the seed dominates the file path in the
-    // sampler's hash input).
+    // across machines: the parquet sampler keys on the seed plus the
+    // execution `partition_index` (a stable per-file id), never on the
+    // on-disk path, so the same query against the same data picks the
+    // same rows everywhere.
     let results = run_example(
         ctx,
         "Example 7: SYSTEM (handled by the built-in, not this example)",
@@ -318,11 +320,11 @@ async fn run_examples(ctx: &SessionContext) -> Result<()> {
     +---------+---------+
     | column1 | column2 |
     +---------+---------+
-    | 5       | row_5   |
     | 6       | row_6   |
     | 7       | row_7   |
     | 8       | row_8   |
     | 9       | row_9   |
+    | 10      | row_10  |
     +---------+---------+
     ");

datafusion/datasource-parquet/src/opener.rs

Lines changed: 34 additions & 35 deletions
@@ -1777,6 +1777,13 @@ mod test {
        self
    }

+    /// Set the partition_index plumbed through to the opener.
+    /// Sampling keys on this so different partitions decorrelate.
+    fn with_partition_index(mut self, partition_index: usize) -> Self {
+        self.partition_index = partition_index;
+        self
+    }
+
    /// Set the object store (required for building).
    fn with_store(mut self, store: Arc<dyn ObjectStore>) -> Self {
        self.store = Some(store);
@@ -3009,11 +3016,15 @@ mod test {
        );
    }

-    /// REPEATABLE(seed) must produce the same selection regardless of
-    /// where the parquet file lives. This is the SQL semantics users
-    /// expect from `TABLESAMPLE ... REPEATABLE(n)`.
+    /// Sampling must key on the execution `partition_index`, not the
+    /// on-disk file path: two parquet files with different names but
+    /// identical content opened at the same partition_index pick the
+    /// same rows. Different partition_index values must decorrelate.
+    /// This is what makes `TABLESAMPLE ... REPEATABLE(n)` reproducible
+    /// across environments without leaking object-store paths into the
+    /// sample seed.
    #[tokio::test]
-    async fn system_target_remaining_repeatable_seed_ignores_file_name() {
+    async fn system_target_remaining_sample_keys_on_partition_index() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

        let values: Vec<Option<i32>> = (0..100).map(Some).collect();
@@ -3033,15 +3044,18 @@ mod test {
            ..Default::default()
        };

-        let opener = ParquetMorselizerBuilder::new()
+        // Same partition_index for both files -> same selection,
+        // regardless of file name.
+        let opener_p0 = ParquetMorselizerBuilder::new()
            .with_store(Arc::clone(&store))
            .with_schema(Arc::clone(&schema))
            .with_projection_indices(&[0])
-            .with_sampling(sampling)
+            .with_partition_index(0)
+            .with_sampling(sampling.clone())
            .build();

        let stream_a = open_file(
-            &opener,
+            &opener_p0,
            PartitionedFile::new(
                "first.parquet".to_string(),
                u64::try_from(len_a).unwrap(),
@@ -3050,7 +3064,7 @@ mod test {
        .await
        .unwrap();
        let stream_b = open_file(
-            &opener,
+            &opener_p0,
            PartitionedFile::new(
                "second_path.parquet".to_string(),
                u64::try_from(len_b).unwrap(),
@@ -3064,52 +3078,37 @@ mod test {

        assert_eq!(
            rows_a, rows_b,
-            "REPEATABLE(seed) must select the same rows regardless of file path"
+            "sampling must key on partition_index, not file name"
        );
        assert!(
            !rows_a.is_empty() && rows_a.len() < 100,
            "expected a strict subset; got {} rows",
            rows_a.len()
        );

-        // Without a seed the selection must depend on the file name —
-        // otherwise unrelated parquet files in the same scan would all
-        // produce correlated samples, defeating the purpose of file-
-        // axis randomisation.
-        let unseeded_sampling = crate::sampling::ParquetSampling {
-            system_target_remaining: Some(0.5),
-            row_cluster_size: 4,
-            ..Default::default()
-        };
-        let unseeded_opener = ParquetMorselizerBuilder::new()
+        // Different partition_index -> uncorrelated samples even with
+        // the same seed. Otherwise every file in a parallel scan would
+        // pick the same row indices, defeating file-axis randomisation.
+        let opener_p1 = ParquetMorselizerBuilder::new()
            .with_store(Arc::clone(&store))
            .with_schema(Arc::clone(&schema))
            .with_projection_indices(&[0])
-            .with_sampling(unseeded_sampling)
+            .with_partition_index(1)
+            .with_sampling(sampling)
            .build();
-        let stream_a2 = open_file(
-            &unseeded_opener,
+        let stream_a_p1 = open_file(
+            &opener_p1,
            PartitionedFile::new(
                "first.parquet".to_string(),
                u64::try_from(len_a).unwrap(),
            ),
        )
        .await
        .unwrap();
-        let stream_b2 = open_file(
-            &unseeded_opener,
-            PartitionedFile::new(
-                "second_path.parquet".to_string(),
-                u64::try_from(len_b).unwrap(),
-            ),
-        )
-        .await
-        .unwrap();
-        let rows_a2 = collect_values(stream_a2).await;
-        let rows_b2 = collect_values(stream_b2).await;
+        let rows_a_p1 = collect_values(stream_a_p1).await;
        assert_ne!(
-            rows_a2, rows_b2,
-            "without a seed, different file names should produce different samples"
+            rows_a, rows_a_p1,
+            "different partition_index must produce different samples"
        );
    }

datafusion/physical-optimizer/src/sample_pushdown.rs

Lines changed: 1 addition & 1 deletion
@@ -73,7 +73,7 @@ impl PhysicalOptimizerRule for SamplePushdown {
            Pushdown::Pushed(new_child) => Ok(Transformed::yes(new_child)),
            Pushdown::Failed(reason) => {
                datafusion_common::plan_err!(
-                    "TABLESAMPLE is not supported for this source: {reason}. \
+                    "TABLESAMPLE could not be pushed down: {reason}. \
                    A generic post-scan SampleExec is not yet implemented; \
                    see https://github.com/apache/datafusion/issues/16533"
                )

datafusion/sql/src/sample.rs

Lines changed: 15 additions & 3 deletions
@@ -99,9 +99,21 @@ impl RelationPlanner for TableSampleSystemPlanner {
        }
        match ts.name {
            // The built-in planner only handles SYSTEM (and BLOCK as an
-            // alias for SYSTEM, matching Hive). Anything else is a
-            // semantics commitment we don't want to make in core.
-            Some(TableSampleMethod::System) | Some(TableSampleMethod::Block) | None => {}
+            // alias for SYSTEM, matching Hive). An unspecified method
+            // is rejected rather than silently picking SYSTEM, since
+            // the right default differs by engine (PostgreSQL requires
+            // an explicit method; Spark defaults to block-level).
+            // Anything else is a semantics commitment we don't want to
+            // make in core.
+            Some(TableSampleMethod::System) | Some(TableSampleMethod::Block) => {}
+            None => {
+                return not_impl_err!(
+                    "TABLESAMPLE without an explicit method is not supported; \
+                    write TABLESAMPLE SYSTEM (...) (or register a custom \
+                    RelationPlanner before the built-in TableSampleSystemPlanner \
+                    to define a default)."
+                );
+            }
            Some(other) => {
                return not_impl_err!(
                    "TABLESAMPLE method {other} is not supported (only SYSTEM). \
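The validation control flow this hunk lands can be seen in a minimal, self-contained sketch. The enum, the plain `String` errors, and `validate_method` are hypothetical stand-ins for the real planner types and `not_impl_err!`; only the accept/reject decisions mirror the diff.

```rust
// Hypothetical stand-in for sqlparser's method enum; only the decision
// logic mirrors the diff: SYSTEM and BLOCK are accepted, a missing
// method is an error rather than a silent SYSTEM default, and any
// other method is rejected.
#[derive(Debug)]
enum TableSampleMethod {
    System,
    Block,
    Bernoulli,
}

fn validate_method(method: Option<TableSampleMethod>) -> Result<(), String> {
    match method {
        Some(TableSampleMethod::System) | Some(TableSampleMethod::Block) => Ok(()),
        None => Err(
            "TABLESAMPLE without an explicit method is not supported; \
             write TABLESAMPLE SYSTEM (...)"
                .to_string(),
        ),
        Some(other) => Err(format!(
            "TABLESAMPLE method {other:?} is not supported (only SYSTEM)"
        )),
    }
}

fn main() {
    assert!(validate_method(Some(TableSampleMethod::System)).is_ok());
    assert!(validate_method(Some(TableSampleMethod::Block)).is_ok());
    assert!(validate_method(None).is_err());
    assert!(validate_method(Some(TableSampleMethod::Bernoulli)).is_err());
    println!("method validation matches the behaviour the diff describes");
}
```

Putting `None` in its own arm (instead of folding it into the accepted pattern) is what keeps the default-method question out of core: a caller who wants a default can intercept the query before this planner runs.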

datafusion/sqllogictest/test_files/tablesample.slt

Lines changed: 4 additions & 1 deletion
@@ -78,6 +78,9 @@ physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/
 statement error TABLESAMPLE method BERNOULLI is not supported
 SELECT count(*) FROM sample_table TABLESAMPLE BERNOULLI (10);

+statement error TABLESAMPLE without an explicit method is not supported
+SELECT count(*) FROM sample_table TABLESAMPLE (10);
+
 statement error TABLESAMPLE with ROWS count is not supported
 SELECT count(*) FROM sample_table TABLESAMPLE SYSTEM (100 ROWS);

@@ -102,7 +105,7 @@ STORED AS CSV
 LOCATION 'test_files/scratch/tablesample/sample_table.csv'
 OPTIONS ('format.has_header' 'true');

-statement error TABLESAMPLE is not supported for this source
+statement error TABLESAMPLE could not be pushed down
 SELECT count(*) FROM sample_csv TABLESAMPLE SYSTEM (50);

 statement ok

docs/source/user-guide/sql/select.md

Lines changed: 5 additions & 3 deletions
@@ -156,9 +156,11 @@ pure row-level sampling.

 `REPEATABLE(seed)` mixes the seed into every random draw, so all
 levels produce the same selection across runs. The selection also
-depends on the file name, the row-group index within the file, and
-the cluster size, so different files don't accidentally see
-correlated samples.
+depends on the execution `partition_index` of each file (a stable
+per-file id assigned by the scan, independent of the on-disk path),
+the row-group index within the file, and the cluster size, so
+different files don't accidentally see correlated samples and the
+sample is reproducible across environments.

 The sampling is visible in `EXPLAIN`: