Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def __call__(self, *args, **kwargs):
# Picks the executor and the callable that will be used to run self.func with
# the caller's positional and keyword arguments.
# NOTE(review): this excerpt has lost its indentation and stops before the
# statements that use `loop`, `executor` and `func`; the comments below
# describe only what the visible lines do.
# `loop` is not used in the visible lines - presumably consumed by the
# missing tail; TODO confirm.
loop = asyncio.get_event_loop()
# Work out what thread to run the code in
if self._thread_sensitive:
# Thread-sensitive mode: reuse AsyncToSync.executors.current when that
# attribute exists, otherwise fall back to self.single_thread_executor.
if hasattr(AsyncToSync.executors, "current"):
# If we have a parent sync thread above somewhere, use that
executor = AsyncToSync.executors.current
else:
# Otherwise, we run it in a fixed single thread
executor = self.single_thread_executor
else:
executor = None # Use default
# `contextvars` is compared against None here - presumably it is set to None
# elsewhere in the file when the module is unavailable; verify at the import.
if contextvars is not None:
# Copy the caller's current context and bind the call to self.func into
# `child`. After the rebinding below, func(*args, **kwargs) is
# context.run(child), i.e. self.func(*args, **kwargs) executed inside the
# copied context.
context = contextvars.copy_context()
child = functools.partial(self.func, *args, **kwargs)
func = context.run
args = (child,)
kwargs = {}
else:
# No context copy: self.func is used directly and args/kwargs are left
# untouched.
func = self.func
# Run the code in the right thread
# NOTE(review): the statements that perform this step are not present in this
# excerpt.
async def __call__(self, *args, **kwargs):
loop = asyncio.get_event_loop()
# Work out what thread to run the code in
if self._thread_sensitive:
if hasattr(AsyncToSync.executors, "current"):
# If we have a parent sync thread above somewhere, use that
executor = AsyncToSync.executors.current
else:
# Otherwise, we run it in a fixed single thread
executor = self.single_thread_executor
else:
executor = None # Use default
if contextvars is not None:
context = contextvars.copy_context()
child = functools.partial(self.func, *args, **kwargs)
func = context.run
args = (child,)
kwargs = {}
else:
func = self.func