-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Expand file tree
/
Copy patherrorhandler.py
More file actions
424 lines (340 loc) · 14.5 KB
/
errorhandler.py
File metadata and controls
424 lines (340 loc) · 14.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import argparse
import logging
import signal
from botocore.exceptions import (
ClientError,
NoCredentialsError,
NoRegionError,
)
from botocore.exceptions import (
ParamValidationError as BotocoreParamValidationError,
)
from awscli.argparser import USAGE, ArgParseException
from awscli.argprocess import ParamError, ParamSyntaxError
from awscli.arguments import UnknownArgumentError
from awscli.autoprompt.factory import PrompterKeyboardInterrupt
from awscli.constants import (
CLIENT_ERROR_RC,
CONFIGURATION_ERROR_RC,
GENERAL_ERROR_RC,
PARAM_VALIDATION_ERROR_RC,
)
from awscli.customizations.exceptions import (
ConfigurationError,
ParamValidationError,
)
from awscli.errorformat import write_error
from awscli.formatter import get_formatter
from awscli.utils import PagerInitializationException
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Error format names recognised when an error is displayed. Any other
# value is replaced by 'enhanced' after a warning is logged.
VALID_ERROR_FORMATS = ['legacy', 'json', 'yaml', 'text', 'table', 'enhanced']
# Size bound for rendering a list or dict inline. The comparison is a
# strict less-than, so only collections with fewer items than this value
# are shown inline; larger ones are reported as "<complex value>".
MAX_INLINE_ITEMS = 5
class EnhancedErrorFormatter:
    """Render the non-standard fields of an error as ``key: value`` lines.

    Fields named ``code`` or ``message`` (in any letter case) are treated
    as standard and are never printed by this formatter.
    """

    def format_error(self, error_info, stream):
        """Write the additional fields of ``error_info`` to ``stream``.

        Nothing is written when there are no additional fields. Scalars
        are printed as-is, small flat lists/dicts are printed inline, and
        anything else is shown as ``<complex value>`` followed by a hint
        about switching the error format.
        """
        extras = self._get_additional_fields(error_info)
        if not extras:
            return

        stream.write('\nAdditional error details:\n')
        saw_complex = False
        for name, field in extras.items():
            if self._is_simple_value(field):
                rendered = field
            elif self._is_small_collection(field):
                rendered = self._format_inline(field)
            else:
                rendered = '<complex value>'
                saw_complex = True
            stream.write(f'{name}: {rendered}\n')

        if not saw_complex:
            return
        stream.write(
            'Use "--cli-error-format json" or another error format '
            'to see the full details.\n'
        )

    def _is_simple_value(self, value):
        """Return True for ``None`` and for str/int/float/bool values."""
        return value is None or isinstance(value, (str, int, float, bool))

    def _is_small_collection(self, value):
        """Return True for a short list/dict holding only simple values.

        "Short" means strictly fewer than ``MAX_INLINE_ITEMS`` entries.
        For dicts only the values are inspected, not the keys.
        """
        if not isinstance(value, (list, dict)):
            return False
        if len(value) >= MAX_INLINE_ITEMS:
            return False
        members = value if isinstance(value, list) else value.values()
        return all(map(self._is_simple_value, members))

    def _format_inline(self, value):
        """Return a one-line rendering: ``[a, b]``, ``{k: v}`` or ``str``."""
        if isinstance(value, list):
            return '[' + ', '.join(map(str, value)) + ']'
        if isinstance(value, dict):
            pairs = [f'{k}: {v}' for k, v in value.items()]
            return '{' + ', '.join(pairs) + '}'
        return str(value)

    def _get_additional_fields(self, error_info):
        """Return a new dict of the fields other than code/message."""
        standard_keys = {'code', 'message'}
        extras = {}
        for name, field in error_info.items():
            if name.lower() in standard_keys:
                continue
            extras[name] = field
        return extras
