Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
raises=exceptions.InvalidPath,
),
])
def test_read_config(self, label, configure_first=True, raises=None, exception_msg=''):
if configure_first:
configure_response = self.client.auth.okta.configure(
org_name=self.TEST_ORG_NAME,
mount_point=self.TEST_MOUNT_POINT,
)
logging.debug('configure_response: %s' % configure_response)
if raises:
with self.assertRaises(raises) as cm:
self.client.auth.gcp.read_config(
mount_point=self.TEST_MOUNT_POINT,
)
self.assertIn(
pki_issue_response = self.client.write(
path='pki/issue/my-role',
common_name='test.hvac.com',
)
# Revoke the lease of our test cert that was just issued.
revoke_lease_response = self.client.sys.revoke_lease(
lease_id=pki_issue_response['lease_id'],
)
logging.debug('revoke_lease_response: %s' % revoke_lease_response)
self.assertEqual(
first=revoke_lease_response.status_code,
second=204,
)
with self.assertRaises(exceptions.InvalidPath):
self.client.sys.list_leases(
prefix='pki',
)
raises=exceptions.InvalidPath,
),
])
def test_read_role(self, label, role_name='hvac', configure_role_first=True, raises=None, exception_message=''):
bound_service_principal_ids = ['some-dummy-sp-id']
if configure_role_first:
create_role_response = self.client.auth.azure.create_role(
name=role_name,
bound_service_principal_ids=bound_service_principal_ids,
mount_point=self.TEST_MOUNT_POINT,
)
logging.debug('create_role_response: %s' % create_role_response)
if raises is not None:
with self.assertRaises(raises):
self.client.auth.azure.read_role(
name=role_name,
raises=exceptions.InvalidPath,
),
])
def test_list_roles(self, label, num_roles_to_create=1, write_config_first=True, raises=None):
if write_config_first:
self.client.auth.azure.configure(
tenant_id='my-tenant-id',
resource='my-resource',
mount_point=self.TEST_MOUNT_POINT,
)
roles_to_create = ['hvac%s' % n for n in range(0, num_roles_to_create)]
bound_service_principal_ids = ['some-dummy-sp-id']
logging.debug('roles_to_create: %s' % roles_to_create)
for role_to_create in roles_to_create:
create_role_response = self.client.auth.azure.create_role(
name=role_to_create,
bound_service_principal_ids=bound_service_principal_ids,
:type method: str | unicode
:param mount_point: The "path" the secret engine was mounted on.
:type mount_point: str | unicode
:return: The response of the create_or_update_secret request.
:rtype: requests.Response
"""
if method is None:
# If no method was selected by the caller, use the result of a `read_secret()` call to determine if we need
# to perform an update (PUT) or creation (POST) request.
try:
self.read_secret(
path=path,
mount_point=mount_point,
)
method = 'PUT'
except exceptions.InvalidPath:
method = 'POST'
if method == 'POST':
api_path = '/v1/{mount_point}/{path}'.format(mount_point=mount_point, path=path)
return self._adapter.post(
url=api_path,
json=secret,
)
elif method == 'PUT':
api_path = '/v1/{mount_point}/{path}'.format(mount_point=mount_point, path=path)
return self._adapter.post(
url=api_path,
json=secret,
)
response = client.secrets.kv.v1.read_secret(
path=data[0], mount_point=self.vault_params.get("mount", "secret")
)
return_data = response["data"][data[1]]
else:
response = client.secrets.kv.v2.read_secret_version(
path=data[0], mount_point=self.vault_params.get("mount", "secret")
)
return_data = response["data"]["data"][data[1]]
client.adapter.close()
except Forbidden:
VaultError(
"Permission Denied. "
+ "make sure the token is authorised to access {path} on Vault".format(path=data[0])
)
except InvalidPath:
VaultError("{path} does not exist on Vault secret".format(path=data[0]))
if return_data is "":
VaultError("'{key}' doesn't exist on '{path}'".format(key=data[1], path=data[0]))
return return_data
def read(self, client):
    """Read this resource's path from Vault.

    :param client: Vault client object exposing a ``read(path)`` method.
    :return: Whatever ``client.read`` returns for ``self.path``, or ``None``
        when the client raises ``hvac.exceptions.InvalidPath``.
    """
    try:
        found = client.read(self.path)
    except hvac.exceptions.InvalidPath:
        # An invalid path is reported to the caller as "nothing there".
        found = None
    return found
def hashivault_pki_crl_get(module):
    """Read the CRL configuration of a PKI secrets engine mount.

    :param module: Object whose ``params`` mapping is passed to
        ``hashivault_auth_client`` and supplies ``mount_point``.
    :return: The error reported by ``check_secrets_engines`` if there is one;
        otherwise a result dict. On success it holds ``changed``, ``rc`` (0)
        and ``data``; on failure it holds ``changed``, ``rc`` (1), ``failed``
        and ``msg``.
    :rtype: dict
    """
    params = module.params
    client = hashivault_auth_client(params)
    mount_point = params.get('mount_point').strip('/')

    # Stop early when the secrets engine check reports a problem.
    _, err = check_secrets_engines(module, client)
    if err:
        return err

    from hvac.exceptions import InvalidPath

    def _failure(msg):
        # Common shape of every unsuccessful result.
        return {"changed": False, "rc": 1, "failed": True, "msg": msg}

    try:
        crl_config = client.secrets.pki.read_crl_configuration(mount_point=mount_point)
        data = crl_config.get('data')
    except InvalidPath:
        return _failure(u"CRLs must be configured before reading")
    except Exception as exc:
        # Any other failure is reported in the result instead of raised.
        return _failure(u"Exception: " + str(exc))

    return {"changed": False, "rc": 0, "data": data}
def delete(self, client):
    """Delete this resource's path from Vault while handling non-surprising errors.

    Nothing is done when ``self.no_resource`` is set. A Vault error whose
    message starts with ``no handler for route`` is the one failure treated
    as non-surprising and is ignored; any other ``InvalidPath`` or
    ``InvalidRequest`` error is re-raised instead of being silently dropped.

    :param client: Vault client object exposing a ``delete(path)`` method.
    :return: Always ``None``.
    :raises: hvac.exceptions.InvalidPath | hvac.exceptions.InvalidRequest
    """
    if self.no_resource:
        return None

    LOG.debug("Deleting %s", self)
    try:
        client.delete(self.path)
    except (hvac.exceptions.InvalidPath,
            hvac.exceptions.InvalidRequest) as vault_exception:
        # Without the re-raise the message check would be dead code and
        # every unexpected Vault error would vanish without a trace.
        if not str(vault_exception).startswith('no handler for route'):
            raise
    return None
:param errors: Optional errors to include in a resulting exception.
:type errors: list | str
:raises: hvac.exceptions.InvalidRequest | hvac.exceptions.Unauthorized | hvac.exceptions.Forbidden |
hvac.exceptions.InvalidPath | hvac.exceptions.RateLimitExceeded | hvac.exceptions.InternalServerError |
hvac.exceptions.VaultNotInitialized | hvac.exceptions.VaultDown | hvac.exceptions.UnexpectedError
"""
if status_code == 400:
raise exceptions.InvalidRequest(message, errors=errors)
elif status_code == 401:
raise exceptions.Unauthorized(message, errors=errors)
elif status_code == 403:
raise exceptions.Forbidden(message, errors=errors)
elif status_code == 404:
raise exceptions.InvalidPath(message, errors=errors)
elif status_code == 429:
raise exceptions.RateLimitExceeded(message, errors=errors)
elif status_code == 500:
raise exceptions.InternalServerError(message, errors=errors)
elif status_code == 501:
raise exceptions.VaultNotInitialized(message, errors=errors)
elif status_code == 503:
raise exceptions.VaultDown(message, errors=errors)
else:
raise exceptions.UnexpectedError(message)