feat(physical-plan): add GroupColumn support for FixedSizeList / List / LargeList / Struct in multi-column GROUP BY#22706

Open
zhuqi-lucas wants to merge 5 commits into apache:main from zhuqi-lucas:qizhu/df-nested-group-column

Conversation


@zhuqi-lucas zhuqi-lucas commented Jun 2, 2026

Which issue does this PR close?

Closes #22682.

Rationale for this change

Today multi_group_by::supported_schema is all-or-nothing: if any single GROUP BY column is not in the supported_type allow-list, the whole grouping falls back to the byte-encoded GroupValuesRows path, even when every other column would have qualified for the fast column-wise + cross-column short-circuit path. The most common offender in real workloads is a single nested column (e.g. LargeList<Struct<Utf8, LargeUtf8>>) that drags an otherwise primitive-heavy 30+ column GROUP BY onto the slow path.

This PR adds column-native GroupColumn implementations for FixedSizeList<primitive>, Struct<...>, List<T>, and LargeList<T>, so a GROUP BY containing any of these no longer falls back from GroupValuesColumn to GroupValuesRows.

What changes are included in this PR?

  • FixedSizeListGroupValueBuilder<T> for FixedSizeList<primitive>. Uses an inner PrimitiveGroupValueBuilder for the flat child storage.
  • StructGroupValueBuilder for Struct<...> with recursive child support.
  • ListGroupValueBuilder<O: OffsetSizeTrait> for List<T> (O=i32) and LargeList<T> (O=i64) with recursive child support. Null outer rows occupy a zero-length range so the offset buffer stays monotonic without consuming child slots.
  • A recursive make_group_column(field: &Field) -> Result<Box<dyn GroupColumn>> factory in multi_group_by/mod.rs that replaces the inline match in GroupValuesColumn::intern. Walks nested fields and recursively builds children for Struct / List / LargeList.
  • supported_type recursively checks children for List / LargeList / Struct. FixedSizeList is restricted to primitive children in this PR; composing FSL with nested children can follow in a separate PR.
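
The recursive allow-list described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `ToyType` is a hypothetical stand-in for Arrow's `DataType` with only a handful of variants, and `is_primitive` is an assumed helper.

```rust
/// Hypothetical, simplified stand-in for Arrow's `DataType`; only a few variants are modeled.
#[derive(Debug, Clone)]
enum ToyType {
    Int32,
    Utf8,
    Float16,
    FixedSizeList(Box<ToyType>, usize),
    List(Box<ToyType>),
    LargeList(Box<ToyType>),
    Struct(Vec<ToyType>),
}

/// Assumed helper: true for the leaf types this toy model treats as primitive.
fn is_primitive(t: &ToyType) -> bool {
    matches!(t, ToyType::Int32)
}

/// Recursive allow-list check: a nested type is supported only if its children are.
fn supported_type(t: &ToyType) -> bool {
    match t {
        ToyType::Int32 | ToyType::Utf8 => true,
        ToyType::Float16 => false,
        // FixedSizeList is limited to primitive children in this sketch.
        ToyType::FixedSizeList(child, _) => is_primitive(child),
        ToyType::List(child) | ToyType::LargeList(child) => supported_type(child),
        ToyType::Struct(fields) => fields.iter().all(supported_type),
    }
}

/// All-or-nothing schema check: one unsupported column rejects the whole grouping key.
fn supported_schema(columns: &[ToyType]) -> bool {
    columns.iter().all(supported_type)
}
```

With this shape, `LargeList<Struct<Utf8, Utf8>>` passes because every level recurses down to a supported leaf, while `List<Float16>` and `FixedSizeList<Utf8>` are rejected.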

Are these changes tested?

Yes. 38 new tests, 68/68 group_values tests pass, 142/142 aggregates regression pass.

Per-builder:

  • append / build round trip
  • equal_to for outer nulls, inner nulls, identical, different (and for List: different lengths, empty)
  • sliced input array (offset != 0) for each builder
  • take_n boundary cases: n=0, n=len, prefix containing null rows
  • vectorized_equal_to / vectorized_append match per-row reference
  • size() grows with appends
  • build on empty builder returns empty array

Composition:

  • Struct<Struct<Utf8, Int32>> (two-level nested struct)
  • List<List<Int32>> (two-level nested list)
  • LargeList<Struct<Utf8, Int32>> end-to-end through new_group_values

Invariants:

  • supported_type accepts every primitive + nested combo and rejects Float16, Decimal256, Time64(Second), List<Float16>, Struct{Float16}, FixedSizeList<Utf8>, etc.
  • A supported_type <-> make_group_column consistency fuzz over 21 supported + 9 unsupported types asserts the biconditional. Pins the invariant against future drift between the allow-list and the dispatcher.

Are there any user-facing changes?

No public API changes. The new builders are internal to physical-plan. supported_type becomes more permissive (returns true for the new types), so existing GROUP BYs that were previously falling back to GroupValuesRows will now use GroupValuesColumn automatically; this is a performance improvement, not a behavior change.

Known limitations

  1. FixedSizeList is restricted to primitive child types. Composing FSL with Struct / List children is straightforward via the factory but not in this PR.
  2. vectorized_equal_to / vectorized_append for the new builders fall back to per-row loops. They are correctness-equivalent to the specialized vectorized paths used by the primitive builders, but a follow-up can add type-specialized batched comparators where beneficial.
  3. Map<K, V> not included; ordering semantics need a separate discussion.
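
To make limitation 2 concrete, here is a rough sketch of a vectorized comparison that simply loops over rows and delegates to the per-row check. `ToyGroupColumn` and `ToyColumn` are hypothetical miniatures; the real `GroupColumn` trait has more methods and works on Arrow arrays, and the skip-if-already-false behavior is an assumption about how cross-column short-circuiting is expressed.

```rust
/// Hypothetical miniature of a group column over `Option<i32>` values.
trait ToyGroupColumn {
    /// Compare stored group row `lhs_row` with input row `rhs_row`.
    fn equal_to(&self, lhs_row: usize, input: &[Option<i32>], rhs_row: usize) -> bool;

    /// Per-row fallback: only rows still marked `true` are re-checked.
    fn vectorized_equal_to(
        &self,
        lhs_rows: &[usize],
        input: &[Option<i32>],
        rhs_rows: &[usize],
        results: &mut [bool],
    ) {
        for (i, (&lhs, &rhs)) in lhs_rows.iter().zip(rhs_rows).enumerate() {
            // A row that an earlier column already rejected is left untouched.
            if results[i] {
                results[i] = self.equal_to(lhs, input, rhs);
            }
        }
    }
}

/// Toy column storing one nullable value per group.
struct ToyColumn {
    values: Vec<Option<i32>>,
}

impl ToyGroupColumn for ToyColumn {
    fn equal_to(&self, lhs_row: usize, input: &[Option<i32>], rhs_row: usize) -> bool {
        // For grouping purposes a null matches a null.
        self.values[lhs_row] == input[rhs_row]
    }
}
```

The loop gives the same answers as a specialized batched comparator would; what it gives up is the chance to compare whole buffers at once.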

Happy to split this into 4-5 smaller PRs (factory refactor, then one PR per type) if maintainers prefer reviewing the staging incrementally. The single commit is structured so the factory and the per-type builders are mutually dependent and easier to read together.

Measured memory savings (regression tests)

Three new unit tests assert GroupValuesColumn::size() < GroupValuesRows::size() after interning the same data and print the ratio. Numbers from the local test run:

| Shape | GroupValuesColumn | GroupValuesRows | Savings |
| --- | --- | --- | --- |
| List<Int32>, 500 groups | 28.3 KB | 111.2 KB | 3.9x |
| LargeList<Struct<Utf8, Utf8>>, 200 groups | 44.5 KB | 152.3 KB | 3.4x |
| 4 cheap cols (Int32 + Utf8 + Date32 + Boolean) + 1 LargeList<Struct>, 300 groups | 71.2 KB | 157.1 KB | 2.2x |

The third shape mirrors the lock-in pattern described in #22682: a single nested column drags otherwise primitive-heavy keys onto the row-encoded path. The 2.2x composite saving comes from the nested column escaping the Rows fallback alongside its cheap neighbors.

These tests pin the memory claim so future regressions in column-native storage trip a unit-level assertion before reaching users.

Adjacent fix from review

Copilot review caught a pre-existing inconsistency on main: the dispatcher in make_group_column handles Time32(Second / Millisecond) and Time64(Microsecond / Nanosecond), but supported_type did not match Time64(_) at all and matched Time32(_) permissively (including the invalid Microsecond / Nanosecond variants). This meant supported_type and the dispatcher disagreed on Time types: a schema with Time64(Microsecond) was rejected by supported_type even though the dispatcher would have handled it.

Tightened supported_type to the exact set the dispatcher supports and extended the supported_type ↔ make_group_column consistency fuzz to cover all four valid + four invalid Time combinations. The alignment is now pinned by tests.
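
A toy version of the Time alignment and the biconditional check might look like the sketch below. The enums and the string-returning `make_group_column` are hypothetical simplifications; the real factory returns `Result<Box<dyn GroupColumn>>` and covers far more types.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum TimeUnit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

/// Hypothetical stand-in for the two Arrow time types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ToyTime {
    Time32(TimeUnit),
    Time64(TimeUnit),
}

/// Allow-list restricted to the four valid type/unit pairs.
fn supported_type(t: ToyTime) -> bool {
    matches!(
        t,
        ToyTime::Time32(TimeUnit::Second | TimeUnit::Millisecond)
            | ToyTime::Time64(TimeUnit::Microsecond | TimeUnit::Nanosecond)
    )
}

/// Toy dispatcher: returns a builder name, or an error for unsupported combinations.
fn make_group_column(t: ToyTime) -> Result<&'static str, String> {
    match t {
        ToyTime::Time32(TimeUnit::Second) => Ok("Time32Second"),
        ToyTime::Time32(TimeUnit::Millisecond) => Ok("Time32Millisecond"),
        ToyTime::Time64(TimeUnit::Microsecond) => Ok("Time64Microsecond"),
        ToyTime::Time64(TimeUnit::Nanosecond) => Ok("Time64Nanosecond"),
        other => Err(format!("unsupported group column type: {other:?}")),
    }
}

/// Every Time32/Time64 and unit combination, valid or not (eight in total).
fn all_time_types() -> Vec<ToyTime> {
    let units = [
        TimeUnit::Second,
        TimeUnit::Millisecond,
        TimeUnit::Microsecond,
        TimeUnit::Nanosecond,
    ];
    units
        .iter()
        .flat_map(|&u| [ToyTime::Time32(u), ToyTime::Time64(u)])
        .collect()
}

/// The biconditional: the allow-list says yes exactly when the dispatcher succeeds.
fn in_sync() -> bool {
    all_time_types()
        .into_iter()
        .all(|t| supported_type(t) == make_group_column(t).is_ok())
}
```

Checking every combination rather than a hand-picked subset is what makes drift in either function show up as a failing assertion.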

Copilot AI left a comment

Pull request overview

This PR extends the GroupValuesColumn (column-wise group key storage) fast path in DataFusion’s physical plan to support nested group-by key types, avoiding fallback to the row-encoded GroupValuesRows path when a GROUP BY includes FixedSizeList, List/LargeList, or Struct columns.

Changes:

  • Added GroupColumn builders for FixedSizeList<primitive>, List<T> / LargeList<T>, and Struct<...> with recursive child support.
  • Refactored GroupValuesColumn::intern to use a recursive make_group_column(Field) factory and updated supported_type to recursively validate nested types.
  • Added extensive unit/integration tests covering nested builders, slicing behavior, take/drain behavior, and dispatcher/allow-list consistency.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs | Adds recursive make_group_column factory, extends supported_type for nested types, and adds tests for support/dispatcher behavior. |
| datafusion/physical-plan/src/aggregates/group_values/multi_group_by/fixed_size_list.rs | Introduces FixedSizeListGroupValueBuilder<T> for fixed-size list group keys. |
| datafusion/physical-plan/src/aggregates/group_values/multi_group_by/list.rs | Introduces ListGroupValueBuilder<O> for List / LargeList group keys with offset-based storage. |
| datafusion/physical-plan/src/aggregates/group_values/multi_group_by/struct_.rs | Introduces StructGroupValueBuilder for struct group keys with per-field child builders. |

Comment thread datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs Outdated
@zhuqi-lucas zhuqi-lucas requested a review from alamb June 2, 2026 06:10
… / LargeList / Struct in multi-column GROUP BY

Addresses apache#22682 by adding column-native GroupColumn implementations for
the nested types, so a multi-column GROUP BY containing any of these no
longer falls back from `GroupValuesColumn` to the byte-encoded
`GroupValuesRows` path.

## Summary

Today `multi_group_by::supported_schema` is all-or-nothing: if any single
GROUP BY column is not in the `supported_type` allow-list, the whole
grouping falls back to `GroupValuesRows`, even when every other column
would have qualified for the fast column-wise + cross-column short-circuit
path. The most common offender in real workloads is a single nested
column (e.g. `LargeList<Struct<Utf8, LargeUtf8>>`) that drags an
otherwise primitive-heavy 30+ column GROUP BY onto the slow path.

This PR adds:

- `FixedSizeListGroupValueBuilder<T>` for `FixedSizeList<primitive>`.
  Uses an inner `PrimitiveGroupValueBuilder` for the flat child storage.

- `StructGroupValueBuilder` for `Struct<...>` whose fields are themselves
  `GroupColumn`-supported. Recursive composition: each child field is
  itself constructed via the factory.

- `ListGroupValueBuilder<O>` (`O: OffsetSizeTrait`) for `List<T>` and
  `LargeList<T>`. Owns an offsets `Vec<O>` and a child `Box<dyn
  GroupColumn>` built via the factory. Null outer rows occupy a
  zero-length range so the offset buffer stays monotonic without
  consuming child slots.

- A recursive `make_group_column(field: &Field) -> Result<Box<dyn
  GroupColumn>>` factory that replaces the giant inline match in
  `GroupValuesColumn::intern`. The factory walks nested fields and
  recursively builds children for `Struct` / `List` / `LargeList` cases.

- `supported_type` recursively checks children for `List` / `LargeList`
  / `Struct`. `FixedSizeList` remains restricted to primitive children
  in this PR; composing FSL with nested children can follow in a
  separate PR.
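
As a rough illustration of the offset layout described for `ListGroupValueBuilder<O>`, the sketch below fixes the offset type to `i32` and the child to plain `i32` values. `ToyListBuilder` and its fields are hypothetical names; the PR's builder is generic and delegates child storage to another `GroupColumn`.

```rust
/// Hypothetical miniature of a list group-key builder with an `Int32` child.
struct ToyListBuilder {
    /// Starts as `[0]`; row `i` spans `offsets[i]..offsets[i + 1]` in `child`.
    offsets: Vec<i32>,
    /// `true` when the outer row is null.
    outer_nulls: Vec<bool>,
    /// Flattened child values of all non-null rows.
    child: Vec<i32>,
}

impl ToyListBuilder {
    fn new() -> Self {
        Self {
            offsets: vec![0],
            outer_nulls: Vec::new(),
            child: Vec::new(),
        }
    }

    /// Append one row; `None` is a null outer row.
    fn append(&mut self, row: Option<&[i32]>) {
        if let Some(values) = row {
            self.child.extend_from_slice(values);
        }
        // A null row repeats the previous end offset: zero-length range, no child slots.
        // (A plain cast keeps the sketch short; real code would check the range.)
        self.offsets.push(self.child.len() as i32);
        self.outer_nulls.push(row.is_none());
    }

    /// Compare stored group `group` with an incoming row.
    fn equal_to(&self, group: usize, row: Option<&[i32]>) -> bool {
        match (self.outer_nulls[group], row) {
            (true, None) => true,
            (false, Some(values)) => {
                let start = self.offsets[group] as usize;
                let end = self.offsets[group + 1] as usize;
                &self.child[start..end] == values
            }
            // A null row never equals a non-null row, even an empty one.
            _ => false,
        }
    }
}
```

Because null and empty rows both occupy a zero-length range, the null flag is what keeps them distinct in `equal_to`.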

## Tests (38 new, 68/68 group_values pass)

Per-builder:

- append / build round trip
- `equal_to` for outer nulls, inner nulls, identical, different (and
  for List: different lengths, empty)
- **sliced input array** (offset != 0) — critical real-world case where
  the input array carries a non-zero slice offset
- `take_n` boundary cases: `n=0`, `n=len`, prefix-contains-nulls
- `vectorized_equal_to` / `vectorized_append` match per-row reference
- `size()` grows with appends
- `build` on an empty builder returns an empty array

Composition:

- `Struct<Struct<...>>` (two-level nested struct)
- `List<List<Int32>>` (two-level nested list)
- `LargeList<Struct<Utf8, Int32>>` (mirrors the production-shape nested
  type that motivated this PR)

Dispatcher and invariants:

- Each builder is exercised end-to-end via `new_group_values` to prove
  the dispatcher routes the schema to `GroupValuesColumn` (not
  `GroupValuesRows`) and dedup is correct across batches.
- `supported_type` accepts every primitive + nested combination listed
  above and rejects `Float16`, `Decimal256`, `Time64(Second)`,
  `List<Float16>`, `Struct{Float16}`, `FixedSizeList<Utf8>`, etc.
- A `supported_type` <-> `make_group_column` consistency fuzz over 21
  supported + 9 unsupported types asserts the biconditional. Locks the
  invariant against future drift between the allow-list and the
  dispatcher.

## Known limitations

- `FixedSizeList<T>` is restricted to primitive `T`. Composing FSL with
  `Struct` or `List` children can follow in a separate PR.
- `vectorized_equal_to` / `vectorized_append` for the new builders fall
  back to per-row loops. They are correctness-equivalent to the
  specialized vectorized paths used by the primitive builders, but a
  follow-up can add type-specialized batched comparators where
  beneficial.
- `Map` support is not in this PR; ordering semantics need a separate
  discussion.

## No regression on existing types

The factory refactor preserves the existing specialization order:
primitive / byte / byte-view / boolean / decimal128 builders are still
matched first. Only types that were previously rejected fall through to
the new builders. All existing aggregate tests (115/115) pass unchanged.
…n row-encoded

For three representative shapes, assert that GroupValuesColumn::size()
is strictly smaller than GroupValuesRows::size() after interning the
same data. Pins the memory-savings claim of this PR so a future
regression in column-native storage trips a unit test before reaching
users.

Measured savings (printed via eprintln on each run):

- List<Int32>, 500 groups:                              3.9x smaller
- LargeList<Struct<Utf8, Utf8>>, 200 groups:            3.4x smaller
- 4 cheap cols + 1 LargeList<Struct>, 300 groups:       2.2x smaller

The third shape mirrors the production-shape lock-in described in the
issue: a single nested column drags otherwise primitive-heavy keys onto
the row-encoded path. The 2.2x figure shows the composite saving when
the nested column escapes the Rows fallback alongside its cheap
neighbors.
1. fixed_size_list: use `FixedSizeListArray::value_offset(row)` instead
   of manual `(offset + row) * list_len` arithmetic. Correctness-equivalent
   today (FSL's `offset()` returns 0 and `values()` is already sliced),
   but the new form avoids double-applying any future offset behavior and
   matches the arrow-array convention.

2. list: guard `push_offset` with `checked_add` before the `O::from_usize`
   range check. Defense in depth against usize wrap for List<i32> at
   pathological cumulative lengths.

3. supported_type vs dispatcher alignment for Time32 / Time64. The
   dispatcher only handles Time32(Second/Millisecond) and
   Time64(Microsecond/Nanosecond), but supported_type previously did not
   match Time64 at all and matched Time32(_) (including invalid units).
   Tighten supported_type to the exact set the dispatcher supports, and
   extend the supported_type <-> make_group_column consistency fuzz to
   cover all four valid Time variants and the four invalid ones. Locks
   the alignment going forward.
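
Item 2 above (guarding the offset push) could be sketched as follows. This is an assumption-laden illustration: `push_offset` here is a free function with `i32` offsets and `String` errors, whereas the PR's version is generic over the offset type and uses DataFusion's error types.

```rust
/// Hypothetical helper: append the end offset for a list row holding `row_len` child values,
/// given that `child_len` child values were stored before it.
fn push_offset(offsets: &mut Vec<i32>, child_len: usize, row_len: usize) -> Result<(), String> {
    // Guard the usize addition first so a wrap cannot slip past the range check below.
    let end = child_len
        .checked_add(row_len)
        .ok_or_else(|| "list offset overflowed usize".to_string())?;
    // Then make sure the new end fits the offset type.
    let end = i32::try_from(end).map_err(|_| format!("list offset {end} does not fit in i32"))?;
    offsets.push(end);
    Ok(())
}
```

On failure nothing is pushed, so the offsets vector stays consistent with the child storage.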
- mod.rs: remove unnecessary `#[expect(non_snake_case)]` on `mod struct_`
  (snake_case-compliant module name, attribute was unfulfilled)
- mod.rs: `schema.clone()` -> `Arc::clone(&schema)` per project lint
- mod.rs: drop redundant `.to_string()` after `format!`
- mod.rs: inline format args in eprintln!
- fixed_size_list.rs: change `new(data_type: DataType)` ->
  `new(data_type: &DataType)` (clippy::needless_pass_by_value); the
  function only inspects the data_type by reference. Update test
  callers to pass `&data_type(true)`. Update the dispatcher call site
  in mod.rs to drop `.clone()` (it already had a reference).
- list.rs: elide explicit lifetimes on `list_array` helper
  (clippy::needless_lifetimes)
- list.rs: introduce `NotesRow` and `MatrixRow` type aliases in tests
  to satisfy clippy::type_complexity
- struct_.rs: introduce `OuterRow` type alias in nested_struct_of_struct
  test, same reason.
@zhuqi-lucas zhuqi-lucas force-pushed the qizhu/df-nested-group-column branch from 854a86e to af12254 Compare June 2, 2026 06:55
@zhuqi-lucas zhuqi-lucas requested a review from adriangb June 2, 2026 06:59

adriangb commented Jun 2, 2026

Before I review the 2k LOC a couple of questions:

The most common offender in real workloads is a single nested column (e.g. LargeList<Struct<Utf8, LargeUtf8>>) that drags an otherwise primitive-heavy 30+ column GROUP BY onto the slow path.

Is "most common offender" from your production data? Are there any other unsupported cases (e.g. are all dates, etc. supported)? A table / confirmation of what is and what is not supported would be great.

This PR adds column-native GroupColumn implementations for FixedSizeList, Struct<...>, List, and LargeList, so a GROUP BY containing any of these no longer falls back from GroupValuesColumn to GroupValuesRows.

Since you're adding support for multiple things my immediate ask would be: can we split this into 1 PR prepping whatever refactoring is needed + 1 PR per type we are adding support for? You can prototype this by refactoring this PR into stacked commits then we can split it once that looks good.


zhuqi-lucas commented Jun 2, 2026


Thanks @adriangb for taking a look.

Q1: source of "most common offender" + supported-types table

Yes, from production observation. The shape is a wide multi-column GROUP BY (~40 columns) on a SEC filings materialized view, where a single LargeList<Struct<Utf8, LargeUtf8>> column for filing notes was causing the entire grouping to fall back to GroupValuesRows. Without that column the grouping completes at ~15 GiB peak; with it the path switches to row encoding and peak grows past 25 GiB. The PR's memory regression tests synthesize that shape and show 2.2x to 3.9x savings on smaller versions of the same pattern.

Below is the multi-column GROUP BY supported set after this PR. (Single-column GROUP BY has separate specializations in single_group_by/ that this PR does not touch.)

Supported

  • Int8..Int64, UInt8..UInt64, Float32, Float64
  • Decimal128
  • Utf8, LargeUtf8, Utf8View
  • Binary, LargeBinary, BinaryView
  • Boolean
  • Date32, Date64
  • Time32(Second / Millisecond)
  • Time64(Microsecond / Nanosecond). This PR closes a pre-existing inconsistency on main where the dispatcher accepted these but supported_type did not. Copilot review caught it.
  • Timestamp(any unit)
  • NEW: FixedSizeList<primitive> (primitive child only)
  • NEW: List<T> / LargeList<T> (recursive: child must be a supported type)
  • NEW: Struct<...> (recursive: all fields must be supported types)

Still falls back to GroupValuesRows

  • Float16, Decimal256
  • FixedSizeList with non-primitive child
  • Map<K, V> (entry-ordering semantics deserve their own discussion)
  • Union, Dictionary, FixedSizeBinary
  • Interval, Duration

The PR pins the supported_type ↔ make_group_column biconditional as a unit-test fuzz (supported_type_and_make_group_column_stay_in_sync) over 21 supported + 12 unsupported cases, so future drift between the allow-list and the dispatcher trips immediately.

Q2: split into smaller PRs

Happy to. Proposed stacked sequence:

  1. Refactor only. Extract the inline match in GroupValuesColumn::intern into a recursive make_group_column(field: &Field) -> Result<Box<dyn GroupColumn>> factory and make supported_type recursive (the recursion is needed up front so subsequent PRs can plug in nested children cleanly). No new builders. Also includes the Time32 / Time64 alignment fix and the consistency fuzz so the invariant is locked from the start.
  2. FixedSizeList<primitive> support. Smallest new builder, no recursive child handling needed.
  3. Struct<...> support. Recursive child via PR 1's factory.
  4. List<T> and LargeList<T> support. Recursive child via PR 1's factory.
  5. Memory regression tests + composition coverage. LargeList<Struct>, List<List>, Struct<Struct> end-to-end, plus the GroupValuesColumn-vs-GroupValuesRows size assertions that pin the 2.2x to 3.9x savings.

I'll prototype the split as stacked commits on this branch first so you can sanity-check the boundaries before I open the actual sequence of PRs.


adriangb commented Jun 2, 2026

Would you be open to creating an EPIC for the other types as well? Interval, Duration, Float16, Decimal256, Dictionary and FixedSizeBinary in particular seem like pretty common primitives that we should be able to handle.

You bring up a good point with memory regression tests: would it make sense to also merge some dhat tests in one of the setup PRs so we can measure before / after memory usage? E.g. you write a test and set the limit to 8GB, then when you add support for a type lower that to 2GB.

@zhuqi-lucas

Both make sense, agree on both.

EPIC for remaining type coverage

Opened #22715 as the broader tracking EPIC. It includes the existing #22682 (nested types, in flight via this PR sequence) plus the 6 additional primitives you called out (FixedSizeBinary, Float16, Duration, Interval, Decimal256, Dictionary), ordered easiest-first. Dictionary is flagged "needs discussion before implementation" because of the encoded-vs-decoded key trade-off. Each remaining-primitive item blocks on PR 1 of this PR sequence (the factory + recursive supported_type); after that they're independently mergeable.

dhat memory regression tests

Good call. The size()-based assertions I have today report what each builder self-claims, which is honest for column-native (each Vec / null buffer is accounted) but a weak signal for "did this PR actually move real heap."

Plan: in the refactor / setup PR (PR 1 of the split), add a small dhat-based harness gated behind a dhat-heap feature so the default test run does not pay the profiler cost. The harness wraps GroupValuesColumn::intern on a representative dataset in dhat::Profiler::builder().testing().build() and asserts dhat::HeapStats::get().max_bytes < N. Threshold is set generously at first; each subsequent type-support PR lowers it to reflect the actual win from that PR landing. Same harness can be reused by the EPIC items in #22715 as they land.

self.outer_len
}

fn size(&self) -> usize {

@nathanb9 nathanb9 Jun 3, 2026


size_of_val from std::mem returns len * size_of::<O>(), not the Vec's current allocation (capacity * size_of::<O>()).
Instead, this could use:

fn size(&self) -> usize {
    self.offsets.allocated_size()
        + self.outer_nulls.allocated_size()
        + self.child.size()
}

helper from datafusion: use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt};
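
The gap between the two measurements can be shown with the standard library alone. This is a small sketch with hypothetical helper names; `capacity_bytes` is only similar in spirit to `allocated_size()`, not a copy of it.

```rust
use std::mem::{size_of, size_of_val};

/// Bytes covered by the initialized elements only (what `size_of_val` on the slice reports).
fn len_bytes<T>(v: &[T]) -> usize {
    size_of_val(v)
}

/// Bytes reserved by the Vec's heap allocation, including unused capacity.
fn capacity_bytes<T>(v: &Vec<T>) -> usize {
    v.capacity() * size_of::<T>()
}
```

For a `Vec<i32>` created with `Vec::with_capacity(16)` and holding three elements, `len_bytes` reports 12 while `capacity_bytes` reports at least 64, which is the under-reporting the comment points out.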


Thanks @nathanb9 for the review. Addressed in this PR; since we plan to split it into several PRs, I will address it there too.

…counting

Per nathanb9 on apache#22706: `size_of_val(slice)` returns `len * size_of::<T>()`,
not the underlying Vec's allocation, so size() under-reports the heap
footprint that the memory pool uses for spill decisions. Switch to the
existing `VecAllocExt::allocated_size()` helper used throughout
multi_group_by (primitive.rs, bytes.rs, mod.rs):

- list.rs: offsets: Vec<O> -> offsets.allocated_size()
- struct_.rs: also account for children: Vec<Box<dyn GroupColumn>>'s
  own capacity (previously only child contents were counted)

fixed_size_list.rs has no owned Vec fields beyond outer_nulls (already
correct) so it does not need a change.
Labels

physical-plan Changes to the physical-plan crate

Development

Successfully merging this pull request may close these issues.

feat: add GroupColumn support for List / Struct / Map / FixedSizeList in multi-column GROUP BY

4 participants