Skip to content

[SPARK-57080][SDP] Register AutoCDC Flows from PipelinesHandler#56124

Closed
AnishMahto wants to merge 9 commits into
apache:masterfrom
AnishMahto:SPARK-56957-register-autocdc-flow-from-pipelineshandler
Closed

[SPARK-57080][SDP] Register AutoCDC Flows from PipelinesHandler#56124
AnishMahto wants to merge 9 commits into
apache:masterfrom
AnishMahto:SPARK-56957-register-autocdc-flow-from-pipelineshandler

Conversation

@AnishMahto
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

In the PipelinesHandler, register an AutoCdcFlow when a DefineFlow proto is received with AUTO_CDC_FLOW_DETAILS.

This is the final step in connecting a spark connect client to the spark connect server's SDP engine for AutoCDC flows. With these changes, a user should be able to run their SDP with AutoCDC flows using the pipelines CLI runner.

Why are the changes needed?

Allows spark connect clients to actually register and execute AutoCDC flows within their pipelines.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Test graph registration and construction through Python client in PythonPipelineSuite.

Was this patch authored or co-authored using generative AI tooling?

Co-authored.

Generated-by: Claude-Opus-4.7-thinking-xhigh

@AnishMahto
Copy link
Copy Markdown
Contributor Author

@szehon-ho @gengliangwang

Copy link
Copy Markdown
Member

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a careful pass through this PR (focus on the SDP Connect wiring). Overall this is a focused, well-scoped change that follows existing UntypedFlow registration patterns — happy to see AutoCDC reach end-to-end through Connect. Core implementation is sound and test coverage at the registration/mapping layer is solid.

A few correctness, error-handling, and coverage items I'd like to see addressed before merge — all inline below. The most important is the source identifier handling (comment #1): the proto/Python API don't constrain it, but the current server-side wrapping silently mishandles multi-part names. My recommendation is to restrict and document, rather than partially-support.

Summary of comments:

  1. (PipelinesHandler) Restrict source to single-part and document it (also: update proto comment, Python docstring, add negative tests). Most important item.
  2. (PipelinesHandler) .sql-based column-name extraction is fragile for non-attribute expressions; extract from UnresolvedAttribute directly.
  3. (PipelinesHandler) Use AnalysisException + structured error class instead of IllegalArgumentException for the column_list / except_column_list conflict.
  4. (PipelinesHandler) Nit: drop fully-qualified scala.collection.immutable.Seq.
  5. (PipelinesHandler) Add a TODO listing the proto fields that aren't honored yet (apply_as_truncates, ignore_null_updates_*).
  6. (PythonPipelineSuite) Duplicate SCD-type coverage.
  7. (PythonPipelineSuite) Additional negative-/end-to-end tests.

Housekeeping (non-blocking):

  • PR title references SPARK-57080 but branch is SPARK-56957-.... Worth aligning.
  • The fork's linter check is currently failing — please check before merge.

Generated-by: Claude-Opus-4.7-thinking-xhigh

Copy link
Copy Markdown
Member

@gengliangwang gengliangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Focused PR — this is the last piece of wiring for AutoCDC flows through Spark Connect (the proto fields, Python client, and server-side AutoCdcFlow model already exist; this connects them). Implementation follows the existing UntypedFlow registration shape.

@szehon-ho's prior review already covers the substantive items (single-part source enforcement, .sql-based column extraction, structured AnalysisException, un-honored proto fields, test gaps). I agree with those findings and won't restate them inline. One additional comment below on a documentation-accuracy point that overlaps his comment on the column_list / except_column_list throw and may be useful regardless of how that comment is resolved.

One meta note on the scala.collection.immutable.Seq(...) style nit (separate thread): I'd hold off on "just use Seq(...)". The file imports scala.collection.Seq at line 20, so plain Seq(...) here resolves to scala.collection.Seq and won't satisfy UnresolvedRelation.multipartIdentifier: scala.collection.immutable.Seq[String]. The FQN is deliberate.

Generated-by: Claude-Opus-4.7

