feat: implement AsyncMultiRangeDownloader with multiplexed bidi-gRPC stream support #16528

Open
zhixiangli wants to merge 3 commits into googleapis:main from zhixiangli:zhixiangli/multiplexing-downloader

@zhixiangli zhixiangli commented Apr 2, 2026

This PR implements AsyncMultiRangeDownloader with a new _StreamMultiplexer, enabling multiple concurrent range downloads to share a single bidirectional gRPC stream.

Before vs. After

| Feature | Before | After (This PR) |
| --- | --- | --- |
| Concurrency | Sequential or multiple connections | Concurrent over one connection |
| Overhead | High (multiple gRPC streams) | Low (multiplexed single stream) |
| Reliability | Per-stream retry logic | Unified generation-gated reopening |

How it works

The system uses a background _StreamMultiplexer to manage the shared bidirectional stream:

  1. Requests: Concurrent tasks send range requests (BidiReadObjectRequest) directly to the shared stream.
  2. Multiplexing: A background Recv Loop listens for all responses. It uses the read_id in each response to route data to the correct task-specific asyncio.Queue.
  3. Error Handling: If the stream breaks, a generation-gated lock ensures the stream is reopened only once. All active tasks receive a _StreamError and automatically retry using the new stream generation.
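
The routing step above can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: `FakeResponse`, `recv_loop`, `download_range`, the `range_end` flag, and the `None` end-of-stream marker are all hypothetical stand-ins, and an `asyncio.Queue` plays the role of the shared gRPC stream.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeResponse:
    """Hypothetical stand-in for one chunk of a bidi read response."""
    read_id: int
    data: bytes
    range_end: bool = False


async def recv_loop(incoming: asyncio.Queue, queues: dict) -> None:
    """Route each response to the queue registered for its read_id."""
    while True:
        response = await incoming.get()
        if response is None:  # hypothetical end-of-stream marker
            return
        queue = queues.get(response.read_id)
        if queue is not None:
            await queue.put(response)


async def download_range(read_id: int, queues: dict) -> bytes:
    """Collect chunks for one read_id until its range is marked complete."""
    queue = queues[read_id]
    chunks = []
    while True:
        response = await queue.get()
        chunks.append(response.data)
        if response.range_end:
            return b"".join(chunks)


async def demo() -> dict:
    incoming = asyncio.Queue()  # stands in for the shared stream
    queues = {1: asyncio.Queue(maxsize=100), 2: asyncio.Queue(maxsize=100)}
    # Responses for two read_ids arrive interleaved on the one stream.
    interleaved = (
        FakeResponse(1, b"ab"),
        FakeResponse(2, b"xy"),
        FakeResponse(1, b"cd", range_end=True),
        FakeResponse(2, b"z", range_end=True),
        None,
    )
    for response in interleaved:
        incoming.put_nowait(response)
    receiver = asyncio.create_task(recv_loop(incoming, queues))
    first, second = await asyncio.gather(
        download_range(1, queues), download_range(2, queues)
    )
    await receiver
    return {1: first, 2: second}
```

Running `asyncio.run(demo())` reassembles each range from its own queue even though the chunks were interleaved on the shared channel.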

Key Changes:

  • _StreamMultiplexer: Background receiver loop for routing responses.
  • Generation-Gated Reopening: Coordinates stream recovery across concurrent tasks.
  • AsyncMultiRangeDownloader Integration: Full support for concurrent download_ranges calls.
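
For readers unfamiliar with the pattern, generation-gated reopening might look something like the sketch below. `GenerationGate`, `reopen_if_stale`, and `fake_reopen` are hypothetical names used only for illustration; the actual `_StreamMultiplexer` API may differ.

```python
import asyncio


class GenerationGate:
    """Hypothetical helper: reopen a shared stream at most once per failure."""

    def __init__(self, reopen):
        self._reopen = reopen  # async callable that opens a new stream
        self._lock = asyncio.Lock()
        self.generation = 0

    async def reopen_if_stale(self, broken_generation: int) -> int:
        async with self._lock:
            # Another task may already have reopened the stream while this
            # one waited for the lock; in that case the generation has moved
            # on and there is nothing left to do.
            if self.generation == broken_generation:
                await self._reopen()
                self.generation += 1
            return self.generation


async def demo():
    opens = 0

    async def fake_reopen():
        nonlocal opens
        opens += 1
        await asyncio.sleep(0)  # yield so the other tasks queue on the lock

    gate = GenerationGate(fake_reopen)
    # Three tasks all observed generation 0 break at the same time.
    generations = await asyncio.gather(
        *(gate.reopen_if_stale(0) for _ in range(3))
    )
    return opens, generations
```

Each task passes the generation it saw fail. Only the first task through the lock reopens the stream; the others find a newer generation and simply retry on it.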

@zhixiangli zhixiangli requested review from a team as code owners April 2, 2026 07:22
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a _StreamMultiplexer to handle concurrent download tasks over a single bidirectional gRPC stream, replacing the previous locking mechanism in AsyncMultiRangeDownloader. The multiplexer routes responses to per-task asyncio queues based on read_id, allowing for better resource utilization. The lock parameter in download_ranges is now deprecated. Feedback focuses on improving the multiplexer's reliability and performance, specifically by using asyncio.gather to prevent head-of-line blocking during response broadcasting, ensuring the background receive loop terminates when no tasks are active, and adding error logging for observability.
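
As a rough illustration of the `asyncio.gather` suggestion, the sketch below broadcasts one item to several bounded queues concurrently, so a full queue delays only its own consumer. The `broadcast` helper and its signature are assumptions made for this example and are not taken from the PR.

```python
import asyncio


async def broadcast(queues, item, put_timeout: float):
    """Put item on every queue concurrently; return indexes that timed out."""

    async def put_one(queue):
        await asyncio.wait_for(queue.put(item), timeout=put_timeout)

    results = await asyncio.gather(
        *(put_one(q) for q in queues), return_exceptions=True
    )
    return [
        i for i, r in enumerate(results) if isinstance(r, asyncio.TimeoutError)
    ]


async def demo():
    full = asyncio.Queue(maxsize=1)
    full.put_nowait("stale")  # simulates a slow consumer with a full queue
    free = asyncio.Queue(maxsize=1)
    timed_out = await broadcast([full, free], "stream-error", put_timeout=0.05)
    return timed_out, free.get_nowait()
```

With sequential puts, the full queue would hold up delivery to every queue after it until the timeout expired. Here the free queue receives the item immediately and only the stalled one is reported.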

@zhixiangli zhixiangli changed the title from "feat: implement AsyncMultiRangeDownloader and integrate _StreamMultiplexer" to "feat: implement AsyncMultiRangeDownloader with multiplexed bidi-gRPC stream support" Apr 2, 2026
@zhixiangli zhixiangli force-pushed the zhixiangli/multiplexing-downloader branch from 13d5d08 to 774d691 April 2, 2026 08:22
@zhixiangli zhixiangli force-pushed the zhixiangli/multiplexing-downloader branch from acfab40 to aed8682 April 2, 2026 09:47


def test_mrd_concurrent_download(
    storage_client, blobs_to_delete, event_loop, grpc_client
Contributor

nit: can you use grpc_client_direct? Both work, but we're generally interested in the direct path.

logger = logging.getLogger(__name__)

_DEFAULT_QUEUE_MAX_SIZE = 100
_DEFAULT_PUT_TIMEOUT = 20.0
Contributor

nit: consider _DEFAULT_PUT_TIMEOUT_SECONDS

keyed by read_id. Coordinates stream reopening via generation-gated
locking.

A slow consumer on one task will slow down the entire shared connection
Contributor

Does this make sense, @zhixiangli?

if isinstance(item, _StreamEnd):
    if pending_read_ids:
        last_broken_generation = my_generation
        raise exceptions.ServiceUnavailable(
Contributor

This may not be required. See my comment on _stream_multiplexer.py


pending_read_ids = {r.read_id for r in requests}
my_generation = self._multiplexer.stream_generation
Contributor

nit: how about renaming it to stream_generation?

self, mock_cls_async_read_object_stream, mock_random_int
):
# Arrange
Contributor

Don't remove this. Please verify the same for all other tests.

@@ -419,6 +422,8 @@ async def test_create_mrd_with_generation_number(
mock_stream.generation_number = _TEST_GENERATION_NUMBER
mock_stream.persisted_size = _TEST_OBJECT_SIZE
mock_stream.read_handle = _TEST_READ_HANDLE
mock_stream.is_stream_open = True
mock_stream.recv = AsyncMock(side_effect=asyncio.Event().wait)
Contributor

Adding these lines has NO effect.

Contributor

Although you haven't changed the code related to this line, I would appreciate it if you could add asserts on read_handle and object_size.

@@ -0,0 +1,503 @@
# Copyright 2025 Google LLC
Contributor

nit: 2026

assert isinstance(sentinel, _StreamEnd)


class TestStreamMultiplexerInit:
Contributor

Please follow go/unit-testing-practices?polyglot=python#structure for all tests in this file.

@chandra-siri chandra-siri self-assigned this Apr 7, 2026