perf: default multi COUNT(DISTINCT) logical optimizer rewrite #21088

ydgandhi wants to merge 12 commits into apache:main
Conversation
---
Hi @Dandandan - thanks for this work on the cross-join split for multiple distinct aggregates with no `GROUP BY`. I’ve been working on a related but different pattern. Here is a concrete example from our internal benchmark suite:

```sql
SELECT
    seller_name,
    COUNT(*) as total_orders,
    COUNT(DISTINCT delivery_city) as cities_served,
    COUNT(DISTINCT state) as states_served,
    SUM(CASE WHEN order_status = 'Completed' THEN 1 ELSE 0 END) as completed_orders,
    SUM(CASE WHEN order_status = 'Cancelled' THEN 1 ELSE 0 END) as cancelled_orders,
    ROUND(100.0 * SUM(CASE WHEN order_status = 'Completed' THEN 1 ELSE 0 END) / COUNT(*), 2) as success_rate
FROM ecommerce_orders
GROUP BY seller_name
HAVING COUNT(*) > 100
ORDER BY total_orders DESC
LIMIT 100;
```

This is not “global” multi-distinct: it’s per-group (`GROUP BY seller_name`). In my opinion, the two are complementary: different predicates, different plans, and they can coexist in the optimizer pipeline (we’d want to sanity-check rule order so we don’t double-rewrite the same node). Happy to align naming, tests, and placement with you and the maintainers.
---
I personally think this PR is a more generalized rewrite approach. #20940 might cover a limited range of scenarios, but it is more rigorous. Could the current PR add some test cases?
---
Thanks for the review. I asked Cursor to add a few tests.

Tests for `MultiDistinctCountRewrite`

| Test | What it asserts |
|---|---|
| rewrites_two_count_distinct | GROUP BY a + COUNT(DISTINCT b), COUNT(DISTINCT c) → inner joins, per-branch null filters on b/c, mdc_base + two mdc_d aliases. |
| rewrites_global_three_count_distinct | No GROUP BY, three COUNT(DISTINCT …) → cross/inner join rewrite; no mdc_base (global-only path). |
| rewrites_two_count_distinct_with_non_distinct_count | Grouped BI-style: two distincts + COUNT(a) → join rewrite with mdc_base holding the non-distinct agg. |
| does_not_rewrite_two_count_distinct_same_column | Two COUNT(DISTINCT b) with different aliases → no rewrite (duplicate distinct key). |
| does_not_rewrite_single_count_distinct | Only one COUNT(DISTINCT …) → no rewrite (rule needs ≥2 distincts). |
| rewrites_three_count_distinct_grouped | Three grouped COUNT(DISTINCT …) on b, c, a → two inner joins + mdc_base. |
| rewrites_interleaved_non_distinct_between_distincts | Order COUNT(DISTINCT b), COUNT(a), COUNT(DISTINCT c) → rewrite + mdc_base for the middle non-distinct agg (projection order / interleaving). |
| rewrites_count_distinct_on_cast_exprs | COUNT(DISTINCT CAST(b AS Int64)), same for c → rewrite + null filters on the cast expressions. |
| does_not_rewrite_grouping_sets_multi_distinct | GROUPING SETS aggregate with two COUNT(DISTINCT …) → no rewrite (rule bails on grouping sets). |
| does_not_rewrite_mixed_agg | COUNT(DISTINCT b) + COUNT(c) → no rewrite (only one COUNT(DISTINCT …); rule requires at least two). |

SQL integration — datafusion/core/tests/sql/aggregates/multi_distinct_count_rewrite.rs

| Test | What it asserts |
|---|---|
| multi_count_distinct_matches_expected_with_nulls | End-to-end grouped two COUNT(DISTINCT …) with NULLs in distinct columns; exact sorted batch string vs expected counts. |
| multi_count_distinct_with_count_star_matches_expected | COUNT(*) plus two COUNT(DISTINCT …) per group (BI-style); exact result table. |
| multi_count_distinct_two_group_keys_matches_expected | GROUP BY g1, g2 + two distincts; verifies joins line up on all group keys and numerics match. |
Force-pushed a7348f6 to 4321c50
---
I think a rewrite like this might be useful, but I think it can also hurt performance because of the join on grouping keys. I am also wondering if, mostly for memory usage, a `GroupsAccumulator`-based improvement would be the better approach.
@Dandandan Thank you for the explanation. It’s true that this would add a hash join, but if aggregation can be performed in parallel, there might be advantages in scenarios with two or more COUNT(DISTINCT) operations. I agree we should run performance tests across multiple scenarios to evaluate the actual results.
Force-pushed 34467ff to 274eeb6
---
Thanks @Dandandan @xiedeyantu for the discussion.

Join cost / default behavior: Agreed that joining on grouping keys isn’t free. We’ve added a config option so the rewrite is opt-in (default off).

Benchmarks: We’re aligned that we should measure latency and memory vs baseline across scenarios (e.g. multiple high-cardinality distinct columns).

GroupsAccumulator / execution-layer improvements: That work is complementary: better distinct accumulators help how each aggregate runs; this rule changes logical plan shape when several large distincts share one aggregate. Both can coexist on the roadmap.

Tests: We added optimizer + SQL integration coverage (including cases with NULLs).
---
Add MultiDistinctCountRewrite in datafusion-optimizer and register it in Optimizer::new() after SingleDistinctToGroupBy. Rewrites 2+ simple COUNT(DISTINCT) on different args into a join of two-phase aggregates; filter distinct_arg IS NOT NULL on each branch for correct NULL semantics. ✅ Unit tests in datafusion-optimizer; ✅ SQL integration test (NULLs) in core_integration.
…gate ✅ Omit base Aggregate when GROUP BY is empty and only COUNT(DISTINCT) branches exist (matches clickbench extended global queries). ✅ First distinct branch seeds the plan; subsequent branches join (empty keys → Cross Join in plan). ✅ Add rewrites_global_three_count_distinct unit test. ❌ Previous shape could error: Aggregate with no grouping and no aggregate expressions.
… tests ✅ Fix projection after join so output columns match the original aggregate list when COUNT(DISTINCT …) and non-distinct aggs are interleaved (schema-compatible with mixed BI-style queries). ✅ Add internal_err guard for inconsistent aggregate index mapping. ✅ Optimizer tests: three grouped COUNT(DISTINCT), non-distinct between distincts, CAST(distinct) args, no rewrite for GROUPING SETS. ✅ SQL integration: COUNT(*) + two COUNT(DISTINCT); two GROUP BY keys with expected results. ❌ Grouping-set / filtered-distinct cases remain explicitly out of scope for this rule (covered by unchanged-plan tests where applicable). Made-with: Cursor
… rewrite ✅ End-to-end: COUNT(DISTINCT lower(b)) with 'Abc'/'aBC' plus second distinct on c (case collapse = 1). ✅ End-to-end: COUNT(DISTINCT CAST(x AS INT)) with 1.2/1.3 vs second CAST distinct on y (int collision = 1). ✅ docs: REPLY_PR_20940_DANDANDAN.md — integration table rows + note on safe distinct args (lower/upper/CAST). ❌ No optimizer or engine behavior change; asserts semantics match non-rewritten aggregation. Made-with: Cursor
…t off) ✅ Add datafusion.optimizer.enable_multi_distinct_count_rewrite (default false). ✅ MultiDistinctCountRewrite no-ops when disabled; OptimizerContext::with_enable_multi_distinct_count_rewrite for tests. ✅ SQL integration tests enable the flag via session helper; unit test skips_rewrite_when_config_disabled. ✅ Document option in user-guide configs.md. ❌ Does not change rewrite semantics when enabled. Made-with: Cursor
Force-pushed a25e831 to c64db4b
---
Yes, I can see it would improve a lot of queries
Sounds good - agreed.
---
Following up with additional real-data benchmark results for the rewrite.

Dataset generation / setup

I used local TPCH SF1 parquet generated with:

```shell
cargo install tpchgen-cli
tpchgen-cli --scale-factor 1 --format parquet --parquet-compression 'ZSTD(1)' --parts 1 --output-dir benchmarks/data/tpch_sf1
```

Data path used in all queries: `'benchmarks/data/tpch_sf1/lineitem/*.parquet'`. For timing/memory, each run used:

```shell
/usr/bin/time -l datafusion-cli -q --format csv \
  -c "SET datafusion.optimizer.enable_multi_distinct_count_rewrite = <true|false>;" \
  -c "<QUERY>"
```

(Reporting wall time + max RSS from `/usr/bin/time -l`.)

Queries tested

Scenario 1: 3 distincts (grouped)

```sql
SELECT SUM(d1) AS s1, SUM(d2) AS s2, SUM(d3) AS s3
FROM (
  SELECT l_suppkey,
         COUNT(DISTINCT l_orderkey) AS d1,
         COUNT(DISTINCT l_partkey) AS d2,
         COUNT(DISTINCT l_linenumber) AS d3
  FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
  GROUP BY l_suppkey
);
```

Scenario 2: CAST-based distincts

```sql
SELECT SUM(cx) AS sx, SUM(cy) AS sy
FROM (
  SELECT l_suppkey,
         COUNT(DISTINCT CAST(l_extendedprice AS INT)) AS cx,
         COUNT(DISTINCT CAST(l_discount * 100 AS INT)) AS cy
  FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
  GROUP BY l_suppkey
);
```

Scenario 3: lower() + string distinct

```sql
SELECT SUM(c1) AS s1, SUM(c2) AS s2
FROM (
  SELECT l_returnflag,
         COUNT(DISTINCT lower(l_shipmode)) AS c1,
         COUNT(DISTINCT l_shipinstruct) AS c2
  FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
  GROUP BY l_returnflag
);
```

Results (latency + max RSS)

All paired runs returned identical query results.

Takeaway

The rewrite can improve latency in some multi-distinct grouped cases, but can also increase peak memory and/or be neutral/slightly slower depending on cardinality and expression shape. Keeping this rule opt-in (default off) remains appropriate while we expand benchmark coverage.
---
Thank you for the detailed test! But I have a few questions.
---
Hi @xiedeyantu, great feedback. Let me explain my thought process for some of the design choices regarding the tests; see inline.

The outer `SUM(...)` wrapper: agreed, we’ll rerun on the grouped aggregate directly.

Agreed, for a clean test both distincts should use a plain CAST of the column.

We’ll follow up with updated query text and numbers when we rerun.
---
Hi @xiedeyantu, update on the benchmarking. The new benchmarks use the grouped aggregate only (no outer `SUM`).

Queries tested - original runs

Scenario 1: three distincts (grouped)

```sql
SELECT l_suppkey,
       COUNT(DISTINCT l_orderkey) AS d1,
       COUNT(DISTINCT l_partkey) AS d2,
       COUNT(DISTINCT l_linenumber) AS d3
FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
GROUP BY l_suppkey;
```

Rerun: reviewer-aligned changes

CAST (Scenario 2): both distincts use a plain CAST (no `* 100` arithmetic):

```sql
SELECT l_suppkey,
       COUNT(DISTINCT CAST(l_extendedprice AS INT)) AS cx,
       COUNT(DISTINCT CAST(l_discount AS INT)) AS cy
FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
GROUP BY l_suppkey;
```

High cardinality (Scenario 3): same aggregates as before, but grouped by `l_suppkey` instead of `l_returnflag`:

```sql
SELECT l_suppkey,
       COUNT(DISTINCT lower(l_shipmode)) AS c1,
       COUNT(DISTINCT l_shipinstruct) AS c2
FROM 'benchmarks/data/tpch_sf1/lineitem/*.parquet'
GROUP BY l_suppkey;
```

Results (mean ± sample stddev, n=10 per cell)

Measured columns: wall (s) mean ± stddev, max RSS (MB) mean ± stddev.

Row outputs matched OFF vs ON for every scenario (same grouped aggregate result). Wall time from `/usr/bin/time`.
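For reference, each "mean ± sample stddev" cell can be reproduced with a small awk helper like the one below (the sample values here are made up; the `n-1` denominator is what makes it the *sample* standard deviation):

```shell
# Hypothetical wall-clock samples (seconds) for one cell; n=5 for brevity
samples="4.1 4.3 4.0 4.2 4.4"

echo "$samples" | tr ' ' '\n' | awk '
  { sum += $1; sumsq += $1 * $1; n += 1 }
  END {
    mean = sum / n
    # sample (n-1) standard deviation
    sd = sqrt((sumsq - n * mean * mean) / (n - 1))
    printf "mean=%.3f sd=%.3f\n", mean, sd
  }'
```

The same pipeline works for the max RSS column after converting bytes to MB.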
Thank you for the detailed testing! The test results look excellent to me. I have no further suggestions at this time.
---
I have one minor question, about the scenario of rewriting `COUNT(DISTINCT f(a))`.
| datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. |
| datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true, attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025, any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown`. So if you disable `enable_topk_dynamic_filter_pushdown` and then enable `enable_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` will be overridden. |
| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
| datafusion.optimizer.enable_multi_distinct_count_rewrite | false | When set to true, the optimizer may rewrite a single aggregate with multiple `COUNT(DISTINCT …)` (with `GROUP BY`) into joins of per-distinct sub-aggregates. This can reduce peak memory but adds join work; default off until benchmarks support enabling broadly. |
"This can reduce peak memory"
Could this description be further optimized? Aggregation parallelism does not always reduce peak memory usage.
I have made small changes to this in my latest commit. Hope this helps.
---
You’re right that injectivity isn’t the right lens here. COUNT(DISTINCT f(a)) is defined over the values of f(a), so the semantics-preserving rewrite is to compute distinctness on the same expression, i.e. GROUP BY g, f(a) (not GROUP BY g, a). For example, lower('Abc') and lower('aBC') must collapse to the same distinct bucket, and CAST(1.2 AS INT) / CAST(1.3 AS INT) can intentionally collide as well. The current lower/upper/cast allowlist is a conservative scope guard, not a correctness requirement: it keeps the initial (default-off) rule limited to a small set of deterministic, well-covered expression shapes while we broaden test/benchmark coverage and validate edge cases across types / functions.
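A tiny Python analogy of this point, with made-up values (Python's `int()` truncation stands in for the engine's cast, whose exact rounding semantics are engine-specific):

```python
# Distinctness must be computed over f(a), not over a:
values = ["Abc", "aBC", "xyz"]
assert len(set(values)) == 3                    # COUNT(DISTINCT a): 3 raw values
assert len({v.lower() for v in values}) == 2    # COUNT(DISTINCT lower(a)): 'Abc'/'aBC' collapse

floats = [1.2, 1.3, 2.7]
# Casts can intentionally collide: 1.2 and 1.3 both map to 1 here
assert len({int(f) for f in floats}) == 2       # COUNT(DISTINCT CAST(f AS INT))
```

Grouping on the raw column would report 3 and 3 in both cases, which is why the rewrite groups on the full expression.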
---
✅ Document `datafusion.optimizer.enable_multi_distinct_count_rewrite` as potentially reducing *aggregate-state* memory in some cases ❌ Avoid implying peak memory reduction is guaranteed (extra joins can add overhead) Made-with: Cursor
---
Perhaps the next PR could consider a simplification approach for injective functions, which might yield better performance gains. If we keep the original design, I personally don’t see the need to impose restrictions. We cannot guarantee that a rule is beneficial in all cases; cost-based selection should be integrated. Of course, this is just my personal suggestion.
xiedeyantu left a comment:

I've only left a few suggestions for your reference. Otherwise, I have no further input.
Add MultiDistinctCountRewrite in datafusion-optimizer and register it in Optimizer::new() after SingleDistinctToGroupBy. Rewrites 2+ simple COUNT(DISTINCT) on different args into a join of two-phase aggregates; filter distinct_arg IS NOT NULL on each branch for correct NULL semantics.
✅ Unit tests in datafusion-optimizer; ✅ SQL integration test (NULLs) in core_integration.
Which issue does this PR close?
Rationale for this change
Queries with multiple COUNT(DISTINCT col_i) in the same GROUP BY can force independent distinct state per aggregate (e.g. separate hash sets), which scales poorly in memory when several high-cardinality distinct columns appear together.
DataFusion already optimizes the single shared distinct field case via SingleDistinctToGroupBy. This PR adds a conservative logical rewrite for multiple distinct COUNT(DISTINCT …) arguments by splitting work into per-distinct branches joined on the group keys, which can reduce peak distinct-state memory for eligible plans (at the cost of extra join work).
COUNT(DISTINCT x) must ignore NULL x; the rewrite applies x IS NOT NULL on each distinct branch before inner grouping so semantics stay aligned with count_distinct behavior.
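A minimal sketch of that NULL interaction in plain Python (illustrative values, not DataFusion code):

```python
# COUNT(DISTINCT x) ignores NULL x, but a bare inner GROUP BY x would emit
# a row for the NULL group; counting inner groups without a filter inflates it.
rows = [("g1", 10), ("g1", 10), ("g1", None), ("g1", 20)]  # None models SQL NULL

inner_groups = {x for _, x in rows}                         # GROUP BY x -> {10, None, 20}
naive = len(inner_groups)                                   # 3: wrong, counts the NULL group
filtered = len({x for _, x in rows if x is not None})       # 2: matches COUNT(DISTINCT x)

assert (naive, filtered) == (3, 2)
```

Applying `x IS NOT NULL` before the inner grouping is what keeps the rewrite aligned with `count_distinct`.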
What changes are included in this PR?
New module:
Tests:
Are these changes tested?
Yes.
Are there any user-facing changes?
Behavior / plans: For queries that match the rule, logical plans (and thus EXPLAIN output) can differ: eligible multi–COUNT(DISTINCT) aggregates may appear as joins of sub-aggregates instead of a single Aggregate with multiple distinct counts.
Results: Intended to be semantics-preserving for supported patterns (including NULL handling via filters).
Public API: No intentional breaking changes to public Rust APIs; this is an internal optimizer rule, gated behind a config option that defaults to off.
Docs: No user guide update required unless maintainers want an “optimizer behavior” note; can add if requested.
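To make the plan-shape change concrete, here is a small Python sketch (not DataFusion code; the data, group key `g`, and columns `b`/`c` are illustrative) showing that per-distinct branches with an `IS NOT NULL` filter, "joined" back on the group key, match a single aggregate computing both distincts at once:

```python
from collections import defaultdict

rows = [  # (g, b, c); None models SQL NULL
    ("x", 1, "p"), ("x", 1, "q"), ("x", None, "q"),
    ("y", 2, None), ("y", 3, "r"),
]

def branch_count_distinct(rows, pick):
    # One rewrite branch: filter value IS NOT NULL, then COUNT(DISTINCT) per group
    sets = defaultdict(set)
    for g, b, c in rows:
        v = pick(b, c)
        if v is not None:
            sets[g].add(v)
    return {g: len(s) for g, s in sets.items()}

branch_b = branch_count_distinct(rows, lambda b, c: b)
branch_c = branch_count_distinct(rows, lambda b, c: c)

# "Join" the branches on the group key g
group_keys = {g for g, _, _ in rows}
joined = {g: (branch_b.get(g, 0), branch_c.get(g, 0)) for g in sorted(group_keys)}

# Reference: compute both distinct sets in a single aggregate pass
direct = {}
for g, b, c in rows:
    sb, sc = direct.setdefault(g, (set(), set()))
    if b is not None:
        sb.add(b)
    if c is not None:
        sc.add(c)
assert joined == {g: (len(sb), len(sc)) for g, (sb, sc) in direct.items()}
```

The memory trade-off is visible here too: the single-pass version keeps one distinct set per (group, aggregate) alive simultaneously, while each branch only holds state for one distinct expression at a time.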