Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# both read/exec permissions must be available
# NOTE(review): 'bar' and 'barbis' are defined outside this excerpt; the assertions below
# treat 'bar' as the first hit and 'barbis' as the second hit for the 'bar' command
# if read permissions are removed for first hit, second hit is found instead
ft.adjust_permissions(bar, stat.S_IRUSR, add=False)
self.assertTrue(os.path.samefile(ft.which('bar'), barbis))
# restoring read permissions on first hit makes it the result again
ft.adjust_permissions(bar, stat.S_IRUSR, add=True)
self.assertTrue(os.path.samefile(ft.which('bar'), bar))
# likewise for exec permissions: if they are removed for first hit, second hit is found instead
ft.adjust_permissions(bar, stat.S_IXUSR, add=False)
self.assertTrue(os.path.samefile(ft.which('bar'), barbis))
# if read permissions on the other 'bar' are also removed, nothing is found anymore
ft.adjust_permissions(barbis, stat.S_IRUSR, add=False)
self.assertEqual(ft.which('bar'), None)
# checking of read/exec permissions can be disabled via 'check_perms'
self.assertTrue(os.path.samefile(ft.which('bar', check_perms=False), bar))
# put executable file 'bar' in place
bar = os.path.join(self.test_prefix, 'bar')
ft.write_file(bar, '#!/bin/bash')
ft.adjust_permissions(bar, stat.S_IRUSR|stat.S_IXUSR)
# no 'foo' command should be found, while 'bar' should resolve to the file created above
# NOTE(review): this relies on self.test_prefix being part of $PATH, which is not set up in this excerpt
self.assertEqual(ft.which('foo'), None)
self.assertTrue(os.path.samefile(ft.which('bar'), bar))
# add another location for 'bar'; by default, only the first location should be returned
barbis = os.path.join(self.test_prefix, 'more', 'bar')
ft.write_file(barbis, '#!/bin/bash')
ft.adjust_permissions(barbis, stat.S_IRUSR|stat.S_IXUSR)
# directory of second 'bar' is appended to the end of $PATH, so the first 'bar' is still found first
os.environ['PATH'] = '%s:%s' % (os.environ['PATH'], os.path.dirname(barbis))
self.assertTrue(os.path.samefile(ft.which('bar'), bar))
# test getting *all* locations for specified command: both hits are expected, first 'bar' first
res = ft.which('bar', retain_all=True)
self.assertEqual(len(res), 2)
self.assertTrue(os.path.samefile(res[0], bar))
self.assertTrue(os.path.samefile(res[1], barbis))
# sys.version may span multiple lines; join the lines with '; ' to obtain a single-line value
python_version = '; '.join(sys.version.split('\n'))
# return collected system information as a dictionary
# NOTE(review): the get_* helpers, gethostname and which are defined/imported outside this excerpt
return {
'core_count': get_avail_core_count(),
'total_memory': get_total_memory(),
'cpu_model': get_cpu_model(),
'cpu_speed': get_cpu_speed(),
'cpu_vendor': get_cpu_vendor(),
# gcc is queried with '-v' as version option
'gcc_version': get_tool_version('gcc', version_option='-v'),
'hostname': gethostname(),
'glibc_version': get_glibc_version(),
'os_name': get_os_name(),
'os_type': get_os_type(),
'os_version': get_os_version(),
'platform_name': get_platform_name(),
'python_version': python_version,
# locations of the 'python' and 'gcc' commands, as reported by which()
'system_python_path': which('python'),
'system_gcc_path': which('gcc'),
}
'debian': DPKG,
'redhat': RPM,
'ubuntu': DPKG,
}
# command-line flag to insert between the package manager command and the name of the dependency
pkg_cmd_flag = {
DPKG: '-s',
RPM: '-q',
}
os_name = get_os_name()
# only try the package manager command mapped to this OS name if there is one, otherwise try both
if os_name in os_to_pkg_cmd_map:
pkg_cmds = [os_to_pkg_cmd_map[os_name]]
else:
pkg_cmds = [RPM, DPKG]
# query each package manager command that is available, stop as soon as run_cmd returns a true value
for pkg_cmd in pkg_cmds:
if which(pkg_cmd):
cmd = ' '.join([pkg_cmd, pkg_cmd_flag.get(pkg_cmd), dep])
found = run_cmd(cmd, simple=True, log_all=False, log_ok=False,
force_in_dry_run=True, trace=False, stream_output=False)
if found:
break
# NOTE(review): 'found' is presumably initialised before this excerpt,
# since the loop above does not assign it when no package manager command is available
if not found:
# fallback for when os-dependency is a binary/library
found = which(dep)
# try locate if it's available
if not found and which('locate'):
# regular expression matches paths that end with '/<dep>'
cmd = 'locate --regexp "/%s$"' % dep
found = run_cmd(cmd, simple=True, log_all=False, log_ok=False, force_in_dry_run=True, trace=False,
stream_output=False)
"""
# create new temporary directory in which the symlinks will be created
symlink_dir = tempfile.mkdtemp()
# prepend location to symlinks to $PATH
setvar('PATH', '%s:%s' % (symlink_dir, os.getenv('PATH')))
# each value in 'paths' is a (path, cmds) pair: for each command name in cmds, a symlink to path is created
for (path, cmds) in paths.values():
for cmd in cmds:
cmd_s = os.path.join(symlink_dir, cmd)
# leave entries that already exist in the symlink directory untouched
if not os.path.exists(cmd_s):
try:
os.symlink(path, cmd_s)
except OSError as err:
raise EasyBuildError("Failed to symlink %s to %s: %s", path, cmd_s, err)
# log what the command name resolves to now, and which file that location points to
cmd_path = which(cmd)
self.log.debug("which(%s): %s -> %s", cmd, cmd_path, os.path.realpath(cmd_path))
self.log.info("Commands symlinked to %s via %s: %s", path, symlink_dir, ', '.join(cmds))
else:
# NOTE(review): the condition this 'else' belongs to is outside this excerpt;
# in this branch '/dev/null' is used as value for the wrapper log
rpath_wrapper_log = '/dev/null'
# complete template script and put it in place
cmd_wrapper_txt = read_file(rpath_wrapper_template) % {
'orig_cmd': orig_cmd,
# Python interpreter that is currently running
'python': sys.executable,
'rpath_args_py': rpath_args_py,
'rpath_filter': rpath_filter,
'rpath_include': rpath_include,
'rpath_wrapper_log': rpath_wrapper_log,
'wrapper_dir': wrapper_dir,
}
write_file(cmd_wrapper, cmd_wrapper_txt)
# wrapper script is passed to adjust_permissions along with the 'execute by owner' permission bit
adjust_permissions(cmd_wrapper, stat.S_IXUSR)
# NOTE(review): which(cmd) is evaluated here before wrapper_dir is prepended to $PATH below,
# so this may log the location of the original command rather than the wrapper script - confirm
self.log.info("Wrapper script for %s: %s (log: %s)", orig_cmd, which(cmd), rpath_wrapper_log)
# prepend location to this wrapper to $PATH
setvar('PATH', '%s:%s' % (wrapper_dir, os.getenv('PATH')))
else:
# NOTE(review): the condition this 'else' belongs to is outside this excerpt
self.log.debug("Not installing RPATH wrapper for non-existing command '%s'", cmd)
# package manager commands to try, in this order
pkg_cmds = [RPM, DPKG]
# query each package manager command that is available, stop as soon as run_cmd returns a true value
# NOTE(review): pkg_cmd_flag and dep are defined outside this excerpt
for pkg_cmd in pkg_cmds:
if which(pkg_cmd):
cmd = ' '.join([pkg_cmd, pkg_cmd_flag.get(pkg_cmd), dep])
found = run_cmd(cmd, simple=True, log_all=False, log_ok=False,
force_in_dry_run=True, trace=False, stream_output=False)
if found:
break
# NOTE(review): 'found' is presumably initialised before this excerpt,
# since the loop above does not assign it when no package manager command is available
if not found:
# fallback for when os-dependency is a binary/library
found = which(dep)
# try locate if it's available
if not found and which('locate'):
# regular expression matches paths that end with '/<dep>'
cmd = 'locate --regexp "/%s$"' % dep
found = run_cmd(cmd, simple=True, log_all=False, log_ok=False, force_in_dry_run=True, trace=False,
stream_output=False)
# result of the last check that was performed
return found
def write_wrapper(self, wrapper_dir, compiler, i_mpi_root):
    """
    Helper function to put a wrapper script for a compiler in place.

    The contents of the script are obtained by completing the INTEL_COMPILER_WRAPPER template.

    :param wrapper_dir: directory in which the wrapper script is created
    :param compiler: name of the compiler command, also used as filename for the wrapper script
    :param i_mpi_root: value to use for 'intel_mpi_root' in the wrapper template
    """
    # location of the compiler is determined *before* the wrapper script is written
    compiler_path = which(compiler)
    cpath = os.getenv('CPATH')
    # $LM_LICENSE_FILE is only used as fallback, for when $INTEL_LICENSE_FILE is not defined
    license_file = os.getenv('INTEL_LICENSE_FILE', os.getenv('LM_LICENSE_FILE'))

    script_txt = INTEL_COMPILER_WRAPPER % {
        'wrapper_dir': wrapper_dir,
        'intel_mpi_root': i_mpi_root,
        'compiler_path': compiler_path,
        'cpath': cpath,
        'intel_license_file': license_file,
    }

    script_path = os.path.join(wrapper_dir, compiler)
    write_file(script_path, script_txt)

    # in dry run mode, only report where the wrapper script is located
    if self.dry_run:
        self.dry_run_msg("Wrapper for '%s' was put in place: %s", compiler, script_path)
        return

    # add 'execute by owner' permission to the wrapper script, and log what the compiler command resolves to now
    adjust_permissions(script_path, stat.S_IXUSR)
    self.log.info("Using wrapper script for '%s': %s", compiler, which(compiler))
# NOTE(review): excerpt starts inside an if/elif chain on the container image format
img_path = os.path.join(cont_path, img_name)
cmd_opts = '--sandbox'
else:
raise EasyBuildError("Unknown container image format specified for Singularity: %s" % image_format)
# an existing container image is only removed (and then recreated) when 'force' build option is enabled
if os.path.exists(img_path):
if build_option('force'):
print_msg("WARNING: overwriting existing container image at %s due to --force" % img_path)
remove_file(img_path)
else:
raise EasyBuildError("Container image already exists at %s, not overwriting it without --force",
img_path)
# resolve full path to 'singularity' binary, since it may not be available via $PATH under sudo...
# NOTE(review): if which() can return None here, the ' '.join below would fail with a TypeError - confirm
singularity = which('singularity')
# SINGULARITY_TMPDIR is specified on the sudo command line if a temporary directory is set
cmd_env = ''
singularity_tmpdir = self.tmpdir
if singularity_tmpdir:
cmd_env += 'SINGULARITY_TMPDIR=%s' % singularity_tmpdir
cmd = ' '.join(['sudo', cmd_env, singularity, 'build', cmd_opts, img_path, recipe_path])
print_msg("Running '%s', you may need to enter your 'sudo' password..." % cmd)
# output of the command is streamed while it is running
run_cmd(cmd, stream_output=True)
print_msg("Singularity image created at %s" % img_path, log=self.log)