How to use the vaping.plugin.get_output function in vaping

To help you get started, we’ve selected a few vaping examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github 20c / vaping / tests / test_plugin.py View on Github external
def test_emission_queuing():
    plugin.instantiate(config["plugin"], None)
    queue_tester = plugin.get_instance("queue_tester")
    emit_store1 = plugin.get_output("emit_store1", None)
    emit_store2 = plugin.get_output("emit_store2", None)

    # daemon handles this, do it manually here since
    # we are not starting the daemon
    queue_tester._emit = [emit_store1, emit_store2]

    message = queue_tester.probe()
    queue_tester.queue_emission(message)

    queue_tester.send_emission()
    assert emit_store1.store[0] == message
    assert emit_store2.store == []

    queue_tester.send_emission()
    assert emit_store1.store[0] == message
    assert len(emit_store1.store) == 1
    assert emit_store2.store[0] == message
github 20c / vaping / tests / test_plugin.py View on Github external
# copy ctor
    obj = plugin.get_instance('fancy_copy', None)
    assert 'reeb' == obj.config['str0']

    obj = plugin.get_instance({'fancy_copy': {'var0': 'luggage'}}, None)
    assert 'reeb' == obj.config['str0']

    with pytest.raises(TypeError):
        plugin.get_probe('emit0', None)
    assert None != plugin.get_probe('probe1', None)
    assert not hasattr(plugin.get_probe('probe1', None), 'emit')

    with pytest.raises(TypeError):
        plugin.get_output('emit_abc', None)
    with pytest.raises(TypeError):
        plugin.get_output('probe1', None)
    assert None != plugin.get_output('emit0', None)
github 20c / vaping / tests / test_plugin.py View on Github external
def test_emission_queuing():
    plugin.instantiate(config["plugin"], None)
    queue_tester = plugin.get_instance("queue_tester")
    emit_store1 = plugin.get_output("emit_store1", None)
    emit_store2 = plugin.get_output("emit_store2", None)

    # daemon handles this, do it manually here since
    # we are not starting the daemon
    queue_tester._emit = [emit_store1, emit_store2]

    message = queue_tester.probe()
    queue_tester.queue_emission(message)

    queue_tester.send_emission()
    assert emit_store1.store[0] == message
    assert emit_store2.store == []

    queue_tester.send_emission()
    assert emit_store1.store[0] == message
    assert len(emit_store1.store) == 1
github 20c / vaping / tests / test_plugin.py View on Github external
assert obj.config == anon_config

    # copy ctor
    obj = plugin.get_instance('fancy_copy', None)
    assert 'reeb' == obj.config['str0']

    obj = plugin.get_instance({'fancy_copy': {'var0': 'luggage'}}, None)
    assert 'reeb' == obj.config['str0']

    with pytest.raises(TypeError):
        plugin.get_probe('emit0', None)
    assert None != plugin.get_probe('probe1', None)
    assert not hasattr(plugin.get_probe('probe1', None), 'emit')

    with pytest.raises(TypeError):
        plugin.get_output('emit_abc', None)
    with pytest.raises(TypeError):
        plugin.get_output('probe1', None)
    assert None != plugin.get_output('emit0', None)
github 20c / vaping / vaping / daemon.py View on Github external
"""
        process
        """
        probes = self.config.get('probes', None)
        if not probes:
            raise ValueError('no probes specified')

        for probe_config in self.config['probes']:
            probe = plugin.get_probe(probe_config, self.plugin_context)
            # FIXME - needs to check for output defined in plugin
            if 'output' not in probe_config:
                raise ValueError("no output specified")

            # get all output targets and start / join them
            for output_name in probe_config['output']:
                output = plugin.get_output(output_name, self.plugin_context)
                if not output.started:
                    output.start()
                    self.joins.append(output)
                probe._emit.append(output)

            probe.start()
            self.joins.append(probe)

        vaping.io.joinall(self.joins)
        return 0