Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def start(config, args):
    """Select the Glances run mode and instantiate it.

    The class matching the mode reported by ``core`` (standalone, client,
    client browser, server or web server) is imported on demand, built with
    the given *config* and *args*, and stored in the module-level ``mode``.
    """
    global mode

    # Counter created before the mode is loaded (it is not read in this
    # part of the function).
    start_duration = Counter()

    # Import only the class required by the detected mode
    if core.is_standalone():
        from glances.standalone import GlancesStandalone as mode_class
    elif core.is_client():
        if core.is_client_browser():
            from glances.client_browser import GlancesClientBrowser as mode_class
        else:
            from glances.client import GlancesClient as mode_class
    elif core.is_server():
        from glances.server import GlancesServer as mode_class
    elif core.is_webserver():
        from glances.webserver import GlancesWebServer as mode_class

    # Announce the selected mode, then build it
    mode_name = mode_class.__name__
    logger.info("Start {} mode".format(mode_name))
    mode = mode_class(config=config, args=args)
def load_plugins(self, args=None):
    """Load every plugin module found in the plugins folder.

    An entry of ``plugins_path`` is loaded when its name starts with
    ``self.header``, ends with ".py" and is not the "<header>plugin.py"
    file. The duration of each load is written to the debug log.

    :param args: passed through to ``self._load_plugin``
    """
    timer = Counter()

    for filename in os.listdir(plugins_path):
        # Ignore everything that is not a plugin module
        if not filename.startswith(self.header):
            continue
        if not filename.endswith(".py"):
            continue
        if filename == (self.header + "plugin.py"):
            continue

        # Reset the counter, load the plugin, then log the elapsed time
        timer.reset()
        self._load_plugin(os.path.basename(filename),
                          args=args, config=self.config)
        elapsed = timer.get()
        logger.debug("Plugin {} started in {} seconds".format(filename, elapsed))

    # Once all the entries have been processed, log the resulting list
    logger.debug("Active plugins list: {}".format(self.getPluginsList()))
def __serve_forever(self):
"""Main loop for the CLI.

Update the stats, export them, then (unless quiet mode is set) refresh
the screen for the part of the refresh time that is still available.

Return True if we should continue (no exit key has been pressed).
"""
# NOTE(review): this excerpt is cut short: the 'else' branch below has no
# body and no return statement is visible.
# Start a counter used to compute the time needed for
# update and export the stats
counter = Counter()
# Update stats
self.stats.update()
logger.debug('Stats updated duration: {} seconds'.format(counter.get()))
# Export stats
# (a second counter is used so that the export is logged on its own)
counter_export = Counter()
self.stats.export(self.stats)
logger.debug('Stats exported duration: {} seconds'.format(counter_export.get()))
# Patch for issue1326 to avoid < 0 refresh
# 'counter' was created before the update, so the value subtracted here
# covers both the update and the export.
adapted_refresh = self.refresh_time - counter.get()
adapted_refresh = adapted_refresh if adapted_refresh > 0 else 0
# Display stats
# and wait refresh_time - counter
if not self.quiet:
# The update function return True if an exit key 'q' or 'ESC'
# has been pressed.
# The result is negated: ret is True when no exit key has been pressed.
ret = not self.screen.update(self.stats, duration=adapted_refresh)
else:
# Nothing is displayed
# Break should be done via a signal (CTRL-C)
def update(self):
    """Wrapper method to update the stats.

    Used for the standalone and server modes: every plugin that is not
    disabled gets its stats, then its stats history, then its views
    updated, in that order.
    """
    for name in self._plugins:
        plugin = self._plugins[name]

        # A disabled plugin is left untouched
        if plugin.is_disable():
            continue

        # Counter for the update of this plugin. It is not read here: the
        # debug log that reported the update duration is commented out.
        start_duration = Counter()

        plugin.update()
        plugin.update_stats_history()
        plugin.update_views()
