Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def send_message(self, message):
if not self.chat:
AsyncToSync(self.send_json)(
{
'error': 'NO_CHAT_JOINED'
}
)
else:
try:
blacklist_validator(message)
Messages.objects.send_message(sender=self.scope['user'], receiver_id=self.chat, message=message)
except ValidationError:
AsyncToSync(self.send_json)(
{
'error': 'BLACKLISTED_MESSAGE'
}
Returns None if there is no task.
"""
try:
if hasattr(asyncio, "current_task"):
# Python 3.7 and up
return asyncio.current_task()
else:
# Python 3.6
return asyncio.Task.current_task()
except RuntimeError:
return None
# Lowercase is more sensible for most things
sync_to_async = SyncToAsync
async_to_sync = AsyncToSync
def send_message(self, message):
if not self.chat:
AsyncToSync(self.send_json)(
{
'error': 'NO_CHAT_JOINED'
}
)
else:
try:
blacklist_validator(message)
Messages.objects.send_message(sender=self.scope['user'], receiver_id=self.chat, message=message)
except ValidationError:
AsyncToSync(self.send_json)(
{
'error': 'BLACKLISTED_MESSAGE'
}
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
raise ValueError("WSGI wrapper received a non-HTTP scope")
self.scope = scope
with SpooledTemporaryFile(max_size=65536) as body:
# Alright, wait for the http.request messages
while True:
message = await receive()
if message["type"] != "http.request":
raise ValueError("WSGI wrapper received a non-HTTP-request message")
body.write(message.get("body", b""))
if not message.get("more_body"):
break
body.seek(0)
# Wrap send so it can be called from the subthread
self.sync_send = AsyncToSync(send)
# Call the WSGI app
await self.run_wsgi_app(body)