Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def remove_one(self, pkg: Pkg) -> E.PkgRemoved:
await trash(
[self.config.addon_dir / f.name for f in pkg.folders],
parent=self.config.temp_dir,
missing_ok=True,
)
self.db_session.delete(pkg)
self.db_session.commit()
return E.PkgRemoved(pkg)
)
if conflicts:
raise E.PkgConflictsWithInstalled(conflicts)
if replace:
await trash([self.config.addon_dir / f for f in folders], self.config.temp_dir)
await extract(self.config.addon_dir)
pkg.folders = [PkgFolder(name=f) for f in folders]
self.db_session.add(pkg)
self.db_session.merge(
PkgVersionLog(version=pkg.version, pkg_source=pkg.source, pkg_id=pkg.id)
)
self.db_session.commit()
return E.PkgInstalled(pkg)
async def update_one(self, old_pkg: Pkg, new_pkg: Pkg, archive: _ArchiveACM) -> E.PkgUpdated:
async with archive as (folders, extract):
conflicts = (
self.db_session.query(Pkg)
.join(Pkg.folders)
.filter(PkgFolder.pkg_source != new_pkg.source, PkgFolder.pkg_id != new_pkg.id)
.filter(PkgFolder.name.in_(folders))
.all()
)
if conflicts:
raise E.PkgConflictsWithInstalled(conflicts)
await trash(
[self.config.addon_dir / f.name for f in old_pkg.folders],
parent=self.config.temp_dir,
missing_ok=True,
)
await extract(self.config.addon_dir)
new_pkg.folders = [PkgFolder(name=f) for f in folders]
self.db_session.delete(old_pkg)
self.db_session.add(new_pkg)
self.db_session.merge(
PkgVersionLog(version=new_pkg.version, pkg_source=new_pkg.source, pkg_id=new_pkg.id)
)
self.db_session.commit()
async def resolve_one(self, defn: Defn, metadata: O[JsonDict]) -> m.Pkg:
if not metadata:
raise E.PkgNonexistent
# 'UIPending' is set to '1' for files awaiting approval during which
# time 'UIMD5' is null - all other fields appear to be filled correctly
return m.Pkg(
source=self.source,
id=metadata['UID'],
slug=slugify(f'{metadata["UID"]} {metadata["UIName"]}'),
name=metadata['UIName'],
description=metadata['UIDescription'],
url=metadata['UIFileInfoURL'],
download_url=metadata['UIDownload'],
date_published=metadata['UIDate'],
version=metadata['UIVersion'],
options=m.PkgOptions(strategy=defn.strategy.name),
)
async def resolve(self, defns: List[Defn], **kwargs: Any) -> Dict[Defn, E.PkgSourceInvalid]:
return dict.fromkeys(defns, E.PkgSourceInvalid())
async def update(self, defns: Sequence[Defn]) -> Dict[Defn, E.ManagerResult]:
# Rebuild ``Defn`` with ID and strategy from package for defns
# of installed packages. Using the ID has the benefit of resolving
# installed-but-renamed packages - the slug is transient; the ID isn't
maybe_pkgs = ((d, self.get_pkg(d)) for d in defns)
checked_defns = {Defn.from_pkg(p) if p else c: p for c, p in maybe_pkgs}
# Results can contain errors
# installables are those results which are packages
# and updatables are packages with updates
results = await self.resolve([d for d, p in checked_defns.items() if p])
installables = {d: r for d, r in results.items() if is_pkg(r)}
updatables = {
(d, checked_defns[d], p): download_archive(self, p)
for d, p in installables.items()
if p.version != cast(Pkg, checked_defns[d]).version
}
archives = await gather(updatables.values())
async def resolve_wrapper(
self: Resolver, defn: Defn, metadata: O[JsonDict] = None
) -> m.Pkg:
if defn.strategy in self.strategies:
return await resolve_one(self, defn, metadata) # type: ignore
raise E.PkgStrategyUnsupported(defn.strategy)
async def install(self, defns: Sequence[Defn], replace: bool) -> Dict[Defn, E.ManagerResult]:
prelim_results = await self.resolve(
[d for d in defns if not self.get_pkg(d)], with_deps=True
)
# Weed out installed deps - this isn't super efficient but avoids
# having to deal with local state in the resolver
results = {d: r for d, r in prelim_results.items() if not self.get_pkg(d)}
installables = {(d, r): download_archive(self, r) for d, r in results.items() if is_pkg(r)}
archives = await gather(installables.values())
coros = dict_chain(
defns,
partial(_error_out, E.PkgAlreadyInstalled()),
((d, partial(_error_out, r)) for d, r in results.items()),
(
(d, partial(self.install_one, p, a, replace))
for (d, p), a in zip(installables, archives)
),
)
return await self._consume_seq(coros)
def extract(parent: Path) -> None:
conflicts = base_dirs & {f.name for f in parent.iterdir()}
if conflicts:
raise E.PkgConflictsWithForeign(conflicts)
else:
members = filter(should_extract(base_dirs), names)
archive.extractall(parent, members=members)