Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_only_return_existing_reg():
    """Post a newAccount request with onlyReturnExisting for a known key.

    An account is registered first. A second, uninitialized client built
    with the same key then posts a raw newAccount payload that sets
    ``onlyReturnExisting``; any status other than 200 is treated as failure.
    """
    email = "test@not-example.com"
    registering_client = chisel2.uninitialized_client()
    registration = messages.NewRegistration.from_data(
        email=email, terms_of_service_agreed=True)
    registering_client.new_account(registration)

    # Fresh client that reuses the key of the account registered above.
    client = chisel2.uninitialized_client(key=registering_client.net.key)

    class extendedAcct(dict):
        """Plain dict that also offers json_dumps(); ``indent`` is ignored."""

        def json_dumps(self, indent=None):
            return json.dumps(self)

    acct = extendedAcct({
        "termsOfServiceAgreed": True,
        "contact": [email],
        "onlyReturnExisting": True,
    })
    new_account_url = client.directory['newAccount']
    resp = client.net.post(new_account_url, acct, acme_version=2)
    if resp.status_code != 200:
        raise Exception("incorrect response returned for onlyReturnExisting")

    other_client = chisel2.uninitialized_client()
# NOTE(review): this is a fragment of a larger routine whose header is not
# visible here; ``client``, ``chalType``, ``challSrv`` and ``random_domain``
# are expected to be supplied by the enclosing scope.
# Create a random domain.
d = random_domain()
# Configure the chall srv to SERVFAIL all queries for that domain.
challSrv.add_servfail_response(d)
# Expect a DNS problem with a detail that matches a regex
expectedProbType = "dns"
# The domain is escaped so that its dots only match literal dots.
expectedProbRegex = re.compile(
    r"DNS problem: SERVFAIL looking up (A|AAAA|TXT|CAA) for {0}".format(re.escape(d)))
# Try and issue for the domain with the given challenge type.
failed = False
try:
    chisel2.auth_and_issue([d], client=client, chall_type=chalType)
except acme_errors.ValidationError as e:
    # Mark that the auth_and_issue failed
    failed = True
    # Extract the failed challenge from each failed authorization
    for authzr in e.failed_authzrs:
        c = None
        if chalType == "http-01":
            c = chisel2.get_chall(authzr, challenges.HTTP01)
        elif chalType == "dns-01":
            c = chisel2.get_chall(authzr, challenges.DNS01)
        elif chalType == "tls-alpn-01":
            c = chisel2.get_chall(authzr, challenges.TLSALPN01)
        else:
            # Report the requested type; the previous code formatted the
            # undefined name ``challType`` and died with a NameError instead.
            raise Exception("Invalid challenge type requested: {0}".format(chalType))
        # The failed challenge's error should match expected
        error = c.error
def certificate(self, cert, name, alt_host=None, port=443):
    """Check whether the certificate served for ``name`` equals ``cert``.

    The host to probe is ``alt_host`` (encoded to bytes unless it already
    is a binary string) or, when ``alt_host`` is None, the result of
    ``socket.gethostbyname(name)``. ``name`` is likewise encoded before it
    is handed to ``crypto_util.probe_sni``.

    Returns False when the probe raises ``acme_errors.Error`` (the error is
    logged); otherwise returns whether the SHA-256 digests of the presented
    certificate and ``cert`` are equal.
    """
    if alt_host is None:
        host = socket.gethostbyname(name)
    else:
        already_binary = isinstance(alt_host, six.binary_type)
        host = alt_host if already_binary else alt_host.encode()

    if not isinstance(name, six.binary_type):
        name = name.encode()

    try:
        presented_cert = crypto_util.probe_sni(name, host, port)
    except acme_errors.Error as error:
        logger.exception(str(error))
        return False

    presented_digest = presented_cert.digest("sha256")
    expected_digest = cert.digest("sha256")
    return presented_digest == expected_digest
def uninitialized_client(key=None):
    """Build a ClientV2 for DIRECTORY_V2 without creating an account.

    When ``key`` is None a new RSA key (public exponent 65537, 2048 bits)
    is generated and wrapped in a ``josepy.JWKRSA``.
    """
    if key is None:
        private_key = rsa.generate_private_key(65537, 2048, default_backend())
        key = josepy.JWKRSA(key=private_key)
    network = acme_client.ClientNetwork(key, user_agent="Boulder integration tester")
    # Fetch the directory document over the freshly built network object.
    directory_json = network.get(DIRECTORY_V2).json()
    directory = messages.Directory.from_json(directory_json)
    return acme_client.ClientV2(directory, network)
