Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def asgi(receive, send):
nonlocal close_code
websocket = WebSocket(scope, receive=receive, send=send)
await websocket.accept()
try:
await websocket.receive_text()
except WebSocketDisconnect as exc:
close_code = exc.code
async def raw_asgi(scope, receive, send):
if scope["type"] == "http":
response = PlainTextResponse("OK")
await response(scope, receive, send)
else:
ws = WebSocket(scope, receive, send)
await ws.accept()
await ws.send_text("OK")
await ws.close()
async def asgi(receive, send):
websocket = WebSocket(scope, receive=receive, send=send)
await websocket.accept()
await websocket.send_json({"url": str(websocket.url)})
await websocket.close()
async def asgi(receive, send):
websocket = WebSocket(scope, receive=receive, send=send)
await websocket.accept()
asyncio.ensure_future(respond(websocket))
try:
# this will block as the client does not send us data
# it should not prevent `respond` from executing though
await websocket.receive_json()
except WebSocketDisconnect:
pass
async def dispatch(self) -> None:
websocket = WebSocket(self.scope, receive=self.receive, send=self.send)
await self.on_connect(websocket)
close_code = status.WS_1000_NORMAL_CLOSURE
try:
while True:
message = await websocket.receive()
if message["type"] == "websocket.receive":
data = await self.decode(websocket, message)
await self.on_receive(websocket, data)
elif message["type"] == "websocket.disconnect":
close_code = int(message.get("code", status.WS_1000_NORMAL_CLOSURE))
break
except Exception as exc:
close_code = status.WS_1011_INTERNAL_ERROR
raise exc from None
def add_non_field_param_to_dependency(
*, param: inspect.Parameter, dependant: Dependant
) -> Optional[bool]:
if lenient_issubclass(param.annotation, Request):
dependant.request_param_name = param.name
return True
elif lenient_issubclass(param.annotation, WebSocket):
dependant.websocket_param_name = param.name
return True
elif lenient_issubclass(param.annotation, Response):
dependant.response_param_name = param.name
return True
elif lenient_issubclass(param.annotation, BackgroundTasks):
dependant.background_tasks_param_name = param.name
return True
elif lenient_issubclass(param.annotation, SecurityScopes):
dependant.security_scopes_param_name = param.name
return True
return None
async def handle_websocket(
self, scope: Scope, receive: Receive, send: Send
):
websocket = WebSocket(scope, receive, send)
if not self.has_required_scope(websocket):
await websocket.close()
return
await self.app(scope, receive, send)
async def app(scope: Scope, receive: Receive, send: Send) -> None:
session = WebSocket(scope, receive=receive, send=send)
await func(session)
def __init__(self, errors: Sequence[ErrorList]) -> None:
if PYDANTIC_1:
super().__init__(errors, WebSocketErrorModel)
else:
super().__init__(errors, WebSocket) # type: ignore # pragma: nocover
async def __call__(self, scope, receive, send):
ws = WebSocket(scope, receive, send)
before_requests = scope.get("before_requests", [])
for before_request in before_requests.get("ws", []):
await before_request(ws)
await self.endpoint(ws)