Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if context_id is None:
context_id = threading.current_thread()
context_is_async = False
# If we're thread-critical, we stop here, as we can't share contexts.
if self._thread_critical:
return context_id
# Now, take those and see if we can resolve them through the launch maps
for i in range(sys.getrecursionlimit()):
try:
if context_is_async:
# Tasks have a source thread in AsyncToSync
context_id = AsyncToSync.launch_map[context_id]
context_is_async = False
else:
# Threads have a source task in SyncToAsync
context_id = SyncToAsync.launch_map[context_id]
context_is_async = True
except KeyError:
break
else:
# Catch infinite loops (they happen if you are screwing around
# with AsyncToSync implementations)
raise RuntimeError("Infinite launch_map loops")
return context_id
from django.db import close_old_connections
from asgiref.sync import SyncToAsync
class DatabaseSyncToAsync(SyncToAsync):
"""
SyncToAsync version that cleans up old database connections when it exits.
"""
def thread_handler(self, loop, *args, **kwargs):
close_old_connections()
try:
return super().thread_handler(loop, *args, **kwargs)
finally:
close_old_connections()
# The class is TitleCased, but we want to encourage use as a callable/decorator
database_sync_to_async = DatabaseSyncToAsync
self.awaitable = awaitable
try:
self.__self__ = self.awaitable.__self__
except AttributeError:
pass
if force_new_loop:
# They have asked that we always run in a new sub-loop.
self.main_event_loop = None
else:
try:
self.main_event_loop = asyncio.get_event_loop()
except RuntimeError:
# There's no event loop in this thread. Look for the threadlocal if
# we're inside SyncToAsync
self.main_event_loop = getattr(
SyncToAsync.threadlocal, "main_event_loop", None
)
Returns None if there is no task.
"""
try:
if hasattr(asyncio, "current_task"):
# Python 3.7 and up
return asyncio.current_task()
else:
# Python 3.6
return asyncio.Task.current_task()
except RuntimeError:
return None
# Lowercase is more sensible for most things
sync_to_async = SyncToAsync
async_to_sync = AsyncToSync
asyncio.TimeoutError,
websockets.InvalidHandshake,
websockets.WebSocketProtocolError) as e:
if any((isinstance(e, ConnectionClosed) and e.code == 1000, # clean disconnect
not isinstance(e, ConnectionClosed))):
await wrapped(*args, **kwargs)
else:
raise
return wrapped
return wrapper
# somehow asgiref messed up their class decorators??? so this is necessary
class SyncToAsyncThreadSafe(SyncToAsync):
def __init__(self, func):
super().__init__(func, thread_sensitive=True)
sync_to_async = SyncToAsync
sync_to_async_threadsafe = SyncToAsyncThreadSafe
async_to_sync = AsyncToSync
class AsyncUsingDB(SyncToAsyncThreadSafe):
@property
def sync(self):
return self.func
async_using_db = AsyncUsingDB