-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathbundle_analysis_notify.py
More file actions
159 lines (141 loc) · 4.89 KB
/
bundle_analysis_notify.py
File metadata and controls
159 lines (141 loc) · 4.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import logging
from typing import Any
import sentry_sdk
from app import celery_app
from database.models import Commit
from helpers.github_installation import get_installation_name_for_owner_for_task
from services.bundle_analysis.notify import BundleAnalysisNotifyService
from services.bundle_analysis.notify.types import NotificationSuccess
from services.lock_manager import (
LockRetry,
LockType,
get_bundle_analysis_lock_manager,
)
from shared.celery_config import (
BUNDLE_ANALYSIS_NOTIFY_MAX_RETRIES,
bundle_analysis_notify_task_name,
)
from shared.yaml import UserYaml
from tasks.base import BaseCodecovTask
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
class BundleAnalysisNotifyTask(BaseCodecovTask, name=bundle_analysis_notify_task_name):
    """Task that runs the bundle analysis notifier for a single commit.

    The notification work is executed while holding the
    ``LockType.BUNDLE_ANALYSIS_NOTIFY`` lock obtained from the bundle analysis
    lock manager for the given repo/commit pair.
    """

    max_retries = BUNDLE_ANALYSIS_NOTIFY_MAX_RETRIES

    def run_impl(
        self,
        db_session,
        # Celery `chain` injects this argument - it's the list of processing results
        # from prior processor tasks in the chain
        previous_result: list[dict[str, Any]],
        *,
        repoid: int,
        commitid: str,
        commit_yaml: dict,
        **kwargs,
    ):
        """Acquire the notify lock and run the notification inside it.

        Args:
            db_session: Database session, passed through to
                ``process_impl_within_lock``.
            previous_result: Processing results from the prior tasks in the
                chain.
            repoid: Repository id; coerced with ``int()`` before use.
            commitid: Commit identifier to notify on.
            commit_yaml: Plain dict that is wrapped with
                ``UserYaml.from_dict`` before use.
            **kwargs: Forwarded unchanged to ``process_impl_within_lock``.

        Returns:
            Whatever ``process_impl_within_lock`` returns, or
            ``{"notify_attempted": False, "notify_succeeded": None}`` when the
            lock could not be acquired and the retry budget is exhausted.
        """
        repoid = int(repoid)
        commit_yaml = UserYaml.from_dict(commit_yaml)

        log.info(
            "Starting bundle analysis notify",
            extra={
                "repoid": repoid,
                "commit": commitid,
                "commit_yaml": commit_yaml,
            },
        )

        lock_manager = get_bundle_analysis_lock_manager(
            repoid=repoid,
            commitid=commitid,
        )

        try:
            with lock_manager.locked(
                LockType.BUNDLE_ANALYSIS_NOTIFY,
                max_retries=self.max_retries,
                retry_num=self.attempts,
            ):
                return self.process_impl_within_lock(
                    db_session=db_session,
                    repoid=repoid,
                    commitid=commitid,
                    commit_yaml=commit_yaml,
                    previous_result=previous_result,
                    **kwargs,
                )
        except LockRetry as retry:
            if retry.max_retries_exceeded:
                # Give up quietly: report that no notification was attempted
                # instead of scheduling yet another retry.
                log.error(
                    "Not retrying lock acquisition - max retries exceeded",
                    extra={
                        "commitid": commitid,
                        "repoid": repoid,
                        "retry_num": retry.retry_num,
                        "max_retries": retry.max_retries,
                    },
                )
                return {
                    "notify_attempted": False,
                    "notify_succeeded": None,
                }
            # NOTE(review): `self.retry` is inherited; presumably it is Celery's
            # Task.retry, which raises a Retry exception by default, so no value
            # is returned on this path - confirm against BaseCodecovTask.
            self.retry(
                max_retries=self.max_retries,
                countdown=retry.countdown,
            )

    @sentry_sdk.trace
    def process_impl_within_lock(
        self,
        *,
        db_session,
        repoid: int,
        commitid: str,
        commit_yaml: UserYaml,
        previous_result: list[dict[str, Any]],
        **kwargs,
    ) -> dict[str, Any]:
        """Send the bundle analysis notification for one commit.

        Called by ``run_impl`` while the notify lock is held.

        Args:
            db_session: Database session used to look up the ``Commit`` row.
            repoid: Repository id of the commit.
            commitid: Commit identifier.
            commit_yaml: Parsed user YAML handed to the notify service.
            previous_result: Processing results from prior tasks; anything
                that is not a list is treated as an empty list.
            **kwargs: Accepted for signature compatibility; not used here.

        Returns:
            A dict with ``"notify_attempted"`` (bool) and
            ``"notify_succeeded"`` (a ``NotificationSuccess`` value).

        Raises:
            AssertionError: If no commit matches ``repoid``/``commitid``.
        """
        log.info(
            "Running bundle analysis notify",
            extra={
                "repoid": repoid,
                "commit": commitid,
                "commit_yaml": commit_yaml,
                "parent_task": self.request.parent_id,
            },
        )

        commit = (
            db_session.query(Commit).filter_by(repoid=repoid, commitid=commitid).first()
        )
        if not commit:
            # Raised explicitly rather than via `assert` so the check still runs
            # under `python -O`, which strips assert statements. The exception
            # type and message are the same as before.
            raise AssertionError("commit not found")

        # previous_result is the list of processing results from prior processor tasks
        # (they get accumulated as we execute each task in succession)
        processing_results = (
            previous_result if isinstance(previous_result, list) else []
        )

        # NOTE: all() over an empty list is True, so this branch is also taken
        # when there are no processing results at all (including the case where
        # previous_result was not a list).
        if all(result["error"] is not None for result in processing_results):
            # every processor errored, nothing to notify on
            return {
                "notify_attempted": False,
                "notify_succeeded": NotificationSuccess.ALL_ERRORED,
            }

        installation_name_to_use = get_installation_name_for_owner_for_task(
            self.name, commit.repository.author
        )
        notifier = BundleAnalysisNotifyService(
            commit, commit_yaml, gh_app_installation_name=installation_name_to_use
        )
        result = notifier.notify()

        log.info(
            "Finished bundle analysis notify",
            extra={
                "repoid": repoid,
                "commit": commitid,
                "commit_yaml": commit_yaml,
                "parent_task": self.request.parent_id,
                "result": result,
            },
        )

        return {
            "notify_attempted": True,
            "notify_succeeded": result.to_NotificationSuccess(),
        }
# Register an instance of the task on `celery_app`, then look the registered
# task back up in `celery_app.tasks` by its name and expose it as the
# module-level `bundle_analysis_notify_task`.
RegisteredBundleAnalysisNotifyTask = celery_app.register_task(
BundleAnalysisNotifyTask()
)
bundle_analysis_notify_task = celery_app.tasks[RegisteredBundleAnalysisNotifyTask.name]