def construct_entry_point_handlers_chain():
    """Return a ``ChainedExceptionHandler`` built without a session.

    The handlers are consulted in the order listed here; the catch-all
    ``GeneralExceptionHandler`` comes last.
    """
    handler_types = (
        ParamValidationErrorsHandler,
        PrompterInterruptExceptionHandler,
        InterruptExceptionHandler,
        GeneralExceptionHandler,
    )
    chain = [handler_type() for handler_type in handler_types]
    return ChainedExceptionHandler(exception_handlers=chain)
def construct_cli_error_handlers_chain(session=None):
    """Return a ``ChainedExceptionHandler`` for errors raised by commands.

    ``session`` is handed to every handler that takes part in structured
    error formatting. The handlers are consulted in the order listed
    here; the catch-all ``GeneralExceptionHandler`` comes last.
    """
    # Each entry is (handler class, whether it receives the session).
    # UnknownArgumentErrorHandler and InterruptExceptionHandler are
    # intentionally excluded from structured formatting, so they are
    # constructed without the session.
    plan = (
        (ParamValidationErrorsHandler, True),
        (UnknownArgumentErrorHandler, False),
        (ConfigurationErrorHandler, True),
        (NoRegionErrorHandler, True),
        (NoCredentialsErrorHandler, True),
        (PagerErrorHandler, True),
        (InterruptExceptionHandler, False),
        (ClientErrorHandler, True),
        (GeneralExceptionHandler, True),
    )
    chain = [
        handler_type(session) if takes_session else handler_type()
        for handler_type, takes_session in plan
    ]
    return ChainedExceptionHandler(exception_handlers=chain)
