Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if isinstance(exc, InvalidRequestLine):
mesg = "Invalid Request Line '%s'" % str(exc)
elif isinstance(exc, InvalidRequestMethod):
mesg = "Invalid Method '%s'" % str(exc)
elif isinstance(exc, InvalidHTTPVersion):
mesg = "Invalid HTTP Version '%s'" % str(exc)
elif isinstance(exc, (InvalidHeaderName, InvalidHeader,)):
mesg = "%s" % str(exc)
if not req and hasattr(exc, "req"):
req = exc.req # for access log
elif isinstance(exc, LimitRequestLine):
mesg = "%s" % str(exc)
elif isinstance(exc, LimitRequestHeaders):
mesg = "Error parsing headers: '%s'" % str(exc)
elif isinstance(exc, InvalidProxyLine):
mesg = "'%s'" % str(exc)
elif isinstance(exc, ForbiddenProxyRequest):
reason = "Forbidden"
mesg = "Request forbidden"
status_int = 403
elif isinstance(exc, InvalidSchemeHeaders):
mesg = "%s" % str(exc)
elif isinstance(exc, SSLError):
reason = "Forbidden"
mesg = "'%s'" % str(exc)
status_int = 403
msg = "Invalid request from ip={ip}: {error}"
self.log.debug(msg.format(ip=addr[0], error=str(exc)))
else:
if hasattr(req, "uri"):
raise InvalidProxyLine(line)
# Extract data
proto = bits[1]
s_addr = bits[2]
d_addr = bits[3]
# Validation
if proto not in ["TCP4", "TCP6"]:
raise InvalidProxyLine("protocol '%s' not supported" % proto)
if proto == "TCP4":
try:
socket.inet_pton(socket.AF_INET, s_addr)
socket.inet_pton(socket.AF_INET, d_addr)
except socket.error:
raise InvalidProxyLine(line)
elif proto == "TCP6":
try:
socket.inet_pton(socket.AF_INET6, s_addr)
socket.inet_pton(socket.AF_INET6, d_addr)
except socket.error:
raise InvalidProxyLine(line)
try:
s_port = int(bits[4])
d_port = int(bits[5])
except ValueError:
raise InvalidProxyLine("invalid port %s" % line)
if not ((0 <= s_port <= 65535) and (0 <= d_port <= 65535)):
raise InvalidProxyLine("invalid port %s" % line)
raise InvalidProxyLine(line)
# Extract data
proto = bits[1]
s_addr = bits[2]
d_addr = bits[3]
# Validation
if proto not in ["TCP4", "TCP6"]:
raise InvalidProxyLine("protocol '%s' not supported" % proto)
if proto == "TCP4":
try:
socket.inet_pton(socket.AF_INET, s_addr)
socket.inet_pton(socket.AF_INET, d_addr)
except socket.error:
raise InvalidProxyLine(line)
elif proto == "TCP6":
try:
socket.inet_pton(socket.AF_INET6, s_addr)
socket.inet_pton(socket.AF_INET6, d_addr)
except socket.error:
raise InvalidProxyLine(line)
try:
s_port = int(bits[4])
d_port = int(bits[5])
except ValueError:
raise InvalidProxyLine("invalid port %s" % line)
if not ((0 <= s_port <= 65535) and (0 <= d_port <= 65535)):
raise InvalidProxyLine("invalid port %s" % line)
socket.inet_pton(socket.AF_INET, s_addr)
socket.inet_pton(socket.AF_INET, d_addr)
except socket.error:
raise InvalidProxyLine(line)
elif proto == "TCP6":
try:
socket.inet_pton(socket.AF_INET6, s_addr)
socket.inet_pton(socket.AF_INET6, d_addr)
except socket.error:
raise InvalidProxyLine(line)
try:
s_port = int(bits[4])
d_port = int(bits[5])
except ValueError:
raise InvalidProxyLine("invalid port %s" % line)
if not ((0 <= s_port <= 65535) and (0 <= d_port <= 65535)):
raise InvalidProxyLine("invalid port %s" % line)
# Set data
self.proxy_protocol_info = {
"proxy_protocol": proto,
"client_addr": s_addr,
"client_port": s_port,
"proxy_addr": d_addr,
"proxy_port": d_port
}
# Validation
if proto not in ["TCP4", "TCP6"]:
raise InvalidProxyLine("protocol '%s' not supported" % proto)
if proto == "TCP4":
try:
socket.inet_pton(socket.AF_INET, s_addr)
socket.inet_pton(socket.AF_INET, d_addr)
except socket.error:
raise InvalidProxyLine(line)
elif proto == "TCP6":
try:
socket.inet_pton(socket.AF_INET6, s_addr)
socket.inet_pton(socket.AF_INET6, d_addr)
except socket.error:
raise InvalidProxyLine(line)
try:
s_port = int(bits[4])
d_port = int(bits[5])
except ValueError:
raise InvalidProxyLine("invalid port %s" % line)
if not ((0 <= s_port <= 65535) and (0 <= d_port <= 65535)):
raise InvalidProxyLine("invalid port %s" % line)
# Set data
self.proxy_protocol_info = {
"proxy_protocol": proto,
"client_addr": s_addr,
"client_port": s_port,
"proxy_addr": d_addr,
if isinstance(exc, InvalidRequestLine):
mesg = "Invalid Request Line '%s'" % str(exc)
elif isinstance(exc, InvalidRequestMethod):
mesg = "Invalid Method '%s'" % str(exc)
elif isinstance(exc, InvalidHTTPVersion):
mesg = "Invalid HTTP Version '%s'" % str(exc)
elif isinstance(exc, (InvalidHeaderName, InvalidHeader,)):
mesg = "%s" % str(exc)
if not req and hasattr(exc, "req"):
req = exc.req # for access log
elif isinstance(exc, LimitRequestLine):
mesg = "%s" % str(exc)
elif isinstance(exc, LimitRequestHeaders):
mesg = "Error parsing headers: '%s'" % str(exc)
elif isinstance(exc, InvalidProxyLine):
mesg = "'%s'" % str(exc)
elif isinstance(exc, ForbiddenProxyRequest):
reason = "Forbidden"
mesg = "Request forbidden"
status_int = 403
elif isinstance(exc, InvalidSchemeHeaders):
mesg = "%s" % str(exc)
elif isinstance(exc, SSLError):
reason = "Forbidden"
mesg = "'%s'" % str(exc)
status_int = 403
msg = "Invalid request from ip={ip}: {error}"
self.log.debug(msg.format(ip=addr[0], error=str(exc)))
else:
if hasattr(req, "uri"):
socket.inet_pton(socket.AF_INET, s_addr)
socket.inet_pton(socket.AF_INET, d_addr)
except socket.error:
raise InvalidProxyLine(line)
elif proto == "TCP6":
try:
socket.inet_pton(socket.AF_INET6, s_addr)
socket.inet_pton(socket.AF_INET6, d_addr)
except socket.error:
raise InvalidProxyLine(line)
try:
s_port = int(bits[4])
d_port = int(bits[5])
except ValueError:
raise InvalidProxyLine("invalid port %s" % line)
if not ((0 <= s_port <= 65535) and (0 <= d_port <= 65535)):
raise InvalidProxyLine("invalid port %s" % line)
# Set data
self.proxy_protocol_info = {
"proxy_protocol": proto,
"client_addr": s_addr,
"client_port": s_port,
"proxy_addr": d_addr,
"proxy_port": d_port
}
# Validation
if proto not in ["TCP4", "TCP6"]:
raise InvalidProxyLine("protocol '%s' not supported" % proto)
if proto == "TCP4":
try:
socket.inet_pton(socket.AF_INET, s_addr)
socket.inet_pton(socket.AF_INET, d_addr)
except socket.error:
raise InvalidProxyLine(line)
elif proto == "TCP6":
try:
socket.inet_pton(socket.AF_INET6, s_addr)
socket.inet_pton(socket.AF_INET6, d_addr)
except socket.error:
raise InvalidProxyLine(line)
try:
s_port = int(bits[4])
d_port = int(bits[5])
except ValueError:
raise InvalidProxyLine("invalid port %s" % line)
if not ((0 <= s_port <= 65535) and (0 <= d_port <= 65535)):
raise InvalidProxyLine("invalid port %s" % line)
# Set data
self.proxy_protocol_info = {
"proxy_protocol": proto,
"client_addr": s_addr,
"client_port": s_port,
"proxy_addr": d_addr,
raise InvalidProxyLine(line)
elif proto == "TCP6":
try:
socket.inet_pton(socket.AF_INET6, s_addr)
socket.inet_pton(socket.AF_INET6, d_addr)
except socket.error:
raise InvalidProxyLine(line)
try:
s_port = int(bits[4])
d_port = int(bits[5])
except ValueError:
raise InvalidProxyLine("invalid port %s" % line)
if not ((0 <= s_port <= 65535) and (0 <= d_port <= 65535)):
raise InvalidProxyLine("invalid port %s" % line)
# Set data
self.proxy_protocol_info = {
"proxy_protocol": proto,
"client_addr": s_addr,
"client_port": s_port,
"proxy_addr": d_addr,
"proxy_port": d_port
}
def handle_error(self, req, client, addr, exc):
request_start = datetime.now()
addr = addr or ('', -1) # unix socket case
if isinstance(exc, (InvalidRequestLine, InvalidRequestMethod,
InvalidHTTPVersion, InvalidHeader, InvalidHeaderName,
LimitRequestLine, LimitRequestHeaders,
InvalidProxyLine, ForbiddenProxyRequest,
InvalidSchemeHeaders,
SSLError)):
status_int = 400
reason = "Bad Request"
if isinstance(exc, InvalidRequestLine):
mesg = "Invalid Request Line '%s'" % str(exc)
elif isinstance(exc, InvalidRequestMethod):
mesg = "Invalid Method '%s'" % str(exc)
elif isinstance(exc, InvalidHTTPVersion):
mesg = "Invalid HTTP Version '%s'" % str(exc)
elif isinstance(exc, (InvalidHeaderName, InvalidHeader,)):
mesg = "%s" % str(exc)
if not req and hasattr(exc, "req"):
req = exc.req # for access log