Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions apps/worker/tasks/tests/unit/test_upload_finisher_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pytest
from celery.exceptions import Retry, SoftTimeLimitExceeded
from sqlalchemy.exc import OperationalError

from celery_config import notify_error_task_name
from database.enums import ReportType
Expand Down Expand Up @@ -37,6 +38,8 @@
from shared.torngit.exceptions import TorngitObjectNotFoundError
from shared.yaml import UserYaml
from tasks.upload_finisher import (
FINISHER_BASE_RETRY_COUNTDOWN_SECONDS,
FINISHER_BLOCKING_TIMEOUT_SECONDS,
ReportService,
ShouldCallNotifyResult,
UploadFinisherTask,
Expand Down Expand Up @@ -1376,6 +1379,148 @@ def mock_process_reports(
mock_handle_finisher_lock.assert_called_once()


class TestLockManagerConfiguration:
    """Tests for lock manager configuration: finite blocking_timeout
    and no shared max_retries counter."""

    @pytest.mark.django_db
    def test_lock_manager_uses_finite_blocking_timeout(
        self, dbsession, mocker, mock_redis, mock_self_app
    ):
        """LockManager must be constructed with the finite FINISHER_* constants
        rather than an unbounded (None) blocking timeout."""
        commit = CommitFactory.create()
        dbsession.add(commit)
        dbsession.flush()

        # Replace LockManager entirely; __enter__/__exit__ are wired explicitly
        # so the mocked .locked() still behaves as a context manager in run_impl.
        lock_manager_cls = mocker.patch("tasks.upload_finisher.LockManager")
        lock_manager_cls.return_value.locked.return_value.__enter__ = mocker.MagicMock()
        lock_manager_cls.return_value.locked.return_value.__exit__ = mocker.MagicMock(
            return_value=False
        )
        # Short-circuit the heavy finisher internals — this test only cares
        # about the arguments passed to the LockManager constructor.
        mocker.patch.object(
            UploadFinisherTask, "_handle_finisher_lock", return_value={}
        )
        mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[])
        mocker.patch("tasks.upload_finisher.update_uploads")
        mocker.patch("tasks.upload_finisher.cleanup_intermediate_reports")
        mock_redis.scard.return_value = 0

        task = UploadFinisherTask()
        task.request.retries = 0
        task.request.headers = {}

        task.run_impl(
            dbsession,
            [{"upload_id": 0, "successful": True, "arguments": {}}],
            repoid=commit.repoid,
            commitid=commit.commitid,
            commit_yaml={},
        )

        # Inspect the first LockManager construction made during run_impl.
        first_call = lock_manager_cls.call_args_list[0]
        assert (
            first_call.kwargs["blocking_timeout"] == FINISHER_BLOCKING_TIMEOUT_SECONDS
        )
        assert (
            first_call.kwargs["base_retry_countdown"]
            == FINISHER_BASE_RETRY_COUNTDOWN_SECONDS
        )

    @pytest.mark.django_db
    def test_locked_called_without_max_retries(
        self, dbsession, mocker, mock_redis, mock_self_app
    ):
        """.locked() must not receive a max_retries kwarg — i.e. no shared
        retry counter across tasks (per the class docstring's contract)."""
        commit = CommitFactory.create()
        dbsession.add(commit)
        dbsession.flush()

        # Same mock scaffolding as above, but here we capture the .locked()
        # mock itself so its call kwargs can be asserted on afterwards.
        lock_manager_cls = mocker.patch("tasks.upload_finisher.LockManager")
        mock_locked = lock_manager_cls.return_value.locked
        mock_locked.return_value.__enter__ = mocker.MagicMock()
        mock_locked.return_value.__exit__ = mocker.MagicMock(return_value=False)
        mocker.patch.object(
            UploadFinisherTask, "_handle_finisher_lock", return_value={}
        )
        mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[])
        mocker.patch("tasks.upload_finisher.update_uploads")
        mocker.patch("tasks.upload_finisher.cleanup_intermediate_reports")
        mock_redis.scard.return_value = 0

        task = UploadFinisherTask()
        task.request.retries = 0
        task.request.headers = {}

        task.run_impl(
            dbsession,
            [{"upload_id": 0, "successful": True, "arguments": {}}],
            repoid=commit.repoid,
            commitid=commit.commitid,
            commit_yaml={},
        )

        # "kwarg absent" and "explicitly None" are treated as equivalent.
        locked_call = mock_locked.call_args
        assert (
            "max_retries" not in locked_call.kwargs
            or locked_call.kwargs.get("max_retries") is None
        )

    @pytest.mark.django_db
    def test_ping_db_recovers_stale_connection(self, dbsession, mocker):
        """_ping_db rolls back on OperationalError so the next query gets a fresh connection."""
        execute_call_count = 0
        # Capture the real execute BEFORE patching so later calls can delegate.
        original_execute = dbsession.execute

        def failing_execute(*args, **kwargs):
            # Fail only the first execute (the SELECT 1 probe); any subsequent
            # query is passed through to the real session.
            nonlocal execute_call_count
            execute_call_count += 1
            if execute_call_count == 1:
                raise OperationalError("SELECT 1", {}, Exception("connection closed"))
            return original_execute(*args, **kwargs)

        mocker.patch.object(dbsession, "execute", side_effect=failing_execute)
        mock_rollback = mocker.patch.object(dbsession, "rollback")

        task = UploadFinisherTask()
        task._ping_db(dbsession)

        mock_rollback.assert_called_once()

    @pytest.mark.django_db
    def test_ping_db_noop_on_healthy_connection(self, dbsession, mocker):
        """_ping_db does nothing when the connection is alive."""
        mock_rollback = mocker.patch.object(dbsession, "rollback")

        task = UploadFinisherTask()
        task._ping_db(dbsession)

        mock_rollback.assert_not_called()

    @pytest.mark.django_db
    def test_per_task_retry_limit_still_enforced(
        self, dbsession, mocker, mock_redis, mock_self_app
    ):
        """When the attempts header is already high (10) and the lock keeps
        raising LockRetry, run_impl returns None — presumably giving up
        instead of retrying indefinitely (see the assert below)."""
        commit = CommitFactory.create()
        dbsession.add(commit)
        dbsession.flush()

        # Every lock acquisition attempt raises LockRetry(60).
        m = mocker.MagicMock()
        m.return_value.locked.return_value.__enter__.side_effect = LockRetry(60)
        mocker.patch("tasks.upload_finisher.LockManager", m)

        task = UploadFinisherTask()
        task.request.retries = 0
        task.request.headers = {"attempts": 10}

        result = task.run_impl(
            dbsession,
            [{"upload_id": 0, "successful": True, "arguments": {}}],
            repoid=commit.repoid,
            commitid=commit.commitid,
            commit_yaml={},
        )

        assert result is None


