From 3996901f93343769374a89bd0860850d87cc2ea9 Mon Sep 17 00:00:00 2001 From: Josh Elbahrawy Date: Sat, 23 May 2026 19:11:55 -0400 Subject: [PATCH 1/5] fix(FsspecStore): close owned async filesystem on store.close() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit FsspecStore.from_url() and from_mapper() create their own async filesystem instance that zarr is responsible for — but Store.close() never cleaned it up, leaving the underlying aiohttp ClientSession open until garbage collection. This produced "Unclosed client session" ResourceWarnings from aiohttp, and in environments where the finalizer ran on the wrong event loop (e.g. Python 3.12+ with eager_start=True) it could raise RuntimeError. Changes: - Add _close_fs() async helper: calls fs.set_session() then client.close() for filesystems that expose set_session() (e.g. s3fs); no-op for all others. - Add _owns_fs: bool to FsspecStore.__init__ (default False). Set True in from_url() unconditionally; set True in from_mapper() only when _make_async() produced a new instance (sync→async wrap). Direct construction and from_upath() leave _owns_fs=False — the caller supplied the fs and remains responsible for it. - Override close() to invoke zarr_sync(_close_fs(self.fs)) before calling super().close(), guarded by _owns_fs and a broad "except Exception" handler so it can never raise from a destructor path. Tests: - Update pytestmark comment (the filter stays for GC-path warnings). 
- test_from_url_owns_filesystem / test_from_url_close_releases_store - test_direct_construction_does_not_own_filesystem - test_from_upath_does_not_own_filesystem - test_from_mapper_does_not_own_already_async_filesystem - test_from_mapper_owns_wrapped_sync_filesystem - test_close_fs_closes_s3_client / test_close_fs_no_op_for_fs_without_set_session Co-Authored-By: Claude Sonnet 4.6 --- src/zarr/storage/_fsspec.py | 41 ++++++++++- tests/test_store/test_fsspec.py | 124 +++++++++++++++++++++++++++++++- 2 files changed, 161 insertions(+), 4 deletions(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index 14386d1aac..337ff5c1d7 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -35,6 +35,21 @@ ) +async def _close_fs(fs: AsyncFileSystem) -> None: + """ + Best-effort async close of an fsspec async filesystem owned by FsspecStore. + + For filesystems that expose ``set_session()`` (e.g. s3fs) the underlying + aiohttp ``ClientSession`` is closed explicitly, which prevents + "Unclosed client session" ``ResourceWarning``s from aiohttp. For all + other filesystem types the call is a no-op (not every implementation + manages an HTTP session directly). + """ + if hasattr(fs, "set_session"): + session = await fs.set_session() + await session.close() + + def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem: """Convert a sync FSSpec filesystem to an async FFSpec filesystem @@ -129,6 +144,9 @@ def __init__( self.fs = fs self.path = path self.allowed_exceptions = allowed_exceptions + # True only when this store created fs itself (from_url / from_mapper with new instance). + # Callers who supply their own fs remain responsible for its lifecycle. 
+ self._owns_fs: bool = False if not self.fs.async_impl: raise TypeError("Filesystem needs to support async operations.") @@ -194,13 +212,17 @@ def from_mapper( ------- FsspecStore """ - fs = _make_async(fs_map.fs) - return cls( + original_fs = fs_map.fs + fs = _make_async(original_fs) + store = cls( fs=fs, path=fs_map.root, read_only=read_only, allowed_exceptions=allowed_exceptions, ) + # _make_async returns a new instance when converting sync→async; own it. + store._owns_fs = fs is not original_fs + return store @classmethod def from_url( @@ -242,7 +264,9 @@ def from_url( if not fs.async_impl: fs = _make_async(fs) - return cls(fs=fs, path=path, read_only=read_only, allowed_exceptions=allowed_exceptions) + store = cls(fs=fs, path=path, read_only=read_only, allowed_exceptions=allowed_exceptions) + store._owns_fs = True + return store def with_read_only(self, read_only: bool = False) -> FsspecStore: # docstring inherited @@ -253,6 +277,17 @@ def with_read_only(self, read_only: bool = False) -> FsspecStore: read_only=read_only, ) + def close(self) -> None: + # docstring inherited + if self._owns_fs: + from zarr.core.sync import sync as zarr_sync + + try: + zarr_sync(_close_fs(self.fs)) + except Exception: + pass + super().close() + async def clear(self) -> None: # docstring inherited try: diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index 142cb3b00d..170452abf0 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -36,7 +36,9 @@ pytest.mark.filterwarnings( re.escape("ignore:datetime.datetime.utcnow() is deprecated:DeprecationWarning") ), - # TODO: fix these warnings + # FsspecStore.from_url() and from_mapper() now close the aiohttp session on store.close(). + # This filter covers stores that are GC'd without an explicit close() call, and any + # residual sessions from aiobotocore's ClientCreatorContext (a separate upstream issue). 
pytest.mark.filterwarnings("ignore:Unclosed client session:ResourceWarning"), pytest.mark.filterwarnings( "ignore:coroutine 'ClientCreatorContext.__aexit__' was never awaited:RuntimeWarning" @@ -283,6 +285,75 @@ async def test_delete_dir_unsupported_deletes(self, store: FsspecStore) -> None: ): await store.delete_dir("test_prefix") + # ── Filesystem lifecycle (ownership) ────────────────────────────────────── + + def test_from_url_owns_filesystem(self) -> None: + """FsspecStore.from_url() creates the async fs; it must own it.""" + store = FsspecStore.from_url( + f"s3://{test_bucket_name}/lifecycle/", + storage_options={"endpoint_url": endpoint_url, "anon": False}, + ) + assert store._owns_fs + store.close() + + async def test_from_url_close_releases_store(self) -> None: + """ + close() on a from_url() store must succeed without error and mark the + store as closed. For the owned filesystem, _close_fs() is invoked to + release the underlying S3 client / aiohttp connection pool. + """ + store = FsspecStore.from_url( + f"s3://{test_bucket_name}/lifecycle/", + storage_options={"endpoint_url": endpoint_url, "anon": False}, + ) + # Materialise the S3 client and connection pool. 
+ await store.set("probe", cpu.Buffer.from_bytes(b"x")) + + store.close() + + assert not store._is_open + + def test_direct_construction_does_not_own_filesystem(self) -> None: + """Direct FsspecStore() must not claim ownership — the caller owns the fs.""" + try: + from fsspec import url_to_fs + except ImportError: + from fsspec.core import url_to_fs + fs, path = url_to_fs( + f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False, asynchronous=True + ) + store = FsspecStore(fs=fs, path=path) + assert not store._owns_fs + + @pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.03.01"), + reason="Prior bug in from_upath", + ) + def test_from_upath_does_not_own_filesystem(self) -> None: + """from_upath() uses the UPath's existing fs; the store must not own it.""" + upath = pytest.importorskip("upath") + path = upath.UPath( + f"s3://{test_bucket_name}/foo/bar/", + endpoint_url=endpoint_url, + anon=False, + asynchronous=True, + ) + store = FsspecStore.from_upath(path) + assert not store._owns_fs + + def test_from_mapper_does_not_own_already_async_filesystem(self) -> None: + """from_mapper() with an already-async fs must not claim ownership.""" + s3_filesystem = s3fs.S3FileSystem( + asynchronous=True, + endpoint_url=endpoint_url, + anon=False, + skip_instance_cache=True, + ) + mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/") + store = FsspecStore.from_mapper(mapper) + # _make_async returns the same instance for an already-async fs. + assert not store._owns_fs + def array_roundtrip(store: FsspecStore) -> None: """ @@ -512,6 +583,57 @@ def test_open_s3map_raises() -> None: zarr.open(store=mapper, storage_options={"anon": True}, mode="w", shape=(3, 3)) +async def test_close_fs_closes_s3_client() -> None: + """ + _close_fs() must call set_session() and then close() on the returned + S3 client. This is verified with mocks to avoid a real S3 connection. 
+ """ + from unittest.mock import AsyncMock + + from zarr.storage._fsspec import _close_fs + + mock_client = AsyncMock() + mock_fs = AsyncMock() + mock_fs.set_session = AsyncMock(return_value=mock_client) + + await _close_fs(mock_fs) + + mock_fs.set_session.assert_called_once() + mock_client.close.assert_called_once() + + +async def test_close_fs_no_op_for_fs_without_set_session() -> None: + """_close_fs() must be a no-op for filesystems that don't expose set_session().""" + from unittest.mock import AsyncMock + + from zarr.storage._fsspec import _close_fs + + mock_fs = AsyncMock(spec=[]) # empty spec — no set_session attribute + await _close_fs(mock_fs) # must not raise + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_from_mapper_owns_wrapped_sync_filesystem(tmp_path: pathlib.Path) -> None: + """ + from_mapper() with a sync fs must wrap it in AsyncFileSystemWrapper and + claim ownership so that close() cleans it up. + + The local filesystem is synchronous; _make_async() produces a new + AsyncFileSystemWrapper instance — a different object from the original fs. 
+ """ + import fsspec as _fsspec + from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper + + fs = _fsspec.filesystem("file", auto_mkdir=True) + mapper = fs.get_mapper(str(tmp_path)) + store = FsspecStore.from_mapper(mapper) + assert isinstance(store.fs, AsyncFileSystemWrapper) + assert store._owns_fs + + @pytest.mark.parametrize("asynchronous", [True, False]) def test_make_async(asynchronous: bool) -> None: s3_filesystem = s3fs.S3FileSystem( From 4eb42318389edcf9d05b56527c94ba259e30bf63 Mon Sep 17 00:00:00 2001 From: Josh Elbahrawy Date: Sat, 23 May 2026 19:20:15 -0400 Subject: [PATCH 2/5] fix lint: use contextlib.suppress instead of try/except/pass (SIM105) --- src/zarr/storage/_fsspec.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index 337ff5c1d7..d5714162d7 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -282,10 +282,8 @@ def close(self) -> None: if self._owns_fs: from zarr.core.sync import sync as zarr_sync - try: + with suppress(Exception): zarr_sync(_close_fs(self.fs)) - except Exception: - pass super().close() async def clear(self) -> None: From feaf6a82681d2dcb51c7bd8baee3ee82cc795af9 Mon Sep 17 00:00:00 2001 From: Josh Elbahrawy Date: Sat, 23 May 2026 19:25:57 -0400 Subject: [PATCH 3/5] add towncrier release note for #4003 --- changes/4003.bugfix.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 changes/4003.bugfix.md diff --git a/changes/4003.bugfix.md b/changes/4003.bugfix.md new file mode 100644 index 0000000000..a4c6356794 --- /dev/null +++ b/changes/4003.bugfix.md @@ -0,0 +1,4 @@ +`FsspecStore.from_url()` and `from_mapper()` now close the async filesystem +they create when `store.close()` is called. Previously the underlying aiohttp +`ClientSession` was left open until garbage collection, producing +`"Unclosed client session"` `ResourceWarning`s from aiohttp. 
From f4d4517db58dc37ef7d18eb726683eeddf903b00 Mon Sep 17 00:00:00 2001 From: Josh Elbahrawy Date: Sat, 23 May 2026 20:20:20 -0400 Subject: [PATCH 4/5] changes: expand 4003 release note to document fix scope and s3fs limitation Co-Authored-By: Claude Sonnet 4.6 --- changes/4003.bugfix.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/changes/4003.bugfix.md b/changes/4003.bugfix.md index a4c6356794..36327b55df 100644 --- a/changes/4003.bugfix.md +++ b/changes/4003.bugfix.md @@ -2,3 +2,18 @@ they create when `store.close()` is called. Previously the underlying aiohttp `ClientSession` was left open until garbage collection, producing `"Unclosed client session"` `ResourceWarning`s from aiohttp. + +The fix introduces `FsspecStore._owns_fs`, a boolean that is ``True`` only when +`FsspecStore` itself created the filesystem (via `from_url` or `from_mapper` +when a sync→async conversion was performed). When `_owns_fs` is ``True``, +`store.close()` calls the new `_close_fs()` helper, which invokes +`fs.set_session()` and closes the returned client. Callers who supply their own +filesystem instance to `FsspecStore()` directly remain responsible for its +lifecycle; `_owns_fs` is ``False`` for those stores. + +**Scope note**: This fix closes the S3 client session that is active at the time +`store.close()` is called. Some S3-backed filesystem implementations (e.g. +s3fs with ``cache_regions=True``) may internally refresh and replace their +client during I/O operations, abandoning prior sessions before ``store.close()`` +is invoked. Those intermediate sessions are outside the scope of this fix and +are an issue in the upstream filesystem library. 
From 96d786a81902cd0b5411c598cfe708bd8f26b58e Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Thu, 28 May 2026 14:53:57 +0200 Subject: [PATCH 5/5] fix(FsspecStore): propagate fs ownership through with_read_only Addresses roborev review findings on the filesystem-ownership change: - with_read_only() dropped fs ownership: it built the derived store with _owns_fs=False while sharing the same fs. In the common from_url(...).with_read_only() chain the only owner (the unreferenced source) was GC'd without close(), reintroducing the unclosed-session leak. Ownership now transfers to the surviving store and is cleared on the source to avoid a double-close. Covered by a new test. - close() now logs at debug before suppressing, so a regression in the close path stays observable instead of silently reverting to leaking. - Documented that set_session() lazily creates a session, so closing a store that never did I/O may instantiate one purely to close it (accepted best-effort behavior). Co-Authored-By: Claude Opus 4.7 --- src/zarr/storage/_fsspec.py | 24 ++++++++++++++++++++++-- tests/test_store/test_fsspec.py | 25 +++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index d5714162d7..29201a6fee 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -3,6 +3,7 @@ import json import warnings from contextlib import suppress +from logging import getLogger from typing import TYPE_CHECKING, Any from packaging.version import parse as parse_version @@ -18,6 +19,8 @@ from zarr.errors import ZarrUserWarning from zarr.storage._utils import _dereference_path +logger = getLogger(__name__) + if TYPE_CHECKING: from collections.abc import AsyncIterator, Iterable @@ -44,6 +47,11 @@ async def _close_fs(fs: AsyncFileSystem) -> None: "Unclosed client session" ``ResourceWarning``s from aiohttp. 
For all other filesystem types the call is a no-op (not every implementation manages an HTTP session directly). + + Note that ``set_session()`` lazily creates a session if none exists yet, so + closing a store that never performed any I/O may instantiate a session + purely to close it. This is accepted best-effort behavior; fsspec does not + expose a stable, cross-implementation way to test for an existing session. """ if hasattr(fs, "set_session"): session = await fs.set_session() @@ -270,20 +278,32 @@ def from_url( def with_read_only(self, read_only: bool = False) -> FsspecStore: # docstring inherited - return type(self)( + new_store = type(self)( fs=self.fs, path=self.path, allowed_exceptions=self.allowed_exceptions, read_only=read_only, ) + # The derived store shares the same fs. Transfer ownership so the + # surviving store closes it, and clear ours to avoid a double-close. + # Otherwise the common ``from_url(...).with_read_only()`` chain would + # drop the only owner (the unreferenced source) and leak the session. + new_store._owns_fs = self._owns_fs + self._owns_fs = False + return new_store def close(self) -> None: # docstring inherited if self._owns_fs: from zarr.core.sync import sync as zarr_sync - with suppress(Exception): + # Best-effort: a failure to release the session must not block close(), + # but log it so a genuine regression in the close path stays observable + # rather than silently reverting to the leaking behavior. 
+ try: zarr_sync(_close_fs(self.fs)) + except Exception: + logger.debug("Failed to close owned filesystem %r", self.fs, exc_info=True) super().close() async def clear(self) -> None: diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index 170452abf0..8006470174 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -634,6 +634,31 @@ def test_from_mapper_owns_wrapped_sync_filesystem(tmp_path: pathlib.Path) -> Non assert store._owns_fs +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_with_read_only_transfers_filesystem_ownership(tmp_path: pathlib.Path) -> None: + """ + with_read_only() must transfer fs ownership to the derived store and clear + it on the source, so the surviving store closes the shared fs exactly once. + + In the common ``from_url(...).with_read_only()`` chain the source store is + immediately unreferenced; if ownership were not transferred, the only owner + would be garbage-collected without close() and the session would leak. + """ + source = FsspecStore.from_url(f"file://{tmp_path}", storage_options={"auto_mkdir": False}) + assert source._owns_fs + + derived = source.with_read_only(read_only=True) + + # Ownership moved to the survivor; the source no longer owns it (no double-close). + assert derived._owns_fs + assert not source._owns_fs + # The derived store shares the same underlying fs. + assert derived.fs is source.fs + + @pytest.mark.parametrize("asynchronous", [True, False]) def test_make_async(asynchronous: bool) -> None: s3_filesystem = s3fs.S3FileSystem(