PoC: Blocked state management for hash aggregation#22712

Open
2010YOUY01 wants to merge 8 commits into
apache:mainfrom
2010YOUY01:split-aggr

@2010YOUY01
Contributor

@2010YOUY01 2010YOUY01 commented Jun 2, 2026

Which issue does this PR close?

Another attempt for #7065

Rationale for this change

This PR is motivated by two related but distinct concerns:

  1. The current aggregation implementation has become difficult to evolve and review.
  2. The existing state layout leads to higher-than-necessary peak memory usage.

This PR aims to show how to refactor the existing code first and then apply the optimization, which makes the implementation easier.

Refactoring Strategy

I created an issue to analyze the root cause of the existing code complexity and how to solve it by incrementally splitting the logic:

Original Issue for Blocked State Management

These issues explain the motivation and background well:

I think the main motivation is memory efficiency. Performance (~10% faster for high-cardinality cases in this PoC) is only a nice by-product.

Suppose we have buffered 1GB of state in the partial aggregation stage. If the internal states are stored in a contiguous Vec, they cannot be freed until repartitioning is done — approximately when the final-stage aggregation finishes. That means peak memory usage can become all partial states + all final states; in the worst case, this can reach 2GB.

Ideally, we should be able to stay closer to 1GB by managing memory with fixed-size blocks. Once final aggregation starts consuming partial state, the corresponding partial blocks can be freed incrementally.
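
To make the 2GB vs. ~1GB argument concrete, here is a minimal, self-contained sketch (not code from this PR). `BlockedState` and `peak_resident` are hypothetical names, and the model assumes the worst case where every partial group is distinct, so the final state grows to the same size as the partial state.

```rust
use std::collections::VecDeque;

/// Hypothetical partial-stage state: one f64 per group, stored in fixed-size blocks.
pub struct BlockedState {
    blocks: VecDeque<Vec<f64>>,
}

impl BlockedState {
    /// Build `total` state values split into blocks of `block_size` (must be > 0).
    pub fn new(total: usize, block_size: usize) -> Self {
        let values: Vec<f64> = (0..total).map(|i| i as f64).collect();
        let blocks = values.chunks(block_size).map(|c| c.to_vec()).collect();
        Self { blocks }
    }

    /// Number of state values still held by the partial stage.
    pub fn resident_values(&self) -> usize {
        self.blocks.iter().map(|b| b.len()).sum()
    }

    /// Hand the oldest block to the consumer; it is freed once the consumer drops it.
    pub fn emit_block(&mut self) -> Option<Vec<f64>> {
        self.blocks.pop_front()
    }
}

/// Drain the partial state block by block into a stand-in "final" state and
/// return the peak number of values alive across both stages.
pub fn peak_resident(total: usize, block_size: usize) -> usize {
    let mut partial = BlockedState::new(total, block_size);
    let mut final_len = 0usize; // stand-in for the final-stage state size
    let mut peak = partial.resident_values();
    while let Some(block) = partial.emit_block() {
        // Worst case: merging adds one final-state entry per partial entry.
        final_len += block.len();
        // The block is still alive here, so it counts towards the peak.
        peak = peak.max(partial.resident_values() + block.len() + final_len);
        // `block` is dropped at the end of the iteration, releasing its memory.
    }
    peak
}
```

With a single block the size of the whole state (the contiguous `Vec` case), the peak in this model is twice the state size; with ten blocks it is the state size plus one block.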

Benchmark result

Query(cardinality)      PR       main      Δ
Q1(~100)                0.165s   0.144s   +14.6%
Q2(~100)                0.116s   0.139s   -16.5%
Q3(~9K)                 0.119s   0.139s   -14.4%
Q4(~18M)                0.389s   0.433s   -10.2%
Q5(~100M)               1.247s   0.772s   +61.5%

* MacBook Pro (M4 Pro), 1 warmup round, measured 2nd run

Summary: medium and high cardinality are faster; low cardinality can be slower, though arguably acceptable; near-unique cardinality (Q5) is slower due to a missing fast path, see below.

  • ClickBench has 100M rows
  • For low cardinality, the blocked approach might bring some slight execution overhead. Since these queries are already very efficient, I think we can live with that.
  • For hopeless cardinality (Q5), the blocked aggregation PoC is missing the partial aggregation skip optimization (datafusion.execution.skip_partial_aggregation_probe_ratio_threshold). Once that is implemented, Q5 is also likely to get faster, judging by the current numbers for the high-cardinality Q4.
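
For readers unfamiliar with the missing fast path, the idea behind a probe-ratio threshold can be sketched roughly as follows. This is an illustrative model only, not DataFusion's implementation: `SkipProbe`, its fields, and the exact comparison are assumptions made for the example.

```rust
/// Hypothetical probe: after enough input rows have been seen, decide whether
/// partial aggregation reduces the data enough to be worth keeping.
pub struct SkipProbe {
    probe_rows_threshold: usize, // rows to observe before deciding (assumed)
    probe_ratio_threshold: f64,  // groups/rows ratio above which we skip (assumed)
    input_rows: usize,
    num_groups: usize,
}

impl SkipProbe {
    pub fn new(probe_rows_threshold: usize, probe_ratio_threshold: f64) -> Self {
        Self {
            probe_rows_threshold,
            probe_ratio_threshold,
            input_rows: 0,
            num_groups: 0,
        }
    }

    /// Record one input batch and the total number of groups seen so far.
    pub fn observe(&mut self, batch_rows: usize, total_groups: usize) {
        self.input_rows += batch_rows;
        self.num_groups = total_groups;
    }

    /// True when almost every row creates a new group, i.e. the partial stage
    /// barely reduces the data and its output could be passed through instead.
    pub fn should_skip(&self) -> bool {
        self.input_rows >= self.probe_rows_threshold
            && self.num_groups as f64 / self.input_rows as f64
                >= self.probe_ratio_threshold
    }
}
```

A near-unique key such as WatchID would trip this kind of check almost immediately, which is why Q5 is expected to benefit once the PoC supports it.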

Memory usage for Q4

[Figure: memcurve]

Memory usage is becoming more efficient, as expected. Note that in the blocked approach the curve should look bell-shaped; however, the memory allocator (e.g. `mimalloc`) caches freed memory for reuse, so it looks like a rise-then-plateau instead. I suppose the memory allocator can give that memory back to the OS very efficiently.

microbench.sql
-- Generated from datafusion/benchmarks
CREATE EXTERNAL TABLE hits
STORED AS PARQUET
LOCATION '/Users/yongting/Code/datafusion/benchmarks/data/hits_partitioned/';

set datafusion.execution.target_partitions=8;

-- ClickBench hits_partitioned row count: 99,997,497 rows.
--
-- Verify with EXPLAIN VERBOSE: each query should show both
-- stream=RawPartialHashAggregateStream, blocked=true and
-- stream=PartialFinalHashAggregateStream, blocked=true.

-- One group over the full table: cardinality 1 over 99,997,497 rows.
-- Plain no-GROUP-BY avg() does not use the grouped blocked path, so keep a
-- derived Int64 key that is one value for all rows.
SELECT
  g,
  avg(v) AS avg_width
FROM (
  SELECT
    CAST("OS" * 0 AS BIGINT) AS g,
    CAST("ResolutionWidth" AS DOUBLE) AS v
  FROM "hits"
)
GROUP BY g;

-- Low cardinality group key: OS has 91 groups.
-- Cast to BIGINT because the current blocked group-values path is single Int64 key only.
SELECT
  g,
  avg(v) AS avg_width
FROM (
  SELECT
    CAST("OS" AS BIGINT) AS g,
    CAST("ResolutionWidth" AS DOUBLE) AS v
  FROM "hits"
)
GROUP BY g
LIMIT 20;

-- Low/medium cardinality group key: SearchEngineID has 96 groups.
SELECT
  g,
  avg(v) AS avg_width
FROM (
  SELECT
    CAST("SearchEngineID" AS BIGINT) AS g,
    CAST("ResolutionWidth" AS DOUBLE) AS v
  FROM "hits"
)
GROUP BY g
LIMIT 20;

-- Medium cardinality group key: RegionID has 9,040 groups.
SELECT
  g,
  avg(v) AS avg_width
FROM (
  SELECT
    CAST("RegionID" AS BIGINT) AS g,
    CAST("ResolutionWidth" AS DOUBLE) AS v
  FROM "hits"
)
GROUP BY g
LIMIT 20;

-- High cardinality group key: UserID has 17,630,976 groups.
SELECT
  "UserID",
  avg("ResolutionWidth") AS avg_width
FROM "hits"
GROUP BY "UserID"
LIMIT 20;

-- Near-unique group key: WatchID has 99,997,493 groups.
SELECT
  g,
  avg(v) AS avg_width
FROM (
  SELECT
    CAST("WatchID" AS BIGINT) AS g,
    CAST("ResolutionWidth" AS DOUBLE) AS v
  FROM "hits"
)
GROUP BY g
LIMIT 20;

Implementation plan

This PR is just a PoC; it can be split into smaller patches for review.

What changes are included in this PR?

Refresher for related internal data structures

The simplified mental model for hash aggregation is HashTable: group_key -> group_state. In reality, group values and group states are all stored as contiguous vectors for efficiency.
[image]
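
As a rough illustration of that layout (a simplified model, not the actual DataFusion structs), the hash table maps each key to a dense group index, and the group values and accumulator states live in contiguous vectors indexed by it. `AvgAggregation` and its field names are hypothetical.

```rust
use std::collections::HashMap;

/// Simplified model of hash aggregation state for `avg` over an Int64 key.
#[derive(Default)]
pub struct AvgAggregation {
    index_of: HashMap<i64, usize>, // group_key -> group_index
    group_values: Vec<i64>,        // group_index -> group_key
    sums: Vec<f64>,                // group_index -> running sum
    counts: Vec<u64>,              // group_index -> row count
}

impl AvgAggregation {
    /// Fold one input row into the state of its group.
    pub fn update(&mut self, key: i64, value: f64) {
        let next = self.group_values.len();
        let idx = *self.index_of.entry(key).or_insert(next);
        if idx == next {
            // First time this key is seen: append a fresh state slot.
            self.group_values.push(key);
            self.sums.push(0.0);
            self.counts.push(0);
        }
        self.sums[idx] += value;
        self.counts[idx] += 1;
    }

    /// Emit `(group_key, avg)` pairs in group-index order.
    pub fn finish(&self) -> Vec<(i64, f64)> {
        self.group_values
            .iter()
            .enumerate()
            .map(|(i, &k)| (k, self.sums[i] / self.counts[i] as f64))
            .collect()
    }
}
```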

Key Changes

Split out the partial and final aggregation logic

See #22710 for the idea. Two execution paths are split out to run the microbenchmark queries above:

  • RawPartialHashAggregateStream
  • PartialFinalHashAggregateStream

They're only responsible for repartition-based two-stage hash aggregation.

Support blocked memory management for states

This PoC only targets making the following workload work with blocked memory management:

-- primitive key + avg accumulator
select v1%10 as g, avg(v1)
from generate_series(1000000) as t1(v1)
group by g;

So, in order to support blocked state management, the following are implemented:

  • impl<T> GroupValues for GroupValuesPrimitiveBlock<T>
  • impl<T, F> GroupsAccumulator for AvgGroupsAccumulator<T, F>

The idea is to replace the internal contiguous vector with fixed-size blocks (see the figure above).
They're implemented as new structs just to keep the PoC simpler; it's possible to replace the existing implementation with this blocked approach.
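
A minimal sketch of what "fixed-size blocks instead of one contiguous vector" could look like is shown below. `BlockedVec` is a hypothetical stand-in written for illustration; it is not `GroupValuesPrimitiveBlock` or the accumulator from this PR.

```rust
/// Hypothetical blocked replacement for a contiguous `Vec<T>`.
pub struct BlockedVec<T> {
    block_size: usize,
    blocks: Vec<Vec<T>>,
    len: usize,
}

impl<T: Copy> BlockedVec<T> {
    pub fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block size must be non-zero");
        Self { block_size, blocks: Vec::new(), len: 0 }
    }

    /// Append a value, allocating a new block only when the last one is full.
    /// Existing blocks are never reallocated or moved.
    pub fn push(&mut self, value: T) {
        if self.len % self.block_size == 0 {
            self.blocks.push(Vec::with_capacity(self.block_size));
        }
        self.blocks
            .last_mut()
            .expect("a block was ensured above")
            .push(value);
        self.len += 1;
    }

    /// Look up a value by its flat group index.
    pub fn get(&self, index: usize) -> Option<T> {
        if index >= self.len {
            return None;
        }
        Some(self.blocks[index / self.block_size][index % self.block_size])
    }

    pub fn num_blocks(&self) -> usize {
        self.blocks.len()
    }
}
```

Because growth only ever appends a block, there is no large copy on resize, and whole blocks can be handed off and freed independently.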

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions Bot added logical-expr Logical plan and expressions functions Changes to functions implementation ffi Changes to the ffi crate physical-plan Changes to the physical-plan crate labels Jun 2, 2026
@github-actions

github-actions Bot commented Jun 2, 2026

Thank you for opening this pull request!

Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).

Details
     Cloning apache/main
    Building datafusion-expr-common v53.1.0 (current)
       Built [  24.389s] (current)
     Parsing datafusion-expr-common v53.1.0 (current)
      Parsed [   0.019s] (current)
    Building datafusion-expr-common v53.1.0 (baseline)
       Built [  18.985s] (baseline)
     Parsing datafusion-expr-common v53.1.0 (baseline)
      Parsed [   0.019s] (baseline)
    Checking datafusion-expr-common v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.214s] 222 checks: 221 pass, 1 fail, 0 warn, 30 skip

--- failure enum_variant_added: enum variant added on exhaustive enum ---

Description:
A publicly-visible enum without #[non_exhaustive] has a new variant.
        ref: https://doc.rust-lang.org/cargo/reference/semver.html#enum-variant-new
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.47.0/src/lints/enum_variant_added.ron

Failed in:
  variant EmitTo:Block in /home/runner/work/datafusion/datafusion/datafusion/expr-common/src/groups_accumulator.rs:36

     Summary semver requires new major version: 1 major and 0 minor checks failed
    Finished [  44.461s] datafusion-expr-common
    Building datafusion-ffi v53.1.0 (current)
       Built [  58.988s] (current)
     Parsing datafusion-ffi v53.1.0 (current)
      Parsed [   0.060s] (current)
    Building datafusion-ffi v53.1.0 (baseline)
       Built [  59.501s] (baseline)
     Parsing datafusion-ffi v53.1.0 (baseline)
      Parsed [   0.060s] (baseline)
    Checking datafusion-ffi v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.222s] 222 checks: 222 pass, 30 skip
     Summary no semver update required
    Finished [ 120.129s] datafusion-ffi
    Building datafusion-functions-aggregate v53.1.0 (current)
       Built [  30.605s] (current)
     Parsing datafusion-functions-aggregate v53.1.0 (current)
      Parsed [   0.044s] (current)
    Building datafusion-functions-aggregate v53.1.0 (baseline)
       Built [  29.955s] (baseline)
     Parsing datafusion-functions-aggregate v53.1.0 (baseline)
      Parsed [   0.046s] (baseline)
    Checking datafusion-functions-aggregate v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.208s] 222 checks: 222 pass, 30 skip
     Summary no semver update required
    Finished [  61.820s] datafusion-functions-aggregate
    Building datafusion-functions-aggregate-common v53.1.0 (current)
       Built [  20.334s] (current)
     Parsing datafusion-functions-aggregate-common v53.1.0 (current)
      Parsed [   0.018s] (current)
    Building datafusion-functions-aggregate-common v53.1.0 (baseline)
       Built [  20.456s] (baseline)
     Parsing datafusion-functions-aggregate-common v53.1.0 (baseline)
      Parsed [   0.019s] (baseline)
    Checking datafusion-functions-aggregate-common v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.123s] 222 checks: 222 pass, 30 skip
     Summary no semver update required
    Finished [  41.677s] datafusion-functions-aggregate-common
    Building datafusion-physical-plan v53.1.0 (current)
       Built [  35.393s] (current)
     Parsing datafusion-physical-plan v53.1.0 (current)
      Parsed [   0.128s] (current)
    Building datafusion-physical-plan v53.1.0 (baseline)
       Built [  36.078s] (baseline)
     Parsing datafusion-physical-plan v53.1.0 (baseline)
      Parsed [   0.125s] (baseline)
    Checking datafusion-physical-plan v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.524s] 222 checks: 222 pass, 30 skip
     Summary no semver update required
    Finished [  73.394s] datafusion-physical-plan

@github-actions github-actions Bot added the auto detected api change Auto detected API change label Jun 2, 2026
@2010YOUY01
Contributor Author

The goal of this PoC is to demonstrate that the refactor is necessary, and also to experiment with blocked aggregation state management.

The next step would be to create a refactor-only PR.

|group_index, value| {
debug_assert!(group_index < len);
let block_idx = group_index / block_size;
let value_idx = group_index % block_size;
Contributor

I think you want to avoid % and / (two integer divisions!) by enforcing a power-of-two block size.
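
A sketch of that suggestion, assuming the block size is validated once at construction time (`BlockAddr` is a hypothetical helper, not part of the PR): with a power-of-two block size, the division becomes a shift and the modulo becomes a mask.

```rust
/// Block addressing for a power-of-two block size.
#[derive(Clone, Copy)]
pub struct BlockAddr {
    shift: u32,  // log2(block_size)
    mask: usize, // block_size - 1
}

impl BlockAddr {
    pub fn new(block_size: usize) -> Self {
        assert!(
            block_size.is_power_of_two(),
            "block size must be a power of two"
        );
        Self {
            shift: block_size.trailing_zeros(),
            mask: block_size - 1,
        }
    }

    /// Split a flat group index into `(block_idx, value_idx)`.
    #[inline]
    pub fn split(self, group_index: usize) -> (usize, usize) {
        (group_index >> self.shift, group_index & self.mask)
    }
}
```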

sums.push(values.value(row));
nulls.append_non_null();
} else {
counts.push(0);
Contributor

this can use collect rather than push

self.len = 0;

for chunk in values.chunks(self.block_size) {
let mut block =
Contributor

This seems to do an unnecessary zero allocation

@ariel-miculas
Contributor

  1. The current aggregation implementation has become difficult to evolve and review.
  2. The existing state layout leads to higher-than-necessary peak memory usage.

I think the overaccounting issue is also worth mentioning, as it leads to performance degradation in downstream operators due to excessive spilling.

self.release_map();

let emit_len = self.len.min(self.block_size);
let block = self.blocks.remove(0);
Contributor

I think this will have a negative performance impact similar to the drain + collect identified in #19906

let null_idx = self.take_null_for_emit(n);
let output = self.values_range(0, n);
let remaining = self.values_range(n, self.len - n);

Contributor

Because values_range always allocates, output and remaining allocate together an additional n elements. See #22165 where I reduce the allocation overhead for partial aggregation.

data_type: DataType,
map: HashTable<(usize, u64)>,
null_group: Option<usize>,
blocks: Vec<Box<[T::Native]>>,
Contributor

Because of the performance overhead of removing the first elements from a Vec, I would consider other approaches, maybe VecDeque
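
To illustrate the `VecDeque` suggestion (a sketch only; `emit_all` is a hypothetical function, and the `i64` element type is chosen just for the example): `Vec::remove(0)` shifts every remaining element left on each call, while `VecDeque::pop_front` takes the first block in constant time.

```rust
use std::collections::VecDeque;

/// Emit blocks oldest-first without shifting the remaining blocks.
pub fn emit_all(mut blocks: VecDeque<Box<[i64]>>) -> Vec<i64> {
    let mut out = Vec::new();
    // `pop_front` is O(1); with `Vec::remove(0)` each emit would be O(num_blocks).
    while let Some(block) = blocks.pop_front() {
        out.extend_from_slice(&block);
        // `block` is dropped here, so its memory is released per emit.
    }
    out
}
```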

Labels

auto detected api change Auto detected API change ffi Changes to the ffi crate functions Changes to functions implementation logical-expr Logical plan and expressions physical-plan Changes to the physical-plan crate

3 participants