Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_default_interface(self):
    """
    Check that the collected default network interface equals the one reported by netstat
    """
    manager = SystemManager()
    manager._discover_objects()

    # take the first object registered under the manager's own type
    registry = manager.objects
    first_id = registry.objects_by_type[manager.type][0]
    system_obj = registry.objects[first_id]

    meta_collector = SystemCommonMetaCollector(object=system_obj)
    meta_collector.collect()

    # first element of the call result; it has to unpack into exactly two items
    # and only the first of them is compared below
    netstat_out = subp.call(
        'netstat -nr | egrep -i "^0.0.0.0|default" | head -1 | sed "s/.*[ ]\([^ ][^ ]*\)$/\\1/"'
    )[0]
    expected_interface, _ = netstat_out

    collected_interface = system_obj.metad.current['network']['default']
    assert_that(collected_interface, equal_to(expected_interface))
def teardown_class(cls):
    """
    Class-level teardown: ask supervisorctl (using /etc/supervisord.conf) to shut down
    """
    shutdown_cmd = 'supervisorctl -c /etc/supervisord.conf shutdown'
    subp.call(shutdown_cmd)
def stop_first_nginx(self):
    """
    Stop nginx through the `service` command, then pause for half a second
    """
    pause_seconds = 0.5
    stop_cmd = 'service nginx stop'
    # check=False is passed through to subp.call unchanged
    subp.call(stop_cmd, check=False)
    time.sleep(pause_seconds)
def get_master_workers(self):
    """
    Find nginx master and worker pids in the `ps` output

    :return: (master, workers) - master is the pid of the last parsed line whose command
             contains 'nginx: master process' (None if no such line), workers is a list
             with the pids of every other parsed line
    """
    master, workers = None, []

    # named groups are required: the code below reads 'pid' and 'cmd' by name
    ps_line_re = re.compile(r'\s*(?P<pid>\d+)\s+(?P<ppid>\d+)\s+(?P<cmd>.+)\s*')

    ps, _ = subp.call('ps -xa -o pid,ppid,command | egrep "PID|nginx" | grep -v egrep')
    for line in ps:
        # 21355     1 nginx: master process /usr/sbin/nginx
        gwe = ps_line_re.match(line)

        # if not parsed (e.g. the PID header line) or the line mentions py.test - switch to next line
        if not gwe or 'py.test' in line:
            continue

        pid = int(gwe.group('pid'))
        cmd = gwe.group('cmd')

        if 'nginx: master process' in cmd:
            master = pid
        else:
            workers.append(pid)

    return master, workers
def find_packages(self, meta):
    """
    Find a package with running binary

    Queries rpm for the owner of self.object.bin_path. When the first output line is
    a "<name> <version>" pair it is stored as meta['packages'] = {name: version}.
    When rpm reports that the file is not owned by any package, a warning is appended
    to meta['warnings'] instead.

    :param meta: {} of meta; meta['warnings'] is appended to, so it must already be a list
    """
    not_owned_marker = 'is not owned by'
    package, version = None, None

    rpm_qf_out, rpm_qf_err = subp.call(
        'rpm -qf %s ' % self.object.bin_path + '--queryformat="%{NAME} %{VERSION}-%{RELEASE}.%{ARCH}' + '\\n"',
        check=False
    )

    # only the first line of each stream is inspected; treat missing/empty as ''
    first_out = (rpm_qf_out[0] if rpm_qf_out else '') or ''
    first_err = (rpm_qf_err[0] if rpm_qf_err else '') or ''

    # unpack only a well-formed "<name> <version>" line - anything else
    # (e.g. a diagnostic sentence) used to blow up with ValueError here
    fields = first_out.split(' ') if first_out else []
    if len(fields) == 2:
        package, version = fields

    # NOTE(review): rpm appears to print the "not owned" notice on stdout rather than
    # stderr, so both streams are checked - confirm against the rpm version in use
    if not_owned_marker in first_out or not_owned_marker in first_err:
        meta['warnings'].append('self-made binary, is not from any nginx package')

    if not package:
        return

    meta['packages'] = {package: version}
