Mark BufferExec and AnalyzeExec as eager#22711

Open
geoffreyclaude wants to merge 3 commits into
apache:mainfrom
geoffreyclaude:fix/eager-evaluation-metadata

@geoffreyclaude geoffreyclaude commented Jun 2, 2026

Which issue does this PR close?

Rationale for this change

BufferExec and AnalyzeExec both have eager evaluation behavior: they can drive child streams in spawned tasks instead of performing exactly one downstream poll's worth of work at a time. Their PlanProperties currently report EvaluationType::Lazy, so the physical-plan metadata does not describe how these operators actually execute.
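To illustrate the distinction, here is a minimal sketch using only the Rust standard library. It is not DataFusion code: `CountingSource`, `source`, `lazy`, and `eager` are hypothetical names, and an OS thread with a channel stands in for a spawned async task driving a child stream. The point is only that a lazy operator pulls from its child when it is polled, while an eager one may consume the child independently of downstream polls.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

/// Hypothetical child input that counts how many items were pulled from it.
struct CountingSource {
    next: usize,
    len: usize,
    pulled: Arc<AtomicUsize>,
}

impl Iterator for CountingSource {
    type Item = usize;

    fn next(&mut self) -> Option<usize> {
        if self.next == self.len {
            return None;
        }
        self.pulled.fetch_add(1, Ordering::SeqCst);
        self.next += 1;
        Some(self.next - 1)
    }
}

/// Builds a source of `len` items plus a handle to its pull counter.
fn source(len: usize) -> (CountingSource, Arc<AtomicUsize>) {
    let pulled = Arc::new(AtomicUsize::new(0));
    let child = CountingSource { next: 0, len, pulled: Arc::clone(&pulled) };
    (child, pulled)
}

/// Lazy: the child is pulled only when the consumer asks for the next item.
fn lazy(child: CountingSource) -> impl Iterator<Item = usize> {
    child
}

/// Eager: a background thread drains the child into a channel,
/// whether or not the consumer has asked for anything yet.
fn eager(child: CountingSource) -> (mpsc::Receiver<usize>, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        for item in child {
            // Stop early if the consumer has gone away.
            if tx.send(item).is_err() {
                break;
            }
        }
    });
    (rx, handle)
}

fn main() {
    // Lazy: one downstream poll causes exactly one pull from the child.
    let (child, pulled) = source(4);
    let mut stream = lazy(child);
    assert_eq!(stream.next(), Some(0));
    assert_eq!(pulled.load(Ordering::SeqCst), 1);

    // Eager: the child is fully drained before the consumer receives anything.
    let (child, pulled) = source(4);
    let (rx, handle) = eager(child);
    handle.join().unwrap();
    assert_eq!(pulled.load(Ordering::SeqCst), 4);
    assert_eq!(rx.recv(), Ok(0));
}
```

In this toy model the eager variant does the same total work as the lazy one; what differs is who drives the child and when, which is the behavior the metadata is meant to describe.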

While making that metadata accurate, this PR also keeps need_data_exchange(...) scoped to its original purpose. #4585 introduced that helper as a way to identify physical operators that require exchange-style handling because they redistribute partitions or gather multiple input partitions into one output partition. #4586 implemented it for the native exchange/gather operators called out there: non-round-robin RepartitionExec, CoalescePartitionsExec, and SortPreservingMergeExec.

The later cooperative-scheduling work in #16398 introduced EvaluationType and made need_data_exchange(...) use evaluation_type == EvaluationType::Eager. That shortcut worked when the eager operators and exchange/gather operators were the same set. With BufferExec and AnalyzeExec correctly classified as eager, the shortcut would make need_data_exchange(...) report operators that do eager child polling but do not perform a data exchange.

So the intended split in this PR is:

  • EvaluationType describes execution behavior.
  • need_data_exchange(...) identifies partition redistribution or partition gathering.
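The split can be sketched with a self-contained toy model. This is not the DataFusion API: the `Operator` enum, its fields, and the `eager_shortcut` helper are hypothetical stand-ins, and the evaluation types assigned below are assumptions made for illustration. It only shows why deriving the exchange predicate from eagerness over-reports once BufferExec and AnalyzeExec are classified as eager.

```rust
/// Stand-in for the two evaluation modes discussed in this PR.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EvaluationType {
    Lazy,
    Eager,
}

/// Hypothetical, simplified operators (the real ones are execution plan nodes).
#[derive(Debug, Clone, Copy)]
enum Operator {
    Repartition { round_robin: bool },
    CoalescePartitions { input_partitions: usize },
    SortPreservingMerge { input_partitions: usize },
    Buffer,
    Analyze,
    Filter,
}

impl Operator {
    /// Execution behavior. The assignments here are illustrative assumptions.
    fn evaluation_type(self) -> EvaluationType {
        match self {
            Operator::Filter => EvaluationType::Lazy,
            _ => EvaluationType::Eager,
        }
    }
}

/// The shortcut: treat every eager operator as a data exchange.
fn eager_shortcut(op: Operator) -> bool {
    op.evaluation_type() == EvaluationType::Eager
}

/// Exchange/gather predicate: true only for partition redistribution
/// or for gathering several input partitions into one.
fn need_data_exchange(op: Operator) -> bool {
    match op {
        Operator::Repartition { round_robin } => !round_robin,
        Operator::CoalescePartitions { input_partitions }
        | Operator::SortPreservingMerge { input_partitions } => input_partitions > 1,
        _ => false,
    }
}

fn main() {
    // Eager child polling without a data exchange: the two answers diverge.
    for op in [Operator::Buffer, Operator::Analyze] {
        assert!(eager_shortcut(op));
        assert!(!need_data_exchange(op));
    }

    // Native exchange/gather operators are still reported.
    assert!(need_data_exchange(Operator::Repartition { round_robin: false }));
    assert!(need_data_exchange(Operator::CoalescePartitions { input_partitions: 4 }));
    assert!(need_data_exchange(Operator::SortPreservingMerge { input_partitions: 2 }));

    // Round-robin repartitioning and single-partition gathers are not.
    assert!(!need_data_exchange(Operator::Repartition { round_robin: true }));
    assert!(!need_data_exchange(Operator::CoalescePartitions { input_partitions: 1 }));
    assert!(!need_data_exchange(Operator::Filter));
}
```

Keeping the two questions in separate functions means a new eager operator can be added without silently changing which plans are treated as exchanges.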

What changes are included in this PR?

  • Mark BufferExec as EvaluationType::Eager while keeping its existing cooperative scheduling metadata.
  • Mark AnalyzeExec as EvaluationType::Eager in its computed plan properties.
  • Clarify the EvaluationType docs so eager evaluation is not defined by whether work starts in execute or on the first stream poll.
  • Restore need_data_exchange(...) as an exchange/gather predicate for the native exchange operators instead of deriving it from all eager operators.

Are these changes tested?

Targeted tests pass:

cargo test -p datafusion-physical-plan --lib buffer::tests
cargo test -p datafusion-physical-plan --lib analyze::tests
cargo test -p datafusion-physical-plan --doc execution_plan
cargo test -p datafusion-physical-plan --lib execution_plan::tests::buffer_exec_does_not_need_data_exchange
RUSTDOCFLAGS="-D warnings" cargo doc -p datafusion-physical-plan --no-deps

I also added a focused regression test that BufferExec does not require data exchange. I did not add tests that merely assert the assigned EvaluationType, as those would duplicate the implementation rather than cover behavior.

Are there any user-facing changes?

No query-result changes are expected. This updates physical-plan metadata and keeps need_data_exchange(...) scoped to operators that move data between partitions or gather partitions together.

@github-actions github-actions Bot added the physical-plan Changes to the physical-plan crate label Jun 2, 2026
@geoffreyclaude geoffreyclaude force-pushed the fix/eager-evaluation-metadata branch from 298245c to aab6e33 Compare June 2, 2026 08:30

@gabotechs gabotechs left a comment

👍 +1 from me, this looks more correct now.

Comment on lines -1220 to +1249
-    plan.properties().evaluation_type == EvaluationType::Eager
+    if let Some(repartition) = plan.downcast_ref::<RepartitionExec>() {
+        !matches!(repartition.partitioning(), Partitioning::RoundRobinBatch(_))
+    } else if let Some(coalesce) = plan.downcast_ref::<CoalescePartitionsExec>() {
+        coalesce.input().output_partitioning().partition_count() > 1
+    } else if let Some(sort_preserving_merge) =
+        plan.downcast_ref::<SortPreservingMergeExec>()
+    {
+        sort_preserving_merge
+            .input()
+            .output_partitioning()
+            .partition_count()
+            > 1
+    } else {
+        false
+    }

Some operators are correctly marked as Eager, but this function was wrongly treating them as need_data_exchange() -> true:

Contributor Author

need_data_exchange was pretty broken then... As discussed privately, we should look into deprecating and then deleting this method, as probably no one is using it (at least not correctly).

@gabotechs

I'll leave this here marinating for a day so that other people can chime in; otherwise, I'll merge it tomorrow.

@geoffreyclaude geoffreyclaude force-pushed the fix/eager-evaluation-metadata branch from aab6e33 to 334cb57 Compare June 2, 2026 09:23
@github-actions github-actions Bot added the sqllogictest SQL Logic Tests (.slt) label Jun 2, 2026
@geoffreyclaude geoffreyclaude force-pushed the fix/eager-evaluation-metadata branch from 99f62b2 to 334cb57 Compare June 2, 2026 10:41
@github-actions github-actions Bot removed the sqllogictest SQL Logic Tests (.slt) label Jun 2, 2026
Labels

physical-plan Changes to the physical-plan crate

Development

Successfully merging this pull request may close these issues.

BufferExec and AnalyzeExec should report eager evaluation

2 participants