Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_multiple_runs(self):
def fail(s):
assert False, "Shouldn't be called?"
old = Path.is_file, Path.stat
Path.is_file = lambda s: True
Path.stat = fail
try:
files, inputs, outputs = self.do_test([
('proc', 0, None, False),
('open', 0, "/some/dir", True, FILE_WDIR),
('exec', 0, "/some/dir/ls", "/some/dir", b'ls\0/some/cli\0'),
('open', 0, "/some/cli", False, FILE_WRITE),
('open', 0, "/some/r", False, FILE_READ),
('open', 0, "/some/rw", False, FILE_READ),
('proc', 1, None, False),
('open', 1, "/some/dir", True, FILE_WDIR),
('exec', 1, "/some/dir/ls", "/some/dir", b'ls\0'),
('open', 1, "/some/cli", False, FILE_READ),
('proc', 2, 1, True),
('open', 2, "/some/r", False, FILE_READ),
('open', 1, "/some/rw", False, FILE_WRITE),
])
expected = set([
'/some',
'/some/dir',
'/some/dir/ls',
'/some/r',
'/some/rw',
])
INSERT INTO "opened_files" VALUES(1,0,'/usr',12345678902001,4,1,1);
INSERT INTO "opened_files" VALUES(2,0,'/lib/ld.so',12345678902003,1,0,2);
INSERT INTO "opened_files" VALUES(3,1,'/usr/bin',12345678902004,4,1,3);
INSERT INTO "executed_files" VALUES(1,'/usr/bin/id',1,12345678902006,4,'id',
'RUN=third','/home/vagrant');
''',
schema + '''
INSERT INTO "processes" VALUES(0,0,NULL,12345678903001,0,1);
INSERT INTO "opened_files" VALUES(0,0,'/home',12345678903001,4,1,0);
INSERT INTO "executed_files" VALUES(1,'/bin/false',0,12345678903002,0,'false',
'RUN=fourth','/home');
''']
for i, dat in enumerate(sql_data):
trace = self.tmpdir / ('trace%d.sqlite3' % i)
if PY3:
conn = sqlite3.connect(str(trace))
else:
conn = sqlite3.connect(trace.path)
conn.row_factory = sqlite3.Row
conn.executescript(dat + 'COMMIT;')
conn.commit()
conn.close()
traces.append(trace)
target = self.tmpdir / 'target'
traceutils.combine_traces(traces, target)
target = target / 'trace.sqlite3'
if PY3:
conn = sqlite3.connect(str(target))
for i, dat in enumerate(sql_data):
trace = self.tmpdir / ('trace%d.sqlite3' % i)
if PY3:
conn = sqlite3.connect(str(trace))
else:
conn = sqlite3.connect(trace.path)
conn.row_factory = sqlite3.Row
conn.executescript(dat + 'COMMIT;')
conn.commit()
conn.close()
traces.append(trace)
target = self.tmpdir / 'target'
traceutils.combine_traces(traces, target)
target = target / 'trace.sqlite3'
if PY3:
conn = sqlite3.connect(str(target))
else:
conn = sqlite3.connect(target.path)
conn.row_factory = None
processes = list(conn.execute(
'''
SELECT * FROM processes;
'''))
opened_files = list(conn.execute(
'''
SELECT * FROM opened_files;
'''))
executed_files = list(conn.execute(
def make_paths(cls, obj):
    """Recursively turn the strings found in `obj` into `Path` objects.

    Sets and lists are rebuilt with every element converted, keeping the
    container type. Objects that are already `AbstractPath` instances are
    returned unchanged, and byte or unicode strings are wrapped in `Path`.

    :param obj: a set, a list, an `AbstractPath` or a string.
    :returns: the converted object, with the same nesting as `obj`.
    :raises AssertionError: if `obj`, or an element nested in it, has any
        other type.
    """
    if isinstance(obj, set):
        return {cls.make_paths(e) for e in obj}
    if isinstance(obj, list):
        return [cls.make_paths(e) for e in obj]
    if isinstance(obj, AbstractPath):
        return obj
    if isinstance(obj, (bytes, unicode_)):
        return Path(obj)
    # Raised explicitly rather than through `assert False`: assert statements
    # are removed when Python runs with -O, which would make an unsupported
    # value silently come back as None.
    raise AssertionError("make_paths() can't convert %r" % (obj,))
def test_label_files(self):
    """Checks the names assigned to files by compile_inputs_outputs().

    A single run with arguments 'aa' and 'bb.txt' is given three files in
    its second list argument and none in its third. The two files matching
    the arguments are expected under 'arg0' and 'arg1', the remaining one
    under its file name 'cc.bin'.
    """
    workdir = Path('/fakeworkingdir')
    run_descriptions = [{'argv': ['aa', 'bb.txt'], 'workingdir': workdir}]
    files_in = [[workdir / 'aa', Path('/other/cc.bin'), workdir / 'bb.txt']]
    files_out = [[]]

    labelled = compile_inputs_outputs(run_descriptions, files_in, files_out)

    wanted = {
        'arg0': InputOutputFile(workdir / 'aa', [0], []),
        'cc.bin': InputOutputFile(Path('/other/cc.bin'), [0], []),
        'arg1': InputOutputFile(workdir / 'bb.txt', [0], []),
    }
    self.assertEqual(labelled, wanted)
def test_label_files(self):
    """Verifies how compile_inputs_outputs() labels the files of a run.

    The run's command line is ['aa', 'bb.txt']; both of those files, plus
    an unrelated '/other/cc.bin', are passed in the second argument and
    nothing in the third. Expected labels are 'arg0' and 'arg1' for the
    command-line files and 'cc.bin' for the other one.
    """
    base = Path('/fakeworkingdir')
    other_file = Path('/other/cc.bin')

    result = compile_inputs_outputs(
        [{'argv': ['aa', 'bb.txt'], 'workingdir': base}],
        [[base / 'aa', other_file, base / 'bb.txt']],
        [[]])

    expected_labels = dict([
        ('arg0', InputOutputFile(base / 'aa', [0], [])),
        ('cc.bin', InputOutputFile(Path('/other/cc.bin'), [0], [])),
        ('arg1', InputOutputFile(base / 'bb.txt', [0], [])),
    ])
    self.assertEqual(result, expected_labels)
def test_uniquenames(self):
    """Checks the sequence of names handed out by one UniqueNames object.

    The same instance is called repeatedly; each pair below is the name
    requested and the name expected back for that call, in order.
    """
    namer = UniqueNames()
    calls = [
        ('test', 'test'),
        ('test', 'test_2'),
        ('test', 'test_3'),
        ('test_2', 'test_2_2'),
        ('test_', 'test_'),
        ('test_', 'test__2'),
    ]
    # Order matters: every result depends on the names requested before it.
    for requested, expected in calls:
        self.assertEqual(namer(requested), expected)
(len(runs),))
for r_name, r_argv, r_envp, r_workingdir, r_exitcode in executions:
# Decodes command-line
argv = r_argv.split('\0')
if not argv[-1]:
argv = argv[:-1]
# Decodes environment
envp = r_envp.split('\0')
if not envp[-1]:
envp = envp[:-1]
environ = dict(v.split('=', 1) for v in envp)
runs.append({'id': "run%d" % len(runs),
'binary': r_name, 'argv': argv,
'workingdir': unicode_(Path(r_workingdir)),
'architecture': platform.machine().lower(),
'distribution': distribution,
'hostname': platform.node(),
'system': [platform.system(), platform.release()],
'environ': environ,
'uid': os.getuid(),
'gid': os.getgid(),
'signal' if r_exitcode & 0x0100 else 'exitcode':
r_exitcode & 0xFF})
cur.close()
conn.close()
if find_inputs_outputs:
inputs_outputs = compile_inputs_outputs(runs, inputs, outputs)
else:
''')
for r_name, r_argv, r_envp, r_workingdir, r_exitcode in executions:
# Decodes command-line
argv = r_argv.split('\0')
if not argv[-1]:
argv = argv[:-1]
# Decodes environment
envp = r_envp.split('\0')
if not envp[-1]:
envp = envp[:-1]
environ = dict(v.split('=', 1) for v in envp)
runs.append({'id': "run%d" % len(runs),
'binary': r_name, 'argv': argv,
'workingdir': unicode_(Path(r_workingdir)),
'architecture': platform.machine().lower(),
'distribution': distribution,
'hostname': platform.node(),
'system': [platform.system(), platform.release()],
'environ': environ,
'uid': os.getuid(),
'gid': os.getgid(),
'signal' if r_exitcode & 0x0100 else 'exitcode':
r_exitcode & 0xFF})
cur.close()
conn.close()
if find_inputs_outputs:
inputs_outputs = compile_inputs_outputs(runs, inputs, outputs)
else:
try:
with manifest.open('wb') as fp:
fp.write(b'REPROZIP VERSION 2\n')
tar.add(str(manifest), 'METADATA/version')
finally:
manifest.remove()
# Stores the original trace
trace = directory / 'trace.sqlite3'
if not trace.is_file():
logger.critical("trace.sqlite3 is gone! Aborting")
sys.exit(1)
tar.add(str(trace), 'METADATA/trace.sqlite3')
# Checks that input files are packed
for name, f in iteritems(inputs_outputs):
if f.read_runs and not Path(f.path).exists():
logger.warning("File is designated as input (name %s) but is not "
"to be packed: %s", name, f.path)
# Generates a unique identifier for the pack (for usage reports purposes)
pack_id = str(uuid.uuid4())
# Stores canonical config
fd, can_configfile = Path.tempfile(suffix='.yml', prefix='rpz_config_')
os.close(fd)
try:
save_config(can_configfile, runs, packages, other_files,
reprozip_version,
inputs_outputs, canonical=True,
pack_id=pack_id)