Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init(self):
    """Check the attributes exposed by ``self.in_play_service``.

    Asserts the connect/read timeout values, that ``_error`` is
    ``APIError`` and that ``client`` equals ``self.client``.
    """
    service = self.in_play_service
    assert service.connect_timeout == 3.05
    assert service.read_timeout == 16
    assert service._error == APIError
    assert service.client == self.client
def test_request_error(self, mock_post, mock_headers):
    """Check that ``self.historic.request`` raises ``APIError`` on failure.

    ``mock_post.side_effect`` is set first to a ``ConnectionError`` and
    then to a ``ValueError``; in both cases the call must raise
    ``APIError``.

    :param mock_post: Mock whose ``side_effect`` injects the failure
        (presumably the patched HTTP post call -- confirm against the
        decorators, which are outside this excerpt).
    :param mock_headers: Not referenced in the body.
    """
    params = {'test': 'me'}
    method = 'test'

    # Each injected failure must be reported as APIError by request().
    for failure in (ConnectionError(), ValueError()):
        mock_post.side_effect = failure
        with self.assertRaises(APIError):
            self.historic.request(params=params, method=method, session=None)
def test_request_error(self, mock_post, mock_headers):
    """Verify error wrapping performed by ``self.historic.request``.

    The mocked post callable is made to raise ``ConnectionError`` and
    then ``ValueError``; the request is expected to raise ``APIError``
    each time.

    :param mock_post: Mock receiving the ``side_effect`` to raise
        (presumably injected by a patch decorator not shown here).
    :param mock_headers: Unused inside this test body.
    """
    params = {'test': 'me'}
    method = 'test'

    # Run the same expectation for every simulated failure type.
    for simulated_error in (ConnectionError(), ValueError()):
        mock_post.side_effect = simulated_error
        with self.assertRaises(APIError):
            self.historic.request(params=params, method=method, session=None)
def request(
    self, method: str = None, params: dict = None, session: requests.Session = None
) -> (dict, float):
    """
    Send a GET request to ``self.url`` + ``method`` and decode the body.

    :param str method: Suffix appended to ``self.url`` to build the URL.
    :param dict params: Forwarded to ``session.get`` as ``params``.
    :param Session session: Session to use; ``self.client.session`` is
        used when this is falsy.
    :return: Tuple of the decoded response and the elapsed seconds.
    :raises APIError: If ``session.get`` raises an exception.
    :raises InvalidResponse: If ``json_loads`` raises ``ValueError`` for
        the response text.
    """
    if not session:
        session = self.client.session
    started_at = datetime.datetime.utcnow()
    endpoint = "%s%s" % (self.url, method)
    try:
        reply = session.get(endpoint, params=params, headers=self.headers)
    except requests.ConnectionError as exc:
        raise APIError(None, method, params, exc)
    except Exception as exc:
        raise APIError(None, method, params, exc)
    # Elapsed time is taken before the status check and decoding.
    finished_at = datetime.datetime.utcnow()
    elapsed_time = (finished_at - started_at).total_seconds()
    check_status_code(reply)
    try:
        decoded = json_loads(reply.text)
    except ValueError:
        raise InvalidResponse(reply.text)
    else:
        return decoded, elapsed_time
"""
:param str method: Betfair api-ng method to be used.
:param dict params: Params to be used in request
:param Session session: Requests session to be used, reduces latency.
"""
session = session or self.client.session
date_time_sent = datetime.datetime.utcnow()
try:
response = session.post(
'%s%s' % (self.url, method),
data=json.dumps(params),
headers=self.headers,
timeout=(self.connect_timeout, self.read_timeout)
)
except ConnectionError:
raise APIError(None, method, params, 'ConnectionError')
except Exception as e:
raise APIError(None, method, params, e)
elapsed_time = (datetime.datetime.utcnow()-date_time_sent).total_seconds()
check_status_code(response)
try:
response_data = response.json()
except ValueError:
raise InvalidResponse(response.text)
return response_data, elapsed_time
def request(self, payload=None, params=None, session=None):
    """
    POST to ``self.url`` using ``self.client.keep_alive_headers``.

    :param payload: Not used by this implementation.
    :param params: Not used by this implementation.
    :param session: Session to post with; ``self.client.session`` is
        used when this is falsy.
    :return: Tuple of the decoded JSON response and the elapsed seconds.
    :raises APIError: If ``session.post`` raises an exception.
    :raises InvalidResponse: If ``response.json()`` raises ``ValueError``.
    """
    if not session:
        session = self.client.session
    started_at = datetime.datetime.utcnow()
    try:
        reply = session.post(self.url, headers=self.client.keep_alive_headers)
    except ConnectionError:
        raise APIError(None, exception='ConnectionError')
    except Exception as exc:
        raise APIError(None, exception=exc)
    # Elapsed time is taken before the status check and decoding.
    finished_at = datetime.datetime.utcnow()
    elapsed_time = (finished_at - started_at).total_seconds()
    check_status_code(reply)
    try:
        decoded = reply.json()
    except ValueError:
        raise InvalidResponse(reply.text)
    # Hand the decoded body to the error handler when one is configured.
    if self._error_handler:
        self._error_handler(decoded)
    return decoded, elapsed_time
