Skip to content

Commit 2e95f26

Browse files
authored
Minor: Make ProjectionExpr::new easier to use with constants (#19343)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #. ## Rationale for this change While reviewing the code from @pepijnve in #19287 I noticed it was a little awkward to create `ProjectionExpr` from constants. Let's add a nicer signature for that. Instead of ```rust ProjectionExpr::new(lit("0"), "l".to_string()), ``` Do ```rust ProjectionExpr::new(lit("0"), "l"), ``` ## What changes are included in this PR? 1. Add new method and simplicy some tests ## Are these changes tested? By CI ## Are there any user-facing changes? A slightly nicer API
1 parent 59dcc36 commit 2e95f26

4 files changed

Lines changed: 73 additions & 90 deletions

File tree

datafusion/core/tests/execution/coop.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -655,7 +655,7 @@ async fn join_agg_yields(
655655

656656
let proj_expr = vec![ProjectionExpr::new(
657657
Arc::new(Column::new_with_schema("value", &input_schema)?) as _,
658-
"value".to_string(),
658+
"value",
659659
)];
660660

661661
let projection = Arc::new(ProjectionExec::try_new(proj_expr, join)?);

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 69 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ fn create_simple_csv_exec() -> Arc<dyn ExecutionPlan> {
399399
};
400400
Arc::new(CsvSource::new(schema.clone()).with_csv_options(options))
401401
})
402-
.with_file(PartitionedFile::new("x".to_string(), 100))
402+
.with_file(PartitionedFile::new("x", 100))
403403
.with_projection_indices(Some(vec![0, 1, 2, 3, 4]))
404404
.unwrap()
405405
.build();
@@ -424,7 +424,7 @@ fn create_projecting_csv_exec() -> Arc<dyn ExecutionPlan> {
424424
};
425425
Arc::new(CsvSource::new(schema.clone()).with_csv_options(options))
426426
})
427-
.with_file(PartitionedFile::new("x".to_string(), 100))
427+
.with_file(PartitionedFile::new("x", 100))
428428
.with_projection_indices(Some(vec![3, 2, 1]))
429429
.unwrap()
430430
.build();
@@ -449,8 +449,8 @@ fn test_csv_after_projection() -> Result<()> {
449449
let csv = create_projecting_csv_exec();
450450
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
451451
vec![
452-
ProjectionExpr::new(Arc::new(Column::new("b", 2)), "b".to_string()),
453-
ProjectionExpr::new(Arc::new(Column::new("d", 0)), "d".to_string()),
452+
ProjectionExpr::new(Arc::new(Column::new("b", 2)), "b"),
453+
ProjectionExpr::new(Arc::new(Column::new("d", 0)), "d"),
454454
],
455455
csv.clone(),
456456
)?);
@@ -486,9 +486,9 @@ fn test_memory_after_projection() -> Result<()> {
486486
let memory = create_projecting_memory_exec();
487487
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
488488
vec![
489-
ProjectionExpr::new(Arc::new(Column::new("d", 2)), "d".to_string()),
490-
ProjectionExpr::new(Arc::new(Column::new("e", 3)), "e".to_string()),
491-
ProjectionExpr::new(Arc::new(Column::new("a", 1)), "a".to_string()),
489+
ProjectionExpr::new(Arc::new(Column::new("d", 2)), "d"),
490+
ProjectionExpr::new(Arc::new(Column::new("e", 3)), "e"),
491+
ProjectionExpr::new(Arc::new(Column::new("a", 1)), "a"),
492492
],
493493
memory.clone(),
494494
)?);
@@ -592,9 +592,9 @@ fn test_streaming_table_after_projection() -> Result<()> {
592592
)?;
593593
let projection = Arc::new(ProjectionExec::try_new(
594594
vec![
595-
ProjectionExpr::new(Arc::new(Column::new("d", 3)), "d".to_string()),
596-
ProjectionExpr::new(Arc::new(Column::new("e", 2)), "e".to_string()),
597-
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a".to_string()),
595+
ProjectionExpr::new(Arc::new(Column::new("d", 3)), "d"),
596+
ProjectionExpr::new(Arc::new(Column::new("e", 2)), "e"),
597+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
598598
],
599599
Arc::new(streaming_table) as _,
600600
)?) as _;
@@ -659,28 +659,25 @@ fn test_projection_after_projection() -> Result<()> {
659659
let csv = create_simple_csv_exec();
660660
let child_projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
661661
vec![
662-
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()),
663-
ProjectionExpr::new(Arc::new(Column::new("e", 4)), "new_e".to_string()),
664-
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a".to_string()),
665-
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "new_b".to_string()),
662+
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c"),
663+
ProjectionExpr::new(Arc::new(Column::new("e", 4)), "new_e"),
664+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
665+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "new_b"),
666666
],
667667
csv.clone(),
668668
)?);
669669
let top_projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
670670
vec![
671-
ProjectionExpr::new(Arc::new(Column::new("new_b", 3)), "new_b".to_string()),
671+
ProjectionExpr::new(Arc::new(Column::new("new_b", 3)), "new_b"),
672672
ProjectionExpr::new(
673673
Arc::new(BinaryExpr::new(
674674
Arc::new(Column::new("c", 0)),
675675
Operator::Plus,
676676
Arc::new(Column::new("new_e", 1)),
677677
)),
678-
"binary".to_string(),
679-
),
680-
ProjectionExpr::new(
681-
Arc::new(Column::new("new_b", 3)),
682-
"newest_b".to_string(),
678+
"binary",
683679
),
680+
ProjectionExpr::new(Arc::new(Column::new("new_b", 3)), "newest_b"),
684681
],
685682
child_projection.clone(),
686683
)?);
@@ -745,9 +742,9 @@ fn test_output_req_after_projection() -> Result<()> {
745742
));
746743
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
747744
vec![
748-
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()),
749-
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "new_a".to_string()),
750-
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b".to_string()),
745+
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c"),
746+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "new_a"),
747+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
751748
],
752749
sort_req.clone(),
753750
)?);
@@ -837,9 +834,9 @@ fn test_coalesce_partitions_after_projection() -> Result<()> {
837834
Arc::new(CoalescePartitionsExec::new(csv));
838835
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
839836
vec![
840-
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b".to_string()),
841-
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a_new".to_string()),
842-
ProjectionExpr::new(Arc::new(Column::new("d", 3)), "d".to_string()),
837+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
838+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a_new"),
839+
ProjectionExpr::new(Arc::new(Column::new("d", 3)), "d"),
843840
],
844841
coalesce_partitions,
845842
)?);
@@ -893,9 +890,9 @@ fn test_filter_after_projection() -> Result<()> {
893890
let filter = Arc::new(FilterExec::try_new(predicate, csv)?);
894891
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
895892
vec![
896-
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a_new".to_string()),
897-
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b".to_string()),
898-
ProjectionExpr::new(Arc::new(Column::new("d", 3)), "d".to_string()),
893+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a_new"),
894+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
895+
ProjectionExpr::new(Arc::new(Column::new("d", 3)), "d"),
899896
],
900897
filter.clone(),
901898
)?) as _;
@@ -987,17 +984,11 @@ fn test_join_after_projection() -> Result<()> {
987984
)?);
988985
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
989986
vec![
990-
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c_from_left".to_string()),
991-
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b_from_left".to_string()),
992-
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a_from_left".to_string()),
993-
ProjectionExpr::new(
994-
Arc::new(Column::new("a", 5)),
995-
"a_from_right".to_string(),
996-
),
997-
ProjectionExpr::new(
998-
Arc::new(Column::new("c", 7)),
999-
"c_from_right".to_string(),
1000-
),
987+
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c_from_left"),
988+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b_from_left"),
989+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a_from_left"),
990+
ProjectionExpr::new(Arc::new(Column::new("a", 5)), "a_from_right"),
991+
ProjectionExpr::new(Arc::new(Column::new("c", 7)), "c_from_right"),
1001992
],
1002993
join,
1003994
)?) as _;
@@ -1116,16 +1107,16 @@ fn test_join_after_required_projection() -> Result<()> {
11161107
)?);
11171108
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
11181109
vec![
1119-
ProjectionExpr::new(Arc::new(Column::new("a", 5)), "a".to_string()),
1120-
ProjectionExpr::new(Arc::new(Column::new("b", 6)), "b".to_string()),
1121-
ProjectionExpr::new(Arc::new(Column::new("c", 7)), "c".to_string()),
1122-
ProjectionExpr::new(Arc::new(Column::new("d", 8)), "d".to_string()),
1123-
ProjectionExpr::new(Arc::new(Column::new("e", 9)), "e".to_string()),
1124-
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a".to_string()),
1125-
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b".to_string()),
1126-
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()),
1127-
ProjectionExpr::new(Arc::new(Column::new("d", 3)), "d".to_string()),
1128-
ProjectionExpr::new(Arc::new(Column::new("e", 4)), "e".to_string()),
1110+
ProjectionExpr::new(Arc::new(Column::new("a", 5)), "a"),
1111+
ProjectionExpr::new(Arc::new(Column::new("b", 6)), "b"),
1112+
ProjectionExpr::new(Arc::new(Column::new("c", 7)), "c"),
1113+
ProjectionExpr::new(Arc::new(Column::new("d", 8)), "d"),
1114+
ProjectionExpr::new(Arc::new(Column::new("e", 9)), "e"),
1115+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
1116+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
1117+
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c"),
1118+
ProjectionExpr::new(Arc::new(Column::new("d", 3)), "d"),
1119+
ProjectionExpr::new(Arc::new(Column::new("e", 4)), "e"),
11291120
],
11301121
join,
11311122
)?) as _;
@@ -1205,7 +1196,7 @@ fn test_nested_loop_join_after_projection() -> Result<()> {
12051196
)?) as _;
12061197