def __serve_forever(self):
"""Main loop for the CLI.

Update the stats, export them, then compute the part of the refresh time
that is still available for the display.

Return True if we should continue (no exit key has been pressed).
"""
# NOTE(review): this excerpt is cut short: the display step and the return
# statement are not visible.
# Start a counter used to compute the time needed for
# update and export the stats
counter = Counter()
# Update stats
self.stats.update()
logger.debug('Stats updated duration: {} seconds'.format(counter.get()))
# Export stats
# (a second counter is used so that the export is logged on its own)
counter_export = Counter()
self.stats.export(self.stats)
logger.debug('Stats exported duration: {} seconds'.format(counter_export.get()))
# Patch for issue1326 to avoid < 0 refresh
# 'counter' was created before the update, so the value subtracted here
# covers both the update and the export.
adapted_refresh = self.refresh_time - counter.get()
adapted_refresh = adapted_refresh if adapted_refresh > 0 else 0
# Display stats
# and wait refresh_time - counter
# NOTE(review): this excerpt starts inside an if/else chain: the function
# header and the condition guarding the first two assignments are not visible.
# Names of the ping options used for the timeout and for the packet count
timeout_opt = '-t'
count_opt = '-c'
else:
# Linux and co...
timeout_opt = '-W'
count_opt = '-c'
# Build the command line
# Note: Only string are allowed
# The command sends a single packet ('1' is the value of the count option).
cmd = ['ping',
count_opt, '1',
timeout_opt, str(self._resolv_name(port['timeout'])),
self._resolv_name(port['host'])]
# The ping output is discarded
# NOTE(review): fnull is not closed anywhere in the visible code.
fnull = open(os.devnull, 'w')
try:
# Counter created just before the ping; its value becomes the status
counter = Counter()
# subprocess.check_call raises CalledProcessError on a non-zero exit status,
# so a failed ping is handled by the first except clause below.
ret = subprocess.check_call(cmd, stdout=fnull, stderr=fnull, close_fds=True)
if ret == 0:
port['status'] = counter.get()
else:
port['status'] = False
except subprocess.CalledProcessError as e:
# Correct issue #1084: No Offline status for timeouted ports
port['status'] = False
except Exception as e:
# Any other error is only logged: port['status'] is left unchanged
logger.debug("{}: Error while pinging host {} ({})".format(self.plugin_name, port['host'], e))
# NOTE(review): ret is only assigned by check_call in the visible code; unless
# it is initialised before this excerpt, it is unbound after an exception.
return ret
def _port_scan_tcp(self, port):
    """Scan the (TCP) port structure (dict) and update the status key.

    :param port: dict read for its 'host', 'port' and 'timeout' keys. Its
        'status' key is set to the value of a Counter created just before
        the connection attempt when ``connect_ex`` returns 0, and to False
        when it returns any other value. It is left unchanged when the
        socket cannot be created or when the connection attempt raises.
    :return: the value returned by ``connect_ex``, or None when the socket
        could not be created or the connection attempt raised
    """
    ret = None

    # Create and configure the scanning socket
    try:
        socket.setdefaulttimeout(port['timeout'])
        _socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    except Exception as e:
        logger.debug("{}: Error while creating scanning socket ({})".format(self.plugin_name, e))
        # No socket exists at this point: carrying on would fail on the
        # unbound '_socket' name (in the scan and again in the clean-up).
        return ret

    try:
        # Scan port
        ip = self._resolv_name(port['host'])
        counter = Counter()
        try:
            ret = _socket.connect_ex((ip, int(port['port'])))
        except Exception as e:
            logger.debug("{}: Error while scanning port {} ({})".format(self.plugin_name, port, e))
        else:
            if ret == 0:
                port['status'] = counter.get()
            else:
                port['status'] = False
    finally:
        # Release the socket on every path, including an error raised by
        # the name resolution (which still propagates to the caller).
        _socket.close()

    return ret