refactor(physical-plan): extract make_group_column factory + eager init at try_new + tighten Time variants#22751
Conversation
There was a problem hiding this comment.
Pull request overview
This PR refactors GroupValuesColumn’s per-field builder dispatch into a dedicated make_group_column factory to make upcoming nested-type GroupColumn work easier to extend, tightens Time32/Time64 variant allow-listing to match Arrow-valid combinations, and adds feature-gated heap regression testing via dhat.
Changes:
- Extracted
GroupValuesColumn::intern’s type dispatch intomake_group_column(field: &Field) -> Result<Box<dyn GroupColumn>>. - Aligned
supported_typewith the exactTime32/Time64combinations actually supported, and added tests to pinsupported_type↔ dispatcher consistency. - Added optional
dhat-heapfeature with adhat-based heap regression harness.
Reviewed changes
Copilot reviewed 3 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| datafusion/physical-plan/src/lib.rs | Adds feature-gated dhat global allocator wiring for heap profiling. |
| datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs | Extracts make_group_column, tightens time-type handling, adds sync tests and dhat heap regression test module. |
| datafusion/physical-plan/Cargo.toml | Introduces dhat-heap feature and optional dhat dependency. |
| Cargo.lock | Locks newly introduced transitive dependencies from dhat. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
fb7de55 to
5c9cbc8
Compare
2010YOUY01
left a comment
There was a problem hiding this comment.
Thank you for working on this.
Since part of the motivation is to reduce large memory consumption, you might also be interested in:
The TL;DR is that we currently allocate a large contiguous Vec in GroupValues. This keeps memory for groups in the partial aggregation stage resident until the final aggregation finishes.
I think this PR can reduce memory usage by roughly half, and the work mentioned above could save another ~2x in the worst case.
| return not_impl_err!("{dt} not supported in GroupValuesColumn"); | ||
| } | ||
| } | ||
| v.push(make_group_column(f.as_ref())?); |
There was a problem hiding this comment.
I think the right refactor is moving this GroupValues initialization into the constructor?
Lazily initializing a field during the evaluation is a bit unusual.
There was a problem hiding this comment.
Good point @2010YOUY01 , addressed. Moved the per-field builder construction into a private helper build_group_columns and now call it from three places: try_new (initial construction), the EmitTo::All branch of emit (replaces the drained Vec with a fresh one via mem::replace), and clear_shrink (rebuilds after the Vec is cleared). intern no longer has the lazy if-empty check.
One consequence is that an unsupported schema now fails at try_new time instead of at the first intern call. In production this changes nothing because the new_group_values factory in aggregates/group_values/mod.rs only calls GroupValuesColumn::try_new after multi_group_by::supported_schema returns true, and unsupported schemas still fall back to GroupValuesRows. The fail-fast just surfaces a programming error sooner for callers that bypass the factory.
Updated the unit test from intern_returns_not_impl_for_unsupported_top_level_type to try_new_returns_not_impl_for_unsupported_top_level_type. 30 group_values tests + dhat test still pass.
5c9cbc8 to
73cb7e8
Compare
| datafusion-physical-expr-common = { workspace = true } | ||
| datafusion-proto-common = { workspace = true, optional = true } | ||
| datafusion-proto-models = { workspace = true, optional = true } | ||
| # Optional heap profiler used by the `dhat-heap` feature for memory |
There was a problem hiding this comment.
I am super worried about the long term cost of this dependency :
The crates page in https://crates.io/crates/dhat says
Warning: This crate is experimental. It relies on implementation techniques that are hard to keep working for 100% of configurations. It may work fine for you, or it may crash, hang, or otherwise do the wrong thing. Its maintenance is not a high priority of the author. Support requests such as issues and pull requests may receive slow responses, or no response at all. Sorry!
It also seems like the tests that need this are not actually run in CI / automatically
Do we really need to add it in this PR?
Maybe we can keep the profiling tests in their own branch or something and run them manually (or in an outside repo)?
There was a problem hiding this comment.
Good point @alamb, thanks for review, I agree. The dhat dep is shaky to take on upstream when the tests it backs are not in CI. Will drop the dhat-heap feature, the dhat dependency, the global_allocator wiring in lib.rs, and the dhat_tests.
The factory extraction, the Time32 / Time64 supported_type tightening, and the supported_type / make_group_column consistency fuzz test will all stay.
For the subsequent PRs in the #22706 split (FixedSizeList, Struct, List, LargeList, composition tests), I will measure memory savings via GroupColumn::size() head-to-head against GroupValuesRows (the
pattern that #22706 already uses for column_path_uses_less_memory_than_rows_for_* tests). Those run in normal cargo test so they will be exercised by CI.
The dhat-based profiling will live on my local for manual investigation only. And later, we can add those to example or benchmark.
There was a problem hiding this comment.
Updated, removed the dhat related testing in latest PR.
73cb7e8 to
0f39e06
Compare
…Time variants Preparation PR for apache#22682 / apache#22715 (full GroupValuesColumn type coverage). No new builders; this PR only refactors the dispatch so subsequent PRs that add FixedSizeList / Struct / List support can plug into one well-defined factory instead of growing the inline match in GroupValuesColumn::intern, and tightens the Time32 / Time64 variant allow-listing to match the actually-supported Arrow combinations. ## What changed 1. Factory extraction. The inline match in GroupValuesColumn::intern that maps each schema field to a Box<dyn GroupColumn> builder moves into a free function make_group_column(field: &Field). The eager construction now happens at try_new time, so per-field builders are ready before any intern call and any unsupported schema fails fast. emit(EmitTo::All) and clear_shrink both refresh the builder vector via the same helper so the post-condition (one builder per schema field) holds across the aggregator's lifetime. 2. Time32 / Time64 supported_type alignment. Previously supported_type matched Time32(_) (admitting the invalid Microsecond / Nanosecond combinations) and did not match Time64(_) at all. Tighten supported_type to the exact set the dispatcher constructs. The dispatcher's wildcard arms for invalid Time variants now return not_impl_err instead of silently producing an empty builder vector. 3. supported_type ↔ make_group_column consistency fuzz. New unit test iterates a representative set of 20 supported and 6 unsupported DataType values and asserts the biconditional. Pins the alignment so future contributors who add a type to one side without the other trip a unit test immediately.
0f39e06 to
1c0f4d0
Compare
Which issue does this PR close?
PR 1 of 5 from the split agreed on #22706 (comment). Related to #22682 / #22715 (full
GroupValuesColumntype coverage). Lays the dispatcher foundation; closes nothing on its own.Rationale for this change
Preparation for the nested-type
GroupColumnwork. No new builders here. The goal is to refactor the per-field builder dispatch inGroupValuesColumnso subsequent PRs that addFixedSizeList/Struct/List/LargeListsupport can plug into one well-defined factory instead of growing the inline match inGroupValuesColumn::intern. Also includes two adjacent correctness fixes around theTime32/Time64variants that came up in the upstream thread.What changes are included in this PR
Factory extraction. The inline match that maps each schema field to a
Box<dyn GroupColumn>builder moves out ofGroupValuesColumn::interninto a free functionmake_group_column(field: &Field) -> Result<Box<dyn GroupColumn>>. Subsequent nested-type specializations can recursively call this factory for child field construction without enumerating every combination inline.Eager construction at
try_new. Per @2010YOUY01's review, the per-field builder vector is now built in the constructor via a privatebuild_group_columnshelper.emit(EmitTo::All)usesmem::replaceto swap in a fresh vector after draining the old one;clear_shrinkrebuilds the same way. The post-conditionself.group_values.len() == self.schema.fields().len()holds across the aggregator's lifetime, sointernno longer carries a lazy-init branch. Unsupported schemas now fail fast attry_newrather than at the firstinterncall. In production this changes nothing becausenew_group_valuesinaggregates/group_values/mod.rsonly callsGroupValuesColumn::try_newaftermulti_group_by::supported_schemareturns true.Time32/Time64supported_typealignment. Previouslysupported_typematchedTime32(_)(admitting the invalid Microsecond / Nanosecond combinations) and did not matchTime64(_)at all, while the dispatcher acceptedTime32(Second / Millisecond)andTime64(Microsecond / Nanosecond). Tightensupported_typeto the exact set the dispatcher constructs. The dispatcher's wildcard arms for invalidTimevariants now returnnot_impl_errinstead of silently producing an empty builder vector.supported_type↔make_group_columnconsistency fuzz. New unit testsupported_type_and_make_group_column_stay_in_synciterates a representative set of 20 supported and 6 unsupportedDataTypevalues and asserts the biconditional. Pins the alignment so future contributors who add a type to one side without the other trip a unit test immediately.What this PR is NOT doing
GroupColumnbuilders.FixedSizeList,Struct,List,LargeListcome in subsequent PRs of the feat: add GroupColumn support for List / Struct / Map / FixedSizeList in multi-column GROUP BY #22682 sequence.supported_type's allow-list (theTimetightening removes invalid combinations rather than adding new ones).dhat-heapfeature and thedhatdependency were dropped; memory savings in follow-up PRs will be measured viaGroupColumn::size()head-to-head againstGroupValuesRows, matching the existingcolumn_path_uses_less_memory_than_rows_for_*tests pattern in feat(physical-plan): add GroupColumn support for FixedSizeList / List / LargeList / Struct in multi-column GROUP BY #22706.Are these changes tested
cargo test -p datafusion-physical-plan --lib aggregates::group_values: 30 tests pass (27 existing + 3 new: the consistency fuzz, the mixed-schema rejection, and thetry_new-time NotImpl propagation).cargo test -p datafusion-physical-plan --lib aggregates: 105 tests pass (no regression in the broader aggregate suite).cargo clippy -p datafusion-physical-plan --lib --tests -- -D warnings: clean.cargo fmt --check: clean.Are there any user-facing changes
No semantic change for any schema that already used
GroupValuesColumnon main. The factory is the same dispatch logic pulled into a function; theTimechanges only affect schemas that are semantically invalid Arrow types anyway; the eager construction surfacesnot_impl_errfrom a different call site for those defensive paths.