Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_http_methods(client):
    """Each HTTP verb should return its mocked status code, for both the
    module-level httpx API (sync) and the async client."""
    async with respx.mock:
        url = "https://foo.bar/"
        mocked = {
            "get": 404,
            "post": 201,
            "put": 202,
            "patch": 500,
            "delete": 204,
            "head": 405,
            "options": 501,
        }
        # Register the GET route first (kept in `m`, as in the original),
        # then every remaining verb.
        m = respx.get(url, status_code=mocked["get"])
        for verb in ("post", "put", "patch", "delete", "head", "options"):
            getattr(respx, verb)(url, status_code=mocked[verb])
        # Exercise sync then async for get/post/put, preserving call order.
        for verb in ("get", "post", "put"):
            assert getattr(httpx, verb)(url).status_code == mocked[verb]
            async_response = await getattr(client, verb)(url)
            assert async_response.status_code == mocked[verb]
        assert httpx.patch(url).status_code == mocked["patch"]
async def test_callable_content(client):
    """A callable ``content`` should be invoked with the request plus the
    URL pattern's named groups as keyword arguments."""
    async with MockTransport() as respx_mock:
        # Fix: a named group needs a name — the original (?P\w+) is
        # invalid regex syntax and raises re.error at compile time.
        # The group must be called "slug" so it matches the lambda's kwarg.
        url_pattern = re.compile(r"https://foo.bar/(?P<slug>\w+)/")
        content = lambda request, slug: f"hello {slug}"
        request = respx_mock.get(url_pattern, content=content)

        async_response = await client.get("https://foo.bar/world/")
        assert request.called is True
        assert async_response.status_code == 200
        assert async_response.text == "hello world"

        respx_mock.reset()
        sync_response = httpx.get("https://foo.bar/world/")
        assert request.called is True
        assert sync_response.status_code == 200
        assert sync_response.text == "hello world"
def get_latest_tag(self, app):
    """Return the latest deployment tag for *app* from the publish API.

    Args:
        app: mapping with at least "owner" and "title" keys identifying
            the application.

    Returns:
        The "latest_tag" value from the deployments endpoint's JSON body.

    Raises:
        AssertionError: if the API does not answer with HTTP 200.
    """
    resp = httpx.get(
        f"{self.config.cs_url}/publish/api/{app['owner']}/{app['title']}/deployments/",
        headers={"Authorization": f"Token {self.cs_api_token}"},
    )
    # Raise explicitly rather than via a bare `assert`, which is silently
    # stripped when Python runs with -O; AssertionError is kept so any
    # existing callers that catch it still work.
    if resp.status_code != 200:
        raise AssertionError(f"Got: {resp.url} {resp.status_code} {resp.text}")
    return resp.json()["latest_tag"]
def _get_status(self) -> _Status:
    """Fetch ``status.json`` from the service base URL and build a status object.

    NOTE(review): the return annotation says ``_Status`` but the body
    constructs ``Status`` — confirm which name is correct; one of the two
    is likely a typo for the other.
    """
    url = self._get_base_url() + "status.json"
    # No timeout is passed, so httpx's default applies — presumably
    # acceptable for this caller; confirm if this runs on a hot path.
    response_json = httpx.get(url).json()
    status_dict = response_json["status"]
    return Status(**status_dict)
def get_content_from_url(self, url):
    """
    Fetch url and try to get the title and description from the response

    Returns a ``(title, description)`` tuple; both are None when the fetch
    fails or nothing could be parsed.
    """
    title = None
    description = None
    # NOTE(review): connect_timeout=/read_timeout= are keyword names from
    # old httpx releases; current httpx spells them connect=/read= —
    # confirm against the pinned httpx version.
    timeout = httpx.Timeout(10.0, connect_timeout=2.0, read_timeout=5.0)
    try:
        r = httpx.get(url, timeout=timeout)
    except Exception as e:
        # Best-effort fetch: log network/DNS/timeout failures and return
        # (None, None) instead of propagating.
        self.logger.error(f"Failed fetching url {url}. Error: {e}")
        return (title, description)
    if r.status_code != 200:
        self.logger.info(f"Failed fetching url {url}. Status code: {r.status_code}")
        return (title, description)
    # try parse and get the title
    try:
        soup = BeautifulSoup(r.text, "html.parser")
        # Prefer og:title first (for example Youtube uses this)
        ogtitle = soup.find("meta", property="og:title")
        if ogtitle:
            title = ogtitle["content"]
        elif soup.head and soup.head.title:
            # NOTE(review): the body of this branch — and apparently the
            # rest of the function (description extraction, the except
            # for this try, the final return) — is truncated in this
            # excerpt; recover it from the full file before editing.
            pass
def _get_status(self) -> _Status:
    """Query the service base URL and return its overall status."""
    payload = httpx.get(self._get_base_url()).json()
    return _Status(**payload["result"]["status_overall"])
def pr_status():
    """Show status of PR found in requirement files."""
    for pr in search_prs():
        api_url = f"https://api.github.com/repos/{pr.org}/{pr.repo}/pulls/{pr.pr}"
        response = httpx.get(api_url)
        # Fail loudly on 4xx/5xx from the GitHub API.
        response.raise_for_status()
        state = display_state(response.json())
        click.echo(f"https://github.com/{pr.org}/{pr.repo}/pull/{pr.pr} is {state}")
def urlfetch(url, etag, headers=None):
    """GET *url*, sending an If-None-Match header when *etag* is given.

    Args:
        url: target URL.
        etag: previously seen ETag value, or None to fetch unconditionally.
        headers: optional extra request headers; the caller's dict is
            never mutated.

    Returns:
        The httpx response object (the server may answer 304 when the
        ETag still matches).
    """
    # Fix: copy instead of mutating — the original inserted the
    # If-None-Match header into the caller's dict, a surprising side
    # effect on a shared argument.
    headers = {} if headers is None else dict(headers)
    if etag is not None:
        headers[HTTP_HDR_IF_NONE_MATCH] = etag
    logging.info("requesting url '%s', headers = %s", url, headers)
    result = httpx.get(url, headers=headers, timeout=URLFETCH_TIMEOUT_SECONDS)
    logging.info("response status for url %s is %s", url, result.status_code)
    return result
def get_comments(github_event_data: dict) -> typing.List[dict]:
    """Fetch and decode the comment list referenced by a GitHub event payload."""
    response = httpx.get(get_comments_link(github_event_data))
    return response.json()