# Resilient requests Session and error-handling helpers for the Jira client.
import abc
import json
import logging
import random
import time
from typing import Any, Dict, Optional, Union
from requests import Response, Session
from requests.exceptions import ConnectionError
from requests.structures import CaseInsensitiveDict
from typing_extensions import TypeGuard
from jira.exceptions import JIRAError
# Module-level logger, named after this module via ``__name__``.
LOG = logging.getLogger(__name__)
class PrepareRequestForRetry(metaclass=abc.ABCMeta):
    """Hook for adjusting a Request's keyword arguments ahead of a retry.

    Concrete subclasses implement :py:meth:`.prepare`, which receives the keyword
    arguments of the Request and returns the ones to use for the retry.
    """

    @abc.abstractmethod
    def prepare(
        self, original_request_kwargs: CaseInsensitiveDict
    ) -> CaseInsensitiveDict:
        """Return the keyword arguments to use when the Request is retried.

        This base implementation hands the arguments back untouched, so a subclass
        may delegate to it through ``super().prepare(...)``.

        Args:
            original_request_kwargs (CaseInsensitiveDict): The keyword arguments of the Request.

        Returns:
            CaseInsensitiveDict: The new keyword arguments to use in the retried Request.
        """
        retry_kwargs = original_request_kwargs
        return retry_kwargs
class PassthroughRetryPrepare(PrepareRequestForRetry):
    """Retry preparer for Requests that need no changes: the keyword arguments are returned as-is."""

    def prepare(
        self, original_request_kwargs: CaseInsensitiveDict
    ) -> CaseInsensitiveDict:
        """Delegate to the base class implementation and return its result.

        Args:
            original_request_kwargs (CaseInsensitiveDict): The keyword arguments of the Request.

        Returns:
            CaseInsensitiveDict: Whatever the base class ``prepare`` returns for these arguments.
        """
        unchanged_kwargs = super().prepare(original_request_kwargs)
        return unchanged_kwargs
def raise_on_error(resp: Optional[Response], **kwargs) -> TypeGuard[Response]:
    """Handle errors from a Jira Request.

    Args:
        resp (Optional[Response]): Response from Jira request.
        **kwargs: Extra context forwarded to :py:class:`JIRAError`. A ``request``
            entry is forwarded as the error's ``request``. For a non-ok Response,
            ``response``, ``status_code`` and ``url`` entries are dropped because
            those values are always taken from ``resp``.

    Raises:
        JIRAError: If Response is None.
        JIRAError: If the Response is not ok (``resp.ok`` is falsy).

    Returns:
        TypeGuard[Response]: True if the passed in Response is all good.
    """
    if resp is None:
        # ``response`` is passed explicitly below; leaving a duplicate in kwargs
        # would turn the intended JIRAError into a TypeError.
        extra = {key: value for key, value in kwargs.items() if key != "response"}
        raise JIRAError("Empty Response!", response=resp, **extra)

    if not resp.ok:
        error = parse_error_msg(resp=resp)
        # Every keyword passed explicitly must be removed from the forwarded kwargs,
        # otherwise the call fails with "got multiple values for keyword argument".
        explicit_keywords = ("request", "response", "status_code", "url")
        extra = {
            key: value for key, value in kwargs.items() if key not in explicit_keywords
        }
        raise JIRAError(
            error,
            status_code=resp.status_code,
            url=resp.url,
            request=kwargs.get("request", None),
            response=resp,
            **extra,
        )

    return True  # if no exception was raised, we have a valid Response
def parse_error_msg(resp: Response) -> str:
    """Parse a Jira Error message from the Response.

    https://developer.atlassian.com/cloud/jira/platform/rest/v2/intro/#status-codes

    Lookup order:

    1. The ``x-authentication-denied-reason`` header of a 403 response.
    2. The raw body text when the body is not JSON, or is JSON but not an object.
    3. The ``message`` entry of the JSON body.
    4. The non-empty ``errorMessages`` entry (a list/tuple is joined with newlines).
    5. The values of a non-empty ``errors`` dict, joined with ``", "``.

    Args:
        resp (Response): The Jira API request's response.

    Returns:
        str: The error message parsed from the Response. An empty string if no error.
    """
    if resp.status_code == 403 and "x-authentication-denied-reason" in resp.headers:
        return resp.headers["x-authentication-denied-reason"]

    if not resp.text:
        return ""

    try:
        resp_data = resp.json()
    except ValueError:
        # Body is not JSON: the raw text is the best message available.
        return resp.text

    if not isinstance(resp_data, dict):
        # Valid JSON that is not an object (string, number, list...) has no known
        # error keys to look up; indexing it by key would raise or mis-match.
        return resp.text

    if "message" in resp_data:
        # Jira 5.1 errors
        return resp_data["message"]

    # Jira 5.0.x error messages sometimes come wrapped in this array.
    # Sometimes this is present but empty (or null), in which case the details
    # live in "errors", so we deliberately fall through instead of stopping here.
    error_messages = resp_data.get("errorMessages")
    if error_messages:
        if isinstance(error_messages, (list, tuple)):
            return "\n".join(str(message) for message in error_messages)
        return error_messages

    # Catching only 'errors' that are dict. See https://github.com/pycontribs/jira/issues/350
    # Jira 6.x error messages are found in this array.
    resp_errors = resp_data.get("errors")
    if isinstance(resp_errors, dict) and resp_errors:
        # str() guards against non-string values, which str.join would reject.
        return ", ".join(str(value) for value in resp_errors.values())

    return ""