class BaseExceptionHandler:
    """Interface shared by all exception handlers in this module."""

    def handle_exception(self, exception, stdout, stderr, **kwargs):
        """Handle ``exception``; concrete handlers must override this.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        method_name = 'handle_exception'
        raise NotImplementedError(method_name)
class FilteredExceptionHandler(BaseExceptionHandler):
    """Base handler that reacts only to the exception types it declares.

    Subclasses set ``EXCEPTIONS_TO_HANDLE`` (a type or tuple of types, as
    accepted by ``isinstance``) and ``RC`` (the value returned once an
    exception has been reported). They may override
    ``_extract_error_info`` to opt in to structured error output.
    """

    EXCEPTIONS_TO_HANDLE = ()
    RC = None

    # Lower-cased keys that always survive field filtering, whatever the
    # modeled-field set contains.
    _STANDARD_ERROR_KEYS = frozenset(('code', 'message'))

    def __init__(self, session=None):
        """Keep the optional session used to look up ``cli_error_format``."""
        self._session = session

    def handle_exception(self, exception, stdout, stderr, **kwargs):
        """Handle ``exception`` if it matches ``EXCEPTIONS_TO_HANDLE``.

        Returns:
            The value produced by ``_do_handle_exception``, or ``None``
            when this handler is not responsible for the exception.
        """
        if not isinstance(exception, self.EXCEPTIONS_TO_HANDLE):
            return None
        return self._do_handle_exception(exception, stdout, stderr, **kwargs)

    def _do_handle_exception(self, exception, stdout, stderr, **kwargs):
        """Report ``exception`` on ``stderr`` and return ``RC``.

        Structured output is attempted first when ``_extract_error_info``
        yields a non-empty dict. If it is unavailable, disabled
        ("legacy") or fails, the plain message is written instead.
        ``parsed_globals`` is read from ``kwargs`` when present.
        """
        parsed_globals = kwargs.get('parsed_globals')
        error_info = self._extract_error_info(exception)

        if error_info:
            formatted_message = self._get_formatted_message(
                error_info, exception
            )
            displayed_structured = self._display_structured_error(
                error_info, formatted_message, stderr, parsed_globals
            )
            if displayed_structured:
                return self.RC

        # Plain-text fallback: prefer the extracted message, otherwise the
        # exception's own string form.
        message = (error_info or {}).get('Message', str(exception))
        write_error(stderr, message)
        return self.RC

    def _extract_error_info(self, exception):
        """Extract error information for structured formatting.

        Returns None by default. Subclasses should override to provide
        error information as a dict with 'Code' and 'Message' keys.
        """
        return None

    def _get_formatted_message(self, error_info, exception):
        """Return the headline ``An error occurred (<code>): <message>``.

        A missing 'Code' falls back to ``'Unknown'`` and a missing
        'Message' to ``str(exception)``.
        """
        code = error_info.get('Code', 'Unknown')
        message = error_info.get('Message', str(exception))
        return f"An error occurred ({code}): {message}"

    def _resolve_error_format(self, parsed_globals):
        """Return the lower-cased error format name to use.

        Precedence: ``parsed_globals.cli_error_format``, then the
        session's ``cli_error_format`` config variable, then
        ``'enhanced'``. Empty or missing values are skipped.
        """
        if parsed_globals:
            error_format = getattr(parsed_globals, 'cli_error_format', None)
            if error_format:
                return error_format.lower()

        if self._session:
            try:
                error_format = self._session.get_config_variable(
                    'cli_error_format'
                )
                if error_format:
                    return error_format.lower()
            except (KeyError, AttributeError) as e:
                # A failed config lookup is not fatal; use the default.
                LOG.debug('Failed to get cli_error_format from config: %s', e)

        return 'enhanced'

    def _filter_error_fields(self, error_info):
        """Strip, in place, the fields that must not be displayed.

        The private ``_modeled_fields`` entry is removed and used as the
        allow-list (compared case-insensitively). 'Code' and 'Message'
        are always kept, so a modeled set that lists only extra members
        cannot remove them. When no model information is available
        (e.g. a manually constructed ClientError) only those two
        standard fields remain.
        """
        modeled_fields = error_info.pop('_modeled_fields', None)
        allowed = set(self._STANDARD_ERROR_KEYS)
        if modeled_fields is not None:
            allowed.update(field.lower() for field in modeled_fields)
        for key in list(error_info):
            if key.lower() not in allowed:
                del error_info[key]

    def _display_structured_error(
        self, error_info, formatted_message, stderr, parsed_globals=None
    ):
        """Try to write the error to ``stderr`` in the configured format.

        ``error_info`` is filtered in place before anything is written.

        Returns:
            True when the error was written here. False when the format
            is 'legacy' or when any step raised, in which case the
            caller is expected to print a plain message instead.
        """
        try:
            error_format = self._resolve_error_format(parsed_globals)
            self._filter_error_fields(error_info)

            if error_format == 'legacy':
                return False

            if error_format not in VALID_ERROR_FORMATS:
                LOG.warning(
                    f"Invalid cli_error_format: '{error_format}'. "
                    f"Using 'enhanced' format."
                )
                error_format = 'enhanced'

            if error_format == 'enhanced':
                write_error(stderr, formatted_message)
                EnhancedErrorFormatter().format_error(error_info, stderr)
                return True

            # Remaining formats go through the regular output formatters,
            # with no query applied and the caller's color setting
            # ('auto' when parsed_globals has none).
            color = getattr(parsed_globals, 'color', 'auto')
            formatter_args = argparse.Namespace(query=None, color=color)
            formatter = get_formatter(error_format, formatter_args)
            formatter('error', error_info, stderr)
            return True

        except Exception as e:
            # Deliberately broad: a failure while formatting must not hide
            # the original error, so signal the caller to fall back.
            LOG.debug(
                'Failed to display structured error: %s', e, exc_info=True
            )
            return False
class ParamValidationErrorsHandler(FilteredExceptionHandler):
    """Reports parameter parsing/validation errors with the
    ``PARAM_VALIDATION_ERROR_RC`` return code.
    """

    RC = PARAM_VALIDATION_ERROR_RC
    EXCEPTIONS_TO_HANDLE = (
        ParamError,
        ParamSyntaxError,
        ArgParseException,
        ParamValidationError,
        BotocoreParamValidationError,
    )

    def _extract_error_info(self, exception):
        """Describe the error under the 'ParamValidation' code."""
        return dict(Code='ParamValidation', Message=str(exception))
class SilenceParamValidationMsgErrorHandler(ParamValidationErrorsHandler):
    """Variant of ``ParamValidationErrorsHandler`` that prints nothing."""

    def _do_handle_exception(self, exception, stdout, stderr, **kwargs):
        """Return ``RC`` without writing to either stream."""
        exit_code = self.RC
        return exit_code
class ClientErrorHandler(FilteredExceptionHandler):
    """Reports botocore ``ClientError`` exceptions with ``CLIENT_ERROR_RC``."""

    EXCEPTIONS_TO_HANDLE = ClientError
    RC = CLIENT_ERROR_RC

    def _do_handle_exception(self, exception, stdout, stderr, **kwargs):
        """Report the client error on ``stderr`` and return ``RC``.

        Structured output is tried first; if it is not displayed, the
        exception's own string form is written instead.
        """
        details = self._extract_error_info(exception)
        if details:
            headline = self._get_formatted_message(details, exception)
            if self._display_structured_error(
                details, headline, stderr, kwargs.get('parsed_globals')
            ):
                return self.RC

        write_error(stderr, str(exception))
        return self.RC

    def _get_formatted_message(self, error_info, exception):
        """Use the exception's own text as the headline.

        ``error_info`` is ignored here.
        """
        return str(exception)

    def _extract_error_info(self, exception):
        """Return the error fields plus a ``_modeled_fields`` entry.

        ``_modeled_fields`` is the exception's ``modeled_fields``
        attribute, or ``None`` when the exception has no such attribute.
        Returns ``None`` when no error response can be extracted.
        """
        wrapped = self._extract_error_response(exception)
        if not wrapped or 'Error' not in wrapped:
            return None
        details = wrapped['Error']
        details['_modeled_fields'] = getattr(
            exception, 'modeled_fields', None
        )
        return details

    @staticmethod
    def _extract_error_response(exception):
        """Return ``{'Error': {...}}`` built from ``exception.response``.

        The inner dict is a copy of ``response['Error']`` extended with
        the top-level response entries, so the exception's own response
        is not modified. Returns ``None`` for anything that is not a
        ``ClientError`` or whose response has no 'Error' entry.
        """
        if not isinstance(exception, ClientError):
            return None
        if not hasattr(exception, 'response'):
            return None
        response = exception.response
        if 'Error' not in response:
            return None

        merged = dict(response['Error'])
        # AWS services return modeled error fields at the top level of
        # the error response rather than nested under the Error key, and
        # botocore preserves that layout. Fold them in for complete error
        # information, skipping response metadata and any key that the
        # Error dict already defines.
        skipped = {'Error', 'ResponseMetadata', 'Code', 'Message'}
        merged.update(
            {
                name: field
                for name, field in response.items()
                if name not in skipped and name not in merged
            }
        )
        return {'Error': merged}
class ConfigurationErrorHandler(FilteredExceptionHandler):
    """Reports ``ConfigurationError`` with ``CONFIGURATION_ERROR_RC``."""

    RC = CONFIGURATION_ERROR_RC
    EXCEPTIONS_TO_HANDLE = ConfigurationError

    def _extract_error_info(self, exception):
        """Describe the error under the 'Configuration' code."""
        return dict(Code='Configuration', Message=str(exception))
class NoRegionErrorHandler(FilteredExceptionHandler):
    """Reports ``NoRegionError`` with ``CONFIGURATION_ERROR_RC``."""

    RC = CONFIGURATION_ERROR_RC
    EXCEPTIONS_TO_HANDLE = NoRegionError

    def _extract_error_info(self, exception):
        """Describe the error under 'NoRegion', appending a config hint."""
        hint = 'You can also configure your region by running "aws configure".'
        return {'Code': 'NoRegion', 'Message': f'{exception} {hint}'}
class NoCredentialsErrorHandler(FilteredExceptionHandler):
    """Reports ``NoCredentialsError`` with ``CONFIGURATION_ERROR_RC``."""

    RC = CONFIGURATION_ERROR_RC
    EXCEPTIONS_TO_HANDLE = NoCredentialsError

    def _extract_error_info(self, exception):
        """Describe the error under 'NoCredentials', appending a hint."""
        hint = 'You can configure credentials by running "aws login".'
        return {'Code': 'NoCredentials', 'Message': f'{exception}. {hint}'}
class PagerErrorHandler(FilteredExceptionHandler):
    """Reports ``PagerInitializationException`` with
    ``CONFIGURATION_ERROR_RC``.
    """

    RC = CONFIGURATION_ERROR_RC
    EXCEPTIONS_TO_HANDLE = PagerInitializationException

    def _extract_error_info(self, exception):
        """Describe the error under 'Pager', embedding the pager's error."""
        pieces = [
            'Unable to redirect output to pager. Received the ',
            f'following error when opening pager:\n{exception}\n\n',
            'Learn more about configuring the output pager by running ',
            '"aws help config-vars".',
        ]
        return {'Code': 'Pager', 'Message': ''.join(pieces)}
