From 42ab95398d8825b531b138bbec7d1975382312a9 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Tue, 26 May 2026 20:53:39 +0000 Subject: [PATCH 1/9] feat(storage): parse finalize_time and server crc32c in async object stream --- .../asyncio/async_read_object_stream.py | 19 ++++++++++ .../asyncio/test_async_read_object_stream.py | 37 ++++++++++++++++++- 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py index cd7ae067c631..8655ed005d61 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime from typing import List, Optional, Tuple from google.api_core.bidi_async import AsyncBidiRpc @@ -79,6 +80,9 @@ def __init__( self.socket_like_rpc: Optional[AsyncBidiRpc] = None self._is_stream_open: bool = False self.persisted_size: Optional[int] = None + self.is_finalized: bool = False + self.full_obj_server_crc32c: Optional[int] = None + self.object_metadata: Optional[_storage_v2.Object] = None async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: """Opens the bidi-gRPC connection to read from the object. @@ -132,6 +136,21 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: self.generation_number = response.metadata.generation # update persisted size self.persisted_size = response.metadata.size + self.object_metadata = response.metadata + # Since full object checksum validation is only required for finalized objects, + # check finalize_time (which is DatetimeWithNanoseconds/datetime in production, or mocked in tests). + finalize_time = getattr(response.metadata, "finalize_time", None) + if finalize_time: + is_finalized_val = False + if isinstance(finalize_time, datetime.datetime): + is_finalized_val = True + elif hasattr(finalize_time, "seconds") and finalize_time.seconds > 0: + is_finalized_val = True + + if is_finalized_val: + self.is_finalized = True + if hasattr(response.metadata, "checksums") and response.metadata.checksums: + self.full_obj_server_crc32c = response.metadata.checksums.crc32c if response and response.read_handle: self.read_handle = response.read_handle diff --git a/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py b/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py index f5783be6bf94..fc16ae41a0ac 100644 --- a/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py +++ b/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py @@ -38,9 +38,11 @@ async def instantiate_read_obj_stream(mock_client, mock_cls_async_bidi_rpc, open socket_like_rpc.open = AsyncMock() recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse) - recv_response.metadata = mock.MagicMock(spec=_storage_v2.Object) + recv_response.metadata = mock.MagicMock() recv_response.metadata.generation = _TEST_GENERATION_NUMBER recv_response.metadata.size = _TEST_OBJECT_SIZE + recv_response.metadata.finalize_time.seconds = 12345 + recv_response.metadata.checksums.crc32c = 98765 recv_response.read_handle = _TEST_READ_HANDLE socket_like_rpc.recv = AsyncMock(return_value=recv_response) @@ -130,6 +132,8 @@ async def test_open(mock_client, mock_cls_async_bidi_rpc): assert read_obj_stream.generation_number == _TEST_GENERATION_NUMBER assert read_obj_stream.read_handle == _TEST_READ_HANDLE assert read_obj_stream.persisted_size == _TEST_OBJECT_SIZE + assert read_obj_stream.is_finalized is True + assert read_obj_stream.full_obj_server_crc32c == 98765 assert read_obj_stream.is_stream_open @@ -381,3 +385,34 @@ async def test_recv_updates_read_handle_on_refresh( await stream.recv() assert stream.read_handle == refreshed_handle + + +@mock.patch("google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc") +@mock.patch( + "google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" +) +@pytest.mark.asyncio +async def test_open_unfinalized_object_skips_checksum(mock_client, mock_cls_async_bidi_rpc): + socket_like_rpc = AsyncMock() + mock_cls_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.open = AsyncMock() + + recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse) + recv_response.metadata = mock.MagicMock() + recv_response.metadata.generation = _TEST_GENERATION_NUMBER + recv_response.metadata.size = _TEST_OBJECT_SIZE + recv_response.metadata.finalize_time.seconds = 0 # NOT finalized! + recv_response.metadata.checksums.crc32c = 98765 + recv_response.read_handle = _TEST_READ_HANDLE + socket_like_rpc.recv = AsyncMock(return_value=recv_response) + + read_obj_stream = _AsyncReadObjectStream( + client=mock_client, + bucket_name=_TEST_BUCKET_NAME, + object_name=_TEST_OBJECT_NAME, + ) + + await read_obj_stream.open() + + assert read_obj_stream.is_finalized is False + assert read_obj_stream.full_obj_server_crc32c is None From d972c2cc3aea01e14633054b961eff577bff4910 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Tue, 26 May 2026 20:54:04 +0000 Subject: [PATCH 2/9] feat(storage): implement rolling checksum and verification in reads resumption strategy --- .../retry/reads_resumption_strategy.py | 29 ++++++++- .../retry/test_reads_resumption_strategy.py | 59 ++++++++++++++++++- 2 files changed, 83 insertions(+), 5 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py index 845770c3a215..35d974ae2911 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py @@ -36,7 +36,7 @@ class _DownloadState: """A helper class to track the state of a single range download.""" def __init__( - self, initial_offset: int, initial_length: int, user_buffer: IO[bytes] + self, initial_offset: int, initial_length: int, user_buffer: IO[bytes], is_full_object_read: bool = False ): self.initial_offset = initial_offset self.initial_length = initial_length @@ -44,6 +44,8 @@ def __init__( self.bytes_written = 0 self.next_expected_offset = initial_offset self.is_complete = False + self.is_full_object_read = is_full_object_read + self.rolling_checksum = google_crc32c.Checksum() if is_full_object_read else None class _ReadResumptionStrategy(_BaseResumptionStrategy): @@ -90,6 +92,7 @@ def update_state_from_response( ) download_states = state["download_states"] + checksum_enabled = state.get("enable_checksum", True) for object_data_range in proto.object_data_ranges: # Ignore empty ranges or ranges for IDs not in our state @@ -125,7 +128,7 @@ def update_state_from_response( checksummed_data = object_data_range.checksummed_data data = checksummed_data.content - if checksummed_data.HasField("crc32c"): + if checksum_enabled and checksummed_data.HasField("crc32c"): server_checksum = checksummed_data.crc32c client_checksum = google_crc32c.value(data) if server_checksum != client_checksum: @@ -138,10 +141,14 @@ def update_state_from_response( # Update State & Write Data chunk_size = len(data) read_state.user_buffer.write(data) + + # Commit updates only after the write succeeds + if read_state.rolling_checksum is not None: + read_state.rolling_checksum.update(data) read_state.bytes_written += chunk_size read_state.next_expected_offset += chunk_size - # Final Byte Count Verification + # Final Byte Count & Full Object Checksum Verification if object_data_range.range_end: read_state.is_complete = True if ( @@ -154,6 +161,22 @@ def update_state_from_response( f"Expected {read_state.initial_length}, got {read_state.bytes_written}", ) + # Perform full-object checksum verification once the stream finishes. + if read_state.is_full_object_read and checksum_enabled: + full_obj_server_crc32c = state.get("full_obj_server_crc32c") + if full_obj_server_crc32c is not None: + # Use standard big-endian byte conversion to retrieve the rolling checksum value. + client_checksum = int.from_bytes( + read_state.rolling_checksum.digest(), + byteorder="big", + ) + if client_checksum != full_obj_server_crc32c: + raise DataCorruption( + response, + f"Full object checksum mismatch for read_id {read_id}. " + f"Server authoritative crc32c: {full_obj_server_crc32c}, client calculated rolling: {client_checksum}.", + ) + async def recover_state_on_failure(self, error: Exception, state: Any) -> None: """Handles BidiReadObjectRedirectedError for reads.""" routing_token, read_handle = _handle_redirect(error) diff --git a/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py b/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py index dc27cb701974..88bae515729b 100644 --- a/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py +++ b/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py @@ -45,6 +45,24 @@ def test_initialization(self): self.assertEqual(state.bytes_written, 0) self.assertEqual(state.next_expected_offset, initial_offset) self.assertFalse(state.is_complete) + self.assertFalse(state.is_full_object_read) + self.assertIsNone(state.rolling_checksum) + + def test_initialization_with_full_object_read(self): + """Test that _DownloadState initializes correctly when is_full_object_read is True.""" + initial_offset = 10 + initial_length = 100 + user_buffer = io.BytesIO() + state_full = _DownloadState(initial_offset, initial_length, user_buffer, is_full_object_read=True) + + self.assertEqual(state_full.initial_offset, initial_offset) + self.assertEqual(state_full.initial_length, initial_length) + self.assertEqual(state_full.user_buffer, user_buffer) + self.assertEqual(state_full.bytes_written, 0) + self.assertEqual(state_full.next_expected_offset, initial_offset) + self.assertFalse(state_full.is_complete) + self.assertTrue(state_full.is_full_object_read) + self.assertIsNotNone(state_full.rolling_checksum) class TestReadResumptionStrategy(unittest.TestCase): @@ -53,12 +71,12 @@ def setUp(self): self.state = {"download_states": {}, "read_handle": None, "routing_token": None} - def _add_download(self, read_id, offset=0, length=100, buffer=None): + def _add_download(self, read_id, offset=0, length=100, buffer=None, is_full_object_read=False): """Helper to inject a download state into the correct nested location.""" if buffer is None: buffer = io.BytesIO() state = _DownloadState( - initial_offset=offset, initial_length=length, user_buffer=buffer + initial_offset=offset, initial_length=length, user_buffer=buffer, is_full_object_read=is_full_object_read ) self.state["download_states"][read_id] = state return state @@ -358,3 +376,40 @@ async def run(): # Token should remain unchanged self.assertEqual(self.state["routing_token"], "existing-token") + + def test_update_state_full_object_checksum_success(self): + """Test that full object checksum verification succeeds on range_end.""" + read_state = self._add_download(_READ_ID, offset=0, length=9, is_full_object_read=True) + self.state["enable_checksum"] = True + self.state["full_obj_server_crc32c"] = google_crc32c.value(b"testdata1") + + resp1 = self._create_response(b"test", _READ_ID, offset=0) + self.strategy.update_state_from_response(resp1, self.state) + + resp2 = self._create_response(b"data1", _READ_ID, offset=4, range_end=True) + self.strategy.update_state_from_response(resp2, self.state) + + self.assertTrue(read_state.is_complete) + self.assertEqual(read_state.bytes_written, 9) + + def test_update_state_full_object_checksum_failure(self): + """Test that full object checksum verification raises DataCorruption on mismatch at range_end.""" + self._add_download(_READ_ID, offset=0, length=9, is_full_object_read=True) + self.state["enable_checksum"] = True + self.state["full_obj_server_crc32c"] = 111111 # Wrong server checksum! + + resp1 = self._create_response(b"test", _READ_ID, offset=0) + self.strategy.update_state_from_response(resp1, self.state) + + resp2 = self._create_response(b"data1", _READ_ID, offset=4, range_end=True) + with self.assertRaisesRegex(DataCorruption, "Full object checksum mismatch"): + self.strategy.update_state_from_response(resp2, self.state) + + def test_update_state_checksum_mismatch_ignored_when_disabled(self): + """Test that a CRC32C mismatch is ignored when enable_checksum is False.""" + self._add_download(_READ_ID) + self.state["enable_checksum"] = False + response = self._create_response(b"data", _READ_ID, offset=0, crc=999999) + + # Should NOT raise DataCorruption! + self.strategy.update_state_from_response(response, self.state) From 7110744e7cfcd68e75d66fce375dd66611c99962 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Tue, 26 May 2026 21:02:53 +0000 Subject: [PATCH 3/9] style(storage): apply user feedback on finalize_time and fix unused import lint error --- .../asyncio/async_read_object_stream.py | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py index 8655ed005d61..965c1abb9835 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import datetime from typing import List, Optional, Tuple from google.api_core.bidi_async import AsyncBidiRpc @@ -137,20 +136,14 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: # update persisted size self.persisted_size = response.metadata.size self.object_metadata = response.metadata - # Since full object checksum validation is only required for finalized objects, - # check finalize_time (which is DatetimeWithNanoseconds/datetime in production, or mocked in tests). - finalize_time = getattr(response.metadata, "finalize_time", None) - if finalize_time: - is_finalized_val = False - if isinstance(finalize_time, datetime.datetime): - is_finalized_val = True - elif hasattr(finalize_time, "seconds") and finalize_time.seconds > 0: - is_finalized_val = True - - if is_finalized_val: - self.is_finalized = True - if hasattr(response.metadata, "checksums") and response.metadata.checksums: - self.full_obj_server_crc32c = response.metadata.checksums.crc32c + if ( + hasattr(response.metadata, "finalize_time") + and response.metadata.finalize_time + and response.metadata.finalize_time.seconds > 0 + ): + self.is_finalized = True + if hasattr(response.metadata, "checksums") and response.metadata.checksums: + self.full_obj_server_crc32c = response.metadata.checksums.crc32c if response and response.read_handle: self.read_handle = response.read_handle From 31ef2e399f59becde18ab59cb909f8a5f4916e87 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 27 May 2026 08:01:53 +0000 Subject: [PATCH 4/9] style(storage): run ruff format to resolve CI lint failures --- .../google/cloud/storage/asyncio/async_read_object_stream.py | 5 ++++- .../tests/unit/asyncio/test_async_read_object_stream.py | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py index 965c1abb9835..8d5595a81ed7 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py @@ -142,7 +142,10 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: and response.metadata.finalize_time.seconds > 0 ): self.is_finalized = True - if hasattr(response.metadata, "checksums") and response.metadata.checksums: + if ( + hasattr(response.metadata, "checksums") + and response.metadata.checksums + ): self.full_obj_server_crc32c = response.metadata.checksums.crc32c if response and response.read_handle: diff --git a/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py b/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py index fc16ae41a0ac..9aaec2daed1b 100644 --- a/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py +++ b/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py @@ -392,7 +392,9 @@ async def test_recv_updates_read_handle_on_refresh( "google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" ) @pytest.mark.asyncio -async def test_open_unfinalized_object_skips_checksum(mock_client, mock_cls_async_bidi_rpc): +async def test_open_unfinalized_object_skips_checksum( + mock_client, mock_cls_async_bidi_rpc +): socket_like_rpc = AsyncMock() mock_cls_async_bidi_rpc.return_value = socket_like_rpc socket_like_rpc.open = AsyncMock() From c371f59259cf39dea9c96587674437c6f12c4fd0 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 27 May 2026 08:06:22 +0000 Subject: [PATCH 5/9] test(storage): add conftest autouse event loop fixture to resolve pytest-asyncio issues --- .../tests/unit/conftest.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 packages/google-cloud-storage/tests/unit/conftest.py diff --git a/packages/google-cloud-storage/tests/unit/conftest.py b/packages/google-cloud-storage/tests/unit/conftest.py new file mode 100644 index 000000000000..2eeabdc990e6 --- /dev/null +++ b/packages/google-cloud-storage/tests/unit/conftest.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import pytest + + +@pytest.fixture(autouse=True) +def set_event_loop(): + try: + asyncio.get_running_loop() + yield + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + yield + finally: + loop.close() + asyncio.set_event_loop(None) From c47064135cc470a0e706df2d0cbf7c125c569218 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 27 May 2026 08:40:18 +0000 Subject: [PATCH 6/9] fix(storage): use second attribute instead of seconds for DatetimeWithNanoseconds --- .../google/cloud/storage/asyncio/async_read_object_stream.py | 2 +- .../tests/unit/asyncio/test_async_read_object_stream.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py index 8d5595a81ed7..8fd98d623571 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py @@ -139,7 +139,7 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: if ( hasattr(response.metadata, "finalize_time") and response.metadata.finalize_time - and response.metadata.finalize_time.seconds > 0 + and response.metadata.finalize_time.second > 0 ): self.is_finalized = True if ( diff --git a/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py b/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py index 9aaec2daed1b..a8f64422765e 100644 --- a/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py +++ b/packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py @@ -41,7 +41,7 @@ async def instantiate_read_obj_stream(mock_client, mock_cls_async_bidi_rpc, open recv_response.metadata = mock.MagicMock() recv_response.metadata.generation = _TEST_GENERATION_NUMBER recv_response.metadata.size = _TEST_OBJECT_SIZE - recv_response.metadata.finalize_time.seconds = 12345 + recv_response.metadata.finalize_time.second = 30 recv_response.metadata.checksums.crc32c = 98765 recv_response.read_handle = _TEST_READ_HANDLE socket_like_rpc.recv = AsyncMock(return_value=recv_response) @@ -403,7 +403,7 @@ async def test_open_unfinalized_object_skips_checksum( recv_response.metadata = mock.MagicMock() recv_response.metadata.generation = _TEST_GENERATION_NUMBER recv_response.metadata.size = _TEST_OBJECT_SIZE - recv_response.metadata.finalize_time.seconds = 0 # NOT finalized! + recv_response.metadata.finalize_time.second = 0 # NOT finalized! recv_response.metadata.checksums.crc32c = 98765 recv_response.read_handle = _TEST_READ_HANDLE socket_like_rpc.recv = AsyncMock(return_value=recv_response) From 2f49dcb1d83012f0d0ea29ce5714a54fcbcbd211 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 27 May 2026 09:18:23 +0000 Subject: [PATCH 7/9] perf(storage): bypass rolling checksum updates when checksum validation is disabled --- .../retry/reads_resumption_strategy.py | 12 ++++++-- .../retry/test_reads_resumption_strategy.py | 30 ++++++++++++++++--- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py index 35d974ae2911..3a782c8135eb 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py @@ -36,7 +36,11 @@ class _DownloadState: """A helper class to track the state of a single range download.""" def __init__( - self, initial_offset: int, initial_length: int, user_buffer: IO[bytes], is_full_object_read: bool = False + self, + initial_offset: int, + initial_length: int, + user_buffer: IO[bytes], + is_full_object_read: bool = False, ): self.initial_offset = initial_offset self.initial_length = initial_length @@ -45,7 +49,9 @@ def __init__( self.next_expected_offset = initial_offset self.is_complete = False self.is_full_object_read = is_full_object_read - self.rolling_checksum = google_crc32c.Checksum() if is_full_object_read else None + self.rolling_checksum = ( + google_crc32c.Checksum() if is_full_object_read else None + ) class _ReadResumptionStrategy(_BaseResumptionStrategy): @@ -143,7 +149,7 @@ def update_state_from_response( read_state.user_buffer.write(data) # Commit updates only after the write succeeds - if read_state.rolling_checksum is not None: + if checksum_enabled and read_state.rolling_checksum is not None: read_state.rolling_checksum.update(data) read_state.bytes_written += chunk_size read_state.next_expected_offset += chunk_size diff --git a/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py b/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py index 88bae515729b..4f7849801acd 100644 --- a/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py +++ b/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py @@ -53,7 +53,9 @@ def test_initialization_with_full_object_read(self): initial_offset = 10 initial_length = 100 user_buffer = io.BytesIO() - state_full = _DownloadState(initial_offset, initial_length, user_buffer, is_full_object_read=True) + state_full = _DownloadState( + initial_offset, initial_length, user_buffer, is_full_object_read=True + ) self.assertEqual(state_full.initial_offset, initial_offset) self.assertEqual(state_full.initial_length, initial_length) @@ -71,12 +73,17 @@ def setUp(self): self.state = {"download_states": {}, "read_handle": None, "routing_token": None} - def _add_download(self, read_id, offset=0, length=100, buffer=None, is_full_object_read=False): + def _add_download( + self, read_id, offset=0, length=100, buffer=None, is_full_object_read=False + ): """Helper to inject a download state into the correct nested location.""" if buffer is None: buffer = io.BytesIO() state = _DownloadState( - initial_offset=offset, initial_length=length, user_buffer=buffer, is_full_object_read=is_full_object_read + initial_offset=offset, + initial_length=length, + user_buffer=buffer, + is_full_object_read=is_full_object_read, ) self.state["download_states"][read_id] = state return state @@ -379,7 +386,9 @@ async def run(): def test_update_state_full_object_checksum_success(self): """Test that full object checksum verification succeeds on range_end.""" - read_state = self._add_download(_READ_ID, offset=0, length=9, is_full_object_read=True) + read_state = self._add_download( + _READ_ID, offset=0, length=9, is_full_object_read=True + ) self.state["enable_checksum"] = True self.state["full_obj_server_crc32c"] = google_crc32c.value(b"testdata1") @@ -413,3 +422,16 @@ def test_update_state_checksum_mismatch_ignored_when_disabled(self): # Should NOT raise DataCorruption! self.strategy.update_state_from_response(response, self.state) + + def test_update_state_full_object_checksum_mismatch_ignored_when_disabled(self): + """Test that a full-object CRC32C mismatch is ignored when enable_checksum is False.""" + self._add_download(_READ_ID, offset=0, length=9, is_full_object_read=True) + self.state["enable_checksum"] = False + self.state["full_obj_server_crc32c"] = 111111 # Wrong server checksum! + + resp1 = self._create_response(b"test", _READ_ID, offset=0) + self.strategy.update_state_from_response(resp1, self.state) + + resp2 = self._create_response(b"data1", _READ_ID, offset=4, range_end=True) + # Should NOT raise DataCorruption! + self.strategy.update_state_from_response(resp2, self.state) From 8fdb57123d3920a52bd497b115377328b2ac29a4 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 4 Jun 2026 10:10:18 +0000 Subject: [PATCH 8/9] chore: trigger workflows From 465d210505ff37a29eb407e535756f0f70f45485 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 4 Jun 2026 11:05:35 +0000 Subject: [PATCH 9/9] fix(storage): skip rolling checksum creation when validation is disabled --- .../retry/reads_resumption_strategy.py | 11 +++++- .../retry/test_reads_resumption_strategy.py | 39 ++++++++++++++++++- 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py b/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py index 3a782c8135eb..6cf17af19089 100644 --- a/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py +++ b/packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py @@ -41,6 +41,7 @@ def __init__( initial_length: int, user_buffer: IO[bytes], is_full_object_read: bool = False, + enable_checksum: bool = True, ): self.initial_offset = initial_offset self.initial_length = initial_length @@ -50,7 +51,9 @@ def __init__( self.is_complete = False self.is_full_object_read = is_full_object_read self.rolling_checksum = ( - google_crc32c.Checksum() if is_full_object_read else None + google_crc32c.Checksum() + if (is_full_object_read and enable_checksum) + else None ) @@ -168,7 +171,11 @@ def update_state_from_response( ) # Perform full-object checksum verification once the stream finishes. - if read_state.is_full_object_read and checksum_enabled: + if ( + read_state.is_full_object_read + and checksum_enabled + and read_state.rolling_checksum is not None + ): full_obj_server_crc32c = state.get("full_obj_server_crc32c") if full_obj_server_crc32c is not None: # Use standard big-endian byte conversion to retrieve the rolling checksum value. diff --git a/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py b/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py index 4f7849801acd..841ea655626e 100644 --- a/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py +++ b/packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py @@ -66,6 +66,28 @@ def test_initialization_with_full_object_read(self): self.assertTrue(state_full.is_full_object_read) self.assertIsNotNone(state_full.rolling_checksum) + def test_initialization_with_full_object_read_and_checksum_disabled(self): + """Test that _DownloadState does not initialize rolling_checksum when enable_checksum is False.""" + initial_offset = 10 + initial_length = 100 + user_buffer = io.BytesIO() + state_full = _DownloadState( + initial_offset, + initial_length, + user_buffer, + is_full_object_read=True, + enable_checksum=False, + ) + + self.assertEqual(state_full.initial_offset, initial_offset) + self.assertEqual(state_full.initial_length, initial_length) + self.assertEqual(state_full.user_buffer, user_buffer) + self.assertEqual(state_full.bytes_written, 0) + self.assertEqual(state_full.next_expected_offset, initial_offset) + self.assertFalse(state_full.is_complete) + self.assertTrue(state_full.is_full_object_read) + self.assertIsNone(state_full.rolling_checksum) + class TestReadResumptionStrategy(unittest.TestCase): def setUp(self): @@ -74,7 +96,13 @@ def setUp(self): self.state = {"download_states": {}, "read_handle": None, "routing_token": None} def _add_download( - self, read_id, offset=0, length=100, buffer=None, is_full_object_read=False + self, + read_id, + offset=0, + length=100, + buffer=None, + is_full_object_read=False, + enable_checksum=True, ): """Helper to inject a download state into the correct nested location.""" if buffer is None: @@ -84,6 +112,7 @@ def _add_download( initial_length=length, user_buffer=buffer, is_full_object_read=is_full_object_read, + enable_checksum=enable_checksum, ) self.state["download_states"][read_id] = state return state @@ -425,7 +454,13 @@ def test_update_state_checksum_mismatch_ignored_when_disabled(self): def test_update_state_full_object_checksum_mismatch_ignored_when_disabled(self): """Test that a full-object CRC32C mismatch is ignored when enable_checksum is False.""" - self._add_download(_READ_ID, offset=0, length=9, is_full_object_read=True) + self._add_download( + _READ_ID, + offset=0, + length=9, + is_full_object_read=True, + enable_checksum=False, + ) self.state["enable_checksum"] = False self.state["full_obj_server_crc32c"] = 111111 # Wrong server checksum!