|
| 1 | +import base64 |
| 2 | +import json |
| 3 | +import os |
| 4 | +import sys |
| 5 | +from dataclasses import dataclass |
| 6 | +from typing import Optional |
| 7 | + |
| 8 | +from socketsecurity import USER_AGENT |
| 9 | +from socketsecurity.core import log |
| 10 | +from socketsecurity.core.classes import Comment |
| 11 | +from socketsecurity.core.scm_comments import Comments |
| 12 | +from socketsecurity.socketcli import CliClient |
| 13 | + |
| 14 | + |
| 15 | +@dataclass |
| 16 | +class BitbucketConfig: |
| 17 | + """Configuration from Bitbucket Pipelines environment variables.""" |
| 18 | + api_url: str |
| 19 | + workspace: str |
| 20 | + repo_slug: str |
| 21 | + repository: str |
| 22 | + pr_id: Optional[str] |
| 23 | + source_branch: Optional[str] |
| 24 | + destination_branch: Optional[str] |
| 25 | + default_branch: Optional[str] |
| 26 | + commit_sha: str |
| 27 | + is_default_branch: bool |
| 28 | + token: str |
| 29 | + username: Optional[str] |
| 30 | + headers: dict |
| 31 | + |
| 32 | + @classmethod |
| 33 | + def from_env(cls, pr_number: Optional[str] = None) -> "BitbucketConfig": |
| 34 | + """Create config from Bitbucket Pipelines env vars. |
| 35 | +
|
| 36 | + Supports two auth styles: |
| 37 | + - Bearer: BITBUCKET_TOKEN (workspace/repo/project access tokens, OAuth) |
| 38 | + - Basic: BITBUCKET_USERNAME + BITBUCKET_APP_PASSWORD |
| 39 | + """ |
| 40 | + token = os.getenv("BITBUCKET_TOKEN", "") |
| 41 | + username = os.getenv("BITBUCKET_USERNAME") |
| 42 | + app_password = os.getenv("BITBUCKET_APP_PASSWORD") |
| 43 | + |
| 44 | + if not token and not (username and app_password): |
| 45 | + log.error( |
| 46 | + "Unable to get Bitbucket credentials. Set BITBUCKET_TOKEN, " |
| 47 | + "or BITBUCKET_USERNAME + BITBUCKET_APP_PASSWORD." |
| 48 | + ) |
| 49 | + sys.exit(2) |
| 50 | + |
| 51 | + api_url = os.getenv("BITBUCKET_API_URL", "https://api.bitbucket.org/2.0").rstrip("/") |
| 52 | + |
| 53 | + repo_full_name = os.getenv("BITBUCKET_REPO_FULL_NAME", "") |
| 54 | + workspace = os.getenv("BITBUCKET_WORKSPACE", "") |
| 55 | + repo_slug = os.getenv("BITBUCKET_REPO_SLUG", "") |
| 56 | + if repo_full_name and "/" in repo_full_name: |
| 57 | + full_workspace, full_slug = repo_full_name.split("/", 1) |
| 58 | + workspace = workspace or full_workspace |
| 59 | + repo_slug = repo_slug or full_slug |
| 60 | + |
| 61 | + pr_id = pr_number or os.getenv("BITBUCKET_PR_ID") |
| 62 | + if pr_id == "0": |
| 63 | + pr_id = None |
| 64 | + |
| 65 | + source_branch = os.getenv("BITBUCKET_BRANCH") |
| 66 | + destination_branch = os.getenv("BITBUCKET_PR_DESTINATION_BRANCH") |
| 67 | + default_branch = os.getenv("BITBUCKET_DEFAULT_BRANCH") |
| 68 | + commit_sha = os.getenv("BITBUCKET_COMMIT", "") |
| 69 | + |
| 70 | + is_default_branch = bool( |
| 71 | + source_branch and default_branch and source_branch == default_branch |
| 72 | + ) |
| 73 | + |
| 74 | + headers = cls._get_auth_headers(token, username, app_password) |
| 75 | + |
| 76 | + return cls( |
| 77 | + api_url=api_url, |
| 78 | + workspace=workspace, |
| 79 | + repo_slug=repo_slug, |
| 80 | + repository=repo_slug, |
| 81 | + pr_id=pr_id, |
| 82 | + source_branch=source_branch, |
| 83 | + destination_branch=destination_branch, |
| 84 | + default_branch=default_branch, |
| 85 | + commit_sha=commit_sha, |
| 86 | + is_default_branch=is_default_branch, |
| 87 | + token=token, |
| 88 | + username=username, |
| 89 | + headers=headers, |
| 90 | + ) |
| 91 | + |
| 92 | + @staticmethod |
| 93 | + def _get_auth_headers( |
| 94 | + token: str, |
| 95 | + username: Optional[str], |
| 96 | + app_password: Optional[str], |
| 97 | + ) -> dict: |
| 98 | + base_headers = { |
| 99 | + "User-Agent": USER_AGENT, |
| 100 | + "Accept": "application/json", |
| 101 | + "Content-Type": "application/json", |
| 102 | + } |
| 103 | + if token: |
| 104 | + return {**base_headers, "Authorization": f"Bearer {token}"} |
| 105 | + encoded = base64.b64encode(f"{username}:{app_password}".encode()).decode("ascii") |
| 106 | + return {**base_headers, "Authorization": f"Basic {encoded}"} |
| 107 | + |
| 108 | + |
| 109 | +class Bitbucket: |
| 110 | + def __init__(self, client: CliClient, config: Optional[BitbucketConfig] = None): |
| 111 | + self.config = config or BitbucketConfig.from_env() |
| 112 | + self.client = client |
| 113 | + |
| 114 | + def check_event_type(self) -> str: |
| 115 | + """Bitbucket Pipelines does not expose a 'comment' trigger. |
| 116 | +
|
| 117 | + If a PR id is set we treat the run as a diff; otherwise main branch. |
| 118 | + """ |
| 119 | + if self.config.pr_id: |
| 120 | + return "diff" |
| 121 | + return "main" |
| 122 | + |
| 123 | + def _pr_comments_path(self, comment_id: Optional[str] = None) -> str: |
| 124 | + base = ( |
| 125 | + f"repositories/{self.config.workspace}/{self.config.repo_slug}" |
| 126 | + f"/pullrequests/{self.config.pr_id}/comments" |
| 127 | + ) |
| 128 | + if comment_id: |
| 129 | + return f"{base}/{comment_id}" |
| 130 | + return base |
| 131 | + |
| 132 | + def post_comment(self, body: str) -> None: |
| 133 | + path = self._pr_comments_path() |
| 134 | + payload = json.dumps({"content": {"raw": body}}) |
| 135 | + self.client.request( |
| 136 | + path=path, |
| 137 | + payload=payload, |
| 138 | + method="POST", |
| 139 | + headers=self.config.headers, |
| 140 | + base_url=self.config.api_url, |
| 141 | + ) |
| 142 | + |
| 143 | + def update_comment(self, body: str, comment_id: str) -> None: |
| 144 | + path = self._pr_comments_path(comment_id) |
| 145 | + payload = json.dumps({"content": {"raw": body}}) |
| 146 | + self.client.request( |
| 147 | + path=path, |
| 148 | + payload=payload, |
| 149 | + method="PUT", |
| 150 | + headers=self.config.headers, |
| 151 | + base_url=self.config.api_url, |
| 152 | + ) |
| 153 | + |
| 154 | + def get_comments_for_pr(self) -> dict: |
| 155 | + log.debug( |
| 156 | + f"Getting Bitbucket comments for Repo {self.config.repo_slug} " |
| 157 | + f"for PR {self.config.pr_id}" |
| 158 | + ) |
| 159 | + comments: dict = {} |
| 160 | + if not self.config.pr_id: |
| 161 | + return comments |
| 162 | + |
| 163 | + next_url: Optional[str] = None |
| 164 | + first_path = f"{self._pr_comments_path()}?pagelen=100" |
| 165 | + |
| 166 | + while True: |
| 167 | + if next_url: |
| 168 | + # Bitbucket returns absolute 'next' URLs; pass via base_url=''. |
| 169 | + response = self.client.request( |
| 170 | + path=next_url, |
| 171 | + headers=self.config.headers, |
| 172 | + base_url="", |
| 173 | + ) |
| 174 | + else: |
| 175 | + response = self.client.request( |
| 176 | + path=first_path, |
| 177 | + headers=self.config.headers, |
| 178 | + base_url=self.config.api_url, |
| 179 | + ) |
| 180 | + data = Comments.process_response(response) |
| 181 | + if not isinstance(data, dict): |
| 182 | + log.error(f"Unexpected Bitbucket comments response: {data}") |
| 183 | + break |
| 184 | + if data.get("type") == "error" or "error" in data: |
| 185 | + log.error(data) |
| 186 | + break |
| 187 | + |
| 188 | + for raw in data.get("values", []): |
| 189 | + normalized = self._normalize_comment(raw) |
| 190 | + if normalized is None: |
| 191 | + continue |
| 192 | + comment = Comment(**normalized) |
| 193 | + comment.body_list = comment.body.split("\n") |
| 194 | + comments[comment.id] = comment |
| 195 | + |
| 196 | + next_url = data.get("next") |
| 197 | + if not next_url: |
| 198 | + break |
| 199 | + |
| 200 | + return Comments.check_for_socket_comments(comments) |
| 201 | + |
| 202 | + @staticmethod |
| 203 | + def _normalize_comment(raw: dict) -> Optional[dict]: |
| 204 | + """Map a Bitbucket Cloud comment payload to the Comment shape.""" |
| 205 | + if not isinstance(raw, dict): |
| 206 | + return None |
| 207 | + if raw.get("deleted"): |
| 208 | + return None |
| 209 | + content = raw.get("content") or {} |
| 210 | + body = content.get("raw") or content.get("markup") or "" |
| 211 | + user = raw.get("user") or {} |
| 212 | + normalized_user = { |
| 213 | + "login": user.get("nickname") or user.get("display_name", ""), |
| 214 | + "username": user.get("nickname") or user.get("display_name", ""), |
| 215 | + "id": user.get("uuid", ""), |
| 216 | + "display_name": user.get("display_name", ""), |
| 217 | + } |
| 218 | + return { |
| 219 | + "id": raw.get("id"), |
| 220 | + "body": body, |
| 221 | + "user": normalized_user, |
| 222 | + "created_at": raw.get("created_on", ""), |
| 223 | + "updated_at": raw.get("updated_on", ""), |
| 224 | + "html_url": (raw.get("links") or {}).get("html", {}).get("href", ""), |
| 225 | + "url": (raw.get("links") or {}).get("self", {}).get("href", ""), |
| 226 | + "reactions": {}, |
| 227 | + } |
| 228 | + |
| 229 | + def add_socket_comments( |
| 230 | + self, |
| 231 | + security_comment: str, |
| 232 | + overview_comment: str, |
| 233 | + comments: dict, |
| 234 | + new_security_comment: bool = True, |
| 235 | + new_overview_comment: bool = True, |
| 236 | + ) -> None: |
| 237 | + if not self.config.pr_id: |
| 238 | + log.debug("No Bitbucket PR id, skipping comment posting") |
| 239 | + return |
| 240 | + |
| 241 | + if new_overview_comment: |
| 242 | + log.debug("New Dependency Overview comment") |
| 243 | + if overview := comments.get("overview"): |
| 244 | + log.debug("Previous version of Dependency Overview, updating") |
| 245 | + self.update_comment(overview_comment, str(overview.id)) |
| 246 | + else: |
| 247 | + log.debug("No previous version of Dependency Overview, posting") |
| 248 | + self.post_comment(overview_comment) |
| 249 | + |
| 250 | + if new_security_comment: |
| 251 | + log.debug("New Security Issue Comment") |
| 252 | + if security := comments.get("security"): |
| 253 | + log.debug("Previous version of Security Issue comment, updating") |
| 254 | + self.update_comment(security_comment, str(security.id)) |
| 255 | + else: |
| 256 | + log.debug("No previous version of Security Issue comment, posting") |
| 257 | + self.post_comment(security_comment) |
| 258 | + |
| 259 | + def handle_ignore_reactions(self, comments: dict) -> None: |
| 260 | + """Bitbucket Cloud comments have no native reactions API equivalent. |
| 261 | +
|
| 262 | + We mark ignore comments as processed by editing them to append a |
| 263 | + hidden Socket marker. Subsequent runs check for this marker via |
| 264 | + has_thumbsup_reaction(). |
| 265 | + """ |
| 266 | + for comment in comments.get("ignore", []): |
| 267 | + if "SocketSecurity ignore" in comment.body and not self.has_thumbsup_reaction(comment.id): |
| 268 | + self._mark_comment_processed(comment) |
| 269 | + |
| 270 | + PROCESSED_MARKER = "<!-- socket-ignore-processed -->" |
| 271 | + |
| 272 | + def has_thumbsup_reaction(self, comment_id) -> bool: |
| 273 | + """Bitbucket has no reactions; detect our hidden processed marker.""" |
| 274 | + if not self.config.pr_id: |
| 275 | + return False |
| 276 | + try: |
| 277 | + response = self.client.request( |
| 278 | + path=self._pr_comments_path(str(comment_id)), |
| 279 | + headers=self.config.headers, |
| 280 | + base_url=self.config.api_url, |
| 281 | + ) |
| 282 | + data = response.json() if hasattr(response, "json") else {} |
| 283 | + body = ((data or {}).get("content") or {}).get("raw", "") |
| 284 | + return self.PROCESSED_MARKER in body |
| 285 | + except Exception as error: |
| 286 | + log.debug(f"Could not fetch Bitbucket comment {comment_id} for marker check: {error}") |
| 287 | + return False |
| 288 | + |
| 289 | + def _mark_comment_processed(self, comment) -> None: |
| 290 | + if self.PROCESSED_MARKER in comment.body: |
| 291 | + return |
| 292 | + new_body = f"{comment.body}\n\n{self.PROCESSED_MARKER}" |
| 293 | + try: |
| 294 | + self.update_comment(new_body, str(comment.id)) |
| 295 | + except Exception as error: |
| 296 | + log.debug(f"Failed to mark Bitbucket ignore comment {comment.id} as processed: {error}") |
| 297 | + |
| 298 | + def remove_comment_alerts(self, comments: dict) -> None: |
| 299 | + if security_alert := comments.get("security"): |
| 300 | + new_body = Comments.process_security_comment(security_alert, comments) |
| 301 | + self.handle_ignore_reactions(comments) |
| 302 | + self.update_comment(new_body, str(security_alert.id)) |
0 commit comments