class TestCommitRefreshAfterLock:
"""Tests for CCMRG-2028: commit must be refreshed after acquiring lock."""

Expand Down
29 changes: 25 additions & 4 deletions apps/worker/tasks/upload_finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import sentry_sdk
from asgiref.sync import async_to_sync
from celery.exceptions import Retry, SoftTimeLimitExceeded
from sqlalchemy import text
from sqlalchemy.exc import OperationalError

from app import celery_app
from celery_config import notify_error_task_name
Expand Down Expand Up @@ -53,6 +55,9 @@

log = logging.getLogger(__name__)

# Passed to LockManager(blocking_timeout=...): upper bound, in seconds, on how
# long the task blocks waiting to acquire the Redis lock (replaces the previous
# unbounded blocking_timeout=None).
FINISHER_BLOCKING_TIMEOUT_SECONDS = 30
# Passed to LockManager(base_retry_countdown=...): base countdown, in seconds,
# presumably used to schedule a task retry when the lock cannot be acquired —
# confirm against LockManager's implementation.
FINISHER_BASE_RETRY_COUNTDOWN_SECONDS = 10

UPLOAD_FINISHER_ALREADY_COMPLETED_COUNTER = Counter(
"upload_finisher_already_completed",
"Number of times finisher skipped work because uploads were already in final state",
Expand Down Expand Up @@ -83,6 +88,20 @@ class UploadFinisherTask(BaseCodecovTask, name=upload_finisher_task_name):

max_retries = UPLOAD_PROCESSOR_MAX_RETRIES

@staticmethod
def _ping_db(db_session):
"""Recover from stale DB connections after waiting for a Redis lock.

PgBouncer or the DB server may close idle connections while the task
blocks on lock acquisition (up to FINISHER_BLOCKING_TIMEOUT_SECONDS).
A lightweight SELECT 1 will surface a dead connection; rollback then
discards it so the next query gets a fresh one from the pool.
"""
try:
db_session.execute(text("SELECT 1"))
except OperationalError:
db_session.rollback()

def _find_started_uploads_with_reports(
self, db_session, commit: Commit
) -> set[int]:
Expand Down Expand Up @@ -418,15 +437,16 @@ def _process_reports_with_lock(
repoid=repoid,
commitid=commitid,
lock_timeout=self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS),
blocking_timeout=None,
blocking_timeout=FINISHER_BLOCKING_TIMEOUT_SECONDS,
base_retry_countdown=FINISHER_BASE_RETRY_COUNTDOWN_SECONDS,
)

try:
with lock_manager.locked(
LockType.UPLOAD_PROCESSING,
max_retries=UPLOAD_PROCESSOR_MAX_RETRIES,
retry_num=self.attempts,
):
self._ping_db(db_session)
db_session.refresh(commit)
report_service = ReportService(commit_yaml)

Expand Down Expand Up @@ -517,15 +537,16 @@ def _handle_finisher_lock(
repoid=repoid,
commitid=commitid,
lock_timeout=self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS),
blocking_timeout=None,
blocking_timeout=FINISHER_BLOCKING_TIMEOUT_SECONDS,
base_retry_countdown=FINISHER_BASE_RETRY_COUNTDOWN_SECONDS,
)

try:
with lock_manager.locked(
LockType.UPLOAD_FINISHER,
max_retries=UPLOAD_PROCESSOR_MAX_RETRIES,
retry_num=self.attempts,
):
self._ping_db(db_session)
result = self.finish_reports_processing(
db_session, commit, commit_yaml, processing_results
)
Expand Down