Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
INSERT INTO "opened_files" VALUES(1,0,'/usr',12345678902001,4,1,1);
INSERT INTO "opened_files" VALUES(2,0,'/lib/ld.so',12345678902003,1,0,2);
INSERT INTO "opened_files" VALUES(3,1,'/usr/bin',12345678902004,4,1,3);
INSERT INTO "executed_files" VALUES(1,'/usr/bin/id',1,12345678902006,4,'id',
'RUN=third','/home/vagrant');
''',
schema + '''
INSERT INTO "processes" VALUES(0,0,NULL,12345678903001,0,1);
INSERT INTO "opened_files" VALUES(0,0,'/home',12345678903001,4,1,0);
INSERT INTO "executed_files" VALUES(1,'/bin/false',0,12345678903002,0,'false',
'RUN=fourth','/home');
''']
for i, dat in enumerate(sql_data):
trace = self.tmpdir / ('trace%d.sqlite3' % i)
if PY3:
conn = sqlite3.connect(str(trace))
else:
conn = sqlite3.connect(trace.path)
conn.row_factory = sqlite3.Row
conn.executescript(dat + 'COMMIT;')
conn.commit()
conn.close()
traces.append(trace)
target = self.tmpdir / 'target'
traceutils.combine_traces(traces, target)
target = target / 'trace.sqlite3'
if PY3:
conn = sqlite3.connect(str(target))
def print_db(database):
"""Prints out database content.
"""
assert database.is_file()
if PY3:
# On PY3, connect() only accepts unicode
conn = sqlite3.connect(str(database))
else:
conn = sqlite3.connect(database.path)
conn.row_factory = sqlite3.Row
conn.text_factory = lambda x: unicode_(x, 'utf-8', 'replace')
cur = conn.cursor()
rows = cur.execute(
'''
SELECT id, parent, timestamp, exitcode
FROM processes;
''')
print("\nProcesses:")
header = "+------+--------+-------+------------------+"
print(header)
def combine_traces(traces, target):
"""Combines multiple trace databases into one.
The runs from the original traces are appended ('run_id' field gets
translated to avoid conflicts).
:param traces: List of trace database filenames.
:type traces: [Path]
:param target: Directory where to write the new database and associated
configuration file.
:type target: Path
"""
# We are probably overwriting one of the traces we're reading, so write to
# a temporary file first then move it
fd, output = Path.tempfile('.sqlite3', 'reprozip_combined_')
if PY3:
# On PY3, connect() only accepts unicode
conn = sqlite3.connect(str(output))
else:
conn = sqlite3.connect(output.path)
os.close(fd)
conn.row_factory = sqlite3.Row
# Create the schema
create_schema(conn)
# Temporary database with lookup tables
conn.execute(
'''
ATTACH DATABASE '' AS maps;
''')
conn.execute(
def write_configuration(directory, sort_packages, find_inputs_outputs,
overwrite=False):
"""Writes the canonical YAML configuration file.
"""
database = directory / 'trace.sqlite3'
if PY3:
# On PY3, connect() only accepts unicode
conn = sqlite3.connect(str(database))
else:
conn = sqlite3.connect(database.path)
conn.row_factory = sqlite3.Row
# Reads info from database
files, inputs, outputs = get_files(conn)
# Identifies which file comes from which package
if sort_packages:
files, packages = identify_packages(files)
else:
packages = []
# Writes configuration file
def write_configuration(directory, sort_packages, find_inputs_outputs,
overwrite=False):
"""Writes the canonical YAML configuration file.
"""
database = directory / 'trace.sqlite3'
assert database.is_file()
if PY3:
# On PY3, connect() only accepts unicode
conn = sqlite3.connect(str(database))
else:
conn = sqlite3.connect(database.path)
conn.row_factory = sqlite3.Row
# Reads info from database
files, inputs, outputs = get_files(conn)
# Identifies which file comes from which package
if sort_packages:
files, packages = identify_packages(files)
else:
packages = []
# Writes configuration file