def uname(meta):
    """
    Store the first line returned by `uname -s -r -v -m -p` (no hostname flag) as meta['uname']

    :param meta: {} of meta
    """
    uname_cmd = 'uname -s -r -v -m -p'
    output_lines, _ = subp.call(uname_cmd)
    first_line = output_lines.pop(0)
    meta['uname'] = first_line
def lscpu(meta):
    """
    Fill meta['processor'] from "Label:   value" lines of `lscpu` output

    :param meta: {} of meta; meta['processor'] is written to, so it must already be a dict
    """
    # lscpu label -> key under meta['processor']
    processor_fields = {
        'Architecture': 'architecture',
        'CPU MHz': 'mhz',
        'Hypervisor vendor': 'hypervisor',
        'Virtualization type': 'virtualization',
        'CPU(s)': 'cpus',
    }

    # raw string so the regex escapes are not treated as (invalid) string escapes;
    # the value group stops at the first character that is not a word char or '|'
    line_re = re.compile(r'([\w\d\s\(\)]+):\s+([\w|\d]+)')

    lscpu_out, _ = subp.call('lscpu')
    for line in lscpu_out:
        kv = line_re.match(line)
        if not kv:
            continue

        key, value = kv.group(1), kv.group(2)
        if key in processor_fields:
            meta['processor'][processor_fields[key]] = value
        elif 'cache' in key:
            # NOTE(review): the shortened cache label is computed but never stored in the
            # visible code - the tail of this function looks cut off; confirm before
            # relying on any cache data from here
            key = key.replace(' cache', '')
def open_ssl(self, meta):
    """
    Old nginx uses the standard openssl library - find its version

    Does nothing when meta['ssl'] is already set. Otherwise scans `dpkg -l | grep openssl`
    and, for each line whose second column starts with 'openssl', stores that column and
    the third one as both the 'built' and 'run' entries of meta['ssl'] (last match wins).

    :param meta: {} of meta; must already have an 'ssl' key
    """
    if meta['ssl']:
        return

    # raw string so the regex escapes are not treated as (invalid) string escapes;
    # three whitespace-separated columns - presumably status, package name, version
    dpkg_line_re = re.compile(r'([\d\w]+)\s+([\d\w\.\-]+)\s+([\d\w\.\-\+_~]+)\s')

    openssl_out, _ = subp.call("dpkg -l | grep openssl")
    for line in openssl_out:
        gwe = dpkg_line_re.match(line)
        if gwe and gwe.group(2).startswith('openssl'):
            meta['ssl'] = {
                'built': [gwe.group(2), gwe.group(3)],
                'run': [gwe.group(2), gwe.group(3)],
            }
def is_docker():
    """
    Guess whether we are running inside a docker container.

    Docker's own remote API (https://docs.docker.com/engine/reference/api/docker_remote_api/)
    is optional and has no fixed location from inside a container, so it is not used here.
    Instead the first docker-related line of /proc/self/cgroup is reduced with sed to what
    follows "docker/", and the result is accepted as a docker ID when it is exactly
    64 characters long and contains no spaces.

    Any exception on the way is logged and treated as "not docker".

    :return: Bool True if docker ID is found, False otherwise.
    """
    found = False
    try:
        stdout, _ = subp.call('cat /proc/self/cgroup | fgrep -e docker | head -n 1 | sed "s/.*docker\/\(.*\)/\\1/"')
        candidate = stdout[0]
        found = len(candidate) == 64 and ' ' not in candidate
    except Exception as e:
        # best effort: report the failure and fall through with found == False
        context.log.error('failed to find docker id due to %s' % e.__class__.__name__)
        context.log.debug('additional info:', exc_info=True)
    return found
def find_packages(self, meta):
"""
Find a package with running binary
"""
package_name = None
# find which package contains our binary
dpkg_s_nginx_out, dpkg_s_nginx_err = subp.call('dpkg -S %s' % self.object.bin_path, check=False)
for line in dpkg_s_nginx_out:
kv = re.match(self.dpkg_s_re, line)
if kv:
package_name = kv.group(1)
break
if dpkg_s_nginx_err:
if 'no_path' in dpkg_s_nginx_err[0]:
meta['warnings'].append('self-made binary, is not from any nginx package')
if not package_name:
return
# get version
all_installed_packages = self.installed_nginx_packages()