Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def aio_trampolines(value_strategy=anything()):
dones = builds(aio_trampoline.Done, value_strategy)
@composite
def call(draw):
t = draw(aio_trampolines(value_strategy))
async def f():
return t
return aio_trampoline.Call(f)
@composite
def and_then(draw):
t = draw(aio_trampolines(value_strategy))
cont = lambda _: t
return aio_trampoline.AndThen(
draw(aio_trampolines(value_strategy)), cont
)
async def run_e(_) -> Trampoline[Either[OSError, str]]:
try:
with open(path) as f:
contents = f.read()
return Done(Right(contents))
except OSError as e:
return Done(Left(e))
async def run_e(_):
self.logger.info(msg, stack_info=stack_info, exc_info=exc_info)
return Done(Right(None))
async def run_e(_) -> Trampoline[Either[OSError, None]]:
try:
with open(path, 'wb') as f:
f.write(content)
return Done(Right(None))
except OSError as e:
return Done(Left(e))
async def run_e(_):
logging.debug(msg, stack_info=stack_info, exc_info=exc_info)
return Done(Right(None))
async def run_e(_) -> Trampoline[Either[OSError, None]]:
try:
with open(path, 'a+') as f:
f.write(content)
return Done(Right(None))
except OSError as e:
return Done(Left(e))
async def run_e(_):
return Done(Right(value))
def cont(either: Either):
if isinstance(either, Left):
return Done(either)
async def thunk():
next_ = f(either.get)
if asyncio.iscoroutine(next_):
effect = await next_
else:
effect = next_
return await effect.run_e(r)
return Call(thunk)