class UnknownArgumentErrorHandler(FilteredExceptionHandler):
    """Reports ``UnknownArgumentError`` with a usage line, as plain text."""

    RC = PARAM_VALIDATION_ERROR_RC
    EXCEPTIONS_TO_HANDLE = UnknownArgumentError

    def _do_handle_exception(self, exception, stdout, stderr, **kwargs):
        """Write a blank line, the usage line and the error to ``stderr``.

        Returns ``RC``. Structured formatting is never attempted.
        """
        for chunk in ("\n", f'usage: {USAGE}\n'):
            stderr.write(chunk)
        write_error(stderr, str(exception))
        return self.RC
class InterruptExceptionHandler(FilteredExceptionHandler):
    """Handles ``KeyboardInterrupt`` by emitting a newline on stdout."""

    # 128 plus the SIGINT signal number.
    RC = 128 + signal.SIGINT
    EXCEPTIONS_TO_HANDLE = KeyboardInterrupt

    def _do_handle_exception(self, exception, stdout, stderr, **kwargs):
        """Write a single newline to ``stdout`` and return ``RC``."""
        newline = "\n"
        stdout.write(newline)
        return self.RC
class PrompterInterruptExceptionHandler(InterruptExceptionHandler):
    """Handles ``PrompterKeyboardInterrupt`` by echoing it on stderr."""

    EXCEPTIONS_TO_HANDLE = PrompterKeyboardInterrupt

    def _do_handle_exception(self, exception, stdout, stderr, **kwargs):
        """Write the exception text, then a newline, to ``stderr``.

        Returns the ``RC`` inherited from ``InterruptExceptionHandler``.
        """
        for chunk in (f'{exception}', "\n"):
            stderr.write(chunk)
        return self.RC
