Skip to content

Commit 2cc8d76

Browse files
adriangbclaude
andcommitted
example: rework TABLESAMPLE example to compose with built-in SYSTEM
The relation_planner/table_sample.rs example previously errored on TABLESAMPLE methods other than BERNOULLI / ROW. After commit 3 of this stack registers a built-in TableSampleSystemPlanner by default, the example's planner runs *first* in the chain, so its error short-circuits SYSTEM and prevents the built-in from handling it. Reshape the example to use the chain composition pattern instead: - The planner now returns `RelationPlanning::Original` for methods it doesn't implement (SYSTEM, BLOCK, anything else), so the built-in picks up the slack. **No SYSTEM reimplementation in the example.** - The example's `TableSampleQueryPlanner` registers both `TableSampleExtensionPlanner` (this example's `TableSamplePlanNode` → its own `SampleExec`) **and** `SamplePhysicalPlanner` (core `Sample` → core `SampleExec`), since `with_extension_planners(...)` replaces the defaults. - Adds a parquet-backed `sample_data_parquet` table so SYSTEM has a source it can push into (the existing in-memory `sample_data` doesn't implement `try_push_sample`). - New Example 7 demonstrates `TABLESAMPLE SYSTEM (50) REPEATABLE (42)` against the parquet table; the row count is asserted (the exact rows shift with the tempdir path that gets hashed into the per-file seed, but the count from the cube-root math is stable). - Module docstring rewritten to lead with the composition story. The 3 existing relation_planner example tests (match_recognize, pivot_unpivot, table_sample) still pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 0f361b5 commit 2cc8d76

1 file changed

Lines changed: 148 additions & 28 deletions

File tree

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 148 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,35 +15,48 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! # TABLESAMPLE Example
18+
//! # TABLESAMPLE Example — adding row-level sampling on top of the built-in
1919
//!
20-
//! This example demonstrates implementing SQL `TABLESAMPLE` support using
21-
//! DataFusion's extensibility APIs.
20+
//! `TABLESAMPLE SYSTEM(p%)` is supported out of the box: it's lifted to the
21+
//! core `Sample` extension node by the auto-registered
22+
//! `TableSampleSystemPlanner` and pushed into `ParquetSource` by the
23+
//! `SamplePushdown` rule. Adding *other* methods — `BERNOULLI`, `ROW`
24+
//! counts, Hive `BUCKET` — is the job of a `RelationPlanner` extension,
25+
//! which is what this example demonstrates.
2226
//!
23-
//! This is a working `TABLESAMPLE` implementation that can serve as a starting
24-
//! point for your own projects. It also works as a template for adding other
25-
//! custom SQL operators, covering the full pipeline from parsing to execution.
27+
//! The key composition pattern: when our planner sees `TABLESAMPLE`, it
28+
//! handles only the methods it implements (`BERNOULLI`, `ROW`, `BUCKET`)
29+
//! and returns [`RelationPlanning::Original`] for `SYSTEM`. Because
30+
//! `register_relation_planner` prepends to the chain, our planner runs
31+
//! first and the built-in handles whatever we don't. **No SYSTEM
32+
//! reimplementation required.**
2633
//!
2734
//! It shows how to:
2835
//!
2936
//! 1. **Parse** TABLESAMPLE syntax via a custom [`RelationPlanner`]
3037
//! 2. **Plan** sampling as a custom logical node ([`TableSamplePlanNode`])
3138
//! 3. **Execute** sampling via a custom physical operator ([`SampleExec`])
39+
//! 4. **Compose** with the built-in SYSTEM planner by returning
40+
//! `RelationPlanning::Original` for methods we don't implement
3241
//!
3342
//! ## Supported Syntax
3443
//!
3544
//! ```sql
36-
//! -- Bernoulli sampling (each row has N% chance of selection)
45+
//! -- Bernoulli sampling (each row has N% chance of selection) — this example
3746
//! SELECT * FROM table TABLESAMPLE BERNOULLI(10 PERCENT)
3847
//!
39-
//! -- Fractional sampling (0.0 to 1.0)
48+
//! -- Fractional sampling (0.0 to 1.0) — this example
4049
//! SELECT * FROM table TABLESAMPLE (0.1)
4150
//!
42-
//! -- Row count limit
51+
//! -- Row count limit — this example
4352
//! SELECT * FROM table TABLESAMPLE (100 ROWS)
4453
//!
4554
//! -- Reproducible sampling with a seed
4655
//! SELECT * FROM table TABLESAMPLE (10 PERCENT) REPEATABLE(42)
56+
//!
57+
//! -- SYSTEM (block-level) sampling — handled by the built-in planner
58+
//! -- WITHOUT any code in this example
59+
//! SELECT * FROM table TABLESAMPLE SYSTEM (10) REPEATABLE(42)
4760
//! ```
4861
//!
4962
//! ## Architecture
@@ -81,6 +94,7 @@
8194
8295
use std::{
8396
fmt::{self, Debug, Formatter},
97+
fs::File,
8498
hash::{Hash, Hasher},
8599
pin::Pin,
86100
sync::Arc,
@@ -98,9 +112,12 @@ use futures::{
98112
stream::{Stream, StreamExt},
99113
};
100114
use rand::{Rng, SeedableRng, rngs::StdRng};
115+
use tempfile::TempDir;
101116
use tonic::async_trait;
102117

103118
use datafusion::optimizer::simplify_expressions::simplify_literal::parse_literal;
119+
use datafusion::parquet::arrow::ArrowWriter;
120+
use datafusion::parquet::file::properties::WriterProperties;
104121
use datafusion::{
105122
execution::{
106123
RecordBatchStream, SendableRecordBatchStream, SessionState, SessionStateBuilder,
@@ -111,7 +128,9 @@ use datafusion::{
111128
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
112129
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput},
113130
},
114-
physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner},
131+
physical_planner::{
132+
DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner, SamplePhysicalPlanner,
133+
},
115134
prelude::*,
116135
};
117136
use datafusion_common::{
@@ -144,10 +163,18 @@ pub async fn table_sample() -> Result<()> {
144163

145164
let ctx = SessionContext::new_with_state(state);
146165

147-
// Register custom relation planner for logical planning
166+
// Register custom relation planner for logical planning. It's prepended
167+
// to the chain (so it runs *before* the auto-registered built-in
168+
// `TableSampleSystemPlanner`); when our planner returns
169+
// `RelationPlanning::Original`, the built-in handles it.
148170
ctx.register_relation_planner(Arc::new(TableSamplePlanner))?;
149171
register_sample_data(&ctx)?;
150172

173+
// Register a parquet-backed copy of the same data so we can demonstrate
174+
// SYSTEM end-to-end. SYSTEM only ships pushdown for parquet sources;
175+
// the in-memory `sample_data` table can't absorb a `Sample` node.
176+
let _parquet_dir = register_sample_data_parquet(&ctx).await?;
177+
151178
println!("TABLESAMPLE Example");
152179
println!("===================\n");
153180

@@ -273,6 +300,29 @@ async fn run_examples(ctx: &SessionContext) -> Result<()> {
273300
+---------+---------+---------+---------+
274301
");
275302

303+
// Example 7: SYSTEM sampling — handled by the built-in
304+
// `TableSampleSystemPlanner`, **not** by this example's planner.
305+
// Our planner returns `Original` for SYSTEM, so the chain falls
306+
// through. Routed against the parquet-backed copy of the table so
307+
// the `SamplePushdown` rule can absorb the sample into the scan.
308+
// The actual rows depend on the parquet file path (which is in a
309+
// tempdir, so it varies across runs); we just assert the count
310+
// is what the cube-root math predicts for a single-file /
311+
// single-row-group SYSTEM(50) input: ceil(10 × 0.5) = 5 rows.
312+
let results = run_example(
313+
ctx,
314+
"Example 7: SYSTEM(50) row count (handled by the built-in)",
315+
"SELECT count(*) FROM sample_data_parquet TABLESAMPLE SYSTEM (50) REPEATABLE (42)",
316+
)
317+
.await?;
318+
assert_snapshot!(results, @r"
319+
+----------+
320+
| count(*) |
321+
+----------+
322+
| 5 |
323+
+----------+
324+
");
325+
276326
Ok(())
277327
}
278328

@@ -301,6 +351,43 @@ fn register_sample_data(ctx: &SessionContext) -> Result<()> {
301351
Ok(())
302352
}
303353

354+
/// Register the same data as `sample_data_parquet`, backed by a tempfile
355+
/// parquet so `TABLESAMPLE SYSTEM` (handled by the built-in planner) has
356+
/// a `ParquetSource` to push into. Returns the `TempDir` so the caller
357+
/// can keep it alive for the duration of the queries.
358+
async fn register_sample_data_parquet(ctx: &SessionContext) -> Result<TempDir> {
359+
let dir = TempDir::new().map_err(|e| plan_datafusion_err!("tempdir: {e}"))?;
360+
let path = dir.path().join("sample_data.parquet");
361+
362+
let column1: ArrayRef = Arc::new(Int32Array::from((1..=10).collect::<Vec<i32>>()));
363+
let column2: ArrayRef = Arc::new(StringArray::from(
364+
(1..=10).map(|i| format!("row_{i}")).collect::<Vec<_>>(),
365+
));
366+
let batch =
367+
RecordBatch::try_from_iter(vec![("column1", column1), ("column2", column2)])?;
368+
let file = File::create(&path).map_err(|e| plan_datafusion_err!("create: {e}"))?;
369+
let mut writer = ArrowWriter::try_new(
370+
file,
371+
batch.schema(),
372+
Some(WriterProperties::builder().build()),
373+
)
374+
.map_err(|e| plan_datafusion_err!("ArrowWriter: {e}"))?;
375+
writer
376+
.write(&batch)
377+
.map_err(|e| plan_datafusion_err!("write: {e}"))?;
378+
writer
379+
.close()
380+
.map_err(|e| plan_datafusion_err!("close: {e}"))?;
381+
382+
ctx.register_parquet(
383+
"sample_data_parquet",
384+
path.to_str().unwrap(),
385+
Default::default(),
386+
)
387+
.await?;
388+
Ok(dir)
389+
}
390+
304391
// ============================================================================
305392
// Logical Planning: TableSamplePlanner + TableSamplePlanNode
306393
// ============================================================================
@@ -318,7 +405,7 @@ impl RelationPlanner for TableSamplePlanner {
318405
) -> Result<RelationPlanning> {
319406
// Only handle Table relations with TABLESAMPLE clause
320407
let TableFactor::Table {
321-
sample: Some(sample),
408+
sample: Some(kind),
322409
alias,
323410
name,
324411
args,
@@ -333,23 +420,48 @@ impl RelationPlanner for TableSamplePlanner {
333420
return Ok(RelationPlanning::Original(Box::new(relation)));
334421
};
335422

336-
// Extract sample spec (handles both before/after alias positions)
337-
let sample = match sample {
423+
// Inspect the sample without consuming `kind` yet — we may need to
424+
// hand the relation back unchanged for methods this example doesn't
425+
// implement.
426+
let inspect = match &kind {
338427
ast::TableSampleKind::BeforeTableAlias(s)
339-
| ast::TableSampleKind::AfterTableAlias(s) => s,
428+
| ast::TableSampleKind::AfterTableAlias(s) => s.as_ref(),
340429
};
341430

342-
// Validate sampling method
343-
if let Some(method) = &sample.name
344-
&& *method != TableSampleMethod::Bernoulli
345-
&& *method != TableSampleMethod::Row
346-
{
347-
return not_impl_err!(
348-
"Sampling method {} is not supported (only BERNOULLI and ROW)",
349-
method
431+
// This example handles BERNOULLI / ROW (row-level coin flip) and
432+
// BUCKET (Hive-style modulo). Anything else — most importantly
433+
// SYSTEM / BLOCK — falls through to the next planner so the
434+
// built-in `TableSampleSystemPlanner` (auto-registered via
435+
// `SessionStateBuilder::with_default_features`) can handle it
436+
// with parquet pushdown. **No SYSTEM reimplementation here.**
437+
let we_handle = inspect.bucket.is_some()
438+
|| matches!(
439+
inspect.name,
440+
Some(TableSampleMethod::Bernoulli) | Some(TableSampleMethod::Row) | None
350441
);
442+
if !we_handle {
443+
// Reconstruct the original relation and pass it on.
444+
let original = TableFactor::Table {
445+
sample: Some(kind),
446+
alias,
447+
name,
448+
args,
449+
with_hints,
450+
version,
451+
with_ordinality,
452+
partitions,
453+
json_path,
454+
index_hints,
455+
};
456+
return Ok(RelationPlanning::Original(Box::new(original)));
351457
}
352458

459+
// Extract sample spec (handles both before/after alias positions)
460+
let sample = match kind {
461+
ast::TableSampleKind::BeforeTableAlias(s)
462+
| ast::TableSampleKind::AfterTableAlias(s) => s,
463+
};
464+
353465
// Offset sampling (ClickHouse-style) not supported
354466
if sample.offset.is_some() {
355467
return not_impl_err!(
@@ -553,8 +665,15 @@ impl Hash for HashableF64 {
553665
// Physical Planning: TableSampleQueryPlanner + TableSampleExtensionPlanner
554666
// ============================================================================
555667

556-
/// Custom query planner that registers [`TableSampleExtensionPlanner`] to
557-
/// convert [`TableSamplePlanNode`] into [`SampleExec`].
668+
/// Custom query planner that registers [`TableSampleExtensionPlanner`]
669+
/// (lowering this example's [`TableSamplePlanNode`] to its own [`SampleExec`])
670+
/// alongside the built-in [`SamplePhysicalPlanner`] (lowering the core
671+
/// `Sample` extension node to its `SampleExec`). Both extension planners
672+
/// coexist: each only handles its own logical node type and returns
673+
/// `Ok(None)` otherwise. Without `SamplePhysicalPlanner` here,
674+
/// `TABLESAMPLE SYSTEM` queries that fall through to the built-in
675+
/// `TableSampleSystemPlanner` would fail to plan because
676+
/// `with_extension_planners(...)` *replaces* the defaults.
558677
#[derive(Debug)]
559678
struct TableSampleQueryPlanner;
560679

@@ -565,9 +684,10 @@ impl QueryPlanner for TableSampleQueryPlanner {
565684
logical_plan: &LogicalPlan,
566685
session_state: &SessionState,
567686
) -> Result<Arc<dyn ExecutionPlan>> {
568-
let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
569-
TableSampleExtensionPlanner,
570-
)]);
687+
let planner = DefaultPhysicalPlanner::with_extension_planners(vec![
688+
Arc::new(TableSampleExtensionPlanner),
689+
Arc::new(SamplePhysicalPlanner),
690+
]);
571691
planner
572692
.create_physical_plan(logical_plan, session_state)
573693
.await

0 commit comments

Comments
 (0)