Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import collections
import os
import abc
import copy
import datetime
import logging
import munge
from future.utils import with_metaclass
from vaping.config import parse_interval
import vaping.io
class PluginBase(vaping.io.Thread):
"""
Base plugin interface
# Instanced Attributes
- config (`dict`): plugin config
- vaping: reference to the main vaping object
Calls `self.init()` prefork while loading all modules, init() should
not do anything active, any files opened may be closed when it forks.
Plugins should prefer `init()` to `__init__()` to ensure the class is
completely done initializing.
Calls `self.on_start()` and `self.on_stop()` before and after running in
case any connections need to be created or cleaned up.
def popen(self, args, **kwargs):
"""
creates a subprocess with passed args
**Returns**
Popen instance
"""
self.log.debug("popen %s", ' '.join(args))
return vaping.io.subprocess.Popen(args, **kwargs)
def _run(self):
self.run_level = 1
while self.run_level:
self.send_emission()
for msg in self.probe():
self.queue_emission(msg)
vaping.io.sleep(0.1)
def __init__(self, config, ctx, emit=None):
if emit:
self._emit = [emit]
else:
self._emit = []
self._emit_queue = vaping.io.Queue()
super(ProbeBase, self).__init__(config, ctx)