diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/fixed_size_list.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/fixed_size_list.rs new file mode 100644 index 0000000000000..c6807c6fa7e79 --- /dev/null +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/fixed_size_list.rs @@ -0,0 +1,631 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`GroupColumn`] implementation for `FixedSizeList`. +//! +//! Storage: outer null bitmap + a child [`PrimitiveGroupValueBuilder`] that +//! holds every element flat (length = outer_len * list_len). The j-th element +//! of the i-th outer row lives at child index `i * list_len + j`. 
+ +use crate::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder; +use crate::aggregates::group_values::multi_group_by::{GroupColumn, nulls_equal_to}; +use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; + +use arrow::array::{ + Array, ArrayRef, ArrowPrimitiveType, BooleanBufferBuilder, FixedSizeListArray, +}; +use arrow::datatypes::{DataType, FieldRef}; +use datafusion_common::Result; +use std::sync::Arc; + +/// A [`GroupColumn`] for `FixedSizeList` where `T` is a primitive type. +pub struct FixedSizeListGroupValueBuilder { + /// Child field, cached for `build` / `take_n`. + field: FieldRef, + /// List length per outer row. + list_len: i32, + /// Outer-level null bitmap. + outer_nulls: MaybeNullBufferBuilder, + /// Number of outer rows accumulated. + outer_len: usize, + /// Flat storage for child elements; always treated as nullable so it can + /// hold child nulls regardless of the outer row's nullability. + child: PrimitiveGroupValueBuilder, +} + +impl FixedSizeListGroupValueBuilder { + pub fn new(data_type: &DataType) -> Self { + let (field, list_len) = match data_type { + DataType::FixedSizeList(f, n) => (Arc::clone(f), *n), + other => unreachable!( + "FixedSizeListGroupValueBuilder built with non-FixedSizeList type {other:?}" + ), + }; + let child = PrimitiveGroupValueBuilder::::new(field.data_type().clone()); + Self { + field, + list_len, + outer_nulls: MaybeNullBufferBuilder::new(), + outer_len: 0, + child, + } + } + + fn list_len_usize(&self) -> usize { + self.list_len as usize + } +} + +impl GroupColumn for FixedSizeListGroupValueBuilder { + fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { + let lhs_null = self.outer_nulls.is_null(lhs_row); + let rhs_null = array.is_null(rhs_row); + if let Some(result) = nulls_equal_to(lhs_null, rhs_null) { + return result; + } + + let list_array = array + .as_any() + .downcast_ref::() + .expect("FixedSizeListGroupValueBuilder called with 
non-FixedSizeList array"); + let rhs_sublist: ArrayRef = list_array.value(rhs_row); + let list_len = self.list_len_usize(); + let lhs_base = lhs_row * list_len; + for j in 0..list_len { + if !self.child.equal_to(lhs_base + j, &rhs_sublist, j) { + return false; + } + } + true + } + + fn append_val(&mut self, array: &ArrayRef, row: usize) -> Result<()> { + let list_array = array + .as_any() + .downcast_ref::() + .expect("FixedSizeListGroupValueBuilder called with non-FixedSizeList array"); + self.outer_nulls.append(list_array.is_null(row)); + self.outer_len += 1; + let child_array = list_array.values(); + let list_len = self.list_len_usize(); + // Use the array's own `value_offset` rather than computing `(offset + // + row) * list_len` ourselves. For sliced FixedSizeListArrays the + // `values()` slice is already advanced, so doing the arithmetic + // manually risks double-applying any future offset behavior. + let start = list_array.value_offset(row) as usize; + for j in 0..list_len { + self.child.append_val(child_array, start + j)?; + } + Ok(()) + } + + fn vectorized_equal_to( + &self, + lhs_rows: &[usize], + array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut BooleanBufferBuilder, + ) { + for (idx, (&lhs_row, &rhs_row)) in + lhs_rows.iter().zip(rhs_rows.iter()).enumerate() + { + if !equal_to_results.get_bit(idx) { + continue; + } + if !self.equal_to(lhs_row, array, rhs_row) { + equal_to_results.set_bit(idx, false); + } + } + } + + fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()> { + for &row in rows { + self.append_val(array, row)?; + } + Ok(()) + } + + fn len(&self) -> usize { + self.outer_len + } + + fn size(&self) -> usize { + self.outer_nulls.allocated_size() + self.child.size() + } + + fn build(self: Box) -> ArrayRef { + let Self { + field, + list_len, + mut outer_nulls, + outer_len: _, + child, + } = *self; + let outer_nulls = outer_nulls_take_build(&mut outer_nulls); + let child_array = Box::new(child).build(); + 
Arc::new(FixedSizeListArray::new( + field, + list_len, + child_array, + outer_nulls, + )) + } + + fn take_n(&mut self, n: usize) -> ArrayRef { + let list_len = self.list_len_usize(); + let first_n_outer_nulls = self.outer_nulls.take_n(n); + // Drain a `PrimitiveGroupValueBuilder` cleanly through its + // `GroupColumn` impl; this also shifts the remaining elements down. + let first_n_child: ArrayRef = + as GroupColumn>::take_n( + &mut self.child, + n * list_len, + ); + self.outer_len -= n; + Arc::new(FixedSizeListArray::new( + Arc::clone(&self.field), + self.list_len, + first_n_child, + first_n_outer_nulls, + )) + } +} + +/// Helper: consume the inner builder once on `build` to release its buffer. +fn outer_nulls_take_build( + nulls: &mut MaybeNullBufferBuilder, +) -> Option { + std::mem::replace(nulls, MaybeNullBufferBuilder::new()).build() +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{ + ArrayRef, FixedSizeListArray, Int32Array, builder::FixedSizeListBuilder, + builder::Int32Builder, + }; + use arrow::datatypes::Int32Type; + use std::sync::Arc; + + /// Build a FixedSizeList array from a sequence of optional rows. + /// `None` outer rows are null lists. Within a non-null row, inner values come + /// from the given `[Option; 2]`. + fn fsl_array(rows: &[Option<[Option; 2]>]) -> ArrayRef { + let inner = Int32Builder::new(); + let mut builder = FixedSizeListBuilder::new(inner, 2); + for row in rows { + match row { + None => { + // Null outer row: push placeholder child values so the + // child array has the right length, then mark the outer + // row null via `append(false)`. 
+ builder.values().append_value(0); + builder.values().append_value(0); + builder.append(false); + } + Some([a, b]) => { + match a { + Some(v) => builder.values().append_value(*v), + None => builder.values().append_null(), + } + match b { + Some(v) => builder.values().append_value(*v), + None => builder.values().append_null(), + } + builder.append(true); + } + } + } + Arc::new(builder.finish()) + } + + fn data_type(child_nullable: bool) -> DataType { + DataType::FixedSizeList( + Arc::new(arrow::datatypes::Field::new( + "item", + DataType::Int32, + child_nullable, + )), + 2, + ) + } + + #[test] + fn append_and_build_round_trips_values() { + let input = fsl_array(&[ + Some([Some(1), Some(2)]), + None, + Some([Some(3), Some(4)]), + Some([Some(1), Some(2)]), + ]); + let mut builder: FixedSizeListGroupValueBuilder = + FixedSizeListGroupValueBuilder::new(&data_type(true)); + + for i in 0..input.len() { + builder.append_val(&input, i).unwrap(); + } + assert_eq!(builder.len(), 4); + + let out = Box::new(builder).build(); + let out_fsl = out.as_any().downcast_ref::().unwrap(); + assert_eq!(out_fsl.len(), 4); + assert!(out_fsl.is_null(1)); + assert!(!out_fsl.is_null(0)); + assert!(!out_fsl.is_null(2)); + assert!(!out_fsl.is_null(3)); + + let values = out_fsl + .values() + .as_any() + .downcast_ref::() + .unwrap(); + // Row 0: [1, 2] + assert_eq!(values.value(0), 1); + assert_eq!(values.value(1), 2); + // Row 2: [3, 4] (row 1 is null, but child slot positions are 2..4) + assert_eq!(values.value(4), 3); + assert_eq!(values.value(5), 4); + // Row 3: [1, 2] + assert_eq!(values.value(6), 1); + assert_eq!(values.value(7), 2); + } + + #[test] + fn equal_to_matches_identical_rows_and_rejects_different() { + let stored = fsl_array(&[Some([Some(1), Some(2)]), Some([Some(3), Some(4)])]); + let mut builder: FixedSizeListGroupValueBuilder = + FixedSizeListGroupValueBuilder::new(&data_type(true)); + builder.append_val(&stored, 0).unwrap(); + builder.append_val(&stored, 1).unwrap(); + + 
let probe = fsl_array(&[ + Some([Some(1), Some(2)]), // == stored row 0 + Some([Some(3), Some(5)]), // != stored row 1 (last element differs) + Some([Some(3), Some(4)]), // == stored row 1 + ]); + + assert!(builder.equal_to(0, &probe, 0), "[1,2] == [1,2]"); + assert!( + !builder.equal_to(1, &probe, 1), + "[3,4] != [3,5] (one element differs)" + ); + assert!(builder.equal_to(1, &probe, 2), "[3,4] == [3,4]"); + } + + #[test] + fn equal_to_handles_outer_nulls() { + let stored = fsl_array(&[None, Some([Some(1), Some(2)])]); + let mut builder: FixedSizeListGroupValueBuilder = + FixedSizeListGroupValueBuilder::new(&data_type(true)); + builder.append_val(&stored, 0).unwrap(); + builder.append_val(&stored, 1).unwrap(); + + let probe = + fsl_array(&[None, Some([Some(1), Some(2)]), Some([Some(0), Some(0)])]); + + assert!(builder.equal_to(0, &probe, 0), "null == null"); + assert!(!builder.equal_to(0, &probe, 1), "null != [1,2]"); + assert!(!builder.equal_to(1, &probe, 0), "[1,2] != null"); + assert!(builder.equal_to(1, &probe, 1), "[1,2] == [1,2]"); + assert!(!builder.equal_to(1, &probe, 2), "[1,2] != [0,0]"); + } + + #[test] + fn equal_to_handles_inner_nulls() { + let stored = fsl_array(&[Some([Some(1), None]), Some([None, None])]); + let mut builder: FixedSizeListGroupValueBuilder = + FixedSizeListGroupValueBuilder::new(&data_type(true)); + builder.append_val(&stored, 0).unwrap(); + builder.append_val(&stored, 1).unwrap(); + + let probe = fsl_array(&[ + Some([Some(1), None]), + Some([Some(1), Some(2)]), + Some([None, None]), + ]); + + assert!(builder.equal_to(0, &probe, 0), "[1, null] == [1, null]"); + assert!( + !builder.equal_to(0, &probe, 1), + "[1, null] != [1, 2] (null vs value)" + ); + assert!( + builder.equal_to(1, &probe, 2), + "[null, null] == [null, null]" + ); + } + + #[test] + fn dispatcher_routes_fixed_size_list_to_group_values_column() { + // End-to-end: feed a schema with a FixedSizeList column + // through `new_group_values` and prove that 
`GroupValuesColumn` (not + // `GroupValuesRows`) handles the intern. The behavioral signal is that + // dedup works correctly across batches. + use crate::aggregates::group_values::new_group_values; + use crate::aggregates::order::GroupOrdering; + use arrow::datatypes::{Field, Schema}; + use datafusion_expr::EmitTo; + + let schema = + Arc::new(Schema::new(vec![Field::new("tags", data_type(true), true)])); + + let mut gv = new_group_values(schema, &GroupOrdering::None).unwrap(); + + // Batch 1: three rows, two distinct values. + let batch1: ArrayRef = fsl_array(&[ + Some([Some(1), Some(2)]), + Some([Some(3), Some(4)]), + Some([Some(1), Some(2)]), + ]); + let mut groups = Vec::new(); + gv.intern(&[batch1], &mut groups).unwrap(); + assert_eq!( + groups, + vec![0, 1, 0], + "first batch: dedup [1,2] across rows" + ); + + // Batch 2: revisit existing groups + introduce a new one. + let batch2: ArrayRef = fsl_array(&[ + Some([Some(3), Some(4)]), // existing group 1 + Some([Some(5), Some(6)]), // new group 2 + Some([Some(1), Some(2)]), // existing group 0 + ]); + let mut groups = Vec::new(); + gv.intern(&[batch2], &mut groups).unwrap(); + assert_eq!( + groups, + vec![1, 2, 0], + "second batch: dedup hits across batches" + ); + + assert_eq!(gv.len(), 3, "exactly three distinct group keys retained"); + + // Emit and verify the materialized group keys. 
+ let arrays = gv.emit(EmitTo::All).unwrap(); + assert_eq!(arrays.len(), 1); + let out = arrays[0] + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(out.len(), 3); + let values = out.values().as_any().downcast_ref::().unwrap(); + assert_eq!(values.values(), &[1, 2, 3, 4, 5, 6]); + } + + #[test] + fn take_n_returns_prefix_and_shifts_remainder() { + let input = fsl_array(&[ + Some([Some(1), Some(2)]), + Some([Some(3), Some(4)]), + Some([Some(5), Some(6)]), + ]); + let mut builder: FixedSizeListGroupValueBuilder = + FixedSizeListGroupValueBuilder::new(&data_type(true)); + for i in 0..input.len() { + builder.append_val(&input, i).unwrap(); + } + assert_eq!(builder.len(), 3); + + let first_two = builder.take_n(2); + let first_two_fsl = first_two + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(first_two_fsl.len(), 2); + let v = first_two_fsl + .values() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(v.values(), &[1, 2, 3, 4]); + + // Remaining builder should now contain only [5, 6]. + assert_eq!(builder.len(), 1); + let rest = Box::new(builder).build(); + let rest_fsl = rest.as_any().downcast_ref::().unwrap(); + assert_eq!(rest_fsl.len(), 1); + let v = rest_fsl + .values() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(v.values(), &[5, 6]); + } + + #[test] + fn handles_sliced_input_array() { + // Build a 5-row array, slice off the first 2 rows, and verify + // append_val + equal_to operate on the correct logical positions. + let full = fsl_array(&[ + Some([Some(99), Some(99)]), // sliced off + Some([Some(98), Some(98)]), // sliced off + Some([Some(1), Some(2)]), + None, + Some([Some(3), Some(4)]), + ]); + let sliced = full.slice(2, 3); + + let mut builder: FixedSizeListGroupValueBuilder = + FixedSizeListGroupValueBuilder::new(&data_type(true)); + for i in 0..sliced.len() { + builder.append_val(&sliced, i).unwrap(); + } + + // Probe with an UNSLICED equivalent. 
+ let probe = fsl_array(&[ + Some([Some(1), Some(2)]), + None, + Some([Some(3), Some(4)]), + Some([Some(99), Some(99)]), + ]); + assert!(builder.equal_to(0, &probe, 0), "sliced row 0 == [1,2]"); + assert!(builder.equal_to(1, &probe, 1), "sliced row 1 == null"); + assert!(builder.equal_to(2, &probe, 2), "sliced row 2 == [3,4]"); + assert!( + !builder.equal_to(0, &probe, 3), + "sliced row 0 ([1,2]) != [99,99]" + ); + + // Probe with the SAME sliced array to make sure equal_to with sliced + // rhs also works. + for i in 0..sliced.len() { + assert!( + builder.equal_to(i, &sliced, i), + "row {i} of sliced array must equal itself" + ); + } + + let out = Box::new(builder).build(); + let out = out.as_any().downcast_ref::().unwrap(); + assert_eq!(out.len(), 3); + assert!(out.is_null(1)); + let v = out.values().as_any().downcast_ref::().unwrap(); + // Output stores: [1,2], placeholder for null, [3,4]. Placeholder is + // whatever the source had — for our null builder we pushed actual + // values 0,0 from the test fixture, so the child layout is [1,2,0,0,3,4]. + assert_eq!(v.value(0), 1); + assert_eq!(v.value(1), 2); + assert_eq!(v.value(4), 3); + assert_eq!(v.value(5), 4); + } + + #[test] + fn take_n_zero_and_full() { + let input = fsl_array(&[Some([Some(1), Some(2)]), Some([Some(3), Some(4)])]); + let mut builder: FixedSizeListGroupValueBuilder = + FixedSizeListGroupValueBuilder::new(&data_type(true)); + for i in 0..input.len() { + builder.append_val(&input, i).unwrap(); + } + + // take_n(0): empty prefix, remainder unchanged. + let none = builder.take_n(0); + let none_fsl = none.as_any().downcast_ref::().unwrap(); + assert_eq!(none_fsl.len(), 0); + assert_eq!(builder.len(), 2); + + // take_n(len): full prefix, empty remainder. 
+ let all = builder.take_n(2); + let all_fsl = all.as_any().downcast_ref::().unwrap(); + assert_eq!(all_fsl.len(), 2); + let v = all_fsl + .values() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(v.values(), &[1, 2, 3, 4]); + assert_eq!(builder.len(), 0, "builder drained after take_n(len)"); + } + + #[test] + fn take_n_with_null_outer_in_prefix() { + let input = + fsl_array(&[Some([Some(1), Some(2)]), None, Some([Some(3), Some(4)])]); + let mut builder: FixedSizeListGroupValueBuilder = + FixedSizeListGroupValueBuilder::new(&data_type(true)); + for i in 0..input.len() { + builder.append_val(&input, i).unwrap(); + } + + let first_two = builder.take_n(2); + let first_two = first_two + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(first_two.len(), 2); + assert!(!first_two.is_null(0)); + assert!(first_two.is_null(1), "null row preserved in take_n prefix"); + + // Remaining = row 2 only. + let rest = Box::new(builder).build(); + let rest = rest.as_any().downcast_ref::().unwrap(); + assert_eq!(rest.len(), 1); + assert!(!rest.is_null(0)); + let v = rest.values().as_any().downcast_ref::().unwrap(); + assert_eq!(v.values(), &[3, 4]); + } + + #[test] + fn vectorized_methods_match_per_row() { + // Build two builders, one via per-row append/equal, one via vectorized + // append/equal, and prove they produce the same output and decisions. + let input = fsl_array(&[ + Some([Some(1), Some(2)]), + None, + Some([Some(3), Some(4)]), + Some([Some(1), Some(2)]), + ]); + + let mut per_row: FixedSizeListGroupValueBuilder = + FixedSizeListGroupValueBuilder::new(&data_type(true)); + for i in 0..input.len() { + per_row.append_val(&input, i).unwrap(); + } + + let mut vec_b: FixedSizeListGroupValueBuilder = + FixedSizeListGroupValueBuilder::new(&data_type(true)); + vec_b.vectorized_append(&input, &[0, 1, 2, 3]).unwrap(); + assert_eq!(vec_b.len(), per_row.len()); + + // Vectorized equal_to should produce the same per-row decisions. 
+ let lhs = vec![0usize, 1, 2, 3]; + let rhs = vec![0usize, 1, 2, 0]; + let mut bb = BooleanBufferBuilder::new(4); + bb.append_n(4, true); + vec_b.vectorized_equal_to(&lhs, &input, &rhs, &mut bb); + for idx in 0..4 { + let expected = per_row.equal_to(lhs[idx], &input, rhs[idx]); + assert_eq!( + bb.get_bit(idx), + expected, + "row {idx}: vectorized={} per-row={expected}", + bb.get_bit(idx), + ); + } + + // A pre-set false entry must NOT be flipped back to true. + let mut bb = BooleanBufferBuilder::new(4); + bb.append_n(4, true); + bb.set_bit(0, false); + vec_b.vectorized_equal_to(&lhs, &input, &rhs, &mut bb); + assert!(!bb.get_bit(0), "pre-set false must stay false"); + } + + #[test] + fn size_grows_with_appends() { + let input = fsl_array(&[Some([Some(1), Some(2)])]); + let mut builder: FixedSizeListGroupValueBuilder = + FixedSizeListGroupValueBuilder::new(&data_type(true)); + let s0 = builder.size(); + for _ in 0..16 { + builder.append_val(&input, 0).unwrap(); + } + let s1 = builder.size(); + assert!(s1 > s0, "size should grow after appends ({s0} -> {s1})"); + } + + #[test] + fn build_empty_builder_returns_empty_array() { + let builder: FixedSizeListGroupValueBuilder = + FixedSizeListGroupValueBuilder::new(&data_type(true)); + let out = Box::new(builder).build(); + let out = out.as_any().downcast_ref::().unwrap(); + assert_eq!(out.len(), 0); + } +} diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/list.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/list.rs new file mode 100644 index 0000000000000..cd71f8f51dd36 --- /dev/null +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/list.rs @@ -0,0 +1,844 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`GroupColumn`] implementation for `List` and `LargeList` group keys. +//! +//! Storage: +//! - `offsets: Vec` of length `outer_len + 1`, with `offsets[0] = 0`. The +//! children of outer row `i` live at child positions +//! `[offsets[i] .. offsets[i+1])`. +//! - `child: Box` for the element type. +//! - `outer_nulls`: outer-row null bitmap. +//! +//! Null outer rows are stored with a zero-length range (i.e. +//! `offsets[i+1] == offsets[i]`). No child slots are pushed for null rows. + +use crate::aggregates::group_values::multi_group_by::{GroupColumn, nulls_equal_to}; +use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; + +use arrow::array::{ + Array, ArrayRef, BooleanBufferBuilder, GenericListArray, OffsetSizeTrait, +}; +use arrow::buffer::{OffsetBuffer, ScalarBuffer}; +use arrow::datatypes::FieldRef; +use datafusion_common::{Result, internal_datafusion_err}; +use datafusion_execution::memory_pool::proxy::VecAllocExt; +use std::sync::Arc; + +/// A [`GroupColumn`] for `List` (`O = i32`) and `LargeList` (`O = i64`). 
+pub struct ListGroupValueBuilder { + field: FieldRef, + offsets: Vec, + child: Box, + outer_nulls: MaybeNullBufferBuilder, + outer_len: usize, +} + +impl ListGroupValueBuilder { + pub fn new(field: FieldRef, child: Box) -> Self { + Self { + field, + offsets: vec![O::usize_as(0)], + child, + outer_nulls: MaybeNullBufferBuilder::new(), + outer_len: 0, + } + } + + fn list_array(array: &ArrayRef) -> &GenericListArray { + array + .as_any() + .downcast_ref::>() + .expect("ListGroupValueBuilder called with non-List/LargeList array") + } + + fn current_end(&self) -> O { + *self.offsets.last().expect("offsets is never empty") + } + + fn push_offset(&mut self, additional: usize) -> Result<()> { + // Guard against usize overflow first (cheap, defense in depth: a + // List with cumulative length close to i32::MAX could wrap + // before the `O::from_usize` range check below would fire). + let next = self + .current_end() + .as_usize() + .checked_add(additional) + .ok_or_else(|| { + internal_datafusion_err!( + "List offset usize overflow: current={} additional={}", + self.current_end().as_usize(), + additional + ) + })?; + let next_o = O::from_usize(next) + .ok_or_else(|| internal_datafusion_err!("List offset overflows {}", next))?; + self.offsets.push(next_o); + Ok(()) + } +} + +impl GroupColumn for ListGroupValueBuilder { + fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { + let lhs_null = self.outer_nulls.is_null(lhs_row); + let rhs_null = array.is_null(rhs_row); + if let Some(result) = nulls_equal_to(lhs_null, rhs_null) { + return result; + } + + let l = Self::list_array(array); + // sliced child covering exactly the rhs row's elements + let rhs_sublist: ArrayRef = l.value(rhs_row); + let lhs_start = self.offsets[lhs_row].as_usize(); + let lhs_end = self.offsets[lhs_row + 1].as_usize(); + let lhs_len = lhs_end - lhs_start; + if lhs_len != rhs_sublist.len() { + return false; + } + for j in 0..lhs_len { + if !self.child.equal_to(lhs_start + j, 
&rhs_sublist, j) { + return false; + } + } + true + } + + fn append_val(&mut self, array: &ArrayRef, row: usize) -> Result<()> { + let l = Self::list_array(array); + if l.is_null(row) { + self.outer_nulls.append(true); + // Zero-length range for null outer rows: do not push any child + // elements, and the offset for the next row stays the same. + let end = self.current_end(); + self.offsets.push(end); + } else { + self.outer_nulls.append(false); + let sublist: ArrayRef = l.value(row); + let n = sublist.len(); + for j in 0..n { + self.child.append_val(&sublist, j)?; + } + self.push_offset(n)?; + } + self.outer_len += 1; + Ok(()) + } + + fn vectorized_equal_to( + &self, + lhs_rows: &[usize], + array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut BooleanBufferBuilder, + ) { + for (idx, (&lhs_row, &rhs_row)) in + lhs_rows.iter().zip(rhs_rows.iter()).enumerate() + { + if !equal_to_results.get_bit(idx) { + continue; + } + if !self.equal_to(lhs_row, array, rhs_row) { + equal_to_results.set_bit(idx, false); + } + } + } + + fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()> { + for &row in rows { + self.append_val(array, row)?; + } + Ok(()) + } + + fn len(&self) -> usize { + self.outer_len + } + + fn size(&self) -> usize { + self.offsets.allocated_size() + + self.outer_nulls.allocated_size() + + self.child.size() + } + + fn build(self: Box) -> ArrayRef { + let Self { + field, + offsets, + child, + mut outer_nulls, + outer_len: _, + } = *self; + let outer_nulls = + std::mem::replace(&mut outer_nulls, MaybeNullBufferBuilder::new()).build(); + let child_array = child.build(); + // SAFETY: offsets are constructed monotonically by `push_offset` / + // initial `[0]`, and child_array length matches the final offset. 
+ let offset_buffer = OffsetBuffer::::new(ScalarBuffer::::from(offsets)); + Arc::new(GenericListArray::::new( + field, + offset_buffer, + child_array, + outer_nulls, + )) + } + + fn take_n(&mut self, n: usize) -> ArrayRef { + // Number of child elements consumed by the first n outer rows. + let cut = self.offsets[n].as_usize(); + let cut_offset = self.offsets[n]; + + // First-n offsets: 0, off[1], ..., off[n]. + let first_n_offsets: Vec = self.offsets[..=n].to_vec(); + + // Remaining offsets shifted so that what was offsets[n] becomes 0. + let mut remaining = Vec::with_capacity(self.offsets.len() - n); + for i in n..self.offsets.len() { + remaining.push(self.offsets[i] - cut_offset); + } + self.offsets = remaining; + + let first_n_outer_nulls = self.outer_nulls.take_n(n); + let first_n_child = self.child.take_n(cut); + self.outer_len -= n; + + let offset_buffer = + OffsetBuffer::::new(ScalarBuffer::::from(first_n_offsets)); + Arc::new(GenericListArray::::new( + Arc::clone(&self.field), + offset_buffer, + first_n_child, + first_n_outer_nulls, + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder; + use arrow::array::{ + Int32Array, LargeListArray, ListArray, builder::Int32Builder, + builder::LargeListBuilder, builder::ListBuilder, + }; + use arrow::datatypes::{DataType, Field, Int32Type}; + + fn child_field() -> FieldRef { + Arc::new(Field::new("item", DataType::Int32, true)) + } + + fn list_array(rows: &[Option>>]) -> ArrayRef { + let mut b = ListBuilder::new(Int32Builder::new()); + for row in rows { + match row { + None => b.append(false), + Some(items) => { + for v in items { + b.values().append_option(*v); + } + b.append(true); + } + } + } + Arc::new(b.finish()) + } + + fn large_list_array(rows: &[Option>>]) -> ArrayRef { + let mut b = LargeListBuilder::new(Int32Builder::new()); + for row in rows { + match row { + None => b.append(false), + Some(items) => { + for v in 
items { + b.values().append_option(*v); + } + b.append(true); + } + } + } + Arc::new(b.finish()) + } + + fn make_child() -> Box { + Box::new(PrimitiveGroupValueBuilder::::new( + DataType::Int32, + )) + } + + #[test] + fn list_append_equal_build_round_trip() { + let input = list_array(&[ + Some(vec![Some(1), Some(2)]), + Some(vec![Some(3)]), + None, + Some(vec![Some(1), Some(2)]), + Some(vec![]), + ]); + let mut builder = ListGroupValueBuilder::::new(child_field(), make_child()); + for i in 0..input.len() { + builder.append_val(&input, i).unwrap(); + } + assert_eq!(builder.len(), 5); + + // Verify equality matrix. + let probe = list_array(&[ + Some(vec![Some(1), Some(2)]), + Some(vec![Some(1), Some(3)]), + None, + Some(vec![]), + ]); + assert!(builder.equal_to(0, &probe, 0), "[1,2] == [1,2]"); + assert!(!builder.equal_to(0, &probe, 1), "[1,2] != [1,3]"); + assert!(builder.equal_to(2, &probe, 2), "null == null"); + assert!(!builder.equal_to(0, &probe, 2), "[1,2] != null"); + assert!(builder.equal_to(4, &probe, 3), "[] == []"); + assert!(!builder.equal_to(0, &probe, 3), "[1,2] != []"); + + let out = Box::new(builder).build(); + let out_list = out.as_any().downcast_ref::().unwrap(); + assert_eq!(out_list.len(), 5); + assert!(out_list.is_null(2)); + // Verify values for non-null rows. 
+ let v0 = out_list.value(0); + let v0 = v0.as_any().downcast_ref::().unwrap(); + assert_eq!(v0.values(), &[1, 2]); + let v1 = out_list.value(1); + let v1 = v1.as_any().downcast_ref::().unwrap(); + assert_eq!(v1.values(), &[3]); + let v3 = out_list.value(3); + let v3 = v3.as_any().downcast_ref::().unwrap(); + assert_eq!(v3.values(), &[1, 2]); + assert_eq!(out_list.value_length(4), 0); + } + + #[test] + fn large_list_round_trip() { + let input = large_list_array(&[ + Some(vec![Some(7), Some(8), Some(9)]), + None, + Some(vec![Some(10)]), + ]); + let mut builder = ListGroupValueBuilder::::new(child_field(), make_child()); + for i in 0..input.len() { + builder.append_val(&input, i).unwrap(); + } + let out = Box::new(builder).build(); + let out_list = out.as_any().downcast_ref::().unwrap(); + assert_eq!(out_list.len(), 3); + assert!(out_list.is_null(1)); + let v0 = out_list.value(0); + let v0 = v0.as_any().downcast_ref::().unwrap(); + assert_eq!(v0.values(), &[7, 8, 9]); + let v2 = out_list.value(2); + let v2 = v2.as_any().downcast_ref::().unwrap(); + assert_eq!(v2.values(), &[10]); + } + + #[test] + fn list_take_n_splits_offsets_and_child() { + let input = list_array(&[ + Some(vec![Some(1), Some(2)]), + Some(vec![Some(3)]), + Some(vec![Some(4), Some(5), Some(6)]), + ]); + let mut builder = ListGroupValueBuilder::::new(child_field(), make_child()); + for i in 0..input.len() { + builder.append_val(&input, i).unwrap(); + } + let first = builder.take_n(2); + let first_list = first.as_any().downcast_ref::().unwrap(); + assert_eq!(first_list.len(), 2); + let v0 = first_list.value(0); + let v0 = v0.as_any().downcast_ref::().unwrap(); + assert_eq!(v0.values(), &[1, 2]); + let v1 = first_list.value(1); + let v1 = v1.as_any().downcast_ref::().unwrap(); + assert_eq!(v1.values(), &[3]); + + // Remaining = row 2 only. 
+ assert_eq!(builder.len(), 1); + let rest = Box::new(builder).build(); + let rest_list = rest.as_any().downcast_ref::().unwrap(); + assert_eq!(rest_list.len(), 1); + let v = rest_list.value(0); + let v = v.as_any().downcast_ref::().unwrap(); + assert_eq!(v.values(), &[4, 5, 6]); + } + + #[test] + fn large_list_of_struct_round_trip_through_new_group_values() { + // Composition test: LargeList>, the same + // shape as a SEC Form 4 `footnotes` column (`LargeList>`). Proves the recursive factory wires + // List/Struct together correctly and that intern/emit round-trips + // through `GroupValuesColumn` rather than `GroupValuesRows`. + use crate::aggregates::group_values::new_group_values; + use crate::aggregates::order::GroupOrdering; + use arrow::array::{ + Int32Array, LargeListArray, StringArray, StructArray, builder::Int32Builder, + builder::LargeListBuilder, builder::StringBuilder, builder::StructBuilder, + }; + use arrow::datatypes::{Fields, Schema}; + use datafusion_expr::EmitTo; + + let struct_fields = Fields::from(vec![ + Field::new("id", DataType::Utf8, true), + Field::new("n", DataType::Int32, true), + ]); + let element_field = Arc::new(Field::new( + "element", + DataType::Struct(struct_fields.clone()), + true, + )); + let column_field = Field::new( + "notes", + DataType::LargeList(Arc::clone(&element_field)), + true, + ); + let schema = Arc::new(Schema::new(vec![column_field])); + + // Build LargeList> arrays. 
+ // + // NotesRow models one outer LargeList row of Struct: + // None -> null outer list + // Some(vec![]) -> empty list + // Some(vec![(id, n), ...]) -> list with struct entries + type NotesRow<'a> = Option, Option)>>; + let notes_v = |rows: &[NotesRow<'_>]| -> ArrayRef { + let struct_builder = StructBuilder::new( + struct_fields.clone(), + vec![ + Box::new(StringBuilder::new()), + Box::new(Int32Builder::new()), + ], + ); + let mut list_builder = LargeListBuilder::new(struct_builder) + .with_field(Arc::clone(&element_field)); + for row in rows { + match row { + None => list_builder.append(false), + Some(items) => { + for (id, n) in items { + let s = list_builder.values(); + s.field_builder::(0) + .unwrap() + .append_option(id.map(|x| x.to_string())); + s.field_builder::(1) + .unwrap() + .append_option(*n); + s.append(true); + } + list_builder.append(true); + } + } + } + Arc::new(list_builder.finish()) + }; + + let mut gv = new_group_values(schema, &GroupOrdering::None).unwrap(); + + // Batch 1: a mix of duplicate / distinct / null lists. + let batch1 = notes_v(&[ + Some(vec![(Some("a"), Some(1))]), // 0 + Some(vec![(Some("a"), Some(1))]), // dup of 0 + Some(vec![(Some("a"), Some(2))]), // distinct (n differs) + None, // null + Some(vec![(Some("a"), Some(1)), (Some("b"), Some(2))]), // distinct (length differs) + None, // dup of null + Some(vec![]), // empty list + ]); + let mut groups = Vec::new(); + gv.intern(&[batch1], &mut groups).unwrap(); + assert_eq!( + groups, + vec![0, 0, 1, 2, 3, 2, 4], + "composition dedup: same struct list -> same group; null and empty are distinct" + ); + + // Batch 2: cross-batch dedup including a brand-new struct value. 
+ let batch2 = notes_v(&[ + Some(vec![(Some("a"), Some(1))]), // existing 0 + Some(vec![(Some("a"), Some(1)), (Some("b"), Some(2))]), // existing 3 + Some(vec![(Some("z"), Some(99))]), // new 5 + None, // existing 2 + ]); + let mut groups = Vec::new(); + gv.intern(&[batch2], &mut groups).unwrap(); + assert_eq!(groups, vec![0, 3, 5, 2]); + assert_eq!(gv.len(), 6, "five distinct lists + one null group"); + + // Emit and sanity-check the materialized shape. + let out = gv.emit(EmitTo::All).unwrap(); + let ll = out[0].as_any().downcast_ref::().unwrap(); + assert_eq!(ll.len(), 6); + assert!(ll.is_null(2), "row 2 was the null-list group"); + // row 0 == [(a, 1)] + let row0 = ll.value(0); + let row0 = row0.as_any().downcast_ref::().unwrap(); + assert_eq!(row0.len(), 1); + let ids = row0 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let ns = row0 + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ids.value(0), "a"); + assert_eq!(ns.value(0), 1); + // row 3 == [(a, 1), (b, 2)] + let row3 = ll.value(3); + let row3 = row3.as_any().downcast_ref::().unwrap(); + assert_eq!(row3.len(), 2); + // row 5 == [(z, 99)] (the new value from batch 2) + let row5 = ll.value(5); + let row5 = row5.as_any().downcast_ref::().unwrap(); + let ids = row5 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let ns = row5 + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ids.value(0), "z"); + assert_eq!(ns.value(0), 99); + } + + #[test] + fn list_dispatcher_round_trip_through_new_group_values() { + use crate::aggregates::group_values::new_group_values; + use crate::aggregates::order::GroupOrdering; + use arrow::datatypes::Schema; + use datafusion_expr::EmitTo; + + let schema = Arc::new(Schema::new(vec![Field::new( + "tags", + DataType::List(child_field()), + true, + )])); + let mut gv = new_group_values(schema, &GroupOrdering::None).unwrap(); + + // Batch 1. 
+ let batch1: ArrayRef = list_array(&[ + Some(vec![Some(1), Some(2)]), + None, + Some(vec![Some(1), Some(2)]), + Some(vec![Some(3)]), + None, + ]); + let mut groups = Vec::new(); + gv.intern(&[batch1], &mut groups).unwrap(); + assert_eq!( + groups, + vec![0, 1, 0, 2, 1], + "list dedup: [1,2] -> 0, null -> 1, [3] -> 2" + ); + + // Batch 2 hits existing keys + adds one new. + let batch2: ArrayRef = + list_array(&[Some(vec![Some(3)]), Some(vec![Some(4), Some(5)])]); + let mut groups = Vec::new(); + gv.intern(&[batch2], &mut groups).unwrap(); + assert_eq!(groups, vec![2, 3]); + + assert_eq!(gv.len(), 4); + + let out = gv.emit(EmitTo::All).unwrap(); + let out_list = out[0].as_any().downcast_ref::().unwrap(); + assert_eq!(out_list.len(), 4); + assert!(out_list.is_null(1)); + } + + #[test] + fn handles_sliced_input_list_array() { + // Build 5 rows then slice off the first 2. append_val/equal_to must + // operate on logical positions of the slice, not the underlying array. + let full = list_array(&[ + Some(vec![Some(99), Some(99)]), // sliced off + Some(vec![Some(98)]), // sliced off + Some(vec![Some(1), Some(2)]), + None, + Some(vec![Some(3)]), + ]); + let sliced = full.slice(2, 3); + + let mut builder = ListGroupValueBuilder::::new(child_field(), make_child()); + for i in 0..sliced.len() { + builder.append_val(&sliced, i).unwrap(); + } + + let probe = list_array(&[ + Some(vec![Some(1), Some(2)]), + None, + Some(vec![Some(3)]), + Some(vec![Some(99), Some(99)]), + ]); + assert!(builder.equal_to(0, &probe, 0)); + assert!(builder.equal_to(1, &probe, 1)); + assert!(builder.equal_to(2, &probe, 2)); + assert!( + !builder.equal_to(0, &probe, 3), + "sliced[0]=[1,2] != [99,99]" + ); + + // Equal_to against the SAME sliced array. 
+ for i in 0..sliced.len() { + assert!(builder.equal_to(i, &sliced, i)); + } + + let out = Box::new(builder).build(); + let out = out.as_any().downcast_ref::().unwrap(); + assert_eq!(out.len(), 3); + assert!(out.is_null(1)); + let v0 = out.value(0); + let v0 = v0.as_any().downcast_ref::().unwrap(); + assert_eq!(v0.values(), &[1, 2]); + let v2 = out.value(2); + let v2 = v2.as_any().downcast_ref::().unwrap(); + assert_eq!(v2.values(), &[3]); + } + + #[test] + fn handles_sliced_input_large_list_array() { + let full = large_list_array(&[ + Some(vec![Some(99)]), + Some(vec![Some(98)]), + Some(vec![Some(7), Some(8)]), + None, + ]); + let sliced = full.slice(2, 2); + + let mut builder = ListGroupValueBuilder::::new(child_field(), make_child()); + for i in 0..sliced.len() { + builder.append_val(&sliced, i).unwrap(); + } + + let probe = + large_list_array(&[Some(vec![Some(7), Some(8)]), None, Some(vec![Some(99)])]); + assert!(builder.equal_to(0, &probe, 0)); + assert!(builder.equal_to(1, &probe, 1)); + assert!(!builder.equal_to(0, &probe, 2), "sliced[0]=[7,8] != [99]"); + } + + #[test] + fn take_n_zero_and_full() { + let input = list_array(&[Some(vec![Some(1), Some(2)]), Some(vec![Some(3)])]); + let mut builder = ListGroupValueBuilder::::new(child_field(), make_child()); + for i in 0..input.len() { + builder.append_val(&input, i).unwrap(); + } + + let none = builder.take_n(0); + assert_eq!(none.len(), 0); + assert_eq!(builder.len(), 2, "remainder unchanged after take_n(0)"); + + let all = builder.take_n(2); + let all = all.as_any().downcast_ref::().unwrap(); + assert_eq!(all.len(), 2); + assert_eq!(builder.len(), 0, "builder drained after take_n(len)"); + } + + #[test] + fn take_n_with_nulls_and_empty_rows() { + let input = list_array(&[ + Some(vec![Some(1), Some(2)]), + None, + Some(vec![]), + Some(vec![Some(3), Some(4), Some(5)]), + ]); + let mut builder = ListGroupValueBuilder::::new(child_field(), make_child()); + for i in 0..input.len() { + builder.append_val(&input, 
i).unwrap(); + } + + // Take the first three (one normal + null + empty). + let three = builder.take_n(3); + let three = three.as_any().downcast_ref::().unwrap(); + assert_eq!(three.len(), 3); + assert!(!three.is_null(0)); + assert!(three.is_null(1), "null carried over"); + assert!(!three.is_null(2)); + assert_eq!(three.value_length(2), 0, "empty list carried over"); + + // Remaining = row 3 only. + let rest = Box::new(builder).build(); + let rest = rest.as_any().downcast_ref::().unwrap(); + assert_eq!(rest.len(), 1); + let v = rest.value(0); + let v = v.as_any().downcast_ref::().unwrap(); + assert_eq!(v.values(), &[3, 4, 5]); + } + + #[test] + fn vectorized_methods_match_per_row() { + let input = list_array(&[ + Some(vec![Some(1), Some(2)]), + None, + Some(vec![Some(3)]), + Some(vec![Some(1), Some(2)]), + ]); + + let mut per_row = ListGroupValueBuilder::::new(child_field(), make_child()); + for i in 0..input.len() { + per_row.append_val(&input, i).unwrap(); + } + + let mut vec_b = ListGroupValueBuilder::::new(child_field(), make_child()); + vec_b.vectorized_append(&input, &[0, 1, 2, 3]).unwrap(); + assert_eq!(vec_b.len(), per_row.len()); + + let lhs = vec![0usize, 1, 2, 3]; + let rhs = vec![0usize, 1, 2, 0]; + let mut bb = BooleanBufferBuilder::new(4); + bb.append_n(4, true); + vec_b.vectorized_equal_to(&lhs, &input, &rhs, &mut bb); + for idx in 0..4 { + assert_eq!( + bb.get_bit(idx), + per_row.equal_to(lhs[idx], &input, rhs[idx]), + "row {idx}" + ); + } + + // Pre-set false bit must not flip back to true. 
+ let mut bb = BooleanBufferBuilder::new(4); + bb.append_n(4, true); + bb.set_bit(0, false); + vec_b.vectorized_equal_to(&lhs, &input, &rhs, &mut bb); + assert!(!bb.get_bit(0), "pre-set false stays false"); + } + + #[test] + fn size_grows_with_appends() { + let input = list_array(&[Some(vec![Some(1), Some(2), Some(3)])]); + let mut builder = ListGroupValueBuilder::::new(child_field(), make_child()); + let s0 = builder.size(); + for _ in 0..16 { + builder.append_val(&input, 0).unwrap(); + } + let s1 = builder.size(); + assert!(s1 > s0, "size should grow ({s0} -> {s1})"); + } + + #[test] + fn build_empty_builder_returns_empty_list() { + let builder = ListGroupValueBuilder::::new(child_field(), make_child()); + let out = Box::new(builder).build(); + let out = out.as_any().downcast_ref::().unwrap(); + assert_eq!(out.len(), 0); + } + + #[test] + fn nested_list_of_list_int32() { + // List>: outer list whose elements are themselves lists of + // ints. Exercises a recursive child GroupColumn built via the + // dispatcher. 
+ use crate::aggregates::group_values::new_group_values; + use crate::aggregates::order::GroupOrdering; + use arrow::array::{ListArray, builder::Int32Builder, builder::ListBuilder}; + use arrow::datatypes::Schema; + use datafusion_expr::EmitTo; + + let inner_field = Arc::new(Field::new("item", DataType::Int32, true)); + let outer_field = Arc::new(Field::new( + "item", + DataType::List(Arc::clone(&inner_field)), + true, + )); + let schema = Arc::new(Schema::new(vec![Field::new( + "matrix", + DataType::List(Arc::clone(&outer_field)), + true, + )])); + + // MatrixRow models one outer List<List<Int32>> row: + // None -> null outer list + // Some(vec![...]) -> list of sublists, each sublist itself + // Option<Vec<Option<i32>>> + type MatrixRow = Option>>>>; + let mk = |rows: &[MatrixRow]| -> ArrayRef { + let inner = ListBuilder::new(Int32Builder::new()) + .with_field(Arc::clone(&inner_field)); + let mut outer = ListBuilder::new(inner).with_field(Arc::clone(&outer_field)); + for row in rows { + match row { + None => outer.append(false), + Some(sublists) => { + for sub in sublists { + match sub { + None => outer.values().append(false), + Some(items) => { + for v in items { + outer.values().values().append_option(*v); + } + outer.values().append(true); + } + } + } + outer.append(true); + } + } + } + Arc::new(outer.finish()) + }; + + let mut gv = new_group_values(schema, &GroupOrdering::None).unwrap(); + + // Four groups: [[1,2],[3]] (twice), a distinct value, a null, and [[1,2], null].
+ let batch = mk(&[ + Some(vec![Some(vec![Some(1), Some(2)]), Some(vec![Some(3)])]), + Some(vec![Some(vec![Some(1), Some(2)]), Some(vec![Some(3)])]), + Some(vec![Some(vec![Some(1), Some(2)]), Some(vec![Some(4)])]), + None, + Some(vec![Some(vec![Some(1), Some(2)]), None]), + ]); + let mut groups = Vec::new(); + gv.intern(&[batch], &mut groups).unwrap(); + assert_eq!( + groups, + vec![0, 0, 1, 2, 3], + "List> dedup across rows" + ); + + let out = gv.emit(EmitTo::All).unwrap(); + let ll = out[0].as_any().downcast_ref::().unwrap(); + assert_eq!(ll.len(), 4); + assert!(ll.is_null(2), "row 2 was the null outer group"); + + // Inspect row 0: should be [[1,2],[3]]. + let row0 = ll.value(0); + let row0 = row0.as_any().downcast_ref::().unwrap(); + assert_eq!(row0.len(), 2); + let r0_0 = row0.value(0); + let r0_0 = r0_0.as_any().downcast_ref::().unwrap(); + assert_eq!(r0_0.values(), &[1, 2]); + let r0_1 = row0.value(1); + let r0_1 = r0_1.as_any().downcast_ref::().unwrap(); + assert_eq!(r0_1.values(), &[3]); + + // Row 3 had an inner-null sublist: [[1,2], null]. 
+ let row3 = ll.value(3); + let row3 = row3.as_any().downcast_ref::().unwrap(); + assert_eq!(row3.len(), 2); + assert!(!row3.is_null(0)); + assert!(row3.is_null(1), "inner null sublist preserved"); + } +} diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index ee2d300d9bff8..6e143d8a8c158 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -20,9 +20,13 @@ mod boolean; mod bytes; pub mod bytes_view; +mod fixed_size_list; +mod list; pub mod primitive; +mod struct_; use std::mem::{self, size_of}; +use std::sync::Arc; use crate::aggregates::group_values::GroupValues; use crate::aggregates::group_values::multi_group_by::{ @@ -32,7 +36,7 @@ use crate::aggregates::group_values::multi_group_by::{ use arrow::array::{Array, ArrayRef, BooleanBufferBuilder}; use arrow::compute::cast; use arrow::datatypes::{ - BinaryViewType, DataType, Date32Type, Date64Type, Decimal128Type, Float32Type, + BinaryViewType, DataType, Date32Type, Date64Type, Decimal128Type, Field, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, Schema, SchemaRef, StringViewType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, @@ -898,168 +902,171 @@ macro_rules! instantiate_primitive { }; } +/// Recursively build a [`GroupColumn`] for a single schema field. +/// +/// Handles primitive, byte, byte-view, boolean, and the nested types +/// `FixedSizeList<T>` (integer, float, or date `T` only), `List<T>`, `LargeList<T>`, +/// and `Struct<...>` (where every child is itself a supported type). Returns the +/// error produced by `not_impl_err!` for any unsupported case.
+fn make_group_column(field: &Field) -> Result> { + let nullable = field.is_nullable(); + let data_type = field.data_type(); + let mut v: Vec> = Vec::with_capacity(1); + match data_type { + &DataType::Int8 => instantiate_primitive!(v, nullable, Int8Type, data_type), + &DataType::Int16 => instantiate_primitive!(v, nullable, Int16Type, data_type), + &DataType::Int32 => instantiate_primitive!(v, nullable, Int32Type, data_type), + &DataType::Int64 => instantiate_primitive!(v, nullable, Int64Type, data_type), + &DataType::UInt8 => instantiate_primitive!(v, nullable, UInt8Type, data_type), + &DataType::UInt16 => instantiate_primitive!(v, nullable, UInt16Type, data_type), + &DataType::UInt32 => instantiate_primitive!(v, nullable, UInt32Type, data_type), + &DataType::UInt64 => instantiate_primitive!(v, nullable, UInt64Type, data_type), + &DataType::Float32 => { + instantiate_primitive!(v, nullable, Float32Type, data_type) + } + &DataType::Float64 => { + instantiate_primitive!(v, nullable, Float64Type, data_type) + } + &DataType::Date32 => instantiate_primitive!(v, nullable, Date32Type, data_type), + &DataType::Date64 => instantiate_primitive!(v, nullable, Date64Type, data_type), + &DataType::Time32(t) => match t { + TimeUnit::Second => { + instantiate_primitive!(v, nullable, Time32SecondType, data_type) + } + TimeUnit::Millisecond => { + instantiate_primitive!(v, nullable, Time32MillisecondType, data_type) + } + _ => return not_impl_err!("{data_type} not supported in GroupValuesColumn"), + }, + &DataType::Time64(t) => match t { + TimeUnit::Microsecond => { + instantiate_primitive!(v, nullable, Time64MicrosecondType, data_type) + } + TimeUnit::Nanosecond => { + instantiate_primitive!(v, nullable, Time64NanosecondType, data_type) + } + _ => return not_impl_err!("{data_type} not supported in GroupValuesColumn"), + }, + &DataType::Timestamp(t, _) => match t { + TimeUnit::Second => { + instantiate_primitive!(v, nullable, TimestampSecondType, data_type) + } + 
TimeUnit::Millisecond => { + instantiate_primitive!(v, nullable, TimestampMillisecondType, data_type) + } + TimeUnit::Microsecond => { + instantiate_primitive!(v, nullable, TimestampMicrosecondType, data_type) + } + TimeUnit::Nanosecond => { + instantiate_primitive!(v, nullable, TimestampNanosecondType, data_type) + } + }, + &DataType::Decimal128(_, _) => { + instantiate_primitive!(v, nullable, Decimal128Type, data_type) + } + &DataType::Utf8 => { + v.push(Box::new(ByteGroupValueBuilder::::new( + OutputType::Utf8, + ))); + } + &DataType::LargeUtf8 => { + v.push(Box::new(ByteGroupValueBuilder::::new( + OutputType::Utf8, + ))); + } + &DataType::Binary => { + v.push(Box::new(ByteGroupValueBuilder::::new( + OutputType::Binary, + ))); + } + &DataType::LargeBinary => { + v.push(Box::new(ByteGroupValueBuilder::::new( + OutputType::Binary, + ))); + } + &DataType::Utf8View => { + v.push(Box::new(ByteViewGroupValueBuilder::::new())); + } + &DataType::BinaryView => { + v.push(Box::new(ByteViewGroupValueBuilder::::new())); + } + &DataType::Boolean => { + if nullable { + v.push(Box::new(BooleanGroupValueBuilder::::new())); + } else { + v.push(Box::new(BooleanGroupValueBuilder::::new())); + } + } + DataType::FixedSizeList(child_field, _) => { + macro_rules! 
instantiate_fsl { + ($t:ty) => {{ + let b = fixed_size_list::FixedSizeListGroupValueBuilder::<$t>::new( + data_type, + ); + v.push(Box::new(b) as _); + }}; + } + match child_field.data_type() { + DataType::Int8 => instantiate_fsl!(Int8Type), + DataType::Int16 => instantiate_fsl!(Int16Type), + DataType::Int32 => instantiate_fsl!(Int32Type), + DataType::Int64 => instantiate_fsl!(Int64Type), + DataType::UInt8 => instantiate_fsl!(UInt8Type), + DataType::UInt16 => instantiate_fsl!(UInt16Type), + DataType::UInt32 => instantiate_fsl!(UInt32Type), + DataType::UInt64 => instantiate_fsl!(UInt64Type), + DataType::Float32 => instantiate_fsl!(Float32Type), + DataType::Float64 => instantiate_fsl!(Float64Type), + DataType::Date32 => instantiate_fsl!(Date32Type), + DataType::Date64 => instantiate_fsl!(Date64Type), + other => { + return not_impl_err!( + "FixedSizeList<{other}> not supported in GroupValuesColumn" + ); + } + } + } + DataType::List(child_field) => { + let child = make_group_column(child_field.as_ref())?; + v.push(Box::new(list::ListGroupValueBuilder::::new( + Arc::clone(child_field), + child, + ))); + } + DataType::LargeList(child_field) => { + let child = make_group_column(child_field.as_ref())?; + v.push(Box::new(list::ListGroupValueBuilder::::new( + Arc::clone(child_field), + child, + ))); + } + DataType::Struct(fields) => { + let mut children: Vec> = + Vec::with_capacity(fields.len()); + for f in fields { + children.push(make_group_column(f.as_ref())?); + } + v.push(Box::new(struct_::StructGroupValueBuilder::new( + fields.clone(), + children, + ))); + } + _ => return not_impl_err!("{data_type} not supported in GroupValuesColumn"), + } + debug_assert_eq!( + v.len(), + 1, + "make_group_column must push exactly one builder" + ); + Ok(v.into_iter().next().unwrap()) +} + impl GroupValues for GroupValuesColumn { fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { if self.group_values.is_empty() { - let mut v = Vec::with_capacity(cols.len()); - + 
let mut v: Vec> = Vec::with_capacity(cols.len()); for f in self.schema.fields().iter() { - let nullable = f.is_nullable(); - let data_type = f.data_type(); - match data_type { - &DataType::Int8 => { - instantiate_primitive!(v, nullable, Int8Type, data_type) - } - &DataType::Int16 => { - instantiate_primitive!(v, nullable, Int16Type, data_type) - } - &DataType::Int32 => { - instantiate_primitive!(v, nullable, Int32Type, data_type) - } - &DataType::Int64 => { - instantiate_primitive!(v, nullable, Int64Type, data_type) - } - &DataType::UInt8 => { - instantiate_primitive!(v, nullable, UInt8Type, data_type) - } - &DataType::UInt16 => { - instantiate_primitive!(v, nullable, UInt16Type, data_type) - } - &DataType::UInt32 => { - instantiate_primitive!(v, nullable, UInt32Type, data_type) - } - &DataType::UInt64 => { - instantiate_primitive!(v, nullable, UInt64Type, data_type) - } - &DataType::Float32 => { - instantiate_primitive!(v, nullable, Float32Type, data_type) - } - &DataType::Float64 => { - instantiate_primitive!(v, nullable, Float64Type, data_type) - } - &DataType::Date32 => { - instantiate_primitive!(v, nullable, Date32Type, data_type) - } - &DataType::Date64 => { - instantiate_primitive!(v, nullable, Date64Type, data_type) - } - &DataType::Time32(t) => match t { - TimeUnit::Second => { - instantiate_primitive!( - v, - nullable, - Time32SecondType, - data_type - ) - } - TimeUnit::Millisecond => { - instantiate_primitive!( - v, - nullable, - Time32MillisecondType, - data_type - ) - } - _ => {} - }, - &DataType::Time64(t) => match t { - TimeUnit::Microsecond => { - instantiate_primitive!( - v, - nullable, - Time64MicrosecondType, - data_type - ) - } - TimeUnit::Nanosecond => { - instantiate_primitive!( - v, - nullable, - Time64NanosecondType, - data_type - ) - } - _ => {} - }, - &DataType::Timestamp(t, _) => match t { - TimeUnit::Second => { - instantiate_primitive!( - v, - nullable, - TimestampSecondType, - data_type - ) - } - TimeUnit::Millisecond => { - 
instantiate_primitive!( - v, - nullable, - TimestampMillisecondType, - data_type - ) - } - TimeUnit::Microsecond => { - instantiate_primitive!( - v, - nullable, - TimestampMicrosecondType, - data_type - ) - } - TimeUnit::Nanosecond => { - instantiate_primitive!( - v, - nullable, - TimestampNanosecondType, - data_type - ) - } - }, - &DataType::Decimal128(_, _) => { - instantiate_primitive! { - v, - nullable, - Decimal128Type, - data_type - } - } - &DataType::Utf8 => { - let b = ByteGroupValueBuilder::::new(OutputType::Utf8); - v.push(Box::new(b) as _) - } - &DataType::LargeUtf8 => { - let b = ByteGroupValueBuilder::::new(OutputType::Utf8); - v.push(Box::new(b) as _) - } - &DataType::Binary => { - let b = ByteGroupValueBuilder::::new(OutputType::Binary); - v.push(Box::new(b) as _) - } - &DataType::LargeBinary => { - let b = ByteGroupValueBuilder::::new(OutputType::Binary); - v.push(Box::new(b) as _) - } - &DataType::Utf8View => { - let b = ByteViewGroupValueBuilder::::new(); - v.push(Box::new(b) as _) - } - &DataType::BinaryView => { - let b = ByteViewGroupValueBuilder::::new(); - v.push(Box::new(b) as _) - } - &DataType::Boolean => { - if nullable { - let b = BooleanGroupValueBuilder::::new(); - v.push(Box::new(b) as _) - } else { - let b = BooleanGroupValueBuilder::::new(); - v.push(Box::new(b) as _) - } - } - dt => { - return not_impl_err!("{dt} not supported in GroupValuesColumn"); - } - } + v.push(make_group_column(f.as_ref())?); } self.group_values = v; } @@ -1221,31 +1228,55 @@ pub fn supported_schema(schema: &Schema) -> bool { /// In order to be supported, there must be a specialized implementation of /// [`GroupColumn`] for the data type, instantiated in [`GroupValuesColumn::intern`] fn supported_type(data_type: &DataType) -> bool { - matches!( - *data_type, - DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::Float32 - | DataType::Float64 
- | DataType::Decimal128(_, _) - | DataType::Utf8 - | DataType::LargeUtf8 - | DataType::Binary - | DataType::LargeBinary - | DataType::Date32 - | DataType::Date64 - | DataType::Time32(_) - | DataType::Timestamp(_, _) - | DataType::Utf8View - | DataType::BinaryView - | DataType::Boolean - ) + match data_type { + DataType::FixedSizeList(child_field, _) => matches!( + child_field.data_type(), + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Float32 + | DataType::Float64 + | DataType::Date32 + | DataType::Date64 + ), + DataType::List(child_field) | DataType::LargeList(child_field) => { + supported_type(child_field.data_type()) + } + DataType::Struct(fields) => fields.iter().all(|f| supported_type(f.data_type())), + _ => matches!( + *data_type, + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Float32 + | DataType::Float64 + | DataType::Decimal128(_, _) + | DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Binary + | DataType::LargeBinary + | DataType::Date32 + | DataType::Date64 + | DataType::Time32(TimeUnit::Second) + | DataType::Time32(TimeUnit::Millisecond) + | DataType::Time64(TimeUnit::Microsecond) + | DataType::Time64(TimeUnit::Nanosecond) + | DataType::Timestamp(_, _) + | DataType::Utf8View + | DataType::BinaryView + | DataType::Boolean + ), + } } ///Shows how many `null`s there are in an array @@ -1272,7 +1303,507 @@ mod tests { GroupValues, multi_group_by::GroupValuesColumn, }; - use super::GroupIndexView; + use super::{GroupIndexView, supported_schema, supported_type}; + + #[test] + fn supported_type_accepts_supported_primitives_and_strings() { + assert!(supported_type(&DataType::Int32)); + assert!(supported_type(&DataType::Int64)); + assert!(supported_type(&DataType::Float64)); + 
assert!(supported_type(&DataType::Boolean)); + assert!(supported_type(&DataType::Decimal128(38, 10))); + assert!(supported_type(&DataType::Utf8)); + assert!(supported_type(&DataType::LargeUtf8)); + assert!(supported_type(&DataType::Utf8View)); + } + + #[test] + fn supported_type_rejects_unsupported_primitives() { + // Float16 and Decimal256 are not in the supported set. + assert!(!supported_type(&DataType::Float16)); + assert!(!supported_type(&DataType::Decimal256(76, 10))); + // Time64(Second) and Time64(Millisecond) are not valid Arrow types + // (Time64 is defined only for Microsecond/Nanosecond), but the + // TimeUnit enum allows constructing them. supported_type rejects + // them so the dispatcher and the allow-list stay in lockstep. + assert!(!supported_type(&DataType::Time64( + arrow::datatypes::TimeUnit::Second + ))); + assert!(!supported_type(&DataType::Time64( + arrow::datatypes::TimeUnit::Millisecond + ))); + // Symmetric for Time32: only Second / Millisecond are valid; the + // higher-precision variants are rejected. + assert!(!supported_type(&DataType::Time32( + arrow::datatypes::TimeUnit::Microsecond + ))); + assert!(!supported_type(&DataType::Time32( + arrow::datatypes::TimeUnit::Nanosecond + ))); + } + + #[test] + fn supported_type_accepts_valid_time_variants() { + // Pin the corollary: only the dispatcher-supported Time variants + // pass supported_type, so make_group_column never gets a request + // for a type it would reject. + assert!(supported_type(&DataType::Time32( + arrow::datatypes::TimeUnit::Second + ))); + assert!(supported_type(&DataType::Time32( + arrow::datatypes::TimeUnit::Millisecond + ))); + assert!(supported_type(&DataType::Time64( + arrow::datatypes::TimeUnit::Microsecond + ))); + assert!(supported_type(&DataType::Time64( + arrow::datatypes::TimeUnit::Nanosecond + ))); + } + + #[test] + fn supported_type_handles_nested_types_recursively() { + // List -> supported (child is supported primitive). 
+ let int_field = Arc::new(Field::new("item", DataType::Int32, true)); + assert!(supported_type(&DataType::List(Arc::clone(&int_field)))); + assert!(supported_type(&DataType::LargeList(Arc::clone(&int_field)))); + + // List<Float16> -> NOT supported (child rejected). + let f16_field = Arc::new(Field::new("item", DataType::Float16, true)); + assert!(!supported_type(&DataType::List(Arc::clone(&f16_field)))); + assert!(!supported_type(&DataType::LargeList(Arc::clone( + &f16_field + )))); + + // Struct<id: Utf8, n: Int32> -> supported. + let struct_supported = arrow::datatypes::Fields::from(vec![ + Field::new("id", DataType::Utf8, true), + Field::new("n", DataType::Int32, true), + ]); + assert!(supported_type(&DataType::Struct(struct_supported))); + + // Struct<a: Int32, b: Float16> -> NOT supported (one child rejected). + let struct_unsupported = arrow::datatypes::Fields::from(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Float16, true), + ]); + assert!(!supported_type(&DataType::Struct(struct_unsupported))); + + // LargeList<Struct<id, desc>> -> supported (the atlas footnotes shape). + let inner_fields = arrow::datatypes::Fields::from(vec![ + Field::new("id", DataType::Utf8, true), + Field::new("desc", DataType::LargeUtf8, true), + ]); + let element = + Arc::new(Field::new("element", DataType::Struct(inner_fields), true)); + assert!(supported_type(&DataType::LargeList(element))); + + // List<List<Int32>> -> supported (nested recursion). + let inner_list_field = Arc::new(Field::new( + "item", + DataType::List(Arc::clone(&int_field)), + true, + )); + assert!(supported_type(&DataType::List(inner_list_field))); + } + + #[test] + fn supported_type_for_fixed_size_list_restricts_to_primitive_children() { + // Currently FixedSizeList only supports primitive (numeric / Date) + // children. Anything else falls back to GroupValuesRows.
let int_field = Arc::new(Field::new("item", DataType::Int32, true));
        assert!(supported_type(&DataType::FixedSizeList(
            Arc::clone(&int_field),
            4
        )));

        let utf8_field = Arc::new(Field::new("item", DataType::Utf8, true));
        assert!(
            !supported_type(&DataType::FixedSizeList(utf8_field, 4)),
            "FixedSizeList is not yet supported"
        );

        let struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(arrow::datatypes::Fields::from(vec![Field::new(
                "n",
                DataType::Int32,
                true,
            )])),
            true,
        ));
        assert!(
            !supported_type(&DataType::FixedSizeList(struct_field, 4)),
            "FixedSizeList not yet supported (POC primitive-only)"
        );
    }

    #[test]
    fn supported_schema_rejects_mix_of_supported_and_unsupported() {
        // A multi-column schema where most are supported but one column is
        // Float16 -> the whole schema must be rejected.
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float16, true), // unsupported
        ]);
        assert!(!supported_schema(&schema));

        // Same schema without Float16 -> accepted.
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Boolean, true),
        ]);
        assert!(supported_schema(&schema));
    }

    #[test]
    fn supported_type_and_make_group_column_stay_in_sync() {
        // CRITICAL invariant: if `supported_type(t)` returns true the
        // dispatcher must accept that type at intern time, and conversely
        // if `supported_type(t)` returns false the planner must NOT route
        // it through `GroupValuesColumn`. A divergence here would let the
        // planner select GroupValuesColumn for a type whose dispatcher arm
        // is missing -> runtime panic / not_impl error in prod.
        //
        // This test enumerates a representative cross-section of nested and
        // non-nested types and asserts both directions of the biconditional.
        use super::make_group_column;

        let utf8 = || Field::new("v", DataType::Utf8, true);
        let int32 = || Field::new("v", DataType::Int32, true);
        let f16 = || Field::new("v", DataType::Float16, true);

        let supported_cases: Vec<DataType> = vec![
            DataType::Int8,
            DataType::Int64,
            DataType::UInt64,
            DataType::Float32,
            DataType::Float64,
            DataType::Decimal128(38, 10),
            DataType::Utf8,
            DataType::LargeUtf8,
            DataType::Utf8View,
            DataType::Boolean,
            DataType::Date32,
            DataType::Time32(arrow::datatypes::TimeUnit::Second),
            DataType::Time32(arrow::datatypes::TimeUnit::Millisecond),
            DataType::Time64(arrow::datatypes::TimeUnit::Microsecond),
            DataType::Time64(arrow::datatypes::TimeUnit::Nanosecond),
            DataType::Timestamp(arrow::datatypes::TimeUnit::Nanosecond, None),
            // Nested
            DataType::FixedSizeList(Arc::new(int32()), 4),
            DataType::List(Arc::new(int32())),
            DataType::LargeList(Arc::new(int32())),
            DataType::List(Arc::new(utf8())),
            DataType::List(Arc::new(Field::new(
                "v",
                DataType::List(Arc::new(int32())),
                true,
            ))),
            DataType::Struct(arrow::datatypes::Fields::from(vec![int32(), utf8()])),
            DataType::LargeList(Arc::new(Field::new(
                "element",
                DataType::Struct(arrow::datatypes::Fields::from(vec![utf8(), int32()])),
                true,
            ))),
        ];

        for dt in &supported_cases {
            assert!(
                supported_type(dt),
                "expected supported_type=true for {dt:?}"
            );
            // Building a top-level Field and feeding it through the factory
            // must succeed for every supported case.
            let field = Field::new("col", dt.clone(), true);
            make_group_column(&field).unwrap_or_else(|e| {
                panic!(
                    "supported_type accepted {dt:?} but make_group_column rejected: {e}"
                )
            });
        }

        let unsupported_cases: Vec<DataType> = vec![
            DataType::Float16,
            DataType::Decimal256(76, 10),
            // Invalid Time-unit combinations: Time32 is only Second/Millisecond,
            // Time64 is only Microsecond/Nanosecond. These pin that
            // supported_type and the dispatcher reject the same set.
            DataType::Time64(arrow::datatypes::TimeUnit::Second),
            DataType::Time64(arrow::datatypes::TimeUnit::Millisecond),
            DataType::Time32(arrow::datatypes::TimeUnit::Microsecond),
            DataType::Time32(arrow::datatypes::TimeUnit::Nanosecond),
            // Nested with an unsupported leaf
            DataType::List(Arc::new(f16())),
            DataType::LargeList(Arc::new(f16())),
            DataType::Struct(arrow::datatypes::Fields::from(vec![int32(), f16()])),
            DataType::FixedSizeList(Arc::new(utf8()), 4),
            DataType::FixedSizeList(
                Arc::new(Field::new(
                    "v",
                    DataType::Struct(arrow::datatypes::Fields::from(vec![int32()])),
                    true,
                )),
                4,
            ),
            // Deeply nested unsupported
            DataType::List(Arc::new(Field::new(
                "v",
                DataType::Struct(arrow::datatypes::Fields::from(vec![f16()])),
                true,
            ))),
        ];

        for dt in &unsupported_cases {
            assert!(
                !supported_type(dt),
                "expected supported_type=false for {dt:?}"
            );
            // And the dispatcher must return an error rather than silently
            // succeed (otherwise supported_type=false but dispatcher accepts
            // -> planner missed an optimization but still correct; the
            // worse direction is supported_type=true but dispatcher fails,
            // covered by the loop above).
            let field = Field::new("col", dt.clone(), true);
            assert!(
                make_group_column(&field).is_err(),
                "supported_type rejected {dt:?} but make_group_column accepted it"
            );
        }
    }

    #[test]
    fn intern_returns_not_impl_for_unsupported_top_level_type() {
        // `make_group_column` (via intern) must surface a clean NotImpl error
        // when called with an unsupported type. We construct the
        // GroupValuesColumn manually with a Float16 column so the dispatcher
        // hits the fallback arm.
        use crate::aggregates::group_values::multi_group_by::GroupValuesColumn;

        let schema =
            Arc::new(Schema::new(vec![Field::new("x", DataType::Float16, true)]));
        let mut gv = GroupValuesColumn::<false>::try_new(schema).unwrap();
        let array: ArrayRef =
            Arc::new(arrow::array::Float16Array::from(vec![half::f16::from_f32(
                0.0,
            )]));
        let mut groups = Vec::new();
        let err = gv.intern(&[array], &mut groups).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("not supported in GroupValuesColumn"),
            "expected NotImpl error from dispatcher, got: {msg}"
        );
    }

    /// Quantitative regression: `GroupValuesColumn` must report **smaller**
    /// `size()` than `GroupValuesRows` (the byte-encoded fallback) for the
    /// shapes the nested-type specializations were added to optimize.
    ///
    /// This pins the memory-savings claim of the PR. If a future change
    /// regresses the column-native storage so that it grows past the row
    /// encoding, this test fails before the regression reaches users.
    ///
    /// `cols` is interned once into each implementation; `label` only
    /// identifies the case in the assertion message and the printed ratio.
    fn assert_column_smaller_than_rows(
        schema: SchemaRef,
        cols: &[ArrayRef],
        label: &str,
    ) {
        use super::super::row::GroupValuesRows;

        let mut col_gv =
            GroupValuesColumn::<false>::try_new(Arc::clone(&schema)).unwrap();
        let mut groups = Vec::new();
        col_gv.intern(cols, &mut groups).unwrap();
        let col_size = col_gv.size();

        let mut row_gv = GroupValuesRows::try_new(schema).unwrap();
        let mut groups = Vec::new();
        row_gv.intern(cols, &mut groups).unwrap();
        let row_size = row_gv.size();

        // Column-native must be strictly smaller. Print the ratio so a
        // CI run shows the magnitude of the win.
        let ratio = row_size as f64 / col_size as f64;
        assert!(
            col_size < row_size,
            "{label}: GroupValuesColumn must use less memory than GroupValuesRows; \
             col_size={col_size}, row_size={row_size}, ratio={ratio:.2}x",
        );
        eprintln!(
            "{label}: col_size={col_size} B, row_size={row_size} B, savings={ratio:.1}x"
        );
    }

    #[test]
    fn column_path_uses_less_memory_than_rows_for_list_int32() {
        use arrow::array::{Int32Builder, ListBuilder};

        let schema = Arc::new(Schema::new(vec![Field::new(
            "tags",
            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
            true,
        )]));

        // 500 unique list values: each row is [i, i+1, i+2]. Enough rows
        // to amortize the per-builder fixed overhead so the comparison
        // reflects steady-state storage.
        let mut b = ListBuilder::new(Int32Builder::new());
        for i in 0..500i32 {
            b.values().append_value(i);
            b.values().append_value(i + 1);
            b.values().append_value(i + 2);
            b.append(true);
        }
        let cols: Vec<ArrayRef> = vec![Arc::new(b.finish())];
        assert_column_smaller_than_rows(schema, &cols, "List");
    }

    #[test]
    fn column_path_uses_less_memory_than_rows_for_large_list_of_struct() {
        // The shape that motivated the PR: LargeList<Struct<Utf8, Utf8>>
        // representing a nested attribute carried in the GROUP BY of a
        // wide multi-column key. Column-native must beat row-encoded
        // since the latter pays per-value null tags and chunked-escape
        // bytes for every variable-length value.
        use arrow::array::{
            Int32Builder, LargeListBuilder, StringBuilder, StructBuilder,
        };
        use arrow::datatypes::Fields;

        let inner_fields = Fields::from(vec![
            Field::new("id", DataType::Utf8, true),
            Field::new("desc", DataType::Utf8, true),
        ]);
        let element_field = Arc::new(Field::new(
            "element",
            DataType::Struct(inner_fields.clone()),
            true,
        ));
        let schema = Arc::new(Schema::new(vec![
            Field::new("k", DataType::Int32, false),
            Field::new(
                "notes",
                DataType::LargeList(Arc::clone(&element_field)),
                true,
            ),
        ]));

        // Build 200 unique groups, each with 2 nested struct entries
        // averaging ~30 chars of content.
        let struct_builder = StructBuilder::new(
            inner_fields.clone(),
            vec![
                Box::new(StringBuilder::new()),
                Box::new(StringBuilder::new()),
            ],
        );
        let mut list_b =
            LargeListBuilder::new(struct_builder).with_field(Arc::clone(&element_field));
        let mut k_b = Int32Builder::new();
        for i in 0..200i32 {
            k_b.append_value(i);
            let s = list_b.values();
            s.field_builder::<StringBuilder>(0)
                .unwrap()
                .append_value(format!("id-{i}-aaaaaaaaaaaaaaaa"));
            s.field_builder::<StringBuilder>(1)
                .unwrap()
                .append_value(format!("description text for entry {i}........"));
            s.append(true);
            s.field_builder::<StringBuilder>(0)
                .unwrap()
                .append_value(format!("id-{i}-bbbbbbbbbbbbbbbb"));
            s.field_builder::<StringBuilder>(1)
                .unwrap()
                .append_value(format!("second description for entry {i}....."));
            s.append(true);
            list_b.append(true);
        }
        let cols: Vec<ArrayRef> =
            vec![Arc::new(k_b.finish()), Arc::new(list_b.finish())];
        assert_column_smaller_than_rows(schema, &cols, "LargeList<Struct<Utf8, Utf8>>");
    }

    #[test]
    fn column_path_uses_less_memory_than_rows_for_wide_group_by_with_one_nested() {
        // The actual production shape: many cheap columns + one nested
        // column. Without this PR, the single nested column drags every
        // cheap column onto the row-encoded path. This test asserts that
        // the composite key still uses less memory on the column-native
        // path than on the row-encoded path.
        use arrow::array::{
            BooleanBuilder, Date32Builder, Int32Builder, LargeListBuilder,
            StringBuilder, StructBuilder,
        };
        use arrow::datatypes::Fields;

        let inner_fields = Fields::from(vec![
            Field::new("id", DataType::Utf8, true),
            Field::new("desc", DataType::Utf8, true),
        ]);
        let element_field = Arc::new(Field::new(
            "element",
            DataType::Struct(inner_fields.clone()),
            true,
        ));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
            Field::new("c", DataType::Date32, false),
            Field::new("d", DataType::Boolean, false),
            Field::new(
                "footnotes",
                DataType::LargeList(Arc::clone(&element_field)),
                true,
            ),
        ]));

        let n: i32 = 300;
        let mut a_b = Int32Builder::new();
        let mut b_b = StringBuilder::new();
        let mut c_b = Date32Builder::new();
        let mut d_b = BooleanBuilder::new();
        let inner_struct = StructBuilder::new(
            inner_fields.clone(),
            vec![
                Box::new(StringBuilder::new()),
                Box::new(StringBuilder::new()),
            ],
        );
        let mut notes_b =
            LargeListBuilder::new(inner_struct).with_field(Arc::clone(&element_field));
        for i in 0..n {
            a_b.append_value(i);
            b_b.append_value(format!("ticker-{i:04}"));
            c_b.append_value(20000 + i);
            d_b.append_value(i % 2 == 0);
            let s = notes_b.values();
            s.field_builder::<StringBuilder>(0)
                .unwrap()
                .append_value(format!("note-id-{i}"));
            s.field_builder::<StringBuilder>(1)
                .unwrap()
                .append_value(format!(
                    "description for note {i}, somewhat lengthy to mimic SEC footnotes"
                ));
            s.append(true);
            notes_b.append(true);
        }
        let cols: Vec<ArrayRef> = vec![
            Arc::new(a_b.finish()),
            Arc::new(b_b.finish()),
            Arc::new(c_b.finish()),
            Arc::new(d_b.finish()),
            Arc::new(notes_b.finish()),
        ];
        assert_column_smaller_than_rows(
            schema,
            &cols,
            "wide(Int32+Utf8+Date32+Boolean)+LargeList",
        );
    }

    #[test]
    fn
