Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
try:
with manifest.open('wb') as fp:
fp.write(b'REPROZIP VERSION 2\n')
tar.add(str(manifest), 'METADATA/version')
finally:
manifest.remove()
# Stores the original trace
trace = directory / 'trace.sqlite3'
if not trace.is_file():
logger.critical("trace.sqlite3 is gone! Aborting")
sys.exit(1)
tar.add(str(trace), 'METADATA/trace.sqlite3')
# Checks that input files are packed
for name, f in iteritems(inputs_outputs):
if f.read_runs and not Path(f.path).exists():
logger.warning("File is designated as input (name %s) but is not "
"to be packed: %s", name, f.path)
# Generates a unique identifier for the pack (for usage-reporting purposes)
pack_id = str(uuid.uuid4())
# Stores canonical config
fd, can_configfile = Path.tempfile(suffix='.yml', prefix='rpz_config_')
os.close(fd)
try:
save_config(can_configfile, runs, packages, other_files,
reprozip_version,
inputs_outputs, canonical=True,
pack_id=pack_id)
if ' ' not in pkgname:
# If we had assigned it to a package already, undo
if path in found:
found[path] = None
# Else assign to the package
else:
found[path] = pkgname
# Remaining files are not from packages
self.unknown_files.update(
f for f in files
if f.path in requested and found.get(f.path) is None)
nb_pkg_files = 0
for path, pkgname in iteritems(found):
if pkgname is None:
continue
if pkgname in self.packages:
package = self.packages[pkgname]
else:
package = self._create_package(pkgname)
self.packages[pkgname] = package
package.add_file(requested.pop(path))
nb_pkg_files += 1
logger.info("%d packages with %d files, and %d other files",
len(self.packages),
nb_pkg_files,
len(self.unknown_files))
if pkgname in self.packages:
pkgs.append(self.packages[pkgname])
else:
pkg = self._create_package(pkgname)
if pkg is not None:
self.packages[pkgname] = pkg
pkgs.append(self.packages[pkgname])
if len(pkgs) == 1:
pkgs[0].add_file(f)
nb_pkg_files += 1
else:
self.unknown_files.add(f)
# Filter out packages with no files
self.packages = {pkgname: pkg
for pkgname, pkg in iteritems(self.packages)
if pkg.files}
logger.info("%d packages with %d files, and %d other files",
len(self.packages),
nb_pkg_files,
len(self.unknown_files))
parts = []
# Run number, if there is more than one run
if len(runs) > 1:
parts.append(run_nb)
# Argument number, if there is more than one file argument
if nb_file_args[run_nb] > 1:
parts.append(arg_nb)
file_names[fi] = make_unique(
'arg%s' % '_'.join('%s' % s for s in parts))
else:
file_names[fi] = make_unique('arg_%s' % fi.unicodename)
else:
file_names[fi] = make_unique(fi.unicodename)
return dict((n, InputOutputFile(p, readers.get(p, []), writers.get(p, [])))
for p, n in iteritems(file_names))
# Adds the `.py` source that sits next to each traced `.pyc` file, then scans
# every entry of `input_files` for Python files.
#
# files: mapping iterated as (path, traced-file) pairs; updated in place.
# input_files: sized, indexable collection; each entry is iterated as paths.
# **kwargs: accepted but not used in the lines visible here.
#
# NOTE(review): this excerpt has lost its indentation and appears to stop
# mid-function, so the nesting described in the comments below is inferred
# from the statements themselves -- confirm against the upstream file.
def python(files, input_files, **kwargs):
# New entries are collected here first and inserted by the later `for fi in
# add` loop (presumably so that `files` is not modified while it is being
# iterated -- confirm).
add = []
for path, fi in iteritems(files):
# The extension is compared against a bytes literal.
if path.ext == b'.pyc':
# `/` binds tighter than `+`, so this is (path.parent / path.stem) + '.py'.
# NOTE(review): relies on the path type supporting `+` with a str -- confirm.
pyfile = path.parent / path.stem + '.py'
# Only queue the source when `is_file()` is true and the path is not
# already a key of `files`.
if pyfile.is_file():
if pyfile not in files:
logger.info("Adding %s", pyfile)
add.append(TracedFile(pyfile))
# Insert the queued entries, keyed by their own `.path`.
for fi in add:
files[fi.path] = fi
# Walk `input_files` by index.
for i in irange(len(input_files)):
# NOTE(review): `lst` is created but never filled or stored back in the
# lines visible here; the excerpt looks cut off before that happens.
lst = []
for path in input_files[i]:
# Paths with a `.py` or `.pyc` extension are only logged in the visible code.
if path.ext in (b'.py', b'.pyc'):
logger.info("Removing input %s", path)
else:
parts = []
# Run number, if there is more than one run
if len(runs) > 1:
parts.append(run_nb)
# Argument number, if there is more than one file argument
if nb_file_args[run_nb] > 1:
parts.append(arg_nb)
file_names[fi] = make_unique(
'arg%s' % '_'.join('%s' % s for s in parts))
else:
file_names[fi] = make_unique('arg_%s' % fi.unicodename)
else:
file_names[fi] = make_unique(fi.unicodename)
return dict((n, InputOutputFile(p, readers.get(p, []), writers.get(p, [])))
for p, n in iteritems(file_names))