Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
],
}
# Plugin configuration that gives only a plugin type and a single variable.
anon_config = dict(type='plugin0', var0=999)
@plugin.register('plugin0')
class Plugin0(plugins.PluginBase):
    """Plugin registered as 'plugin0' that adds nothing to its base class."""
@plugin.register('emit0')
class EmitPlugin0(plugins.EmitBase):
    """Emit plugin registered as 'emit0' whose ``emit`` does nothing."""

    def emit(self, message):
        """Accept *message* and discard it."""
@plugin.register('emit_abc')
class EmitPluginABC(plugins.EmitBase):
    """Emit plugin registered as 'emit_abc' that leaves ``emit`` undefined.

    The omission is deliberate: emit is not defined in order to test
    for TypeError.
    """
@plugin.register('emit_store')
class EmitPluginStore(plugins.EmitBase):
    """Emit plugin registered as 'emit_store' that keeps messages in a list."""

    def init(self):
        """Run the parent ``init`` and start with an empty ``store`` list."""
        super(EmitPluginStore, self).init()
        self.store = list()

    def emit(self, message):
        """Append *message* to ``self.store``."""
        self.store.append(message)
pass
@plugin.register('emit0')
class EmitPlugin0(plugins.EmitBase):
    """Emit plugin registered as 'emit0' whose ``emit`` does nothing."""

    def emit(self, message):
        """Accept *message* and discard it."""
@plugin.register('emit_abc')
class EmitPluginABC(plugins.EmitBase):
    """Emit plugin registered as 'emit_abc' that leaves ``emit`` undefined.

    The omission is deliberate: emit is not defined in order to test
    for TypeError.
    """
@plugin.register('emit_store')
class EmitPluginStore(plugins.EmitBase):
    """Emit plugin registered as 'emit_store' that keeps messages in a list."""

    def init(self):
        """Run the parent ``init`` and start with an empty ``store`` list."""
        super(EmitPluginStore, self).init()
        self.store = list()

    def emit(self, message):
        """Append *message* to ``self.store``."""
        self.store.append(message)
@plugin.register('queue_tester')
class QueueTester(plugins.ProbeBase):
    """Probe registered as 'queue_tester' that hands back a new message."""

    def probe(self):
        """Return the message created by ``self.new_message()``."""
        return self.new_message()
@plugin.register('probe0')
class TimedPlugin0(plugins.TimedProbe):
}
@plugin.register('plugin0')
class Plugin0(plugins.PluginBase):
    """Plugin registered as 'plugin0' that adds nothing to its base class."""
@plugin.register('emit0')
class EmitPlugin0(plugins.EmitBase):
    """Emit plugin registered as 'emit0' whose ``emit`` does nothing."""

    def emit(self, message):
        """Accept *message* and discard it."""
@plugin.register('emit_abc')
class EmitPluginABC(plugins.EmitBase):
    """Emit plugin registered as 'emit_abc' that leaves ``emit`` undefined.

    The omission is deliberate: emit is not defined in order to test
    for TypeError.
    """
@plugin.register('emit_store')
class EmitPluginStore(plugins.EmitBase):
    """Emit plugin registered as 'emit_store' that keeps messages in a list."""

    def init(self):
        """Run the parent ``init`` and start with an empty ``store`` list."""
        super(EmitPluginStore, self).init()
        self.store = list()

    def emit(self, message):
        """Append *message* to ``self.store``."""
        self.store.append(message)
@plugin.register('queue_tester')
class QueueTester(plugins.ProbeBase):
def probe(self):
msg = self.new_message()
# Add one graphsrv group for every probe group that lists hosts;
# groups without a "hosts" key are left alone.
for group_name, group_config in list(probe.groups.items()):
    if "hosts" in group_config:
        targets = {}
        for entry in group_config.get("hosts"):
            # A host is either a dict carrying a "host" key or a bare value
            # that gets wrapped into such a dict.
            if not isinstance(entry, dict):
                targets[entry] = {"host": entry}
            else:
                targets[entry["host"]] = entry
        graphsrv.group.add(probe.name, group_name, targets, **group_config)
@vaping.plugin.register('vodka')
class VodkaPlugin(vaping.plugins.EmitBase):
"""
Plugin that emits to vodka data
"""
def init(self):
self._is_started = False
def start(self):
if self._is_started:
return
self._is_started = True
vodka.run(self.config, self.vaping.config)
if graphsrv:
from builtins import str
try:
import zmq.green as zmq
except ImportError:
zmq = None
import vaping
import vaping.plugins
@vaping.plugin.register('zeromq')
class ZeroMQ(vaping.plugins.EmitBase):
"""
plugin to emit json encoded messages via zeromq
# Instanced Attributes
- ctx (`zmq Context`)
- sock (`zmq socket`)
"""
def __init__(self, config, ctx):
super(ZeroMQ, self).__init__(config, ctx)
self.sock = None
def init(self):
self.log.debug("init zeromq ..")