Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_hex_str_from_asn1_octets(self):
    # Verify that hex_str_from_asn1_octets renders the bytes held by an
    # OctetString as a lowercase hex string, two characters per byte.
    raw_bytes = b'123\x3b456\x1f789'
    wrapped = asn1_core.OctetString(raw_bytes)
    rendered = asn1_convert.hex_str_from_asn1_octets(wrapped)
    self.assertEqual(rendered, '3132333b3435361f373839')

    # Redundant checks in case the test code changes.
    tuf.formats.HEX_SCHEMA.check_match(rendered)
    self.assertEqual(raw_bytes, wrapped.native)
    self.assertEqual(len(rendered), 2 * len(raw_bytes))
# Excerpt from a test body (the enclosing `def` is not visible here).
# First case: convert a plain string to a VisibleString and confirm both the
# native value and the resulting type.
string_asn1 = asn1_convert.to_asn1(
'alphabeta', asn1_core.VisibleString)
self.assertEqual('alphabeta', string_asn1.native)
self.assertIsInstance(string_asn1, asn1_core.VisibleString)
# Repeat the check using conversion_check.
self.conversion_check(
data='alphabeta',
datatype=asn1_core.VisibleString,
expected_der=None) # TODO: Fill in expected_der.
# Second case: a hex string is converted to an OctetString, and
# hex_str_from_asn1_octets is expected to give back the same hex string.
octets_asn1 = asn1_convert.to_asn1(
'01234567890abcdef0', asn1_core.OctetString)
self.assertEqual(
'01234567890abcdef0',
asn1_convert.hex_str_from_asn1_octets(octets_asn1))
self.assertIsInstance(octets_asn1, asn1_core.OctetString)
# Repeat the check using conversion_check.
self.conversion_check(
data='01234567890abcdef0',
datatype=asn1_core.OctetString,
expected_der=None) # TODO: Fill in expected_der.
# Fallback branch (the matching `if` lies outside this excerpt): the session
# key is encrypted with the recipient's public key using PKCS#1 v1.5 padding,
# and the key-encryption algorithm is recorded as 'rsa'.
else:
kea = {'algorithm': 'rsa'}
encrypted_key = public_key.encrypt(session_key, padding.PKCS1v15())
# Build the 'ktri' alternative of cms.RecipientInfo, identifying the recipient
# by the issuer and serial number read from tbs_cert.
result = cms.RecipientInfo(
name='ktri',
value={
'version': 'v0',
'rid': cms.RecipientIdentifier(
name='issuer_and_serial_number',
value={
'issuer': tbs_cert['issuer'],
'serial_number': tbs_cert['serial_number']
}
),
'key_encryption_algorithm': kea,
# The encrypted session key is carried as an OctetString.
'encrypted_key': core.OctetString(encrypted_key)
}
)
return result
# GrapheneObject and the data given in __init__()
# Excerpt (enclosing method not visible). Assemble the message to hash: the
# unhexlified chain id first, then the data fields, each piece wrapped in an
# OctetString and appended through .dump().
self.message = OctetString(unhexlify(self.chainid)).dump()
for name, value in list(self.data.items()):
if name == "operations":
# Operations are appended one element at a time.
for operation in value:
# NOTE(review): this tests `value` (the whole operations collection), not
# `operation`; it looks like a typo for isinstance(operation, string_types).
# As written the result is the same for every element — confirm intent.
if isinstance(value, string_types):
b = py23_bytes(operation, 'utf-8')
else:
b = py23_bytes(operation)
self.message += OctetString(b).dump()
elif name != "signatures":
# Every other field except "signatures" is appended as one value; strings
# are encoded as UTF-8 first.
if isinstance(value, string_types):
b = py23_bytes(value, 'utf-8')
else:
b = py23_bytes(value)
self.message += OctetString(b).dump()
# The digest is the SHA-256 hash of the assembled message.
self.digest = hashlib.sha256(self.message).digest()
# restore signatures
self.data["signatures"] = sigs
"utc_time": core.UTCTime(
# Excerpt: tail of a list of cms.CMSAttribute entries (the opening of the
# expression is outside this view). The timestamp below is the current UTC
# time made timezone-aware.
# NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
# datetime.now(timezone.utc) produces the aware value directly — confirm the
# supported Python versions before changing.
datetime.utcnow().replace(
tzinfo=timezone.utc
)
)
}
)
]
),
}
),
# message_digest attribute: a one-element set holding message_digest wrapped
# in an OctetString.
cms.CMSAttribute(
{
"type": cms.CMSAttributeType("message_digest"),
"values": cms.SetOfOctetString(
[core.OctetString(message_digest)]
),
}
),
# Attribute identified by the dotted OID 1.2.840.113549.1.9.15, carrying the
# SmimeCapabilities object built from smime_cap.
cms.CMSAttribute(
{
"type": cms.CMSAttributeType("1.2.840.113549.1.9.15"),
"values": cms.SetOfAny(
[core.Any(SmimeCapabilities(smime_cap))]
),
}
),
]
)
# Other branch (its condition is outside this excerpt): no signed attributes.
else:
signed_attributes = None
class_ = 1
tag = 0
class Counter(Integer):
    """
    Integer subtype that overrides the tagging with class_ 1 and tag 1.

    NOTE(review): class_ = 1 presumably selects the ASN.1 application tag
    class in the base library - confirm.
    """

    tag = 1
    class_ = 1
