Skip to content

Commit b2da3b8

Browse files
refactor(worker): remove redis-expiry finisher reconstruction fallback
Drop the started-upload database fallback path that was only needed for Redis-state expiry recovery, and simplify reconstruction to rely on the merge-ready processing state directly.

Made-with: Cursor
1 parent 6ca4287 commit b2da3b8

File tree

2 files changed

+6
-166
lines changed

2 files changed

+6
-166
lines changed

apps/worker/tasks/tests/unit/test_upload_finisher_task.py

Lines changed: 3 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
from helpers.exceptions import RepositoryWithoutValidBotError
1717
from helpers.log_context import LogContext, set_log_context
1818
from services.lock_manager import LockRetry
19-
from services.processing.intermediate import intermediate_report_key
2019
from services.processing.merging import get_joined_flag, update_uploads
2120
from services.processing.types import MergeResult, ProcessingResult
2221
from services.timeseries import MeasurementName
@@ -1259,105 +1258,18 @@ def test_idempotency_check_proceeds_when_uploads_not_finished(
12591258
# Verify that _process_reports_with_lock WAS called
12601259
mock_process.assert_called_once()
12611260

1262-
@pytest.mark.django_db
1263-
def test_reconstruct_processing_results_falls_back_to_database_when_redis_expires(
1264-
self,
1265-
dbsession,
1266-
mocker,
1267-
mock_storage,
1268-
mock_repo_provider,
1269-
mock_redis,
1270-
mock_self_app,
1271-
):
1272-
"""Test that finisher falls back to database when Redis ProcessingState expires.
1273-
1274-
This tests the edge case where Redis keys expire after 24h TTL, but uploads
1275-
were processed and have intermediate reports. The finisher should find them
1276-
via database query and include them in the final report.
1277-
"""
1278-
commit = CommitFactory.create()
1279-
dbsession.add(commit)
1280-
dbsession.flush()
1281-
1282-
report = CommitReport(commit_id=commit.id_)
1283-
dbsession.add(report)
1284-
dbsession.flush()
1285-
1286-
# Create uploads in "started" state (simulating Redis state expired)
1287-
upload_1 = UploadFactory.create(
1288-
report=report, state="started", state_id=UploadState.UPLOADED.db_id
1289-
)
1290-
upload_2 = UploadFactory.create(
1291-
report=report, state="started", state_id=UploadState.UPLOADED.db_id
1292-
)
1293-
dbsession.add(upload_1)
1294-
dbsession.add(upload_2)
1295-
dbsession.flush()
1296-
1297-
# Mock Redis to simulate intermediate reports exist (confirms uploads were processed)
1298-
mock_redis.exists.side_effect = lambda key: (
1299-
key == intermediate_report_key(upload_1.id)
1300-
or key == intermediate_report_key(upload_2.id)
1301-
)
1302-
1303-
# Mock ProcessingState to return empty (simulating Redis expiration)
1304-
mock_state = mocker.MagicMock()
1305-
mock_state.get_uploads_for_merging.return_value = set() # Redis expired
1306-
mock_state.get_upload_numbers.return_value = mocker.MagicMock(
1307-
processing=0, processed=0
1308-
)
1309-
mocker.patch("tasks.upload_finisher.ProcessingState", return_value=mock_state)
1310-
1311-
# Mock the processing methods
1312-
mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[])
1313-
mocker.patch("tasks.upload_finisher.update_uploads")
1314-
mock_process = mocker.patch.object(
1315-
UploadFinisherTask, "_process_reports_with_lock"
1316-
)
1317-
1318-
# Call run_impl without processing_results to trigger reconstruction
1319-
task = UploadFinisherTask()
1320-
task.run_impl(
1321-
dbsession,
1322-
processing_results=None, # Triggers reconstruction
1323-
repoid=commit.repoid,
1324-
commitid=commit.commitid,
1325-
commit_yaml={},
1326-
)
1327-
1328-
# Verify that _find_started_uploads_with_reports was called (via reconstruction)
1329-
# This is verified by checking that _process_reports_with_lock was called
1330-
# with processing_results containing our uploads
1331-
mock_process.assert_called_once()
1332-
call_args = mock_process.call_args
1333-
# processing_results is the 4th positional argument (index 0 is args tuple)
1334-
processing_results = call_args[0][3]
1335-
1336-
# Verify both uploads are included in processing_results
1337-
upload_ids_in_results = {r["upload_id"] for r in processing_results}
1338-
assert upload_1.id in upload_ids_in_results
1339-
assert upload_2.id in upload_ids_in_results
1340-
assert len(processing_results) == 2
1341-
1342-
# Verify both are marked as successful (have intermediate reports)
1343-
assert all(r["successful"] for r in processing_results)
1344-
13451261
@pytest.mark.django_db
13461262
def test_reconstruct_processing_results_returns_empty_when_no_uploads_found(
13471263
self, dbsession, mocker, mock_redis, mock_self_app
13481264
):
1349-
"""Test that finisher returns empty list when no uploads found in Redis or DB.
1350-
1351-
This tests the edge case where Redis expires AND no uploads exist in database
1352-
in "started" state with intermediate reports.
1353-
"""
1265+
"""Test that finisher returns empty list when no uploads are merge-ready."""
13541266
commit = CommitFactory.create()
13551267
dbsession.add(commit)
13561268
dbsession.flush()
13571269

