feat: implement AsyncMultiRangeDownloader with multiplexed bidi-gRPC stream support#16528
feat: implement AsyncMultiRangeDownloader with multiplexed bidi-gRPC stream support#16528zhixiangli wants to merge 3 commits intogoogleapis:mainfrom
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a _StreamMultiplexer to handle concurrent download tasks over a single bidirectional gRPC stream, replacing the previous locking mechanism in AsyncMultiRangeDownloader. The multiplexer routes responses to per-task asyncio queues based on read_id, allowing for better resource utilization. The lock parameter in download_ranges is now deprecated. Feedback focuses on improving the multiplexer's reliability and performance, specifically by using asyncio.gather to prevent head-of-line blocking during response broadcasting, ensuring the background receive loop terminates when no tasks are active, and adding error logging for observability.
packages/google-cloud-storage/google/cloud/storage/asyncio/_stream_multiplexer.py
Outdated
Show resolved
Hide resolved
packages/google-cloud-storage/google/cloud/storage/asyncio/_stream_multiplexer.py
Show resolved
Hide resolved
packages/google-cloud-storage/google/cloud/storage/asyncio/_stream_multiplexer.py
Show resolved
Hide resolved
13d5d08 to
774d691
Compare
acfab40 to
aed8682
Compare
|
|
||
|
|
||
| def test_mrd_concurrent_download( | ||
| storage_client, blobs_to_delete, event_loop, grpc_client |
There was a problem hiding this comment.
nit: can you use grpc_client_direct , although both works but we're generally interested in direct path
| logger = logging.getLogger(__name__) | ||
|
|
||
| _DEFAULT_QUEUE_MAX_SIZE = 100 | ||
| _DEFAULT_PUT_TIMEOUT = 20.0 |
There was a problem hiding this comment.
nit: consider _DEFAULT_PUT_TIMEOUT_SECONDS
| keyed by read_id. Coordinates stream reopening via generation-gated | ||
| locking. | ||
|
|
||
| A slow consumer on one task will slow down the entire shared connection |
| if isinstance(item, _StreamEnd): | ||
| if pending_read_ids: | ||
| last_broken_generation = my_generation | ||
| raise exceptions.ServiceUnavailable( |
There was a problem hiding this comment.
This may not be required. See my comment on _stream_multiplexer.py
|
|
||
| pending_read_ids = {r.read_id for r in requests} | ||
| my_generation = self._multiplexer.stream_generation |
There was a problem hiding this comment.
nit: how about renaming it to stream_generation ?
| self, mock_cls_async_read_object_stream, mock_random_int | ||
| ): | ||
| # Arrange |
There was a problem hiding this comment.
don't remove this. Please verify the same for all other tests.
| @@ -419,6 +422,8 @@ async def test_create_mrd_with_generation_number( | |||
| mock_stream.generation_number = _TEST_GENERATION_NUMBER | |||
| mock_stream.persisted_size = _TEST_OBJECT_SIZE | |||
| mock_stream.read_handle = _TEST_READ_HANDLE | |||
| mock_stream.is_stream_open = True | |||
| mock_stream.recv = AsyncMock(side_effect=asyncio.Event().wait) | |||
There was a problem hiding this comment.
adding these lines have NO effect.
There was a problem hiding this comment.
Although you haven't changed code related to this line, I would appreciate if you can add assert on read_handle and object_size
| @@ -0,0 +1,503 @@ | |||
| # Copyright 2025 Google LLC | |||
| assert isinstance(sentinel, _StreamEnd) | ||
|
|
||
|
|
||
| class TestStreamMultiplexerInit: |
There was a problem hiding this comment.
please follow - go/unit-testing-practices?polyglot=python#structure for all tests in this file.
This PR implements
AsyncMultiRangeDownloaderwith a new_StreamMultiplexer, enabling multiple concurrent range downloads to share a single bidirectional gRPC stream.Before vs. After
How it works
The system uses a background
_StreamMultiplexerto manage the shared bidirectional stream:BidiReadObjectRequest) directly to the shared stream.read_idin each response to route data to the correct task-specificasyncio.Queue._StreamErrorand automatically retry using the new stream generation.Key Changes:
_StreamMultiplexer: Background receiver loop for routing responses.AsyncMultiRangeDownloaderIntegration: Full support for concurrentdownload_rangescalls.