Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init(this_dir):
config_dir = os.path.join(this_dir, 'data', 'config', 'vodka')
vaping.daemon.Vaping(config_dir=config_dir)
plugin.get_instance("vodka")
class EmitPluginStore(plugins.EmitBase):
def init(self):
super(EmitPluginStore, self).init()
self.store = []
def emit(self, message):
self.store.append(message)
@plugin.register('queue_tester')
class QueueTester(plugins.ProbeBase):
def probe(self):
msg = self.new_message()
return msg
@plugin.register('probe0')
class TimedPlugin0(plugins.TimedProbe):
default_config = {
'interval': '1m',
'count': 5,
'period': 20,
}
def probe(self):
return []
@plugin.register('probe1')
class ProbePlugin1(plugins.ProbeBase):
def probe(self):
return []
'var0': 999,
}
@plugin.register('plugin0')
class Plugin0(plugins.PluginBase):
pass
@plugin.register('emit0')
class EmitPlugin0(plugins.EmitBase):
def emit(self, message):
pass
@plugin.register('emit_abc')
class EmitPluginABC(plugins.EmitBase):
# emit not defined to test TypeError
pass
@plugin.register('emit_store')
class EmitPluginStore(plugins.EmitBase):
def init(self):
super(EmitPluginStore, self).init()
self.store = []
def emit(self, message):
self.store.append(message)
@plugin.register('queue_tester')
class QueueTester(plugins.ProbeBase):
def probe(self):
}
],
}
anon_config = {
'type': 'plugin0',
'var0': 999,
}
@plugin.register('plugin0')
class Plugin0(plugins.PluginBase):
pass
@plugin.register('emit0')
class EmitPlugin0(plugins.EmitBase):
def emit(self, message):
pass
@plugin.register('emit_abc')
class EmitPluginABC(plugins.EmitBase):
# emit not defined to test TypeError
pass
@plugin.register('emit_store')
class EmitPluginStore(plugins.EmitBase):
def init(self):
super(EmitPluginStore, self).init()
self.store = []
def emit(self, message):
def test_init(this_dir):
config_dir = os.path.join(this_dir, 'data', 'config', 'vodka')
vaping.daemon.Vaping(config_dir=config_dir)
plugin.get_instance("vodka")
def test_init():
with pytest.raises(ValueError):
vaping.plugins.zeromq.ZeroMQ({}, None)
config = {
'bind': 'tcp://*:5555',
'connect': 'tcp://*:5555',
}
with pytest.raises(ValueError):
vaping.plugins.zeromq.ZeroMQ(config, None)
vaping.plugins.zeromq.ZeroMQ({'bind': 'tcp://*:5555'}, None)
'output': [
'emit_store_1',
'emit_store_2'
]
}
],
}
anon_config = {
'type': 'plugin0',
'var0': 999,
}
@plugin.register('plugin0')
class Plugin0(plugins.PluginBase):
pass
@plugin.register('emit0')
class EmitPlugin0(plugins.EmitBase):
def emit(self, message):
pass
@plugin.register('emit_abc')
class EmitPluginABC(plugins.EmitBase):
# emit not defined to test TypeError
pass
@plugin.register('emit_store')
class EmitPluginStore(plugins.EmitBase):
def test_update_and_create():
"""
test that update() and create() are called accordingly during
emit()
"""
inst = plugin.get_instance(config, None)
t = time.time()
inst.emit({
"type" : "test",
"source" : "test_update_and_create",
"ts" : t,
"data" : [
{ "test" : 123, "a" : "row", "b" : "1" },
{ "test" : 456, "a" : "row", "b" : "2" }
]
})
assert inst.created == True
assert inst.updated["row-1-test"] == (t, 123)
assert inst.updated["row-2-test"] == (t, 456)
def test_whisper(tmpdir):
"""
test whisper-db creation and update from emit
"""
config["filename"] = str(tmpdir.mkdir("whisper").join(FILENAME_TEMPLATE))
inst = plugin.get_instance(config, None)
inst.start()
i = 0
t = int(time.time())-10
# emit and insert 10 data points for 2 graphs at 1 second intervals
while i < 10:
inst.emit({
"type" : "test",
"source" : "test_whisper",
"ts" : t+i,
"data" : [
{ "test" : 123+i, "id" : 1 },
{ "test" : 456+i, "id" : 2 }
]
})
i += 1
def test_init():
data_timeseries_empty = dict(
filename="file",
field="field",
)
vaping.plugin.get_plugin_class("graphite")(data_timeseries_empty, None)