1358-
# Mock ProcessingState to return empty (simulating Redis expiration)
1270+
# Mock ProcessingState to return empty merge-ready set.
13591271
mock_state = mocker.MagicMock()
1360-
mock_state.get_uploads_for_merging.return_value = set() # Redis expired
1272+
mock_state.get_uploads_for_merging.return_value = set()
13611273
mocker.patch("tasks.upload_finisher.ProcessingState", return_value=mock_state)
13621274

13631275
# Call run_impl without processing_results to trigger reconstruction

apps/worker/tasks/upload_finisher.py

Lines changed: 3 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -157,93 +157,21 @@ def _count_remaining_coverage_uploads(self, db_session, commit: Commit) -> int:
157157
.count()
158158
)
159159

160-
def _find_started_uploads_with_reports(
161-
self, db_session, commit: Commit
162-
) -> set[int]:
163-
"""Find uploads in "started" state that have intermediate reports in Redis.
164-
165-
This is the fallback when Redis ProcessingState has expired (TTL: PROCESSING_STATE_TTL).
166-
We check the database for uploads that were processed but never finalized,
167-
and verify they have intermediate reports before including them.
168-
"""
169-
# Query for uploads in "started" state for this commit
170-
started_uploads = (
171-
db_session.query(Upload)
172-
.join(Upload.report)
173-
.filter(
174-
Upload.report.has(commit=commit),
175-
Upload.state == "started",
176-
Upload.state_id == UploadState.UPLOADED.db_id,
177-
)
178-
.all()
179-
)
180-
181-
if not started_uploads:
182-
return set()
183-
184-
log.info(
185-
"Found uploads in started state, checking for intermediate reports",
186-
extra={
187-
"upload_ids": [u.id_ for u in started_uploads],
188-
"count": len(started_uploads),
189-
},
190-
)
191-
192-
# Check which uploads have intermediate reports (confirms they were processed)
193-
redis_connection = get_redis_connection()
194-
upload_ids_with_reports = set()
195-
196-
for upload in started_uploads:
197-
report_key = intermediate_report_key(upload.id_)
198-
if redis_connection.exists(report_key):
199-
upload_ids_with_reports.add(upload.id_)
200-
else:
201-
log.warning(
202-
"Upload in started state but no intermediate report found (may have expired)",
203-
extra={"upload_id": upload.id_},
204-
)
205-
206-
return upload_ids_with_reports
207-
208160
def _reconstruct_processing_results(
209161
self, db_session, state: ProcessingState, commit: Commit
210162
) -> list[ProcessingResult]:
211163
"""Reconstruct processing_results from ProcessingState when finisher is triggered
212164
outside of a chord (e.g., from orphaned upload recovery).
213165
214-
This ensures ALL uploads that were marked as processed in Redis are included
166+
This ensures all uploads marked as processed in state tracking are included
215167
in the final merged report, even if they completed via retry/recovery.
216-
217-
If Redis state has expired (TTL: PROCESSING_STATE_TTL), falls back to database
218-
to find uploads in "started" state that have intermediate reports, preventing data loss.
219168
"""
220169

221-
# Get all upload IDs that are ready to be merged (in "processed" set)
170+
# Get all upload IDs that are ready to be merged.
222171
upload_ids = state.get_uploads_for_merging()
223172

224173
if not upload_ids:
225-
log.warning(
226-
"No uploads found in Redis processed set, checking database for started uploads",
227-
extra={"repoid": commit.repoid, "commitid": commit.commitid},
228-
)
229-
# Fallback: Redis state expired (TTL: PROCESSING_STATE_TTL), check DB for uploads
230-
# in "started" state that might have been processed but never finalized
231-
upload_ids = self._find_started_uploads_with_reports(db_session, commit)
232-
233-
if not upload_ids:
234-
log.warning(
235-
"No started uploads with intermediate reports found in database",
236-
extra={"repoid": commit.repoid, "commitid": commit.commitid},
237-
)
238-
return []
239-
240-
log.info(
241-
"Found started uploads with intermediate reports (Redis state expired)",
242-
extra={
243-
"upload_ids": list(upload_ids),
244-
"count": len(upload_ids),
245-
},
246-
)
174+
return []
247175

248176
log.info(
249177
"Reconstructing processing results from ProcessingState",

0 commit comments

Comments (0)