Skip to content

feat: enable chat events on coordinator WebSocket#237

Open
aliev wants to merge 8 commits intomainfrom
feat/chat-events-on-coordinator-ws
Open

feat: enable chat events on coordinator WebSocket#237
aliev wants to merge 8 commits intomainfrom
feat/chat-events-on-coordinator-ws

Conversation

@aliev
Copy link
Copy Markdown
Member

@aliev aliev commented Apr 13, 2026

Why

The coordinator WebSocket is connected and receiving events, but there's no public way to register listeners on it from outside ConnectionManager. This blocks use cases like forwarding call events (turn detection, custom events) to external systems.

Changes

  • Expose coordinator_ws property on ConnectionManager for event listener registration

Usage

Receiving events:

ws = connection_manager.coordinator_ws

@ws.on("custom")
def on_custom(event):
    print(event["custom"])

ws.on_wildcard("call.**", lambda event_type, event: print(event_type))

Sending custom events:

await call.send_call_event(
    user_id="agent",
    custom={"type": "status_update", "status": "processing"},
)

Verified event types:

  • custom -- user-defined events (turn detection, agent heartbeat, etc.)
  • call.session_participant_count_updated -- participant count changes
  • health.check -- keep-alive pings

Future work

  • The Python SDK docs currently don't cover event subscription (only JS/Flutter/Kotlin/Swift do). Adding a proper event subscription API (lazy channel watching, similar to JS SDK's channel.watch() + channel.on()) is a future task once we determine the best approach for the Python SDK.

Summary by CodeRabbit

  • New Features

    • Coordinator WebSocket access is exposed after connecting, enabling real-time coordinator events.
  • Documentation

    • Added guidance on subscribing to coordinator WebSocket events and when the interface becomes available.
  • Tests

    • Added an integration test validating end-to-end custom event publish/subscribe via the coordinator WebSocket.

The coordinator WS now subscribes to both video and chat products,
allowing chat events (message.new, reactions, typing, etc.) to arrive
on the same connection as video call events. No second WS needed.

- Add "chat" to products in StreamAPIWS auth payload
- Add watch_channels() in connection_utils (same pattern as watch_call)
- ConnectionManager subscribes to messaging channel after watch_call
- Expose coordinator_ws property for event listener registration
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 13, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 8bd430d5-b1c0-495a-82a9-b80a41d48cd9

📥 Commits

Reviewing files that changed from the base of the PR and between c68730c and b151b75.

📒 Files selected for processing (1)
  • tests/rtc/coordinator/test_custom_events.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/rtc/coordinator/test_custom_events.py

📝 Walkthrough

Walkthrough

Added a read-only coordinator_ws property to ConnectionManager that returns the internal _coordinator_ws_client only when it exists and is connected; otherwise it returns None. Added an integration test that verifies custom event publish/subscribe via the RTC coordinator WebSocket.

Changes

Cohort / File(s) Summary
ConnectionManager property
getstream/video/rtc/connection_manager.py
Added coordinator_ws read-only @property returning the internal _coordinator_ws_client when present and _connected is truthy; includes docstring describing purpose (call-scoped events), subscription methods, and availability (post-connect).
RTC coordinator integration test
tests/rtc/coordinator/test_custom_events.py
New integration test and fixture that upserts two temporary users, creates/joins a call, subscribes to connection.coordinator_ws for "custom" events, sends a REST call.send_call_event, and asserts the custom event round-trips to the subscriber.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐰 I twitch my nose, a tiny cheer,
A websocket door now crystal clear,
Custom hops arrive — I catch the beat,
Payloads and types — a joyful feat,
I nibble logs and dance, so sweet.

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)

