Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# local file url that is a named channel
# Scratch build directory under the system temp dir; its path is exported as
# CONDA_BLD_PATH below.
conda_bld_path = join(gettempdir(), 'conda-bld')
try:
mkdir_p(conda_bld_path)
# presumably env_var sets CONDA_BLD_PATH only for the duration of the with-block and
# reset_context is its callback for reloading `context` -- TODO confirm
with env_var('CONDA_BLD_PATH', conda_bld_path, reset_context):
# Build a file URL for a package tarball located under context.croot.
url = path_to_url(join_url(context.croot, 'osx-64', 'bcrypt-3.1.1-py35_2.tar.bz2'))
d = Dist(url)
# The channel is expected to be reported by name ('local'), not as a raw URL.
assert d.channel == 'local'
assert d.name == 'bcrypt'
assert d.version == '3.1.1'
assert d.build_string == 'py35_2'
# Converting the Dist back to a URL must reproduce the input exactly.
assert d.to_url() == url
assert d.is_channel is True
finally:
# Remove the scratch build directory.
rm_rf(conda_bld_path)
# local file url that is not a named channel
url = join_url('file:///some/location/on/disk', 'osx-64', 'bcrypt-3.1.1-py35_2.tar.bz2')
d = Dist(url)
# Here the channel is expected to be the base file URL itself.
assert d.channel == 'file:///some/location/on/disk'
assert d.name == 'bcrypt'
assert d.version == '3.1.1'
assert d.build_string == 'py35_2'
assert d.to_url() == url
assert d.is_channel is True
def test_remove_link_to_dir():
    """Check that rm_rf on a symlink to a directory removes only the link.

    A directory and a symlink pointing at it are created inside a temporary
    directory. The link is removed first and the directory must survive;
    the directory is then removed as well.
    """
    with tempdir() as workdir:
        link_path = join(workdir, "test_link")
        target_dir = join(workdir, "test_dir")
        probe_file = join(workdir, "test_file")

        mkdir_p(target_dir)
        touch(probe_file)
        assert isdir(target_dir)
        assert not islink(target_dir)
        assert not islink(link_path)

        # Skip only when soft links are unavailable and we are on Windows.
        links_unavailable = not softlink_supported(probe_file, workdir)
        if links_unavailable and on_win:
            pytest.skip("softlink not supported")

        symlink(target_dir, link_path)
        assert islink(link_path)

        # Removing the link must leave the directory it points at in place.
        assert rm_rf(link_path)
        assert not isdir(link_path)
        assert not islink(link_path)
        assert not lexists(link_path)
        assert isdir(target_dir)

        # Now remove the real directory.
        assert rm_rf(target_dir)
        assert not isdir(target_dir)
        assert not islink(target_dir)
# NOTE(review): these lines repeat the statements of test_remove_link_to_dir without a
# `def` header or the dst_link/src_dir/test_file/td assignments; this looks like a
# second snippet that starts mid-function -- confirm against the original file.
mkdir_p(src_dir)
touch(test_file)
assert isdir(src_dir)
assert not islink(src_dir)
assert not islink(dst_link)
# Skip only when soft links are unsupported and on_win is true.
if not softlink_supported(test_file, td) and on_win:
pytest.skip("softlink not supported")
symlink(src_dir, dst_link)
assert islink(dst_link)
# Removing the link must leave the directory it points at in place.
assert rm_rf(dst_link)
assert not isdir(dst_link)
assert not islink(dst_link)
assert not lexists(dst_link)
assert isdir(src_dir)
# Now remove the real directory.
assert rm_rf(src_dir)
assert not isdir(src_dir)
assert not islink(src_dir)
def tearDown(self):
    """Remove self.prefix and assert that nothing remains at that path."""
    rm_rf(self.prefix)
    still_present = lexists(self.prefix)
    assert not still_present
def _copy_then_remove(source_path, destination_path):
    """Place source_path at destination_path, then remove source_path.

    A symlink source is transferred with `copy`; anything else goes through
    `create_hard_link_or_copy`. The source is removed with `rm_rf` afterwards.
    """
    # Imported inside the function, as in the original.
    from .create import copy, create_hard_link_or_copy

    transfer = copy if islink(source_path) else create_hard_link_or_copy
    transfer(source_path, destination_path)
    rm_rf(source_path)
