diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py
index 23200ff05c..4954d8a4d2 100644
--- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py
+++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py
@@ -798,10 +798,20 @@ async def _parse_request_file(cls, file_path: Path) -> Request | None:
         return None
 
     async def _discover_existing_requests(self) -> None:
-        """Discover and load existing requests into the state when opening an existing request queue."""
+        """Discover and load existing requests into the state when opening an existing request queue.
+
+        On recovery after a crash, any requests that were previously in-progress are reclaimed as pending,
+        since there is no active processing after a restart.
+        """
         request_files = await self._get_request_files(self.path_to_rq)
         state = self._state.current_value
 
+        if state.in_progress_requests:
+            logger.info(
+                f'Reclaiming {len(state.in_progress_requests)} in-progress request(s) from previous run.',
+            )
+            state.in_progress_requests.clear()
+
         for request_file in request_files:
             request = await self._parse_request_file(request_file)
             if request is None:
diff --git a/tests/unit/storage_clients/_file_system/test_fs_rq_client.py b/tests/unit/storage_clients/_file_system/test_fs_rq_client.py
index a18d4813a4..275665d9d5 100644
--- a/tests/unit/storage_clients/_file_system/test_fs_rq_client.py
+++ b/tests/unit/storage_clients/_file_system/test_fs_rq_client.py
@@ -175,6 +175,51 @@ async def test_data_persistence_across_reopens() -> None:
     await reopened_client.drop()
 
 
+async def test_in_progress_requests_recovered_after_crash() -> None:
+    """Test that requests left in-progress at crash time are recovered as pending on reopen.
+
+    Simulates a crash: requests are added, one is fetched (in-progress), state is persisted,
+    then the queue is reopened. The in-progress request should be available for fetching again.
+    """
+    storage_client = FileSystemStorageClient()
+
+    # Create RQ and add requests.
+    original_client = await storage_client.create_rq_client(name='crash-recovery-test')
+
+    test_requests = [
+        Request.from_url('https://example.com/1'),
+        Request.from_url('https://example.com/2'),
+        Request.from_url('https://example.com/3'),
+    ]
+    await original_client.add_batch_of_requests(test_requests)
+
+    # Fetch one request, putting it in-progress (simulating work before crash).
+    fetched = await original_client.fetch_next_request()
+    assert fetched is not None
+
+    # Persist state explicitly (simulating what happens periodically or at crash boundary).
+    await original_client._state.persist_state()
+
+    rq_id = (await original_client.get_metadata()).id
+
+    # Simulate crash: reopen the queue without calling mark_request_as_handled or reclaim_request.
+    reopened_client = await storage_client.create_rq_client(id=rq_id)
+
+    # All 3 requests should be fetchable (the in-progress one should have been reclaimed).
+    fetched_urls = set()
+    for _ in range(3):
+        req = await reopened_client.fetch_next_request()
+        assert req is not None, f'Expected 3 fetchable requests, only got {len(fetched_urls)}'
+        fetched_urls.add(req.url)
+
+    assert fetched_urls == {'https://example.com/1', 'https://example.com/2', 'https://example.com/3'}
+
+    # No more requests should be available.
+    assert await reopened_client.fetch_next_request() is None
+
+    await reopened_client.drop()
+
+
 async def test_get_request_does_not_mark_in_progress(rq_client: FileSystemRequestQueueClient) -> None:
     """Test that get_request does not block a request from being fetched."""
     request = Request.from_url('https://example.com/blocked')