Skip to content

Commit 2c0a38b

Browse files
alambhaohuaijin
andauthored
[branch-53] ser/de fetch in FilterExec (#20738) (#20883)
- Part of #19692 - Closes #20737 on branch-53 This PR: - Backports #20738 from @haohuaijin to the branch-53 line Co-authored-by: Huaijin <haohuaijin@gmail.com>
1 parent e922af9 commit 2c0a38b

6 files changed

Lines changed: 41 additions & 0 deletions

File tree

datafusion/physical-plan/src/filter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -718,6 +718,10 @@ impl ExecutionPlan for FilterExec {
718718
})
719719
}
720720

721+
fn fetch(&self) -> Option<usize> {
722+
self.fetch
723+
}
724+
721725
fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
722726
Some(Arc::new(Self {
723727
predicate: Arc::clone(&self.predicate),

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,6 +1030,7 @@ message FilterExecNode {
10301030
uint32 default_filter_selectivity = 3;
10311031
repeated uint32 projection = 9;
10321032
uint32 batch_size = 10;
1033+
optional uint32 fetch = 11;
10331034
}
10341035

10351036
message FileGroup {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,7 @@ impl protobuf::PhysicalPlanNode {
691691
let filter = FilterExecBuilder::new(predicate, input)
692692
.apply_projection(projection)?
693693
.with_batch_size(filter.batch_size as usize)
694+
.with_fetch(filter.fetch.map(|f| f as usize))
694695
.build()?;
695696
match filter_selectivity {
696697
Ok(filter_selectivity) => Ok(Arc::new(
@@ -2320,6 +2321,7 @@ impl protobuf::PhysicalPlanNode {
23202321
v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
23212322
}),
23222323
batch_size: exec.batch_size() as u32,
2324+
fetch: exec.fetch().map(|f| f as u32),
23232325
},
23242326
))),
23252327
})

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -794,6 +794,19 @@ fn roundtrip_filter_with_not_and_in_list() -> Result<()> {
794794
)?))
795795
}
796796

797+
#[test]
798+
fn roundtrip_filter_with_fetch() -> Result<()> {
799+
let field_a = Field::new("a", DataType::Boolean, false);
800+
let field_b = Field::new("b", DataType::Int64, false);
801+
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
802+
let predicate = col("a", &schema)?;
803+
let filter = FilterExecBuilder::new(predicate, Arc::new(EmptyExec::new(schema)))
804+
.with_fetch(Some(10))
805+
.build()?;
806+
assert_eq!(filter.fetch(), Some(10));
807+
roundtrip_test(Arc::new(filter))
808+
}
809+
797810
#[test]
798811
fn roundtrip_sort() -> Result<()> {
799812
let field_a = Field::new("a", DataType::Boolean, false);

0 commit comments

Comments
 (0)