test_intern_for_vectorized_group_values() { diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/struct_.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/struct_.rs new file mode 100644 index 0000000000000..670342d03e154 --- /dev/null +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/struct_.rs @@ -0,0 +1,548 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`GroupColumn`] implementation for `Struct<...>` group keys. + +use crate::aggregates::group_values::multi_group_by::{GroupColumn, nulls_equal_to}; +use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; + +use arrow::array::{Array, ArrayRef, BooleanBufferBuilder, StructArray}; +use arrow::datatypes::Fields; +use datafusion_common::Result; +use datafusion_execution::memory_pool::proxy::VecAllocExt; +use std::sync::Arc; + +/// A [`GroupColumn`] for `Struct<...>` whose children are themselves +/// `GroupColumn`-supported. +/// +/// Each child builder stores values for a single struct field. 
Per Arrow +/// semantics, child slots exist even when the outer struct row is null, so +/// `append_val` always advances every child once per outer row regardless of +/// the outer null bit. +pub struct StructGroupValueBuilder { + fields: Fields, + children: Vec>, + outer_nulls: MaybeNullBufferBuilder, + outer_len: usize, +} + +impl StructGroupValueBuilder { + pub fn new(fields: Fields, children: Vec>) -> Self { + assert_eq!( + fields.len(), + children.len(), + "StructGroupValueBuilder: field count must match child column count" + ); + Self { + fields, + children, + outer_nulls: MaybeNullBufferBuilder::new(), + outer_len: 0, + } + } +} + +impl GroupColumn for StructGroupValueBuilder { + fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { + let lhs_null = self.outer_nulls.is_null(lhs_row); + let rhs_null = array.is_null(rhs_row); + if let Some(result) = nulls_equal_to(lhs_null, rhs_null) { + return result; + } + + let s = array + .as_any() + .downcast_ref::() + .expect("StructGroupValueBuilder called with non-Struct array"); + for (i, child) in self.children.iter().enumerate() { + let child_array = s.column(i); + if !child.equal_to(lhs_row, child_array, rhs_row) { + return false; + } + } + true + } + + fn append_val(&mut self, array: &ArrayRef, row: usize) -> Result<()> { + let s = array + .as_any() + .downcast_ref::() + .expect("StructGroupValueBuilder called with non-Struct array"); + self.outer_nulls.append(s.is_null(row)); + for (i, child) in self.children.iter_mut().enumerate() { + let child_array = s.column(i); + child.append_val(child_array, row)?; + } + self.outer_len += 1; + Ok(()) + } + + fn vectorized_equal_to( + &self, + lhs_rows: &[usize], + array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut BooleanBufferBuilder, + ) { + for (idx, (&lhs_row, &rhs_row)) in + lhs_rows.iter().zip(rhs_rows.iter()).enumerate() + { + if !equal_to_results.get_bit(idx) { + continue; + } + if !self.equal_to(lhs_row, array, rhs_row) { + 
equal_to_results.set_bit(idx, false); + } + } + } + + fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()> { + for &row in rows { + self.append_val(array, row)?; + } + Ok(()) + } + + fn len(&self) -> usize { + self.outer_len + } + + fn size(&self) -> usize { + self.outer_nulls.allocated_size() + + self.children.allocated_size() + + self.children.iter().map(|c| c.size()).sum::() + } + + fn build(self: Box) -> ArrayRef { + let Self { + fields, + children, + mut outer_nulls, + outer_len: _, + } = *self; + let outer_nulls = + std::mem::replace(&mut outer_nulls, MaybeNullBufferBuilder::new()).build(); + let child_arrays: Vec = + children.into_iter().map(|c| c.build()).collect(); + Arc::new(StructArray::new(fields, child_arrays, outer_nulls)) + } + + fn take_n(&mut self, n: usize) -> ArrayRef { + let first_n_outer_nulls = self.outer_nulls.take_n(n); + let first_n_children: Vec = + self.children.iter_mut().map(|c| c.take_n(n)).collect(); + self.outer_len -= n; + Arc::new(StructArray::new( + self.fields.clone(), + first_n_children, + first_n_outer_nulls, + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::aggregates::group_values::multi_group_by::bytes::ByteGroupValueBuilder; + use crate::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder; + use arrow::array::{Int32Array, StringArray, StructArray}; + use arrow::datatypes::{DataType, Field, Int32Type}; + use datafusion_physical_expr::binary_map::OutputType; + use std::sync::Arc; + + fn struct_fields() -> Fields { + Fields::from(vec![ + Field::new("id", DataType::Utf8, true), + Field::new("n", DataType::Int32, true), + ]) + } + + fn make_builder() -> StructGroupValueBuilder { + let children: Vec> = vec![ + Box::new(ByteGroupValueBuilder::::new(OutputType::Utf8)), + Box::new(PrimitiveGroupValueBuilder::::new( + DataType::Int32, + )), + ]; + StructGroupValueBuilder::new(struct_fields(), children) + } + + fn struct_array(rows: &[Option<(Option<&str>, 
Option)>]) -> ArrayRef { + let ids: Vec> = rows + .iter() + .map(|row| match row { + None => None, + Some((id, _)) => *id, + }) + .collect(); + let ns: Vec> = rows + .iter() + .map(|row| match row { + None => None, + Some((_, n)) => *n, + }) + .collect(); + let nulls: Vec = rows.iter().map(|r| r.is_none()).collect(); + let id_arr: ArrayRef = Arc::new(StringArray::from(ids)); + let n_arr: ArrayRef = Arc::new(Int32Array::from(ns)); + // Null buffer: bit set means valid. + let null_buffer = arrow::buffer::NullBuffer::from( + nulls.iter().map(|n| !*n).collect::>(), + ); + Arc::new(StructArray::new( + struct_fields(), + vec![id_arr, n_arr], + Some(null_buffer), + )) + } + + #[test] + fn append_equal_take_for_struct() { + let input = struct_array(&[ + Some((Some("a"), Some(1))), + Some((Some("b"), Some(2))), + Some((Some("a"), Some(1))), + None, + Some((Some("a"), Some(1))), + ]); + let mut builder = make_builder(); + for i in 0..input.len() { + builder.append_val(&input, i).unwrap(); + } + assert_eq!(builder.len(), 5); + + // equal_to: row 0 == row 2 == row 4, row 1 distinct, row 3 null + let probe = struct_array(&[ + Some((Some("a"), Some(1))), + Some((Some("b"), Some(2))), + None, + Some((Some("a"), Some(2))), // same id, different n + ]); + assert!(builder.equal_to(0, &probe, 0)); + assert!(builder.equal_to(1, &probe, 1)); + assert!(builder.equal_to(3, &probe, 2), "null == null"); + assert!(!builder.equal_to(0, &probe, 3), "(a,1) != (a,2)"); + assert!(!builder.equal_to(0, &probe, 2), "(a,1) != null"); + + // build + let out = Box::new(builder).build(); + let out_struct = out.as_any().downcast_ref::().unwrap(); + assert_eq!(out_struct.len(), 5); + assert!(out_struct.is_null(3)); + let ids = out_struct + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ids.value(0), "a"); + assert_eq!(ids.value(2), "a"); + let ns = out_struct + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ns.value(0), 1); + assert_eq!(ns.value(2), 1); + } + + 
#[test] + fn struct_dispatcher_round_trip() { + use crate::aggregates::group_values::new_group_values; + use crate::aggregates::order::GroupOrdering; + use arrow::datatypes::Schema; + use datafusion_expr::EmitTo; + + let field = Field::new("row_payload", DataType::Struct(struct_fields()), true); + let schema = Arc::new(Schema::new(vec![field])); + let mut gv = new_group_values(schema, &GroupOrdering::None).unwrap(); + + let batch: ArrayRef = struct_array(&[ + Some((Some("a"), Some(1))), + Some((Some("b"), Some(2))), + Some((Some("a"), Some(1))), + None, + None, + ]); + let mut groups = Vec::new(); + gv.intern(&[batch], &mut groups).unwrap(); + assert_eq!( + groups, + vec![0, 1, 0, 2, 2], + "null struct dedups against null" + ); + + let out = gv.emit(EmitTo::All).unwrap(); + let out_struct = out[0].as_any().downcast_ref::().unwrap(); + assert_eq!(out_struct.len(), 3); + assert!(!out_struct.is_null(0)); + assert!(!out_struct.is_null(1)); + assert!(out_struct.is_null(2)); + } + + #[test] + fn handles_sliced_input_struct_array() { + // Build a 5-row StructArray, slice off the first 2 rows, and verify + // that append_val + equal_to read the correct logical positions. 
+ let full = struct_array(&[ + Some((Some("X"), Some(99))), // sliced off + Some((Some("Y"), Some(98))), // sliced off + Some((Some("a"), Some(1))), + None, + Some((Some("b"), Some(2))), + ]); + let sliced = full.slice(2, 3); + + let mut builder = make_builder(); + for i in 0..sliced.len() { + builder.append_val(&sliced, i).unwrap(); + } + + let probe = struct_array(&[ + Some((Some("a"), Some(1))), + None, + Some((Some("b"), Some(2))), + Some((Some("X"), Some(99))), + ]); + assert!(builder.equal_to(0, &probe, 0), "sliced[0] == (a,1)"); + assert!(builder.equal_to(1, &probe, 1), "sliced[1] == null"); + assert!(builder.equal_to(2, &probe, 2), "sliced[2] == (b,2)"); + assert!( + !builder.equal_to(0, &probe, 3), + "sliced[0] (a,1) != (X,99) (the sliced-off prefix value)" + ); + + // Equal_to against sliced rhs too. + for i in 0..sliced.len() { + assert!(builder.equal_to(i, &sliced, i)); + } + + let out = Box::new(builder).build(); + let out_struct = out.as_any().downcast_ref::().unwrap(); + assert_eq!(out_struct.len(), 3); + assert!(out_struct.is_null(1)); + let ids = out_struct + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let ns = out_struct + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ids.value(0), "a"); + assert_eq!(ns.value(0), 1); + assert_eq!(ids.value(2), "b"); + assert_eq!(ns.value(2), 2); + } + + #[test] + fn take_n_zero_full_and_with_nulls() { + let input = + struct_array(&[Some((Some("a"), Some(1))), None, Some((Some("b"), Some(2)))]); + let mut builder = make_builder(); + for i in 0..input.len() { + builder.append_val(&input, i).unwrap(); + } + + // take_n(0): empty prefix, remainder unchanged. + let none = builder.take_n(0); + assert_eq!(none.len(), 0); + assert_eq!(builder.len(), 3); + + // take_n(2): prefix contains the null row. 
+ let two = builder.take_n(2); + let two = two.as_any().downcast_ref::().unwrap(); + assert_eq!(two.len(), 2); + assert!(!two.is_null(0)); + assert!(two.is_null(1), "null row carried into prefix"); + assert_eq!(builder.len(), 1); + + // Remaining = row 2 (b, 2). + let rest = Box::new(builder).build(); + let rest = rest.as_any().downcast_ref::().unwrap(); + assert_eq!(rest.len(), 1); + let ids = rest + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ids.value(0), "b"); + } + + #[test] + fn take_n_full_drains_builder() { + let input = struct_array(&[Some((Some("a"), Some(1))), None]); + let mut builder = make_builder(); + for i in 0..input.len() { + builder.append_val(&input, i).unwrap(); + } + let all = builder.take_n(2); + assert_eq!(all.len(), 2); + assert_eq!(builder.len(), 0); + } + + #[test] + fn vectorized_methods_match_per_row() { + let input = struct_array(&[ + Some((Some("a"), Some(1))), + None, + Some((Some("b"), Some(2))), + Some((Some("a"), Some(1))), + ]); + + let mut per_row = make_builder(); + for i in 0..input.len() { + per_row.append_val(&input, i).unwrap(); + } + + let mut vec_b = make_builder(); + vec_b.vectorized_append(&input, &[0, 1, 2, 3]).unwrap(); + assert_eq!(vec_b.len(), per_row.len()); + + let lhs = vec![0usize, 1, 2, 3]; + let rhs = vec![0usize, 1, 2, 0]; + let mut bb = BooleanBufferBuilder::new(4); + bb.append_n(4, true); + vec_b.vectorized_equal_to(&lhs, &input, &rhs, &mut bb); + for idx in 0..4 { + assert_eq!( + bb.get_bit(idx), + per_row.equal_to(lhs[idx], &input, rhs[idx]), + "row {idx}" + ); + } + } + + #[test] + fn size_grows_with_appends() { + let input = struct_array(&[Some((Some("hello world"), Some(7)))]); + let mut builder = make_builder(); + let s0 = builder.size(); + for _ in 0..16 { + builder.append_val(&input, 0).unwrap(); + } + let s1 = builder.size(); + assert!(s1 > s0, "size should grow ({s0} -> {s1})"); + } + + #[test] + fn build_empty_builder_returns_empty_struct() { + let builder = 
make_builder(); + let out = Box::new(builder).build(); + let out = out.as_any().downcast_ref::().unwrap(); + assert_eq!(out.len(), 0); + assert_eq!(out.num_columns(), 2); + } + + #[test] + fn nested_struct_of_struct() { + // Build Struct, Int32> using the recursive factory. + use crate::aggregates::group_values::multi_group_by::supported_schema; + use crate::aggregates::group_values::new_group_values; + use crate::aggregates::order::GroupOrdering; + use arrow::array::StructArray; + use arrow::datatypes::Schema; + use datafusion_expr::EmitTo; + + let inner_fields = Fields::from(vec![ + Field::new("id", DataType::Utf8, true), + Field::new("n", DataType::Int32, true), + ]); + let outer_fields = Fields::from(vec![ + Field::new("inner", DataType::Struct(inner_fields.clone()), true), + Field::new("tag", DataType::Int32, true), + ]); + let schema = Arc::new(Schema::new(vec![Field::new( + "row", + DataType::Struct(outer_fields.clone()), + true, + )])); + assert!( + supported_schema(schema.as_ref()), + "Struct> must be supported_schema=true" + ); + + // Build a Struct, tag: Int32> array with + // four rows: two identical, one distinct, one null outer. + // + // OuterRow models one outer struct row: outer null is `None`, + // otherwise `Some((inner, tag))` where `inner = (id, n)`. 
+ type OuterRow<'a> = Option<((Option<&'a str>, Option), Option)>; + let mk_array = |rows: &[OuterRow<'_>]| -> ArrayRef { + // Inner struct children + let mut inner_ids: Vec> = Vec::with_capacity(rows.len()); + let mut inner_ns: Vec> = Vec::with_capacity(rows.len()); + let mut tags: Vec> = Vec::with_capacity(rows.len()); + let mut outer_validity: Vec = Vec::with_capacity(rows.len()); + let mut inner_validity: Vec = Vec::with_capacity(rows.len()); + for row in rows { + match row { + None => { + inner_ids.push(None); + inner_ns.push(None); + tags.push(None); + outer_validity.push(false); + inner_validity.push(false); + } + Some(((id, n), tag)) => { + inner_ids.push(*id); + inner_ns.push(*n); + tags.push(*tag); + outer_validity.push(true); + inner_validity.push(true); + } + } + } + let inner_id_arr: ArrayRef = Arc::new(StringArray::from(inner_ids)); + let inner_n_arr: ArrayRef = Arc::new(Int32Array::from(inner_ns)); + let inner_null = arrow::buffer::NullBuffer::from(inner_validity); + let inner = Arc::new(StructArray::new( + inner_fields.clone(), + vec![inner_id_arr, inner_n_arr], + Some(inner_null), + )) as ArrayRef; + let tag_arr: ArrayRef = Arc::new(Int32Array::from(tags)); + let outer_null = arrow::buffer::NullBuffer::from(outer_validity); + Arc::new(StructArray::new( + outer_fields.clone(), + vec![inner, tag_arr], + Some(outer_null), + )) + }; + + let mut gv = new_group_values(schema, &GroupOrdering::None).unwrap(); + let batch = mk_array(&[ + Some(((Some("a"), Some(1)), Some(7))), // 0 + Some(((Some("a"), Some(1)), Some(7))), // dup of 0 + Some(((Some("a"), Some(1)), Some(8))), // distinct (outer tag differs) + None, // 2 + Some(((Some("b"), Some(2)), Some(7))), // distinct (inner differs) + ]); + let mut groups = Vec::new(); + gv.intern(&[batch], &mut groups).unwrap(); + assert_eq!( + groups, + vec![0, 0, 1, 2, 3], + "nested struct dedup tracks inner+outer equality" + ); + let out = gv.emit(EmitTo::All).unwrap(); + let s = 
out[0].as_any().downcast_ref::().unwrap(); + assert_eq!(s.len(), 4); + assert!(s.is_null(2), "row 2 was the null outer group"); + } +}