Copy link
Copy Markdown
Member

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the thorough response to the first round — all the substantive items (source identifier handling, column extraction, structured AnalysisException for the column-list conflict, TODOs for un-honored proto fields, end-to-end resolution test) are addressed cleanly. Approving on the strength of those.

Leaving a few inline follow-ups, all non-blocking — feel free to fold in here or in a follow-up:

  1. Duplicate SCD-type test could be replaced by a SCD_TYPE_2 negative test (the only currently-uncovered arm).
  2. The two single-part-source resolution tests overlap; either drop one or make one exercise pipeline-level defaults that differ from session defaults.
  3. buildAutoCdcFlow doesn't guard against an unset proto source / sequence_by (Python always populates, but a non-Python client wouldn't).
  4. (Soft / discussion only) SQLSTATE for AUTOCDC_NON_COLUMN_IDENTIFIER — happy to defer since the neighbor AUTOCDC_MULTIPART_COLUMN_IDENTIFIER uses the same code.

One housekeeping item that's worth confirming before merge but isn't a code review point: Run / Linters, licenses, and dependencies (Scala linter step) is still red on d517ddff. The fork's annotation API doesn't surface a line, so a local ./dev/lint-scala is probably the fastest path. Also please confirm SPARK-57092 / SPARK-57093 exist as real JIRAs (the TODO references).

Generated-by: Claude-Opus-4.7

Comment thread common/utils/src/main/resources/error/error-conditions.json Outdated
Copy link
Copy Markdown
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM if all comments from @szehon-ho are addressed

Copy link
Copy Markdown
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-reviewing after the two follow-up commits since approval (e51b3cb, ddf76ec):

  • hasSource / hasSequenceBy guards landed cleanly; the new error entries are well-scoped and the messages are actionable.
  • sqlState moves from 4270322023 for the AutoCDC identifier-shape errors are consistent with AUTOCDC_EMPTY_KEYS / AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA; the remaining 42613 on AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST is the standard "invalid options" code, so the inconsistency reads as deliberate.
  • The session-vs-pipeline-defaults test came back as a database-only test (catalog half no longer exercised) — see inline.

Approving. Two non-blocking nits inline; feel free to fold in or defer.

Housekeeping still open from prior rounds, please confirm before merge:

  • PR title [SPARK-57080] vs. branch SPARK-56957-... — please align.
  • TODOs reference SPARK-57092 / SPARK-57093 — please confirm the JIRAs exist.
  • The Linters, licenses, and dependencies job is in_progress on the current HEAD (was failing on d517ddff); please confirm green.

Generated-by: Claude-Opus-4.7

func = FlowAnalysis.createFlowFunctionFromLogicalPlan(sourcePlan),
sqlConf = flow.getSqlConfMap.asScala.toMap,
queryContext = QueryContext(Option(defaultCatalog), Option(defaultDatabase)),
origin = QueryOrigin(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The QueryOrigin(...) block here is identical to the one in the RELATION_FLOW_DETAILS arm at line 394 — same filePath, line, objectType, objectName, language. Now that two flow-construction paths in this file share the exact same origin shape, a small private helper would keep the third path (whenever one is added) from drifting:

private def flowOrigin(
    flow: proto.PipelineCommand.DefineFlow,
    flowIdentifier: TableIdentifier): QueryOrigin =
  QueryOrigin(
    filePath = Option.when(flow.getSourceCodeLocation.hasFileName)(
      flow.getSourceCodeLocation.getFileName),
    line = Option.when(flow.getSourceCodeLocation.hasLineNumber)(
      flow.getSourceCodeLocation.getLineNumber),
    objectType = Some(QueryOriginType.Flow.toString),
    objectName = Option(flowIdentifier.unquotedString),
    language = Some(Python()))

Non-blocking; feel free to fold in or defer.

Generated-by: Claude-Opus-4.7

assert(resolvedFlow.inputs == Set(graphIdentifier("src")))
}

