Skip to content

[GSoC 2026] Kafka Streams runner — ExecutableStage (stateless ParDo) translator#38764

Open
junaiddshaukat wants to merge 1 commit into
apache:feat/18479-kafka-streams-runner-skeletonfrom
junaiddshaukat:feat/ks-executable-stage
Open

[GSoC 2026] Kafka Streams runner — ExecutableStage (stateless ParDo) translator#38764
junaiddshaukat wants to merge 1 commit into
apache:feat/18479-kafka-streams-runner-skeletonfrom
junaiddshaukat:feat/ks-executable-stage

Conversation

@junaiddshaukat
Copy link
Copy Markdown
Contributor

@junaiddshaukat junaiddshaukat commented Jun 1, 2026

Summary

Third sub-issue under the Kafka Streams runner GSoC 2026 project. Adds the ExecutableStage translator and SDK-harness bridge so fused stateless user code (ParDo etc.) actually runs in the SDK harness over the Fn API and its outputs flow back into the topology.

Per design doc §4.2 and the live discussion with @je-ik on the issue.

What's in this PR

  • KafkaStreamsExecutableStageContextFactory mirroring Flink's pattern.
  • ExecutableStageProcessor (the harness bridge) + ExecutableStageTranslator.
  • URN dispatch registers ExecutableStage.URN; prepareForTranslation now runs GreedyPipelineFuser.
  • KStreamsPayload.toString uses MoreObjects.toStringHelper (post-merge tweak from [GSoC 2026] Kafka Streams runner — translation framework + Impulse translator #38689).
  • Test deps: testImplementation project(':sdks:java:harness') for the EMBEDDED environment.

Tests

  • ExecutableStageTranslatorTest builds Impulse -> ParDo via the Beam
    Java SDK, drives the resulting topology under TopologyTestDriver
    with the EMBEDDED environment, and asserts via side effect that the
    DoFn ran in the SDK harness with the expected input. Approach
    discussed with @je-ik on [GSoC 2026] Kafka Streams Runner — ExecutableStage (stateless ParDo) translator #38743 — because the ParDo's output has no
    downstream consumer, the stage has no output PCollection and the
    harness does not deliver the value back to the runner (per Beam
    semantics), so the bridge is verified by a recorded side effect.
  • New SharedTestCollector<T> helper: instances are Serializable
    but their identity is a UUID; the actual storage lives in a static
    registry keyed by UUID. Survives the runner cloning the DoFn
    (current or future EMBEDDED behaviour).
  • KafkaStreamsPipelineTranslatorTest updated so the Impulse case
    builds via the Beam SDK (validator-compliant proto for the fuser)
    and the URN-rejection case calls translate directly to keep the
    dispatch-loop contract isolated.

Validation

  • ./gradlew :runners:kafka-streams:check green locally (14 tests).
  • No @SuppressWarnings of any flavor in the new code.

Notes / deferred

  • The watermark-driven bundle flush in ExecutableStageProcessor is
    provisional. When the WatermarkManager lands, the output watermark
    will be min() across received watermarks and the flush should fire
    only when that minimum moves forward, not on every received
    watermark — comment in the processor flags this.
  • KStreamsPayload Coder + Kafka Serde for topic-boundary
    serialization is still deferred to the first sub-issue that
    introduces a topic boundary (GBK / repartition).
  • ExecutableStage with state, timers, side inputs is out of scope.

Closes #38743
Refs #18479
cc @je-ik

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request implements the third sub-issue of the Kafka Streams runner GSoC 2026 project, focusing on the execution of stateless user code. By adding the ExecutableStage translator and the necessary bridge to the SDK harness, the runner can now execute fused ParDo operations using the Fn API. This change includes infrastructure for managing harness contexts and robust testing utilities to verify execution within the Kafka Streams topology.

Highlights

  • ExecutableStage Support: Introduced the ExecutableStage translator and processor to enable running fused stateless user code (e.g., ParDo) in the SDK harness via the Fn API.
  • Pipeline Fusion: Updated the pipeline translation process to include the GreedyPipelineFuser, ensuring stateless code is correctly grouped into ExecutableStage nodes.
  • SDK Harness Integration: Added KafkaStreamsExecutableStageContextFactory to manage SDK harness environments and implemented a bridge to handle data flow between Kafka Streams and the Fn API.
  • Testing Infrastructure: Added SharedTestCollector for reliable side-effect verification in tests and implemented ExecutableStageTranslatorTest to validate end-to-end execution.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements the translation and execution of fused executable stages (beam:runner:executable_stage:v1) for the Kafka Streams runner, allowing stateless user code to run in the Beam SDK harness. The feedback highlights two critical issues: a concurrency bug in ExecutableStageProcessor due to the use of a non-thread-safe ArrayDeque for queueing outputs across threads, which should be replaced with ConcurrentLinkedQueue, and a potential memory leak in KafkaStreamsExecutableStageContextFactory where job factories are not cleaned up from the map when their reference count drops to zero.

*/
package org.apache.beam.runners.kafka.streams.translation;

import java.util.ArrayDeque;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Since the queue is accessed concurrently by SDK harness threads (which enqueue outputs) and the Kafka Streams processing thread (which flushes them), we should use a thread-safe queue like ConcurrentLinkedQueue instead of ArrayDeque to avoid race conditions and data corruption.

Suggested change
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentLinkedQueue;

private final RunnerApi.ExecutableStagePayload stagePayload;
private final JobInfo jobInfo;

private final Queue<WindowedValue<byte[]>> pendingOutputs = new ArrayDeque<>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Instantiate pendingOutputs as a ConcurrentLinkedQueue to ensure thread-safe operations when elements are added from harness threads and polled from the processing thread.

Suggested change
private final Queue<WindowedValue<byte[]>> pendingOutputs = new ArrayDeque<>();
private final Queue<WindowedValue<byte[]>> pendingOutputs = new ConcurrentLinkedQueue<>();

Comment on lines +57 to +59
// Release the context synchronously once its reference count drops to zero;
// the runner does not keep contexts alive across stages beyond their use.
(caller) -> true));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The jobFactories map is a static ConcurrentHashMap that stores ExecutableStageContext.Factory instances per job. If we do not remove the factory from the map when the reference count drops to zero, it will cause a memory leak of the factory and its associated resources for every job run in the JVM. We should remove the job ID from jobFactories in the releaser callback.

