Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion @plotly/dash-websocket-worker/src/MessageRouter.ts
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the message order matter? If you batch everything, the order will change. [set_prop_1, otherMessage, set_prop_2] gets sent out in the order of [set_prop_1, set_prop_2], [otherMessage].

Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,16 @@ export class MessageRouter {

/**
* Handle a message from the WebSocket server.
* @param message The message from the server
* Messages may be batched as arrays for efficiency.
* @param message The message from the server (single message or array)
*/
public handleServerMessage(message: unknown): void {
// Handle batched messages (array of messages)
if (Array.isArray(message)) {
this.handleBatchedMessages(message as WorkerMessage[]);
return;
}

const msg = message as WorkerMessage;
const rendererId = msg.rendererId;

Expand All @@ -92,6 +99,65 @@ export class MessageRouter {
}
}

/**
* Handle a batch of messages from the server.
* Groups set_props by renderer and forwards as a single batch message.
*
* Note: set_props messages are processed before other message types.
* This is intentional - batching groups all set_props for efficiency,
* and the relative order within set_props is preserved.
*
* @param messages Array of messages (may include server-only types like heartbeat_ack)
*/
private handleBatchedMessages(messages: WorkerMessage[]): void {
// Group set_props by renderer, keep others separate
const setPropsPayloadsByRenderer: Map<string, SetPropsMessage['payload'][]> = new Map();
const otherMessages: WorkerMessage[] = [];

for (const msg of messages) {
// Skip heartbeat_ack - server-only type, already handled by WebSocketManager
if (msg.type === WorkerMessageType.HEARTBEAT_ACK) {
continue;
}
if (msg.type === WorkerMessageType.SET_PROPS) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you use WorkerMessage[] instead of unknown[], you can get rid of the casts here.

const setPropsMsg = msg as SetPropsMessage;
const rendererId = setPropsMsg.rendererId;
const existing = setPropsPayloadsByRenderer.get(rendererId);
if (existing) {
existing.push(setPropsMsg.payload);
} else {
setPropsPayloadsByRenderer.set(rendererId, [setPropsMsg.payload]);
}
} else {
otherMessages.push(msg);
}
}

// Forward batched set_props to each renderer
for (const [rendererId, payloads] of setPropsPayloadsByRenderer) {
const port = this.renderers.get(rendererId);
if (!port) {
console.warn(`Renderer ${rendererId} not found for batch, skipping`);
continue;
}
try {
port.postMessage({
type: WorkerMessageType.SET_PROPS_BATCH,
rendererId,
payload: payloads
});
} catch (error) {
console.warn(`Failed to forward batch to renderer ${rendererId}, removing`);
this.renderers.delete(rendererId);
}
}

// Forward other messages individually
for (const msg of otherMessages) {
this.handleServerMessage(msg);
}
}

/**
* Send a message to all connected renderers.
* @param message The message to broadcast
Expand Down
38 changes: 36 additions & 2 deletions @plotly/dash-websocket-worker/src/WebSocketManager.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/**
* Message received from the server.
* Can be a WorkerMessage or a heartbeat_ack.
*/
interface ServerMessage {
type: string;
[key: string]: unknown;
}

