diff --git a/changes/4003.bugfix.md b/changes/4003.bugfix.md new file mode 100644 index 0000000000..36327b55df --- /dev/null +++ b/changes/4003.bugfix.md @@ -0,0 +1,19 @@ +`FsspecStore.from_url()` and `from_mapper()` now close the async filesystem +they create when `store.close()` is called. Previously the underlying aiohttp +`ClientSession` was left open until garbage collection, producing +`"Unclosed client session"` `ResourceWarning`s from aiohttp. + +The fix introduces `FsspecStore._owns_fs`, a boolean that is ``True`` only when +`FsspecStore` itself created the filesystem: always for `from_url`, and for +`from_mapper` only when a sync→async conversion was performed. When `_owns_fs` is ``True``, +`store.close()` calls the new `_close_fs()` helper, which invokes +`fs.set_session()` and closes the returned client. Callers who supply their own +filesystem instance to `FsspecStore()` directly remain responsible for its +lifecycle; `_owns_fs` is ``False`` for those stores. + +**Scope note**: This fix closes the S3 client session that is active at the time +`store.close()` is called. Some S3-backed filesystem implementations (e.g. +s3fs with ``cache_regions=True``) may internally refresh and replace their +client during I/O operations, abandoning prior sessions before ``store.close()`` +is invoked. Those intermediate sessions are outside the scope of this fix and +are an issue in the upstream filesystem library. diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index 14386d1aac..d5714162d7 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -35,6 +35,21 @@ ) +async def _close_fs(fs: AsyncFileSystem) -> None: + """ + Best-effort async close of an fsspec async filesystem owned by FsspecStore. + + For filesystems that expose ``set_session()`` (e.g. s3fs) the underlying + aiohttp ``ClientSession`` is closed explicitly, which prevents + "Unclosed client session" ``ResourceWarning``s from aiohttp. 
For all + other filesystem types the call is a no-op (not every implementation + manages an HTTP session directly). + """ + if hasattr(fs, "set_session"): + session = await fs.set_session() + await session.close() + + def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem: """Convert a sync FSSpec filesystem to an async FFSpec filesystem @@ -129,6 +144,9 @@ def __init__( self.fs = fs self.path = path self.allowed_exceptions = allowed_exceptions + # True only when this store created fs itself (from_url / from_mapper with new instance). + # Callers who supply their own fs remain responsible for its lifecycle. + self._owns_fs: bool = False if not self.fs.async_impl: raise TypeError("Filesystem needs to support async operations.") @@ -194,13 +212,17 @@ def from_mapper( ------- FsspecStore """ - fs = _make_async(fs_map.fs) - return cls( + original_fs = fs_map.fs + fs = _make_async(original_fs) + store = cls( fs=fs, path=fs_map.root, read_only=read_only, allowed_exceptions=allowed_exceptions, ) + # _make_async returns a new instance when converting sync→async; own it. 
+ store._owns_fs = fs is not original_fs + return store @classmethod def from_url( @@ -242,7 +264,9 @@ def from_url( if not fs.async_impl: fs = _make_async(fs) - return cls(fs=fs, path=path, read_only=read_only, allowed_exceptions=allowed_exceptions) + store = cls(fs=fs, path=path, read_only=read_only, allowed_exceptions=allowed_exceptions) + store._owns_fs = True + return store def with_read_only(self, read_only: bool = False) -> FsspecStore: # docstring inherited @@ -253,6 +277,15 @@ def with_read_only(self, read_only: bool = False) -> FsspecStore: read_only=read_only, ) + def close(self) -> None: + # docstring inherited + if self._owns_fs: + from zarr.core.sync import sync as zarr_sync + + with suppress(Exception): + zarr_sync(_close_fs(self.fs)) + super().close() + async def clear(self) -> None: # docstring inherited try: diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index 142cb3b00d..170452abf0 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -36,7 +36,9 @@ pytest.mark.filterwarnings( re.escape("ignore:datetime.datetime.utcnow() is deprecated:DeprecationWarning") ), - # TODO: fix these warnings + # FsspecStore.from_url() and from_mapper() now close the aiohttp session on store.close(). + # This filter covers stores that are GC'd without an explicit close() call, and any + # residual sessions from aiobotocore's ClientCreatorContext (a separate upstream issue). 
pytest.mark.filterwarnings("ignore:Unclosed client session:ResourceWarning"), pytest.mark.filterwarnings( "ignore:coroutine 'ClientCreatorContext.__aexit__' was never awaited:RuntimeWarning" @@ -283,6 +285,75 @@ async def test_delete_dir_unsupported_deletes(self, store: FsspecStore) -> None: ): await store.delete_dir("test_prefix") + # ── Filesystem lifecycle (ownership) ────────────────────────────────────── + + def test_from_url_owns_filesystem(self) -> None: + """FsspecStore.from_url() creates the async fs; it must own it.""" + store = FsspecStore.from_url( + f"s3://{test_bucket_name}/lifecycle/", + storage_options={"endpoint_url": endpoint_url, "anon": False}, + ) + assert store._owns_fs + store.close() + + async def test_from_url_close_releases_store(self) -> None: + """ + close() on a from_url() store must succeed without error and mark the + store as closed. For the owned filesystem, _close_fs() is invoked to + release the underlying S3 client / aiohttp connection pool. + """ + store = FsspecStore.from_url( + f"s3://{test_bucket_name}/lifecycle/", + storage_options={"endpoint_url": endpoint_url, "anon": False}, + ) + # Materialise the S3 client and connection pool. 
+ await store.set("probe", cpu.Buffer.from_bytes(b"x")) + + store.close() + + assert not store._is_open + + def test_direct_construction_does_not_own_filesystem(self) -> None: + """Direct FsspecStore() must not claim ownership — the caller owns the fs.""" + try: + from fsspec import url_to_fs + except ImportError: + from fsspec.core import url_to_fs + fs, path = url_to_fs( + f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False, asynchronous=True + ) + store = FsspecStore(fs=fs, path=path) + assert not store._owns_fs + + @pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.03.01"), + reason="Prior bug in from_upath", + ) + def test_from_upath_does_not_own_filesystem(self) -> None: + """from_upath() uses the UPath's existing fs; the store must not own it.""" + upath = pytest.importorskip("upath") + path = upath.UPath( + f"s3://{test_bucket_name}/foo/bar/", + endpoint_url=endpoint_url, + anon=False, + asynchronous=True, + ) + store = FsspecStore.from_upath(path) + assert not store._owns_fs + + def test_from_mapper_does_not_own_already_async_filesystem(self) -> None: + """from_mapper() with an already-async fs must not claim ownership.""" + s3_filesystem = s3fs.S3FileSystem( + asynchronous=True, + endpoint_url=endpoint_url, + anon=False, + skip_instance_cache=True, + ) + mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/") + store = FsspecStore.from_mapper(mapper) + # _make_async returns the same instance for an already-async fs. + assert not store._owns_fs + def array_roundtrip(store: FsspecStore) -> None: """ @@ -512,6 +583,57 @@ def test_open_s3map_raises() -> None: zarr.open(store=mapper, storage_options={"anon": True}, mode="w", shape=(3, 3)) +async def test_close_fs_closes_s3_client() -> None: + """ + _close_fs() must call set_session() and then close() on the returned + S3 client. This is verified with mocks to avoid a real S3 connection. 
+ """ + from unittest.mock import AsyncMock + + from zarr.storage._fsspec import _close_fs + + mock_client = AsyncMock() + mock_fs = AsyncMock() + mock_fs.set_session = AsyncMock(return_value=mock_client) + + await _close_fs(mock_fs) + + mock_fs.set_session.assert_called_once() + mock_client.close.assert_called_once() + + +async def test_close_fs_no_op_for_fs_without_set_session() -> None: + """_close_fs() must be a no-op for filesystems that don't expose set_session().""" + from unittest.mock import AsyncMock + + from zarr.storage._fsspec import _close_fs + + mock_fs = AsyncMock(spec=[]) # empty spec — no set_session attribute + await _close_fs(mock_fs) # must not raise + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_from_mapper_owns_wrapped_sync_filesystem(tmp_path: pathlib.Path) -> None: + """ + from_mapper() with a sync fs must wrap it in AsyncFileSystemWrapper and + claim ownership so that close() cleans it up. + + The local filesystem is synchronous; _make_async() produces a new + AsyncFileSystemWrapper instance — a different object from the original fs. + """ + import fsspec as _fsspec + from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper + + fs = _fsspec.filesystem("file", auto_mkdir=True) + mapper = fs.get_mapper(str(tmp_path)) + store = FsspecStore.from_mapper(mapper) + assert isinstance(store.fs, AsyncFileSystemWrapper) + assert store._owns_fs + + @pytest.mark.parametrize("asynchronous", [True, False]) def test_make_async(asynchronous: bool) -> None: s3_filesystem = s3fs.S3FileSystem(