Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
tasks = session.listTasks(opts=opts)
else:
logger.info('Task %s is not a build or buildArch task!', task_id)
continue
for task in tasks:
base_path = koji.pathinfo.taskrelpath(task['id'])
output = session.listTaskOutput(task['id'])
for filename in output:
local_path = os.path.join(destination, filename)
download = False
fn, ext = os.path.splitext(filename)
if ext == '.rpm':
if task['state'] != koji.TASK_STATES['CLOSED']:
continue
if local_path not in rpms:
nevra = RpmHelper.split_nevra(fn)
# FIXME: multiple arches
download = nevra['arch'] in ['noarch', 'x86_64']
if download:
rpms.append(local_path)
else:
if local_path not in logs:
download = True
logs.append(local_path)
if download:
logger.info('Downloading file %s', filename)
url = '/'.join([session.opts['topurl'], 'work', base_path, filename])
DownloadHelper.download_file(url, local_path)
return rpms, logs
def _get_rpm_info(cls, name, packages):
if packages is None:
return None
basic_package = sorted(packages)[0]
return getattr(RpmHelper.get_header_from_rpm(basic_package), name)
def run_check(cls, results_dir, **kwargs):
    """Run rpminspect for every old binary RPM that has a new counterpart.

    Old packages whose path contains 'debuginfo' are ignored. Each remaining
    old package is paired with the first new package of the same name; when
    none exists a warning is logged and the package is skipped.

    Args:
        results_dir (str): Directory under which 'rpminspect-rpm' is placed.
        **kwargs: Accepted but not used here.

    Returns:
        dict: 'path' (short checker output directory), 'files' (basenames of
        the produced output files) and 'checks' (package name -> data
        returned by run_rpminspect).
    """
    def package_name(rpm_path):
        # The name component of the NEVRA parsed from the file name.
        return RpmHelper.split_nevra(os.path.basename(rpm_path))['name']

    cls.results_dir = os.path.join(results_dir, 'rpminspect-rpm')
    cls.prepare_results_dir()

    report_files = []
    report_checks = {}
    result = {'path': cls.get_checker_output_dir_short(), 'files': report_files, 'checks': report_checks}

    old_rpms = results_store.get_old_build()['rpm']
    new_rpms = results_store.get_new_build()['rpm']

    for old_rpm in old_rpms:
        if 'debuginfo' in old_rpm:
            continue
        name = package_name(old_rpm)
        candidates = [rpm for rpm in new_rpms if package_name(rpm) == name]
        if candidates:
            outfile, pkg_data = cls.run_rpminspect(cls.results_dir, old_rpm, candidates[0])
            report_files.append(os.path.basename(outfile))
            report_checks[name] = pkg_data
        else:
            logger.warning('New version of package %s was not found!', name)
    return result
def run_check(cls, results_dir, **kwargs):
    """Run rpminspect on the old and new source RPM.

    Args:
        results_dir (str): Directory under which 'rpminspect-srpm' is placed.
        **kwargs: Accepted but not used here.

    Returns:
        dict: 'path' (short checker output directory), 'files' (a list with
        the basename of the produced output file) and 'checks' (a mapping of
        the old SRPM's package name to the data returned by run_rpminspect).
    """
    cls.results_dir = os.path.join(results_dir, 'rpminspect-srpm')
    cls.prepare_results_dir()

    short_dir = cls.get_checker_output_dir_short()
    old_srpm = results_store.get_old_build()['srpm']
    new_srpm = results_store.get_new_build()['srpm']

    # The result is keyed by the name parsed from the old SRPM's file name.
    nevra = RpmHelper.split_nevra(os.path.basename(old_srpm))
    name = nevra['name']

    outfile, pkg_data = cls.run_rpminspect(cls.results_dir, old_srpm, new_srpm)
    return {
        'path': short_dir,
        'files': [os.path.basename(outfile)],
        'checks': {name: pkg_data},
    }
def _get_rpms(cls, rpm_list):
rpm_dict = {}
for rpm_name in rpm_list:
rpm_dict[RpmHelper.get_header_from_rpm(rpm_name).name] = rpm_name
return rpm_dict
"""Compares old and new RPMs using abipkgdiff"""
# Check if ABI changes occurred
cls.results_dir = os.path.join(results_dir, cls.name)
cls.prepare_results_dir()
debug_old, rest_pkgs_old = cls._get_packages_for_abipkgdiff(results_store.get_build('old'))
debug_new, rest_pkgs_new = cls._get_packages_for_abipkgdiff(results_store.get_build('new'))
cmd = [cls.CMD]
ret_codes = {}
for pkg in rest_pkgs_old:
command = list(cmd)
debug = cls._find_debuginfo(debug_old, pkg)
if debug:
command.append('--d1')
command.append(debug)
old_name = RpmHelper.split_nevra(os.path.basename(pkg))['name']
find = [x for x in rest_pkgs_new if RpmHelper.split_nevra(os.path.basename(x))['name'] == old_name]
if not find:
logger.warning('New version of package %s was not found!', old_name)
continue
new_pkg = find[0]
debug = cls._find_debuginfo(debug_new, new_pkg)
if debug:
command.append('--d2')
command.append(debug)
command.append(pkg)
command.append(new_pkg)
logger.verbose('Package name for ABI comparison %s', old_name)
output = os.path.join(cls.results_dir, old_name + '.txt')
try:
ret_code = ProcessHelper.run_subprocess(command, output_file=output)
except OSError:
raise CheckerNotFoundError("Checker '{}' was not found or installed.".format(cls.name))
def run_check(cls, results_dir, **kwargs):
    """Compare old and new binary RPMs pairwise with rpminspect.

    For each old package (entries containing 'debuginfo' are skipped) the
    first new package with the same name is looked up. A missing counterpart
    is reported with a warning and the package is left out of the result.

    Args:
        results_dir (str): Parent directory of the 'rpminspect-rpm' output.
        **kwargs: Accepted but not used here.

    Returns:
        dict: Keys 'path', 'files' (output file basenames) and 'checks'
        (package name -> data returned by run_rpminspect).
    """
    cls.results_dir = os.path.join(results_dir, 'rpminspect-rpm')
    cls.prepare_results_dir()

    result = {'path': cls.get_checker_output_dir_short(), 'files': [], 'checks': {}}
    old_build_rpms = results_store.get_old_build()['rpm']
    new_build_rpms = results_store.get_new_build()['rpm']

    for old_path in old_build_rpms:
        if 'debuginfo' not in old_path:
            name = RpmHelper.split_nevra(os.path.basename(old_path))['name']
            # Collect every new package carrying the same name; only the
            # first match is compared.
            matches = []
            for new_path in new_build_rpms:
                if RpmHelper.split_nevra(os.path.basename(new_path))['name'] == name:
                    matches.append(new_path)
            if not matches:
                logger.warning('New version of package %s was not found!', name)
            else:
                outfile, pkg_data = cls.run_rpminspect(cls.results_dir, old_path, matches[0])
                result['files'].append(os.path.basename(outfile))
                result['checks'][name] = pkg_data
    return result
def all_packages_installed(pkg_names=None):
    """Checks if all specified packages are installed.

    Args:
        pkg_names (list): List of package names to check. None (the default)
            is treated as an empty list.

    Returns:
        bool: True if all packages are installed (trivially True when there
        is nothing to check), False otherwise.
    """
    # The default of None must not be iterated directly; fall back to an
    # empty sequence so calling without arguments does not raise TypeError.
    if pkg_names is None:
        pkg_names = ()
    # all() stops at the first package reported as not installed.
    return all(RpmHelper.is_package_installed(pkg) for pkg in pkg_names)