How to use the nameko.testing.utils.get_container function in nameko

To help you get started, we’ve selected a few nameko examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github etalab / croquemort / tests / test_integrations.py View on Github external
def test_retrieve_group_filtered(runner_factory, web_session):
    runner = runner_factory(HttpService)
    http_container = get_container(runner, HttpService)
    storage = replace_dependencies(http_container, 'storage')
    storage.get_group = lambda group_hash: {
        'url': group_hash,
        'name': 'datagouvfr',
        'url_hash1': 'url1',
        'url_hash2': 'url1',
    }

    def get_url(url_hash):
        result = {'url': url_hash}
        if url_hash == 'url_hash1':
            result['metadata'] = 'meta'
        return result

    storage.get_url = get_url
    runner.start()
github etalab / croquemort / tests / test_integrations.py View on Github external
def test_crawling_head_offender_url(runner_factory, rpc_proxy_factory):
    runner = runner_factory(CrawlerService)
    crawler_container = get_container(runner, CrawlerService)
    storage = replace_dependencies(crawler_container, 'storage')
    runner.start()
    config = {'AMQP_URI': 'amqp://guest:guest@localhost:5672/nameko_test'}
    dispatch = event_dispatcher(config)
    with entrypoint_waiter(crawler_container, 'check_url'):
        dispatch('http_server', 'url_to_check',
                 ['http://www.bnf.fr/test_crawling_url', None, None])
    assert storage.store_url.call_count == 1
    assert storage.store_group.call_count == 0
    assert storage.store_metadata.call_count == 1
github etalab / croquemort / tests / test_services.py View on Github external
def test_http(runner_factory, rpc_proxy_factory):
    runner = runner_factory(HttpService)
    http_server = rpc_proxy_factory('http_server')
    http_container = get_container(runner, HttpService)
    dispatch = replace_dependencies(http_container, 'dispatch')
    runner.start()
    http_server.fetch('http://example.org')
    assert dispatch.call_count == 1
github nameko / nameko / test / testing / test_utils.py View on Github external
def test_get_container(runner_factory, rabbit_config):

    class ServiceX(object):
        name = "service_x"

    class ServiceY(object):
        name = "service_y"

    runner = runner_factory(rabbit_config, ServiceX, ServiceY)

    assert get_container(runner, ServiceX).service_cls is ServiceX
    assert get_container(runner, ServiceY).service_cls is ServiceY
    assert get_container(runner, object) is None
github etalab / croquemort / tests / test_integrations.py View on Github external
def test_retrieve_group_excluded_empty(runner_factory, web_session):
    runner = runner_factory(HttpService)
    http_container = get_container(runner, HttpService)
    storage = replace_dependencies(http_container, 'storage')
    storage.get_group = lambda group_hash: {
        'url': group_hash,
        'name': 'datagouvfr',
        'url_hash1': 'url1',
        'url_hash2': 'url1',
    }

    def get_url(url_hash):
        result = {'url': url_hash}
        if url_hash == 'url_hash1':
            result['metadata'] = ''
        return result

    storage.get_url = get_url
    runner.start()
github nameko / nameko / test / testing / test_services.py View on Github external
def test_entrypoint_hook_with_return(runner_factory, rabbit_config):

    service_classes = (Service, ServiceA, ServiceB, ServiceC)
    runner = runner_factory(rabbit_config, *service_classes)
    runner.start()

    service_container = get_container(runner, Service)

    with entrypoint_hook(service_container, 'working') as working:
        assert working("value") == "value-a-b-c"

    with entrypoint_hook(service_container, 'broken') as broken:
        with pytest.raises(ExampleError):
            broken("value")
github nameko / nameko / test / test_call_id_stack.py View on Github external
name = "event_raiser"
        dispatch = EventDispatcher()

        stack_logger = StackLogger()

        @rpc
        def say_hello(self):
            self.dispatch('hello', self.name)

    runner = runner_factory(rabbit_config)
    runner.add_service(EventListeningServiceOne)
    runner.add_service(EventListeningServiceTwo)
    runner.add_service(EventRaisingService)
    runner.start()

    container = get_container(runner, EventRaisingService)
    listener1 = get_container(runner, EventListeningServiceOne)
    listener2 = get_container(runner, EventListeningServiceTwo)
    with entrypoint_hook(container, "say_hello") as say_hello:
        waiter1 = entrypoint_waiter(listener1, 'hello')
        waiter2 = entrypoint_waiter(listener2, 'hello')
        with waiter1, waiter2:
            say_hello()

    assert predictable_call_ids.call_count == 3

    # order of event handlers and dependencies is non-deterministic,
    # so there are four permutations of valid call stacks
    possible_call_lists = (
        [
            call(['event_raiser.say_hello.0']),
            call(['event_raiser.say_hello.0', 'listener_one.hello.1']),
github nameko / nameko / test / testing / test_services.py View on Github external
def test_entrypoint_hook(runner_factory, rabbit_config):

    service_classes = (Service, ServiceA, ServiceB, ServiceC)
    runner = runner_factory(rabbit_config, *service_classes)
    runner.start()

    service_container = get_container(runner, Service)

    event_payload = "msg"
    with entrypoint_hook(service_container, 'handle') as handle:
        with entrypoint_waiter(service_container, 'handle'):
            handle(event_payload)
    handle_event.assert_called_once_with(event_payload)
github nameko / nameko / test / test_errors.py View on Github external
def test_graceful_stop_on_one_container_error(runner_factory, rabbit_config):

    runner = runner_factory(rabbit_config, ExampleService, SecondService)
    runner.start()

    container = get_container(runner, ExampleService)
    second_container = get_container(runner, SecondService)
    original_stop = second_container.stop
    with patch.object(second_container, 'stop', autospec=True,
                      wraps=original_stop) as stop:
        rpc_consumer = get_extension(container, RpcConsumer)
        with patch.object(
                rpc_consumer, 'handle_result', autospec=True) as handle_result:
            exception = Exception("error")
            handle_result.side_effect = exception

            # use a standalone rpc proxy to call exampleservice.task()
            with ServiceRpcProxy("exampleservice", rabbit_config) as proxy:
                # proxy.task() will hang forever because it generates an error
                # in the remote container (so never receives a response).
                proxy.task.call_async()
github nameko / nameko / docs / examples / service_runner.py View on Github external
from nameko.runners import ServiceRunner
from nameko.testing.utils import get_container

class ServiceA:
    name = "service_a"

class ServiceB:
    name = "service_b"

# create a runner for ServiceA and ServiceB
runner = ServiceRunner(config={})
runner.add_service(ServiceA)
runner.add_service(ServiceB)

# ``get_container`` will return the container for a particular service
container_a = get_container(runner, ServiceA)

# start both services
runner.start()

# stop both services
runner.stop()