Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_raise_over_query_limit(self):
query = self.params
responses.add(
responses.POST,
'https://httpbin.org/post',
json=query,
status=429,
content_type='application/json'
)
with self.assertRaises(routingpy.exceptions.OverQueryLimit):
client = RouterMock(base_url="https://httpbin.org", retry_over_query_limit=False)
client.directions(url='/post', post_params=query)
def _get_body(response):
status_code = response.status_code
try:
body = response.json()
except json.decoder.JSONDecodeError:
raise exceptions.JSONParseError("Can't decode JSON response:{}".format(response.text))
if status_code == 429:
raise exceptions.OverQueryLimit(status_code, body)
if 400 <= status_code < 500:
raise exceptions.RouterApiError(status_code, body)
if 500 <= status_code:
raise exceptions.RouterServerError(status_code, body)
if status_code != 200:
raise exceptions.RouterError(status_code, body)
return body
result = self._get_body(response)
return result
except exceptions.RouterApiError:
if self.skip_api_error:
warnings.warn(
"Router {} returned an API error with "
"the following message:\n{}".format(self.__class__.__name__, response.text)
)
return
raise
except exceptions.RetriableRequest as e:
if isinstance(e, exceptions.OverQueryLimit) and not self.retry_over_query_limit:
raise
warnings.warn(
'Rate limit exceeded.\nRetrying for the {}{} time.'.format(tried, get_ordinal(tried)),
UserWarning
)
# Retry request.
return self._request(
url, get_params, post_params, first_request_time, retry_counter + 1, requests_kwargs
)