diff --git a/docs/source/user-guide/latest/compatibility.md b/docs/source/user-guide/latest/compatibility.md index 17b7a94f82..8209419594 100644 --- a/docs/source/user-guide/latest/compatibility.md +++ b/docs/source/user-guide/latest/compatibility.md @@ -66,9 +66,6 @@ the [Comet Supported Expressions Guide](expressions.md) for more information on ### Array Expressions -- **ArraysOverlap**: Inconsistent behavior when arrays contain NULL values. - [#3645](https://github.com/apache/datafusion-comet/issues/3645), - [#2036](https://github.com/apache/datafusion-comet/issues/2036) - **ArrayUnion**: Sorts input arrays before performing the union, while Spark preserves the order of the first array and appends unique elements from the second. [#3644](https://github.com/apache/datafusion-comet/issues/3644) diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index daac6e1841..153f89a98c 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -253,7 +253,7 @@ Comet supports using the following aggregate functions within window contexts wi | ArrayRemove | Yes | | | ArrayRepeat | No | | | ArrayUnion | No | Behaves differently than spark. Comet sorts the input arrays before performing the union, while Spark preserves the order of the first array and appends unique elements from the second. | -| ArraysOverlap | No | | +| ArraysOverlap | Yes | | | CreateArray | Yes | | | ElementAt | Yes | Input must be an array. Map inputs are not supported. | | Flatten | Yes | | diff --git a/native/spark-expr/src/array_funcs/arrays_overlap.rs b/native/spark-expr/src/array_funcs/arrays_overlap.rs new file mode 100644 index 0000000000..662186e614 --- /dev/null +++ b/native/spark-expr/src/array_funcs/arrays_overlap.rs @@ -0,0 +1,729 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Spark-compatible `arrays_overlap` with correct null handling. +//! +//! DataFusion's `array_has_any` uses `RowConverter` for element comparison, which +//! treats NULL == NULL as true (grouping semantics). For outer-level null elements, +//! Spark's `arrays_overlap` uses three-valued logic: NULL elements are skipped but +//! cause the result to be null if no definite overlap is found. For comparing +//! non-null elements (including nested types), Spark uses structural equality via +//! `ordering.equiv` where NULL == NULL is true. +//! +//! This implementation returns: +//! - true if any non-null element appears in both arrays +//! - null if no definite overlap but either array contains null elements +//! 
- false if no overlap and neither array contains null elements
+
+use arrow::array::{
+    Array, ArrayRef, BooleanArray, FixedSizeListArray, GenericListArray, OffsetSizeTrait, Scalar,
+    StructArray,
+};
+use arrow::compute::kernels::cmp::eq;
+use arrow::datatypes::{DataType, FieldRef};
+use datafusion::common::{exec_err, utils::take_function_args, Result, ScalarValue};
+use datafusion::logical_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
+};
+use std::any::Any;
+use std::sync::Arc;
+
+/// Scalar UDF registered as `spark_arrays_overlap`.
+///
+/// Accepts exactly two arguments of any type (`TypeSignature::Any(2)`); the
+/// argument types are validated at invocation time, where only `List`/`List`
+/// and `LargeList`/`LargeList` pairs are accepted.
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkArraysOverlap {
+    signature: Signature,
+}
+
+impl Default for SparkArraysOverlap {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkArraysOverlap {
+    /// Creates the UDF with an immutable, two-argument signature.
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::new(TypeSignature::Any(2), Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkArraysOverlap {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "spark_arrays_overlap"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Boolean)
+    }
+
+    /// The result field is always a nullable Boolean: three-valued logic can
+    /// yield NULL even when both input arrays are themselves non-null.
+    fn return_field_from_args(
+        &self,
+        _args: datafusion::logical_expr::ReturnFieldArgs,
+    ) -> Result<FieldRef> {
+        Ok(Arc::new(arrow::datatypes::Field::new(
+            self.name(),
+            DataType::Boolean,
+            true,
+        )))
+    }
+
+    /// Evaluates `arrays_overlap` for a batch.
+    ///
+    /// - A NULL scalar on either side yields a NULL Boolean scalar.
+    /// - Two scalars are evaluated once and returned as a scalar.
+    /// - Any other combination is evaluated row by row over `args.number_rows`
+    ///   rows, with a scalar argument repeated for every row.
+    ///
+    /// # Errors
+    ///
+    /// Returns an execution error when the two inputs are not both `List` or
+    /// both `LargeList`.
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        let [left, right] = take_function_args(self.name(), &args.args)?;
+
+        // Return null if either input is a null scalar
+        let is_null_scalar =
+            |value: &ColumnarValue| matches!(value, ColumnarValue::Scalar(s) if s.is_null());
+        if is_null_scalar(left) || is_null_scalar(right) {
+            return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None)));
+        }
+
+        let both_scalar = matches!(
+            (left, right),
+            (ColumnarValue::Scalar(_), ColumnarValue::Scalar(_))
+        );
+
+        // `to_array` returns an Array argument unchanged, so a scalar paired
+        // with a column must be expanded to the batch row count. Expanding it
+        // to a single row would leave the two sides with different lengths.
+        let num_rows = if both_scalar { 1 } else { args.number_rows };
+        let left_arr = left.to_array(num_rows)?;
+        let right_arr = right.to_array(num_rows)?;
+        let result = arrays_overlap_dispatch(&left_arr, &right_arr)?;
+
+        if both_scalar {
+            let scalar = ScalarValue::try_from_array(&result, 0)?;
+            Ok(ColumnarValue::Scalar(scalar))
+        } else {
+            Ok(ColumnarValue::Array(result))
+        }
+    }
+}
+
+/// Selects the `arrays_overlap_list` instantiation matching the list offset
+/// width of both inputs, or fails for any other pair of types.
+fn arrays_overlap_dispatch(left: &ArrayRef, right: &ArrayRef) -> Result<ArrayRef> {
+    // The downcasts cannot fail: the data type has just been matched.
+    match (left.data_type(), right.data_type()) {
+        (DataType::List(_), DataType::List(_)) => arrays_overlap_list::<i32>(
+            left.as_any().downcast_ref().unwrap(),
+            right.as_any().downcast_ref().unwrap(),
+        ),
+        (DataType::LargeList(_), DataType::LargeList(_)) => arrays_overlap_list::<i64>(
+            left.as_any().downcast_ref().unwrap(),
+            right.as_any().downcast_ref().unwrap(),
+        ),
+        (l, r) => exec_err!("spark_arrays_overlap does not support types '{l}' and '{r}'"),
+    }
+}
+
+/// Spark-compatible arrays_overlap with SQL three-valued null logic.
+///
+/// For each row, compares elements of two list arrays and returns:
+/// - null if either array is null
+/// - false if either array is empty (checked before any null-element handling)
+/// - null if either array's values have the `Null` data type
+/// - true if any non-null element appears in both arrays
+/// - null if no definite overlap but either array contains null elements
+/// - false otherwise
+///
+/// # Errors
+///
+/// Returns an execution error when `left` and `right` have different row
+/// counts, or when the Arrow `eq` kernel rejects the element type.
+fn arrays_overlap_list<O: OffsetSizeTrait>(
+    left: &GenericListArray<O>,
+    right: &GenericListArray<O>,
+) -> Result<ArrayRef> {
+    let len = left.len();
+    // Rows are paired by index, so a shorter `right` would otherwise panic
+    // inside `right.value(i)` instead of surfacing a query error.
+    if right.len() != len {
+        return exec_err!(
+            "spark_arrays_overlap requires inputs with the same number of rows, got {} and {}",
+            len,
+            right.len()
+        );
+    }
+    let mut builder = BooleanArray::builder(len);
+
+    for i in 0..len {
+        if left.is_null(i) || right.is_null(i) {
+            builder.append_null();
+            continue;
+        }
+
+        let left_values = left.value(i);
+        let right_values = right.value(i);
+
+        // An empty side can never overlap, regardless of nulls on the other side.
+        if left_values.is_empty() || right_values.is_empty() {
+            builder.append_value(false);
+            continue;
+        }
+
+        // DataFusion's make_array(NULL) produces a List with NullArray values.
+        if left_values.data_type() == &DataType::Null || right_values.data_type() == &DataType::Null
+        {
+            builder.append_null();
+            continue;
+        }
+
+        let mut found_overlap = false;
+        let mut has_null = false;
+
+        // Put smaller array on the probe side: fewer find_in_array calls means
+        // fewer kernel dispatches and allocations in the flat vectorized path.
+        let (probe, search) = if left_values.len() <= right_values.len() {
+            (&left_values, &right_values)
+        } else {
+            (&right_values, &left_values)
+        };
+
+        // Check element type once outside the loop.
+        let use_vectorized = !needs_recursive_eq(probe.data_type());
+
+        for pi in 0..probe.len() {
+            // A null probe element cannot match anything; remember it only.
+            if probe.is_null(pi) {
+                has_null = true;
+                continue;
+            }
+            let (found, null_eq) = if use_vectorized {
+                find_in_array_flat(probe, pi, search)?
+            } else {
+                find_in_array_nested(probe, pi, search)?
+            };
+            // `null_eq` reports a null element on the search side.
+            if null_eq {
+                has_null = true;
+            }
+            if found {
+                found_overlap = true;
+                break;
+            }
+        }
+
+        // A definite match wins over nulls; nulls win over a definite "no".
+        if found_overlap {
+            builder.append_value(true);
+        } else if has_null {
+            builder.append_null();
+        } else {
+            builder.append_value(false);
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+/// Vectorized search using Arrow's `eq` kernel.
One SIMD call per probe element. +fn find_in_array_flat(probe: &ArrayRef, pi: usize, search: &ArrayRef) -> Result<(bool, bool)> { + let scalar = Scalar::new(probe.slice(pi, 1)); + let eq_result = eq(search, &scalar) + .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?; + Ok((eq_result.true_count() > 0, eq_result.null_count() > 0)) +} + +/// Element-by-element search using structural equality for nested types. +fn find_in_array_nested(probe: &ArrayRef, pi: usize, search: &ArrayRef) -> Result<(bool, bool)> { + let mut has_null = false; + for si in 0..search.len() { + if search.is_null(si) { + has_null = true; + continue; + } + if structural_eq(probe.as_ref(), pi, search.as_ref(), si)? { + return Ok((true, has_null)); + } + } + Ok((false, has_null)) +} + +fn needs_recursive_eq(dt: &DataType) -> bool { + matches!( + dt, + DataType::List(_) + | DataType::LargeList(_) + | DataType::FixedSizeList(_, _) + | DataType::Struct(_) + ) +} + +/// Structural equality for array elements (grouping semantics: NULL == NULL is true). +/// This matches Spark's `ordering.equiv` used inside `arrays_overlap`. +/// Three-valued null logic only applies to outer-level null elements (handled by the caller). +fn structural_eq(left: &dyn Array, li: usize, right: &dyn Array, ri: usize) -> Result { + // NullArray::is_null() returns false (no null buffer), so check data type first. 
+ if left.data_type() == &DataType::Null && right.data_type() == &DataType::Null { + return Ok(true); + } + + if left.is_null(li) && right.is_null(ri) { + return Ok(true); + } + if left.is_null(li) || right.is_null(ri) { + return Ok(false); + } + + match left.data_type() { + DataType::List(_) => { + let ll = left + .as_any() + .downcast_ref::>() + .unwrap(); + let rl = right + .as_any() + .downcast_ref::>() + .unwrap(); + list_structural_eq(&ll.value(li), &rl.value(ri)) + } + DataType::LargeList(_) => { + let ll = left + .as_any() + .downcast_ref::>() + .unwrap(); + let rl = right + .as_any() + .downcast_ref::>() + .unwrap(); + list_structural_eq(&ll.value(li), &rl.value(ri)) + } + DataType::FixedSizeList(_, _) => { + let ll = left.as_any().downcast_ref::().unwrap(); + let rl = right.as_any().downcast_ref::().unwrap(); + list_structural_eq(&ll.value(li), &rl.value(ri)) + } + DataType::Struct(_) => { + let ls = left.as_any().downcast_ref::().unwrap(); + let rs = right.as_any().downcast_ref::().unwrap(); + struct_structural_eq(ls, li, rs, ri) + } + _ => { + // Both non-null at this point; eq on two non-null scalars is definitive. + let l = Scalar::new(left.slice(li, 1)); + let r = Scalar::new(right.slice(ri, 1)); + let result = eq(&l, &r) + .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?; + Ok(result.value(0)) + } + } +} + +fn list_structural_eq(left: &ArrayRef, right: &ArrayRef) -> Result { + if left.len() != right.len() { + return Ok(false); + } + for k in 0..left.len() { + if !structural_eq(left.as_ref(), k, right.as_ref(), k)? { + return Ok(false); + } + } + Ok(true) +} + +fn struct_structural_eq( + left: &StructArray, + li: usize, + right: &StructArray, + ri: usize, +) -> Result { + for (lc, rc) in left.columns().iter().zip(right.columns().iter()) { + if !structural_eq(lc.as_ref(), li, rc.as_ref(), ri)? 
{ + return Ok(false); + } + } + Ok(true) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Int32Array, Int32Builder, ListArray, ListBuilder, StructBuilder}; + use arrow::buffer::{NullBuffer, OffsetBuffer}; + use arrow::datatypes::Field; + + fn make_list_array( + values: &Int32Array, + offsets: &[i32], + nulls: Option, + ) -> ListArray { + ListArray::new( + Arc::new(Field::new("item", DataType::Int32, true)), + OffsetBuffer::new(offsets.to_vec().into()), + Arc::new(values.clone()), + nulls, + ) + } + + #[test] + fn test_basic_overlap() -> Result<()> { + // [1, 2, 3] vs [3, 4, 5] => true + let left = make_list_array(&Int32Array::from(vec![1, 2, 3]), &[0, 3], None); + let right = make_list_array(&Int32Array::from(vec![3, 4, 5]), &[0, 3], None); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.value(0)); + assert!(result.is_valid(0)); + Ok(()) + } + + #[test] + fn test_no_overlap() -> Result<()> { + // [1, 2] vs [3, 4] => false + let left = make_list_array(&Int32Array::from(vec![1, 2]), &[0, 2], None); + let right = make_list_array(&Int32Array::from(vec![3, 4]), &[0, 2], None); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(!result.value(0)); + assert!(result.is_valid(0)); + Ok(()) + } + + #[test] + fn test_null_only_overlap() -> Result<()> { + // [1, NULL] vs [NULL, 2] => null (no definite overlap, but nulls present) + let left = make_list_array(&Int32Array::from(vec![Some(1), None]), &[0, 2], None); + let right = make_list_array(&Int32Array::from(vec![None, Some(2)]), &[0, 2], None); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_null(0)); + Ok(()) + } + + #[test] + fn test_null_with_overlap() -> Result<()> { + // [1, NULL] vs [1, 2] => true (definite overlap on 1) + let left = 
make_list_array(&Int32Array::from(vec![Some(1), None]), &[0, 2], None); + let right = make_list_array(&Int32Array::from(vec![1, 2]), &[0, 2], None); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.value(0)); + assert!(result.is_valid(0)); + Ok(()) + } + + #[test] + fn test_empty_array() -> Result<()> { + // [1, NULL, 3] vs [] => false + let left = make_list_array( + &Int32Array::from(vec![Some(1), None, Some(3)]), + &[0, 3], + None, + ); + let right = make_list_array(&Int32Array::from(Vec::::new()), &[0, 0], None); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(!result.value(0)); + assert!(result.is_valid(0)); + Ok(()) + } + + #[test] + fn test_null_array() -> Result<()> { + // NULL vs [1, 2] => null + let left = make_list_array( + &Int32Array::from(Vec::::new()), + &[0, 0], + Some(NullBuffer::from(vec![false])), + ); + let right = make_list_array(&Int32Array::from(vec![1, 2]), &[0, 2], None); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_null(0)); + Ok(()) + } + + #[test] + fn test_both_null_elements() -> Result<()> { + // [NULL] vs [NULL] => null + let left = make_list_array(&Int32Array::from(vec![None::]), &[0, 1], None); + let right = make_list_array(&Int32Array::from(vec![None::]), &[0, 1], None); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_null(0)); + Ok(()) + } + + #[test] + fn test_both_null_elements_via_null_array() -> Result<()> { + // Simulate what DataFusion's make_array(NULL) produces: List with NullArray values + use arrow::array::NullArray; + + let null_values = Arc::new(NullArray::new(1)) as ArrayRef; + let null_field = Arc::new(Field::new("item", DataType::Null, true)); + let left = ListArray::new( + 
Arc::clone(&null_field), + OffsetBuffer::new(vec![0, 1].into()), + Arc::clone(&null_values), + None, + ); + let right = ListArray::new( + null_field, + OffsetBuffer::new(vec![0, 1].into()), + null_values, + None, + ); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!( + result.is_null(0), + "Expected null for [NULL] vs [NULL] (NullArray representation), got {:?}", + result + ); + Ok(()) + } + + #[test] + fn test_one_null_element_no_overlap() -> Result<()> { + // [3, NULL] vs [1, 2] => null + let left = make_list_array(&Int32Array::from(vec![Some(3), None]), &[0, 2], None); + let right = make_list_array(&Int32Array::from(vec![1, 2]), &[0, 2], None); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_null(0)); + Ok(()) + } + + /// Build a single-row ListArray of nested lists: List> + fn make_nested_list(elements: Vec>>>) -> ListArray { + let inner_builder = ListBuilder::new(Int32Builder::new()); + let mut outer_builder = ListBuilder::new(inner_builder); + + for elem in &elements { + match elem { + Some(inner) => { + let inner_list_builder = outer_builder.values(); + for val in inner { + match val { + Some(v) => inner_list_builder.values().append_value(*v), + None => inner_list_builder.values().append_null(), + } + } + inner_list_builder.append(true); + } + None => { + outer_builder.values().append(false); + } + } + } + outer_builder.append(true); + outer_builder.finish() + } + + #[test] + fn test_nested_array_basic_overlap() -> Result<()> { + // [[1,2], [3,4]] vs [[3,4], [5,6]] => true + let left = make_nested_list(vec![ + Some(vec![Some(1), Some(2)]), + Some(vec![Some(3), Some(4)]), + ]); + let right = make_nested_list(vec![ + Some(vec![Some(3), Some(4)]), + Some(vec![Some(5), Some(6)]), + ]); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + 
assert!(result.is_valid(0)); + assert!(result.value(0)); + Ok(()) + } + + #[test] + fn test_nested_array_no_overlap() -> Result<()> { + // [[1,2]] vs [[3,4]] => false + let left = make_nested_list(vec![Some(vec![Some(1), Some(2)])]); + let right = make_nested_list(vec![Some(vec![Some(3), Some(4)])]); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_valid(0)); + assert!(!result.value(0)); + Ok(()) + } + + #[test] + fn test_nested_array_inner_nulls_match() -> Result<()> { + // [[1,NULL]] vs [[1,NULL]] => true (structural equality: NULL == NULL) + let left = make_nested_list(vec![Some(vec![Some(1), None])]); + let right = make_nested_list(vec![Some(vec![Some(1), None])]); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_valid(0)); + assert!(result.value(0)); + Ok(()) + } + + #[test] + fn test_nested_array_inner_nulls_no_match() -> Result<()> { + // [[1,NULL]] vs [[1,2], [3,4]] => false (structural: [1,NULL] != [1,2], [1,NULL] != [3,4]) + let left = make_nested_list(vec![Some(vec![Some(1), None])]); + let right = make_nested_list(vec![ + Some(vec![Some(1), Some(2)]), + Some(vec![Some(3), Some(4)]), + ]); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_valid(0)); + assert!(!result.value(0)); + Ok(()) + } + + #[test] + fn test_nested_array_all_null_elements_match() -> Result<()> { + // [[NULL]] vs [[NULL]] => true (structural equality: [NULL] == [NULL]) + let left = make_nested_list(vec![Some(vec![None])]); + let right = make_nested_list(vec![Some(vec![None])]); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!( + result.is_valid(0), + "Expected true for [[NULL]] vs [[NULL]], got null" + ); + assert!( + result.value(0), + "Expected true for 
[[NULL]] vs [[NULL]], got false" + ); + Ok(()) + } + + #[test] + fn test_nested_array_definite_match_despite_inner_nulls() -> Result<()> { + // [[1,2], [1,NULL]] vs [[1,2]] => true (definite match on [1,2]) + let left = make_nested_list(vec![ + Some(vec![Some(1), Some(2)]), + Some(vec![Some(1), None]), + ]); + let right = make_nested_list(vec![Some(vec![Some(1), Some(2)])]); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_valid(0)); + assert!(result.value(0)); + Ok(()) + } + + /// Build a single-row ListArray of structs: List> + fn make_struct_list(elements: Vec, Option)>>) -> ListArray { + let fields = vec![ + Arc::new(Field::new("a", DataType::Int32, true)), + Arc::new(Field::new("b", DataType::Int32, true)), + ]; + let struct_builder = StructBuilder::new( + fields.clone(), + vec![Box::new(Int32Builder::new()), Box::new(Int32Builder::new())], + ); + let mut list_builder = ListBuilder::new(struct_builder); + + for elem in &elements { + let sb = list_builder.values(); + match elem { + Some((a, b)) => { + sb.field_builder::(0) + .unwrap() + .append_option(*a); + sb.field_builder::(1) + .unwrap() + .append_option(*b); + sb.append(true); + } + None => { + sb.field_builder::(0).unwrap().append_null(); + sb.field_builder::(1).unwrap().append_null(); + sb.append(false); + } + } + } + list_builder.append(true); + list_builder.finish() + } + + #[test] + fn test_struct_basic_overlap() -> Result<()> { + // [{1,2}, {3,4}] vs [{3,4}, {5,6}] => true + let left = make_struct_list(vec![Some((Some(1), Some(2))), Some((Some(3), Some(4)))]); + let right = make_struct_list(vec![Some((Some(3), Some(4))), Some((Some(5), Some(6)))]); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_valid(0)); + assert!(result.value(0)); + Ok(()) + } + + #[test] + fn test_struct_no_overlap() -> Result<()> { + // [{1,2}] vs [{3,4}] => 
false + let left = make_struct_list(vec![Some((Some(1), Some(2)))]); + let right = make_struct_list(vec![Some((Some(3), Some(4)))]); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_valid(0)); + assert!(!result.value(0)); + Ok(()) + } + + #[test] + fn test_struct_with_null_field_match() -> Result<()> { + // [{1,NULL}] vs [{1,NULL}] => true (structural equality: NULL == NULL) + let left = make_struct_list(vec![Some((Some(1), None))]); + let right = make_struct_list(vec![Some((Some(1), None))]); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_valid(0)); + assert!(result.value(0)); + Ok(()) + } + + #[test] + fn test_struct_definite_match_with_null_field() -> Result<()> { + // [{1,2}, {1,NULL}] vs [{1,2}] => true (definite match on {1,2}) + let left = make_struct_list(vec![Some((Some(1), Some(2))), Some((Some(1), None))]); + let right = make_struct_list(vec![Some((Some(1), Some(2)))]); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_valid(0)); + assert!(result.value(0)); + Ok(()) + } + + #[test] + fn test_struct_null_element() -> Result<()> { + // [NULL] vs [{1,2}] => null (null outer element) + let left = make_struct_list(vec![None]); + let right = make_struct_list(vec![Some((Some(1), Some(2)))]); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_null(0)); + Ok(()) + } +} diff --git a/native/spark-expr/src/array_funcs/mod.rs b/native/spark-expr/src/array_funcs/mod.rs index e5fc57a7e3..99f0a1eecc 100644 --- a/native/spark-expr/src/array_funcs/mod.rs +++ b/native/spark-expr/src/array_funcs/mod.rs @@ -17,6 +17,7 @@ mod array_compact; mod array_insert; +mod arrays_overlap; mod arrays_zip; mod get_array_struct_fields; mod list_extract; @@ 
-24,6 +25,7 @@ mod size; pub use array_compact::SparkArrayCompact; pub use array_insert::ArrayInsert; +pub use arrays_overlap::SparkArraysOverlap; pub use arrays_zip::SparkArraysZipFunc; pub use get_array_struct_fields::GetArrayStructFields; pub use list_extract::ListExtract; diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index f6f6ef29c4..74e688cd1c 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -23,8 +23,8 @@ use crate::math_funcs::modulo_expr::spark_modulo; use crate::{ spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor, spark_isnan, spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, spark_unhex, - spark_unscaled_value, EvalMode, SparkArrayCompact, SparkContains, SparkDateDiff, - SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate, SparkSizeFunc, + spark_unscaled_value, EvalMode, SparkArrayCompact, SparkArraysOverlap, SparkContains, + SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate, SparkSizeFunc, }; use arrow::datatypes::DataType; use datafusion::common::{DataFusionError, Result as DataFusionResult}; @@ -201,6 +201,7 @@ pub fn create_comet_physical_fun_with_eval_mode( fn all_scalar_functions() -> Vec> { vec![ Arc::new(ScalarUDF::new_from_impl(SparkArrayCompact::default())), + Arc::new(ScalarUDF::new_from_impl(SparkArraysOverlap::default())), Arc::new(ScalarUDF::new_from_impl(SparkContains::default())), Arc::new(ScalarUDF::new_from_impl(SparkDateDiff::default())), Arc::new(ScalarUDF::new_from_impl(SparkDateFromUnixDate::default())), diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index bd8c10c15f..91f62f1e0d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -228,23 +228,15 @@ object CometArrayMin extends 
CometExpressionSerde[ArrayMin] { } object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] { - - override def getSupportLevel(expr: ArraysOverlap): SupportLevel = - Incompatible( - Some( - "Inconsistent behavior with NULL values" + - " (https://github.com/apache/datafusion-comet/issues/3645)" + - " (https://github.com/apache/datafusion-comet/issues/2036)")) - override def convert( expr: ArraysOverlap, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { - val leftArrayExprProto = exprToProto(expr.children.head, inputs, binding) - val rightArrayExprProto = exprToProto(expr.children(1), inputs, binding) + val leftArrayExprProto = exprToProto(expr.left, inputs, binding) + val rightArrayExprProto = exprToProto(expr.right, inputs, binding) val arraysOverlapScalarExpr = scalarFunctionExprToProtoWithReturnType( - "array_has_any", + "spark_arrays_overlap", BooleanType, false, leftArrayExprProto, diff --git a/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql b/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql index e9f107d568..f2a47a33ac 100644 --- a/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql +++ b/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql @@ -15,19 +15,17 @@ -- specific language governing permissions and limitations -- under the License. 
--- Config: spark.comet.expression.ArraysOverlap.allowIncompatible=true - statement CREATE TABLE test_arrays_overlap(a array, b array) USING parquet statement INSERT INTO test_arrays_overlap VALUES (array(1, 2, 3), array(3, 4, 5)), (array(1, 2), array(3, 4)), (array(), array(1)), (NULL, array(1)), (array(1, NULL), array(NULL, 2)) -query ignore(https://github.com/apache/datafusion-comet/issues/3645) +query SELECT arrays_overlap(a, b) FROM test_arrays_overlap -- column + literal -query ignore(https://github.com/apache/datafusion-comet/issues/3645) +query SELECT arrays_overlap(a, array(3, 4, 5)) FROM test_arrays_overlap -- literal + column @@ -37,3 +35,170 @@ SELECT arrays_overlap(array(1, 2, 3), b) FROM test_arrays_overlap -- literal + literal query SELECT arrays_overlap(array(1, 2, 3), array(3, 4, 5)), arrays_overlap(array(1, 2), array(3, 4)), arrays_overlap(array(), array(1)), arrays_overlap(cast(NULL as array), array(1)) + +-- NULL element semantics (three-valued logic) +-- When no match is found but NULL elements exist, result should be NULL (uncertain) +statement +CREATE TABLE test_overlap_nulls(a array, b array) USING parquet + +statement +INSERT INTO test_overlap_nulls VALUES (array(1, NULL, 3), array(4, 5)), (array(1, NULL, 3), array(1, 5)), (array(1, NULL), array(NULL, 2)), (array(NULL), array(NULL)), (array(NULL, NULL), array(NULL, NULL)), (array(1, NULL), array(2, NULL)), (array(NULL, 2), array(1, NULL)) + +-- no match + has NULL => NULL +query +SELECT arrays_overlap(a, b) FROM test_overlap_nulls WHERE a = array(1, NULL, 3) AND b = array(4, 5) + +-- has match + has NULL => true (match found, NULL irrelevant) +query +SELECT arrays_overlap(a, b) FROM test_overlap_nulls WHERE a = array(1, NULL, 3) AND b = array(1, 5) + +-- NULL vs NULL elements => NULL (NULL != NULL) +query +SELECT arrays_overlap(a, b) FROM test_overlap_nulls WHERE a = array(NULL) AND b = array(NULL) + +-- all rows +query +SELECT a, b, arrays_overlap(a, b) FROM test_overlap_nulls + +-- empty 
array combinations +query +SELECT arrays_overlap(array(), array()) FROM test_overlap_nulls + +query +SELECT arrays_overlap(array(), array(1, 2)) FROM test_overlap_nulls + +query +SELECT arrays_overlap(array(1, 2), array()) FROM test_overlap_nulls + +query +SELECT arrays_overlap(array(), array(NULL)) FROM test_overlap_nulls + +-- both-NULL arrays +query +SELECT arrays_overlap(cast(NULL as array), cast(NULL as array)) FROM test_overlap_nulls + +-- identical arrays +query +SELECT arrays_overlap(a, a) FROM test_overlap_nulls + +-- duplicate elements in arrays +statement +CREATE TABLE test_overlap_dups(a array, b array) USING parquet + +statement +INSERT INTO test_overlap_dups VALUES (array(1, 1, 1), array(2, 2, 2)), (array(1, 1, 1), array(1, 2, 2)), (array(1, 2, 1, 2), array(3, 4, 3, 4)), (array(1, 2, 1, 2), array(2, 3, 2, 3)) + +query +SELECT a, b, arrays_overlap(a, b) FROM test_overlap_dups + +-- single element arrays +query +SELECT arrays_overlap(array(1), array(1)) FROM test_overlap_dups + +query +SELECT arrays_overlap(array(1), array(2)) FROM test_overlap_dups + +-- string arrays +statement +CREATE TABLE test_overlap_str(a array, b array) USING parquet + +statement +INSERT INTO test_overlap_str VALUES (array('a', 'b', 'c'), array('c', 'd')), (array('a', 'b'), array('c', 'd')), (array('a', NULL), array('b', NULL)), (array('a', NULL), array('a', 'b')), (NULL, array('a')), (array(''), array('')), (array('', NULL), array('x')) + +query +SELECT a, b, arrays_overlap(a, b) FROM test_overlap_str + +-- empty string vs NULL +query +SELECT arrays_overlap(array('', 'a'), array('')) FROM test_overlap_str + +-- double arrays with special values +statement +CREATE TABLE test_overlap_dbl(a array, b array) USING parquet + +statement +INSERT INTO test_overlap_dbl VALUES (array(1.0, 2.0), array(2.0, 3.0)), (array(1.0, double('NaN')), array(double('NaN'), 2.0)), (array(double('Infinity'), 1.0), array(double('Infinity'))), (array(double('-Infinity')), array(double('Infinity'))), 
(array(0.0), array(-0.0)), (array(1.0, NULL), array(2.0, NULL)) + +query +SELECT a, b, arrays_overlap(a, b) FROM test_overlap_dbl + +-- boolean arrays +query +SELECT arrays_overlap(array(true, false), array(false)) FROM test_overlap_dbl + +query +SELECT arrays_overlap(array(true), array(false)) FROM test_overlap_dbl + +query +SELECT arrays_overlap(array(true, NULL), array(false)) FROM test_overlap_dbl + +-- bigint arrays +statement +CREATE TABLE test_overlap_long(a array, b array) USING parquet + +statement +INSERT INTO test_overlap_long VALUES (array(9223372036854775807, 1), array(9223372036854775807)), (array(-9223372036854775808), array(-9223372036854775808)), (array(0), array(1)) + +query +SELECT a, b, arrays_overlap(a, b) FROM test_overlap_long + +-- decimal arrays +statement +CREATE TABLE test_overlap_dec(a array, b array) USING parquet + +statement +INSERT INTO test_overlap_dec VALUES (array(1.00, 2.50), array(2.50, 3.00)), (array(1.00, 2.00), array(3.00, 4.00)), (array(1.10, NULL), array(2.20, NULL)) + +query +SELECT a, b, arrays_overlap(a, b) FROM test_overlap_dec + +-- date arrays +statement +CREATE TABLE test_overlap_date(a array, b array) USING parquet + +statement +INSERT INTO test_overlap_date VALUES (array(date '2024-01-01', date '2024-06-15'), array(date '2024-06-15', date '2024-12-31')), (array(date '2024-01-01'), array(date '2024-12-31')), (array(date '2024-01-01', NULL), array(date '2024-12-31')) + +query +SELECT a, b, arrays_overlap(a, b) FROM test_overlap_date + +-- timestamp arrays +statement +CREATE TABLE test_overlap_ts(a array, b array) USING parquet + +statement +INSERT INTO test_overlap_ts VALUES (array(timestamp '2024-01-01 00:00:00', timestamp '2024-06-15 12:00:00'), array(timestamp '2024-06-15 12:00:00')), (array(timestamp '2024-01-01 00:00:00'), array(timestamp '2024-12-31 23:59:59')) + +query +SELECT a, b, arrays_overlap(a, b) FROM test_overlap_ts + +-- nested arrays +statement +CREATE TABLE test_overlap_nested(a array>, b array>) 
USING parquet + +statement +INSERT INTO test_overlap_nested VALUES (array(array(1, 2), array(3, 4)), array(array(3, 4), array(5, 6))), (array(array(1, 2)), array(array(3, 4))), (array(array(1, 2), cast(NULL as array)), array(array(3, 4))), (array(array(1, NULL)), array(array(1, NULL))), (array(cast(NULL as array)), array(cast(NULL as array))) + +query +SELECT a, b, arrays_overlap(a, b) FROM test_overlap_nested + +-- struct element arrays +statement +CREATE TABLE test_overlap_struct(a array>, b array>) USING parquet + +statement +INSERT INTO test_overlap_struct VALUES (array(named_struct('x', 1, 'y', 2)), array(named_struct('x', 1, 'y', 2))), (array(named_struct('x', 1, 'y', 2)), array(named_struct('x', 3, 'y', 4))), (array(named_struct('x', 1, 'y', cast(NULL as int))), array(named_struct('x', 1, 'y', cast(NULL as int)))), (array(cast(NULL as struct)), array(cast(NULL as struct))) + +query +SELECT a, b, arrays_overlap(a, b) FROM test_overlap_struct + +-- mixed column and literal with NULL elements +query +SELECT arrays_overlap(a, array(99, NULL)) FROM test_arrays_overlap + +query +SELECT arrays_overlap(array(NULL, 99), b) FROM test_arrays_overlap + +-- conditional (CASE WHEN) arrays +query +SELECT arrays_overlap(CASE WHEN a IS NOT NULL THEN a ELSE array(0) END, b) FROM test_arrays_overlap diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index cb52e7e6c8..2a143c4601 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -23,7 +23,7 @@ import scala.util.Random import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase -import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayRepeat, ArraysOverlap, ArrayUnion} +import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, 
ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayRepeat, ArrayUnion} import org.apache.spark.sql.catalyst.expressions.{ArrayContains, ArrayRemove} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions._ @@ -539,22 +539,100 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } test("arrays_overlap") { - withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[ArraysOverlap]) -> "true") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - withTempView("t1") { - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, 10000) - spark.read.parquet(path.toString).createOrReplaceTempView("t1") - checkSparkAnswerAndOperator(sql( - "SELECT arrays_overlap(array(_2, _3, _4), array(_3, _4)) from t1 where _2 is not null")) - checkSparkAnswerAndOperator(sql( - "SELECT arrays_overlap(array('a', null, cast(_1 as string)), array('b', cast(_1 as string), cast(_2 as string))) from t1 where _1 is not null")) - checkSparkAnswerAndOperator(sql( - "SELECT arrays_overlap(array('a', null), array('b', null)) from t1 where _1 is not null")) - checkSparkAnswerAndOperator(spark.sql( - "SELECT arrays_overlap((CASE WHEN _2 =_3 THEN array(_6, _7) END), array(_6, _7)) FROM t1")); - } + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + withTempView("t1") { + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, 10000) + spark.read.parquet(path.toString).createOrReplaceTempView("t1") + checkSparkAnswerAndOperator(sql( + "SELECT arrays_overlap(array(_2, _3, _4), array(_3, _4)) from t1 where _2 is not null")) + checkSparkAnswerAndOperator(sql( + "SELECT arrays_overlap(array('a', null, cast(_1 as string)), array('b', cast(_1 as string), cast(_2 as string))) from t1 where _1 is not null")) + checkSparkAnswerAndOperator(sql( + "SELECT 
arrays_overlap(array('a', null), array('b', null)) from t1 where _1 is not null")) + checkSparkAnswerAndOperator(spark.sql( + "SELECT arrays_overlap((CASE WHEN _2 =_3 THEN array(_6, _7) END), array(_6, _7)) FROM t1")); + } + } + } + } + + test("arrays_overlap - null handling behavior verification") { + withSQLConf( + "spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + withTable("t") { + sql("create table t using parquet as select CAST(NULL as array) a1 from range(1)") + val data = Seq( + "array(1, 2, 3)", + "array(3, 4, 5)", + "array(1, 2)", + "array(3, 4)", + "array(1, NULL, 3)", + "array(4, 5)", + "array(1, 4)", + "array(1, NULL)", + "array(2, NULL)", + "array(NULL, 2)", + "array(1)", + "array(2)", + "array()", + "array(NULL)", + "array(NULL, NULL)", + "a1") + for (y <- data; x <- data) { + checkSparkAnswerAndOperator(sql(s"SELECT arrays_overlap($y, $x) from t")) + } + } + } + } + + test("arrays_overlap - nested array null handling behavior verification") { + withSQLConf( + "spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + withTable("t") { + sql( + "create table t using parquet as select CAST(NULL as array>) a1 from range(1)") + val data = Seq( + "array(array(1, 2), array(3, 4))", + "array(array(1, 2), array(5, 6))", + "array(array(1, 2))", + "array(array(3, 4))", + "array(array(1, NULL))", + "array(array(NULL, 2))", + "array(array(NULL))", + "array(CAST(NULL as array))", + "array(array(1, 2), CAST(NULL as array))", + "array()", + "a1") + for (y <- data; x <- data) { + checkSparkAnswerAndOperator(sql(s"SELECT arrays_overlap($y, $x) from t")) + } + } + } + } + + test("arrays_overlap - struct element null handling behavior verification") { + withSQLConf( + "spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + withTable("t") { + sql( + "create table t using parquet as select CAST(NULL as array>) a1 from range(1)") + // 
Cast all structs to the same nullable type to avoid Arrow schema mismatch + val s = "struct" + val data = Seq( + s"array(CAST(named_struct('a', 1, 'b', 2) AS $s), CAST(named_struct('a', 3, 'b', 4) AS $s))", + s"array(CAST(named_struct('a', 1, 'b', 2) AS $s))", + s"array(CAST(named_struct('a', 3, 'b', 4) AS $s))", + s"array(CAST(named_struct('a', 1, 'b', CAST(NULL as int)) AS $s))", + s"array(CAST(named_struct('a', CAST(NULL as int), 'b', 2) AS $s))", + s"array(CAST(named_struct('a', CAST(NULL as int), 'b', CAST(NULL as int)) AS $s))", + s"array(CAST(NULL as $s))", + s"array(CAST(named_struct('a', 1, 'b', 2) AS $s), CAST(NULL as $s))", + "array()", + "a1") + for (y <- data; x <- data) { + checkSparkAnswerAndOperator(sql(s"SELECT arrays_overlap($y, $x) from t")) } } }