test("AutoCDC API: single-part `source` inherits the pipeline's default database") {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the rename in ddf76ec98ad, this test exercises only the database half of pipeline-default inheritance: defaultCatalog = Some("spark_catalog") is now equal to the session's default catalog (also spark_catalog), so the catalog component of the asserted identifier would resolve the same way even without the defaultCatalog parameter being plumbed through buildAutoCdcFlow. A regression that severed pipeline-catalog propagation would not be caught here.

Two options:
(a) acknowledge in the inline comment that only database inheritance is exercised (and rely on multi-part \source` resolves...` below for catalog routing), or
(b) register a fresh catalog up front, e.g.

spark.conf.set("spark.sql.catalog.my_catalog", classOf[InMemoryCatalog].getName)

then revert defaultCatalog to Some("my_catalog") so the assertion meaningfully exercises catalog inheritance.

Non-blocking.

Generated-by: Claude-Opus-4.7

cloud-fan added 3 commits May 27, 2026 11:44
…inheritance

- Factor the duplicated QueryOrigin block in PipelinesHandler into a private
  flowOrigin helper shared by the RELATION_FLOW_DETAILS and AUTO_CDC_FLOW_DETAILS
  paths.
- Rewrite the pipeline-default inheritance test to exercise both catalog and
  database halves: register a V2 in-memory `my_catalog` via sparkConf, plumb a
  setupSql parameter through buildGraph so the test creates the `my_db`
  namespace on the same Connect session that subsequently creates the dataflow
  graph, and assert against `my_catalog.my_db.{src,target}` (neither of which
  is the session default).
Move the stripMargin `|` marker from each generated statement back into the
template, so we only need one mkString("\n") instead of mapping each statement
to "|" + _.
The setupSqlLine/blank line added to the python template in
buildGraph shifts every subsequent generated line by +2, so update
the source-location assertions in PythonPipelineSuite accordingly.
Also apply scalafmt to PipelinesHandler.
@AnishMahto
Copy link
Copy Markdown
Contributor Author

The pyspark-pandas CI failure should be unrelated, these changes do not touch that module.

@gengliangwang
Copy link
Copy Markdown
Member

Thanks, merging to master/4.x/4.2

gengliangwang pushed a commit that referenced this pull request May 27, 2026
### What changes were proposed in this pull request?
In the `PipelinesHandler`, register an `AutoCdcFlow` when a `DefineFlow` proto is received with `AUTO_CDC_FLOW_DETAILS`.

This is the final step in connecting a spark connect client to the spark connect server's SDP engine for AutoCDC flows. With these changes, a user should be able to run their SDP with AutoCDC flows using the pipelines CLI runner.

### Why are the changes needed?
Allows spark connect clients to actually register and execute AutoCDC flows within their pipelines.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Test graph registration and construction through Python client in `PythonPipelineSuite`.

### Was this patch authored or co-authored using generative AI tooling?
Co-authored.

Generated-by: Claude-Opus-4.7-thinking-xhigh

Closes #56124 from AnishMahto/SPARK-56957-register-autocdc-flow-from-pipelineshandler.

Lead-authored-by: AnishMahto <anish.mahto99@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit b848baa)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
gengliangwang pushed a commit that referenced this pull request May 27, 2026
### What changes were proposed in this pull request?
In the `PipelinesHandler`, register an `AutoCdcFlow` when a `DefineFlow` proto is received with `AUTO_CDC_FLOW_DETAILS`.

This is the final step in connecting a spark connect client to the spark connect server's SDP engine for AutoCDC flows. With these changes, a user should be able to run their SDP with AutoCDC flows using the pipelines CLI runner.

### Why are the changes needed?
Allows spark connect clients to actually register and execute AutoCDC flows within their pipelines.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Test graph registration and construction through Python client in `PythonPipelineSuite`.

### Was this patch authored or co-authored using generative AI tooling?
Co-authored.

Generated-by: Claude-Opus-4.7-thinking-xhigh

Closes #56124 from AnishMahto/SPARK-56957-register-autocdc-flow-from-pipelineshandler.

Lead-authored-by: AnishMahto <anish.mahto99@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit b848baa)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants