Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -798,10 +798,20 @@ async def _parse_request_file(cls, file_path: Path) -> Request | None:
return None

async def _discover_existing_requests(self) -> None:
"""Discover and load existing requests into the state when opening an existing request queue."""
"""Discover and load existing requests into the state when opening an existing request queue.

On recovery after a crash, any requests that were previously in-progress are reclaimed as pending,
since there is no active processing after a restart.
"""
request_files = await self._get_request_files(self.path_to_rq)
state = self._state.current_value

if state.in_progress_requests:
logger.info(
f'Reclaiming {len(state.in_progress_requests)} in-progress request(s) from previous run.',
)
state.in_progress_requests.clear()

for request_file in request_files:
request = await self._parse_request_file(request_file)
if request is None:
Expand Down
45 changes: 45 additions & 0 deletions tests/unit/storage_clients/_file_system/test_fs_rq_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,51 @@ async def test_data_persistence_across_reopens() -> None:
await reopened_client.drop()


async def test_in_progress_requests_recovered_after_crash() -> None:
    """Verify that a request left in-progress at crash time becomes fetchable again on reopen.

    Flow: add requests, fetch one (leaving it in-progress), persist state, then reopen the
    queue by id without calling mark_request_as_handled or reclaim_request. After recovery,
    every request — including the formerly in-progress one — must be fetchable exactly once.
    """
    client_factory = FileSystemStorageClient()

    # Build the queue and enqueue three requests.
    rq_before_crash = await client_factory.create_rq_client(name='crash-recovery-test')

    urls = [
        'https://example.com/1',
        'https://example.com/2',
        'https://example.com/3',
    ]
    await rq_before_crash.add_batch_of_requests([Request.from_url(url) for url in urls])

    # Pull one request so it sits in the in-progress set (work underway before the crash).
    in_flight = await rq_before_crash.fetch_next_request()
    assert in_flight is not None

    # Flush state to disk, as would happen periodically or right at the crash boundary.
    await rq_before_crash._state.persist_state()

    queue_id = (await rq_before_crash.get_metadata()).id

    # Simulate the crash: reopen by id — no handled/reclaim calls were ever made.
    rq_after_crash = await client_factory.create_rq_client(id=queue_id)

    # All three requests should be fetchable again (the in-progress one was reclaimed).
    fetched_urls = set()
    for _ in range(3):
        req = await rq_after_crash.fetch_next_request()
        assert req is not None, f'Expected 3 fetchable requests, only got {len(fetched_urls)}'
        fetched_urls.add(req.url)

    assert fetched_urls == set(urls)

    # The queue must now be exhausted.
    assert await rq_after_crash.fetch_next_request() is None

    await rq_after_crash.drop()


async def test_get_request_does_not_mark_in_progress(rq_client: FileSystemRequestQueueClient) -> None:
"""Test that get_request does not block a request from being fetched."""
request = Request.from_url('https://example.com/blocked')
Expand Down
Loading