How to use the vaping.plugins.EmitBase function in vaping

To help you get started, we’ve selected a few vaping examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github 20c / vaping / tests / test_plugin.py View on Github external
],
}

anon_config = {
    'type': 'plugin0',
    'var0': 999,
}


@plugin.register('plugin0')
class Plugin0(plugins.PluginBase):
    pass


@plugin.register('emit0')
class EmitPlugin0(plugins.EmitBase):
    def emit(self, message):
        pass


@plugin.register('emit_abc')
class EmitPluginABC(plugins.EmitBase):
    # emit not defined to test TypeError
    pass

@plugin.register('emit_store')
class EmitPluginStore(plugins.EmitBase):
    def init(self):
        super(EmitPluginStore, self).init()
        self.store = []
    def emit(self, message):
        self.store.append(message)
github 20c / vaping / tests / test_plugin.py View on Github external
pass


@plugin.register('emit0')
class EmitPlugin0(plugins.EmitBase):
    def emit(self, message):
        pass


@plugin.register('emit_abc')
class EmitPluginABC(plugins.EmitBase):
    # emit not defined to test TypeError
    pass

@plugin.register('emit_store')
class EmitPluginStore(plugins.EmitBase):
    def init(self):
        super(EmitPluginStore, self).init()
        self.store = []
    def emit(self, message):
        self.store.append(message)


@plugin.register('queue_tester')
class QueueTester(plugins.ProbeBase):
    def probe(self):
        msg = self.new_message()
        return msg


@plugin.register('probe0')
class TimedPlugin0(plugins.TimedProbe):
github 20c / vaping / tests / test_plugin.py View on Github external
}


@plugin.register('plugin0')
class Plugin0(plugins.PluginBase):
    pass


@plugin.register('emit0')
class EmitPlugin0(plugins.EmitBase):
    def emit(self, message):
        pass


@plugin.register('emit_abc')
class EmitPluginABC(plugins.EmitBase):
    # emit not defined to test TypeError
    pass

@plugin.register('emit_store')
class EmitPluginStore(plugins.EmitBase):
    def init(self):
        super(EmitPluginStore, self).init()
        self.store = []
    def emit(self, message):
        self.store.append(message)


@plugin.register('queue_tester')
class QueueTester(plugins.ProbeBase):
    def probe(self):
        msg = self.new_message()
github 20c / vaping / vaping / plugins / vodka.py View on Github external
for group_name, group_config in list(probe.groups.items()):
        if "hosts" not in group_config:
            continue

        r = {}
        for host in group_config.get("hosts"):
            if isinstance(host, dict):
                r[host["host"]] = host
            else:
                r[host] = {"host":host}
        graphsrv.group.add(probe.name, group_name, r, **group_config)


@vaping.plugin.register('vodka')
class VodkaPlugin(vaping.plugins.EmitBase):

    """
    Plugin that emits to vodka data
    """

    def init(self):
        self._is_started = False

    def start(self):

        if self._is_started:
            return
        self._is_started = True
        vodka.run(self.config, self.vaping.config)

        if graphsrv:
github 20c / vaping / vaping / plugins / zeromq.py View on Github external
from builtins import str

try:
    import zmq.green as zmq
except ImportError:
    zmq = None

import vaping
import vaping.plugins


@vaping.plugin.register('zeromq')
class ZeroMQ(vaping.plugins.EmitBase):
    """
    plugin to emit json encoded messages via zeromq

    # Instanced Attributes

    - ctx (`zmq Context`)
    - sock (`zmq socket`)
    """

    def __init__(self, config, ctx):
        super(ZeroMQ, self).__init__(config, ctx)

        self.sock = None

    def init(self):
        self.log.debug("init zeromq ..")