Suggested change
// Release the context synchronously once its reference count drops to zero;
// the runner does not keep contexts alive across stages beyond their use.
(caller) -> true));
(caller) -> {
jobFactories.remove(jobInfo.jobId());
return true;
}));

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces support for executing fused ExecutableStage nodes (stateless user code) in the Beam SDK harness over the Fn API for the Kafka Streams runner. It adds the ExecutableStageProcessor to run the fused code, the ExecutableStageTranslator to handle the translation, and a context factory to manage shared SDK harness environments. Feedback on the changes highlights a concurrency issue in ExecutableStageProcessor where a non-thread-safe ArrayDeque is used for pending outputs, which should be replaced with ConcurrentLinkedQueue. Additionally, the translator should explicitly reject unsupported features like side inputs, user states, and timers to fail fast with descriptive errors.

private final RunnerApi.ExecutableStagePayload stagePayload;
private final JobInfo jobInfo;

private final Queue<WindowedValue<byte[]>> pendingOutputs = new ArrayDeque<>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

pendingOutputs is accessed concurrently: it is populated by SDK harness threads inside the FnDataReceiver callback and drained by the Kafka Streams processing thread. Since ArrayDeque is not thread-safe and lacks memory visibility guarantees across threads, this can lead to race conditions or data corruption.

Using ConcurrentLinkedQueue provides a thread-safe, lock-free queue that is ideal for this producer-consumer pattern.

Suggested change
private final Queue<WindowedValue<byte[]>> pendingOutputs = new ArrayDeque<>();
private final Queue<WindowedValue<byte[]>> pendingOutputs = new ConcurrentLinkedQueue<>();

*/
package org.apache.beam.runners.kafka.streams.translation;

import java.util.ArrayDeque;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Replace the ArrayDeque import with ConcurrentLinkedQueue to support the thread-safe queue implementation for pendingOutputs.

Suggested change
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentLinkedQueue;

Comment on lines +53 to +54
String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Instead of using Iterables.getOnlyElement(transform.getInputsMap().values()), which will fail with a generic IllegalArgumentException if there are side inputs, we can directly retrieve the main input PCollection ID from stagePayload.getInput().

Additionally, we should explicitly check and reject side inputs, user states, and timers using stagePayload to fail fast with a clear, descriptive error message.

    if (stagePayload.getSideInputsCount() > 0) {
      throw new UnsupportedOperationException(
          "ExecutableStage "
              + transformId
              + " has side inputs; side inputs are not yet supported by the Kafka Streams runner.");
    }
    if (stagePayload.getUserStatesCount() > 0 || stagePayload.getTimersCount() > 0) {
      throw new UnsupportedOperationException(
          "ExecutableStage "
              + transformId
              + " has user states or timers; these are not yet supported by the Kafka Streams runner.");
    }

    String inputPCollectionId = stagePayload.getInput();
    String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Jun 1, 2026

Assigning reviewers:

R: @chamikaramj added as fallback since no labels match configuration

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@je-ik je-ik self-requested a review June 2, 2026 06:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant