Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_soft_limit(server):
    """
    With ``soft_limit=1`` the pool should hold exactly one keep-alive
    connection after each request has been fully read.
    """
    limits = httpx.PoolLimits(soft_limit=1)

    # NOTE(review): the second URL is a literal rather than ``server.url``;
    # presumably it names a different origin so a second connection is
    # opened -- confirm against the ``server`` fixture.
    targets = (server.url, "http://localhost:8000/")

    async with ConnectionPool(pool_limits=limits) as pool:
        for target in targets:
            reply = await pool.request("GET", target)
            await reply.read()
            # Once the body is consumed nothing should be active, and only
            # a single connection should be kept alive.
            assert len(pool.active_connections) == 0
            assert len(pool.keepalive_connections) == 1
def test_limits_repr():
    """
    ``repr()`` of a ``PoolLimits`` built with only ``hard_limit=100`` should
    spell out both limits.
    """
    expected = "PoolLimits(soft_limit=None, hard_limit=100)"
    assert repr(httpx.PoolLimits(hard_limit=100)) == expected
def test_limits_eq():
    """
    Two ``PoolLimits`` instances built with the same ``hard_limit`` should
    compare equal.
    """
    first = httpx.PoolLimits(hard_limit=100)
    second = httpx.PoolLimits(hard_limit=100)
    assert first == second
async def test_pool_timeout(server):
    """
    With ``hard_limit=1`` and a ``pool_timeout`` of 1e-4, a ``get`` issued
    while a streamed response is still open should raise
    ``httpx.PoolTimeout``.
    """
    limits = httpx.PoolLimits(hard_limit=1)
    short_wait = httpx.Timeout(pool_timeout=1e-4)

    async with httpx.Client(pool_limits=limits, timeout=short_wait) as client:
        # The streamed response stays open for the whole inner block, so the
        # request below is made while it is still in progress.
        async with client.stream("GET", server.url):
            with pytest.raises(httpx.PoolTimeout):
                await client.get("http://localhost:8000/")
def test_limits_eq():
    """
    Two ``PoolLimits`` instances built with the same ``hard_limit`` should
    compare equal.
    """
    first = httpx.PoolLimits(hard_limit=100)
    second = httpx.PoolLimits(hard_limit=100)
    assert first == second
def setup_session(proxy: Optional[str], concurrency: int):
    """
    Build an ``httpx.AsyncClient`` and store it, wrapped in a ``Session``,
    through ``session.set``.

    :param proxy: value handed to the client as ``proxies``.
    :param concurrency: sizes the pool limits (``soft_limit`` is twice this
        value, ``hard_limit`` equals it) and is also passed to ``Session``.
    """
    limits = httpx.PoolLimits(
        soft_limit=concurrency * 2,
        hard_limit=concurrency,
    )
    client = httpx.AsyncClient(
        pool_limits=limits,
        timeout=httpx.Timeout(20, pool_timeout=None),
        headers={'User-Agent': DEFAULT_USER_AGENT},
        http2=True,
        proxies=proxy,
    )
    session.set(Session(client, concurrency))
def get(self, *args, **kwargs) -> ResponseManager:
    """
    Return a ``ResponseManager`` for the client associated with a proxy.

    The optional ``proxy`` keyword is removed from ``kwargs`` and used as the
    key into ``self.clients``. When no truthy client is stored under that
    key, a new ``httpx.AsyncClient`` is created and stored there. The
    remaining positional and keyword arguments are passed to
    ``ResponseManager`` unchanged.
    """
    proxy = kwargs.pop('proxy', None)

    cached = self.clients.get(proxy)
    if cached:
        return ResponseManager(cached, args, kwargs)

    # Nothing usable is stored for this proxy yet: build a client and keep it
    # in ``self.clients`` under the same key.
    limits = httpx.PoolLimits(
        max_keepalive=self.concurrency * 2,
        max_connections=self.concurrency,
    )
    fresh = httpx.AsyncClient(
        pool_limits=limits,
        timeout=httpx.Timeout(20, pool_timeout=None),
        headers={'User-Agent': DEFAULT_USER_AGENT},
        http2=True,
        proxies=proxy,
    )
    self.clients[proxy] = fresh
    return ResponseManager(fresh, args, kwargs)