def request(self, method=None, params=None, session=None, url=None):
    """
    GET ``url`` with ``params`` and ``self.headers`` and decode the JSON.

    :param method: Only used when building the ``APIError``.
    :param params: Forwarded to ``session.get`` as ``params``.
    :param session: Session to use; ``self.client.session`` is used when
        this is falsy.
    :param url: Passed to ``session.get`` unchanged.
    :return: Tuple of the decoded JSON response and the elapsed seconds.
    :raises APIError: If ``session.get`` raises an exception.
    :raises InvalidResponse: If ``response.json()`` raises ``ValueError``.
    """
    if not session:
        session = self.client.session
    started_at = datetime.datetime.utcnow()
    try:
        reply = session.get(url, params=params, headers=self.headers)
    except ConnectionError:
        raise APIError(None, method, params, 'ConnectionError')
    except Exception as exc:
        raise APIError(None, method, params, exc)
    # Elapsed time is taken before the status check and decoding.
    finished_at = datetime.datetime.utcnow()
    elapsed_time = (finished_at - started_at).total_seconds()
    check_status_code(reply)
    try:
        decoded = reply.json()
    except ValueError:
        raise InvalidResponse(reply.text)
    else:
        return decoded, elapsed_time
def request(self, method=None, params=None, session=None):
    """
    GET ``self.url`` with the client's request headers and decode the JSON.

    :param method: Only used when building the ``APIError``.
    :param params: Only used when building the ``APIError``.
    :param session: Session to use; ``self.client.session`` is used when
        this is falsy.
    :return: The decoded JSON response (no elapsed time is returned).
    :raises APIError: If ``session.get`` raises an exception.
    :raises InvalidResponse: If ``response.json()`` raises ``ValueError``.
    """
    if not session:
        session = self.client.session
    # Passed as a (connect_timeout, read_timeout) pair.
    timeouts = (self.connect_timeout, self.read_timeout)
    try:
        reply = session.get(
            self.url,
            headers=self.client.request_headers,
            timeout=timeouts,
        )
    except ConnectionError:
        raise APIError(None, method, params, 'ConnectionError')
    except Exception as exc:
        raise APIError(None, method, params, exc)
    check_status_code(reply)
    try:
        decoded = reply.json()
    except ValueError:
        raise InvalidResponse(reply.text)
    else:
        return decoded
def request(self, method=None, params=None, session=None, url=None):
    """
    Fetch ``url`` via ``session.get`` and return the decoded JSON body.

    :param method: Used only as an argument to ``APIError``.
    :param params: Forwarded to ``session.get`` as ``params``.
    :param session: Session to use; defaults to ``self.client.session``
        when falsy.
    :param url: Target handed to ``session.get`` as-is.
    :return: ``(response_data, elapsed_time)`` where the elapsed time is
        in seconds.
    :raises APIError: If ``session.get`` raises an exception.
    :raises InvalidResponse: If ``response.json()`` raises ``ValueError``.
    """
    active_session = session if session else self.client.session
    sent_at = datetime.datetime.utcnow()
    try:
        result = active_session.get(url, params=params, headers=self.headers)
    except ConnectionError:
        raise APIError(None, method, params, 'ConnectionError')
    except Exception as err:
        raise APIError(None, method, params, err)
    # Timing stops here, before status validation and JSON decoding.
    received_at = datetime.datetime.utcnow()
    elapsed_time = (received_at - sent_at).total_seconds()
    check_status_code(result)
    try:
        body = result.json()
    except ValueError:
        raise InvalidResponse(result.text)
    return body, elapsed_time