class Gauge(Integer):
    """
    Integer subtype that overrides the tagging with class_ 1 and tag 2.

    NOTE(review): class_ = 1 presumably selects the ASN.1 application tag
    class in the base library - confirm.
    """

    tag = 2
    class_ = 1
class TimeTicks(Integer):
    """
    Integer subtype that overrides the tagging with class_ 1 and tag 3.

    NOTE(review): class_ = 1 presumably selects the ASN.1 application tag
    class in the base library - confirm.
    """

    tag = 3
    class_ = 1
class Opaque(OctetString):
    """
    OctetString subtype that overrides the tagging with class_ 1 and tag 4.

    NOTE(review): class_ = 1 presumably selects the ASN.1 application tag
    class in the base library - confirm.
    """

    tag = 4
    class_ = 1
class VarBindChoice(Choice):
    """
    Choice whose alternatives are the value types listed below, each paired
    with the name used to select it.
    """

    _alternatives = [
        # Plain universal types.
        ('number', Integer),
        ('string', OctetString),
        ('object', ObjectIdentifier),
        # Types with their own class_/tag overrides.
        ('address', IpAddress),
        ('counter', Counter),
        ('gauge', Gauge),
        ('ticks', TimeTicks),
        ('arbitrary', Opaque),
    ]
class VarBind(Sequence):
_oid_specs = {
'x509': Certificate,
}
class CrlBag(Sequence):
    """
    Two-field sequence: an object identifier followed by an octet string
    that is explicitly tagged 0.
    """

    _fields = [
        (
            'crl_id',
            ObjectIdentifier,
        ),
        (
            'crl_value',
            OctetString,
            {'explicit': 0},
        ),
    ]
class SecretBag(Sequence):
    """
    Two-field sequence: an object identifier followed by an octet string
    that is explicitly tagged 0.
    """

    _fields = [
        (
            'secret_type_id',
            ObjectIdentifier,
        ),
        (
            'secret_value',
            OctetString,
            {'explicit': 0},
        ),
    ]
class SafeContents(SequenceOf):
    """
    SequenceOf subclass that declares nothing of its own in the class body.

    NOTE(review): the element type is presumably assigned elsewhere in the
    module - confirm.
    """
