Skip to content

Commit d8d171c

Browse files
committed
feat: add approx_top_k aggregate function
Add a new approx_top_k(expression, k) aggregate function that returns the approximate top-k most frequent values with their estimated counts, using the Filtered Space-Saving algorithm. The implementation uses a capacity multiplier of 3 (matching ClickHouse's default) and includes an alpha map for improved accuracy by filtering low-frequency noise before it enters the main summary. Return type is List(Struct({value: T, count: UInt64})) ordered by count descending, where T matches the input column type. Closes #20967
1 parent ff844be commit d8d171c

7 files changed

Lines changed: 1566 additions & 4 deletions

File tree

datafusion/core/tests/dataframe/dataframe_functions.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ use datafusion_common::test_util::batches_to_string;
3232
use datafusion_common::{DFSchema, ScalarValue};
3333
use datafusion_expr::expr::Alias;
3434
use datafusion_expr::{ExprSchemable, LogicalPlanBuilder, table_scan};
35-
use datafusion_functions_aggregate::expr_fn::{approx_median, approx_percentile_cont};
35+
use datafusion_functions_aggregate::expr_fn::{
36+
approx_median, approx_percentile_cont, approx_top_k,
37+
};
3638
use datafusion_functions_nested::map::map;
3739
use insta::assert_snapshot;
3840

@@ -409,6 +411,28 @@ async fn test_fn_approx_median() -> Result<()> {
409411
Ok(())
410412
}
411413

414+
#[tokio::test]
415+
async fn test_fn_approx_top_k() -> Result<()> {
416+
// Column b has values [1, 10, 10, 100] -- 10 appears twice, others once.
417+
// Use k=1 to avoid non-deterministic ordering among tied items.
418+
let expr = approx_top_k(vec![col("b"), lit(1)]);
419+
420+
let df = create_test_table().await?;
421+
let batches = df.aggregate(vec![], vec![expr]).unwrap().collect().await?;
422+
423+
assert_snapshot!(
424+
batches_to_string(&batches),
425+
@r"
426+
+-------------------------------+
427+
| approx_top_k(test.b,Int32(1)) |
428+
+-------------------------------+
429+
| [{value: 10, count: 2}] |
430+
+-------------------------------+
431+
");
432+
433+
Ok(())
434+
}
435+
412436
#[tokio::test]
413437
async fn test_fn_approx_percentile_cont() -> Result<()> {
414438
let expr = approx_percentile_cont(col("b").sort(true, false), lit(0.5), None);

0 commit comments

Comments
 (0)