Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_transport_assertions():
url = "https://foo.bar/"
transport = AsyncMockTransport()
transport.get(url, status_code=404)
transport.post(url, content={"foo": "bar"})
with pytest.raises(AssertionError, match="not called"):
async with httpx.AsyncClient(transport=transport) as client:
response = await client.get(url)
assert response.status_code == 404
async def main():
start = time.time()
yappi.set_clock_type("wall")
yappi.start() # If you don't start yappi, stats.empty() will always be true
client = AsyncClient(app=app, )
async with client:
tasks = [
client.get("http://www.example.org/")
for _ in range(int(sys.argv[1]))
]
resps = await asyncio.gather(*tasks)
for resp in resps:
print(f"Request ID: {resp.json()[0]}")
print(f"Actual timing: {resp.json()[1]* 1000:>8.3f}")
print(f"Server Timing: {resp.headers.get('server-timing')}")
print("-----")
end = time.time()
print(f"TOTAL:{end-start:>8.3f}")
def __init__(self):
self.client = httpx.AsyncClient(verify=settings["VERIFY_HTTPS"])
self,
station: str = None,
lat: float = None,
lon: float = None,
timeout: int = 10,
) -> str:
"""
Asynchronously fetch a report string from the service
"""
if station:
valid_station(station)
elif lat is None or lon is None:
raise ValueError("No valid fetch parameters")
url, params = self._make_url(station, lat, lon)
try:
async with httpx.AsyncClient(timeout=timeout) as client:
if self.method.lower() == "post":
resp = await client.post(
url, params=params, data=self._post_data(station)
)
else:
resp = await client.get(url, params=params)
if resp.status_code != 200:
raise SourceError(
f"{self.__class__.__name__} server returned {resp.status_code}"
)
except (httpx.ConnectTimeout, httpx.ReadTimeout):
raise TimeoutError(f"Timeout from {self.__class__.__name__} server")
except gaierror:
raise ConnectionError(
f"Unable to connect to {self.__class__.__name__} server"
)
async def get_data(self, **kwargs) -> typing.Any:
"""
read the data from a given URL or path to a local file
:param kwargs:
:return: Feeds if Feeds well formed
"""
if 'url_to_parse' not in kwargs:
raise ValueError('you have to provide "url_to_parse" value')
url_to_parse = kwargs.get('url_to_parse', '')
if url_to_parse is False:
raise ValueError('you have to provide "url_to_parse" value')
bypass_bozo = kwargs.get('bypass_bozo', "False")
async with httpx.AsyncClient(timeout=30) as client:
data = await client.get(url_to_parse)
logger.debug(url_to_parse)
data = feedparser.parse(data.text, agent=self.USER_AGENT)
# if the feeds is not well formed, return no data at all
if bypass_bozo is False and data.bozo == 1:
data.entries = ''
log = f"{url_to_parse}: is not valid. You can tick the checkbox "
"'Bypass Feeds error ?' to force the process"
logger.info(log)
return data
log.debug(
(
f"Using {str(self.device.ssl.cert)} to validate connection "
f"to {self.device.name}"
)
)
else:
http_protocol = "http"
endpoint = "{protocol}://{address}:{port}/query/".format(
protocol=http_protocol, address=self.device.address, port=self.device.port
)
log.debug(f"URL endpoint: {endpoint}")
try:
async with httpx.AsyncClient(**client_params) as http_client:
responses = ()
for query in self.query:
encoded_query = await jwt_encode(
payload=query,
secret=self.device.credential.password.get_secret_value(),
duration=params.request_timeout,
)
log.debug(f"Encoded JWT: {encoded_query}")
raw_response = await http_client.post(
endpoint, json={"encoded": encoded_query}
)
log.debug(f"HTTP status code: {raw_response.status_code}")
raw = raw_response.text
async def _update_file(self, timeout: int) -> bool:
"""
Finds and saves the most recent file
"""
# Find the most recent file
async with httpx.AsyncClient(timeout=timeout) as client:
for url in self._urls:
try:
resp = await client.get(url)
if resp.status_code == 200:
break
except (httpx.ConnectTimeout, httpx.ReadTimeout):
return False
except gaierror:
return False
else:
return False
# Save successful file download
new_path = self._new_path()
with new_path.open("wb") as new_file:
new_file.write(resp.content)
return True
async def get_challenge(number: int) -> dict:
async with httpx.AsyncClient() as client:
challenge, *_ = await get_challenges(client, page_index=number - 1, page_size=1)
question = (
await client.post(
constants.Challenges.URL,
headers=dict(accessToken=constants.Challenges.TOKEN),
json={
"pageIndex": 0,
"pageSize": 1,
"where": [
{"field": "sys.slug", "equalTo": challenge["sys"]["slug"]},
{"field": "sys.versionStatus", "equalTo": "published"},
],
},
)
).json()["items"][0]["question"]