Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_console_entry_point(bin_dir, name, value, env_exe, creator):
result = []
if sys.platform == "win32":
# windows doesn't support simple script files, so fallback to more complicated exe generator
from distlib.scripts import ScriptMaker
maker = ScriptMaker(None, str(bin_dir))
maker.clobber = True # overwrite
maker.variants = {"", "X", "X.Y"} # create all variants
maker.set_mode = True # ensure they are executable
maker.executable = str(env_exe)
specification = "{} = {}".format(name, value)
new_files = maker.make(specification)
result.extend(new_files)
else:
module, func = value.split(":")
content = (
dedent(
"""
#!{0}
# -*- coding: utf-8 -*-
import re
import sys
def handle(self, requester, data):
requirement = data['requirement']
wheel = Wheel(data['wheel'])
run_time_dependencies = wheel.metadata.requires_dist
for spec in run_time_dependencies:
# Packages might declare their "extras" here, so let's split it
dependency, extra = (';' in spec and spec or spec + ';').split(';')
self.emit('dependency_found', self.name,
requirement=dependency,
dependency_of=requirement)
return {'requirement': requirement}
def _ensure_cfg_read():
global _cfg_read
if not _cfg_read:
from distlib.resources import finder
backport_package = __name__.rsplit('.', 1)[0]
_finder = finder(backport_package)
_cfgfile = _finder.find('sysconfig.cfg')
assert _cfgfile, 'sysconfig.cfg exists'
with _cfgfile.as_stream() as s:
_SCHEMES.readfp(s)
if _PYTHON_BUILD:
for scheme in ('posix_prefix', 'posix_home'):
_SCHEMES.set(scheme, 'include', '{srcdir}/Include')
_SCHEMES.set(scheme, 'platinclude', '{projectbase}/.')
_cfg_read = True
def _ensure_cfg_read():
global _cfg_read
if not _cfg_read:
from distlib.resources import finder
backport_package = __name__.rsplit('.', 1)[0]
_finder = finder(backport_package)
_cfgfile = _finder.find('sysconfig.cfg')
assert _cfgfile, 'sysconfig.cfg exists'
with _cfgfile.as_stream() as s:
_SCHEMES.readfp(s)
if _PYTHON_BUILD:
for scheme in ('posix_prefix', 'posix_home'):
_SCHEMES.set(scheme, 'include', '{srcdir}/Include')
_SCHEMES.set(scheme, 'platinclude', '{projectbase}/.')
_cfg_read = True
def check_credentials(self):
"""
Check that ``username`` and ``password`` have been set, and raise an
exception if not.
"""
if self.username is None or self.password is None:
raise DistlibException('username and password must be set')
pm = HTTPPasswordMgr()
_, netloc, _, _, _, _ = urlparse(self.url)
pm.add_password(self.realm, netloc, self.username, self.password)
self.password_handler = HTTPBasicAuthHandler(pm)
def update_url_credentials(base_url, other_url):
base = compat.urlparse(base_url)
other = compat.urlparse(other_url)
# If they're not from the same server, we return right away without
# trying to update anything
if base.hostname != other.hostname or base.port != other.port:
return other.geturl()
# Update the `netloc` field and return the `other` url
return other._replace(netloc=base.netloc).geturl()
def check_credentials(self):
"""
Check that ``username`` and ``password`` have been set, and raise an
exception if not.
"""
if self.username is None or self.password is None:
raise DistlibException('username and password must be set')
pm = HTTPPasswordMgr()
_, netloc, _, _, _, _ = urlparse(self.url)
pm.add_password(self.realm, netloc, self.username, self.password)
self.password_handler = HTTPBasicAuthHandler(pm)
def get_page(self, url):
# http://peak.telecommunity.com/DevCenter/EasyInstall#package-index-api
scheme, netloc, path, _, _, _ = compat.urlparse(url)
if scheme == 'file' and os.path.isdir(url2pathname(path)):
url = compat.urljoin(ensure_slash(url), 'index.html')
# The `retrieve()` method follows any eventual redirects, so the
# initial url might be different from the final one
try:
response, final_url = http_retrieve(self.opener, url)
except urllib3.exceptions.MaxRetryError:
return
content_type = response.headers.get('content-type', '')
if locators.HTML_CONTENT_TYPE.match(content_type):
data = response.data
encoding = response.headers.get('content-encoding')
if encoding:
decoder = self.decoders[encoding] # fail if not found
def update_url_credentials(self, base_url, other_url):
base = compat.urlparse(base_url)
other = compat.urlparse(other_url)
# If they're not from the same server, we return right away without
# trying to update anything
if base.hostname != other.hostname or base.password != other.password:
return other.geturl()
# Updating credentials only if the target URL doesn't have the data
# that we want to set
if base.username and not other.username:
other.username = base.username
if base.password and not other.password:
other.password = base.password
return other.geturl()
def get_opener():
http_proxy = os.getenv('http_proxy')
if http_proxy:
parsed_url = compat.urlparse(http_proxy)
proxy_headers = util.get_auth_info_from_url(
http_proxy, proxy=True)
return urllib3.ProxyManager(
proxy_url=parsed_url.geturl(),
proxy_headers=proxy_headers)
return urllib3.PoolManager()