Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def make_paths(cls, obj):
if isinstance(obj, set):
return set(cls.make_paths(e) for e in obj)
elif isinstance(obj, list):
return [cls.make_paths(e) for e in obj]
elif isinstance(obj, AbstractPath):
return obj
elif isinstance(obj, (bytes, unicode_)):
return Path(obj)
else:
assert False
(len(runs),))
for r_name, r_argv, r_envp, r_workingdir, r_exitcode in executions:
# Decodes command-line
argv = r_argv.split('\0')
if not argv[-1]:
argv = argv[:-1]
# Decodes environment
envp = r_envp.split('\0')
if not envp[-1]:
envp = envp[:-1]
environ = dict(v.split('=', 1) for v in envp)
runs.append({'id': "run%d" % len(runs),
'binary': r_name, 'argv': argv,
'workingdir': unicode_(Path(r_workingdir)),
'architecture': platform.machine().lower(),
'distribution': distribution,
'hostname': platform.node(),
'system': [platform.system(), platform.release()],
'environ': environ,
'uid': os.getuid(),
'gid': os.getgid(),
'signal' if r_exitcode & 0x0100 else 'exitcode':
r_exitcode & 0xFF})
cur.close()
conn.close()
if find_inputs_outputs:
inputs_outputs = compile_inputs_outputs(runs, inputs, outputs)
else:
''')
for r_name, r_argv, r_envp, r_workingdir, r_exitcode in executions:
# Decodes command-line
argv = r_argv.split('\0')
if not argv[-1]:
argv = argv[:-1]
# Decodes environment
envp = r_envp.split('\0')
if not envp[-1]:
envp = envp[:-1]
environ = dict(v.split('=', 1) for v in envp)
runs.append({'id': "run%d" % len(runs),
'binary': r_name, 'argv': argv,
'workingdir': unicode_(Path(r_workingdir)),
'architecture': platform.machine().lower(),
'distribution': distribution,
'hostname': platform.node(),
'system': [platform.system(), platform.release()],
'environ': environ,
'uid': os.getuid(),
'gid': os.getgid(),
'signal' if r_exitcode & 0x0100 else 'exitcode':
r_exitcode & 0xFF})
cur.close()
conn.close()
if find_inputs_outputs:
inputs_outputs = compile_inputs_outputs(runs, inputs, outputs)
else:
conn.text_factory = lambda x: unicode_(x, 'utf-8', 'replace')
inputs = [[path for path in lst if path in files]
for lst in inputs]
# Displays a warning for READ_THEN_WRITTEN files
read_then_written_files = [
fi
for fi in itervalues(files)
if fi.what == TracedFile.READ_THEN_WRITTEN and
not any(fi.path.lies_under(m) for m in magic_dirs)]
if read_then_written_files:
logger.warning(
"Some files were read and then written. We will only pack the "
"final version of the file; reproducible experiments shouldn't "
"change their input files")
logger.info("Paths:\n%s",
", ".join(unicode_(fi.path)
for fi in read_then_written_files))
files = set(
fi
for fi in itervalues(files)
if fi.what != TracedFile.WRITTEN and not any(fi.path.lies_under(m)
for m in magic_dirs))
return files, inputs, outputs
inputs = [[path for path in lst if path in files]
for lst in inputs]
# Displays a warning for READ_THEN_WRITTEN files
read_then_written_files = [
fi
for fi in itervalues(files)
if fi.what == TracedFile.READ_THEN_WRITTEN and
not any(fi.path.lies_under(m) for m in magic_dirs)]
if read_then_written_files:
logging.warning(
"Some files were read and then written. We will only pack the "
"final version of the file; reproducible experiments shouldn't "
"change their input files")
logging.info("Paths:\n%s",
", ".join(unicode_(fi.path)
for fi in read_then_written_files))
files = set(
fi
for fi in itervalues(files)
if fi.what != TracedFile.WRITTEN and not any(fi.path.lies_under(m)
for m in magic_dirs))
return files, inputs, outputs