Skip to content

Commit 9b95e6b

Browse files
fix(worker): Cap bundle analysis processor at 10 attempts and fix retry logic (#688)
* fix(worker): Cap bundle analysis processor at 10 attempts and fix retry logic

  Cap total attempts at 10 (not 10 retries + 1) for BundleAnalysisProcessorTask and LockManager so we stop after 10 tries. Add Redis-backed attempt counter in LockManager for lock contention so broker re-deliveries with unchanged headers do not retry indefinitely. BaseCodecovTask._has_exceeded_max_attempts now takes max_attempts and compares to attempts (retries + 1 or header). On generic exception in bundle processor, return and set upload to error instead of re-raising to avoid unbounded retries. Update tests: mock request for _has_exceeded_max_attempts, set mock_redis.incr.return_value where LockManager compares attempts, and adjust cleanup test to expect return instead of raised ValueError.

  Refs CCMRG-2042
  Co-authored-by: Cursor <cursoragent@cursor.com>

* ref(worker): Extract lock clear helper, fix finisher log, clean comments

  - LockManager: extract _clear_lock_attempt_counter to remove nested try in locked()
  - Upload finisher: log max_attempts as UPLOAD_PROCESSOR_MAX_RETRIES (not +1)
  - Lock_manager: comment TTL intent instead of restating 24h
  - Tests: remove hard-coded (10) from comments; use max_attempts wording

  Co-authored-by: Cursor <cursoragent@cursor.com>

* ref(worker): use max_retries only, remove max_attempts alias

  - BaseCodecovTask: doc and safe_retry use max_retries; drop max_attempts property
  - LockRetry: max_attempts -> max_retries (same semantics: max total attempts)
  - LockManager/bundle_analysis_processor/upload_finisher: log and Sentry use max_retries
  - Tests: LockRetry(max_retries=...), comments say max_retries
  - celery_config: one-line convention (max_retries = max total attempts)
  - Fix duplicate dict keys in lock_manager and upload_finisher

  Refs CCMRG-2042
  Co-authored-by: Cursor <cursoragent@cursor.com>

* test(worker): set mock_redis.incr.return_value in BA retry countdown test

  LockManager uses redis incr for attempt count; mock must return an int so attempts >= max_retries does not raise TypeError.

  Refs CCMRG-2042
  Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(worker): honor LockRetry.max_retries_exceeded in ManualTriggerTask

  When LockManager's Redis attempt counter hits max_retries before the task's attempt count (e.g. re-deliveries), it raises LockRetry(max_retries_exceeded=True, countdown=0). ManualTriggerTask only checked self._has_exceeded_max_attempts(), so it fell through to self.retry(countdown=0) and caused rapid zero-delay retries. Align with preprocess_upload and other callers: check retry.max_retries_exceeded or self._has_exceeded_max_attempts() and return failure dict when either is true. Add test for the Redis-counter path.

  Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(worker): Address PR feedback for bundle analysis and lock manager

  - Return processing_results in max-retries-exceeded path for consistent defensive isinstance behavior (bundle_analysis_processor)
  - Consolidate redis exception imports; add blank line between lock_name and attempt_key in LockManager.locked() (lock_manager)

  Refs CCMRG-2042
  Co-authored-by: Cursor <cursoragent@cursor.com>

---------

Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 10c6c66 commit 9b95e6b

16 files changed

+284
-184
lines changed

apps/worker/services/lock_manager.py

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
import sentry_sdk
88
from redis import Redis # type: ignore
9-
from redis.exceptions import LockError # type: ignore
9+
from redis.exceptions import ConnectionError as RedisConnectionError # type: ignore
10+
from redis.exceptions import LockError
11+
from redis.exceptions import TimeoutError as RedisTimeoutError
1012

1113
from database.enums import ReportType
1214
from shared.celery_config import (
@@ -23,6 +25,10 @@
2325
RETRY_BACKOFF_MULTIPLIER = 3
2426
RETRY_COUNTDOWN_RANGE_DIVISOR = 2
2527
LOCK_NAME_SEPARATOR = "_lock_"
28+
LOCK_ATTEMPTS_KEY_PREFIX = "lock_attempts:"
29+
LOCK_ATTEMPTS_TTL_SECONDS = (
30+
86400 # TTL for attempt counter key so it expires after one day
31+
)
2632

2733

2834
class LockType(Enum):
@@ -44,21 +50,21 @@ def __init__(
4450
countdown: int,
4551
max_retries_exceeded: bool = False,
4652
retry_num: int | None = None,
47-
max_attempts: int | None = None,
53+
max_retries: int | None = None,
4854
lock_name: str | None = None,
4955
repoid: int | None = None,
5056
commitid: str | None = None,
5157
):
5258
self.countdown = countdown
5359
self.max_retries_exceeded = max_retries_exceeded
5460
self.retry_num = retry_num
55-
self.max_attempts = max_attempts
61+
self.max_retries = max_retries
5662
self.lock_name = lock_name
5763
self.repoid = repoid
5864
self.commitid = commitid
5965
if max_retries_exceeded:
6066
error_msg = (
61-
f"Lock acquisition failed after {retry_num} retries (max: {max_attempts}). "
67+
f"Lock acquisition failed after {retry_num} retries (max: {max_retries}). "
6268
f"Repo: {repoid}, Commit: {commitid}"
6369
)
6470
super().__init__(error_msg)
@@ -93,6 +99,22 @@ def lock_name(self, lock_type: LockType):
9399
else:
94100
return f"{lock_type.value}{LOCK_NAME_SEPARATOR}{self.repoid}_{self.commitid}_{self.report_type.value}"
95101

102+
def _clear_lock_attempt_counter(self, attempt_key: str, lock_name: str) -> None:
103+
"""Clear the lock attempt counter. Log and swallow Redis errors so teardown does not mask other failures."""
104+
try:
105+
self.redis_connection.delete(attempt_key)
106+
except (RedisConnectionError, RedisTimeoutError, OSError) as e:
107+
log.warning(
108+
"Failed to clear lock attempt counter (Redis unavailable or error)",
109+
extra={
110+
"attempt_key": attempt_key,
111+
"commitid": self.commitid,
112+
"lock_name": lock_name,
113+
"repoid": self.repoid,
114+
},
115+
exc_info=True,
116+
)
117+
96118
@contextmanager
97119
def locked(
98120
self,
@@ -106,9 +128,11 @@ def locked(
106128
lock_type: Type of lock to acquire
107129
retry_num: Attempt count (should be self.attempts from BaseCodecovTask).
108130
Used for both exponential backoff and max retry checking.
109-
max_retries: Maximum number of retries allowed
131+
max_retries: Maximum total attempts (stop when attempts >= this).
110132
"""
111133
lock_name = self.lock_name(lock_type)
134+
135+
attempt_key = f"{LOCK_ATTEMPTS_KEY_PREFIX}{lock_name}"
112136
try:
113137
log.info(
114138
"Acquiring lock",
@@ -134,7 +158,10 @@ def locked(
134158
"repoid": self.repoid,
135159
},
136160
)
137-
yield
161+
try:
162+
yield
163+
finally:
164+
self._clear_lock_attempt_counter(attempt_key, lock_name)
138165
lock_duration = time.time() - lock_acquired_time
139166
log.info(
140167
"Releasing lock",
@@ -147,6 +174,12 @@ def locked(
147174
},
148175
)
149176
except LockError:
177+
# incr/expire can raise RedisConnectionError/RedisTimeoutError when Redis
178+
# is unavailable; we let those propagate so the task fails once (no infinite loop).
179+
attempts = self.redis_connection.incr(attempt_key)
180+
if attempts == 1:
181+
self.redis_connection.expire(attempt_key, LOCK_ATTEMPTS_TTL_SECONDS)
182+
150183
max_retry_unbounded = self.base_retry_countdown * (
151184
RETRY_BACKOFF_MULTIPLIER**retry_num
152185
)
@@ -159,44 +192,42 @@ def locked(
159192
)
160193
countdown = countdown_unbounded
161194

162-
if max_retries is not None and retry_num > max_retries:
163-
max_attempts = max_retries + 1
195+
if max_retries is not None and attempts >= max_retries:
164196
error = LockRetry(
165197
countdown=0,
166198
max_retries_exceeded=True,
167-
retry_num=retry_num,
168-
max_attempts=max_attempts,
199+
retry_num=attempts,
200+
max_retries=max_retries,
169201
lock_name=lock_name,
170202
repoid=self.repoid,
171203
commitid=self.commitid,
172204
)
173205
log.error(
174-
"Not retrying since we already had too many retries",
206+
"Not retrying since we already had too many attempts",
175207
extra={
208+
"attempts": attempts,
176209
"commitid": self.commitid,
177210
"lock_name": lock_name,
178211
"lock_type": lock_type.value,
179-
"max_attempts": max_attempts,
180212
"max_retries": max_retries,
181213
"repoid": self.repoid,
182214
"report_type": self.report_type.value,
183-
"retry_num": retry_num,
184215
},
185216
exc_info=True,
186217
)
187218
sentry_sdk.capture_exception(
188219
error,
189220
contexts={
190221
"lock_acquisition": {
222+
"attempts": attempts,
191223
"blocking_timeout": self.blocking_timeout,
192224
"commitid": self.commitid,
193225
"lock_name": lock_name,
194226
"lock_timeout": self.lock_timeout,
195227
"lock_type": lock_type.value,
196-
"max_attempts": max_attempts,
228+
"max_retries": max_retries,
197229
"repoid": self.repoid,
198230
"report_type": self.report_type.value,
199-
"retry_num": retry_num,
200231
}
201232
},
202233
tags={

apps/worker/services/tests/test_lock_manager.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ def test_locked_logs_duration(self, mock_redis, caplog):
144144
def test_locked_lock_error_raises_lock_retry(self, mock_redis):
145145
"""Test that LockError raises LockRetry exception"""
146146
mock_redis.lock.side_effect = LockError()
147+
mock_redis.incr.return_value = 1
147148

148149
manager = LockManager(repoid=123, commitid="abc123")
149150
with pytest.raises(LockRetry) as exc_info:
@@ -157,6 +158,7 @@ def test_locked_lock_error_raises_lock_retry(self, mock_redis):
157158
def test_locked_exponential_backoff_retry_0(self, mock_redis):
158159
"""Test exponential backoff calculation for retry_num=0"""
159160
mock_redis.lock.side_effect = LockError()
161+
mock_redis.incr.return_value = 1
160162

161163
manager = LockManager(repoid=123, commitid="abc123")
162164
with pytest.raises(LockRetry) as exc_info:
@@ -169,6 +171,7 @@ def test_locked_exponential_backoff_retry_0(self, mock_redis):
169171
def test_locked_exponential_backoff_retry_1(self, mock_redis):
170172
"""Test exponential backoff calculation for retry_num=1"""
171173
mock_redis.lock.side_effect = LockError()
174+
mock_redis.incr.return_value = 1
172175

173176
manager = LockManager(repoid=123, commitid="abc123")
174177
with pytest.raises(LockRetry) as exc_info:
@@ -181,6 +184,7 @@ def test_locked_exponential_backoff_retry_1(self, mock_redis):
181184
def test_locked_exponential_backoff_retry_2(self, mock_redis):
182185
"""Test exponential backoff calculation for retry_num=2"""
183186
mock_redis.lock.side_effect = LockError()
187+
mock_redis.incr.return_value = 1
184188

185189
manager = LockManager(repoid=123, commitid="abc123")
186190
with pytest.raises(LockRetry) as exc_info:
@@ -193,6 +197,7 @@ def test_locked_exponential_backoff_retry_2(self, mock_redis):
193197
def test_locked_exponential_backoff_cap(self, mock_redis):
194198
"""Test that exponential backoff is capped at 5 hours"""
195199
mock_redis.lock.side_effect = LockError()
200+
mock_redis.incr.return_value = 1
196201

197202
manager = LockManager(repoid=123, commitid="abc123")
198203
# Use a high retry_num that would exceed the cap
@@ -206,6 +211,7 @@ def test_locked_exponential_backoff_cap(self, mock_redis):
206211
def test_locked_max_retries_not_provided(self, mock_redis, caplog):
207212
"""Test that max_retries=None doesn't log error"""
208213
mock_redis.lock.side_effect = LockError()
214+
mock_redis.incr.return_value = 1
209215

210216
manager = LockManager(repoid=123, commitid="abc123")
211217
with caplog.at_level(logging.ERROR):
@@ -219,6 +225,7 @@ def test_locked_max_retries_not_provided(self, mock_redis, caplog):
219225
def test_locked_max_retries_not_exceeded(self, mock_redis, caplog):
220226
"""Test that max_retries check doesn't log error when not exceeded"""
221227
mock_redis.lock.side_effect = LockError()
228+
mock_redis.incr.return_value = 1
222229

223230
manager = LockManager(repoid=123, commitid="abc123")
224231
with caplog.at_level(logging.ERROR):
@@ -234,20 +241,22 @@ def test_locked_max_retries_not_exceeded(self, mock_redis, caplog):
234241
assert len(error_logs) == 0
235242

236243
def test_locked_max_retries_exceeded(self, mock_redis, caplog):
237-
"""Test that max_retries exceeded raises LockRetry with max_retries_exceeded=True"""
244+
"""Test that max attempts exceeded raises LockRetry with max_retries_exceeded=True"""
238245
mock_redis.lock.side_effect = LockError()
246+
mock_redis.incr.return_value = (
247+
3 # Redis attempt count >= max_retries (3 = 3 attempts)
248+
)
239249

240250
manager = LockManager(repoid=123, commitid="abc123")
241251
with caplog.at_level(logging.ERROR):
242252
with pytest.raises(LockRetry) as exc_info:
243253
with manager.locked(LockType.UPLOAD, retry_num=5, max_retries=3):
244254
pass
245255

246-
# Should raise LockRetry with max_retries_exceeded=True
247256
assert isinstance(exc_info.value, LockRetry)
248257
assert exc_info.value.max_retries_exceeded is True
249-
assert exc_info.value.retry_num == 5
250-
assert exc_info.value.max_attempts == 4 # max_retries + 1
258+
assert exc_info.value.retry_num == 3 # from Redis incr
259+
assert exc_info.value.max_retries == 3
251260
assert exc_info.value.lock_name == "upload_lock_123_abc123"
252261
assert exc_info.value.repoid == 123
253262
assert exc_info.value.commitid == "abc123"
@@ -256,41 +265,40 @@ def test_locked_max_retries_exceeded(self, mock_redis, caplog):
256265
error_logs = [
257266
r
258267
for r in caplog.records
259-
if r.levelname == "ERROR" and "too many retries" in r.message
268+
if r.levelname == "ERROR" and "too many attempts" in r.message
260269
]
261270
assert len(error_logs) == 1
262271
assert error_logs[0].__dict__["max_retries"] == 3
263-
assert error_logs[0].__dict__["retry_num"] == 5
272+
assert error_logs[0].__dict__["attempts"] == 3
264273

265274
def test_locked_max_retries_exceeded_at_boundary(self, mock_redis, caplog):
266-
"""Test that max_retries boundary condition raises LockRetry with max_retries_exceeded=True"""
275+
"""Test that max attempts boundary (attempts >= max_retries) raises LockRetry"""
267276
mock_redis.lock.side_effect = LockError()
277+
mock_redis.incr.return_value = 3 # attempts >= max_retries (3)
268278

269279
manager = LockManager(repoid=123, commitid="abc123")
270280
with caplog.at_level(logging.ERROR):
271281
with pytest.raises(LockRetry) as exc_info:
272-
# retry_num now represents self.attempts (starts at 1)
273-
# max_retries=3 means max_attempts=4, so retry_num=4 should exceed
274282
with manager.locked(LockType.UPLOAD, retry_num=4, max_retries=3):
275283
pass
276284

277-
# Should raise LockRetry with max_retries_exceeded=True
278285
assert isinstance(exc_info.value, LockRetry)
279286
assert exc_info.value.max_retries_exceeded is True
280-
assert exc_info.value.retry_num == 4
281-
assert exc_info.value.max_attempts == 4
287+
assert exc_info.value.retry_num == 3
288+
assert exc_info.value.max_retries == 3
282289
assert exc_info.value.countdown == 0
283290

284291
error_logs = [
285292
r
286293
for r in caplog.records
287-
if r.levelname == "ERROR" and "too many retries" in r.message
294+
if r.levelname == "ERROR" and "too many attempts" in r.message
288295
]
289296
assert len(error_logs) == 1
290297

291298
def test_locked_warning_logged_on_lock_error(self, mock_redis, caplog):
292299
"""Test that warning is logged when lock cannot be acquired"""
293300
mock_redis.lock.side_effect = LockError()
301+
mock_redis.incr.return_value = 1
294302

295303
manager = LockManager(repoid=123, commitid="abc123")
296304
with caplog.at_level(logging.WARNING):
@@ -358,6 +366,7 @@ def test_locked_blocking_timeout_enables_retry_logic(self, mock_redis):
358366
"""
359367
# When blocking_timeout is set, Redis raises LockError after timeout
360368
mock_redis.lock.side_effect = LockError()
369+
mock_redis.incr.return_value = 1
361370

362371
manager = LockManager(repoid=123, commitid="abc123", blocking_timeout=5)
363372

apps/worker/tasks/base.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,12 @@ def _get_request_headers(request) -> dict:
182182

183183

184184
class BaseCodecovTask(celery_app.Task):
185+
"""Base task for Codecov workers.
186+
187+
In this codebase, max_retries is the maximum total attempts (cap on runs).
188+
We stop when attempts >= max_retries. The name matches Celery/config.
189+
"""
190+
185191
Request = BaseCodecovRequest
186192

187193
def __init_subclass__(cls, name=None):
@@ -306,40 +312,31 @@ def attempts(self) -> int:
306312
return attempts_value
307313

308314
def _has_exceeded_max_attempts(self, max_retries: int | None) -> bool:
309-
"""Check if task has exceeded max attempts."""
315+
"""Return True if attempts >= max_retries (max_retries is max total attempts)."""
310316
if max_retries is None:
311317
return False
312318

313-
max_attempts = max_retries + 1
314-
return self.attempts >= max_attempts
319+
return self.attempts >= max_retries
315320

316321
def safe_retry(self, countdown=None, exc=None, **kwargs):
317-
"""Safely retry with max retry limit. Returns False if max exceeded, otherwise raises Retry.
318-
319-
Uses self.max_retries to determine the retry limit. Tasks define max_retries as a class
320-
attribute, so it's known at instantiation and doesn't change.
321-
"""
322+
"""Safely retry with max retry limit. Returns False if max exceeded, otherwise raises Retry."""
322323
if self._has_exceeded_max_attempts(self.max_retries):
323-
# If we're here, self.max_retries is not None (otherwise _has_exceeded_max_attempts returns False)
324-
max_attempts = self.max_retries + 1
325324
log.error(
326325
f"Task {self.name} exceeded max retries",
327326
extra={
328327
"attempts": self.attempts,
329-
"max_attempts": max_attempts,
330328
"max_retries": self.max_retries,
331329
"task_name": self.name,
332330
},
333331
)
334332
TASK_MAX_RETRIES_EXCEEDED_COUNTER.labels(task=self.name).inc()
335333
sentry_sdk.capture_exception(
336334
MaxRetriesExceededError(
337-
f"Task {self.name} exceeded max retries: {self.attempts} >= {max_attempts}"
335+
f"Task {self.name} exceeded max retries: {self.attempts} >= {self.max_retries}"
338336
),
339337
contexts={
340338
"task": {
341339
"attempts": self.attempts,
342-
"max_attempts": max_attempts,
343340
"max_retries": self.max_retries,
344341
"task_name": self.name,
345342
}

apps/worker/tasks/bundle_analysis_notify.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def run_impl(
7777
"commitid": commitid,
7878
"repoid": repoid,
7979
"retry_num": retry.retry_num,
80-
"max_attempts": retry.max_attempts,
80+
"max_retries": retry.max_retries,
8181
},
8282
)
8383
return {

0 commit comments

Comments
 (0)