feat: Stage based fallback#4519

Open
karuppayya wants to merge 4 commits into
apache:mainfrom
karuppayya:COMET-4518

@karuppayya
Contributor

Which issue does this PR close?

Closes #4518.

Rationale for this change

Introduces a stage-aware fallback that reverts stages to Spark row-based execution when the number of C2R transitions exceeds a threshold.

What changes are included in this PR?

  • RevertNativeForTransitionHeavyStages: a postColumnarTransitions rule that counts C2R transitions per stage and reverts via CometExec.originalPlan when the threshold is exceeded
  • Configs: spark.comet.exec.transitionRevert.enabled (default: true), spark.comet.exec.transitionRevert.maxTransitions (default: 2, min: 2)
  • Works for both AQE and non-AQE paths
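
As a rough illustration of how the two settings listed above might be read, here is a minimal Scala sketch that parses them from a plain key/value map. The `TransitionRevertConf` object and its helper methods are hypothetical and are not Comet's config API; only the two key names, the defaults, and the minimum of 2 come from the description.

```scala
// Hypothetical helper, not part of Comet: reads the two settings from a plain map.
object TransitionRevertConf {
  val Enabled = "spark.comet.exec.transitionRevert.enabled"
  val MaxTransitions = "spark.comet.exec.transitionRevert.maxTransitions"

  // Defaults to true when the key is absent, as stated in the description.
  def enabled(conf: Map[String, String]): Boolean =
    conf.get(Enabled).forall(_.toBoolean)

  // Defaults to 2 when the key is absent and rejects values below the minimum of 2.
  def maxTransitions(conf: Map[String, String]): Int = {
    val value = conf.get(MaxTransitions).map(_.toInt).getOrElse(2)
    require(value >= 2, s"$MaxTransitions must be >= 2, got $value")
    value
  }
}
```

With an empty map, both helpers return the documented defaults; a value such as 1 for the threshold is rejected by `require`.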

How are these changes tested?

Unit tests added

@karuppayya
Contributor Author

cc: @mbutrovich @andygrove @parthchandra
The main idea here is for a stage to fall back entirely to the JVM based on cost (the number of transitions between columnar and row execution).

@andygrove
Member

Thanks @karuppayya. I've observed this issue as well. I introduced CometConf.COMET_EXEC_SHUFFLE_CONVERT_FROM_SPARK_PLAN_ENABLED to control whether a row-based stage gets converted to columnar shuffle. It currently defaults to true, but I am curious whether changing the default to false would help with this. I was planning on running benchmarks with this config disabled but have not done that yet.

@andygrove
Member

@karuppayya new test suites have to be manually added to pr_build_linux.yml and pr_build_macos.yml in .github/workflows.

Suite not found in workflow .github/workflows/pr_build_linux.yml: org.apache.comet.rules.RevertNativeForTransitionHeavyStagesSuite

@karuppayya force-pushed the COMET-4518 branch 2 times, most recently from 9061d20 to 7b69f7c on June 1, 2026 19:06

@karuppayya
Contributor Author

Thanks @andygrove. I think the difference is that COMET_EXEC_SHUFFLE_CONVERT_FROM_SPARK_PLAN_ENABLED only checks the immediate child of the shuffle to decide whether to convert or not. It doesn't look at the full stage plan, so you could still end up with costly transitions deeper in the stage.

This change evaluates the full stage plan and makes a per-stage decision based on the actual transition cost. In the future we can extend it to factor in estimated row counts, operator complexity, etc.

@karuppayya
Contributor Author

Also, just curious since I don't have the background: why is the CI trigger manual?

@comphead
Contributor

comphead commented Jun 1, 2026

> Also just curious since i dont have the background, why is the CI trigger manual?

Comet consumes a lot of ASF CI resources, and we are working to offload long-running CI flows to user forks. For now we are trying to run fewer pipelines in parallel.

Contributor

@comphead comphead left a comment

Thanks @karuppayya

One thing to mention: Comet mostly works in the planning phase, injecting Comet nodes to be executed. AQE and DPP as well.

However, this approach proposes a totally new vision and makes decisions at runtime? A Spark stage is a runtime unit and might also depend on data distribution.

There is a lot of gray area here. For example, if Spark scans a folder with 100 files that are unevenly distributed, tasks will run with different runtimes and workloads, possibly leaving a single overloaded task doing more work than the others. Is that considered a heavyweight stage? What is the fallback strategy? Restart the task?

I just want to understand the approach better and see what pros and cons it could bring.

@karuppayya
Contributor Author

Thanks @comphead.

> However this approach proposes a totally new vision and make decisions in runtime?

Yes and no. This is still a planning-phase rule (postColumnarTransitions). For non-AQE, it's static, since the decision is made once during initial plan optimization. For AQE, it runs per stage, as each stage is planned at a shuffle boundary during runtime, but it's still a plan-based decision.

> Is it considered a heavy weight stage? what is the strategy to fallback?

Currently the heuristic is structural, i.e. it counts the number of C2R transitions in a stage's plan. It doesn't take data volume etc. into account. Incorporating those kinds of statistics could be a natural next step toward a more cost-based decision.
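
To make the structural heuristic concrete, here is a minimal sketch on a toy plan tree. The `Node`, `Op`, `ColumnarToRow`, and `StageBoundary` types and the `TransitionCount` object are illustrative stand-ins and not Spark or Comet classes; the sketch assumes only C2R nodes are counted, that counting stops at a stage boundary, and that a stage is reverted when the count is strictly greater than the threshold.

```scala
// Toy plan model: illustrative stand-ins, not Spark or Comet classes.
sealed trait Node { def children: Seq[Node] }

// Any ordinary operator (scan, filter, project, ...).
case class Op(name: String, children: Seq[Node] = Nil) extends Node

// A columnar-to-row transition.
case class ColumnarToRow(child: Node) extends Node {
  val children: Seq[Node] = Seq(child)
}

// Stands in for a query stage or exchange boundary: counting stops here.
case class StageBoundary(child: Node) extends Node {
  val children: Seq[Node] = Seq(child)
}

object TransitionCount {
  // Count C2R transitions in the current stage only.
  def countC2R(plan: Node): Int = plan match {
    case StageBoundary(_)     => 0
    case ColumnarToRow(child) => 1 + countC2R(child)
    case other                => other.children.map(countC2R).sum
  }

  // Assumed decision rule: revert only when the count exceeds the threshold.
  def shouldRevert(plan: Node, maxTransitions: Int): Boolean =
    countC2R(plan) > maxTransitions
}
```

Because the decision depends only on the shape of the plan, the same function gives the same answer whether it is called once on the whole plan or once per stage.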

@comphead
Contributor

comphead commented Jun 2, 2026

Thanks @karuppayya for the explanation. Having a conversion-counter guardrail in the planner makes a lot of sense to me.

_ >= 2,
"Must be >= 2. A reverted stage still requires at least one " +
"R2C at the columnar shuffle boundary.")
.createWithDefault(2)
Contributor

This is probably too aggressive and is likely to cause CI failure (because the plans will not match).

Contributor Author

I'd like to keep the threshold at 2 eventually, since more than 2 transitions means significant overhead. But for now I've bumped it to 5 so that CI passes. Once we've reviewed the logic, can we iterate on the default and fix the test failures that come with it?

case RowToColumnarExec(child) => child
}
val reverted = stripped.transformUp {
case cometShuffle: CometShuffleExchangeExec =>
Contributor

CometBroadcastExchange extends CometPlan (not CometExec) and will not be matched by any case in this pass.

Contributor Author

In AQE this is already handled: the broadcast exchange is wrapped in BroadcastQueryStageExec (which extends QueryStageExec), and countTransitions stops at QueryStageExec.
For non-AQE, I've added BroadcastExchangeLike to the stop condition in countTransitions to treat it as an execution boundary (a broadcast runs as a separate job, independent of the parent stage).

case cometExec: CometExec =>
if (cometExec.originalPlan.children.size == cometExec.children.size) {
cometExec.originalPlan.withNewChildren(cometExec.children)
} else {
Contributor

When does this get hit? We should probably never reach this in which case a warning in the log is called for.

Contributor Author

Added a warning.

plan.transformUp { case exchange: ShuffleExchangeLike =>
revertStageIfNeeded(exchange.child, exchange.supportsColumnar)
.map(reverted => exchange.withNewChildren(Seq(reverted)))
.getOrElse(exchange)
Contributor

Why is there no case _ => arm here, like in the AQE case?

Contributor Author

Added

}
}
}
}
Contributor

Can we add a test that reverts and then executes the plan? Maybe two variants like - fallback triggered by unsupported expressions that are reverted, and a threshold set to 1 to force revert.
Test should verify results against Spark.

Contributor Author

I disabled the project operator to guarantee transitions. Do you think we still need a test with a natural fallback (an unsupported expression, maybe a UDF), or is the current approach sufficient?

plan.transformUp {
case node if !node.isInstanceOf[QueryStageExec] && !node.supportsColumnar =>
val newChildren = node.children.map { child =>
if (child.supportsColumnar) ColumnarToRowExec(child) else child
Contributor

You could also hit a Row child and a Columnar parent (the reverse transition).

Contributor Author

After revertToSpark, all CometExec nodes are replaced with their row-based operators. The only columnar nodes remaining would be QueryStageExec leaves (inputs from prior stages), so I think we will only have columnar children feeding row-based parents. Maybe I am missing the scenario; an example would help.
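
A minimal sketch of that situation on a toy plan, assuming the reverted stage contains only row-based operators plus columnar leaves from prior stages. All types here (`Plan`, `RowOp`, `ColumnarStage`, `C2R`, `InsertTransitions`) are illustrative stand-ins, not Spark or Comet classes, and the explicit `C2R` case is an addition of this sketch to keep the rewrite idempotent.

```scala
// Toy plan model: illustrative stand-ins, not Spark or Comet classes.
sealed trait Plan {
  def supportsColumnar: Boolean
  def children: Seq[Plan]
  def withChildren(newChildren: Seq[Plan]): Plan
}

// A row-based operator, like the Spark operator restored by the revert.
case class RowOp(name: String, children: Seq[Plan] = Nil) extends Plan {
  val supportsColumnar: Boolean = false
  def withChildren(newChildren: Seq[Plan]): Plan = copy(children = newChildren)
}

// A columnar leaf, standing in for the output of a prior query stage.
case class ColumnarStage(name: String) extends Plan {
  val supportsColumnar: Boolean = true
  val children: Seq[Plan] = Nil
  def withChildren(newChildren: Seq[Plan]): Plan = this
}

// Columnar-to-row transition: consumes columnar input, produces rows.
case class C2R(child: Plan) extends Plan {
  val supportsColumnar: Boolean = false
  val children: Seq[Plan] = Seq(child)
  def withChildren(newChildren: Seq[Plan]): Plan = C2R(newChildren.head)
}

object InsertTransitions {
  // Bottom-up rewrite: wrap every columnar child of a row-based parent in C2R.
  def insertC2R(plan: Plan): Plan = {
    val rebuilt = plan.withChildren(plan.children.map(insertC2R))
    rebuilt match {
      case existing: C2R => existing // already a transition, do not wrap twice
      case node if !node.supportsColumnar =>
        node.withChildren(node.children.map { child =>
          if (child.supportsColumnar) C2R(child) else child
        })
      case node => node
    }
  }
}
```

In this model a row-based child under a columnar parent cannot occur, because the only columnar nodes are leaves. A plan that kept a columnar operator in the middle of the stage would need the reverse (R2C) case as well.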

if (newChildren != node.children) node.withNewChildren(newChildren) else node
}
}
}
Contributor

You might want to remove CometTelemetryExec as well. It won't do any harm if it is there, but it's pointless to collect Comet telemetry for Spark nodes.

Contributor Author

Unrelated?

Contributor Author

@karuppayya karuppayya left a comment

Thanks @parthchandra for the review.
I have also cleaned up the tests.

stagePlan: SparkPlan,
outputColumnar: Boolean): Option[SparkPlan] = {
val transitionCount = countTransitions(stagePlan)
if (transitionCount <= maxTransitions) return None
Contributor

@comphead comphead Jun 4, 2026

For a more user-friendly response, it would be better to use withFallbackReason instead of logging. See CometExecRule.scala for an example.

Development

Successfully merging this pull request may close these issues.

Revert transition-heavy stages to Spark row-based execution

4 participants