class GeneralExceptionHandler(FilteredExceptionHandler):
    """Catch-all for any ``Exception``, reported with ``GENERAL_ERROR_RC``."""

    RC = GENERAL_ERROR_RC
    EXCEPTIONS_TO_HANDLE = Exception

    def _do_handle_exception(self, exception, stdout, stderr, **kwargs):
        """Write ``str(exception)`` to ``stderr`` and return ``RC``."""
        # Generic exceptions carry no meaningful structure, so structured
        # formatting is skipped and plain text is always used.
        text = str(exception)
        write_error(stderr, text)
        return self.RC
class ChainedExceptionHandler(BaseExceptionHandler):
    """Tries a sequence of handlers in order until one handles the error."""

    def __init__(self, exception_handlers):
        """Keep a reference to ``exception_handlers`` (not a copy)."""
        self._exception_handlers = exception_handlers

    def inject_handler(self, position, handler):
        """Insert ``handler`` at index ``position`` of the handler list."""
        self._exception_handlers.insert(position, handler)

    def handle_exception(self, exception, stdout, stderr, **kwargs):
        """Return the first non-``None`` result produced by a handler.

        Handlers are called lazily in list order, and none after the
        first one that returns a value. ``None`` is returned when every
        handler declines.
        """
        outcomes = (
            candidate.handle_exception(exception, stdout, stderr, **kwargs)
            for candidate in self._exception_handlers
        )
        return next(
            (outcome for outcome in outcomes if outcome is not None), None
        )