def get_all_processed_uploads(self) -> set[int]:
    """Return every upload ID currently in the Redis "processed" set.

    The whole set stored under ``self._redis_key("processed")`` is read in
    one ``smembers`` call; no batch limit is applied here.  The cooperative
    upload finisher relies on this to merge every pending upload in a single
    lock acquisition instead of working through MERGE_BATCH_SIZE chunks.

    Returns:
        The members of the set, each converted with ``int()``.  The result
        is an empty set when ``smembers`` returns no members.
    """
    processed_key = self._redis_key("processed")
    # The loop variable is named ``upload_id`` (not ``id``) so the builtin
    # ``id`` is not shadowed inside the comprehension.
    return {int(upload_id) for upload_id in self._redis.smembers(processed_key)}
def _setup_mock_redis_for_processing(mock_redis, upload_ids=None):
    """Configure ``mock_redis`` so the cooperative early exit check is skipped.

    The early exit checks ``state.get_upload_numbers()`` via ``scard`` and
    ``state.get_all_processed_uploads()`` via ``smembers``.  This helper makes
    those calls report pending uploads so tests exercising the full merge
    flow are not short-circuited.

    Args:
        mock_redis: The Redis mock to configure.  Its ``scard.side_effect``,
            ``smembers.return_value`` and ``exists.return_value`` are
            overwritten.
        upload_ids: Iterable of upload IDs to present as "processed".
            Defaults to ``[0]`` when ``None``; an explicitly empty iterable
            is honoured as "nothing pending".
    """
    # Snapshot the ids once.  ``scard`` is answered lazily from a closure
    # while ``smembers`` is computed right now, so reading the caller's
    # object twice would let the two answers disagree if it is a one-shot
    # iterable or is mutated after this call.
    pending_ids = [0] if upload_ids is None else list(upload_ids)

    def scard_side_effect(key):
        # Only the "processed" set reports members; every other key
        # (e.g. the "processing" set) reports a cardinality of zero.
        if "processed" in key:
            return len(pending_ids)
        return 0

    mock_redis.scard.side_effect = scard_side_effect
    # Members are bytes, matching what the tests elsewhere feed to smembers.
    mock_redis.smembers.return_value = {str(uid).encode() for uid in pending_ids}
    mock_redis.exists.return_value = True
