Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Integer markers for header-parsing progress; HTTPResponse keeps one of these
# in self._header_state.
# NOTE(review): the code that switches between states is not visible in this
# excerpt -- the per-state meanings below are inferred from the names only.
HEADER_STATE_VALUE = 2  # presumably: a header value is being read
HEADER_STATE_DONE = 3  # presumably: header parsing has finished
def copy(data):
    """Return ``data[:]`` on Python 3, or ``str(data)`` on Python 2.

    The interpreter check relies on ``six.PY3``.
    """
    return data[:] if six.PY3 else str(data)
class HTTPConnectionClosed(HTTPParseError):
    """Raised when the connection is closed before any data was read.

    ``_read_headers`` raises it when the very first receive yields no data
    or fails with ECONNRESET.
    """
class HTTPProtocolViolationError(HTTPParseError):
    """Parse error for a response that violates the HTTP protocol.

    NOTE(review): meaning taken from the class name; no raise site is
    visible in this part of the file.
    """
class HTTPResponse(HTTPResponseParser):
    """Response object built on ``HTTPResponseParser``.

    Tracks message progress flags and header-parsing bookkeeping, and can
    pull data from ``self._sock`` until the header section is complete.
    """

    def __init__(self, method='GET', headers_type=Headers):
        """Initialise parsing state for a response to a ``method`` request.

        :param method: request method name; stored upper-cased.
        :param headers_type: zero-argument callable whose result is kept in
            ``self._headers_index``.
        """
        super(HTTPResponse, self).__init__()
        self.method = method.upper()

        # Progress flags, all initially False.
        # NOTE(review): presumably updated by parser callbacks that are not
        # part of this excerpt.
        self.headers_complete = self.message_begun = \
            self.message_complete = False

        # Header bookkeeping.
        self._headers_index = headers_type()
        self._header_state = HEADER_STATE_INIT
        self._current_header_field = self._current_header_value = None
        self._header_position = 1

    def _read_headers(self):
        """Receive and feed data until ``self.headers_complete`` is set.

        Reads up to ``self.block_size`` bytes at a time from ``self._sock``
        and hands every chunk to ``self.feed``. ``self.release()`` is called
        when the message is already complete once the loop ends, and also
        before any exception is propagated.

        :raises HTTPConnectionClosed: the first receive returned no data or
            failed with ECONNRESET.
        :raises HTTPParseError: a later receive returned no data while the
            headers were still incomplete.
        """
        try:
            first_read = True
            while not self.headers_complete:
                try:
                    chunk = self._sock.recv(self.block_size)
                    self.feed(chunk)
                    # depending on gevent version we get a conn reset or
                    # no data
                    if len(chunk) == 0 and not self.headers_complete:
                        if first_read:
                            raise HTTPConnectionClosed('connection closed.')
                        raise HTTPParseError(
                            'connection closed before end of the headers')
                    first_read = False
                except gevent.socket.error as err:
                    # A reset before anything was read is reported as a
                    # closed connection; every other socket error is
                    # re-raised unchanged.
                    if err.errno == errno.ECONNRESET and first_read:
                        raise HTTPConnectionClosed('connection closed.')
                    raise

            if self.message_complete:
                self.release()
        except BaseException:
            # Always release before letting the error propagate.
            self.release()
            raise
# Integer markers for header-parsing progress; HTTPResponse.__init__ starts
# self._header_state at HEADER_STATE_INIT.
# NOTE(review): the code that switches between states is not visible in this
# excerpt -- the per-state meanings below are inferred from the names only.
HEADER_STATE_INIT = 0  # initial state assigned in HTTPResponse.__init__
HEADER_STATE_FIELD = 1  # presumably: a header field name is being read
HEADER_STATE_VALUE = 2  # presumably: a header value is being read
HEADER_STATE_DONE = 3  # presumably: header parsing has finished
def copy(data):
    """Return ``data[:]`` on Python 3, or ``str(data)`` on Python 2.

    The interpreter check relies on ``six.PY3``.
    """
    return data[:] if six.PY3 else str(data)
class HTTPConnectionClosed(HTTPParseError):
    """Raised when the connection is closed before any data was read.

    NOTE(review): based on the 'connection closed.' raise sites in
    ``_read_headers`` (first receive empty or ECONNRESET); confirm there are
    no other users.
    """
class HTTPProtocolViolationError(HTTPParseError):
    """Parse error for a response that violates the HTTP protocol.

    NOTE(review): meaning taken from the class name; no raise site is
    visible in this part of the file.
    """
class HTTPResponse(HTTPResponseParser):
def __init__(self, method='GET', headers_type=Headers):
super(HTTPResponse, self).__init__()
self.method = method.upper()
self.headers_complete = False
self.message_begun = False
self.message_complete = False
self._headers_index = headers_type()