How to use the pfun.aio_trampoline.Done function in pfun

To help you get started, we’ve selected a few pfun examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github suned / pfun / tests / strategies.py View on Github external
def aio_trampolines(value_strategy=anything()):
    dones = builds(aio_trampoline.Done, value_strategy)

    @composite
    def call(draw):
        t = draw(aio_trampolines(value_strategy))

        async def f():
            return t
        return aio_trampoline.Call(f)

    @composite
    def and_then(draw):
        t = draw(aio_trampolines(value_strategy))
        cont = lambda _: t
        return aio_trampoline.AndThen(
            draw(aio_trampolines(value_strategy)), cont
        )
github suned / pfun / pfun / files.py View on Github external
async def run_e(_) -> Trampoline[Either[OSError, str]]:
            try:
                with open(path) as f:
                    contents = f.read()
                return Done(Right(contents))
            except OSError as e:
                return Done(Left(e))
github suned / pfun / pfun / logging.py View on Github external
async def run_e(_):
            self.logger.info(msg, stack_info=stack_info, exc_info=exc_info)
            return Done(Right(None))
github suned / pfun / pfun / files.py View on Github external
async def run_e(_) -> Trampoline[Either[OSError, None]]:
            try:
                with open(path, 'wb') as f:
                    f.write(content)
                return Done(Right(None))
            except OSError as e:
                return Done(Left(e))
github suned / pfun / pfun / logging.py View on Github external
async def run_e(_):
            logging.debug(msg, stack_info=stack_info, exc_info=exc_info)
            return Done(Right(None))
github suned / pfun / pfun / files.py View on Github external
async def run_e(_) -> Trampoline[Either[OSError, None]]:
            try:
                with open(path, 'a+') as f:
                    f.write(content)
                return Done(Right(None))
            except OSError as e:
                return Done(Left(e))
github suned / pfun / pfun / effect.py View on Github external
async def run_e(_):
        return Done(Right(value))
github suned / pfun / pfun / effect.py View on Github external
def cont(either: Either):
                    if isinstance(either, Left):
                        return Done(either)

                    async def thunk():
                        next_ = f(either.get)
                        if asyncio.iscoroutine(next_):
                            effect = await next_
                        else:
                            effect = next_
                        return await effect.run_e(r)

                    return Call(thunk)