1515// specific language governing permissions and limitations
1616// under the License.
1717
18- //! # TABLESAMPLE Example
18+ //! # TABLESAMPLE Example — adding row-level sampling on top of the built-in
1919//!
20- //! This example demonstrates implementing SQL `TABLESAMPLE` support using
21- //! DataFusion's extensibility APIs.
20+ //! `TABLESAMPLE SYSTEM(p%)` is supported out of the box: it's lifted to the
21+ //! core `Sample` extension node by the auto-registered
22+ //! `TableSampleSystemPlanner` and pushed into `ParquetSource` by the
23+ //! `SamplePushdown` rule. Adding *other* methods — `BERNOULLI`, `ROW`
24+ //! counts, Hive `BUCKET` — is the job of a `RelationPlanner` extension,
25+ //! which is what this example demonstrates.
2226//!
23- //! This is a working `TABLESAMPLE` implementation that can serve as a starting
24- //! point for your own projects. It also works as a template for adding other
25- //! custom SQL operators, covering the full pipeline from parsing to execution.
27+ //! The key composition pattern: when our planner sees `TABLESAMPLE`, it
28+ //! handles only the methods it implements (`BERNOULLI`, `ROW`, `BUCKET`)
29+ //! and returns [`RelationPlanning::Original`] for `SYSTEM`. Because
30+ //! `register_relation_planner` prepends to the chain, our planner runs
31+ //! first and the built-in handles whatever we don't. **No SYSTEM
32+ //! reimplementation required.**
2633//!
2734//! It shows how to:
2835//!
2936//! 1. **Parse** TABLESAMPLE syntax via a custom [`RelationPlanner`]
3037//! 2. **Plan** sampling as a custom logical node ([`TableSamplePlanNode`])
3138//! 3. **Execute** sampling via a custom physical operator ([`SampleExec`])
39+ //! 4. **Compose** with the built-in SYSTEM planner by returning
40+ //! `RelationPlanning::Original` for methods we don't implement
3241//!
3342//! ## Supported Syntax
3443//!
3544//! ```sql
36- //! -- Bernoulli sampling (each row has N% chance of selection)
45+ //! -- Bernoulli sampling (each row has N% chance of selection) — this example
3746//! SELECT * FROM table TABLESAMPLE BERNOULLI(10 PERCENT)
3847//!
39- //! -- Fractional sampling (0.0 to 1.0)
48+ //! -- Fractional sampling (0.0 to 1.0) — this example
4049//! SELECT * FROM table TABLESAMPLE (0.1)
4150//!
42- //! -- Row count limit
51+ //! -- Row count limit — this example
4352//! SELECT * FROM table TABLESAMPLE (100 ROWS)
4453//!
4554//! -- Reproducible sampling with a seed
4655//! SELECT * FROM table TABLESAMPLE (10 PERCENT) REPEATABLE(42)
56+ //!
57+ //! -- SYSTEM (block-level) sampling — handled by the built-in planner
58+ //! -- WITHOUT any code in this example
59+ //! SELECT * FROM table TABLESAMPLE SYSTEM (10) REPEATABLE(42)
4760//! ```
4861//!
4962//! ## Architecture
8194
8295use std:: {
8396 fmt:: { self , Debug , Formatter } ,
97+ fs:: File ,
8498 hash:: { Hash , Hasher } ,
8599 pin:: Pin ,
86100 sync:: Arc ,
@@ -98,9 +112,12 @@ use futures::{
98112 stream:: { Stream , StreamExt } ,
99113} ;
100114use rand:: { Rng , SeedableRng , rngs:: StdRng } ;
115+ use tempfile:: TempDir ;
101116use tonic:: async_trait;
102117
103118use datafusion:: optimizer:: simplify_expressions:: simplify_literal:: parse_literal;
119+ use datafusion:: parquet:: arrow:: ArrowWriter ;
120+ use datafusion:: parquet:: file:: properties:: WriterProperties ;
104121use datafusion:: {
105122 execution:: {
106123 RecordBatchStream , SendableRecordBatchStream , SessionState , SessionStateBuilder ,
@@ -111,7 +128,9 @@ use datafusion::{
111128 DisplayAs , DisplayFormatType , ExecutionPlan , PlanProperties ,
112129 metrics:: { BaselineMetrics , ExecutionPlanMetricsSet , MetricsSet , RecordOutput } ,
113130 } ,
114- physical_planner:: { DefaultPhysicalPlanner , ExtensionPlanner , PhysicalPlanner } ,
131+ physical_planner:: {
132+ DefaultPhysicalPlanner , ExtensionPlanner , PhysicalPlanner , SamplePhysicalPlanner ,
133+ } ,
115134 prelude:: * ,
116135} ;
117136use datafusion_common:: {
@@ -144,10 +163,18 @@ pub async fn table_sample() -> Result<()> {
144163
145164 let ctx = SessionContext :: new_with_state ( state) ;
146165
147- // Register custom relation planner for logical planning
166+ // Register custom relation planner for logical planning. It's prepended
167+ // to the chain (so it runs *before* the auto-registered built-in
168+ // `TableSampleSystemPlanner`); when our planner returns
169+ // `RelationPlanning::Original`, the built-in handles it.
148170 ctx. register_relation_planner ( Arc :: new ( TableSamplePlanner ) ) ?;
149171 register_sample_data ( & ctx) ?;
150172
173+ // Register a parquet-backed copy of the same data so we can demonstrate
174+ // SYSTEM end-to-end. SYSTEM only ships pushdown for parquet sources;
175+ // the in-memory `sample_data` table can't absorb a `Sample` node.
176+ let _parquet_dir = register_sample_data_parquet ( & ctx) . await ?;
177+
151178 println ! ( "TABLESAMPLE Example" ) ;
152179 println ! ( "===================\n " ) ;
153180
@@ -273,6 +300,32 @@ async fn run_examples(ctx: &SessionContext) -> Result<()> {
273300 +---------+---------+---------+---------+
274301 " ) ;
275302
303+ // Example 7: SYSTEM sampling — handled by the built-in
304+ // `TableSampleSystemPlanner`, **not** by this example's planner.
305+ // Our planner returns `Original` for SYSTEM, so the chain falls
306+ // through. Routed against the parquet-backed copy of the table so
307+ // the `SamplePushdown` rule can absorb the sample into the scan.
308+ // `REPEATABLE(42)` makes the rows deterministic across runs and
309+ // across machines (the seed dominates the file path in the
310+ // sampler's hash input).
311+ let results = run_example (
312+ ctx,
313+ "Example 7: SYSTEM (handled by the built-in, not this example)" ,
314+ "SELECT * FROM sample_data_parquet TABLESAMPLE SYSTEM (50) REPEATABLE (42)" ,
315+ )
316+ . await ?;
317+ assert_snapshot ! ( results, @r"
318+ +---------+---------+
319+ | column1 | column2 |
320+ +---------+---------+
321+ | 5 | row_5 |
322+ | 6 | row_6 |
323+ | 7 | row_7 |
324+ | 8 | row_8 |
325+ | 9 | row_9 |
326+ +---------+---------+
327+ " ) ;
328+
276329 Ok ( ( ) )
277330}
278331
@@ -301,6 +354,43 @@ fn register_sample_data(ctx: &SessionContext) -> Result<()> {
301354 Ok ( ( ) )
302355}
303356
357+ /// Register the same data as `sample_data_parquet`, backed by a tempfile
358+ /// parquet so `TABLESAMPLE SYSTEM` (handled by the built-in planner) has
359+ /// a `ParquetSource` to push into. Returns the `TempDir` so the caller
360+ /// can keep it alive for the duration of the queries.
361+ async fn register_sample_data_parquet ( ctx : & SessionContext ) -> Result < TempDir > {
362+ let dir = TempDir :: new ( ) . map_err ( |e| plan_datafusion_err ! ( "tempdir: {e}" ) ) ?;
363+ let path = dir. path ( ) . join ( "sample_data.parquet" ) ;
364+
365+ let column1: ArrayRef = Arc :: new ( Int32Array :: from ( ( 1 ..=10 ) . collect :: < Vec < i32 > > ( ) ) ) ;
366+ let column2: ArrayRef = Arc :: new ( StringArray :: from (
367+ ( 1 ..=10 ) . map ( |i| format ! ( "row_{i}" ) ) . collect :: < Vec < _ > > ( ) ,
368+ ) ) ;
369+ let batch =
370+ RecordBatch :: try_from_iter ( vec ! [ ( "column1" , column1) , ( "column2" , column2) ] ) ?;
371+ let file = File :: create ( & path) . map_err ( |e| plan_datafusion_err ! ( "create: {e}" ) ) ?;
372+ let mut writer = ArrowWriter :: try_new (
373+ file,
374+ batch. schema ( ) ,
375+ Some ( WriterProperties :: builder ( ) . build ( ) ) ,
376+ )
377+ . map_err ( |e| plan_datafusion_err ! ( "ArrowWriter: {e}" ) ) ?;
378+ writer
379+ . write ( & batch)
380+ . map_err ( |e| plan_datafusion_err ! ( "write: {e}" ) ) ?;
381+ writer
382+ . close ( )
383+ . map_err ( |e| plan_datafusion_err ! ( "close: {e}" ) ) ?;
384+
385+ ctx. register_parquet (
386+ "sample_data_parquet" ,
387+ path. to_str ( ) . unwrap ( ) ,
388+ Default :: default ( ) ,
389+ )
390+ . await ?;
391+ Ok ( dir)
392+ }
393+
304394// ============================================================================
305395// Logical Planning: TableSamplePlanner + TableSamplePlanNode
306396// ============================================================================
@@ -318,7 +408,7 @@ impl RelationPlanner for TableSamplePlanner {
318408 ) -> Result < RelationPlanning > {
319409 // Only handle Table relations with TABLESAMPLE clause
320410 let TableFactor :: Table {
321- sample : Some ( sample ) ,
411+ sample : Some ( kind ) ,
322412 alias,
323413 name,
324414 args,
@@ -333,23 +423,48 @@ impl RelationPlanner for TableSamplePlanner {
333423 return Ok ( RelationPlanning :: Original ( Box :: new ( relation) ) ) ;
334424 } ;
335425
336- // Extract sample spec (handles both before/after alias positions)
337- let sample = match sample {
426+ // Inspect the sample without consuming `kind` yet — we may need to
427+ // hand the relation back unchanged for methods this example doesn't
428+ // implement.
429+ let inspect = match & kind {
338430 ast:: TableSampleKind :: BeforeTableAlias ( s)
339- | ast:: TableSampleKind :: AfterTableAlias ( s) => s,
431+ | ast:: TableSampleKind :: AfterTableAlias ( s) => s. as_ref ( ) ,
340432 } ;
341433
342- // Validate sampling method
343- if let Some ( method) = & sample. name
344- && * method != TableSampleMethod :: Bernoulli
345- && * method != TableSampleMethod :: Row
346- {
347- return not_impl_err ! (
348- "Sampling method {} is not supported (only BERNOULLI and ROW)" ,
349- method
434+ // This example handles BERNOULLI / ROW (row-level coin flip) and
435+ // BUCKET (Hive-style modulo). Anything else — most importantly
436+ // SYSTEM / BLOCK — falls through to the next planner so the
437+ // built-in `TableSampleSystemPlanner` (auto-registered via
438+ // `SessionStateBuilder::with_default_features`) can handle it
439+ // with parquet pushdown. **No SYSTEM reimplementation here.**
440+ let we_handle = inspect. bucket . is_some ( )
441+ || matches ! (
442+ inspect. name,
443+ Some ( TableSampleMethod :: Bernoulli ) | Some ( TableSampleMethod :: Row ) | None
350444 ) ;
445+ if !we_handle {
446+ // Reconstruct the original relation and pass it on.
447+ let original = TableFactor :: Table {
448+ sample : Some ( kind) ,
449+ alias,
450+ name,
451+ args,
452+ with_hints,
453+ version,
454+ with_ordinality,
455+ partitions,
456+ json_path,
457+ index_hints,
458+ } ;
459+ return Ok ( RelationPlanning :: Original ( Box :: new ( original) ) ) ;
351460 }
352461
462+ // Extract sample spec (handles both before/after alias positions)
463+ let sample = match kind {
464+ ast:: TableSampleKind :: BeforeTableAlias ( s)
465+ | ast:: TableSampleKind :: AfterTableAlias ( s) => s,
466+ } ;
467+
353468 // Offset sampling (ClickHouse-style) not supported
354469 if sample. offset . is_some ( ) {
355470 return not_impl_err ! (
@@ -553,8 +668,15 @@ impl Hash for HashableF64 {
553668// Physical Planning: TableSampleQueryPlanner + TableSampleExtensionPlanner
554669// ============================================================================
555670
556- /// Custom query planner that registers [`TableSampleExtensionPlanner`] to
557- /// convert [`TableSamplePlanNode`] into [`SampleExec`].
671+ /// Custom query planner that registers [`TableSampleExtensionPlanner`]
672+ /// (lowering this example's [`TableSamplePlanNode`] to its own [`SampleExec`])
673+ /// alongside the built-in [`SamplePhysicalPlanner`] (lowering the core
674+ /// `Sample` extension node to its `SampleExec`). Both extension planners
675+ /// coexist: each only handles its own logical node type and returns
676+ /// `Ok(None)` otherwise. Without `SamplePhysicalPlanner` here,
677+ /// `TABLESAMPLE SYSTEM` queries that fall through to the built-in
678+ /// `TableSampleSystemPlanner` would fail to plan because
679+ /// `with_extension_planners(...)` *replaces* the defaults.
558680#[ derive( Debug ) ]
559681struct TableSampleQueryPlanner ;
560682
@@ -565,9 +687,10 @@ impl QueryPlanner for TableSampleQueryPlanner {
565687 logical_plan : & LogicalPlan ,
566688 session_state : & SessionState ,
567689 ) -> Result < Arc < dyn ExecutionPlan > > {
568- let planner = DefaultPhysicalPlanner :: with_extension_planners ( vec ! [ Arc :: new(
569- TableSampleExtensionPlanner ,
570- ) ] ) ;
690+ let planner = DefaultPhysicalPlanner :: with_extension_planners ( vec ! [
691+ Arc :: new( TableSampleExtensionPlanner ) ,
692+ Arc :: new( SamplePhysicalPlanner ) ,
693+ ] ) ;
571694 planner
572695 . create_physical_plan ( logical_plan, session_state)
573696 . await
0 commit comments