From da77d07ce4fbdd032d92a06c655c3f272896d82d Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Fri, 27 Feb 2026 10:06:34 +0900 Subject: [PATCH 1/2] feat(worker): cooperative finisher merging from Redis processed set Instead of each finisher task only merging its own chord's 1-2 uploads, the lock holder now queries the full Redis "processed" set and merges ALL pending uploads in one lock acquisition. This means one finisher does the work of many, and redundant finishers exit immediately. Changes: - Add get_all_processed_uploads() to ProcessingState (unbounded smembers) - _process_reports_with_lock: after acquiring lock, reconstruct processing_results from Redis instead of using chord arguments - Early exit in run_impl when Redis processed set is empty (another finisher already merged our uploads) - Reduce base_retry_countdown to 30s (from 200s default) so retried finishers discover "nothing to merge" sooner and exit faster Made-with: Cursor --- apps/worker/services/processing/state.py | 8 + .../services/tests/test_processing_state.py | 15 ++ .../tests/unit/test_upload_finisher_task.py | 139 ++++++++++++++++++ apps/worker/tasks/upload_finisher.py | 89 ++++++----- 4 files changed, 216 insertions(+), 35 deletions(-) diff --git a/apps/worker/services/processing/state.py b/apps/worker/services/processing/state.py index 4a1c7d973b..16134a1b06 100644 --- a/apps/worker/services/processing/state.py +++ b/apps/worker/services/processing/state.py @@ -134,5 +134,13 @@ def get_uploads_for_merging(self) -> set[int]: ) } + def get_all_processed_uploads(self) -> set[int]: + """Return ALL upload IDs in the 'processed' set (no batch limit). + + Used by the cooperative finisher to merge every pending upload in one + lock acquisition instead of processing them in MERGE_BATCH_SIZE chunks. + """ + return {int(id) for id in self._redis.smembers(self._redis_key("processed"))} + def _redis_key(self, state: str) -> str: return f"upload-processing-state/{self.repoid}/{self.commitsha}/{state}" diff --git a/apps/worker/services/tests/test_processing_state.py b/apps/worker/services/tests/test_processing_state.py index 0416e49a63..e37dd01a62 100644 --- a/apps/worker/services/tests/test_processing_state.py +++ b/apps/worker/services/tests/test_processing_state.py @@ -74,6 +74,21 @@ def test_batch_merging_many_uploads(): assert should_trigger_postprocessing(state.get_upload_numbers()) +def test_get_all_processed_uploads_returns_full_set(): + """get_all_processed_uploads returns every upload without the MERGE_BATCH_SIZE cap.""" + state = ProcessingState(1234, uuid4().hex) + ids = list(range(1, 25)) + state.mark_uploads_as_processing(ids) + for i in ids: + state.mark_upload_as_processed(i) + + # get_uploads_for_merging is capped at MERGE_BATCH_SIZE (10) + assert len(state.get_uploads_for_merging()) <= 10 + + # get_all_processed_uploads returns everything + assert state.get_all_processed_uploads() == set(ids) + + class TestProcessingStateEmptyListGuards: """Tests for empty list guards in ProcessingState methods.""" diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py index c21c580b04..c8f57ceec0 100644 --- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py @@ -1414,6 +1414,145 @@ def mock_process_reports( # This is the key assertion - notifications should NOT be blocked by test_results uploads mock_handle_finisher_lock.assert_called_once() + @pytest.mark.django_db + def test_early_exit_when_redis_processed_set_empty( + self, dbsession, mocker, mock_redis, mock_self_app + ): + """When the Redis 'processed' set is empty (another finisher already merged), + run_impl should return early without acquiring the lock.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Redis reports nothing pending + mock_redis.scard.return_value = 0 + + mock_process = mocker.patch.object( + UploadFinisherTask, "_process_reports_with_lock" + ) + + task = UploadFinisherTask() + task.request.retries = 0 + task.request.headers = {} + + result = task.run_impl( + dbsession, + [{"upload_id": 0, "successful": True, "arguments": {}}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + ) + + assert result == {"already_completed": True, "upload_ids": [0]} + mock_process.assert_not_called() + + @pytest.mark.django_db + def test_cooperative_merge_uses_all_redis_uploads( + self, dbsession, mocker, mock_redis, mock_self_app + ): + """Inside the lock, the finisher should merge ALL uploads from Redis, + not just the ones from its own chord.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + report = CommitReport(commit_id=commit.id_) + dbsession.add(report) + dbsession.flush() + + upload_1 = UploadFactory.create(report=report, state="started") + upload_2 = UploadFactory.create(report=report, state="started") + upload_3 = UploadFactory.create(report=report, state="started") + dbsession.add_all([upload_1, upload_2, upload_3]) + dbsession.flush() + + # Redis: scard says there are pending uploads (so early exit is skipped) + mock_redis.scard.side_effect = lambda key: (3 if "processed" in key else 0) + # smembers returns ALL three uploads (cooperative: from multiple chords) + mock_redis.smembers.return_value = { + str(upload_1.id).encode(), + str(upload_2.id).encode(), + str(upload_3.id).encode(), + } + # Each upload has an intermediate report + mock_redis.exists.return_value = True + + mocker.patch("tasks.upload_finisher.load_commit_diff", return_value=None) + mock_perform = mocker.patch( + "tasks.upload_finisher.perform_report_merging", + return_value=Report(), + ) + mocker.patch("tasks.upload_finisher.cleanup_intermediate_reports") + mocker.patch.object( + UploadFinisherTask, "_handle_finisher_lock", return_value={} + ) + + task = UploadFinisherTask() + task.request.retries = 0 + task.request.headers = {} + + # Chord only knows about upload_1, but cooperative merge should include all 3 + task.run_impl( + dbsession, + [{"upload_id": upload_1.id, "successful": True, "arguments": {}}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + ) + + # perform_report_merging should have received all 3 uploads + mock_perform.assert_called_once() + merged_results = mock_perform.call_args[0][3] + merged_ids = {r["upload_id"] for r in merged_results} + assert merged_ids == {upload_1.id, upload_2.id, upload_3.id} + + @pytest.mark.django_db + def test_cooperative_merge_exits_when_lock_holder_already_merged( + self, dbsession, mocker, mock_redis, mock_self_app + ): + """If a finisher acquires the lock but Redis 'processed' set is now empty + (another finisher raced and merged), it should exit gracefully.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + # Before lock: scard says uploads pending (so early exit is skipped) + # Inside lock: smembers returns empty (another finisher merged them) + call_count = {"n": 0} + + def scard_side_effect(key): + if "processed" in key: + call_count["n"] += 1 + # First call (early exit check): pretend there are uploads + # Second call (inside reconstruct): also return > 0 to be safe + return 1 if call_count["n"] <= 2 else 0 + return 0 + + mock_redis.scard.side_effect = scard_side_effect + mock_redis.smembers.return_value = set() + mock_redis.exists.return_value = False + + mocker.patch("tasks.upload_finisher.load_commit_diff", return_value=None) + mock_perform = mocker.patch( + "tasks.upload_finisher.perform_report_merging", + ) + mocker.patch("tasks.upload_finisher.cleanup_intermediate_reports") + + task = UploadFinisherTask() + task.request.retries = 0 + task.request.headers = {} + + task.run_impl( + dbsession, + [{"upload_id": 0, "successful": True, "arguments": {}}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + ) + + # perform_report_merging should NOT have been called + mock_perform.assert_not_called() + class TestCommitRefreshAfterLock: """Tests for CCMRG-2028: commit must be refreshed after acquiring lock.""" diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index 26ca55c29c..b5dba8cff7 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -135,18 +135,20 @@ def _find_started_uploads_with_reports( def _reconstruct_processing_results( self, db_session, state: ProcessingState, commit: Commit ) -> list[ProcessingResult]: - """Reconstruct processing_results from ProcessingState when finisher is triggered - outside of a chord (e.g., from orphaned upload recovery). + """Reconstruct processing_results from the full Redis "processed" set. - This ensures ALL uploads that were marked as processed in Redis are included - in the final merged report, even if they completed via retry/recovery. + This is used both when the finisher is triggered without chord arguments + (orphaned upload recovery) AND inside the lock for cooperative merging, + so every pending upload is included regardless of which chord spawned + this finisher. If Redis state has expired (TTL: PROCESSING_STATE_TTL), falls back to database to find uploads in "started" state that have intermediate reports, preventing data loss. """ - # Get all upload IDs that are ready to be merged (in "processed" set) - upload_ids = state.get_uploads_for_merging() + # Get ALL upload IDs in the "processed" set (no batch limit) so one + # finisher can merge uploads from every pending chord. + upload_ids = state.get_all_processed_uploads() if not upload_ids: log.warning( @@ -282,31 +284,19 @@ def run_impl( upload_ids = [upload["upload_id"] for upload in processing_results] - # Idempotency check: Skip if all uploads are already processed - # This prevents wasted work if multiple finishers are triggered (e.g., from - # visibility timeout re-queuing) or if finisher is manually retried - if upload_ids: - uploads_in_db = ( - db_session.query(Upload).filter(Upload.id_.in_(upload_ids)).all() + # Early exit: if the Redis "processed" set is already empty, a previous + # cooperative finisher merged our uploads. Skip without acquiring the lock. + upload_numbers = state.get_upload_numbers() + if upload_numbers.processed == 0 and upload_numbers.processing == 0: + log.info( + "No pending uploads in Redis, another finisher already merged them", + extra={"upload_ids": upload_ids}, ) - # Only skip if ALL uploads exist in DB and ALL are in final states - if len(uploads_in_db) == len(upload_ids): - all_already_processed = all( - upload.state in ("processed", "error") for upload in uploads_in_db - ) - if all_already_processed: - log.info( - "All uploads already in final state, skipping finisher work", - extra={ - "upload_ids": upload_ids, - "states": [u.state for u in uploads_in_db], - }, - ) - inc_counter(UPLOAD_FINISHER_ALREADY_COMPLETED_COUNTER) - return { - "already_completed": True, - "upload_ids": upload_ids, - } + inc_counter(UPLOAD_FINISHER_ALREADY_COMPLETED_COUNTER) + return { + "already_completed": True, + "upload_ids": upload_ids, + } try: log.info("run_impl: Processing reports with lock") @@ -408,7 +398,13 @@ def _process_reports_with_lock( upload_ids: list, state: ProcessingState, ): - """Process reports with a lock to prevent concurrent modifications.""" + """Process reports with a lock to prevent concurrent modifications. + + Cooperative merging: once the lock is acquired, this method queries + Redis for ALL pending uploads (not just the chord's batch) so that one + finisher task can merge uploads from every queued finisher, eliminating + redundant lock acquisitions. + """ diff = load_commit_diff(commit, self.name) repoid = commit.repoid commitid = commit.commitid @@ -420,6 +416,7 @@ def _process_reports_with_lock( commitid=commitid, lock_timeout=self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS), blocking_timeout=DEFAULT_BLOCKING_TIMEOUT_SECONDS, + base_retry_countdown=30, ) try: @@ -429,17 +426,38 @@ def _process_reports_with_lock( retry_num=self.attempts, ): db_session.refresh(commit) + + all_results = self._reconstruct_processing_results( + db_session, state, commit + ) + if not all_results: + log.info( + "run_impl: No pending uploads in Redis after acquiring lock, " + "another finisher already merged them", + ) + return + + all_upload_ids = [r["upload_id"] for r in all_results] + + log.info( + "run_impl: Cooperative merge — processing all pending uploads", + extra={ + "chord_upload_ids": upload_ids, + "all_upload_ids": all_upload_ids, + }, + ) + report_service = ReportService(commit_yaml) log.info("run_impl: Performing report merging") report = perform_report_merging( - report_service, commit_yaml, commit, processing_results + report_service, commit_yaml, commit, all_results ) log.info( "run_impl: Saving combined report", - extra={"processing_results": processing_results}, + extra={"upload_count": len(all_results)}, ) if diff: @@ -452,10 +470,10 @@ def _process_reports_with_lock( db_session.commit() log.info("run_impl: Marking uploads as merged") - state.mark_uploads_as_merged(upload_ids) + state.mark_uploads_as_merged(all_upload_ids) log.info("run_impl: Cleaning up intermediate reports") - cleanup_intermediate_reports(upload_ids) + cleanup_intermediate_reports(all_upload_ids) log.info("run_impl: Finished upload_finisher task") @@ -519,6 +537,7 @@ def _handle_finisher_lock( commitid=commitid, lock_timeout=self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS), blocking_timeout=DEFAULT_BLOCKING_TIMEOUT_SECONDS, + base_retry_countdown=30, ) try: From 977b1fa0a3391e97a4fc0385b133909c835b9d8e Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 5 Mar 2026 14:37:33 -0800 Subject: [PATCH 2/2] fix(worker): fix cooperative finisher tests and early exit logic - Skip the Redis-based early exit when processing_results were reconstructed from DB (Redis TTL may have expired while uploads still need merging). - Add _setup_mock_redis_for_processing helper so tests exercising the full merge flow correctly bypass the cooperative early exit. - Update _reconstruct_processing_results callers to use get_all_processed_uploads instead of get_uploads_for_merging. Made-with: Cursor --- .../tests/unit/test_upload_finisher_task.py | 95 ++++++++++++++----- apps/worker/tasks/upload_finisher.py | 28 +++--- 2 files changed, 85 insertions(+), 38 deletions(-) diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py index c8f57ceec0..92647ef298 100644 --- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py @@ -66,6 +66,27 @@ def mock_self_app(mocker, celery_app): ) +def _setup_mock_redis_for_processing(mock_redis, upload_ids=None): + """Configure mock_redis so the cooperative early exit check is skipped. + + The early exit checks ``state.get_upload_numbers()`` via ``scard`` and + ``state.get_all_processed_uploads()`` via ``smembers``. This helper makes + those calls indicate that there are pending uploads so tests exercising the + full merge flow aren't short-circuited. + """ + if upload_ids is None: + upload_ids = [0] + + def scard_side_effect(key): + if "processed" in key: + return len(upload_ids) + return 0 + + mock_redis.scard.side_effect = scard_side_effect + mock_redis.smembers.return_value = {str(uid).encode() for uid in upload_ids} + mock_redis.exists.return_value = True + + def _start_upload_flow(mocker): mocker.patch( "helpers.checkpoint_logger._get_milli_timestamp", @@ -173,7 +194,7 @@ def test_upload_finisher_task_call( mock_redis, mock_self_app, ): - mock_redis.scard.return_value = 0 + _setup_mock_redis_for_processing(mock_redis) mocker.patch( "tasks.upload_finisher.perform_report_merging", return_value=Report(), @@ -279,8 +300,10 @@ def test_upload_finisher_task_call_no_author( dbsession, mock_storage, mock_repo_provider, + mock_redis, mock_self_app, ): + _setup_mock_redis_for_processing(mock_redis) mocker.patch( "tasks.upload_finisher.perform_report_merging", return_value=Report(), @@ -349,8 +372,10 @@ def test_upload_finisher_task_call_different_branch( dbsession, mock_storage, mock_repo_provider, + mock_redis, mock_self_app, ): + _setup_mock_redis_for_processing(mock_redis) mocker.patch( "tasks.upload_finisher.perform_report_merging", return_value=Report(), @@ -748,8 +773,15 @@ def test_finish_reports_processing_no_notification( @pytest.mark.django_db def test_upload_finisher_task_calls_save_commit_measurements_task( - self, mocker, dbsession, mock_storage, mock_repo_provider, mock_self_app + self, + mocker, + dbsession, + mock_storage, + mock_repo_provider, + mock_redis, + mock_self_app, ): + _setup_mock_redis_for_processing(mock_redis) mocker.patch( "tasks.upload_finisher.perform_report_merging", return_value=Report(), @@ -834,6 +866,7 @@ def test_lock_manager_uses_finite_blocking_timeout( ): """LockManager must use a finite blocking_timeout so workers are not blocked indefinitely when the lock is held by another task.""" + _setup_mock_redis_for_processing(mock_redis) commit = CommitFactory.create() dbsession.add(commit) dbsession.flush() @@ -868,11 +901,11 @@ def test_lock_manager_uses_finite_blocking_timeout( @pytest.mark.django_db def test_retry_on_report_lock(self, dbsession, mocker, mock_redis, mock_self_app): + _setup_mock_redis_for_processing(mock_redis) commit = CommitFactory.create() dbsession.add(commit) dbsession.flush() - # Mock LockManager to raise LockRetry for UPLOAD_PROCESSING lock m = mocker.MagicMock() m.return_value.locked.return_value.__enter__.side_effect = LockRetry(60) mocker.patch("tasks.upload_finisher.LockManager", m) @@ -929,7 +962,7 @@ def test_die_on_finisher_lock( mock_redis, mock_self_app, ): - mock_redis.scard.return_value = 0 + _setup_mock_redis_for_processing(mock_redis) mocker.patch( "tasks.upload_finisher.perform_report_merging", return_value=Report(), @@ -995,7 +1028,10 @@ def test_die_on_finisher_lock( ) @pytest.mark.django_db - def test_soft_time_limit_handling(self, dbsession, mocker, mock_self_app): + def test_soft_time_limit_handling( + self, dbsession, mocker, mock_redis, mock_self_app + ): + _setup_mock_redis_for_processing(mock_redis) mocker.patch( "tasks.upload_finisher.load_commit_diff", side_effect=SoftTimeLimitExceeded ) @@ -1030,11 +1066,13 @@ def test_soft_time_limit_handling(self, dbsession, mocker, mock_self_app): ) @pytest.mark.django_db - def test_generic_exception_handling(self, dbsession, mocker, mock_self_app): + def test_generic_exception_handling( + self, dbsession, mocker, mock_redis, mock_self_app + ): """Test that the generic exception handler captures and logs unexpected errors.""" + _setup_mock_redis_for_processing(mock_redis) mock_sentry = mocker.patch("tasks.upload_finisher.sentry_sdk.capture_exception") - # Mock an unexpected error during the _process_reports_with_lock call mocker.patch( "tasks.upload_finisher.UploadFinisherTask._process_reports_with_lock", side_effect=ValueError("Unexpected error occurred"), @@ -1085,16 +1123,15 @@ def test_generic_exception_handling(self, dbsession, mocker, mock_self_app): @pytest.mark.django_db def test_idempotency_check_skips_already_processed_uploads( - self, dbsession, mocker, mock_self_app + self, dbsession, mocker, mock_redis, mock_self_app ): - """Test that finisher skips work if all uploads are already in final state. - - This test validates the idempotency check that prevents wasted work when: - - Multiple finishers are triggered (e.g., visibility timeout re-queuing) - - Finisher is manually retried + """Test that finisher skips work when Redis shows no pending uploads. - The check only skips when ALL uploads exist in DB and are in final states. + The cooperative early exit checks Redis: if both 'processed' and + 'processing' sets are empty, a previous finisher already merged + everything, so this finisher returns early. """ + mock_redis.scard.return_value = 0 commit = CommitFactory.create() dbsession.add(commit) dbsession.flush() @@ -1139,9 +1176,15 @@ def test_idempotency_check_skips_already_processed_uploads( @pytest.mark.django_db def test_idempotency_check_proceeds_when_uploads_not_finished( - self, dbsession, mocker, mock_storage, mock_repo_provider, mock_self_app + self, + dbsession, + mocker, + mock_storage, + mock_repo_provider, + mock_redis, + mock_self_app, ): - """Test that finisher proceeds normally if uploads are still in 'started' state.""" + """Test that finisher proceeds normally when Redis shows pending uploads.""" mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[]) mocker.patch("tasks.upload_finisher.update_uploads") @@ -1153,12 +1196,12 @@ def test_idempotency_check_proceeds_when_uploads_not_finished( dbsession.add(report) dbsession.flush() - # Create uploads that are still in "started" state upload_1 = UploadFactory.create(report=report, state="started") dbsession.add(upload_1) dbsession.flush() - # Mock the _process_reports_with_lock to verify it IS called + _setup_mock_redis_for_processing(mock_redis, upload_ids=[upload_1.id]) + mock_process = mocker.patch.object( UploadFinisherTask, "_process_reports_with_lock" ) @@ -1175,7 +1218,6 @@ def test_idempotency_check_proceeds_when_uploads_not_finished( commit_yaml={}, ) - # Verify that _process_reports_with_lock WAS called mock_process.assert_called_once() @pytest.mark.django_db @@ -1222,12 +1264,12 @@ def test_reconstruct_processing_results_falls_back_to_database_when_redis_expire # Mock ProcessingState to return empty (simulating Redis expiration) mock_state = mocker.MagicMock() mock_state.get_uploads_for_merging.return_value = set() # Redis expired + mock_state.get_all_processed_uploads.return_value = set() # Redis expired mock_state.get_upload_numbers.return_value = mocker.MagicMock( processing=0, processed=0 ) mocker.patch("tasks.upload_finisher.ProcessingState", return_value=mock_state) - # Mock the processing methods mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[]) mocker.patch("tasks.upload_finisher.update_uploads") mock_process = mocker.patch.object( @@ -1235,6 +1277,7 @@ def test_reconstruct_processing_results_falls_back_to_database_when_redis_expire ) # Call run_impl without processing_results to trigger reconstruction + # (early exit is skipped for reconstructed results) task = UploadFinisherTask() task.run_impl( dbsession, @@ -1274,16 +1317,13 @@ def test_reconstruct_processing_results_returns_empty_when_no_uploads_found( dbsession.add(commit) dbsession.flush() - # Mock ProcessingState to return empty (simulating Redis expiration) mock_state = mocker.MagicMock() - mock_state.get_uploads_for_merging.return_value = set() # Redis expired + mock_state.get_all_processed_uploads.return_value = set() # Redis expired mocker.patch("tasks.upload_finisher.ProcessingState", return_value=mock_state) - # Call run_impl without processing_results to trigger reconstruction task = UploadFinisherTask() result = task._reconstruct_processing_results(dbsession, mock_state, commit) - # Verify empty list returned when no uploads found assert result == [] @pytest.mark.django_db @@ -1293,6 +1333,7 @@ def test_coverage_notifications_not_blocked_by_test_results_uploads( mocker, mock_storage, mock_repo_provider, + mock_redis, mock_self_app, ): """ @@ -1348,6 +1389,8 @@ def test_coverage_notifications_not_blocked_by_test_results_uploads( dbsession.add(test_results_upload) dbsession.flush() + _setup_mock_redis_for_processing(mock_redis, upload_ids=[coverage_upload.id_]) + processing_results = [ {"upload_id": coverage_upload.id_, "successful": True, "arguments": {}}, ] @@ -1577,7 +1620,7 @@ def tracking_refresh(obj): return original_refresh(obj) mocker.patch.object(dbsession, "refresh", side_effect=tracking_refresh) - mock_redis.scard.return_value = 0 + _setup_mock_redis_for_processing(mock_redis) mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[]) mocker.patch("tasks.upload_finisher.update_uploads") mocker.patch("tasks.upload_finisher.cleanup_intermediate_reports") @@ -1620,7 +1663,7 @@ def test_stale_commit_sees_report_after_refresh( # Simulate stale commit by expiring the cached attributes dbsession.expire(commit, ["_report_json", "_report_json_storage_path"]) - mock_redis.scard.return_value = 0 + _setup_mock_redis_for_processing(mock_redis) mocker.patch("tasks.upload_finisher.load_commit_diff", return_value=None) mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[]) mocker.patch("tasks.upload_finisher.update_uploads") diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index b5dba8cff7..1639fd69b1 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -267,7 +267,8 @@ def run_impl( # If processing_results not provided (e.g., from orphaned upload recovery), # reconstruct it from ProcessingState to ensure ALL uploads are included - if processing_results is None: + reconstructed = processing_results is None + if reconstructed: log.info( "run_impl: processing_results not provided, reconstructing from ProcessingState" ) @@ -286,17 +287,20 @@ def run_impl( # Early exit: if the Redis "processed" set is already empty, a previous # cooperative finisher merged our uploads. Skip without acquiring the lock. - upload_numbers = state.get_upload_numbers() - if upload_numbers.processed == 0 and upload_numbers.processing == 0: - log.info( - "No pending uploads in Redis, another finisher already merged them", - extra={"upload_ids": upload_ids}, - ) - inc_counter(UPLOAD_FINISHER_ALREADY_COMPLETED_COUNTER) - return { - "already_completed": True, - "upload_ids": upload_ids, - } + # Skip this check when processing_results were reconstructed from DB + # (Redis may have expired but uploads still need merging). + if not reconstructed: + upload_numbers = state.get_upload_numbers() + if upload_numbers.processed == 0 and upload_numbers.processing == 0: + log.info( + "No pending uploads in Redis, another finisher already merged them", + extra={"upload_ids": upload_ids}, + ) + inc_counter(UPLOAD_FINISHER_ALREADY_COMPLETED_COUNTER) + return { + "already_completed": True, + "upload_ids": upload_ids, + } try: log.info("run_impl: Processing reports with lock")