Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_emission_queuing():
    """Check that send_emission() hands a queued message to one output per call.

    The probe instance is wired to two store outputs by hand. After the first
    ``send_emission()`` only ``emit_store1`` is expected to hold the message;
    after the second, ``emit_store2`` is expected to hold it as well, while
    ``emit_store1`` still holds exactly one entry.
    """
    plugin.instantiate(config["plugin"], None)
    queue_tester = plugin.get_instance("queue_tester")
    emit_store1 = plugin.get_output("emit_store1", None)
    emit_store2 = plugin.get_output("emit_store2", None)

    # daemon handles this, do it manually here since
    # we are not starting the daemon
    queue_tester._emit = [emit_store1, emit_store2]

    message = queue_tester.probe()
    queue_tester.queue_emission(message)

    # First send: the message shows up in the first store only.
    queue_tester.send_emission()
    assert emit_store1.store[0] == message
    assert emit_store2.store == []

    # Second send: the first store is unchanged (no duplicate entry) and the
    # second store now has the message.
    queue_tester.send_emission()
    assert emit_store1.store[0] == message
    assert len(emit_store1.store) == 1
    assert emit_store2.store[0] == message
# NOTE(review): no enclosing `def` line is visible for these statements; they
# are presumably the tail of a test function. They are kept at column 0 here;
# confirm the enclosing scope before relying on that placement.

# copy ctor
obj = plugin.get_instance('fancy_copy', None)
assert 'reeb' == obj.config['str0']

# Same lookup, but passing a dict keyed by the instance name; 'str0' is
# expected to keep the same value.
obj = plugin.get_instance({'fancy_copy': {'var0': 'luggage'}}, None)
assert 'reeb' == obj.config['str0']

# get_probe() must reject 'emit0' with TypeError but return an object for
# 'probe1', and that object must not expose an `emit` attribute.
with pytest.raises(TypeError):
    plugin.get_probe('emit0', None)
assert plugin.get_probe('probe1', None) is not None
assert not hasattr(plugin.get_probe('probe1', None), 'emit')

# get_output() must reject both 'emit_abc' and 'probe1' with TypeError but
# return an object for 'emit0'.
with pytest.raises(TypeError):
    plugin.get_output('emit_abc', None)
with pytest.raises(TypeError):
    plugin.get_output('probe1', None)
assert plugin.get_output('emit0', None) is not None
def test_emission_queuing():
    """Check that send_emission() hands a queued message to one output per call.

    The probe instance is wired to two store outputs by hand. After the first
    ``send_emission()`` only ``emit_store1`` is expected to hold the message;
    after the second, ``emit_store2`` is expected to hold it as well, while
    ``emit_store1`` still holds exactly one entry.
    """
    # NOTE(review): a function with this same name is also defined earlier in
    # this file; at import time the later definition shadows the earlier one.
    plugin.instantiate(config["plugin"], None)
    queue_tester = plugin.get_instance("queue_tester")
    emit_store1 = plugin.get_output("emit_store1", None)
    emit_store2 = plugin.get_output("emit_store2", None)

    # daemon handles this, do it manually here since
    # we are not starting the daemon
    queue_tester._emit = [emit_store1, emit_store2]

    message = queue_tester.probe()
    queue_tester.queue_emission(message)

    # First send: the message shows up in the first store only.
    queue_tester.send_emission()
    assert emit_store1.store[0] == message
    assert emit_store2.store == []

    # Second send: the first store is unchanged (no duplicate entry) ...
    queue_tester.send_emission()
    assert emit_store1.store[0] == message
    assert len(emit_store1.store) == 1
    # ... and the second store must now have received the message; without
    # this check the second send's actual delivery is never verified.
    assert emit_store2.store[0] == message
# NOTE(review): no enclosing `def` line is visible for these statements, and
# `obj` / `anon_config` are read before any assignment visible here; this is
# presumably the tail of a test function whose start is not shown. Kept at
# column 0; confirm the enclosing scope before relying on that placement.
assert obj.config == anon_config

# copy ctor
obj = plugin.get_instance('fancy_copy', None)
assert 'reeb' == obj.config['str0']

# Same lookup, but passing a dict keyed by the instance name; 'str0' is
# expected to keep the same value.
obj = plugin.get_instance({'fancy_copy': {'var0': 'luggage'}}, None)
assert 'reeb' == obj.config['str0']

# get_probe() must reject 'emit0' with TypeError but return an object for
# 'probe1', and that object must not expose an `emit` attribute.
with pytest.raises(TypeError):
    plugin.get_probe('emit0', None)
assert plugin.get_probe('probe1', None) is not None
assert not hasattr(plugin.get_probe('probe1', None), 'emit')

# get_output() must reject both 'emit_abc' and 'probe1' with TypeError but
# return an object for 'emit0'.
with pytest.raises(TypeError):
    plugin.get_output('emit_abc', None)
with pytest.raises(TypeError):
    plugin.get_output('probe1', None)
assert plugin.get_output('emit0', None) is not None
# NOTE(review): headerless fragment. It reads `self` and ends in `return`, so
# it is presumably a method body whose `def` line is not visible here. The
# original indentation is missing as well, so the nesting of the loops and
# conditionals below cannot be read off this text; the code lines are left
# untouched and the nesting must be restored from the upstream file.
"""
process
"""
# Read the configured probes; a missing key and an empty value are both
# rejected with ValueError.
probes = self.config.get('probes', None)
if not probes:
raise ValueError('no probes specified')
# Visit each probe configuration entry.
for probe_config in self.config['probes']:
# Look up the probe object for this entry via the plugin registry.
probe = plugin.get_probe(probe_config, self.plugin_context)
# FIXME - needs to check for output defined in plugin
if 'output' not in probe_config:
raise ValueError("no output specified")
# get all output targets and start / join them
for output_name in probe_config['output']:
output = plugin.get_output(output_name, self.plugin_context)
# NOTE(review): presumably only outputs that are not yet started get
# started and added to self.joins, while every output is appended to
# probe._emit; confirm against the original nesting.
if not output.started:
output.start()
self.joins.append(output)
probe._emit.append(output)
# Start the probe and remember it alongside the outputs in self.joins.
probe.start()
self.joins.append(probe)
# Hand everything collected in self.joins to vaping.io.joinall, then return 0.
vaping.io.joinall(self.joins)
return 0