
Commit f802ed1

Add protobuf serialization/deserialization support for EmptyTable scans (#20844)
## Which issue does this PR close?

I figured it would be easier to submit the PR right away, as the change doesn't look controversial. I'm happy to create an issue and link it here if you'd prefer.

## Rationale for this change

The short story: in another project we'd like to use DataFusion to build operations on data and then submit the resulting logical plan _somewhere_ to execute (likely not using DataFusion to actually execute the query). Since those plans are never meant to be executed by DataFusion, we use `EmptyTable` as a base to bring a schema into DataFusion without any actual data. `EmptyTable` scans not being serializable prevents us from sending those plans to Python or over the wire. I believe this change makes DataFusion's `LogicalPlan` more portable and more usable outside of DataFusion's query executor.

The longer story: [VegaFusion](https://github.com/vega/vegafusion) does server-side aggregation for Vega charts and is powered by DataFusion. We recently added an option to [use custom query/plan executors](vega/vegafusion#573), which allows a user to pass a schema (without data) to VegaFusion, which adds all the necessary aggregations (but does not execute them) and returns a logical plan to the user. The user can then hand this plan off to a custom query executor (e.g. Spark). This is already implemented and works.

However, since VegaFusion is most commonly used through its Python bindings, we'd like to expose this API to Python too (and additionally as part of a gRPC API), which requires serializing the built plans to protobuf. Currently we use `EmptyTable` to bring a schema without any data into DataFusion, but since it can't be converted to protobuf, we're unable to expose this API. We considered providing a custom encoder/decoder, but that would only work for the gRPC case, not for Python, as datafusion-python doesn't allow supplying a custom decoder, as far as I understand.

## What changes are included in this PR?

* Moved `EmptyTable` from `datafusion-core` into `datafusion-catalog` and added a backwards-compatibility re-export (following the pattern of other table providers moved earlier)
* Added a new `EmptyTableScanNode` to the protobuf definitions
* Added encoding and decoding for the new entity to the `AsLogicalPlan for LogicalPlanNode` implementation

## Are these changes tested?

I added two roundtrip tests for the new node.

## Are there any user-facing changes?

`EmptyTable` can now be imported from the `datafusion-catalog` crate, but the old crate (`datafusion-core`) still re-exports it, so this shouldn't be a breaking change.

P.S. Just to be explicit: the code itself was written mostly by an LLM (as I'm not that proficient in Rust yet). I did review and test it, though.
1 parent af7904f commit f802ed1

8 files changed

Lines changed: 281 additions & 8 deletions


Lines changed: 5 additions & 6 deletions
```diff
@@ -21,15 +21,14 @@ use std::sync::Arc;

 use arrow::datatypes::*;
 use async_trait::async_trait;
-use datafusion_catalog::Session;
-use datafusion_common::project_schema;
-
-use crate::datasource::{TableProvider, TableType};
-use crate::error::Result;
-use crate::logical_expr::Expr;
+use datafusion_common::{Result, project_schema};
+use datafusion_expr::{Expr, TableType};
 use datafusion_physical_plan::ExecutionPlan;
 use datafusion_physical_plan::empty::EmptyExec;

+use crate::Session;
+use crate::TableProvider;
+
 /// An empty plan that is useful for testing and generating plans
 /// without mapping them to actual data.
 #[derive(Debug)]
```

datafusion/catalog/src/lib.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -34,6 +34,7 @@

 pub mod cte_worktable;
 pub mod default_table_source;
+pub mod empty;
 pub mod information_schema;
 pub mod listing_schema;
 pub mod memory;
```

datafusion/core/src/datasource/mod.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -20,7 +20,6 @@
 //! [`ListingTable`]: crate::datasource::listing::ListingTable

 pub mod dynamic_file;
-pub mod empty;
 pub mod file_format;
 pub mod listing;
 pub mod listing_table_factory;
@@ -39,6 +38,7 @@ pub use crate::catalog::TableProvider;
 pub use crate::logical_expr::TableType;
 pub use datafusion_catalog::cte_worktable;
 pub use datafusion_catalog::default_table_source;
+pub use datafusion_catalog::empty;
 pub use datafusion_catalog::memory;
 pub use datafusion_catalog::stream;
 pub use datafusion_catalog::view;
```

datafusion/proto/proto/datafusion.proto

Lines changed: 8 additions & 0 deletions
```diff
@@ -62,6 +62,7 @@ message LogicalPlanNode {
     RecursiveQueryNode recursive_query = 31;
     CteWorkTableScanNode cte_work_table_scan = 32;
     DmlNode dml = 33;
+    EmptyTableScanNode empty_table_scan = 34;
   }
 }

@@ -1426,6 +1427,13 @@ message CteWorkTableScanNode {
   datafusion_common.Schema schema = 2;
 }

+message EmptyTableScanNode {
+  TableReference table_name = 1;
+  datafusion_common.Schema schema = 2;
+  ProjectionColumns projection = 3;
+  repeated LogicalExprNode filters = 4;
+}
+
 enum GenerateSeriesName {
   GS_GENERATE_SERIES = 0;
   GS_RANGE = 1;
```

datafusion/proto/src/generated/pbjson.rs

Lines changed: 157 additions & 0 deletions

datafusion/proto/src/generated/prost.rs

Lines changed: 14 additions & 1 deletion
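The regenerated pbjson/prost files are not shown in full here. For reference, the prost-generated struct for the new message would look roughly like this; the exact derives, attribute forms, and type paths are whatever `prost-build` emits, so treat this as a sketch rather than the actual generated code:

```rust
// Approximate shape of the prost output for EmptyTableScanNode (assumed, not copied).
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EmptyTableScanNode {
    #[prost(message, optional, tag = "1")]
    pub table_name: ::core::option::Option<TableReference>,
    #[prost(message, optional, tag = "2")]
    pub schema: ::core::option::Option<super::datafusion_common::Schema>,
    #[prost(message, optional, tag = "3")]
    pub projection: ::core::option::Option<ProjectionColumns>,
    #[prost(message, repeated, tag = "4")]
    pub filters: ::prost::alloc::vec::Vec<LogicalExprNode>,
}
```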

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 43 additions & 0 deletions
```diff
@@ -35,6 +35,7 @@ use crate::{
 use crate::protobuf::{ToProtoError, proto_error};
 use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
 use datafusion_catalog::cte_worktable::CteWorkTable;
+use datafusion_catalog::empty::EmptyTable;
 use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{
     Result, TableReference, ToDFSchema, assert_or_internal_err, context,
@@ -1065,6 +1066,35 @@ impl AsLogicalPlan for LogicalPlanNode {
                 )?
                 .build()
             }
+            LogicalPlanType::EmptyTableScan(scan) => {
+                let schema: Schema = convert_required!(scan.schema)?;
+                let schema = Arc::new(schema);
+                let mut projection = None;
+                if let Some(columns) = &scan.projection {
+                    let column_indices = columns
+                        .columns
+                        .iter()
+                        .map(|name| schema.index_of(name))
+                        .collect::<Result<Vec<usize>, _>>()?;
+                    projection = Some(column_indices);
+                }
+
+                let filters =
+                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
+
+                let table_name =
+                    from_table_reference(scan.table_name.as_ref(), "EmptyTableScan")?;
+
+                let provider = Arc::new(EmptyTable::new(Arc::clone(&schema)));
+
+                LogicalPlanBuilder::scan_with_filters(
+                    table_name,
+                    provider_as_source(provider),
+                    projection,
+                    filters,
+                )?
+                .build()
+            }
             LogicalPlanType::Dml(dml_node) => {
                 Ok(LogicalPlan::Dml(datafusion_expr::DmlStatement::new(
                     from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
@@ -1277,6 +1307,19 @@ impl AsLogicalPlan for LogicalPlanNode {
                 },
             )),
         })
+    } else if provider.downcast_ref::<EmptyTable>().is_some() {
+        let schema: protobuf::Schema = schema.as_ref().try_into()?;
+
+        Ok(LogicalPlanNode {
+            logical_plan_type: Some(LogicalPlanType::EmptyTableScan(
+                protobuf::EmptyTableScanNode {
+                    table_name: Some(table_name.clone().into()),
+                    schema: Some(schema),
+                    projection,
+                    filters,
+                },
+            )),
+        })
     } else {
         let schema: protobuf::Schema = schema.as_ref().try_into()?;
         let mut bytes = vec![];
```
