Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_multiple_runs(self):
def fail(s):
assert False, "Shouldn't be called?"
old = Path.is_file, Path.stat
Path.is_file = lambda s: True
Path.stat = fail
try:
files, inputs, outputs = self.do_test([
('proc', 0, None, False),
('open', 0, "/some/dir", True, FILE_WDIR),
('exec', 0, "/some/dir/ls", "/some/dir", b'ls\0/some/cli\0'),
('open', 0, "/some/cli", False, FILE_WRITE),
('open', 0, "/some/r", False, FILE_READ),
('open', 0, "/some/rw", False, FILE_READ),
('proc', 1, None, False),
('open', 1, "/some/dir", True, FILE_WDIR),
('exec', 1, "/some/dir/ls", "/some/dir", b'ls\0'),
('open', 1, "/some/cli", False, FILE_READ),
('proc', 2, 1, True),
('open', 2, "/some/r", False, FILE_READ),
('open', 1, "/some/rw", False, FILE_WRITE),
])
expected = set([
'/some',
'/some/dir',
'/some/dir/ls',
'/some/r',
'/some/rw',
])
else r_name, False):
if filename not in files:
f = TracedFile(filename)
f.read(run)
files[f.path] = f
# Go to final target
if not r_mode & FILE_LINK:
r_name = r_name.resolve()
if event_type == 'exec':
executed.add(r_name)
if r_name not in files:
f = TracedFile(r_name)
files[f.path] = f
else:
f = files[r_name]
if r_mode & FILE_READ:
f.read(run)
if r_mode & FILE_WRITE:
f.write(run)
# Mark the parent directory as read
if r_name.parent not in files:
fp = TracedFile(r_name.parent)
fp.read(run)
files[fp.path] = fp
# Identifies input files
if r_name.is_file() and r_name not in executed:
access_files[-1].add(f)
cur.close()
# Further filters input files
inputs = [[fi.path
# Loops on executed files, and opened files, at the same time
cur = conn.cursor()
rows = cur.execute(
'''
SELECT 'exec' AS event_type, name, NULL AS mode, timestamp
FROM executed_files
UNION ALL
SELECT 'open' AS event_type, name, mode, timestamp
FROM opened_files
ORDER BY timestamp;
''')
executed = set()
run = 0
for event_type, r_name, r_mode, r_timestamp in rows:
if event_type == 'exec':
r_mode = FILE_READ
r_name = Path(normalize_path(r_name))
# Stays on the current run
while run_timestamps and r_timestamp > run_timestamps[0]:
del run_timestamps[0]
access_files.append(set())
run += 1
# Adds symbolic links as read files
for filename in find_all_links(r_name.parent if r_mode & FILE_LINK
else r_name, False):
if filename not in files:
f = TracedFile(filename)
f.read(run)
files[f.path] = f
# Go to final target
# Loops on executed files, and opened files, at the same time
cur = conn.cursor()
rows = cur.execute(
'''
SELECT 'exec' AS event_type, name, NULL AS mode, timestamp
FROM executed_files
UNION ALL
SELECT 'open' AS event_type, name, mode, timestamp
FROM opened_files
ORDER BY timestamp;
''')
executed = set()
run = 0
for event_type, r_name, r_mode, r_timestamp in rows:
if event_type == 'exec':
r_mode = FILE_READ
r_name = Path(normalize_path(r_name))
# Stays on the current run
while run_timestamps and r_timestamp > run_timestamps[0]:
del run_timestamps[0]
access_files.append(set())
run += 1
# Adds symbolic links as read files
for filename in find_all_links(r_name.parent if r_mode & FILE_LINK
else r_name, False):
if filename not in files:
f = TracedFile(filename)
f.read(run)
files[f.path] = f
# Go to final target
else r_name, False):
if filename not in files:
f = TracedFile(filename)
f.read(run)
files[f.path] = f
# Go to final target
if not r_mode & FILE_LINK:
r_name = r_name.resolve()
if event_type == 'exec':
executed.add(r_name)
if r_name not in files:
f = TracedFile(r_name)
files[f.path] = f
else:
f = files[r_name]
if r_mode & FILE_READ:
f.read(run)
if r_mode & FILE_WRITE:
f.write(run)
# Mark the parent directory as read
if r_name.parent not in files:
fp = TracedFile(r_name.parent)
fp.read(run)
files[fp.path] = fp
# Identifies input files
if r_name.is_file() and r_name not in executed:
access_files[-1].add(f)
cur.close()
# Further filters input files
inputs = [[fi.path