diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 4cf5cc366158b..15ae7f991c0ac 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -713,27 +713,16 @@ impl Statistics { }) .collect(); - // Accumulate all statistics in a single pass. - // Uses precision_add for sum (reuses the lhs accumulator for - // direct numeric addition), while preserving the NDV update - // ordering required by estimate_ndv_with_overlap. + // Accumulate mergeable statistics in a single pass. + // `distinct_count` is recomputed afterward from the original + // unsmeared inputs passed to this call so direct multi-input + // merges stay order-stable. for stat in items.iter().skip(1) { - for (col_idx, col_stats) in column_statistics.iter_mut().enumerate() { - let item_cs = &stat.column_statistics[col_idx]; - + for (col_stats, item_cs) in column_statistics + .iter_mut() + .zip(stat.column_statistics.iter()) + { col_stats.null_count = col_stats.null_count.add(&item_cs.null_count); - - // NDV must be computed before min/max update (needs pre-merge ranges) - col_stats.distinct_count = match ( - col_stats.distinct_count.get_value(), - item_cs.distinct_count.get_value(), - ) { - (Some(&l), Some(&r)) => Precision::Inexact( - estimate_ndv_with_overlap(col_stats, item_cs, l, r) - .unwrap_or_else(|| usize::max(l, r)), - ), - _ => Precision::Absent, - }; col_stats.min_value = col_stats.min_value.min(&item_cs.min_value); col_stats.max_value = col_stats.max_value.max(&item_cs.max_value); let item_sum_value = item_cs.sum_value.cast_to_sum_type(); @@ -742,6 +731,29 @@ impl Statistics { } } + for (col_idx, col_stats) in column_statistics.iter_mut().enumerate() { + let ndv_inputs: Option<Vec<_>> = items + .iter() + .map(|stat| { + let input = &stat.column_statistics[col_idx]; + input + .distinct_count + .get_value() + .copied() + .map(|ndv| (input, ndv)) + }) + .collect(); + + col_stats.distinct_count = ndv_inputs + .map(|inputs| { + Precision::Inexact( 
estimate_ndv_with_overlap_many(&inputs) + .unwrap_or_else(|| max_input_ndv(&inputs)), + ) + }) + .unwrap_or(Precision::Absent); + } + Ok(Statistics { num_rows, total_byte_size, @@ -840,6 +852,133 @@ pub fn estimate_ndv_with_overlap( Some((intersection + only_left + only_right).round() as usize) } +/// Estimates the combined number of distinct values (NDV) for multiple inputs +/// by partitioning the overall value space into non-overlapping segments. +/// +/// For each open interval between sorted min/max boundaries, this helper: +/// +/// - finds every input range that fully covers the segment, +/// - estimates each input's contribution using uniform density +/// `segment_width * ndv / full_range_width`, +/// - keeps only the maximum contribution for that segment. +/// +/// This is a multi-way analogue of [`estimate_ndv_with_overlap`], but it avoids +/// repeatedly feeding synthesized min/max/NDV values back into later merges. +/// That makes `Statistics::try_merge_iter` stable across permutations of the +/// original inputs passed to a single call. +/// +/// Constant inputs (`min == max`) are handled separately as point values. +/// If a point is already covered by one or more ranged inputs, only the +/// uncovered remainder is added by taking `max(point_ndv - covered_ndv, 0)`. +/// +/// Returns `None` when any input lacks comparable min/max values or when +/// distance is unsupported for the underlying scalar type. 
+fn estimate_ndv_with_overlap_many( + inputs: &[(&ColumnStatistics, usize)], +) -> Option<usize> { + if inputs.is_empty() { + return Some(0); + } + + if inputs.len() == 1 { + return Some(inputs[0].1); + } + + struct RangedInput<'a> { + min: &'a ScalarValue, + max: &'a ScalarValue, + ndv: usize, + range: f64, + } + + let mut ranged_inputs = Vec::new(); + let mut point_inputs = Vec::new(); + let mut boundaries = Vec::new(); + let sort_and_dedup_scalars = |values: &mut Vec<ScalarValue>, message: &str| { + values.sort_by(|left, right| left.partial_cmp(right).expect(message)); + values.dedup(); + }; + + for (stats, ndv) in inputs { + let min = stats.min_value.get_value()?; + let max = stats.max_value.get_value()?; + let range = max.distance(min)? as f64; + + if range == 0.0 { + point_inputs.push((min, *ndv)); + continue; + } + + boundaries.push(min.clone()); + boundaries.push(max.clone()); + ranged_inputs.push(RangedInput { + min, + max, + ndv: *ndv, + range, + }); + } + + sort_and_dedup_scalars( + &mut boundaries, + "statistics merge requires comparable boundary values", + ); + + let mut estimate = 0.0; + for window in boundaries.windows(2) { + let segment_start = &window[0]; + let segment_end = &window[1]; + let segment_range = segment_end.distance(segment_start)? 
as f64; + + if segment_range == 0.0 { + continue; + } + + let segment_estimate = ranged_inputs + .iter() + .filter(|input| input.min <= segment_start && input.max >= segment_end) + .map(|input| segment_range * input.ndv as f64 / input.range) + .fold(0.0, f64::max); + estimate += segment_estimate; + } + + let mut point_values: Vec<ScalarValue> = point_inputs + .iter() + .map(|(value, _)| (*value).clone()) + .collect(); + sort_and_dedup_scalars( + &mut point_values, + "statistics merge requires comparable point values", + ); + + for value in point_values { + let point_ndv = point_inputs + .iter() + .filter(|(point, _)| *point == &value) + .map(|(_, ndv)| *ndv) + .max() + .expect("point inputs were grouped from existing values"); + let covered_ndv = ranged_inputs + .iter() + .filter(|input| input.min <= &value && &value <= input.max) + .map(|input| input.ndv) + .max() + .unwrap_or(0); + + estimate += point_ndv.saturating_sub(covered_ndv) as f64; + } + + Some(estimate.round() as usize) +} + +fn max_input_ndv(inputs: &[(&ColumnStatistics, usize)]) -> usize { + inputs + .iter() + .map(|(_, ndv)| *ndv) + .max() + .expect("statistics merge requires at least one input") +} + /// Creates an estimate of the number of rows in the output using the given /// optional value and exactness flag. 
fn check_num_rows(value: Option<usize>, is_exact: bool) -> Precision<usize> { @@ -1044,6 +1183,26 @@ mod tests { use arrow::datatypes::Field; use std::sync::Arc; + fn int32_test_schema() -> Schema { + Schema::new(vec![Field::new("a", DataType::Int32, true)]) + } + + fn int32_ndv_stats( + num_rows: usize, + min: i32, + max: i32, + distinct_count: usize, + ) -> Statistics { + Statistics::default() + .with_num_rows(Precision::Exact(num_rows)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(min)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(max)))) + .with_distinct_count(Precision::Exact(distinct_count)), + ) + } + #[test] fn test_get_value() { let exact_precision = Precision::Exact(42); @@ -1646,24 +1805,10 @@ mod tests { #[test] fn test_try_merge_ndv_identical_ranges() { - let stats1 = Statistics::default() - .with_num_rows(Precision::Exact(100)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) - .with_distinct_count(Precision::Exact(50)), - ); - let stats2 = Statistics::default() - .with_num_rows(Precision::Exact(100)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) - .with_distinct_count(Precision::Exact(30)), - ); + let stats1 = int32_ndv_stats(100, 0, 100, 50); + let stats2 = int32_ndv_stats(100, 0, 100, 30); - let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + let schema = int32_test_schema(); let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap(); // Full overlap -> max(50, 30) = 50 assert_eq!( @@ -1674,24 +1819,10 @@ mod tests { #[test] fn test_try_merge_ndv_partial_overlap() { - let stats1 = Statistics::default() - .with_num_rows(Precision::Exact(100)) - 
.add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) - .with_distinct_count(Precision::Exact(80)), - ); - let stats2 = Statistics::default() - .with_num_rows(Precision::Exact(100)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(50)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(150)))) - .with_distinct_count(Precision::Exact(60)), - ); + let stats1 = int32_ndv_stats(100, 0, 100, 80); + let stats2 = int32_ndv_stats(100, 50, 150, 60); - let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + let schema = int32_test_schema(); let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap(); // overlap=[50,100], range_left=100, range_right=100, overlap_range=50 // overlap_left=80*(50/100)=40, overlap_right=60*(50/100)=30 @@ -1763,24 +1894,10 @@ mod tests { #[test] fn test_try_merge_ndv_constant_columns() { // Same constant: [5,5]+[5,5] -> max - let stats1 = Statistics::default() - .with_num_rows(Precision::Exact(10)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5)))) - .with_distinct_count(Precision::Exact(1)), - ); - let stats2 = Statistics::default() - .with_num_rows(Precision::Exact(10)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5)))) - .with_distinct_count(Precision::Exact(1)), - ); + let stats1 = int32_ndv_stats(10, 5, 5, 1); + let stats2 = int32_ndv_stats(10, 5, 5, 1); - let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + let schema = int32_test_schema(); let merged = Statistics::try_merge_iter([&stats1, &stats2], 
&schema).unwrap(); assert_eq!( merged.column_statistics[0].distinct_count, @@ -1788,22 +1905,8 @@ mod tests { ); // Different constants: [5,5]+[10,10] -> sum - let stats3 = Statistics::default() - .with_num_rows(Precision::Exact(10)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5)))) - .with_distinct_count(Precision::Exact(1)), - ); - let stats4 = Statistics::default() - .with_num_rows(Precision::Exact(10)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(10)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10)))) - .with_distinct_count(Precision::Exact(1)), - ); + let stats3 = int32_ndv_stats(10, 5, 5, 1); + let stats4 = int32_ndv_stats(10, 10, 10, 1); let merged = Statistics::try_merge_iter([&stats3, &stats4], &schema).unwrap(); assert_eq!( @@ -1812,6 +1915,32 @@ mod tests { ); } + #[test] + fn test_try_merge_ndv_order_invariant_across_permutations() { + let schema = int32_test_schema(); + let stats_a = int32_ndv_stats(100, 0, 100, 100); + let stats_b = int32_ndv_stats(10, 40, 60, 10); + let stats_c = int32_ndv_stats(100, 50, 150, 100); + + let permutations = [ + [&stats_a, &stats_b, &stats_c], + [&stats_a, &stats_c, &stats_b], + [&stats_b, &stats_a, &stats_c], + [&stats_b, &stats_c, &stats_a], + [&stats_c, &stats_a, &stats_b], + [&stats_c, &stats_b, &stats_a], + ]; + + for (idx, inputs) in permutations.into_iter().enumerate() { + let merged = Statistics::try_merge_iter(inputs, &schema).unwrap(); + assert_eq!( + merged.column_statistics[0].distinct_count, + Precision::Inexact(150), + "permutation {idx} should be order invariant", + ); + } + } + #[test] fn test_with_fetch_basic_preservation() { // Test that column statistics and byte size are preserved (as inexact) when applying fetch