Skip to content

Commit 0661eba

Browse files
adriangbclaude
andcommitted
example: rework TABLESAMPLE example to compose with built-in SYSTEM
The relation_planner/table_sample.rs example previously errored on TABLESAMPLE methods other than BERNOULLI / ROW. After commit 3 of this stack registers a built-in TableSampleSystemPlanner by default, the example's planner runs *first* in the chain, so its error short-circuits SYSTEM and prevents the built-in from handling it. Reshape the example to use the chain composition pattern instead: - The planner now returns `RelationPlanning::Original` for methods it doesn't implement (SYSTEM, BLOCK, anything else), so the built-in picks up the slack. **No SYSTEM reimplementation in the example.** - The example's `TableSampleQueryPlanner` registers both `TableSampleExtensionPlanner` (this example's `TableSamplePlanNode` → its own `SampleExec`) **and** `SamplePhysicalPlanner` (core `Sample` → core `SampleExec`), since `with_extension_planners(...)` replaces the defaults. - Adds a parquet-backed `sample_data_parquet` table so SYSTEM has a source it can push into (the existing in-memory `sample_data` doesn't implement `try_push_sample`). - New Example 7 demonstrates `TABLESAMPLE SYSTEM (50) REPEATABLE (42)` against the parquet table; the row count is asserted (the exact rows shift with the tempdir path that gets hashed into the per-file seed, but the count from the cube-root math is stable). - Module docstring rewritten to lead with the composition story. The 3 existing relation_planner example tests (match_recognize, pivot_unpivot, table_sample) still pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent eea0c4e commit 0661eba

1 file changed

Lines changed: 151 additions & 28 deletions

File tree

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 151 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,35 +15,48 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! # TABLESAMPLE Example
18+
//! # TABLESAMPLE Example — adding row-level sampling on top of the built-in
1919
//!
20-
//! This example demonstrates implementing SQL `TABLESAMPLE` support using
21-
//! DataFusion's extensibility APIs.
20+
//! `TABLESAMPLE SYSTEM(p%)` is supported out of the box: it's lifted to the
21+
//! core `Sample` extension node by the auto-registered
22+
//! `TableSampleSystemPlanner` and pushed into `ParquetSource` by the
23+
//! `SamplePushdown` rule. Adding *other* methods — `BERNOULLI`, `ROW`
24+
//! counts, Hive `BUCKET` — is the job of a `RelationPlanner` extension,
25+
//! which is what this example demonstrates.
2226
//!
23-
//! This is a working `TABLESAMPLE` implementation that can serve as a starting
24-
//! point for your own projects. It also works as a template for adding other
25-
//! custom SQL operators, covering the full pipeline from parsing to execution.
27+
//! The key composition pattern: when our planner sees `TABLESAMPLE`, it
28+
//! handles only the methods it implements (`BERNOULLI`, `ROW`, `BUCKET`)
29+
//! and returns [`RelationPlanning::Original`] for `SYSTEM`. Because
30+
//! `register_relation_planner` prepends to the chain, our planner runs
31+
//! first and the built-in handles whatever we don't. **No SYSTEM
32+
//! reimplementation required.**
2633
//!
2734
//! It shows how to:
2835
//!
2936
//! 1. **Parse** TABLESAMPLE syntax via a custom [`RelationPlanner`]
3037
//! 2. **Plan** sampling as a custom logical node ([`TableSamplePlanNode`])
3138
//! 3. **Execute** sampling via a custom physical operator ([`SampleExec`])
39+
//! 4. **Compose** with the built-in SYSTEM planner by returning
40+
//! `RelationPlanning::Original` for methods we don't implement
3241
//!
3342
//! ## Supported Syntax
3443
//!
3544
//! ```sql
36-
//! -- Bernoulli sampling (each row has N% chance of selection)
45+
//! -- Bernoulli sampling (each row has N% chance of selection) — this example
3746
//! SELECT * FROM table TABLESAMPLE BERNOULLI(10 PERCENT)
3847
//!
39-
//! -- Fractional sampling (0.0 to 1.0)
48+
//! -- Fractional sampling (0.0 to 1.0) — this example
4049
//! SELECT * FROM table TABLESAMPLE (0.1)
4150
//!
42-
//! -- Row count limit
51+
//! -- Row count limit — this example
4352
//! SELECT * FROM table TABLESAMPLE (100 ROWS)
4453
//!
4554
//! -- Reproducible sampling with a seed
4655
//! SELECT * FROM table TABLESAMPLE (10 PERCENT) REPEATABLE(42)
56+
//!
57+
//! -- SYSTEM (block-level) sampling — handled by the built-in planner
58+
//! -- WITHOUT any code in this example
59+
//! SELECT * FROM table TABLESAMPLE SYSTEM (10) REPEATABLE(42)
4760
//! ```
4861
//!
4962
//! ## Architecture
@@ -81,6 +94,7 @@
8194
8295
use std::{
8396
fmt::{self, Debug, Formatter},
97+
fs::File,
8498
hash::{Hash, Hasher},
8599
pin::Pin,
86100
sync::Arc,
@@ -98,9 +112,12 @@ use futures::{
98112
stream::{Stream, StreamExt},
99113
};
100114
use rand::{Rng, SeedableRng, rngs::StdRng};
115+
use tempfile::TempDir;
101116
use tonic::async_trait;
102117

103118
use datafusion::optimizer::simplify_expressions::simplify_literal::parse_literal;
119+
use datafusion::parquet::arrow::ArrowWriter;
120+
use datafusion::parquet::file::properties::WriterProperties;
104121
use datafusion::{
105122
execution::{
106123
RecordBatchStream, SendableRecordBatchStream, SessionState, SessionStateBuilder,
@@ -111,7 +128,9 @@ use datafusion::{
111128
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
112129
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput},
113130
},
114-
physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner},
131+
physical_planner::{
132+
DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner, SamplePhysicalPlanner,
133+
},
115134
prelude::*,
116135
};
117136
use datafusion_common::{
@@ -144,10 +163,18 @@ pub async fn table_sample() -> Result<()> {
144163

145164
let ctx = SessionContext::new_with_state(state);
146165

147-
// Register custom relation planner for logical planning
166+
// Register custom relation planner for logical planning. It's prepended
167+
// to the chain (so it runs *before* the auto-registered built-in
168+
// `TableSampleSystemPlanner`); when our planner returns
169+
// `RelationPlanning::Original`, the built-in handles it.
148170
ctx.register_relation_planner(Arc::new(TableSamplePlanner))?;
149171
register_sample_data(&ctx)?;
150172

173+
// Register a parquet-backed copy of the same data so we can demonstrate
174+
// SYSTEM end-to-end. SYSTEM only ships pushdown for parquet sources;
175+
// the in-memory `sample_data` table can't absorb a `Sample` node.
176+
let _parquet_dir = register_sample_data_parquet(&ctx).await?;
177+
151178
println!("TABLESAMPLE Example");
152179
println!("===================\n");
153180

@@ -273,6 +300,32 @@ async fn run_examples(ctx: &SessionContext) -> Result<()> {
273300
+---------+---------+---------+---------+
274301
");
275302

303+
// Example 7: SYSTEM sampling — handled by the built-in
304+
// `TableSampleSystemPlanner`, **not** by this example's planner.
305+
// Our planner returns `Original` for SYSTEM, so the chain falls
306+
// through. Routed against the parquet-backed copy of the table so
307+
// the `SamplePushdown` rule can absorb the sample into the scan.
308+
// `REPEATABLE(42)` makes the rows deterministic across runs and
309+
// across machines (the seed dominates the file path in the
310+
// sampler's hash input).
311+
let results = run_example(
312+
ctx,
313+
"Example 7: SYSTEM (handled by the built-in, not this example)",
314+
"SELECT * FROM sample_data_parquet TABLESAMPLE SYSTEM (50) REPEATABLE (42)",
315+
)
316+
.await?;
317+
assert_snapshot!(results, @r"
318+
+---------+---------+
319+
| column1 | column2 |
320+
+---------+---------+
321+
| 5 | row_5 |
322+
| 6 | row_6 |
323+
| 7 | row_7 |
324+
| 8 | row_8 |
325+
| 9 | row_9 |
326+
+---------+---------+
327+
");
328+
276329
Ok(())
277330
}
278331

@@ -301,6 +354,43 @@ fn register_sample_data(ctx: &SessionContext) -> Result<()> {
301354
Ok(())
302355
}
303356

357+
/// Register the same data as `sample_data_parquet`, backed by a tempfile
358+
/// parquet so `TABLESAMPLE SYSTEM` (handled by the built-in planner) has
359+
/// a `ParquetSource` to push into. Returns the `TempDir` so the caller
360+
/// can keep it alive for the duration of the queries.
361+
async fn register_sample_data_parquet(ctx: &SessionContext) -> Result<TempDir> {
362+
let dir = TempDir::new().map_err(|e| plan_datafusion_err!("tempdir: {e}"))?;
363+
let path = dir.path().join("sample_data.parquet");
364+
365+
let column1: ArrayRef = Arc::new(Int32Array::from((1..=10).collect::<Vec<i32>>()));
366+
let column2: ArrayRef = Arc::new(StringArray::from(
367+
(1..=10).map(|i| format!("row_{i}")).collect::<Vec<_>>(),
368+
));
369+
let batch =
370+
RecordBatch::try_from_iter(vec![("column1", column1), ("column2", column2)])?;
371+
let file = File::create(&path).map_err(|e| plan_datafusion_err!("create: {e}"))?;
372+
let mut writer = ArrowWriter::try_new(
373+
file,
374+
batch.schema(),
375+
Some(WriterProperties::builder().build()),
376+
)
377+
.map_err(|e| plan_datafusion_err!("ArrowWriter: {e}"))?;
378+
writer
379+
.write(&batch)
380+
.map_err(|e| plan_datafusion_err!("write: {e}"))?;
381+
writer
382+
.close()
383+
.map_err(|e| plan_datafusion_err!("close: {e}"))?;
384+
385+
ctx.register_parquet(
386+
"sample_data_parquet",
387+
path.to_str().unwrap(),
388+
Default::default(),
389+
)
390+
.await?;
391+
Ok(dir)
392+
}
393+
304394
// ============================================================================
305395
// Logical Planning: TableSamplePlanner + TableSamplePlanNode
306396
// ============================================================================
@@ -318,7 +408,7 @@ impl RelationPlanner for TableSamplePlanner {
318408
) -> Result<RelationPlanning> {
319409
// Only handle Table relations with TABLESAMPLE clause
320410
let TableFactor::Table {
321-
sample: Some(sample),
411+
sample: Some(kind),
322412
alias,
323413
name,
324414
args,
@@ -333,23 +423,48 @@ impl RelationPlanner for TableSamplePlanner {
333423
return Ok(RelationPlanning::Original(Box::new(relation)));
334424
};
335425

336-
// Extract sample spec (handles both before/after alias positions)
337-
let sample = match sample {
426+
// Inspect the sample without consuming `kind` yet — we may need to
427+
// hand the relation back unchanged for methods this example doesn't
428+
// implement.
429+
let inspect = match &kind {
338430
ast::TableSampleKind::BeforeTableAlias(s)
339-
| ast::TableSampleKind::AfterTableAlias(s) => s,
431+
| ast::TableSampleKind::AfterTableAlias(s) => s.as_ref(),
340432
};
341433

342-
// Validate sampling method
343-
if let Some(method) = &sample.name
344-
&& *method != TableSampleMethod::Bernoulli
345-
&& *method != TableSampleMethod::Row
346-
{
347-
return not_impl_err!(
348-
"Sampling method {} is not supported (only BERNOULLI and ROW)",
349-
method
434+
// This example handles BERNOULLI / ROW (row-level coin flip) and
435+
// BUCKET (Hive-style modulo). Anything else — most importantly
436+
// SYSTEM / BLOCK — falls through to the next planner so the
437+
// built-in `TableSampleSystemPlanner` (auto-registered via
438+
// `SessionStateBuilder::with_default_features`) can handle it
439+
// with parquet pushdown. **No SYSTEM reimplementation here.**
440+
let we_handle = inspect.bucket.is_some()
441+
|| matches!(
442+
inspect.name,
443+
Some(TableSampleMethod::Bernoulli) | Some(TableSampleMethod::Row) | None
350444
);
445+
if !we_handle {
446+
// Reconstruct the original relation and pass it on.
447+
let original = TableFactor::Table {
448+
sample: Some(kind),
449+
alias,
450+
name,
451+
args,
452+
with_hints,
453+
version,
454+
with_ordinality,
455+
partitions,
456+
json_path,
457+
index_hints,
458+
};
459+
return Ok(RelationPlanning::Original(Box::new(original)));
351460
}
352461

462+
// Extract sample spec (handles both before/after alias positions)
463+
let sample = match kind {
464+
ast::TableSampleKind::BeforeTableAlias(s)
465+
| ast::TableSampleKind::AfterTableAlias(s) => s,
466+
};
467+
353468
// Offset sampling (ClickHouse-style) not supported
354469
if sample.offset.is_some() {
355470
return not_impl_err!(
@@ -553,8 +668,15 @@ impl Hash for HashableF64 {
553668
// Physical Planning: TableSampleQueryPlanner + TableSampleExtensionPlanner
554669
// ============================================================================
555670

556-
/// Custom query planner that registers [`TableSampleExtensionPlanner`] to
557-
/// convert [`TableSamplePlanNode`] into [`SampleExec`].
671+
/// Custom query planner that registers [`TableSampleExtensionPlanner`]
672+
/// (lowering this example's [`TableSamplePlanNode`] to its own [`SampleExec`])
673+
/// alongside the built-in [`SamplePhysicalPlanner`] (lowering the core
674+
/// `Sample` extension node to its `SampleExec`). Both extension planners
675+
/// coexist: each only handles its own logical node type and returns
676+
/// `Ok(None)` otherwise. Without `SamplePhysicalPlanner` here,
677+
/// `TABLESAMPLE SYSTEM` queries that fall through to the built-in
678+
/// `TableSampleSystemPlanner` would fail to plan because
679+
/// `with_extension_planners(...)` *replaces* the defaults.
558680
#[derive(Debug)]
559681
struct TableSampleQueryPlanner;
560682

@@ -565,9 +687,10 @@ impl QueryPlanner for TableSampleQueryPlanner {
565687
logical_plan: &LogicalPlan,
566688
session_state: &SessionState,
567689
) -> Result<Arc<dyn ExecutionPlan>> {
568-
let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
569-
TableSampleExtensionPlanner,
570-
)]);
690+
let planner = DefaultPhysicalPlanner::with_extension_planners(vec![
691+
Arc::new(TableSampleExtensionPlanner),
692+
Arc::new(SamplePhysicalPlanner),
693+
]);
571694
planner
572695
.create_physical_plan(logical_plan, session_state)
573696
.await

0 commit comments

Comments
 (0)