def make_client(email=None):
    """Create an acme.Client with a newly generated key and register it.

    The registration returned by ``client.register`` is passed to
    ``agree_to_tos`` and stored on the client as ``account``.
    """
    private_key = rsa.generate_private_key(65537, 2048, default_backend())
    key = josepy.JWKRSA(key=private_key)
    net = acme_client.ClientNetwork(key, user_agent="Boulder integration tester")
    client = acme_client.Client(DIRECTORY, key=key, net=net)

    new_registration = messages.NewRegistration.from_data(email=email)
    account = client.register(new_registration)
    client.agree_to_tos(account)
    client.account = account
    return client
def make_client(email=None):
    """Create a ClientV2 with a newly generated key and register an account.

    The directory's terms-of-service URL must equal ACCEPTABLE_TOS;
    otherwise an Exception naming the unexpected URL is raised and no
    account is created. The new account is stored on the client's network
    object as ``net.account``.
    """
    private_key = rsa.generate_private_key(65537, 2048, default_backend())
    key = josepy.JWKRSA(key=private_key)
    net = acme_client.ClientNetwork(key, user_agent="Boulder integration tester")
    directory_json = net.get(DIRECTORY).json()
    client = acme_client.ClientV2(messages.Directory.from_json(directory_json), net)

    tos = client.directory.meta.terms_of_service
    # Refuse to agree to terms of service we do not recognise.
    if tos != ACCEPTABLE_TOS:
        raise Exception("Unrecognized terms of service URL %s" % tos)

    new_registration = messages.NewRegistration.from_data(
        email=email, terms_of_service_agreed=True)
    net.account = client.new_account(new_registration)
    return client
def make_client(email=None):
    """Return a registered acme.Client backed by a fresh random RSA key.

    ``email`` is forwarded to ``NewRegistration.from_data``; the resulting
    registration is agreed to and attached to the client as ``account``.
    """
    rsa_key = rsa.generate_private_key(65537, 2048, default_backend())
    key = josepy.JWKRSA(key=rsa_key)
    network = acme_client.ClientNetwork(key, user_agent="Boulder integration tester")
    client = acme_client.Client(DIRECTORY, key=key, net=network)

    registration_request = messages.NewRegistration.from_data(email=email)
    account = client.register(registration_request)
    client.agree_to_tos(account)
    client.account = account
    return client
def test_refresh_registration_for_unknown_key():
    """Refreshing the registration must raise AccountError for this config.

    Loads the private key and initialises the client from an inline
    configuration, checks the exact types of both, then expects
    ``refresh_registration`` to fail with 'Key is not yet registered'.
    """
    config = '''[account]
dir = tests/support/valid
acme-server = http://127.0.0.1:4000/directory
[mgmt]'''
    manager = M(config)

    manager.load_private_key()
    assert type(manager.key) is acme.jose.JWKRSA

    manager.init_client()
    assert type(manager.client) is acme.client.Client

    with pytest.raises(exceptions.AccountError) as excinfo:
        manager.refresh_registration()
    assert 'Key is not yet registered' in str(excinfo)
def make_client(email=None):
    """Generate a random key, build an acme.Client and register an account.

    The registration obtained from ``client.register`` is passed to
    ``client.agree_to_tos`` and kept on the returned client as ``account``.
    """
    generated_key = rsa.generate_private_key(65537, 2048, default_backend())
    key = josepy.JWKRSA(key=generated_key)
    net = acme_client.ClientNetwork(key, user_agent="Boulder integration tester")
    client = acme_client.Client(DIRECTORY, key=key, net=net)

    request = messages.NewRegistration.from_data(email=email)
    account = client.register(request)
    client.agree_to_tos(account)
    client.account = account
    return client
def register(self, new_reg=None):
    """Mark this object as registered and store a registration resource.

    ``new_reg`` defaults to an empty ``messages.NewRegistration``. Its
    ``contact`` and ``agreement`` are copied into a new
    ``messages.Registration``, which is wrapped in a
    ``messages.RegistrationResource`` and saved as ``self.regr``.

    Returns the result of ``succeed(self.regr)``.
    """
    self._registered = True
    source = messages.NewRegistration() if new_reg is None else new_reg
    body = messages.Registration(
        contact=source.contact,
        agreement=source.agreement)
    self.regr = messages.RegistrationResource(body=body)
    return succeed(self.regr)