class ResilientSession(Session):
    """This class is supposed to retry requests that do return temporary errors.

    :py:meth:`__recoverable` handles all retry-able errors.
    """

    def __init__(self, timeout=None, max_retries: int = 3, max_retry_delay: int = 60):
        """A Session subclass catered for the Jira API with exponential delaying retry.

        Args:
            timeout (Optional[int]): Timeout. Defaults to None.
            max_retries (int): Max number of times to retry a request. Defaults to 3.
            max_retry_delay (int): Max delay allowed between retries. Defaults to 60.
        """
        self.timeout = timeout  # TODO: Unused?
        self.max_retries = max_retries
        self.max_retry_delay = max_retry_delay
        super().__init__()

        # Indicate our preference for JSON to avoid https://bitbucket.org/bspeakmon/jira-python/issue/46 and https://jira.atlassian.com/browse/JRA-38551
        self.headers.update({"Accept": "application/json,*.*;q=0.9"})

        # Warn users on instantiation the debug level shouldn't be used for prod
        LOG.debug(
            "WARNING: On error, will dump Response headers and body to logs. "
            + f"Log level debug in '{__name__}' is not safe for production code!"
        )

    def _jira_prepare(self, **original_kwargs) -> dict:
        """Do any pre-processing of our own and return the updated kwargs.

        The session headers are merged with the per-request ``headers`` (the
        per-request values win), and a ``dict`` passed as ``data`` is serialised
        to a JSON string. The passed-in kwargs are not modified.
        """
        prepared_kwargs = original_kwargs.copy()

        request_headers = self.headers.copy()
        # ``headers=None`` is accepted by requests itself; treat it as "no extra
        # headers" rather than crashing inside ``update``.
        request_headers.update(original_kwargs.get("headers") or {})
        prepared_kwargs["headers"] = request_headers

        data = original_kwargs.get("data", None)
        if isinstance(data, dict):
            # mypy ensures we don't do this,
            # but for people subclassing we should preserve old behaviour
            prepared_kwargs["data"] = json.dumps(data)

        return prepared_kwargs

    def request(  # type: ignore[override] # An intentionally different override
        self,
        method: str,
        url: Union[str, bytes],
        _prepare_retry_class: PrepareRequestForRetry = PassthroughRetryPrepare(),
        **kwargs,
    ) -> Response:
        """This is an intentional override of `Session.request()` to inject some error handling and retry logic.

        Args:
            method (str): The HTTP method.
            url (Union[str, bytes]): The URL.
            _prepare_retry_class (PrepareRequestForRetry): Its ``prepare`` is called with
                the request kwargs before each retry; a non-None return value replaces
                the kwargs used for the retried request.
            **kwargs: Passed to ``Session.request()`` after :py:meth:`_jira_prepare`.

        Raises:
            Exception: Various exceptions as defined in py:method:`raise_on_error`.

        Returns:
            Response: The response.
        """
        retry_number = 0
        exception: Optional[Exception] = None
        response: Optional[Response] = None
        response_or_exception: Optional[Union[ConnectionError, Response]]

        processed_kwargs = self._jira_prepare(**kwargs)

        def is_allowed_to_retry() -> bool:
            """Helper method to say if we should still be retrying."""
            return retry_number <= self.max_retries

        while is_allowed_to_retry():
            response = None
            exception = None

            try:
                response = super().request(method, url, **processed_kwargs)
                if response.ok:
                    self.__handle_known_ok_response_errors(response)
                    return response
            # Can catch further exceptions as required below
            except ConnectionError as e:
                exception = e

            # Decide if we should keep retrying
            response_or_exception = response if response is not None else exception
            retry_number += 1
            if is_allowed_to_retry() and self.__recoverable(
                response_or_exception, url, method.upper(), retry_number
            ):
                # ``prepare`` is documented to return the kwargs for the retry, so
                # honour its return value. Implementations that only mutate the
                # kwargs in place and return None keep working as before.
                retry_kwargs = _prepare_retry_class.prepare(processed_kwargs)  # type: ignore[arg-type] # Dict and CaseInsensitiveDict are fine here
                if retry_kwargs is not None:
                    processed_kwargs = retry_kwargs  # type: ignore[assignment]
            else:
                retry_number = self.max_retries + 1  # exit the while loop, as above max

        if exception is not None:
            # We got an exception we could not recover from
            raise exception
        elif raise_on_error(response, **processed_kwargs):
            # raise_on_error will raise an exception if the response is invalid
            return response
        else:
            # Shouldn't reach here...(but added for mypy's benefit)
            raise RuntimeError("Expected a Response or Exception to raise!")

    def __handle_known_ok_response_errors(self, response: Response):
        """Responses that report ok may also have errors.

        We can either log the error or raise the error as appropriate here.

        Args:
            response (Response): The response.
        """
        if not response.ok:
            return  # We use self.__recoverable() to handle these

        if (
            not response.content
            and "X-Seraph-LoginReason" in response.headers
            and "AUTHENTICATED_FAILED" in response.headers["X-Seraph-LoginReason"]
        ):
            LOG.warning("Atlassian's bug https://jira.atlassian.com/browse/JRA-41559")

    def __recoverable(
        self,
        response: Optional[Union[ConnectionError, Response]],
        url: Union[str, bytes],
        request_method: str,
        counter: int = 1,
    ):
        """Return whether the request is recoverable and hence should be retried.

        Exponentially delays if recoverable.

        At this moment it supports: ``ConnectionError`` and HTTP status 429.

        Args:
            response (Optional[Union[ConnectionError, Response]]): The response or exception.
              Note: the response here is expected to be ``not response.ok``.
            url (Union[str, bytes]): The URL.
            request_method (str): The request method.
            counter (int, optional): The retry counter to use when calculating the exponential delay. Defaults to 1.

        Returns:
            bool: True if the request should be retried.
        """
        is_recoverable = False  # Controls return value AND whether we delay or not, Not-recoverable by default
        msg = str(response)

        if isinstance(response, ConnectionError):
            is_recoverable = True
            LOG.warning(
                f"Got ConnectionError [{response}] errno:{response.errno} on {request_method} "
                + f"{url}\n"  # type: ignore[str-bytes-safe]
            )
            # Check the *effective* level: a logger's own ``level`` is normally
            # NOTSET, with the real threshold inherited from an ancestor logger.
            if not LOG.isEnabledFor(logging.DEBUG):
                LOG.warning(
                    "Response headers for ConnectionError are only printed for log level DEBUG."
                )

        if isinstance(response, Response):
            if response.status_code in [429]:
                is_recoverable = True
                # The rate-limit headers are informational only; use ``get`` so a
                # 429 that lacks them is still retried instead of raising KeyError.
                rate_headers = response.headers
                number_of_tokens_issued_per_interval = rate_headers.get(
                    "X-RateLimit-FillRate", "unknown"
                )
                token_issuing_rate_interval_seconds = rate_headers.get(
                    "X-RateLimit-Interval-Seconds", "unknown"
                )
                maximum_number_of_tokens = rate_headers.get(
                    "X-RateLimit-Limit", "unknown"
                )
                retry_after = rate_headers.get("retry-after", "unknown")
                msg = f"{response.status_code} {response.reason}"
                LOG.warning(
                    f"Request rate limited by Jira: request should be retried after {retry_after} seconds.\n"
                    + f"{number_of_tokens_issued_per_interval} tokens are issued every {token_issuing_rate_interval_seconds} seconds. "
                    + f"You can accumulate up to {maximum_number_of_tokens} tokens.\n"
                    + "Consider adding an exemption for the user as explained in: "
                    + "https://confluence.atlassian.com/adminjiraserver/improving-instance-stability-with-rate-limiting-983794911.html"
                )

        if is_recoverable:
            # Exponential backoff with full jitter.
            delay = min(self.max_retry_delay, 10 * 2**counter) * random.random()
            LOG.warning(
                "Got recoverable error from %s %s, will retry [%s/%s] in %ss. Err: %s",
                request_method,
                url,
                counter,
                self.max_retries,
                delay,
                msg,
            )
            if isinstance(response, Response):
                LOG.debug(
                    "response.headers:\n%s",
                    json.dumps(dict(response.headers), indent=4),
                )
                LOG.debug("response.body:\n%s", response.content)
            time.sleep(delay)

        return is_recoverable
|