Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_retrieve_group_filtered(runner_factory, web_session):
    """Start an ``HttpService`` runner whose ``storage`` dependency is stubbed.

    ``storage.get_group`` is replaced by a function returning a fixed group
    record, and ``storage.get_url`` by one that attaches
    ``'metadata': 'meta'`` only to the record for ``'url_hash1'``.

    NOTE(review): the visible code stops at ``runner.start()`` without
    issuing a request or asserting anything, and ``web_session`` is never
    used -- confirm whether the remainder of this test is missing.
    """
    runner = runner_factory(HttpService)
    http_container = get_container(runner, HttpService)
    storage = replace_dependencies(http_container, 'storage')
    # Every group lookup yields the same record, echoing the requested hash
    # back under the 'url' key.
    storage.get_group = lambda group_hash: {
        'url': group_hash,
        'name': 'datagouvfr',
        'url_hash1': 'url1',
        'url_hash2': 'url1',
    }

    def get_url(url_hash):
        """Return a URL record; only ``'url_hash1'`` carries metadata."""
        result = {'url': url_hash}
        if url_hash == 'url_hash1':
            result['metadata'] = 'meta'
        return result

    storage.get_url = get_url
    runner.start()
def test_crawling_head_offender_url(runner_factory, rpc_proxy_factory):
    """Dispatch a ``url_to_check`` event and verify the storage calls made.

    The event is dispatched inside
    ``entrypoint_waiter(crawler_container, 'check_url')``; once that block
    has exited, the replaced ``storage`` dependency must have recorded
    exactly one ``store_url`` call, one ``store_metadata`` call and no
    ``store_group`` call.
    """
    runner = runner_factory(CrawlerService)
    crawler_container = get_container(runner, CrawlerService)
    storage = replace_dependencies(crawler_container, 'storage')
    runner.start()

    # The dispatcher is built from a hard-coded local AMQP URI.
    config = {'AMQP_URI': 'amqp://guest:guest@localhost:5672/nameko_test'}
    dispatch = event_dispatcher(config)
    with entrypoint_waiter(crawler_container, 'check_url'):
        dispatch('http_server', 'url_to_check',
                 ['http://www.bnf.fr/test_crawling_url', None, None])

    assert storage.store_url.call_count == 1
    assert storage.store_group.call_count == 0
    assert storage.store_metadata.call_count == 1
def test_http(runner_factory, rpc_proxy_factory):
    """Check that one ``fetch`` call on ``http_server`` triggers one dispatch.

    The ``dispatch`` dependency of the ``HttpService`` container is replaced
    before the runner starts, so its ``call_count`` can be inspected after
    the call made through the ``http_server`` proxy.
    """
    runner = runner_factory(HttpService)
    http_server = rpc_proxy_factory('http_server')
    http_container = get_container(runner, HttpService)
    dispatch = replace_dependencies(http_container, 'dispatch')
    runner.start()

    http_server.fetch('http://example.org')
    assert dispatch.call_count == 1
def test_get_container(runner_factory, rabbit_config):
    """Check that ``get_container`` maps a service class to its container.

    Two throwaway service classes are handed to the runner; looking up
    either one must give a container whose ``service_cls`` is that class,
    and looking up a class the runner was not given must give ``None``.
    """

    class ServiceX(object):
        name = "service_x"

    class ServiceY(object):
        name = "service_y"

    runner = runner_factory(rabbit_config, ServiceX, ServiceY)

    assert get_container(runner, ServiceX).service_cls is ServiceX
    assert get_container(runner, ServiceY).service_cls is ServiceY
    # ``object`` was never passed to the runner, so no container matches it.
    assert get_container(runner, object) is None
def test_retrieve_group_excluded_empty(runner_factory, web_session):
    """Start an ``HttpService`` runner whose ``storage`` dependency is stubbed.

    ``storage.get_group`` is replaced by a function returning a fixed group
    record, and ``storage.get_url`` by one that attaches an empty-string
    ``'metadata'`` value only to the record for ``'url_hash1'``.

    NOTE(review): the visible code stops at ``runner.start()`` without
    issuing a request or asserting anything, and ``web_session`` is never
    used -- confirm whether the remainder of this test is missing.
    """
    runner = runner_factory(HttpService)
    http_container = get_container(runner, HttpService)
    storage = replace_dependencies(http_container, 'storage')
    # Every group lookup yields the same record, echoing the requested hash
    # back under the 'url' key.
    storage.get_group = lambda group_hash: {
        'url': group_hash,
        'name': 'datagouvfr',
        'url_hash1': 'url1',
        'url_hash2': 'url1',
    }

    def get_url(url_hash):
        """Return a URL record; ``'url_hash1'`` gets empty metadata."""
        result = {'url': url_hash}
        if url_hash == 'url_hash1':
            result['metadata'] = ''
        return result

    storage.get_url = get_url
    runner.start()
