Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def app(scope, receive, send):
    """ASGI app that echoes the request body back as JSON.

    Reads the whole request body, decodes it, and replies with
    ``{"body": <decoded text>}``.
    """
    raw = await Request(scope, receive).body()
    reply = JSONResponse({"body": raw.decode()})
    await reply(scope, receive, send)
async def app(scope, receive, send):
nonlocal disconnected_after_response
request = Request(scope, receive)
await request.body()
disconnected = await request.is_disconnected()
response = JSONResponse({"disconnected": disconnected})
await response(scope, receive, send)
disconnected_after_response = await request.is_disconnected()
async def app(scope, receive, send):
    """ASGI app that returns the request headers as a JSON object.

    Replies with ``{"headers": {...}}`` built from ``dict(request.headers)``.
    """
    header_map = dict(Request(scope, receive).headers)
    reply = JSONResponse({"headers": header_map})
    await reply(scope, receive, send)
async def test_get_from_emtpy_cache(cache: Cache, method: str) -> None:
    """Check that ``get_from_cache`` returns ``None`` for this request.

    ``cache`` and ``method`` are supplied by the caller (presumably pytest
    fixtures / parametrization -- confirm against the test module).
    """
    http_scope: Scope = {
        "type": "http",
        "method": method,
        "path": "/path",
        "headers": [],
    }
    cached = await get_from_cache(Request(http_scope), cache=cache)
    assert cached is None
async def app(scope, receive, send):
    """ASGI app that reports the client's address as JSON.

    Replies with ``{"host": ..., "port": ...}`` taken from
    ``request.client``. If ``request.client`` is ``None`` (no client
    information in the scope), both fields are reported as ``null`` instead
    of failing with ``AttributeError``.
    """
    request = Request(scope, receive)
    client = request.client
    if client is None:
        payload = {"host": None, "port": None}
    else:
        payload = {"host": client.host, "port": client.port}
    response = JSONResponse(payload)
    await response(scope, receive, send)
async def app(scope, receive, send):
    """ASGI app that echoes the request's method and URL as JSON.

    Replies with ``{"method": ..., "url": ...}``; the URL is converted
    with ``str()``.
    """
    req = Request(scope, receive)
    reply = JSONResponse({"method": req.method, "url": str(req.url)})
    await reply(scope, receive, send)
async def app(scope, receive, send):
    """ASGI app that reads the body via ``body()`` and then via ``stream()``.

    Replies with both results decoded, as ``{"body": ..., "stream": ...}``.
    """
    request = Request(scope, receive)
    body = await request.body()
    # Join all chunks once rather than growing a bytes object with ``+=``
    # on every iteration.
    streamed = b"".join([chunk async for chunk in request.stream()])
    response = JSONResponse({"body": body.decode(), "stream": streamed.decode()})
    await response(scope, receive, send)
def __init__(self, app: ASGIApp, *, cache: Cache) -> None:
    """Store the wrapped app and the cache and set the initial attribute values.

    Args:
        app: The ASGI application kept on ``self.app``.
        cache: The cache kept on ``self.cache`` (keyword-only).
    """
    self.app = app
    self.cache = cache
    # Initial values for the remaining attributes; no request is attached yet.
    self.request: typing.Optional[Request] = None
    self.send: Send = unattached_send
    self.initial_message: Message = {}
    self.is_response_cachable = True
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
    """Handle one ASGI call by delegating to ``handle_graphql``.

    When no executor is set and an ``executor_class`` is configured, an
    executor is instantiated first. The response returned by
    ``handle_graphql`` is then invoked with ``scope``, ``receive`` and ``send``.
    """
    if self.executor is None:
        if self.executor_class is not None:
            self.executor = self.executor_class()
    graphql_response = await self.handle_graphql(Request(scope, receive=receive))
    await graphql_response(scope, receive, send)
async def __call__(self, receive: Receive, send: Send) -> None:
    """Build a request from ``self.scope``, dispatch it, and send the result.

    The object returned by ``dispatch`` is awaited with ``receive`` and
    ``send`` only (no scope argument).
    """
    reply = await self.dispatch(Request(self.scope, receive=receive))
    await reply(receive, send)