Check name Status Explanation Resolution
Title check ⚠️ Warning The PR title 'feat: enable chat events on coordinator WebSocket' is misleading. The changes actually expose a coordinator_ws property for custom events (including custom event types and call events), not specifically 'chat events.' The test verifies custom event pub/sub, not chat functionality. Revise the title to accurately reflect the changes, such as 'feat: expose coordinator WebSocket property for event listener registration' or 'feat: add coordinator_ws property to ConnectionManager for custom events.'
Docstring Coverage ⚠️ Warning Docstring coverage is 33.33% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/chat-events-on-coordinator-ws

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Chat event subscription needs discussion with the team -- JS SDK uses
two separate WS connections for chat and video, and mixing them on one
connection may affect MAU billing. Keep coordinator_ws property for
custom events which already work via watch_call.
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@getstream/video/rtc/connection_manager.py`:
- Around line 623-632: The property coordinator_ws should honor "Returns None if
not connected" by returning the websocket client only when it is actually
connected; update coordinator_ws to check _coordinator_ws_client's connection
state (e.g., _coordinator_ws_client.connected or
_coordinator_ws_client.is_connected) and/or ensure the coordinator helper/task
(e.g., _coordinator_task) is still running, and return None otherwise; use
getattr checks to avoid attribute errors so callers never receive a
non-connected client from coordinator_ws.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 66b96399-2289-4c8e-be81-8a56b62ec916

📥 Commits

Reviewing files that changed from the base of the PR and between 7c4c123 and 810998f.

📒 Files selected for processing (1)
  • getstream/video/rtc/connection_manager.py

Comment thread getstream/video/rtc/connection_manager.py Outdated
Return Optional[StreamAPIWS] and check _connected before returning,
so callers get None instead of a disconnected client.
…r_ws

Verifies the full pub/sub cycle: send a custom event via REST
(call.send_call_event), receive it on ConnectionManager.coordinator_ws
through ws.on("custom") listener. Uses asyncio.Event for reliable
timing instead of fixed sleep.
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
tests/rtc/coordinator/test_custom_events.py (1)

36-69: Group integration tests in a test class for consistency.

This test is currently top-level; move it under a test class to match repository test-organization rules.

Suggested structure
-@pytest.mark.asyncio
-@pytest.mark.integration
-@skip_on_rate_limit
-async def test_custom_event_round_trip(async_client: AsyncStream, test_user: str):
-    """Send a custom event via REST and verify it arrives on coordinator_ws."""
-    call = async_client.video.call("default", str(uuid.uuid4()))
-    await call.get_or_create(data=CallRequest(created_by_id=test_user))
-
-    async with await rtc.join(call, test_user) as connection:
-        assert connection.connection_state == ConnectionState.JOINED
-
-        ws = connection.coordinator_ws
-        assert ws is not None
-
-        received_event = None
-        event_received = asyncio.Event()
-
-        `@ws.on`("custom")
-        def on_custom(event):
-            nonlocal received_event
-            received_event = event
-            event_received.set()
-
-        await call.send_call_event(
-            user_id=test_user,
-            custom={"type": "test_event", "payload": "hello from test"},
-        )
-
-        await asyncio.wait_for(event_received.wait(), timeout=10.0)
-
-        assert received_event is not None
-        custom_data = received_event.get("custom", {})
-        assert custom_data.get("type") == "test_event"
-        assert custom_data.get("payload") == "hello from test"
+class TestCoordinatorCustomEvents:
+    `@pytest.mark.asyncio`
+    `@pytest.mark.integration`
+    `@skip_on_rate_limit`
+    async def test_custom_event_round_trip(self, async_client: AsyncStream, test_user: str):
+        """Send a custom event via REST and verify it arrives on coordinator_ws."""
+        call = async_client.video.call("default", str(uuid.uuid4()))
+        await call.get_or_create(data=CallRequest(created_by_id=test_user))
+
+        async with await rtc.join(call, test_user) as connection:
+            assert connection.connection_state == ConnectionState.JOINED
+
+            ws = connection.coordinator_ws
+            assert ws is not None
+
+            received_event = None
+            event_received = asyncio.Event()
+
+            `@ws.on`("custom")
+            def on_custom(event):
+                nonlocal received_event
+                received_event = event
+                event_received.set()
+
+            await call.send_call_event(
+                user_id=test_user,
+                custom={"type": "test_event", "payload": "hello from test"},
+            )
+
+            await asyncio.wait_for(event_received.wait(), timeout=10.0)
+
+            assert received_event is not None
+            custom_data = received_event.get("custom", {})
+            assert custom_data.get("type") == "test_event"
+            assert custom_data.get("payload") == "hello from test"

As per coding guidelines, "**/test_*.py: Keep tests well organized and use test classes to group similar tests."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/rtc/coordinator/test_custom_events.py` around lines 36 - 69, Move the
standalone test_custom_event_round_trip coroutine into a test class (e.g., class
TestCustomEvents:) to group integration tests; keep all existing decorators
(`@pytest.mark.asyncio`, `@pytest.mark.integration`, `@skip_on_rate_limit`) on the
method and retain the signature (async_client: AsyncStream, test_user: str) and
body including the call variable, rtc.join usage, ws.on("custom") handler, and
assertions; ensure the new method name remains test_custom_event_round_trip and
update any imports or fixtures if needed so pytest discovers the test class and
method.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/rtc/coordinator/test_custom_events.py`:
- Around line 28-33: The teardown currently swallows all errors in the
try/except around async_client.delete_users (the block calling await
async_client.delete_users(user_ids=[user_id], user="hard", conversations="hard",
messages="hard")), which hides failures; change the except to either log the
exception (e.g., with logger.exception or print) or re-raise/assert so CI fails
— locate the try/except around delete_users in
tests/rtc/coordinator/test_custom_events.py and replace the bare "except
Exception: pass" with an exception handler that records the error details and/or
raises the exception to surface teardown failures.

---

Nitpick comments:
In `@tests/rtc/coordinator/test_custom_events.py`:
- Around line 36-69: Move the standalone test_custom_event_round_trip coroutine
into a test class (e.g., class TestCustomEvents:) to group integration tests;
keep all existing decorators (`@pytest.mark.asyncio`, `@pytest.mark.integration`,
`@skip_on_rate_limit`) on the method and retain the signature (async_client:
AsyncStream, test_user: str) and body including the call variable, rtc.join
usage, ws.on("custom") handler, and assertions; ensure the new method name
remains test_custom_event_round_trip and update any imports or fixtures if
needed so pytest discovers the test class and method.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 47b0d709-cb15-49fa-8573-c6965a0645f9

📥 Commits

Reviewing files that changed from the base of the PR and between 810998f and c68730c.

📒 Files selected for processing (2)
  • getstream/video/rtc/connection_manager.py
  • tests/rtc/coordinator/test_custom_events.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • getstream/video/rtc/connection_manager.py

Comment thread tests/rtc/coordinator/test_custom_events.py Outdated
aliev added 2 commits April 16, 2026 15:16
Sender creates the call and sends the event via REST, receiver joins
via rtc.join and verifies the event arrives on coordinator_ws.
Confirms cross-participant event delivery.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants