From 1acdf3d8e9ec5b5ce490d9c0eb2ad18a2e6a3b8a Mon Sep 17 00:00:00 2001 From: buraksenn Date: Tue, 31 Mar 2026 01:25:00 +0300 Subject: [PATCH 1/4] Fix `optimize_projections` failure after mark joins created by `EXISTS OR EXISTS` --- .../src/decorrelate_predicate_subquery.rs | 23 +++++-- .../optimizer/src/optimize_projections/mod.rs | 65 +++++++++++++++++-- 2 files changed, 74 insertions(+), 14 deletions(-) diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index a4c5d8c38549d..0609109ec6e58 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -89,6 +89,7 @@ impl OptimizerRule for DecorrelatePredicateSubquery { // iterate through all exists clauses in predicate, turning each into a join let mut cur_input = Arc::unwrap_or_clone(filter.input); + let original_schema = cur_input.schema().columns(); for subquery_expr in with_subqueries { match extract_subquery_info(subquery_expr) { // The subquery expression is at the top level of the filter @@ -115,6 +116,13 @@ impl OptimizerRule for DecorrelatePredicateSubquery { let new_filter = Filter::try_new(expr, Arc::new(cur_input))?; cur_input = LogicalPlan::Filter(new_filter); } + + if cur_input.schema().fields().len() != original_schema.len() { + cur_input = LogicalPlanBuilder::from(cur_input) + .project(original_schema.into_iter().map(Expr::from))? + .build()?; + } + Ok(Transformed::yes(cur_input)) } @@ -1736,13 +1744,14 @@ mod tests { plan, @r" Projection: customer.c_custkey [c_custkey:Int64] - Filter: __correlated_sq_1.mark OR customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, mark:Boolean] - LeftMark Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, mark:Boolean] - TableScan: customer [c_custkey:Int64, c_name:Utf8] - SubqueryAlias: __correlated_sq_1 [o_custkey:Int64] - Projection: orders.o_custkey [o_custkey:Int64] - Filter: customer.c_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] - TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] + Projection: customer.c_custkey, customer.c_name [c_custkey:Int64, c_name:Utf8] + Filter: __correlated_sq_1.mark OR customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, mark:Boolean] + LeftMark Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, mark:Boolean] + TableScan: customer [c_custkey:Int64, c_name:Utf8] + SubqueryAlias: __correlated_sq_1 [o_custkey:Int64] + Projection: orders.o_custkey [o_custkey:Int64] + Filter: customer.c_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] + TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N] " ) } diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 93df300bb50b4..f00231b20662f 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -385,8 +385,9 @@ fn optimize_projections( } LogicalPlan::Join(join) => { let left_len = join.left.schema().fields().len(); + let right_len = join.right.schema().fields().len(); let (left_req_indices, right_req_indices) = - split_join_requirements(left_len, indices, &join.join_type); + split_join_requirements(left_len, right_len, indices, &join.join_type); let left_indices = left_req_indices.with_plan_exprs(&plan, join.left.schema())?; let right_indices = @@ -757,21 +758,29 @@ fn outer_columns_helper_multi<'a, 'b>( /// adjusted based on the join type. fn split_join_requirements( left_len: usize, + right_len: usize, indices: RequiredIndices, join_type: &JoinType, ) -> (RequiredIndices, RequiredIndices) { match join_type { // In these cases requirements are split between left/right children: - JoinType::Inner - | JoinType::Left - | JoinType::Right - | JoinType::Full - | JoinType::LeftMark - | JoinType::RightMark => { + JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { // Decrease right side indices by `left_len` so that they point to valid // positions within the right child: indices.split_off(left_len) } + JoinType::LeftMark => { + // LeftMark output: [left_cols(0..left_len), mark(left_len)] + // The mark column is synthetic (produced by the join itself), + // so discard it and route only to the left child. + let (left_indices, _mark) = indices.split_off(left_len); + (left_indices, RequiredIndices::new()) + } + JoinType::RightMark => { + // Same as LeftMark, but for the right child. + let (right_indices, _mark) = indices.split_off(right_len); + (RequiredIndices::new(), right_indices) + } // All requirements can be re-routed to left child directly. JoinType::LeftAnti | JoinType::LeftSemi => (indices, RequiredIndices::new()), // All requirements can be re-routed to right side directly. @@ -2311,6 +2320,48 @@ mod tests { ) } + #[test] + fn optimize_projections_left_mark_join_with_outer_join() -> Result<()> { + use datafusion_expr::utils::disjunction; + use datafusion_expr::{exists, out_ref_col}; + + let table_a = test_table_scan_with_name("a")?; + let table_b = test_table_scan_with_name("b")?; + + let sq_a = Arc::new( + LogicalPlanBuilder::from(test_table_scan_with_name("sq_a")?) + .filter(col("sq_a.a").eq(out_ref_col(DataType::UInt32, "a.a")))? + .project(vec![lit(1)])? + .build()?, + ); + + let sq_b = Arc::new( + LogicalPlanBuilder::from(test_table_scan_with_name("sq_b")?) + .filter(col("sq_b.b").eq(out_ref_col(DataType::UInt32, "a.b")))? + .project(vec![lit(1)])? + .build()?, + ); + + let exists_a = exists(sq_a); + let exists_b = exists(sq_b); + + let plan = LogicalPlanBuilder::from(table_a) + .filter(disjunction(vec![exists_a, exists_b]).unwrap())? + .join(table_b, JoinType::Left, (vec!["a"], vec!["a"]), None)? + .build()?; + + let optimizer = Optimizer::new(); + let config = OptimizerContext::new(); + let result = optimizer.optimize(plan, &config, observe); + assert!( + result.is_ok(), + "Full optimizer should not fail with schema mismatch: {:?}", + result.err() + ); + + Ok(()) + } + fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} fn optimize(plan: LogicalPlan) -> Result { From 49b743f27b0718e6860558cd06befc9e5232606e Mon Sep 17 00:00:00 2001 From: buraksenn Date: Tue, 31 Mar 2026 01:31:44 +0300 Subject: [PATCH 2/4] add optimized plan check follow up on https://github.com/apache/datafusion/pull/17119 --- datafusion/optimizer/src/optimize_projections/mod.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index f00231b20662f..a17ef620c2f64 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -2358,6 +2358,14 @@ mod tests { "Full optimizer should not fail with schema mismatch: {:?}", result.err() ); + let optimized = result.unwrap(); + let plan_str = format!("{optimized}"); + // Verify no double projection — the projection we add to strip mark + // columns should be merged by optimize_projections, not left stacked. + assert!( + !plan_str.contains("Projection: a.a, a.b, a.c\n Projection:"), + "Double projection should be merged by optimize_projections" + ); Ok(()) } From 79d69749b23ea72a483dcb9275c274517a153305 Mon Sep 17 00:00:00 2001 From: buraksenn Date: Mon, 6 Apr 2026 11:55:41 +0300 Subject: [PATCH 3/4] address comments --- .../optimizer/src/optimize_projections/mod.rs | 58 ++++++------------- 1 file changed, 17 insertions(+), 41 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index a17ef620c2f64..3a8a60b7cf519 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -747,6 +747,7 @@ fn outer_columns_helper_multi<'a, 'b>( /// # Parameters /// /// * `left_len` - The length of the left child. +/// * `right_len` - The length of the right child. /// * `indices` - A slice of requirement indices. /// * `join_type` - The type of join (e.g. `INNER`, `LEFT`, `RIGHT`). /// @@ -770,7 +771,7 @@ fn split_join_requirements( indices.split_off(left_len) } JoinType::LeftMark => { - // LeftMark output: [left_cols(0..left_len), mark(left_len)] + // LeftMark output: [left_cols(0..left_len), mark] // The mark column is synthetic (produced by the join itself), // so discard it and route only to the left child. let (left_indices, _mark) = indices.split_off(left_len); @@ -2322,52 +2323,27 @@ mod tests { #[test] fn optimize_projections_left_mark_join_with_outer_join() -> Result<()> { - use datafusion_expr::utils::disjunction; - use datafusion_expr::{exists, out_ref_col}; - let table_a = test_table_scan_with_name("a")?; let table_b = test_table_scan_with_name("b")?; - - let sq_a = Arc::new( - LogicalPlanBuilder::from(test_table_scan_with_name("sq_a")?) - .filter(col("sq_a.a").eq(out_ref_col(DataType::UInt32, "a.a")))? - .project(vec![lit(1)])? - .build()?, - ); - - let sq_b = Arc::new( - LogicalPlanBuilder::from(test_table_scan_with_name("sq_b")?) - .filter(col("sq_b.b").eq(out_ref_col(DataType::UInt32, "a.b")))? - .project(vec![lit(1)])? - .build()?, - ); - - let exists_a = exists(sq_a); - let exists_b = exists(sq_b); + let table_c = test_table_scan_with_name("c")?; let plan = LogicalPlanBuilder::from(table_a) - .filter(disjunction(vec![exists_a, exists_b]).unwrap())? - .join(table_b, JoinType::Left, (vec!["a"], vec!["a"]), None)? + .join(table_b, JoinType::LeftMark, (vec!["a"], vec!["a"]), None)? + .project(vec![col("a.a"), col("a.b"), col("a.c")])? + .join(table_c, JoinType::Left, (vec!["a"], vec!["a"]), None)? .build()?; - let optimizer = Optimizer::new(); - let config = OptimizerContext::new(); - let result = optimizer.optimize(plan, &config, observe); - assert!( - result.is_ok(), - "Full optimizer should not fail with schema mismatch: {:?}", - result.err() - ); - let optimized = result.unwrap(); - let plan_str = format!("{optimized}"); - // Verify no double projection — the projection we add to strip mark - // columns should be merged by optimize_projections, not left stacked. - assert!( - !plan_str.contains("Projection: a.a, a.b, a.c\n Projection:"), - "Double projection should be merged by optimize_projections" - ); - - Ok(()) + assert_optimized_plan_equal!( + plan, + @r" + Left Join: a.a = c.a + Projection: a.a, a.b, a.c + LeftMark Join: a.a = b.a + TableScan: a projection=[a, b, c] + TableScan: b projection=[a] + TableScan: c projection=[a, b, c] + " + ) } fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} From f63337c140928ff0fb8b1e30efb972762316d626 Mon Sep 17 00:00:00 2001 From: buraksenn Date: Mon, 6 Apr 2026 12:07:34 +0300 Subject: [PATCH 4/4] add regression test with comment --- .../optimizer/src/optimize_projections/mod.rs | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 3a8a60b7cf519..d1248349fded1 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -2321,8 +2321,45 @@ mod tests { ) } + // Regression test for https://github.com/apache/datafusion/issues/20083 + // Optimizer must not fail when LeftMark joins from EXISTS OR EXISTS + // feed into a Left join. #[test] - fn optimize_projections_left_mark_join_with_outer_join() -> Result<()> { + fn optimize_projections_exists_or_exists_with_outer_join() -> Result<()> { + use datafusion_expr::utils::disjunction; + use datafusion_expr::{exists, out_ref_col}; + + let table_a = test_table_scan_with_name("a")?; + let table_b = test_table_scan_with_name("b")?; + + let sq_a = Arc::new( + LogicalPlanBuilder::from(test_table_scan_with_name("sq_a")?) + .filter(col("sq_a.a").eq(out_ref_col(DataType::UInt32, "a.a")))? + .project(vec![lit(1)])? + .build()?, + ); + + let sq_b = Arc::new( + LogicalPlanBuilder::from(test_table_scan_with_name("sq_b")?) + .filter(col("sq_b.b").eq(out_ref_col(DataType::UInt32, "a.b")))? + .project(vec![lit(1)])? + .build()?, + ); + + let plan = LogicalPlanBuilder::from(table_a) + .filter(disjunction(vec![exists(sq_a), exists(sq_b)]).unwrap())? + .join(table_b, JoinType::Left, (vec!["a"], vec!["a"]), None)? + .build()?; + + let optimizer = Optimizer::new(); + let config = OptimizerContext::new(); + optimizer.optimize(plan, &config, observe)?; + + Ok(()) + } + + #[test] + fn optimize_projections_left_mark_join_with_projection() -> Result<()> { let table_a = test_table_scan_with_name("a")?; let table_b = test_table_scan_with_name("b")?; let table_c = test_table_scan_with_name("c")?;