Skip to content

Commit 4dbb449

Browse files
authored
ser/de fetch in FilterExec (#20738)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #20737 ## Rationale for this change FilterExec have fetch filed but not impl the ser/de in proto ## What changes are included in this PR? add ser/de for fetch in FilterExec ## Are these changes tested? add one test case ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 92078d9 commit 4dbb449

File tree

6 files changed

+41
-0
lines changed

6 files changed

+41
-0
lines changed

datafusion/physical-plan/src/filter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,10 @@ impl ExecutionPlan for FilterExec {
725725
})
726726
}
727727

728+
fn fetch(&self) -> Option<usize> {
729+
self.fetch
730+
}
731+
728732
fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
729733
Some(Arc::new(Self {
730734
predicate: Arc::clone(&self.predicate),

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1034,6 +1034,7 @@ message FilterExecNode {
10341034
uint32 default_filter_selectivity = 3;
10351035
repeated uint32 projection = 9;
10361036
uint32 batch_size = 10;
1037+
optional uint32 fetch = 11;
10371038
}
10381039

10391040
message FileGroup {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,7 @@ impl protobuf::PhysicalPlanNode {
695695
let filter = FilterExecBuilder::new(predicate, input)
696696
.apply_projection(projection)?
697697
.with_batch_size(filter.batch_size as usize)
698+
.with_fetch(filter.fetch.map(|f| f as usize))
698699
.build()?;
699700
match filter_selectivity {
700701
Ok(filter_selectivity) => Ok(Arc::new(
@@ -2334,6 +2335,7 @@ impl protobuf::PhysicalPlanNode {
23342335
v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
23352336
}),
23362337
batch_size: exec.batch_size() as u32,
2338+
fetch: exec.fetch().map(|f| f as u32),
23372339
},
23382340
))),
23392341
})

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,19 @@ fn roundtrip_filter_with_not_and_in_list() -> Result<()> {
795795
)?))
796796
}
797797

798+
#[test]
799+
fn roundtrip_filter_with_fetch() -> Result<()> {
800+
let field_a = Field::new("a", DataType::Boolean, false);
801+
let field_b = Field::new("b", DataType::Int64, false);
802+
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
803+
let predicate = col("a", &schema)?;
804+
let filter = FilterExecBuilder::new(predicate, Arc::new(EmptyExec::new(schema)))
805+
.with_fetch(Some(10))
806+
.build()?;
807+
assert_eq!(filter.fetch(), Some(10));
808+
roundtrip_test(Arc::new(filter))
809+
}
810+
798811
#[test]
799812
fn roundtrip_sort() -> Result<()> {
800813
let field_a = Field::new("a", DataType::Boolean, false);

0 commit comments

Comments
 (0)