Fix non-deterministic iteration in SessionStateBuilder#21262

Open
shehab-ali wants to merge 1 commit into apache:main from shehab-ali:shehab-ali/fix-indeterministic-udf

@shehab-ali shehab-ali commented Mar 30, 2026

Which issue does this PR close?

Problem: SessionStateBuilder::new_from_existing() previously used HashMap::into_values().collect_vec() for scalar, aggregate, and window functions. Since the HashMap stores entries for both the canonical name and each alias (all pointing to the same Arc<UDF>), into_values() produced duplicate entries in arbitrary order. When build() re-registered them, the last-writer-wins behavior on shared alias keys was nondeterministic.

Fix: Introduced dedup_function_registry_by_canonical_name() which:

- Iterates HashMap keys in sorted order (deterministic)
- Keeps only one Arc<UDF> per unique canonical name (no duplicates)

This ensures build() re-registers each function exactly once, making the round trip through new_from_existing() → build() deterministic and alias-preserving.
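
The two steps described in the fix can be sketched as follows. This is a minimal illustration and not the PR's actual implementation: `Udf` is a hypothetical stand-in for DataFusion's function types, and the helper name `dedup_sorted`, its body, and `sample_registry` are assumptions built only from the description here.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for a UDF type (not DataFusion's `ScalarUDF`).
struct Udf {
    name: String,
}

/// Sketch of the described approach: visit keys in sorted order and keep
/// only the entries whose key equals the function's canonical name.
fn dedup_sorted<T>(
    map: &HashMap<String, Arc<T>>,
    canonical_name: impl Fn(&T) -> &str,
) -> Vec<Arc<T>> {
    let mut keys: Vec<&String> = map.keys().collect();
    keys.sort(); // sorted keys make the output order independent of hashing
    keys.into_iter()
        .filter_map(|key| {
            let udf = &map[key];
            // Alias keys differ from the canonical name and are skipped.
            (key.as_str() == canonical_name(udf.as_ref())).then(|| Arc::clone(udf))
        })
        .collect()
}

/// Builds a registry with one aliased function and one unaliased function.
fn sample_registry() -> HashMap<String, Arc<Udf>> {
    let to_char = Arc::new(Udf { name: "postgres_to_char".to_string() });
    let abs = Arc::new(Udf { name: "abs".to_string() });
    HashMap::from([
        ("postgres_to_char".to_string(), Arc::clone(&to_char)),
        ("to_char".to_string(), to_char), // alias entry, same Arc
        ("abs".to_string(), abs),
    ])
}
```

Calling `dedup_sorted(&sample_registry(), |udf| udf.name.as_str())` yields one Arc per logical function, ordered by canonical name regardless of how the HashMap happens to iterate.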

Say you register a UDF named "postgres_to_char" with alias "to_char". The SessionState HashMap stores:
"postgres_to_char" → Arc<UDF(postgres_to_char)>
"to_char"          → Arc<UDF(postgres_to_char)>   // same arc, alias entry

Before the fix — into_values().collect_vec() produces:

// Two copies of the same UDF, in arbitrary HashMap iteration order
[Arc<UDF(postgres_to_char)>, Arc<UDF(postgres_to_char)>]
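
The duplication can be reproduced with plain standard-library types. The sketch below assumes a hypothetical `Udf` struct and `register` helper that mimic the storage layout described in this example; neither is DataFusion's API.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for a UDF with a canonical name and aliases.
struct Udf {
    name: String,
    aliases: Vec<String>,
}

/// Stores `udf` under every alias and under its canonical name,
/// all entries sharing the same Arc.
fn register(map: &mut HashMap<String, Arc<Udf>>, udf: Arc<Udf>) {
    for alias in &udf.aliases {
        map.insert(alias.clone(), Arc::clone(&udf));
    }
    map.insert(udf.name.clone(), udf);
}

/// Collects every value in the map, as `into_values()` does:
/// one element per key, so aliased functions appear more than once.
fn all_values(map: HashMap<String, Arc<Udf>>) -> Vec<Arc<Udf>> {
    map.into_values().collect()
}

/// Builds the registry from the example: one function with one alias.
fn example_registry() -> HashMap<String, Arc<Udf>> {
    let mut map = HashMap::new();
    register(
        &mut map,
        Arc::new(Udf {
            name: "postgres_to_char".to_string(),
            aliases: vec!["to_char".to_string()],
        }),
    );
    map
}
```

`all_values(example_registry())` returns two elements, and `Arc::ptr_eq` on them reports that both point at the same allocation.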

build() then calls register_udf() for each entry. Each register_udf call re-inserts both the canonical name and all aliases:

// First registration (say the "to_char" entry came first):
"postgres_to_char" → Arc_1
"to_char"          → Arc_1

// Second registration (the "postgres_to_char" entry):
"postgres_to_char" → Arc_2   // overwrites Arc_1
"to_char"          → Arc_2   // overwrites Arc_1

If `with_updated_config()` returns `Some` (which creates a new Arc on each call), Arc_1 and Arc_2 are different objects for the same logical function. Which one ends up stored depends on HashMap iteration order, which is randomized per run. Scale this to many UDFs with aliases and the rebuilt state becomes nondeterministic.
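
The last-writer-wins effect can be shown in isolation. In this sketch, `Udf`, `register`, `last_writer`, and `fresh_copy` are all hypothetical names; `fresh_copy` only models a call that hands back a new Arc for the same logical function, and string-slice keys are used for brevity.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for a UDF with a canonical name and aliases.
struct Udf {
    name: &'static str,
    aliases: &'static [&'static str],
}

/// Hypothetical re-registration: writes the canonical name and every alias,
/// overwriting whatever was stored under those keys before.
fn register(map: &mut HashMap<&'static str, Arc<Udf>>, udf: &Arc<Udf>) {
    map.insert(udf.name, Arc::clone(udf));
    for alias in udf.aliases {
        map.insert(*alias, Arc::clone(udf));
    }
}

/// Registers `copies` in the given order and returns the Arc left under `key`.
fn last_writer(copies: &[Arc<Udf>], key: &str) -> Option<Arc<Udf>> {
    let mut map = HashMap::new();
    for udf in copies {
        register(&mut map, udf);
    }
    map.get(key).cloned()
}

/// Models a call that returns a fresh Arc for the same logical function.
fn fresh_copy() -> Arc<Udf> {
    Arc::new(Udf { name: "postgres_to_char", aliases: &["to_char"] })
}
```

With two copies `a` and `b`, registering in the order `[a, b]` leaves `b` under "to_char", while `[b, a]` leaves `a`. When the order comes from HashMap iteration, the surviving copy is not predictable.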

After the fix — filtering to key == canonical_name produces:

// Only the canonical entry, no duplicates
[Arc<UDF(postgres_to_char)>]

Are these changes tested?

Added unit test

Are there any user-facing changes?

No

@github-actions github-actions bot added the core Core DataFusion crate label Mar 30, 2026
@shehab-ali shehab-ali force-pushed the shehab-ali/fix-indeterministic-udf branch from a9333c6 to 97e37c6 Compare March 30, 2026 20:12
/// one [`Arc`] per logical function.
fn dedup_function_registry_by_canonical_name<T>(
map: &HashMap<String, Arc<T>>,
canonical_name: impl Fn(&T) -> &str,
Contributor

Do we need canonical_name as its own function if all the uses are the same function?

Author

Those functions have different types, though: WindowUDF, ScalarUDF, and AggregateUDF.
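
To illustrate why the accessor is passed in, here is a hedged sketch with two unrelated hypothetical types, `ScalarLike` and `WindowLike`, standing in for ScalarUDF and WindowUDF. They share no trait in this sketch, so one generic helper (here the made-up `canonical_names`) needs a closure that says how to read the name from each type.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical function types with no common trait between them.
struct ScalarLike {
    name: String,
}
struct WindowLike {
    label: String,
}

/// Generic over `T`: `canonical_name` tells the helper how to read the
/// canonical name from whatever function type the map holds.
fn canonical_names<T>(
    map: &HashMap<String, Arc<T>>,
    canonical_name: impl Fn(&T) -> &str,
) -> Vec<String> {
    let mut names = Vec::new();
    for (key, udf) in map {
        if key.as_str() == canonical_name(udf.as_ref()) {
            names.push(key.clone());
        }
    }
    names.sort(); // sort so the result does not depend on iteration order
    names
}

/// Each call site supplies the accessor for its own type.
fn scalar_names(map: &HashMap<String, Arc<ScalarLike>>) -> Vec<String> {
    canonical_names(map, |udf| udf.name.as_str())
}

fn window_names(map: &HashMap<String, Arc<WindowLike>>) -> Vec<String> {
    canonical_names(map, |udf| udf.label.as_str())
}
```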

Contributor

ah I see now, my bad.

@shehab-ali shehab-ali force-pushed the shehab-ali/fix-indeterministic-udf branch from 97e37c6 to 936c4bd Compare March 30, 2026 20:50
@shehab-ali shehab-ali marked this pull request as ready for review March 31, 2026 14:13
Contributor

@ahmed-mez ahmed-mez left a comment

Thank you for fixing this! I hope the maintainers review this soon.

/// matches the canonical name. The session stores one hash map entry per alias
/// plus the canonical name; filtering to canonical-name entries yields exactly
/// one [`Arc`] per logical function.
fn dedup_function_registry_by_canonical_name<T>(
Contributor

Minor: I think the function could consume the map instead of borrowing it, to avoid some Arc clones:

  fn dedup_function_registry_by_canonical_name<T>(
      map: HashMap<String, Arc<T>>,
      canonical_name: impl Fn(&T) -> &str,
  ) -> Vec<Arc<T>> {
      map.into_iter()
          .filter(|(key, udf)| key.as_str() == canonical_name(udf.as_ref()))
          .map(|(_, udf)| udf)  // no Arc::clone needed
          .collect()
  }
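
One way to observe the saving is through `Arc::strong_count`. The sketch below repeats the suggested consuming helper unchanged and pairs it with a hypothetical `Udf` type and a made-up `dedup_alias_example` driver; it is an illustration of the suggestion, not code from the PR.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for a UDF type.
struct Udf {
    name: String,
}

/// The consuming variant suggested in this comment, reproduced as-is.
fn dedup_function_registry_by_canonical_name<T>(
    map: HashMap<String, Arc<T>>,
    canonical_name: impl Fn(&T) -> &str,
) -> Vec<Arc<T>> {
    map.into_iter()
        .filter(|(key, udf)| key.as_str() == canonical_name(udf.as_ref()))
        .map(|(_, udf)| udf) // the Arc is moved out of the map, not cloned
        .collect()
}

/// Builds the alias example and returns the deduplicated functions.
fn dedup_alias_example() -> Vec<Arc<Udf>> {
    let udf = Arc::new(Udf { name: "postgres_to_char".to_string() });
    let map = HashMap::from([
        ("postgres_to_char".to_string(), Arc::clone(&udf)),
        ("to_char".to_string(), udf), // alias entry, same Arc
    ]);
    dedup_function_registry_by_canonical_name(map, |udf| udf.name.as_str())
}
```

Because the map is consumed, the alias entry's Arc is dropped and the canonical entry's Arc is moved into the result, leaving a strong count of 1. Note that this variant returns entries in HashMap iteration order, so a caller that needs a stable order would still have to sort.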

Contributor

Alternatively, it might be more robust to dedup by Arc pointer identity:

  fn dedup_by_identity<T>(map: HashMap<String, Arc<T>>) -> Vec<Arc<T>> {
      let mut seen = HashSet::new();
      map.into_values()
          .filter(|arc| seen.insert(Arc::as_ptr(arc)))
          .collect()
  }
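
A short sketch of how the identity-based variant behaves, assuming a hypothetical `Udf` type and a made-up `identity_example` driver. It repeats the function from this comment with the imports it needs. Aliases that share an Arc collapse to one entry, while a separately allocated Arc for the same logical function is kept as its own entry.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

/// Hypothetical stand-in for a UDF type.
struct Udf {
    name: String,
}

/// Identity-based dedup as suggested in this comment: keeps the first Arc
/// seen for each distinct allocation.
fn dedup_by_identity<T>(map: HashMap<String, Arc<T>>) -> Vec<Arc<T>> {
    let mut seen = HashSet::new();
    map.into_values()
        .filter(|arc| seen.insert(Arc::as_ptr(arc)))
        .collect()
}

/// Two keys share one Arc; a third key holds a separate allocation
/// that represents the same logical function.
fn identity_example() -> Vec<Arc<Udf>> {
    let shared = Arc::new(Udf { name: "postgres_to_char".to_string() });
    let separate = Arc::new(Udf { name: "postgres_to_char".to_string() });
    let map = HashMap::from([
        ("postgres_to_char".to_string(), Arc::clone(&shared)),
        ("to_char".to_string(), shared),
        ("to_char_v2".to_string(), separate), // hypothetical extra key
    ]);
    dedup_by_identity(map)
}
```

The result holds two Arcs with the same name but different addresses, in whatever order the HashMap iterates, so this variant removes shared-Arc duplicates without by itself fixing output order.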