12071198
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
1208-
vec![ProjectionExpr::new(col_left_c, "c".to_string())],
1199+
vec![ProjectionExpr::new(col_left_c, "c")],
12091200
Arc::clone(&join),
12101201
)?) as _;
12111202
let initial = displayable(projection.as_ref()).indent(true).to_string();
@@ -1295,13 +1286,10 @@ fn test_hash_join_after_projection() -> Result<()> {
12951286
)?);
12961287
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
12971288
vec![
1298-
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c_from_left".to_string()),
1299-
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b_from_left".to_string()),
1300-
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a_from_left".to_string()),
1301-
ProjectionExpr::new(
1302-
Arc::new(Column::new("c", 7)),
1303-
"c_from_right".to_string(),
1304-
),
1289+
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c_from_left"),
1290+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b_from_left"),
1291+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a_from_left"),
1292+
ProjectionExpr::new(Arc::new(Column::new("c", 7)), "c_from_right"),
13051293
],
13061294
join.clone(),
13071295
)?) as _;
@@ -1337,10 +1325,10 @@ fn test_hash_join_after_projection() -> Result<()> {
13371325

13381326
let projection = Arc::new(ProjectionExec::try_new(
13391327
vec![
1340-
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a".to_string()),
1341-
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b".to_string()),
1342-
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()),
1343-
ProjectionExpr::new(Arc::new(Column::new("c", 7)), "c".to_string()),
1328+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
1329+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
1330+
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c"),
1331+
ProjectionExpr::new(Arc::new(Column::new("c", 7)), "c"),
13441332
],
13451333
join.clone(),
13461334
)?);
@@ -1381,9 +1369,9 @@ fn test_repartition_after_projection() -> Result<()> {
13811369
)?);
13821370
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
13831371
vec![
1384-
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b_new".to_string()),
1385-
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a".to_string()),
1386-
ProjectionExpr::new(Arc::new(Column::new("d", 3)), "d_new".to_string()),
1372+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b_new"),
1373+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
1374+
ProjectionExpr::new(Arc::new(Column::new("d", 3)), "d_new"),
13871375
],
13881376
repartition,
13891377
)?) as _;
@@ -1450,9 +1438,9 @@ fn test_sort_after_projection() -> Result<()> {
14501438
);
14511439
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
14521440
vec![
1453-
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()),
1454-
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "new_a".to_string()),
1455-
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b".to_string()),
1441+
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c"),
1442+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "new_a"),
1443+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
14561444
],
14571445
Arc::new(sort_exec),
14581446
)?) as _;
@@ -1503,9 +1491,9 @@ fn test_sort_preserving_after_projection() -> Result<()> {
15031491
);
15041492
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
15051493
vec![
1506-
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()),
1507-
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "new_a".to_string()),
1508-
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b".to_string()),
1494+
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c"),
1495+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "new_a"),
1496+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
15091497
],
15101498
Arc::new(sort_exec),
15111499
)?) as _;
@@ -1545,9 +1533,9 @@ fn test_union_after_projection() -> Result<()> {
15451533
let union = UnionExec::try_new(vec![csv.clone(), csv.clone(), csv])?;
15461534
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
15471535
vec![
1548-
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()),
1549-
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "new_a".to_string()),
1550-
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b".to_string()),
1536+
ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c"),
1537+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "new_a"),
1538+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
15511539
],
15521540
union.clone(),
15531541
)?) as _;
@@ -1607,7 +1595,7 @@ fn partitioned_data_source() -> Arc<DataSourceExec> {
16071595
ObjectStoreUrl::parse("test:///").unwrap(),
16081596
Arc::new(CsvSource::new(table_schema).with_csv_options(options)),
16091597
)
1610-
.with_file(PartitionedFile::new("x".to_string(), 100))
1598+
.with_file(PartitionedFile::new("x", 100))
16111599
.with_projection_indices(Some(vec![0, 1, 2]))
16121600
.unwrap()
16131601
.build();
@@ -1624,16 +1612,13 @@ fn test_partition_col_projection_pushdown() -> Result<()> {
16241612
vec![
16251613
ProjectionExpr::new(
16261614
col("string_col", partitioned_schema.as_ref())?,
1627-
"string_col".to_string(),
1615+
"string_col",
16281616
),
16291617
ProjectionExpr::new(
16301618
col("partition_col", partitioned_schema.as_ref())?,
1631-
"partition_col".to_string(),
1632-
),
1633-
ProjectionExpr::new(
1634-
col("int_col", partitioned_schema.as_ref())?,
1635-
"int_col".to_string(),
1619+
"partition_col",
16361620
),
1621+
ProjectionExpr::new(col("int_col", partitioned_schema.as_ref())?, "int_col"),
16371622
],
16381623
source,
16391624
)?);
@@ -1662,7 +1647,7 @@ fn test_partition_col_projection_pushdown_expr() -> Result<()> {
16621647
vec![
16631648
ProjectionExpr::new(
16641649
col("string_col", partitioned_schema.as_ref())?,
1665-
"string_col".to_string(),
1650+
"string_col",
16661651
),
16671652
ProjectionExpr::new(
16681653
// CAST(partition_col, Utf8View)
@@ -1671,12 +1656,9 @@ fn test_partition_col_projection_pushdown_expr() -> Result<()> {
16711656
partitioned_schema.as_ref(),
16721657
DataType::Utf8View,
16731658
)?,
1674-
"partition_col".to_string(),
1675-
),
1676-
ProjectionExpr::new(
1677-
col("int_col", partitioned_schema.as_ref())?,
1678-
"int_col".to_string(),
1659+
"partition_col",
16791660
),
1661+
ProjectionExpr::new(col("int_col", partitioned_schema.as_ref())?, "int_col"),
16801662
],
16811663
source,
16821664
)?);

datafusion/datasource/src/file_scan_config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1852,7 +1852,7 @@ mod tests {
18521852
// the new projection won't include the filtered column.
18531853
let exprs = ProjectionExprs::new(vec![ProjectionExpr::new(
18541854
col("c1", &file_schema).unwrap(),
1855-
"c1".to_string(),
1855+
"c1",
18561856
)]);
18571857
let data_source = config
18581858
.try_swapping_with_projection(&exprs)

datafusion/physical-expr/src/projection.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ impl std::fmt::Display for ProjectionExpr {
7676

7777
impl ProjectionExpr {
7878
/// Create a new projection expression
79-
pub fn new(expr: Arc<dyn PhysicalExpr>, alias: String) -> Self {
79+
pub fn new(expr: Arc<dyn PhysicalExpr>, alias: impl Into<String>) -> Self {
80+
let alias = alias.into();
8081
Self { expr, alias }
8182
}
8283

0 commit comments

Comments
 (0)