class SafeBag(Sequence):
_fields = [
('bag_id', BagId),
('bag_value', Any, {'explicit': 0}),
('bag_attributes', Attributes, {'optional': True}),
]
_oid_pair = ('bag_id', 'bag_value')
_oid_specs = {
def encode_cert_id_key(self, hkey):
    """
    Rebuild a CertId from a three-item key.

    :param hkey:
        A 3-element iterable of encoded values, in order: issuer name hash,
        issuer key hash, serial number. The first two are decoded with
        OctetString.load() and the last with Integer.load().

    :return:
        A CertId whose hash algorithm is set to 'sha1' with parameters None
    """

    name_hash_encoded, key_hash_encoded, serial_encoded = hkey

    # Decode the components in the same order they appear in the key.
    name_hash = OctetString.load(name_hash_encoded)
    key_hash = OctetString.load(key_hash_encoded)
    serial = Integer.load(serial_encoded)

    sha1_algorithm = DigestAlgorithm({
        'algorithm': 'sha1',
        'parameters': None,
    })

    return CertId({
        'hash_algorithm': sha1_algorithm,
        'issuer_name_hash': name_hash,
        'issuer_key_hash': key_hash,
        'serial_number': serial,
    })
# Excerpt: tail of the certificate-id mapping (the opening of this call is
# outside the view). The issuer key hash is read from the attribute of
# issuer.public_key named by hash_algo.
'issuer_key_hash': getattr(issuer.public_key, hash_algo),
'serial_number': cert.serial_number,
})
# Wrap the certificate id in a single request and put it in the request list.
request = ocsp.Request({
'req_cert': cert_id,
})
tbs_request = ocsp.TBSRequest({
'request_list': ocsp.Requests([request]),
})
# Optional nonce extension: 16 random bytes are placed in an OctetString, and
# the .dump() output of that object is wrapped in an outer OctetString used
# as the extension value.
if nonce:
nonce_extension = ocsp.TBSRequestExtension({
'extn_id': 'nonce',
'critical': False,
'extn_value': core.OctetString(core.OctetString(os.urandom(16)).dump())
})
tbs_request['request_extensions'] = ocsp.TBSRequestExtensions([nonce_extension])
ocsp_request = ocsp.OCSPRequest({
'tbs_request': tbs_request,
})
last_e = None
# Try each OCSP URL listed on the certificate (the except clause and the rest
# of the loop body are outside this excerpt).
for ocsp_url in cert.ocsp_urls:
try:
# On Python 2 the URL is converted from an IRI to a URI first.
if sys.version_info < (3,):
ocsp_url = util.iri_to_uri(ocsp_url)
# NOTE(review): `request` is rebound here; above it held the ocsp.Request.
request = Request(ocsp_url)
_add_header(request, 'Accept', 'application/ocsp-response')
_add_header(request, 'Content-Type', 'application/ocsp-request')
_add_header(request, 'User-Agent', user_agent)
class ORAddress(Sequence):
    """
    Sequence of one required field (built-in standard attributes) followed
    by two optional ones (domain-defined and extension attributes).
    """

    _fields = [
        (
            'built_in_standard_attributes',
            BuiltInStandardAttributes,
        ),
        (
            'built_in_domain_defined_attributes',
            BuiltInDomainDefinedAttributes,
            {'optional': True},
        ),
        (
            'extension_attributes',
            ExtensionAttributes,
            {'optional': True},
        ),
    ]
class EDIPartyName(Sequence):
    """
    Sequence of two DirectoryString fields with implicit tags: an optional
    name assigner (tag 0) and a required party name (tag 1).
    """

    _fields = [
        (
            'name_assigner',
            DirectoryString,
            {'tag_type': 'implicit', 'tag': 0, 'optional': True},
        ),
        (
            'party_name',
            DirectoryString,
            {'tag_type': 'implicit', 'tag': 1},
        ),
    ]
class IPAddress(OctetString):
def parse(self, spec=None, spec_params=None):
"""
This method is not applicable to IP addresses
"""
raise ValueError('IP address values can not be parsed')
def set(self, value):
"""
Sets the value of the object
:param value:
A unicode string containing an IPv4 address, IPv4 address with CIDR,
an IPv6 address or IPv6 address with CIDR
"""