Secure your code as it's written. Use Snyk Code to scan source code in minutes—no build needed—and fix issues immediately.
def _get_context_id(self):
"""
Get the ID we should use for looking up variables
"""
# Prevent a circular reference
from .sync import AsyncToSync, SyncToAsync
# First, pull the current task if we can
context_id = SyncToAsync.get_current_task()
context_is_async = True
# OK, let's try for a thread ID
if context_id is None:
context_id = threading.current_thread()
context_is_async = False
# If we're thread-critical, we stop here, as we can't share contexts.
if self._thread_critical:
return context_id
# Now, take those and see if we can resolve them through the launch maps
for i in range(sys.getrecursionlimit()):
try:
if context_is_async:
# Tasks have a source thread in AsyncToSync
context_id = AsyncToSync.launch_map[context_id]
context_is_async = False
else:
async def main_wrap(self, args, kwargs, call_result, source_thread, exc_info):
"""
Wraps the awaitable with something that puts the result into the
result/exception future.
"""
current_task = SyncToAsync.get_current_task()
self.launch_map[current_task] = source_thread
try:
# If we have an exception, run the function inside the except block
# after raising it so exc_info is correctly populated.
if exc_info[1]:
try:
raise exc_info[1]
except:
result = await self.awaitable(*args, **kwargs)
else:
result = await self.awaitable(*args, **kwargs)
except Exception as e:
call_result.set_exception(e)
else:
call_result.set_result(result)
finally: