#common = commonprefix((with_pathsep(subds), with_pathsep(path)))
#if common.endswith(sep) and common == with_pathsep(subds):
# candidates.append(common)
subdsres = get_status_dict(
'subdataset',
status='ok',
type='dataset',
logger=lgr)
subdsres.update(sm)
subdsres['parentds'] = dspath
if to_report:
if contains_hits:
subdsres['contains'] = contains_hits
        if (not bottomup and
                (fulfilled is None or
                 GitRepo.is_valid_repo(sm['path']) == fulfilled)):
yield subdsres
# expand list with child submodules. keep all paths relative to parent
# and convert jointly at the end
if recursive and \
(recursion_limit in (None, 'existing') or
(isinstance(recursion_limit, int) and
recursion_limit > 1)):
for r in _get_submodules(
Dataset(sm['path']),
paths,
fulfilled, recursive,
(recursion_limit - 1)
if isinstance(recursion_limit, int)
else recursion_limit,
                contains,
                bottomup,
                set_property,
                delete_property,
                refds_path):
            yield r
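
# A minimal standalone sketch (not DataLad code) of the recursion-limit
# idiom used above: integer limits are decremented at each level, while
# the sentinel values None and 'existing' are passed through unchanged.
def walk(node, limit):
    yield node
    if limit in (None, 'existing') or \
            (isinstance(limit, int) and limit > 1):
        for child in node.get('children', []):
            yield from walk(
                child, (limit - 1) if isinstance(limit, int) else limit)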
if self._repo is not None and realpath(self.path) == self._repo.path:
# we got a repo and path references still match
if isinstance(self._repo, AnnexRepo):
# it's supposed to be an annex
if self._repo is AnnexRepo._unique_instances.get(
self._repo.path, None) and \
AnnexRepo.is_valid_repo(self._repo.path,
allow_noninitialized=True):
# it's still the object registered as flyweight and it's a
# valid annex repo
return self._repo
elif isinstance(self._repo, GitRepo):
# it's supposed to be a plain git
if self._repo is GitRepo._unique_instances.get(
self._repo.path, None) and \
GitRepo.is_valid_repo(self._repo.path) and not \
self._repo.is_with_annex():
# it's still the object registered as flyweight, it's a
# valid git repo and it hasn't turned into an annex
return self._repo
        # Note: Although it looks like assigning "self._repo = None" could
        # replace the "valid" flag below, the two are not equivalent!
        # The *Repo instances are flyweights, not singletons. self._repo might
        # hold the last reference to such an instance; dropping it would let
        # the object be destroyed, so a subsequent constructor call would
        # build an actually new instance. This is unnecessarily costly.
        # (A standalone sketch of this follows the excerpt below.)
valid = False
for cls, ckw, kw in (
# TODO: Do we really want to allow_noninitialized=True here?
# And if so, leave a proper comment!
(AnnexRepo, {'allow_noninitialized': True}, {'init': False}),
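
# Hypothetical illustration (not DataLad's actual flyweight machinery) of
# why the note above insists on keeping a reference alive: a weak-value
# registry holds at most one live instance per path, so once the last
# strong reference is dropped, the next lookup pays for reconstruction.
import weakref

_instances = weakref.WeakValueDictionary()

class Repo:
    def __init__(self, path):
        self.path = path  # stands in for expensive repository setup

def get_repo(path):
    repo = _instances.get(path)
    if repo is None:
        repo = Repo(path)  # the costly construction the note warns about
        _instances[path] = repo
    return repo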
path: str
reckless: bool
"""
    # figure out what dataset to start with; --contains limits --recursive
    # to visit only subdatasets on the trajectory to the target path
subds_trail = ds.subdatasets(contains=path, recursive=True,
on_failure="ignore",
result_filter=is_ok_dataset)
if not subds_trail:
# there is not a single known subdataset (installed or not)
# for this path -- job done
return
# otherwise we start with the one deepest down
cur_subds = subds_trail[-1]
while not GitRepo.is_valid_repo(cur_subds['path']):
        # install using a helper that gives some flexibility regarding where
        # to get the module from
for res in _install_subds_from_flexible_source(
Dataset(cur_subds['parentds']),
cur_subds,
reckless=reckless,
description=description):
if res.get('action', None) == 'install':
if res['status'] == 'ok':
# report installation, whether it helped or not
res['message'] = (
"Installed subdataset in order to get %s",
str(path))
# next subdataset candidate
sd = Dataset(res['path'])
yield res
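
# `is_ok_dataset` is used as a result filter above; a minimal
# implementation consistent with that usage, assuming DataLad's
# convention that results are plain dicts with 'status' and 'type' keys:
def is_ok_dataset(res):
    return res.get('status', None) == 'ok' \
        and res.get('type', None) == 'dataset'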
def _get_submodules(ds, paths, fulfilled, recursive, recursion_limit,
contains, bottomup, set_property, delete_property,
refds_path):
dspath = ds.path
repo = ds.repo
if not GitRepo.is_valid_repo(dspath):
return
    # put in a giant for-loop to be able to yield results before completion
for sm in _parse_git_submodules(ds.pathobj, repo, paths):
contains_hits = []
if contains:
contains_hits = [
c for c in contains if sm['path'] == c or sm['path'] in c.parents
]
if not contains_hits:
# we are not looking for this subds, because it doesn't
# match the target path
continue
        # do we just need this to recurse into subdatasets, or is this a
        # real result?
        to_report = paths is None \
            or any(p == sm['path'] or p in sm['path'].parents
                   for p in paths)
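
# Standalone sketch of the pathlib containment test above: a submodule is
# reported when a requested path equals it or is one of its parents.
from pathlib import PurePosixPath

sm_path = PurePosixPath('/ds/sub/deep')
for p in (PurePosixPath('/ds/sub/deep'),
          PurePosixPath('/ds'),
          PurePosixPath('/elsewhere')):
    print(p, p == sm_path or p in sm_path.parents)
# prints True, True, and False respectively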
def _discover_subdatasets_recursively(
discovered, top, trace, recursion_limit):
    # this beast walks the directory tree from a given `top` directory
# and discovers valid repos that are scattered around, regardless
# of whether they are already subdatasets or not
# `trace` must be a list that has at least one element (the base
# dataset)
if recursion_limit is not None and len(trace) > recursion_limit:
return
if not isdir(top):
return
if not op.islink(top) and GitRepo.is_valid_repo(top):
if top in discovered:
            # this was found already; assume everything beneath it was too
return
discovered[top] = dict(
path=top,
# and its content
process_content=True,
type='dataset',
parentds=trace[-1])
# new node in the trace down
trace = trace + [top]
for path in listdir(top):
path = opj(top, path)
if not isdir(path):
continue
        # next level down
        _discover_subdatasets_recursively(
            discovered, path, trace, recursion_limit)
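
# Minimal standalone sketch (hypothetical, not DataLad code) of the
# bounded walk above: recursion depth is capped by the length of the
# breadcrumb list `trace` rather than by a separate counter.
import os

def walk_repos(top, trace, limit, found):
    if limit is not None and len(trace) > limit:
        return
    if not os.path.isdir(top):
        return
    if os.path.isdir(os.path.join(top, '.git')):  # crude is_valid_repo stand-in
        found.append(top)
        trace = trace + [top]
    for name in os.listdir(top):
        path = os.path.join(top, name)
        if os.path.isdir(path):
            walk_repos(path, trace, limit, found)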
if path in reported_paths:
# we already recorded this path in the output
            # this can happen whenever `path` is a subdataset that was
            # discovered via recursive processing of another path before
continue
# the path exists in some shape or form
# TODO if we have path_props already we could skip this test
if isdir(path):
# keep any existing type info, previously a more expensive run
# could have discovered an uninstalled 'dataset', and we don't
# want it to be relabeled to a directory
path_props['type'] = \
path_props.get(
'type',
'dataset' if not islink(path) and GitRepo.is_valid_repo(path) else 'directory')
# this could contain all types of additional content
containing_dir = path if not islink(path) else normpath(opj(path, pardir))
else:
if lexists(path):
path_props['type'] = 'file'
else:
path_props['state'] = 'absent'
# for everything else we are interested in the container
containing_dir = dirname(path)
if not containing_dir:
containing_dir = curdir
dspath = parent = get_dataset_root(containing_dir)
if dspath:
if path_props.get('type', None) == 'dataset':
            # for a dataset the root is not the parent; for anything else
            # it is
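
# Simplified stand-in (not DataLad's actual implementation) for the
# get_dataset_root() call above: walk upwards from a directory until a
# .git entry is found, returning None once the filesystem root is hit.
import os

def find_dataset_root(path):  # hypothetical helper name
    path = os.path.abspath(path)
    while True:
        if os.path.lexists(os.path.join(path, '.git')):
            return path
        parent = os.path.dirname(path)
        if parent == path:
            return None
        path = parent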
def is_valid_repo(cls, path, allow_noninitialized=False):
"""Return True if given path points to an annex repository
"""
# Note: default value for allow_noninitialized=False is important
# for invalidating an instance via self._flyweight_invalid. If this is
# changed, we also need to override _flyweight_invalid and explicitly
# pass allow_noninitialized=False!
initialized_annex = GitRepo.is_valid_repo(path) and \
exists(opj(path, '.git', 'annex'))
if allow_noninitialized:
try:
return initialized_annex \
or GitRepo(path, create=False, init=False).is_with_annex()
except (NoSuchPathError, InvalidGitRepositoryError):
return False
else:
return initialized_annex
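
# Usage sketch for the classmethod above (paths are hypothetical): an
# initialized annex is recognized by .git/annex, while a fresh clone of
# an annex repository may only pass with allow_noninitialized=True.
AnnexRepo.is_valid_repo('/data/annexed')
AnnexRepo.is_valid_repo('/data/fresh-clone', allow_noninitialized=True)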
def _adj2subtrees(base, adj, subs):
    # given a parent-to-children mapping, compute a mapping of each parent
    # to all of its (grand)children at any depth
subtrees = dict(adj)
subs = set(subs)
# from bottom up
for ds in sorted(adj, reverse=True):
subtree = []
for sub in subtrees[ds]:
subtree.append(sub)
subtree.extend(subtrees.get(sub, []))
subtrees[ds] = subtree
# give each leaf dataset an entry too
for sub in subs:
if sub not in subtrees and GitRepo.is_valid_repo(sub):
subtrees[sub] = []
return subtrees
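
# Worked example with hypothetical paths: `adj` maps each parent to its
# direct children only; _adj2subtrees() flattens that into transitive
# subtrees. The leaf '/ds/a/b' would additionally get an empty entry of
# its own if it were a valid repo on disk.
adj = {'/ds': ['/ds/a'], '/ds/a': ['/ds/a/b']}
print(_adj2subtrees('/ds', adj, subs=['/ds/a/b']))
# -> {'/ds': ['/ds/a', '/ds/a/b'], '/ds/a': ['/ds/a/b']}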