@@ -33,6 +33,7 @@ mod test {
3333 use datafusion_functions_aggregate:: count:: count_udaf;
3434 use datafusion_physical_expr:: aggregate:: AggregateExprBuilder ;
3535 use datafusion_physical_expr:: expressions:: { binary, col, lit, Column } ;
36+ use datafusion_physical_expr:: Partitioning ;
3637 use datafusion_physical_expr_common:: physical_expr:: PhysicalExpr ;
3738 use datafusion_physical_expr_common:: sort_expr:: PhysicalSortExpr ;
3839 use datafusion_physical_plan:: aggregates:: {
@@ -47,6 +48,7 @@ mod test {
4748 use datafusion_physical_plan:: limit:: { GlobalLimitExec , LocalLimitExec } ;
4849 use datafusion_physical_plan:: placeholder_row:: PlaceholderRowExec ;
4950 use datafusion_physical_plan:: projection:: ProjectionExec ;
51+ use datafusion_physical_plan:: repartition:: RepartitionExec ;
5052 use datafusion_physical_plan:: sorts:: sort:: SortExec ;
5153 use datafusion_physical_plan:: union:: UnionExec ;
5254 use datafusion_physical_plan:: {
@@ -761,4 +763,104 @@ mod test {
761763
762764 Ok ( ( ) )
763765 }
766+
767+ #[ tokio:: test]
768+ async fn test_statistic_by_partition_of_repartition ( ) -> Result < ( ) > {
769+ let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
770+
771+ let repartition = Arc :: new ( RepartitionExec :: try_new (
772+ scan. clone ( ) ,
773+ Partitioning :: RoundRobinBatch ( 3 ) ,
774+ ) ?) ;
775+
776+ let statistics = ( 0 ..repartition. partitioning ( ) . partition_count ( ) )
777+ . map ( |idx| repartition. partition_statistics ( Some ( idx) ) )
778+ . collect :: < Result < Vec < _ > > > ( ) ?;
779+ assert_eq ! ( statistics. len( ) , 3 ) ;
780+
781+ let expected_stats = Statistics {
782+ num_rows : Precision :: Inexact ( 1 ) ,
783+ total_byte_size : Precision :: Inexact ( 73 ) ,
784+ column_statistics : vec ! [
785+ ColumnStatistics :: new_unknown( ) ,
786+ ColumnStatistics :: new_unknown( ) ,
787+ ] ,
788+ } ;
789+
790+ // All partitions should have the same statistics
791+ for stat in statistics. iter ( ) {
792+ assert_eq ! ( stat, & expected_stats) ;
793+ }
794+
795+ // Verify that the result has exactly 3 partitions
796+ let partitions = execute_stream_partitioned (
797+ repartition. clone ( ) ,
798+ Arc :: new ( TaskContext :: default ( ) ) ,
799+ ) ?;
800+ assert_eq ! ( partitions. len( ) , 3 ) ;
801+
802+ // Collect row counts from each partition
803+ let mut partition_row_counts = Vec :: new ( ) ;
804+ for partition_stream in partitions. into_iter ( ) {
805+ let results: Vec < RecordBatch > = partition_stream. try_collect ( ) . await ?;
806+ let total_rows: usize = results. iter ( ) . map ( |batch| batch. num_rows ( ) ) . sum ( ) ;
807+ partition_row_counts. push ( total_rows) ;
808+ }
809+ assert_eq ! ( partition_row_counts. len( ) , 3 ) ;
810+ assert_eq ! ( partition_row_counts[ 0 ] , 2 ) ;
811+ assert_eq ! ( partition_row_counts[ 1 ] , 2 ) ;
812+ assert_eq ! ( partition_row_counts[ 2 ] , 0 ) ;
813+
814+ Ok ( ( ) )
815+ }
816+
817+ #[ tokio:: test]
818+ async fn test_statistic_by_partition_of_repartition_invalid_partition ( ) -> Result < ( ) >
819+ {
820+ let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
821+
822+ let repartition = Arc :: new ( RepartitionExec :: try_new (
823+ scan. clone ( ) ,
824+ Partitioning :: RoundRobinBatch ( 2 ) ,
825+ ) ?) ;
826+
827+ let result = repartition. partition_statistics ( Some ( 2 ) ) ;
828+ assert ! ( result. is_err( ) ) ;
829+ let error = result. unwrap_err ( ) ;
830+ assert ! ( error
831+ . to_string( )
832+ . contains( "RepartitionExec invalid partition 2 (expected less than 2)" ) ) ;
833+
834+ let partitions = execute_stream_partitioned (
835+ repartition. clone ( ) ,
836+ Arc :: new ( TaskContext :: default ( ) ) ,
837+ ) ?;
838+ assert_eq ! ( partitions. len( ) , 2 ) ;
839+
840+ Ok ( ( ) )
841+ }
842+
843+ #[ tokio:: test]
844+ async fn test_statistic_by_partition_of_repartition_zero_partitions ( ) -> Result < ( ) > {
845+ let scan = create_scan_exec_with_statistics ( None , Some ( 2 ) ) . await ;
846+ let scan_schema = scan. schema ( ) ;
847+
848+ // Create a repartition with 0 partitions
849+ let repartition = Arc :: new ( RepartitionExec :: try_new (
850+ Arc :: new ( EmptyExec :: new ( scan_schema. clone ( ) ) ) ,
851+ Partitioning :: RoundRobinBatch ( 0 ) ,
852+ ) ?) ;
853+
854+ let result = repartition. partition_statistics ( Some ( 0 ) ) ?;
855+ assert_eq ! ( result, Statistics :: new_unknown( & scan_schema) ) ;
856+
857+ // Verify that the result has exactly 0 partitions
858+ let partitions = execute_stream_partitioned (
859+ repartition. clone ( ) ,
860+ Arc :: new ( TaskContext :: default ( ) ) ,
861+ ) ?;
862+ assert_eq ! ( partitions. len( ) , 0 ) ;
863+
864+ Ok ( ( ) )
865+ }
764866}
0 commit comments