Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_context_manager():
    """Check a mock declared inside ``pook.use()`` against a real client call.

    A GET mock for ``server.com/bar`` replying with status 204 is declared,
    the URL is requested through ``requests`` and the resulting status code
    is asserted.
    """
    with pook.use():
        # Declare the mock before issuing the request.
        pook.get('server.com/bar', reply=204)
        response = requests.get('http://server.com/bar')
        status = response.status_code
        assert status == 204
def test_request_with_context_manager(self):
    """Request a URL mocked inside ``pook.use()`` and check the status code.

    The mock for ``server.com/bar`` is declared with ``reply=204``; the
    status code obtained through ``requests`` is compared with
    ``self.assertEqual``.
    """
    with pook.use():
        # Declare the mock before issuing the request.
        pook.get('server.com/bar', reply=204)
        response = requests.get('http://server.com/bar')
        status = response.status_code
        self.assertEqual(status, 204)
# Test fragment: the visible text stops inside the second mock's
# response_headers dict, so the request and assertions are not shown here.
# NOTE(review): the name suggests a 404 reply must not end up in the cache;
# confirm against the full test body.
def test_search_on_github_cache_terraform_releases_does_not_cache_error_404(
tmp_working_dir, terraform_releases_html_after_v0_13_0, # noqa: F811
): # noqa: D103
# Replace io.BytesIO with AutoclosingBytesIO while the body runs.
with mock.patch("io.BytesIO", AutoclosingBytesIO):
with pook.use():
repo = "hashicorp/terraform"
releases_url = "https://github.com/{}/releases?after=v0.13.0".format(repo)
# volatile mocks that can only be invoked once each
# First mock: GET on releases_url replies 404, declared with times=1.
pook.get(
releases_url, reply=404, response_headers={"Status": "404 Not found", "Date": formatdate(usegmt=True)}, times=1,
)
# Second mock: same URL replies 200 with the fixture HTML as a text/plain body.
pook.get(
releases_url,
reply=200,
response_type="text/plain",
response_body=terraform_releases_html_after_v0_13_0,
response_headers={
"Status": "200 OK",
"ETag": 'W/"df0474ebd25f223a95926ba58e11e77b"',
"Cache-Control": "max-age=0, private, must-revalidate",
# Test fragment: the visible text stops right after the second mock's
# response_headers dict is opened, so the request and assertions are not
# shown here.
# NOTE(review): the name suggests a 429 reply must not end up in the cache;
# confirm against the full test body.
def test_search_on_github_cache_terraform_releases_does_not_cache_error_429(
tmp_working_dir, terraform_releases_html_after_v0_13_0, # noqa: F811
): # noqa: D103
# Replace io.BytesIO with AutoclosingBytesIO while the body runs.
with mock.patch("io.BytesIO", AutoclosingBytesIO):
with pook.use():
repo = "hashicorp/terraform"
releases_url = "https://github.com/{}/releases?after=v0.13.0".format(repo)
# volatile mocks that can only be invoked once each
# First mock: GET on releases_url replies 429 with a "Retry-After: 120"
# header, declared with times=1.
pook.get(
releases_url,
reply=429,
response_headers={"Status": "429 Too Many Requests", "Date": formatdate(usegmt=True), "Retry-After": "120"},
times=1,
)
# Second mock: same URL replies 200 with the fixture HTML as a text/plain body.
pook.get(
releases_url,
reply=200,
response_type="text/plain",
response_body=terraform_releases_html_after_v0_13_0,
response_headers={
def test_context_manager():
    """Check a mock declared inside ``pook.use()`` against a real client call.

    A GET mock for ``server.com/baz`` replying with status 204 is declared,
    the URL is requested through ``requests`` and the resulting status code
    is asserted.
    """
    with pook.use():
        # Declare the mock before issuing the request.
        pook.get('server.com/baz', reply=204)
        response = requests.get('http://server.com/baz')
        status = response.status_code
        assert status == 204
def test_context_manager():
    """Exercise the ``pook.use()`` context manager with a single GET mock.

    Declares a 204 reply for ``server.com/baz``, requests that URL with
    ``requests`` and asserts on the status code of the reply.
    """
    with pook.use():
        # The mock is declared first, then the request is issued.
        pook.get('server.com/baz', reply=204)
        reply = requests.get('http://server.com/baz')
        code = reply.status_code
        assert code == 204
# Test fragment: the visible text stops inside the second mock's
# response_headers dict, so the request and assertions are not shown here.
# NOTE(review): the name suggests a 403 reply must not end up in the cache;
# confirm against the full test body.
def test_search_on_github_cache_terraform_releases_does_not_cache_error_403(
tmp_working_dir, terraform_releases_html_after_v0_13_0, # noqa: F811
): # noqa: D103
# Replace io.BytesIO with AutoclosingBytesIO while the body runs.
with mock.patch("io.BytesIO", AutoclosingBytesIO):
with pook.use():
repo = "hashicorp/terraform"
releases_url = "https://github.com/{}/releases?after=v0.13.0".format(repo)
# volatile mocks that can only be invoked once each
# First mock: GET on releases_url replies 403, declared with times=1.
pook.get(
releases_url, reply=403, response_headers={"Status": "403 Forbidden", "Date": formatdate(usegmt=True)}, times=1,
)
# Second mock: same URL replies 200 with the fixture HTML as a text/plain body.
pook.get(
releases_url,
reply=200,
response_type="text/plain",
response_body=terraform_releases_html_after_v0_13_0,
response_headers={
"Status": "200 OK",
"ETag": 'W/"df0474ebd25f223a95926ba58e11e77b"',
"Cache-Control": "max-age=0, private, must-revalidate",
import pook
import aiohttp
import asyncio
import async_timeout
async def fetch(session, url, data):
    """Send a GET request and print the response status, headers and body.

    Args:
        session: Object whose ``get(url, data=...)`` returns an async context
            manager yielding the response (an ``aiohttp.ClientSession`` in
            this script).
        url: URL to request.
        data: Payload forwarded as the ``data`` argument of ``session.get``.

    The whole exchange runs under a 10 second ``async_timeout`` limit.
    """
    # Enter the timeout with ``async with``: async-timeout deprecates the
    # synchronous ``with`` form and recent releases no longer support it.
    async with async_timeout.timeout(10):
        async with session.get(url, data=data) as res:
            print('Status:', res.status)
            print('Headers:', res.headers)
            print('Body:', await res.text())
# Demo script: declare one pook mock, then drive a single aiohttp request
# against the mocked URL while the pook context is active.
with pook.use(network=True):
    # Mocked reply for the URL requested below: 404 with a JSON body.
    pook.get('http://httpbin.org/ip',
             reply=404, response_type='json',
             response_headers={'Server': 'nginx'},
             response_json={'error': 'not found'})

    async def main(loop):
        """Open a client session bound to *loop* and fetch the mocked URL."""
        async with aiohttp.ClientSession(loop=loop) as session:
            await fetch(session, 'http://httpbin.org/ip',
                        bytearray('foo bar', 'utf-8'))

    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # running loop is deprecated in current Python versions. Close the loop
    # once the coroutine has finished so its resources are released.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()