Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self, args=(), with_chroot=False, blocking=True, setsid=False, **kwargs):
    """Launch the PythonEnvironment under an interpreter in a child process.

    :keyword args: Extra arguments handed to the application that the environment invokes.
    :keyword with_chroot: When true, the child runs with cwd set to the environment's working
      directory; otherwise it runs in the caller's current directory.
    :keyword blocking: When true, wait for the child and return its return code.
      When false, return the process object of the spawned child without waiting.
    :keyword setsid: When true, the PEX runs in a separate operating system session.

    Every other keyword argument is forwarded untouched to subprocess.Popen.
    """
    self.clean_environment()
    cmdline = self.cmdline(args)
    TRACER.log('PEX.run invoking %s' % ' '.join(cmdline))

    if with_chroot:
        working_dir = self._pex
    else:
        working_dir = os.getcwd()

    # Only touch os.setsid when it was actually requested.
    session_hook = os.setsid if setsid else None

    # The standard streams are always passed explicitly, defaulting to None when the
    # caller did not supply them.
    for stream in ('stdin', 'stdout', 'stderr'):
        kwargs.setdefault(stream, None)

    process = Executor.open_process(
        cmdline,
        cwd=working_dir,
        preexec_fn=session_hook,
        **kwargs
    )

    if blocking:
        return process.wait()
    return process
continue
with TRACER.timed('Resolving %s' % req, V=2):
try:
resolveds.update(working_set.resolve([req], env=self))
except DistributionNotFound as e:
TRACER.log('Failed to resolve a requirement: %s' % e)
requirers = unresolved_reqs.setdefault(e.req, OrderedSet())
if e.requirers:
requirers.update(reqs_by_key[requirer] for requirer in e.requirers)
if unresolved_reqs:
TRACER.log('Unresolved requirements:')
for req in unresolved_reqs:
TRACER.log(' - %s' % req)
TRACER.log('Distributions contained within this pex:')
distributions_by_key = defaultdict(list)
if not self._pex_info.distributions:
TRACER.log(' None')
else:
for dist_name, dist_digest in self._pex_info.distributions.items():
TRACER.log(' - %s' % dist_name)
distribution = DistributionHelper.distribution_from_path(
path=os.path.join(self._pex_info.install_cache, dist_digest, dist_name)
)
distributions_by_key[distribution.as_requirement().key].append(distribution)
if not self._pex_info.ignore_errors:
items = []
for index, (requirement, requirers) in enumerate(unresolved_reqs.items()):
rendered_requirers = ''
if requirers:
def _validate(self):
    """Compare the hasher's digest with the expected hash value, when a hasher is set.

    Nothing happens if ``self._hasher`` is falsy. On a match, the validation is logged
    at verbosity 3.

    :raises: :class:`Context.Error` if the hex digest differs from ``self._hash_value``.
    """
    if not self._hasher:
        return

    if self._hash_value != self._hasher.hexdigest():
        raise Context.Error('%s failed checksum!' % (self._link.url))

    link = self._link
    TRACER.log('Validated %s (%s)' % (link.filename, link.fragment), V=3)
def __init__(self, *args, **kw):
    """Log a two-line flakiness warning, then delegate to the parent initializer.

    All positional and keyword arguments are passed through unchanged.
    """
    warnings = (
        'Warning, using a UrllibContext which is known to be flaky.',
        'Please build pex with the requests module for more reliable downloads.',
    )
    for warning in warnings:
        TRACER.log(warning)
    super(UrllibContext, self).__init__(*args, **kw)
def set_script(self, script):
    """Point this PEX environment's entry point at a script shipped by a distribution.

    The name is first looked up as a console script; if that yields an entry point it is
    installed via ``set_entry_point``. Otherwise the name is looked up as an ordinary
    script and recorded on the pex info.

    :param script: The script name as defined either by a console script or ordinary
      script within the setup.py of one of the distributions added to the PEX.
    :raises: :class:`PEXBuilder.InvalidExecutableSpecification` if the script is not found
      in any distribution added to the PEX, or if it is an ordinary script and an entry
      point has already been set.
    """
    # First preference: a console_script of that name.
    owner, console_entry = get_entry_point_from_console_script(script, self._distributions)
    if console_entry:
        self.set_entry_point(console_entry)
        TRACER.log('Set entrypoint to console_script %r in %r' % (console_entry, owner))
        return

    # Second preference: an ordinary script of that name.
    found_script = get_script_from_distributions(script, self._distributions)
    if not found_script:
        searched = ', '.join(str(d) for d in self._distributions)
        raise self.InvalidExecutableSpecification(
            'Could not find script %r in any distribution %s within PEX!' % (script, searched))

    if self._pex_info.entry_point:
        raise self.InvalidExecutableSpecification('Cannot set both entry point and script of PEX!')

    self._pex_info.script = script
    TRACER.log('Set entrypoint to script %r in %r' % (script, found_script.dist))
TRACER.log('Scrubbing user PYTHONPATH element: %s' % path)
elif inherit_path == 'prefer':
TRACER.log('Prepending user PYTHONPATH: %s' % os.pathsep.join(user_pythonpath))
scrubbed_sys_path = user_pythonpath + scrubbed_sys_path
elif inherit_path == 'fallback':
TRACER.log('Appending user PYTHONPATH: %s' % os.pathsep.join(user_pythonpath))
scrubbed_sys_path = scrubbed_sys_path + user_pythonpath
scrub_from_importer_cache = filter(
lambda key: any(key.startswith(path) for path in scrub_paths),
sys.path_importer_cache.keys())
scrubbed_importer_cache = dict((key, value) for (key, value) in sys.path_importer_cache.items()
if key not in scrub_from_importer_cache)
for importer_cache_entry in scrub_from_importer_cache:
TRACER.log('Scrubbing from path_importer_cache: %s' % importer_cache_entry, V=2)
return scrubbed_sys_path, scrubbed_importer_cache
self._working_set = None
self._interpreter = interpreter or PythonInterpreter.get()
self._inherit_path = pex_info.inherit_path
self._supported_tags = []
platform = Platform.current()
platform_name = platform.platform
super(PEXEnvironment, self).__init__(
search_path=[] if pex_info.inherit_path == 'false' else sys.path,
# NB: Our pkg_resources.Environment base-class wants the platform name string and not the
# pex.platform.Platform object.
platform=platform_name,
**kw
)
self._supported_tags.extend(platform.supported_tags(self._interpreter))
TRACER.log(
'E: tags for %r x %r -> %s' % (self.platform, self._interpreter, self._supported_tags),
V=9
)