def test_entrypoint_hook_with_return(runner_factory, rabbit_config):
    """Check return values and exceptions surfaced through ``entrypoint_hook``.

    Calling the hooked ``working`` entrypoint with ``"value"`` must return
    ``"value-a-b-c"``; calling the hooked ``broken`` entrypoint must raise
    ``ExampleError``.
    """
    service_classes = (Service, ServiceA, ServiceB, ServiceC)
    runner = runner_factory(rabbit_config, *service_classes)
    runner.start()

    service_container = get_container(runner, Service)

    with entrypoint_hook(service_container, 'working') as working:
        assert working("value") == "value-a-b-c"

    with entrypoint_hook(service_container, 'broken') as broken:
        with pytest.raises(ExampleError):
            broken("value")
name = "event_raiser"
dispatch = EventDispatcher()
stack_logger = StackLogger()
@rpc
def say_hello(self):
self.dispatch('hello', self.name)
runner = runner_factory(rabbit_config)
runner.add_service(EventListeningServiceOne)
runner.add_service(EventListeningServiceTwo)
runner.add_service(EventRaisingService)
runner.start()
container = get_container(runner, EventRaisingService)
listener1 = get_container(runner, EventListeningServiceOne)
listener2 = get_container(runner, EventListeningServiceTwo)
with entrypoint_hook(container, "say_hello") as say_hello:
waiter1 = entrypoint_waiter(listener1, 'hello')
waiter2 = entrypoint_waiter(listener2, 'hello')
with waiter1, waiter2:
say_hello()
assert predictable_call_ids.call_count == 3
# order of event handlers and dependencies is non-deterministic,
# so there are four permutations of valid call stacks
possible_call_lists = (
[
call(['event_raiser.say_hello.0']),
call(['event_raiser.say_hello.0', 'listener_one.hello.1']),
def test_entrypoint_hook(runner_factory, rabbit_config):
    """Fire the ``handle`` entrypoint via ``entrypoint_hook`` and check the call.

    The hook is invoked with a payload while an ``entrypoint_waiter`` for
    the same entrypoint is active; afterwards ``handle_event`` must have
    been called exactly once with that payload.
    """
    service_classes = (Service, ServiceA, ServiceB, ServiceC)
    runner = runner_factory(rabbit_config, *service_classes)
    runner.start()

    service_container = get_container(runner, Service)

    event_payload = "msg"
    with entrypoint_hook(service_container, 'handle') as handle:
        with entrypoint_waiter(service_container, 'handle'):
            handle(event_payload)
    handle_event.assert_called_once_with(event_payload)
def test_graceful_stop_on_one_container_error(runner_factory, rabbit_config):
    """Make one container's ``handle_result`` raise while a second one runs.

    ``second_container.stop`` is patched with a wrapping mock, and the
    ``RpcConsumer.handle_result`` of the first container is patched to raise
    ``Exception("error")``. A standalone RPC proxy then calls
    ``exampleservice.task`` asynchronously.

    NOTE(review): the visible code ends after ``call_async()`` and never
    inspects ``stop`` or asserts on the raised exception -- confirm whether
    the remainder of this test is missing.
    """
    runner = runner_factory(rabbit_config, ExampleService, SecondService)
    runner.start()

    container = get_container(runner, ExampleService)
    second_container = get_container(runner, SecondService)
    original_stop = second_container.stop
    # ``wraps=original_stop`` keeps the real stop behaviour while recording
    # calls on the mock.
    with patch.object(second_container, 'stop', autospec=True,
                      wraps=original_stop) as stop:
        rpc_consumer = get_extension(container, RpcConsumer)
        with patch.object(
                rpc_consumer, 'handle_result', autospec=True) as handle_result:
            exception = Exception("error")
            handle_result.side_effect = exception

            # use a standalone rpc proxy to call exampleservice.task()
            with ServiceRpcProxy("exampleservice", rabbit_config) as proxy:
                # proxy.task() will hang forever because it generates an error
                # in the remote container (so never receives a response).
                proxy.task.call_async()
from nameko.runners import ServiceRunner
from nameko.testing.utils import get_container
class ServiceA:
    """Bare example service class that declares only a ``name``."""

    name = "service_a"
class ServiceB:
    """Bare example service class that declares only a ``name``."""

    name = "service_b"
# Build a runner with an empty config and register both example services.
runner = ServiceRunner(config={})
for _service_cls in (ServiceA, ServiceB):
    runner.add_service(_service_cls)

# ``get_container`` looks up the container belonging to one service class.
container_a = get_container(runner, ServiceA)

# Bring both services up, then shut both down again.
runner.start()
runner.stop()