[SPARK-57080][SDP] Register AutoCDC Flows from PipelinesHandler#56124
[SPARK-57080][SDP] Register AutoCDC Flows from PipelinesHandler#56124AnishMahto wants to merge 9 commits into
PipelinesHandler#56124Conversation
szehon-ho
left a comment
There was a problem hiding this comment.
Took a careful pass through this PR (focus on the SDP Connect wiring). Overall this is a focused, well-scoped change that follows existing UntypedFlow registration patterns — happy to see AutoCDC reach end-to-end through Connect. Core implementation is sound and test coverage at the registration/mapping layer is solid.
A few correctness, error-handling, and coverage items I'd like to see addressed before merge — all inline below. The most important is the source identifier handling (comment #1): the proto/Python API don't constrain it, but the current server-side wrapping silently mishandles multi-part names. My recommendation is to restrict and document, rather than partially-support.
Summary of comments:
- (PipelinesHandler) Restrict
sourceto single-part and document it (also: update proto comment, Python docstring, add negative tests). Most important item. - (PipelinesHandler)
.sql-based column-name extraction is fragile for non-attribute expressions; extract fromUnresolvedAttributedirectly. - (PipelinesHandler) Use
AnalysisException+ structured error class instead ofIllegalArgumentExceptionfor the column_list / except_column_list conflict. - (PipelinesHandler) Nit: drop fully-qualified
scala.collection.immutable.Seq. - (PipelinesHandler) Add a TODO listing the proto fields that aren't honored yet (
apply_as_truncates,ignore_null_updates_*). - (PythonPipelineSuite) Duplicate SCD-type coverage.
- (PythonPipelineSuite) Additional negative-/end-to-end tests.
Housekeeping (non-blocking):
- PR title references SPARK-57080 but branch is
SPARK-56957-.... Worth aligning. - The fork's linter check is currently failing — please check before merge.
Generated-by: Claude-Opus-4.7-thinking-xhigh
gengliangwang
left a comment
There was a problem hiding this comment.
Focused PR — this is the last piece of wiring for AutoCDC flows through Spark Connect (the proto fields, Python client, and server-side AutoCdcFlow model already exist; this connects them). Implementation follows the existing UntypedFlow registration shape.
@szehon-ho's prior review already covers the substantive items (single-part source enforcement, .sql-based column extraction, structured AnalysisException, un-honored proto fields, test gaps). I agree with those findings and won't restate them inline. One additional comment below on a documentation-accuracy point that overlaps his comment on the column_list / except_column_list throw and may be useful regardless of how that comment is resolved.
One meta note on the scala.collection.immutable.Seq(...) style nit (separate thread): I'd hold off on "just use Seq(...)". The file imports scala.collection.Seq at line 20, so plain Seq(...) here resolves to scala.collection.Seq and won't satisfy UnresolvedRelation.multipartIdentifier: scala.collection.immutable.Seq[String]. The FQN is deliberate.
Generated-by: Claude-Opus-4.7
szehon-ho
left a comment
There was a problem hiding this comment.
Thanks for the thorough response to the first round — all the substantive items (source identifier handling, column extraction, structured AnalysisException for the column-list conflict, TODOs for un-honored proto fields, end-to-end resolution test) are addressed cleanly. Approving on the strength of those.
Leaving a few inline follow-ups, all non-blocking — feel free to fold in here or in a follow-up:
- Duplicate SCD-type test could be replaced by a
SCD_TYPE_2negative test (the only currently-uncovered arm). - The two single-part-
sourceresolution tests overlap; either drop one or make one exercise pipeline-level defaults that differ from session defaults. buildAutoCdcFlowdoesn't guard against an unset protosource/sequence_by(Python always populates, but a non-Python client wouldn't).- (Soft / discussion only) SQLSTATE for
AUTOCDC_NON_COLUMN_IDENTIFIER— happy to defer since the neighborAUTOCDC_MULTIPART_COLUMN_IDENTIFIERuses the same code.
One housekeeping item that's worth confirming before merge but isn't a code review point: Run / Linters, licenses, and dependencies (Scala linter step) is still red on d517ddff. The fork's annotation API doesn't surface a line, so a local ./dev/lint-scala is probably the fastest path. Also please confirm SPARK-57092 / SPARK-57093 exist as real JIRAs (the TODO references).
Generated-by: Claude-Opus-4.7
cloud-fan
left a comment
There was a problem hiding this comment.
LGTM if all comments from @szehon-ho are addressed
cloud-fan
left a comment
There was a problem hiding this comment.
Re-reviewing after the two follow-up commits since approval (e51b3cb, ddf76ec):
hasSource/hasSequenceByguards landed cleanly; the new error entries are well-scoped and the messages are actionable.- sqlState moves from
42703→22023for the AutoCDC identifier-shape errors are consistent withAUTOCDC_EMPTY_KEYS/AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA; the remaining42613onAUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LISTis the standard "invalid options" code, so the inconsistency reads as deliberate. - The session-vs-pipeline-defaults test came back as a database-only test (catalog half no longer exercised) — see inline.
Approving. Two non-blocking nits inline; feel free to fold in or defer.
Housekeeping still open from prior rounds, please confirm before merge:
- PR title
[SPARK-57080]vs. branchSPARK-56957-...— please align. - TODOs reference
SPARK-57092/SPARK-57093— please confirm the JIRAs exist. - The
Linters, licenses, and dependenciesjob is in_progress on the current HEAD (was failing ond517ddff); please confirm green.
Generated-by: Claude-Opus-4.7
| func = FlowAnalysis.createFlowFunctionFromLogicalPlan(sourcePlan), | ||
| sqlConf = flow.getSqlConfMap.asScala.toMap, | ||
| queryContext = QueryContext(Option(defaultCatalog), Option(defaultDatabase)), | ||
| origin = QueryOrigin( |
There was a problem hiding this comment.
The QueryOrigin(...) block here is identical to the one in the RELATION_FLOW_DETAILS arm at line 394 — same filePath, line, objectType, objectName, language. Now that two flow-construction paths in this file share the exact same origin shape, a small private helper would keep the third path (whenever one is added) from drifting:
private def flowOrigin(
flow: proto.PipelineCommand.DefineFlow,
flowIdentifier: TableIdentifier): QueryOrigin =
QueryOrigin(
filePath = Option.when(flow.getSourceCodeLocation.hasFileName)(
flow.getSourceCodeLocation.getFileName),
line = Option.when(flow.getSourceCodeLocation.hasLineNumber)(
flow.getSourceCodeLocation.getLineNumber),
objectType = Some(QueryOriginType.Flow.toString),
objectName = Option(flowIdentifier.unquotedString),
language = Some(Python()))Non-blocking; feel free to fold in or defer.
Generated-by: Claude-Opus-4.7
| assert(resolvedFlow.inputs == Set(graphIdentifier("src"))) | ||
| } | ||
|
|
||
| test("AutoCDC API: single-part `source` inherits the pipeline's default database") { |
There was a problem hiding this comment.
After the rename in ddf76ec98ad, this test exercises only the database half of pipeline-default inheritance: defaultCatalog = Some("spark_catalog") is now equal to the session's default catalog (also spark_catalog), so the catalog component of the asserted identifier would resolve the same way even without the defaultCatalog parameter being plumbed through buildAutoCdcFlow. A regression that severed pipeline-catalog propagation would not be caught here.
Two options:
(a) acknowledge in the inline comment that only database inheritance is exercised (and rely on multi-part \source` resolves...` below for catalog routing), or
(b) register a fresh catalog up front, e.g.
spark.conf.set("spark.sql.catalog.my_catalog", classOf[InMemoryCatalog].getName)then revert defaultCatalog to Some("my_catalog") so the assertion meaningfully exercises catalog inheritance.
Non-blocking.
Generated-by: Claude-Opus-4.7
…inheritance
- Factor the duplicated QueryOrigin block in PipelinesHandler into a private
flowOrigin helper shared by the RELATION_FLOW_DETAILS and AUTO_CDC_FLOW_DETAILS
paths.
- Rewrite the pipeline-default inheritance test to exercise both catalog and
database halves: register a V2 in-memory `my_catalog` via sparkConf, plumb a
setupSql parameter through buildGraph so the test creates the `my_db`
namespace on the same Connect session that subsequently creates the dataflow
graph, and assert against `my_catalog.my_db.{src,target}` (neither of which
is the session default).
Move the stripMargin `|` marker from each generated statement back into the
template, so we only need one mkString("\n") instead of mapping each statement
to "|" + _.
The setupSqlLine/blank line added to the python template in buildGraph shifts every subsequent generated line by +2, so update the source-location assertions in PythonPipelineSuite accordingly. Also apply scalafmt to PipelinesHandler.
|
The |
|
Thanks, merging to master/4.x/4.2 |
### What changes were proposed in this pull request? In the `PipelinesHandler`, register an `AutoCdcFlow` when a `DefineFlow` proto is received with `AUTO_CDC_FLOW_DETAILS`. This is the final step in connecting a spark connect client to the spark connect server's SDP engine for AutoCDC flows. With these changes, a user should be able to run their SDP with AutoCDC flows using the pipelines CLI runner. ### Why are the changes needed? Allows spark connect clients to actually register and execute AutoCDC flows within their pipelines. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Test graph registration and construction through Python client in `PythonPipelineSuite`. ### Was this patch authored or co-authored using generative AI tooling? Co-authored. Generated-by: Claude-Opus-4.7-thinking-xhigh Closes #56124 from AnishMahto/SPARK-56957-register-autocdc-flow-from-pipelineshandler. Lead-authored-by: AnishMahto <anish.mahto99@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Gengliang Wang <gengliang@apache.org> (cherry picked from commit b848baa) Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request? In the `PipelinesHandler`, register an `AutoCdcFlow` when a `DefineFlow` proto is received with `AUTO_CDC_FLOW_DETAILS`. This is the final step in connecting a spark connect client to the spark connect server's SDP engine for AutoCDC flows. With these changes, a user should be able to run their SDP with AutoCDC flows using the pipelines CLI runner. ### Why are the changes needed? Allows spark connect clients to actually register and execute AutoCDC flows within their pipelines. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Test graph registration and construction through Python client in `PythonPipelineSuite`. ### Was this patch authored or co-authored using generative AI tooling? Co-authored. Generated-by: Claude-Opus-4.7-thinking-xhigh Closes #56124 from AnishMahto/SPARK-56957-register-autocdc-flow-from-pipelineshandler. Lead-authored-by: AnishMahto <anish.mahto99@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Gengliang Wang <gengliang@apache.org> (cherry picked from commit b848baa) Signed-off-by: Gengliang Wang <gengliang@apache.org>
What changes were proposed in this pull request?
In the
PipelinesHandler, register anAutoCdcFlowwhen aDefineFlowproto is received withAUTO_CDC_FLOW_DETAILS.This is the final step in connecting a spark connect client to the spark connect server's SDP engine for AutoCDC flows. With these changes, a user should be able to run their SDP with AutoCDC flows using the pipelines CLI runner.
Why are the changes needed?
Allows spark connect clients to actually register and execute AutoCDC flows within their pipelines.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Test graph registration and construction through Python client in
PythonPipelineSuite.Was this patch authored or co-authored using generative AI tooling?
Co-authored.
Generated-by: Claude-Opus-4.7-thinking-xhigh