Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if self.strict_mode and method != self.method:
resp = (
'Malformed method name: According to RFC 2616 '
'(section 5.1.1) and its successors '
'RFC 7230 (section 3.1.1) and RFC 7231 (section 4.1) '
'method names are case-sensitive and uppercase.'
)
self.simple_response('400 Bad Request', resp)
return False
try:
if six.PY2: # FIXME: Figure out better way to do this
# Ref: https://stackoverflow.com/a/196392/595220 (like this?)
"""This is a dummy check for unicode in URI."""
ntou(bton(uri, 'ascii'), 'ascii')
scheme, authority, path, qs, fragment = urllib.parse.urlsplit(uri)
except UnicodeError:
self.simple_response('400 Bad Request', 'Malformed Request-URI')
return False
uri_is_absolute_form = (scheme or authority)
if self.method == b'OPTIONS':
# TODO: cover this branch with tests
path = (
uri
# https://tools.ietf.org/html/rfc7230#section-5.3.4
if (self.proxy_mode and uri_is_absolute_form)
else path
)
elif self.method == b'CONNECT':
def get_environ(self):
"""Return a new environ dict targeting the given wsgi.version."""
req = self.req
req_conn = req.conn
env = {
# set a non-standard environ entry so the WSGI app can know what
# the *real* server protocol is (and what features to support).
# See http://www.faqs.org/rfcs/rfc2145.html.
'ACTUAL_SERVER_PROTOCOL': req.server.protocol,
'PATH_INFO': bton(req.path),
'QUERY_STRING': bton(req.qs),
'REMOTE_ADDR': req_conn.remote_addr or '',
'REMOTE_PORT': str(req_conn.remote_port or ''),
'REQUEST_METHOD': bton(req.method),
'REQUEST_URI': bton(req.uri),
'SCRIPT_NAME': '',
'SERVER_NAME': req.server.server_name,
# Bah. "SERVER_PROTOCOL" is actually the REQUEST protocol.
'SERVER_PROTOCOL': bton(req.request_protocol),
'SERVER_SOFTWARE': req.server.software,
'wsgi.errors': sys.stderr,
'wsgi.input': req.rfile,
'wsgi.input_terminated': bool(req.chunked_read),
'wsgi.multiprocess': False,
'wsgi.multithread': True,
'wsgi.run_once': False,
# FIXME: keep requested bind_addr separate real bound_addr (port
# is different in case of ephemeral port 0)
bind_addr = socket_.getsockname()
if socket_.family in (
# Windows doesn't have socket.AF_UNIX, so not using it in check
socket.AF_INET,
socket.AF_INET6,
):
"""UNIX domain sockets are strings or bytes.
In case of bytes with a leading null-byte it's an abstract socket.
"""
return bind_addr[:2]
if isinstance(bind_addr, six.binary_type):
bind_addr = bton(bind_addr)
return bind_addr
env['X_REMOTE_USER'] = str(req_conn.peer_user)
env['X_REMOTE_GROUP'] = str(req_conn.peer_group)
env['REMOTE_USER'] = env['X_REMOTE_USER']
except RuntimeError:
"""Unable to retrieve peer creds data.
Unsupported by current kernel or socket error happened, or
unsupported socket type, or disabled.
"""
else:
env['SERVER_PORT'] = str(req.server.bind_addr[1])
# Request headers
env.update(
('HTTP_' + bton(k).upper().replace('-', '_'), bton(v))
for k, v in req.inheaders.items()
)
# CONTENT_TYPE/CONTENT_LENGTH
ct = env.pop('HTTP_CONTENT_TYPE', None)
if ct is not None:
env['CONTENT_TYPE'] = ct
cl = env.pop('HTTP_CONTENT_LENGTH', None)
if cl is not None:
env['CONTENT_LENGTH'] = cl
if req.conn.ssl_env:
env.update(req.conn.ssl_env)
return env
def get_environ(self):
"""Return a new environ dict targeting the given wsgi.version."""
req = self.req
req_conn = req.conn
env = {
# set a non-standard environ entry so the WSGI app can know what
# the *real* server protocol is (and what features to support).
# See http://www.faqs.org/rfcs/rfc2145.html.
'ACTUAL_SERVER_PROTOCOL': req.server.protocol,
'PATH_INFO': bton(req.path),
'QUERY_STRING': bton(req.qs),
'REMOTE_ADDR': req_conn.remote_addr or '',
'REMOTE_PORT': str(req_conn.remote_port or ''),
'REQUEST_METHOD': bton(req.method),
'REQUEST_URI': bton(req.uri),
'SCRIPT_NAME': '',
'SERVER_NAME': req.server.server_name,
# Bah. "SERVER_PROTOCOL" is actually the REQUEST protocol.
'SERVER_PROTOCOL': bton(req.request_protocol),
'SERVER_SOFTWARE': req.server.software,
'wsgi.errors': sys.stderr,
'wsgi.input': req.rfile,
'wsgi.input_terminated': bool(req.chunked_read),
'wsgi.multiprocess': False,
'wsgi.multithread': True,
'wsgi.run_once': False,
'wsgi.url_scheme': bton(req.scheme),
'wsgi.version': self.version,
}
if isinstance(req.server.bind_addr, six.string_types):
def get_environ(self):
    """Assemble the base WSGI environ mapping for the current request.

    Values are read from ``self.req``, its connection and its server;
    byte-valued request attributes are passed through ``bton`` first.
    """
    request = self.req
    connection = request.conn
    env = {
        # Non-standard key: exposes the protocol the *server* speaks so
        # the WSGI app can tell it apart from the request's protocol.
        # See http://www.faqs.org/rfcs/rfc2145.html.
        'ACTUAL_SERVER_PROTOCOL': request.server.protocol,
        'PATH_INFO': bton(request.path),
        'QUERY_STRING': bton(request.qs),
        # Peer address details fall back to empty strings when falsy.
        'REMOTE_ADDR': connection.remote_addr or '',
        'REMOTE_PORT': str(connection.remote_port or ''),
        'REQUEST_METHOD': bton(request.method),
        'REQUEST_URI': bton(request.uri),
        'SCRIPT_NAME': '',
        'SERVER_NAME': request.server.server_name,
        # Despite its name, "SERVER_PROTOCOL" carries the protocol of
        # the incoming request.
        'SERVER_PROTOCOL': bton(request.request_protocol),
        'SERVER_SOFTWARE': request.server.software,
        'wsgi.errors': sys.stderr,
        'wsgi.input': request.rfile,
        'wsgi.input_terminated': bool(request.chunked_read),
        'wsgi.multiprocess': False,
        'wsgi.multithread': True,
        'wsgi.run_once': False,
        'wsgi.url_scheme': bton(request.scheme),
        'wsgi.version': self.version,
    }
    # NOTE(review): the visible portion of this method stops here and
    # never returns ``env`` -- confirm against the complete source.
def get_environ(self):
"""Return a new environ dict targeting the given wsgi.version."""
req = self.req
req_conn = req.conn
env = {
# set a non-standard environ entry so the WSGI app can know what
# the *real* server protocol is (and what features to support).
# See http://www.faqs.org/rfcs/rfc2145.html.
'ACTUAL_SERVER_PROTOCOL': req.server.protocol,
'PATH_INFO': bton(req.path),
'QUERY_STRING': bton(req.qs),
'REMOTE_ADDR': req_conn.remote_addr or '',
'REMOTE_PORT': str(req_conn.remote_port or ''),
'REQUEST_METHOD': bton(req.method),
'REQUEST_URI': bton(req.uri),
'SCRIPT_NAME': '',
'SERVER_NAME': req.server.server_name,
# Bah. "SERVER_PROTOCOL" is actually the REQUEST protocol.
'SERVER_PROTOCOL': bton(req.request_protocol),
'SERVER_SOFTWARE': req.server.software,
'wsgi.errors': sys.stderr,
'wsgi.input': req.rfile,
'wsgi.input_terminated': bool(req.chunked_read),
'wsgi.multiprocess': False,
'wsgi.multithread': True,
'wsgi.run_once': False,
'wsgi.url_scheme': bton(req.scheme),
'REMOTE_ADDR': req_conn.remote_addr or '',
'REMOTE_PORT': str(req_conn.remote_port or ''),
'REQUEST_METHOD': bton(req.method),
'REQUEST_URI': bton(req.uri),
'SCRIPT_NAME': '',
'SERVER_NAME': req.server.server_name,
# Bah. "SERVER_PROTOCOL" is actually the REQUEST protocol.
'SERVER_PROTOCOL': bton(req.request_protocol),
'SERVER_SOFTWARE': req.server.software,
'wsgi.errors': sys.stderr,
'wsgi.input': req.rfile,
'wsgi.input_terminated': bool(req.chunked_read),
'wsgi.multiprocess': False,
'wsgi.multithread': True,
'wsgi.run_once': False,
'wsgi.url_scheme': bton(req.scheme),
'wsgi.version': self.version,
}
if isinstance(req.server.bind_addr, six.string_types):
# AF_UNIX. This isn't really allowed by WSGI, which doesn't
# address unix domain sockets. But it's better than nothing.
env['SERVER_PORT'] = ''
try:
env['X_REMOTE_PID'] = str(req_conn.peer_pid)
env['X_REMOTE_UID'] = str(req_conn.peer_uid)
env['X_REMOTE_GID'] = str(req_conn.peer_gid)
env['X_REMOTE_USER'] = str(req_conn.peer_user)
env['X_REMOTE_GROUP'] = str(req_conn.peer_group)
env['REMOTE_USER'] = env['X_REMOTE_USER']