/**
* Configuration options for WebSocket connection.
*/
Expand Down Expand Up @@ -193,9 +202,34 @@ export class WebSocketManager {

private handleMessage(event: MessageEvent): void {
try {
const data = JSON.parse(event.data);
const data: ServerMessage | ServerMessage[] = JSON.parse(event.data);

// Handle batched messages - check for heartbeat_ack in the batch
if (Array.isArray(data)) {
let hasHeartbeatAck = false;
let hasOtherMessages = false;
for (const msg of data) {
if (msg.type === 'heartbeat_ack') {
hasHeartbeatAck = true;
} else {
hasOtherMessages = true;
}
}
if (hasHeartbeatAck) {
this.clearHeartbeatTimeout();
}
// Only track activity if there are non-heartbeat messages
// This matches the single-message behavior
if (hasOtherMessages) {
this.lastActivityTime = Date.now();
if (this.onMessage) {
this.onMessage(data);
}
}
return;
}

// Handle heartbeat acknowledgment - does NOT count as activity
// Handle single heartbeat acknowledgment - does NOT count as activity
if (data.type === 'heartbeat_ack') {
this.clearHeartbeatTimeout();
return;
Expand Down
19 changes: 18 additions & 1 deletion @plotly/dash-websocket-worker/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ export enum WorkerMessageType {
DISCONNECTED = 'disconnected',
CALLBACK_RESPONSE = 'callback_response',
SET_PROPS = 'set_props',
SET_PROPS_BATCH = 'set_props_batch',
GET_PROPS_REQUEST = 'get_props_request',
ERROR = 'error'
ERROR = 'error',

// Server -> Worker (not forwarded to renderer)
HEARTBEAT_ACK = 'heartbeat_ack'
}

/**
Expand Down Expand Up @@ -88,6 +92,18 @@ export interface SetPropsMessage extends WorkerMessage {
};
}

/**
* Message from worker to renderer to set props for multiple components at once.
* Used for batching multiple set_props calls for efficiency.
*/
export interface SetPropsBatchMessage extends WorkerMessage {
type: WorkerMessageType.SET_PROPS_BATCH;
payload: Array<{
componentId: string;
props: Record<string, unknown>;
}>;
}

/**
* Message from worker to renderer requesting prop values.
*/
Expand Down Expand Up @@ -144,6 +160,7 @@ export type AnyWorkerMessage =
| CallbackRequestMessage
| CallbackResponseMessage
| SetPropsMessage
| SetPropsBatchMessage
| GetPropsRequestMessage
| GetPropsResponseMessage
| ErrorMessage
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ This project adheres to [Semantic Versioning](https://semver.org/).
## [UNRELEASED]

## Added
- [#3783](https://github.com/plotly/dash/pull/3783) Add batching/debouncing for websocket `set_props` messages to reduce lag when updating multiple components in a loop. Configurable via `websocket_batch_delay` (default 5ms, set to 0 to disable).
- [#3669](https://github.com/plotly/dash/pull/3669) Selection for DataTable cleared with custom action settings
- [#3680](https://github.com/plotly/dash/pull/3680) Added `search_order` prop to `Dropdown` to allow users to preserve original option order during search
- Added `csrf_token_name` and `csrf_header_name` config options to allow configuring the CSRF cookie and header names. Fixes [#729](https://github.com/plotly/dash/issues/729)
Expand Down
4 changes: 3 additions & 1 deletion dash/backends/_fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,8 +725,10 @@ async def websocket_handler(websocket: WebSocket):
pending_callbacks: Dict[str, concurrent.futures.Future] = {}

# Start sender task to drain outbound queue (sends pre-serialized text)
# pylint: disable=protected-access
batch_delay = getattr(dash_app, "_websocket_batch_delay", 0.005)
sender_task = asyncio.create_task(
run_ws_sender(websocket.send_text, outbound_queue)
run_ws_sender(websocket.send_text, outbound_queue, batch_delay)
)

try:
Expand Down
6 changes: 5 additions & 1 deletion dash/backends/_quart.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,11 @@ async def websocket_handler(): # pylint: disable=too-many-branches
pending_callbacks: Dict[str, concurrent.futures.Future] = {}

# Start sender task to drain outbound queue (sends pre-serialized text)
sender_task = asyncio.create_task(run_ws_sender(ws.send, outbound_queue))
# pylint: disable=protected-access
batch_delay = getattr(dash_app, "_websocket_batch_delay", 0.005)
sender_task = asyncio.create_task(
run_ws_sender(ws.send, outbound_queue, batch_delay)
)

try:
shutdown_event = self._ws_shutdown_event
Expand Down
57 changes: 51 additions & 6 deletions dash/backends/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

SHUTDOWN_SIGNAL = "__shutdown__"
DISCONNECTED = "__disconnected__"
FLUSH_SIGNAL = "__flush__"


class DashWebsocketCallback:
Expand Down Expand Up @@ -184,29 +185,71 @@ def create_ws_context(


async def run_ws_sender(
send_text: Callable[[str], Any], outbound_queue: janus.Queue[str]
send_text: Callable[[str], Any],
outbound_queue: janus.Queue[str],
batch_delay: float = 0.005,
) -> None:
"""Sender coroutine - drains queue and sends to WebSocket.

This coroutine runs in the main event loop and handles sending
messages that are queued by worker threads via janus.Queue.

Messages are pre-serialized strings (using to_json).
Messages are pre-serialized strings (using to_json). For efficiency,
this function batches messages that arrive within batch_delay of each
other, sending them as a JSON array. When no message arrives within
the window, all collected messages are sent immediately.

Args:
send_text: Async function to send text data over WebSocket
outbound_queue: janus.Queue instance for receiving messages (strings)
batch_delay: Time in seconds to wait for additional messages (default: 5ms).
Set to 0 to disable batching and send messages immediately.
"""
q = outbound_queue.async_q
messages: list[str] = []
try:
while True:
msg = await outbound_queue.async_q.get()
if msg == SHUTDOWN_SIGNAL:
break
await send_text(msg)
# Wait indefinitely for first message, then use timeout for batching
timeout = batch_delay if messages else None
try:
msg = await asyncio.wait_for(q.get(), timeout=timeout)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be more appropriate to get a message and send it immediately with send_text if batch_delay is 0?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it saves one iteration 👍

if msg == SHUTDOWN_SIGNAL:
if messages:
await _send_batched(send_text, messages)
return
if msg == FLUSH_SIGNAL:
if messages:
await _send_batched(send_text, messages)
messages = []
continue
if not batch_delay:
await send_text(msg)
else:
messages.append(msg)
except asyncio.TimeoutError:
await _send_batched(send_text, messages)
messages = []
except asyncio.CancelledError:
pass


async def _send_batched(send_text: Callable[[str], Any], messages: list) -> None:
"""Send messages as a batch.

Single messages are sent as-is. Multiple messages are wrapped
in a JSON array without re-parsing - just string concatenation.

Args:
send_text: Async function to send text data over WebSocket
messages: List of pre-serialized JSON message strings
"""
if len(messages) == 1:
await send_text(messages[0])
else:
# Wrap in array: "[msg1,msg2,msg3]"
await send_text("[" + ",".join(messages) + "]")


def make_callback_done_handler(
outbound_queue: janus.Queue[str],
pending_callbacks: Dict[str, concurrent.futures.Future],
Expand Down Expand Up @@ -269,6 +312,8 @@ def on_done(f: concurrent.futures.Future) -> None:
)
finally:
pending_callbacks.pop(request_id, None)
if not shutdown_event.is_set():
outbound_queue.sync_q.put_nowait(FLUSH_SIGNAL)

return on_done

Expand Down
17 changes: 15 additions & 2 deletions dash/dash-renderer/src/observers/websocketObserver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import {updateProps, notifyObservers, setPaths} from '../actions';
import {parsePatchProps} from '../actions/patch';
import {computePaths, getPath} from '../actions/paths';
import {batch} from 'react-redux';
import {
getWorkerClient,
SetPropsPayload,
Expand Down Expand Up @@ -72,8 +73,8 @@

const workerClient = getWorkerClient();

// Handle SET_PROPS messages
workerClient.onSetProps = (payload: SetPropsPayload) => {
// Helper to process a single set_props payload
const processSetProps = (payload: SetPropsPayload) => {
const {componentId, props: rawProps} = payload;
const parsedId = parseComponentId(componentId);
const state = store.getState();
Expand Down Expand Up @@ -129,6 +130,18 @@
}
};

// Handle single SET_PROPS message
workerClient.onSetProps = processSetProps;

// Handle batched SET_PROPS_BATCH message
workerClient.onSetPropsBatch = (payloads: SetPropsPayload[]) => {
batch(() => {
for (const payload of payloads) {
processSetProps(payload);
}
});
};

// Handle GET_PROPS_REQUEST messages
workerClient.onGetPropsRequest = (
requestId: string,
Expand Down Expand Up @@ -181,9 +194,9 @@
try {
// config.websocket is guaranteed to exist due to wsAvailable check above
await workerClient.connect(
config.websocket!.worker_url,

Check warning on line 197 in dash/dash-renderer/src/observers/websocketObserver.ts

View workflow job for this annotation

GitHub Actions / Lint & Unit Tests (Python 3.8)

Forbidden non-null assertion

Check warning on line 197 in dash/dash-renderer/src/observers/websocketObserver.ts

View workflow job for this annotation

GitHub Actions / Lint & Unit Tests (Python 3.12)

Forbidden non-null assertion
wsUrl,
config.websocket!.inactivity_timeout

Check warning on line 199 in dash/dash-renderer/src/observers/websocketObserver.ts

View workflow job for this annotation

GitHub Actions / Lint & Unit Tests (Python 3.8)

Forbidden non-null assertion

Check warning on line 199 in dash/dash-renderer/src/observers/websocketObserver.ts

View workflow job for this annotation

GitHub Actions / Lint & Unit Tests (Python 3.12)

Forbidden non-null assertion
);
} catch (error) {
console.error('[Dash] Failed to connect to WebSocket worker:', error);
Expand Down
11 changes: 11 additions & 0 deletions dash/dash-renderer/src/utils/workerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
DISCONNECTED = 'disconnected',
CALLBACK_RESPONSE = 'callback_response',
SET_PROPS = 'set_props',
SET_PROPS_BATCH = 'set_props_batch',
GET_PROPS_REQUEST = 'get_props_request',
ERROR = 'error'
}
Expand Down Expand Up @@ -58,6 +59,10 @@
/** Callback when SET_PROPS message is received */
public onSetProps: ((payload: SetPropsPayload) => void) | null = null;

/** Callback when SET_PROPS_BATCH message is received */
public onSetPropsBatch: ((payloads: SetPropsPayload[]) => void) | null =
null;

/** Callback when GET_PROPS_REQUEST message is received */
public onGetPropsRequest:
| ((requestId: string, payload: GetPropsRequestPayload) => void)
Expand Down Expand Up @@ -209,7 +214,7 @@
return new Promise((resolve, reject) => {
this.pendingCallbacks.set(requestId, {resolve, reject});

this.worker!.port.postMessage({

Check warning on line 217 in dash/dash-renderer/src/utils/workerClient.ts

View workflow job for this annotation

GitHub Actions / Lint & Unit Tests (Python 3.8)

Forbidden non-null assertion

Check warning on line 217 in dash/dash-renderer/src/utils/workerClient.ts

View workflow job for this annotation

GitHub Actions / Lint & Unit Tests (Python 3.12)

Forbidden non-null assertion
type: WorkerMessageType.CALLBACK_REQUEST,
rendererId: this.rendererId,
requestId,
Expand Down Expand Up @@ -289,6 +294,12 @@
}
break;

case WorkerMessageType.SET_PROPS_BATCH:
if (this.onSetPropsBatch) {
this.onSetPropsBatch(message.payload);
}
break;

case WorkerMessageType.GET_PROPS_REQUEST:
if (this.onGetPropsRequest) {
this.onGetPropsRequest(message.requestId, message.payload);
Expand Down
2 changes: 2 additions & 0 deletions dash/dash.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ def __init__( # pylint: disable=too-many-statements, too-many-branches
websocket_callbacks: Optional[bool] = False,
websocket_allowed_origins: Optional[List[str]] = None,
websocket_inactivity_timeout: Optional[int] = 300000,
websocket_batch_delay: Optional[float] = 0.005,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if a user explicitly passes in None for this value?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't pass typing? Optional[float] requires a float and has float as default.

**obsolete,
):

Expand Down Expand Up @@ -645,6 +646,7 @@ def __init__( # pylint: disable=too-many-statements, too-many-branches
self._websocket_callbacks = websocket_callbacks
self._websocket_allowed_origins = websocket_allowed_origins or []
self._websocket_inactivity_timeout = websocket_inactivity_timeout
self._websocket_batch_delay = websocket_batch_delay

self.logger = logging.getLogger(__name__)

Expand Down
Loading
Loading