# Filter out any candidate whose name ends in 'conda.egg-link' (lazy generator).
rm_rf_these = (p for p in rm_rf_these if not p.endswith('conda.egg-link'))
for fn in rm_rf_these:
# Announce each removal on stderr; the filesystem is only touched outside dry-run mode.
print("rm -rf %s" % join(site_packages_dir, fn), file=sys.stderr)
if not context.dry_run:
rm_rf(join(site_packages_dir, fn))
# NOTE(review): indentation is lost in this view, so it is unclear whether this
# assignment sits inside the dry-run guard or directly in the loop body -- confirm.
modified = True
# Fixed entries under site_packages_dir that are removed whenever they exist.
others = (
"conda",
"conda_env",
)
for other in others:
path = join(site_packages_dir, other)
# lexists is also true for a broken symlink, so dangling links are removed too.
if lexists(path):
print("rm -rf %s" % path, file=sys.stderr)
if not context.dry_run:
rm_rf(path)
modified = True
# Report whether any removal was performed (or announced).
if modified:
return Result.MODIFIED
else:
return Result.NO_CHANGE
' add -n NAME or -p PREFIX option')
# Announce on stderr that every package in `prefix` is about to be removed.
print("\nRemove all packages in environment %s:\n" % prefix, file=sys.stderr)
if 'package_names' in args:
# Transaction setup that unlinks every record PrefixData reports for the prefix
# and links nothing.
stp = PrefixSetup(
target_prefix=prefix,
unlink_precs=tuple(PrefixData(prefix).iter_records()),
link_precs=(),
remove_specs=(),
update_specs=(),
neutered_specs=(),
)
txn = UnlinkLinkTransaction(stp)
handle_txn(txn, prefix, args, False, True)
# Delete the prefix itself, then unregister the environment.
rm_rf(prefix, clean_empty_parents=True)
unregister_env(prefix)
return
# NOTE(review): indentation is lost in this view; this `else` presumably pairs with an
# `if` that starts above the visible lines -- confirm against the original file.
else:
if args.features:
# With args.features set, each distinct entry of args.package_names becomes a
# MatchSpec on track_features.
specs = tuple(MatchSpec(track_features=f) for f in set(args.package_names))
else:
specs = [s for s in specs_from_args(args.package_names)]
if not context.quiet:
print("Removing specs: {}".format(specs))
channel_urls = ()
subdirs = ()
# the default mode is 'w+b', and universal new lines don't work in that mode
# Serialize the plan as JSON; objects json cannot encode natively fall back to
# their __dict__.
tf.write(json.dumps(plan, ensure_ascii=False, default=lambda x: x.__dict__))
temp_path = tf.name
# Both the interpreter path and the temp-file path are wrapped in double quotes
# before being handed to run_as_admin.
python_exe = '"%s"' % abspath(sys.executable)
hinstance, error_code = run_as_admin((python_exe, '-m', 'conda.core.initialize',
'"%s"' % temp_path))
if error_code is not None:
print("ERROR during elevated execution.\n rc: %s" % error_code,
file=sys.stderr)
# Read the plan back from the same temp file (presumably rewritten by the
# elevated process -- TODO confirm).
with open(temp_path) as fh:
_plan = json.loads(ensure_text_type(fh.read()))
finally:
# Remove the temp file if a path was recorded and it still exists.
if temp_path and lexists(temp_path):
rm_rf(temp_path)
# NOTE(review): the `if` this `else` belongs to is above the visible lines.
else:
# This branch passes the JSON plan on stdin and parses the resulting plan from stdout.
stdin = json.dumps(plan, ensure_ascii=False, default=lambda x: x.__dict__)
# NOTE(review): sys.executable is interpolated into the command string unquoted; an
# interpreter path containing spaces may break depending on how subprocess_call
# tokenizes the command -- confirm.
result = subprocess_call(
'sudo %s -m conda.core.initialize' % sys.executable,
env={},
path=os.getcwd(),
stdin=stdin
)
stderr = result.stderr.strip()
if stderr:
print(stderr, file=sys.stderr)
_plan = json.loads(result.stdout.strip())
# Replace the contents of `plan` in place, so the same list object holds the new plan.
del plan[:]
plan.extend(_plan)
def reverse(self):
    """Undo this action: clear the target path and restore any held copy.

    The target path is always removed. If something exists at self.hold_path,
    it is renamed back onto the target path.
    """
    rm_rf(self.target_full_path)
    if not lexists(self.hold_path):
        # Nothing was held aside, so there is nothing to restore.
        return
    log.trace("moving %s => %s", self.hold_path, self.target_full_path)
    # The target is cleared again right before the rename, as in the original.
    rm_rf(self.target_full_path)
    backoff_rename(self.hold_path, self.target_full_path)
# For each prefix setup, probe writability by opening PREFIX_MAGIC_FILE inside the
# target prefix in append mode, then clean up whatever the probe created.
for prefix_setup in itervalues(prefix_setups):
test_path = join(prefix_setup.target_prefix, PREFIX_MAGIC_FILE)
# Recorded up front so a probe file that was already there is not deleted afterwards.
test_path_existed = lexists(test_path)
# Stays None if mkdir_p itself raises.
dir_existed = None
try:
# presumably mkdir_p returns a truthy value when the directory already existed -- TODO confirm
dir_existed = mkdir_p(dirname(test_path))
# Append mode creates the file if missing and does not truncate an existing one.
open(test_path, "a").close()
except EnvironmentError:
# Only an explicit False triggers directory removal here.
if dir_existed is False:
rm_rf(dirname(test_path))
yield EnvironmentNotWritableError(prefix_setup.target_prefix)
# NOTE(review): indentation is lost in this view; this `else` presumably belongs to the
# try statement (no-exception path) -- confirm against the original file.
else:
if not dir_existed:
rm_rf(dirname(test_path))
elif not test_path_existed:
rm_rf(test_path)