-
-
Notifications
You must be signed in to change notification settings - Fork 2.3k
batch/debounce websocket set_props #3783
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -64,9 +64,16 @@ export class MessageRouter { | |
|
|
||
| /** | ||
| * Handle a message from the WebSocket server. | ||
| * @param message The message from the server | ||
| * Messages may be batched as arrays for efficiency. | ||
| * @param message The message from the server (single message or array) | ||
| */ | ||
| public handleServerMessage(message: unknown): void { | ||
| // Handle batched messages (array of messages) | ||
| if (Array.isArray(message)) { | ||
| this.handleBatchedMessages(message as WorkerMessage[]); | ||
| return; | ||
| } | ||
|
|
||
| const msg = message as WorkerMessage; | ||
| const rendererId = msg.rendererId; | ||
|
|
||
|
|
@@ -92,6 +99,65 @@ export class MessageRouter { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Handle a batch of messages from the server. | ||
| * Groups set_props by renderer and forwards as a single batch message. | ||
| * | ||
| * Note: set_props messages are processed before other message types. | ||
| * This is intentional - batching groups all set_props for efficiency, | ||
| * and the relative order within set_props is preserved. | ||
| * | ||
| * @param messages Array of messages (may include server-only types like heartbeat_ack) | ||
| */ | ||
| private handleBatchedMessages(messages: WorkerMessage[]): void { | ||
| // Group set_props by renderer, keep others separate | ||
| const setPropsPayloadsByRenderer: Map<string, SetPropsMessage['payload'][]> = new Map(); | ||
| const otherMessages: WorkerMessage[] = []; | ||
|
|
||
| for (const msg of messages) { | ||
| // Skip heartbeat_ack - server-only type, already handled by WebSocketManager | ||
| if (msg.type === WorkerMessageType.HEARTBEAT_ACK) { | ||
| continue; | ||
| } | ||
| if (msg.type === WorkerMessageType.SET_PROPS) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you use |
||
| const setPropsMsg = msg as SetPropsMessage; | ||
| const rendererId = setPropsMsg.rendererId; | ||
| const existing = setPropsPayloadsByRenderer.get(rendererId); | ||
| if (existing) { | ||
| existing.push(setPropsMsg.payload); | ||
| } else { | ||
| setPropsPayloadsByRenderer.set(rendererId, [setPropsMsg.payload]); | ||
| } | ||
| } else { | ||
| otherMessages.push(msg); | ||
| } | ||
| } | ||
|
|
||
| // Forward batched set_props to each renderer | ||
| for (const [rendererId, payloads] of setPropsPayloadsByRenderer) { | ||
| const port = this.renderers.get(rendererId); | ||
| if (!port) { | ||
| console.warn(`Renderer ${rendererId} not found for batch, skipping`); | ||
| continue; | ||
| } | ||
| try { | ||
| port.postMessage({ | ||
| type: WorkerMessageType.SET_PROPS_BATCH, | ||
| rendererId, | ||
| payload: payloads | ||
| }); | ||
| } catch (error) { | ||
| console.warn(`Failed to forward batch to renderer ${rendererId}, removing`); | ||
| this.renderers.delete(rendererId); | ||
| } | ||
| } | ||
|
|
||
| // Forward other messages individually | ||
| for (const msg of otherMessages) { | ||
| this.handleServerMessage(msg); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Send a message to all connected renderers. | ||
| * @param message The message to broadcast | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,7 @@ | |
|
|
||
| SHUTDOWN_SIGNAL = "__shutdown__" | ||
| DISCONNECTED = "__disconnected__" | ||
| FLUSH_SIGNAL = "__flush__" | ||
|
|
||
|
|
||
| class DashWebsocketCallback: | ||
|
|
@@ -184,29 +185,71 @@ def create_ws_context( | |
|
|
||
|
|
||
| async def run_ws_sender( | ||
| send_text: Callable[[str], Any], outbound_queue: janus.Queue[str] | ||
| send_text: Callable[[str], Any], | ||
| outbound_queue: janus.Queue[str], | ||
| batch_delay: float = 0.005, | ||
| ) -> None: | ||
| """Sender coroutine - drains queue and sends to WebSocket. | ||
|
|
||
| This coroutine runs in the main event loop and handles sending | ||
| messages that are queued by worker threads via janus.Queue. | ||
|
|
||
| Messages are pre-serialized strings (using to_json). | ||
| Messages are pre-serialized strings (using to_json). For efficiency, | ||
| this function batches messages that arrive within batch_delay of each | ||
| other, sending them as a JSON array. When no message arrives within | ||
| the window, all collected messages are sent immediately. | ||
|
|
||
| Args: | ||
| send_text: Async function to send text data over WebSocket | ||
| outbound_queue: janus.Queue instance for receiving messages (strings) | ||
| batch_delay: Time in seconds to wait for additional messages (default: 5ms). | ||
| Set to 0 to disable batching and send messages immediately. | ||
| """ | ||
| q = outbound_queue.async_q | ||
| messages: list[str] = [] | ||
| try: | ||
| while True: | ||
| msg = await outbound_queue.async_q.get() | ||
| if msg == SHUTDOWN_SIGNAL: | ||
| break | ||
| await send_text(msg) | ||
| # Wait indefinitely for first message, then use timeout for batching | ||
| timeout = batch_delay if messages else None | ||
| try: | ||
| msg = await asyncio.wait_for(q.get(), timeout=timeout) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be more appropriate to get a message and send it immediately with
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe it saves one iteration 👍 |
||
| if msg == SHUTDOWN_SIGNAL: | ||
| if messages: | ||
| await _send_batched(send_text, messages) | ||
| return | ||
| if msg == FLUSH_SIGNAL: | ||
| if messages: | ||
| await _send_batched(send_text, messages) | ||
| messages = [] | ||
| continue | ||
| if not batch_delay: | ||
| await send_text(msg) | ||
| else: | ||
| messages.append(msg) | ||
| except asyncio.TimeoutError: | ||
| await _send_batched(send_text, messages) | ||
| messages = [] | ||
| except asyncio.CancelledError: | ||
| pass | ||
|
|
||
|
|
||
| async def _send_batched(send_text: Callable[[str], Any], messages: list) -> None: | ||
| """Send messages as a batch. | ||
|
|
||
| Single messages are sent as-is. Multiple messages are wrapped | ||
| in a JSON array without re-parsing - just string concatenation. | ||
|
|
||
| Args: | ||
| send_text: Async function to send text data over WebSocket | ||
| messages: List of pre-serialized JSON message strings | ||
| """ | ||
| if len(messages) == 1: | ||
| await send_text(messages[0]) | ||
| else: | ||
| # Wrap in array: "[msg1,msg2,msg3]" | ||
| await send_text("[" + ",".join(messages) + "]") | ||
|
|
||
|
|
||
| def make_callback_done_handler( | ||
| outbound_queue: janus.Queue[str], | ||
| pending_callbacks: Dict[str, concurrent.futures.Future], | ||
|
|
@@ -269,6 +312,8 @@ def on_done(f: concurrent.futures.Future) -> None: | |
| ) | ||
| finally: | ||
| pending_callbacks.pop(request_id, None) | ||
| if not shutdown_event.is_set(): | ||
| outbound_queue.sync_q.put_nowait(FLUSH_SIGNAL) | ||
|
|
||
| return on_done | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -486,6 +486,7 @@ def __init__( # pylint: disable=too-many-statements, too-many-branches | |
| websocket_callbacks: Optional[bool] = False, | ||
| websocket_allowed_origins: Optional[List[str]] = None, | ||
| websocket_inactivity_timeout: Optional[int] = 300000, | ||
| websocket_batch_delay: Optional[float] = 0.005, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if a user explicitly passes in
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't pass typing? Optional[float] requires a float and has float as default. |
||
| **obsolete, | ||
| ): | ||
|
|
||
|
|
@@ -645,6 +646,7 @@ def __init__( # pylint: disable=too-many-statements, too-many-branches | |
| self._websocket_callbacks = websocket_callbacks | ||
| self._websocket_allowed_origins = websocket_allowed_origins or [] | ||
| self._websocket_inactivity_timeout = websocket_inactivity_timeout | ||
| self._websocket_batch_delay = websocket_batch_delay | ||
|
|
||
| self.logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the message order matter? If you batch everything, the order will change.
[set_prop_1, otherMessage, set_prop_2]gets sent out in the order of[set_prop_1, set_prop_2],[otherMessage].