Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
rpms = []
logs = []
for chroot in client.build_chroot_proxy.get_list(build_id):
url = chroot.result_url
url = url if url.endswith('/') else url + '/'
d = pyquery.PyQuery(url)
d.make_links_absolute()
for a in d('a[href$=\'.rpm\'], a[href$=\'.log.gz\']'):
fn = os.path.basename(urllib.parse.urlsplit(a.attrib['href']).path)
dest = os.path.join(destination, chroot.name)
os.makedirs(dest, exist_ok=True)
dest = os.path.join(dest, fn)
if fn.endswith('.src.rpm'):
# skip source RPM
continue
DownloadHelper.download_file(a.attrib['href'], dest)
if fn.endswith('.rpm'):
rpms.append(dest)
elif fn.endswith('.log.gz'):
local_path = dest.replace('.log.gz', '.log')
os.rename(dest, local_path)
logs.append(local_path)
return rpms, logs
if r is None:
return None
if not r.ok:
if r.status_code == 403 and r.headers.get('X-RateLimit-Remaining') == '0':
logger.warning("Rate limit exceeded on Github API! Try again later.")
return None
data = r.json()
version = spec_file.header.version
tag_name = None
for release in data:
if version in release.get('name', ''):
tag_name = release.get('tag_name')
break
r = DownloadHelper.request('{}/tags'.format(baseurl))
if r is None:
return None
if not r.ok:
if r.status_code == 403 and r.headers.get('X-RateLimit-Remaining') == '0':
logger.warning("Rate limit exceeded on Github API! Try again later.")
return None
data = r.json()
for tag in data:
name = tag.get('name')
if tag_name:
if name != tag_name:
continue
else:
# no specific tag name, try common tag names
if name not in [version, 'v{}'.format(version)]:
continue
def _download_source(cls, tool, url, package, filename, hashtype, hsh, target=None):
if target is None:
target = os.path.basename(filename)
if os.path.exists(target):
if cls._hash(target, hashtype) == hsh:
# nothing to do
return
else:
os.unlink(target)
if tool == 'fedpkg':
url = '{0}/{1}/{2}/{3}/{4}/{2}'.format(url, package, filename, hashtype, hsh)
else:
url = '{0}/{1}/{2}/{3}/{2}'.format(url, package, filename, hsh)
try:
DownloadHelper.download_file(url, target)
except DownloadError as e:
raise LookasideCacheError(str(e))
def progress(uploaded, total, chunksize, t1, t2): # pylint: disable=unused-argument
DownloadHelper.progress(total, uploaded, upload_start)
suffix = ''.join(random.choice(string.ascii_letters) for _ in range(8))
def __iter__(self):
if self.uploaded:
# ensure the progressbar is shown only once (HTTPSPNEGOAuth causes second request)
yield self.data
else:
totalsize = len(self.data)
for offset in range(0, totalsize, self.chunksize):
transferred = min(offset + self.chunksize, totalsize)
if not self.check_only:
DownloadHelper.progress(totalsize, transferred, self.start)
yield self.data[offset:transferred]
self.uploaded = True
def _get_initial_patches(self) -> Dict[str, List[PatchObject]]:
"""Returns a dict of patches from a spec file"""
patches_applied = []
patches_not_used = []
patches_list = [p for p in self.spc.sources if p[2] == 2]
strip_options = self._get_patch_strip_options(patches_list)
for patch, num, _ in patches_list:
is_url = bool(urllib.parse.urlparse(patch).scheme)
filename = os.path.basename(patch) if is_url else patch
patch_path = os.path.join(self.sources_location, filename)
if not os.path.exists(patch_path):
if is_url:
logger.info('Patch%s is remote, trying to download the patch', num)
try:
DownloadHelper.download_file(patch, filename)
except DownloadError:
logger.error('Could not download remote patch %s', patch)
continue
else:
logger.error('Patch %s does not exist', filename)
continue
patch_num = num
if patch_num in strip_options:
patches_applied.append(PatchObject(patch_path, patch_num, strip_options[patch_num]))
else:
patches_not_used.append(PatchObject(patch_path, patch_num, None))
patches_applied = sorted(patches_applied, key=lambda x: x.index)
return {"applied": patches_applied, "not_applied": patches_not_used}
def _get_version(cls, package_name):
# sets package name to format used on metacpan
if package_name.startswith('perl-'):
package_name = package_name.replace('perl-', '', 1)
package_name = package_name.replace('-', '::')
r = DownloadHelper.request('{}/download_url/{}'.format(cls.API_URL, package_name))
if r is None or not r.ok:
return None
data = r.json()
if data.get('status') == 'latest':
return data.get('version')
return None
def _get_version(cls, package_name):
# special-case "ruby", as https://rubygems.org/api/v1/gems/ruby.json returns nonsense
if package_name == 'ruby':
return None
r = DownloadHelper.request('{}/{}.json'.format(cls.API_URL, package_name))
if r is None or not r.ok:
# try to strip rubygem prefix
package_name = re.sub(r'^rubygem-', '', package_name)
r = DownloadHelper.request('{}/{}.json'.format(cls.API_URL, package_name))
if r is None or not r.ok:
return None
data = r.json()
return data.get('version')