mocker, dbsession, mock_storage, mock_repo_provider, mock_self_app + self, + mocker, + dbsession, + mock_storage, + mock_repo_provider, + mock_redis, + mock_self_app, ): + _setup_mock_redis_for_processing(mock_redis) mocker.patch( "tasks.upload_finisher.perform_report_merging", return_value=Report(), @@ -834,6 +866,7 @@ def test_lock_manager_uses_finite_blocking_timeout( ): """LockManager must use a finite blocking_timeout so workers are not blocked indefinitely when the lock is held by another task.""" + _setup_mock_redis_for_processing(mock_redis) commit = CommitFactory.create() dbsession.add(commit) dbsession.flush() @@ -868,11 +901,11 @@ def test_lock_manager_uses_finite_blocking_timeout( @pytest.mark.django_db def test_retry_on_report_lock(self, dbsession, mocker, mock_redis, mock_self_app): + _setup_mock_redis_for_processing(mock_redis) commit = CommitFactory.create() dbsession.add(commit) dbsession.flush() - # Mock LockManager to raise LockRetry for UPLOAD_PROCESSING lock m = mocker.MagicMock() m.return_value.locked.return_value.__enter__.side_effect = LockRetry(60) mocker.patch("tasks.upload_finisher.LockManager", m) @@ -929,7 +962,7 @@ def test_die_on_finisher_lock( mock_redis, mock_self_app, ): - mock_redis.scard.return_value = 0 + _setup_mock_redis_for_processing(mock_redis) mocker.patch( "tasks.upload_finisher.perform_report_merging", return_value=Report(), @@ -995,7 +1028,10 @@ def test_die_on_finisher_lock( ) @pytest.mark.django_db - def test_soft_time_limit_handling(self, dbsession, mocker, mock_self_app): + def test_soft_time_limit_handling( + self, dbsession, mocker, mock_redis, mock_self_app + ): + _setup_mock_redis_for_processing(mock_redis) mocker.patch( "tasks.upload_finisher.load_commit_diff", side_effect=SoftTimeLimitExceeded ) @@ -1030,11 +1066,13 @@ def test_soft_time_limit_handling(self, dbsession, mocker, mock_self_app): ) @pytest.mark.django_db - def test_generic_exception_handling(self, dbsession, mocker, mock_self_app): + def 
test_generic_exception_handling( + self, dbsession, mocker, mock_redis, mock_self_app + ): """Test that the generic exception handler captures and logs unexpected errors.""" + _setup_mock_redis_for_processing(mock_redis) mock_sentry = mocker.patch("tasks.upload_finisher.sentry_sdk.capture_exception") - # Mock an unexpected error during the _process_reports_with_lock call mocker.patch( "tasks.upload_finisher.UploadFinisherTask._process_reports_with_lock", side_effect=ValueError("Unexpected error occurred"), @@ -1085,16 +1123,15 @@ def test_generic_exception_handling(self, dbsession, mocker, mock_self_app): @pytest.mark.django_db def test_idempotency_check_skips_already_processed_uploads( - self, dbsession, mocker, mock_self_app + self, dbsession, mocker, mock_redis, mock_self_app ): - """Test that finisher skips work if all uploads are already in final state. - - This test validates the idempotency check that prevents wasted work when: - - Multiple finishers are triggered (e.g., visibility timeout re-queuing) - - Finisher is manually retried + """Test that finisher skips work when Redis shows no pending uploads. - The check only skips when ALL uploads exist in DB and are in final states. + The cooperative early exit checks Redis: if both 'processed' and + 'processing' sets are empty, a previous finisher already merged + everything, so this finisher returns early. 
""" + mock_redis.scard.return_value = 0 commit = CommitFactory.create() dbsession.add(commit) dbsession.flush() @@ -1139,9 +1176,15 @@ def test_idempotency_check_skips_already_processed_uploads( @pytest.mark.django_db def test_idempotency_check_proceeds_when_uploads_not_finished( - self, dbsession, mocker, mock_storage, mock_repo_provider, mock_self_app + self, + dbsession, + mocker, + mock_storage, + mock_repo_provider, + mock_redis, + mock_self_app, ): - """Test that finisher proceeds normally if uploads are still in 'started' state.""" + """Test that finisher proceeds normally when Redis shows pending uploads.""" mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[]) mocker.patch("tasks.upload_finisher.update_uploads") @@ -1153,12 +1196,12 @@ def test_idempotency_check_proceeds_when_uploads_not_finished( dbsession.add(report) dbsession.flush() - # Create uploads that are still in "started" state upload_1 = UploadFactory.create(report=report, state="started") dbsession.add(upload_1) dbsession.flush() - # Mock the _process_reports_with_lock to verify it IS called + _setup_mock_redis_for_processing(mock_redis, upload_ids=[upload_1.id]) + mock_process = mocker.patch.object( UploadFinisherTask, "_process_reports_with_lock" ) @@ -1175,7 +1218,6 @@ def test_idempotency_check_proceeds_when_uploads_not_finished( commit_yaml={}, ) - # Verify that _process_reports_with_lock WAS called mock_process.assert_called_once() @pytest.mark.django_db @@ -1222,12 +1264,12 @@ def test_reconstruct_processing_results_falls_back_to_database_when_redis_expire # Mock ProcessingState to return empty (simulating Redis expiration) mock_state = mocker.MagicMock() mock_state.get_uploads_for_merging.return_value = set() # Redis expired + mock_state.get_all_processed_uploads.return_value = set() # Redis expired mock_state.get_upload_numbers.return_value = mocker.MagicMock( processing=0, processed=0 ) mocker.patch("tasks.upload_finisher.ProcessingState", 
return_value=mock_state) - # Mock the processing methods mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[]) mocker.patch("tasks.upload_finisher.update_uploads") mock_process = mocker.patch.object( @@ -1235,6 +1277,7 @@ def test_reconstruct_processing_results_falls_back_to_database_when_redis_expire ) # Call run_impl without processing_results to trigger reconstruction + # (early exit is skipped for reconstructed results) task = UploadFinisherTask() task.run_impl( dbsession, @@ -1274,16 +1317,13 @@ def test_reconstruct_processing_results_returns_empty_when_no_uploads_found( dbsession.add(commit) dbsession.flush() - # Mock ProcessingState to return empty (simulating Redis expiration) mock_state = mocker.MagicMock() - mock_state.get_uploads_for_merging.return_value = set() # Redis expired + mock_state.get_all_processed_uploads.return_value = set() # Redis expired mocker.patch("tasks.upload_finisher.ProcessingState", return_value=mock_state) - # Call run_impl without processing_results to trigger reconstruction task = UploadFinisherTask() result = task._reconstruct_processing_results(dbsession, mock_state, commit) - # Verify empty list returned when no uploads found assert result == [] @pytest.mark.django_db @@ -1293,6 +1333,7 @@ def test_coverage_notifications_not_blocked_by_test_results_uploads( mocker, mock_storage, mock_repo_provider, + mock_redis, mock_self_app, ): """ @@ -1348,6 +1389,8 @@ def test_coverage_notifications_not_blocked_by_test_results_uploads( dbsession.add(test_results_upload) dbsession.flush() + _setup_mock_redis_for_processing(mock_redis, upload_ids=[coverage_upload.id_]) + processing_results = [ {"upload_id": coverage_upload.id_, "successful": True, "arguments": {}}, ] @@ -1414,6 +1457,145 @@ def mock_process_reports( # This is the key assertion - notifications should NOT be blocked by test_results uploads mock_handle_finisher_lock.assert_called_once() + @pytest.mark.django_db + def 
@pytest.mark.django_db
def test_cooperative_merge_uses_all_redis_uploads(
    self, dbsession, mocker, mock_redis, mock_self_app
):
    """The finisher merges every upload listed in Redis once it holds the lock.

    The chord hands ``run_impl`` a single upload, while the mocked Redis
    "processed" set lists three; ``perform_report_merging`` must be called
    with results for all three.
    """
    commit = CommitFactory.create()
    dbsession.add(commit)
    dbsession.flush()

    report = CommitReport(commit_id=commit.id_)
    dbsession.add(report)
    dbsession.flush()

    uploads = [
        UploadFactory.create(report=report, state="started") for _ in range(3)
    ]
    dbsession.add_all(uploads)
    dbsession.flush()

    def scard_side_effect(key):
        # A non-zero "processed" cardinality keeps the early exit from firing.
        if "processed" in key:
            return 3
        return 0

    mock_redis.scard.side_effect = scard_side_effect
    # smembers exposes all three uploads, as if several chords had queued them.
    mock_redis.smembers.return_value = {
        str(upload.id).encode() for upload in uploads
    }
    # Every upload is reported as having an intermediate report.
    mock_redis.exists.return_value = True

    mocker.patch("tasks.upload_finisher.load_commit_diff", return_value=None)
    mock_perform = mocker.patch(
        "tasks.upload_finisher.perform_report_merging",
        return_value=Report(),
    )
    mocker.patch("tasks.upload_finisher.cleanup_intermediate_reports")
    mocker.patch.object(
        UploadFinisherTask, "_handle_finisher_lock", return_value={}
    )

    task = UploadFinisherTask()
    task.request.retries = 0
    task.request.headers = {}

    # The chord only knows about the first upload.
    chord_results = [
        {"upload_id": uploads[0].id, "successful": True, "arguments": {}}
    ]
    task.run_impl(
        dbsession,
        chord_results,
        repoid=commit.repoid,
        commitid=commit.commitid,
        commit_yaml={},
    )

    # The fourth positional argument of perform_report_merging carries the
    # processing results; it must cover all three uploads.
    mock_perform.assert_called_once()
    merged_results = mock_perform.call_args.args[3]
    merged_ids = {result["upload_id"] for result in merged_results}
    assert merged_ids == {upload.id for upload in uploads}
mocker.patch( + "tasks.upload_finisher.perform_report_merging", + ) + mocker.patch("tasks.upload_finisher.cleanup_intermediate_reports") + + task = UploadFinisherTask() + task.request.retries = 0 + task.request.headers = {} + + task.run_impl( + dbsession, + [{"upload_id": 0, "successful": True, "arguments": {}}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + ) + + # perform_report_merging should NOT have been called + mock_perform.assert_not_called() + class TestCommitRefreshAfterLock: """Tests for CCMRG-2028: commit must be refreshed after acquiring lock.""" @@ -1438,7 +1620,7 @@ def tracking_refresh(obj): return original_refresh(obj) mocker.patch.object(dbsession, "refresh", side_effect=tracking_refresh) - mock_redis.scard.return_value = 0 + _setup_mock_redis_for_processing(mock_redis) mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[]) mocker.patch("tasks.upload_finisher.update_uploads") mocker.patch("tasks.upload_finisher.cleanup_intermediate_reports") @@ -1481,7 +1663,7 @@ def test_stale_commit_sees_report_after_refresh( # Simulate stale commit by expiring the cached attributes dbsession.expire(commit, ["_report_json", "_report_json_storage_path"]) - mock_redis.scard.return_value = 0 + _setup_mock_redis_for_processing(mock_redis) mocker.patch("tasks.upload_finisher.load_commit_diff", return_value=None) mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[]) mocker.patch("tasks.upload_finisher.update_uploads") diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index 26ca55c29c..1639fd69b1 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -135,18 +135,20 @@ def _find_started_uploads_with_reports( def _reconstruct_processing_results( self, db_session, state: ProcessingState, commit: Commit ) -> list[ProcessingResult]: - """Reconstruct processing_results from ProcessingState when finisher is triggered - 
outside of a chord (e.g., from orphaned upload recovery). + """Reconstruct processing_results from the full Redis "processed" set. - This ensures ALL uploads that were marked as processed in Redis are included - in the final merged report, even if they completed via retry/recovery. + This is used both when the finisher is triggered without chord arguments + (orphaned upload recovery) AND inside the lock for cooperative merging, + so every pending upload is included regardless of which chord spawned + this finisher. If Redis state has expired (TTL: PROCESSING_STATE_TTL), falls back to database to find uploads in "started" state that have intermediate reports, preventing data loss. """ - # Get all upload IDs that are ready to be merged (in "processed" set) - upload_ids = state.get_uploads_for_merging() + # Get ALL upload IDs in the "processed" set (no batch limit) so one + # finisher can merge uploads from every pending chord. + upload_ids = state.get_all_processed_uploads() if not upload_ids: log.warning( @@ -265,7 +267,8 @@ def run_impl( # If processing_results not provided (e.g., from orphaned upload recovery), # reconstruct it from ProcessingState to ensure ALL uploads are included - if processing_results is None: + reconstructed = processing_results is None + if reconstructed: log.info( "run_impl: processing_results not provided, reconstructing from ProcessingState" ) @@ -282,31 +285,22 @@ def run_impl( upload_ids = [upload["upload_id"] for upload in processing_results] - # Idempotency check: Skip if all uploads are already processed - # This prevents wasted work if multiple finishers are triggered (e.g., from - # visibility timeout re-queuing) or if finisher is manually retried - if upload_ids: - uploads_in_db = ( - db_session.query(Upload).filter(Upload.id_.in_(upload_ids)).all() - ) - # Only skip if ALL uploads exist in DB and ALL are in final states - if len(uploads_in_db) == len(upload_ids): - all_already_processed = all( - upload.state in ("processed", 
"error") for upload in uploads_in_db + # Early exit: if the Redis "processed" set is already empty, a previous + # cooperative finisher merged our uploads. Skip without acquiring the lock. + # Skip this check when processing_results were reconstructed from DB + # (Redis may have expired but uploads still need merging). + if not reconstructed: + upload_numbers = state.get_upload_numbers() + if upload_numbers.processed == 0 and upload_numbers.processing == 0: + log.info( + "No pending uploads in Redis, another finisher already merged them", + extra={"upload_ids": upload_ids}, ) - if all_already_processed: - log.info( - "All uploads already in final state, skipping finisher work", - extra={ - "upload_ids": upload_ids, - "states": [u.state for u in uploads_in_db], - }, - ) - inc_counter(UPLOAD_FINISHER_ALREADY_COMPLETED_COUNTER) - return { - "already_completed": True, - "upload_ids": upload_ids, - } + inc_counter(UPLOAD_FINISHER_ALREADY_COMPLETED_COUNTER) + return { + "already_completed": True, + "upload_ids": upload_ids, + } try: log.info("run_impl: Processing reports with lock") @@ -408,7 +402,13 @@ def _process_reports_with_lock( upload_ids: list, state: ProcessingState, ): - """Process reports with a lock to prevent concurrent modifications.""" + """Process reports with a lock to prevent concurrent modifications. + + Cooperative merging: once the lock is acquired, this method queries + Redis for ALL pending uploads (not just the chord's batch) so that one + finisher task can merge uploads from every queued finisher, eliminating + redundant lock acquisitions. 
+ """ diff = load_commit_diff(commit, self.name) repoid = commit.repoid commitid = commit.commitid @@ -420,6 +420,7 @@ def _process_reports_with_lock( commitid=commitid, lock_timeout=self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS), blocking_timeout=DEFAULT_BLOCKING_TIMEOUT_SECONDS, + base_retry_countdown=30, ) try: @@ -429,17 +430,38 @@ def _process_reports_with_lock( retry_num=self.attempts, ): db_session.refresh(commit) + + all_results = self._reconstruct_processing_results( + db_session, state, commit + ) + if not all_results: + log.info( + "run_impl: No pending uploads in Redis after acquiring lock, " + "another finisher already merged them", + ) + return + + all_upload_ids = [r["upload_id"] for r in all_results] + + log.info( + "run_impl: Cooperative merge — processing all pending uploads", + extra={ + "chord_upload_ids": upload_ids, + "all_upload_ids": all_upload_ids, + }, + ) + report_service = ReportService(commit_yaml) log.info("run_impl: Performing report merging") report = perform_report_merging( - report_service, commit_yaml, commit, processing_results + report_service, commit_yaml, commit, all_results ) log.info( "run_impl: Saving combined report", - extra={"processing_results": processing_results}, + extra={"upload_count": len(all_results)}, ) if diff: @@ -452,10 +474,10 @@ def _process_reports_with_lock( db_session.commit() log.info("run_impl: Marking uploads as merged") - state.mark_uploads_as_merged(upload_ids) + state.mark_uploads_as_merged(all_upload_ids) log.info("run_impl: Cleaning up intermediate reports") - cleanup_intermediate_reports(upload_ids) + cleanup_intermediate_reports(all_upload_ids) log.info("run_impl: Finished upload_finisher task") @@ -519,6 +541,7 @@ def _handle_finisher_lock( commitid=commitid, lock_timeout=self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS), blocking_timeout=DEFAULT_BLOCKING_TIMEOUT_SECONDS, + base_retry_countdown=30, ) try: