Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def read_headers(self):
"""
Reads the HTTP response headers from the socket
:return:
On error, None, otherwise a 4-element tuple:
0: A 2-element tuple of integers representing the HTTP version
1: An integer representing the HTTP response code
2: A unicode string of the HTTP response code name
3: An OrderedDict of HTTP headers with lowercase unicode key and unicode values
"""
version = None
code = None
text = None
headers = OrderedDict()
data = self.socket.read_until(b'\r\n\r\n')
string = data.decode('iso-8859-1')
first = False
for line in string.split('\r\n'):
line = line.strip()
if first is False:
if line == '':
continue
match = re.match(r'^HTTP/(1\.[01]) +(\d+) +(.*)$', line)
if not match:
return None
version = tuple(map(int, match.group(1).split('.')))
code = int(match.group(2))
text = match.group(3)
first = True
cri['subject_pk_info']['public_key'].parsed['public_exponent'].native
)
self.assertEqual(
[
util.OrderedDict([
('type', 'extension_request'),
(
'values',
[
[
util.OrderedDict([
('extn_id', 'basic_constraints'),
('critical', False),
(
'extn_value',
util.OrderedDict([
('ca', False),
('path_len_constraint', None),
])
),
]),
util.OrderedDict([
('extn_id', 'key_usage'),
('critical', False),
(
'extn_value',
set(['digital_signature', 'non_repudiation', 'key_encipherment']),
),
])
]
]
),
'geotrust_certs/codex.crt',
[
util.OrderedDict([
('policy_identifier', '1.3.6.1.4.1.14370.1.6'),
(
'policy_qualifiers',
[
util.OrderedDict([
('policy_qualifier_id', 'certification_practice_statement'),
('qualifier', 'https://www.geotrust.com/resources/repository/legal')
]),
util.OrderedDict([
('policy_qualifier_id', 'user_notice'),
(
'qualifier',
util.OrderedDict([
('notice_ref', None),
('explicit_text', 'https://www.geotrust.com/resources/repository/legal')
])
)
])
]
)
])
]
),
(
'lets_encrypt/isrgrootx1.pem',
None
),
(
'lets_encrypt/letsencryptauthorityx1.pem',
('key_identifier', b"'\xf8/\xe9]\xd7\r\xf4\xa8\xea\x87\x99=\xfd\x8e\xb3\x9e@\xd0\x91"),
('authority_cert_issuer', None),
('authority_cert_serial_number', None)
])
),
(
'globalsign_example_keys/SSL2.cer',
util.OrderedDict([
('key_identifier', b"'\xf8/\xe9]\xd7\r\xf4\xa8\xea\x87\x99=\xfd\x8e\xb3\x9e@\xd0\x91"),
('authority_cert_issuer', None),
('authority_cert_serial_number', None)
])
),
(
'globalsign_example_keys/SSL3.cer',
util.OrderedDict([
('key_identifier', b"'\xf8/\xe9]\xd7\r\xf4\xa8\xea\x87\x99=\xfd\x8e\xb3\x9e@\xd0\x91"),
('authority_cert_issuer', None),
('authority_cert_serial_number', None)
])
),
(
'rfc3739.crt',
util.OrderedDict([
('key_identifier', b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\xfe\xdc\xba\x98"),
('authority_cert_issuer', None),
('authority_cert_serial_number', None)
])
)
self.assertEqual(
util.OrderedDict([
('country_name', 'US'),
('state_or_province_name', 'Massachusetts'),
('locality_name', 'Newbury'),
('organization_name', 'Codex Non Sufficit LC'),
('organizational_unit_name', 'Test Third-Level Certificate'),
('common_name', 'Will Bond'),
('email_address', 'will@codexns.io'),
]),
cri['subject'].native
)
self.assertEqual(
util.OrderedDict([
('algorithm', 'rsa'),
('parameters', None),
]),
cri['subject_pk_info']['algorithm'].native
)
self.assertEqual(
24242772097421005542208203320016703216069397492249392798445262959177221203301502279838173203064357049006693856302147277901773700963054800321566171864477088538775137040886151390015408166478059887940234405152693144166884492162723776487601158833605063151869850475289834250129252480954724818505034734280077580919995584375189497366089269712298471489896645221362055822887892887126082288043106492130176555423739906252380437817155678204772878611148787130925042126257401487070141904017757131876614711613405231164930930771261221451019736883391322299033324412671768599041417705072563016759224152503535867541947310239343903761461, # noqa
cri['subject_pk_info']['public_key'].parsed['modulus'].native
)
self.assertEqual(
65537,
cri['subject_pk_info']['public_key'].parsed['public_exponent'].native
)
self.assertEqual(
[
util.OrderedDict([
)
self.maxDiff = None
for extension in extensions:
self.assertIsInstance(
extension,
x509.Extension
)
self.assertEqual(
[
util.OrderedDict([
('extn_id', 'key_identifier'),
('critical', False),
('extn_value', b'\xBE\x42\x85\x3D\xCC\xFF\xE3\xF9\x28\x02\x8F\x7E\x58\x56\xB4\xFD\x03\x5C\xEA\x4B'),
]),
util.OrderedDict([
('extn_id', 'authority_key_identifier'),
('critical', False),
(
'extn_value',
util.OrderedDict([
(
'key_identifier',
b'\xBE\x42\x85\x3D\xCC\xFF\xE3\xF9\x28\x02\x8F\x7E\x58\x56\xB4\xFD\x03\x5C\xEA\x4B'
),
(
'authority_cert_issuer',
[
util.OrderedDict([
('country_name', 'US'),
('state_or_province_name', 'Massachusetts'),
('locality_name', 'Newbury'),
extension,
x509.Extension
)
self.assertEqual(
[
util.OrderedDict([
('extn_id', 'key_identifier'),
('critical', False),
('extn_value', b'\x54\xAA\x54\x70\x6C\x34\x1A\x6D\xEB\x5D\x97\xD7\x1E\xFC\xD5\x24\x3C\x8A\x0E\xD7'),
]),
util.OrderedDict([
('extn_id', 'authority_key_identifier'),
('critical', False),
(
'extn_value',
util.OrderedDict([
(
'key_identifier',
b'\x54\xAA\x54\x70\x6C\x34\x1A\x6D\xEB\x5D\x97\xD7\x1E\xFC\xD5\x24\x3C\x8A\x0E\xD7'
),
('authority_cert_issuer', None),
('authority_cert_serial_number', None),
])
),
]),
util.OrderedDict([
('extn_id', 'basic_constraints'),
('critical', False),
(
'extn_value',
util.OrderedDict([
('ca', True),
:param timeout:
The int number of seconds to set the timeout to
:return:
The string contents of the URL
"""
self.setup_connection(url, timeout)
tries = 0
while tries < 2:
tries += 1
try:
self.ensure_connected()
req_headers = OrderedDict()
req_headers['Host'] = self.url_info[0]
if self.url_info[1] != 443:
req_headers['Host'] += ':%d' % self.url_info[1]
req_headers['Connection'] = 'Keep-Alive' if self.keep_alive else 'Close'
req_headers["User-Agent"] = 'oscrypto %s TLS HTTP Client' % version.__version__
request = 'GET '
url_info = urlparse(url)
path = '/' if not url_info.path else url_info.path
if url_info.query:
path += '?' + url_info.query
request += path + ' HTTP/1.1'
self.write_request(request, req_headers)
response = self.read_headers()
if not response:
'keys/test-pkcs8.key',
'keys/test-pkcs8-der.key',
'PRIVATE KEY',
{}
),
(
'test-third.csr',
'test-third-der.csr',
'CERTIFICATE REQUEST',
{}
),
(
'keys/test-aes128.key',
'keys/test-aes128-der.key',
'RSA PRIVATE KEY',
util.OrderedDict([
('Proc-Type', '4,ENCRYPTED'),
('DEK-Info', 'AES-128-CBC,01F6EE04516C912788B11BD7377626C2')
])
]
),
(
'globalsign_example_keys/IssuingCA-der.cer',
[
util.OrderedDict([
('distribution_point', ['http://crl.globalsign.com/gs/trustrootcatg2.crl']),
('reasons', None),
('crl_issuer', None)
])
]
),
(
'globalsign_example_keys/rootCA.cer',
[
util.OrderedDict([
('distribution_point', ['http://crl.globalsign.com/gs/trustrootcatg2.crl']),
('reasons', None),
('crl_issuer', None)
])
]
),
(
'globalsign_example_keys/SSL1.cer',
[]
),
(
'globalsign_example_keys/SSL2.cer',
[]
),
(
'globalsign_example_keys/SSL3.cer',