From bb4dc932705502ab30c4896f3dd98a771883f54d Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Wed, 11 Mar 2026 05:47:21 +0900 Subject: [PATCH 1/4] fix(worker): use finite blocking_timeout and refresh DB after lock acquisition Replace blocking_timeout=None with DEFAULT_BLOCKING_TIMEOUT_SECONDS (5s) in both LockManager instantiations in upload_finisher. This activates the existing LockRetry backoff mechanism that was dead code -- tasks now fail fast and retry via Celery instead of blocking worker threads for up to the 600s soft time limit. Remove max_retries from locked() calls to disable the shared Redis counter that incorrectly kills first-attempt tasks when concurrent tasks fail. Per-task retry limiting via self._has_exceeded_max_attempts() remains as the sole guard. Add db_session.rollback() after each lock acquisition to force SQLAlchemy to release potentially stale connections before any DB operations. This fixes psycopg2.OperationalError crashes from idle connections that go stale during the lock wait. 
Made-with: Cursor --- .../tests/unit/test_upload_finisher_task.py | 144 ++++++++++++++++++ apps/worker/tasks/upload_finisher.py | 9 +- 2 files changed, 149 insertions(+), 4 deletions(-) diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py index 31f806f182..bcbed5a041 100644 --- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py @@ -21,6 +21,7 @@ from services.processing.types import MergeResult, ProcessingResult from services.timeseries import MeasurementName from shared.celery_config import ( + DEFAULT_BLOCKING_TIMEOUT_SECONDS, compute_comparison_task_name, notify_task_name, pulls_task_name, @@ -1376,6 +1377,149 @@ def mock_process_reports( mock_handle_finisher_lock.assert_called_once() +class TestLockManagerConfiguration: + """Tests for lock manager configuration: finite blocking_timeout, + no shared max_retries counter, and db_session.rollback() after lock acquisition.""" + + @pytest.mark.django_db + def test_lock_manager_uses_finite_blocking_timeout( + self, dbsession, mocker, mock_redis, mock_self_app + ): + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + lock_manager_cls = mocker.patch("tasks.upload_finisher.LockManager") + lock_manager_cls.return_value.locked.return_value.__enter__ = mocker.MagicMock() + lock_manager_cls.return_value.locked.return_value.__exit__ = mocker.MagicMock( + return_value=False + ) + mocker.patch.object( + UploadFinisherTask, "_handle_finisher_lock", return_value={} + ) + mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[]) + mocker.patch("tasks.upload_finisher.update_uploads") + mocker.patch("tasks.upload_finisher.cleanup_intermediate_reports") + mock_redis.scard.return_value = 0 + + task = UploadFinisherTask() + task.request.retries = 0 + task.request.headers = {} + + task.run_impl( + dbsession, + [{"upload_id": 0, 
"successful": True, "arguments": {}}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + ) + + first_call = lock_manager_cls.call_args_list[0] + assert first_call.kwargs["blocking_timeout"] == DEFAULT_BLOCKING_TIMEOUT_SECONDS + + @pytest.mark.django_db + def test_locked_called_without_max_retries( + self, dbsession, mocker, mock_redis, mock_self_app + ): + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + lock_manager_cls = mocker.patch("tasks.upload_finisher.LockManager") + mock_locked = lock_manager_cls.return_value.locked + mock_locked.return_value.__enter__ = mocker.MagicMock() + mock_locked.return_value.__exit__ = mocker.MagicMock(return_value=False) + mocker.patch.object( + UploadFinisherTask, "_handle_finisher_lock", return_value={} + ) + mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[]) + mocker.patch("tasks.upload_finisher.update_uploads") + mocker.patch("tasks.upload_finisher.cleanup_intermediate_reports") + mock_redis.scard.return_value = 0 + + task = UploadFinisherTask() + task.request.retries = 0 + task.request.headers = {} + + task.run_impl( + dbsession, + [{"upload_id": 0, "successful": True, "arguments": {}}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + ) + + locked_call = mock_locked.call_args + assert ( + "max_retries" not in locked_call.kwargs + or locked_call.kwargs.get("max_retries") is None + ) + + @pytest.mark.django_db + def test_db_session_rollback_called_after_lock_acquired( + self, dbsession, mocker, mock_redis, mock_self_app + ): + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + rollback_calls = [] + original_rollback = dbsession.rollback + + def tracking_rollback(): + rollback_calls.append("rollback") + return original_rollback() + + mocker.patch.object(dbsession, "rollback", side_effect=tracking_rollback) + mock_redis.scard.return_value = 0 + 
mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[]) + mocker.patch("tasks.upload_finisher.update_uploads") + mocker.patch("tasks.upload_finisher.cleanup_intermediate_reports") + mocker.patch.object( + UploadFinisherTask, "_handle_finisher_lock", return_value={} + ) + + task = UploadFinisherTask() + task.request.retries = 0 + task.request.headers = {} + + task.run_impl( + dbsession, + [{"upload_id": 0, "successful": True, "arguments": {}}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + ) + + assert len(rollback_calls) >= 1 + + @pytest.mark.django_db + def test_per_task_retry_limit_still_enforced( + self, dbsession, mocker, mock_redis, mock_self_app + ): + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + m = mocker.MagicMock() + m.return_value.locked.return_value.__enter__.side_effect = LockRetry(60) + mocker.patch("tasks.upload_finisher.LockManager", m) + + task = UploadFinisherTask() + task.request.retries = 0 + task.request.headers = {"attempts": 10} + + result = task.run_impl( + dbsession, + [{"upload_id": 0, "successful": True, "arguments": {}}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + ) + + assert result is None + + class TestCommitRefreshAfterLock: """Tests for CCMRG-2028: commit must be refreshed after acquiring lock.""" diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index 498b6f104b..97dd9bf31b 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -32,6 +32,7 @@ from services.timeseries import repository_datasets_query from services.yaml import read_yaml_field from shared.celery_config import ( + DEFAULT_BLOCKING_TIMEOUT_SECONDS, DEFAULT_LOCK_TIMEOUT_SECONDS, UPLOAD_PROCESSOR_MAX_RETRIES, compute_comparison_task_name, @@ -418,15 +419,15 @@ def _process_reports_with_lock( repoid=repoid, commitid=commitid, 
lock_timeout=self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS), - blocking_timeout=None, + blocking_timeout=DEFAULT_BLOCKING_TIMEOUT_SECONDS, ) try: with lock_manager.locked( LockType.UPLOAD_PROCESSING, - max_retries=UPLOAD_PROCESSOR_MAX_RETRIES, retry_num=self.attempts, ): + db_session.rollback() db_session.refresh(commit) report_service = ReportService(commit_yaml) @@ -517,15 +518,15 @@ def _handle_finisher_lock( repoid=repoid, commitid=commitid, lock_timeout=self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS), - blocking_timeout=None, + blocking_timeout=DEFAULT_BLOCKING_TIMEOUT_SECONDS, ) try: with lock_manager.locked( LockType.UPLOAD_FINISHER, - max_retries=UPLOAD_PROCESSOR_MAX_RETRIES, retry_num=self.attempts, ): + db_session.rollback() result = self.finish_reports_processing( db_session, commit, commit_yaml, processing_results ) From 8d230f558757d91d339ca9694a3458f03d17a0ba Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Wed, 11 Mar 2026 06:01:21 +0900 Subject: [PATCH 2/4] fix(worker): remove db_session.rollback() that breaks session state rollback() expunges objects from the session, causing "Instance is not persistent within this Session" errors on the subsequent refresh/query calls. With blocking_timeout=5s, connections won't go stale during the short lock wait, so the rollback is unnecessary. 
Made-with: Cursor --- .../tests/unit/test_upload_finisher_task.py | 42 +------------------ apps/worker/tasks/upload_finisher.py | 2 - 2 files changed, 2 insertions(+), 42 deletions(-) diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py index bcbed5a041..fabfa6816c 100644 --- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py @@ -1378,8 +1378,8 @@ def mock_process_reports( class TestLockManagerConfiguration: - """Tests for lock manager configuration: finite blocking_timeout, - no shared max_retries counter, and db_session.rollback() after lock acquisition.""" + """Tests for lock manager configuration: finite blocking_timeout + and no shared max_retries counter.""" @pytest.mark.django_db def test_lock_manager_uses_finite_blocking_timeout( @@ -1455,44 +1455,6 @@ def test_locked_called_without_max_retries( or locked_call.kwargs.get("max_retries") is None ) - @pytest.mark.django_db - def test_db_session_rollback_called_after_lock_acquired( - self, dbsession, mocker, mock_redis, mock_self_app - ): - commit = CommitFactory.create() - dbsession.add(commit) - dbsession.flush() - - rollback_calls = [] - original_rollback = dbsession.rollback - - def tracking_rollback(): - rollback_calls.append("rollback") - return original_rollback() - - mocker.patch.object(dbsession, "rollback", side_effect=tracking_rollback) - mock_redis.scard.return_value = 0 - mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[]) - mocker.patch("tasks.upload_finisher.update_uploads") - mocker.patch("tasks.upload_finisher.cleanup_intermediate_reports") - mocker.patch.object( - UploadFinisherTask, "_handle_finisher_lock", return_value={} - ) - - task = UploadFinisherTask() - task.request.retries = 0 - task.request.headers = {} - - task.run_impl( - dbsession, - [{"upload_id": 0, "successful": True, "arguments": {}}], - 
repoid=commit.repoid, - commitid=commit.commitid, - commit_yaml={}, - ) - - assert len(rollback_calls) >= 1 - @pytest.mark.django_db def test_per_task_retry_limit_still_enforced( self, dbsession, mocker, mock_redis, mock_self_app diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index 97dd9bf31b..ce1a5dd98c 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -427,7 +427,6 @@ def _process_reports_with_lock( LockType.UPLOAD_PROCESSING, retry_num=self.attempts, ): - db_session.rollback() db_session.refresh(commit) report_service = ReportService(commit_yaml) @@ -526,7 +525,6 @@ def _handle_finisher_lock( LockType.UPLOAD_FINISHER, retry_num=self.attempts, ): - db_session.rollback() result = self.finish_reports_processing( db_session, commit, commit_yaml, processing_results ) From 6ab96fb0f9a4257f36b903724cef1b62b8fcb470 Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Wed, 11 Mar 2026 06:05:45 +0900 Subject: [PATCH 3/4] fix(worker): tune finisher lock to blocking_timeout=30s, retry_countdown=10s The default 5s blocking_timeout is shorter than typical lock hold times (20-60s for large repos), causing unnecessary retry bounces that add 100-200s latency. 30s lets the next waiter pick up the lock immediately in the common case. The default 200s base_retry_countdown is far too slow when the lock is genuinely contested. 10s matches bundle-analysis settings and keeps recovery fast. 
Made-with: Cursor --- .../tasks/tests/unit/test_upload_finisher_task.py | 11 +++++++++-- apps/worker/tasks/upload_finisher.py | 10 +++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py index fabfa6816c..308462e686 100644 --- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py @@ -21,7 +21,6 @@ from services.processing.types import MergeResult, ProcessingResult from services.timeseries import MeasurementName from shared.celery_config import ( - DEFAULT_BLOCKING_TIMEOUT_SECONDS, compute_comparison_task_name, notify_task_name, pulls_task_name, @@ -38,6 +37,8 @@ from shared.torngit.exceptions import TorngitObjectNotFoundError from shared.yaml import UserYaml from tasks.upload_finisher import ( + FINISHER_BASE_RETRY_COUNTDOWN_SECONDS, + FINISHER_BLOCKING_TIMEOUT_SECONDS, ReportService, ShouldCallNotifyResult, UploadFinisherTask, @@ -1415,7 +1416,13 @@ def test_lock_manager_uses_finite_blocking_timeout( ) first_call = lock_manager_cls.call_args_list[0] - assert first_call.kwargs["blocking_timeout"] == DEFAULT_BLOCKING_TIMEOUT_SECONDS + assert ( + first_call.kwargs["blocking_timeout"] == FINISHER_BLOCKING_TIMEOUT_SECONDS + ) + assert ( + first_call.kwargs["base_retry_countdown"] + == FINISHER_BASE_RETRY_COUNTDOWN_SECONDS + ) @pytest.mark.django_db def test_locked_called_without_max_retries( diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index ce1a5dd98c..c536726d13 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -32,7 +32,6 @@ from services.timeseries import repository_datasets_query from services.yaml import read_yaml_field from shared.celery_config import ( - DEFAULT_BLOCKING_TIMEOUT_SECONDS, DEFAULT_LOCK_TIMEOUT_SECONDS, UPLOAD_PROCESSOR_MAX_RETRIES, 
compute_comparison_task_name, @@ -54,6 +53,9 @@ log = logging.getLogger(__name__) +FINISHER_BLOCKING_TIMEOUT_SECONDS = 30 +FINISHER_BASE_RETRY_COUNTDOWN_SECONDS = 10 + UPLOAD_FINISHER_ALREADY_COMPLETED_COUNTER = Counter( "upload_finisher_already_completed", "Number of times finisher skipped work because uploads were already in final state", @@ -419,7 +421,8 @@ def _process_reports_with_lock( repoid=repoid, commitid=commitid, lock_timeout=self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS), - blocking_timeout=DEFAULT_BLOCKING_TIMEOUT_SECONDS, + blocking_timeout=FINISHER_BLOCKING_TIMEOUT_SECONDS, + base_retry_countdown=FINISHER_BASE_RETRY_COUNTDOWN_SECONDS, ) try: @@ -517,7 +520,8 @@ def _handle_finisher_lock( repoid=repoid, commitid=commitid, lock_timeout=self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS), - blocking_timeout=DEFAULT_BLOCKING_TIMEOUT_SECONDS, + blocking_timeout=FINISHER_BLOCKING_TIMEOUT_SECONDS, + base_retry_countdown=FINISHER_BASE_RETRY_COUNTDOWN_SECONDS, ) try: From c8fdaf1ad5e238c9bf3e6fe6e1f08956ffd0abc6 Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Wed, 11 Mar 2026 07:37:36 +0900 Subject: [PATCH 4/4] fix(worker): recover stale DB connections after lock acquisition MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After waiting up to 30s for a Redis lock, PgBouncer may have closed the idle DB connection. Add _ping_db() that does SELECT 1 after acquiring the lock — if the connection is dead, rollback discards it and the next query gets a fresh one from the pool. Unlike the previous db_session.rollback() approach, this only rolls back when the connection is actually stale, so it never expunges healthy session objects. 
Made-with: Cursor --- .../tests/unit/test_upload_finisher_task.py | 32 +++++++++++++++++++ apps/worker/tasks/upload_finisher.py | 18 +++++++++++ 2 files changed, 50 insertions(+) diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py index 308462e686..4a13359495 100644 --- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py @@ -4,6 +4,7 @@ import pytest from celery.exceptions import Retry, SoftTimeLimitExceeded +from sqlalchemy.exc import OperationalError from celery_config import notify_error_task_name from database.enums import ReportType @@ -1462,6 +1463,37 @@ def test_locked_called_without_max_retries( or locked_call.kwargs.get("max_retries") is None ) + @pytest.mark.django_db + def test_ping_db_recovers_stale_connection(self, dbsession, mocker): + """_ping_db rolls back on OperationalError so the next query gets a fresh connection.""" + execute_call_count = 0 + original_execute = dbsession.execute + + def failing_execute(*args, **kwargs): + nonlocal execute_call_count + execute_call_count += 1 + if execute_call_count == 1: + raise OperationalError("SELECT 1", {}, Exception("connection closed")) + return original_execute(*args, **kwargs) + + mocker.patch.object(dbsession, "execute", side_effect=failing_execute) + mock_rollback = mocker.patch.object(dbsession, "rollback") + + task = UploadFinisherTask() + task._ping_db(dbsession) + + mock_rollback.assert_called_once() + + @pytest.mark.django_db + def test_ping_db_noop_on_healthy_connection(self, dbsession, mocker): + """_ping_db does nothing when the connection is alive.""" + mock_rollback = mocker.patch.object(dbsession, "rollback") + + task = UploadFinisherTask() + task._ping_db(dbsession) + + mock_rollback.assert_not_called() + @pytest.mark.django_db def test_per_task_retry_limit_still_enforced( self, dbsession, mocker, mock_redis, mock_self_app diff --git 
a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index c536726d13..cdc4e2f64b 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -6,6 +6,8 @@ import sentry_sdk from asgiref.sync import async_to_sync from celery.exceptions import Retry, SoftTimeLimitExceeded +from sqlalchemy import text +from sqlalchemy.exc import OperationalError from app import celery_app from celery_config import notify_error_task_name @@ -86,6 +88,20 @@ class UploadFinisherTask(BaseCodecovTask, name=upload_finisher_task_name): max_retries = UPLOAD_PROCESSOR_MAX_RETRIES + @staticmethod + def _ping_db(db_session): + """Recover from stale DB connections after waiting for a Redis lock. + + PgBouncer or the DB server may close idle connections while the task + blocks on lock acquisition (up to FINISHER_BLOCKING_TIMEOUT_SECONDS). + A lightweight SELECT 1 will surface a dead connection; rollback then + discards it so the next query gets a fresh one from the pool. + """ + try: + db_session.execute(text("SELECT 1")) + except OperationalError: + db_session.rollback() + def _find_started_uploads_with_reports( self, db_session, commit: Commit ) -> set[int]: @@ -430,6 +446,7 @@ def _process_reports_with_lock( LockType.UPLOAD_PROCESSING, retry_num=self.attempts, ): + self._ping_db(db_session) db_session.refresh(commit) report_service = ReportService(commit_yaml) @@ -529,6 +546,7 @@ def _handle_finisher_lock( LockType.UPLOAD_FINISHER, retry_num=self.attempts, ): + self._ping_db(db_session) result = self.finish_reports_processing( db_session, commit, commit_yaml, processing_results )