Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@utils.deprecated_method(
to_be_removed_in_version='0.11.2',
new_method=api.auth_methods.Aws.read_config,
)
def get_vault_ec2_client_configuration(self, mount_point='aws-ec2'):
"""GET /auth//config/client
:param mount_point:
:type mount_point:
:return:
:rtype:
"""
return self._adapter.get('/v1/auth/{0}/config/client'.format(mount_point)).json()
:type password: str | unicode
:verify_connection: Specifies whether to verify connection URI, username, and password.
:type verify_connection: bool
:param mount_point: Specifies the place where the secrets engine will be accessible (default: rabbitmq).
:type mount_point: str | unicode
:return: The response of the request.
:rtype: requests.Response
"""
params = {
"connection_uri": connection_uri,
"verify_connection": verify_connection,
"username": username,
"password": password
}
api_path = utils.format_url('/v1/{mount_point}/config/connection', mount_point=mount_point)
return self._adapter.post(
url=api_path,
json=params,
)
def set_urls(self, params, mount_point=DEFAULT_MOUNT_POINT):
"""Set URLs.
Setting the issuing certificate endpoints, CRL distribution points, and OCSP server endpoints that will be
encoded into issued certificates. You can update any of the values at any time without affecting the other
existing values. To remove the values, simply use a blank string as the parameter.
Supported methods:
POST: /{mount_point}/config/urls. Produces: 200 application/json
:param mount_point: The "path" the method/backend was mounted on.
:type mount_point: str | unicode
:return: The JSON response of the request.
:rtype: requests.Response
"""
api_path = utils.format_url('/v1/{mount_point}/config/urls', mount_point=mount_point)
return self._adapter.post(
url=api_path,
json=params,
)
def list_leases(self, prefix):
"""Retrieve a list of lease ids.
Supported methods:
LIST: /sys/leases/lookup/{prefix}. Produces: 200 application/json
:param prefix: Lease prefix to filter list by.
:type prefix: str | unicode
:return: The JSON response of the request.
:rtype: dict
"""
api_path = utils.format_url('/v1/sys/leases/lookup/{prefix}', prefix=prefix)
response = self._adapter.list(
url=api_path,
)
return response.json()
:type timeout: int
:param proxies: Proxies to use when performing requests.
See: http://docs.python-requests.org/en/master/user/advanced/#proxies
:type proxies: dict
:param allow_redirects: Whether to follow redirects when sending requests to Vault.
:type allow_redirects: bool
:param session: Optional session object to use when performing request.
:type session: request.Session
:param adapter: Optional class to be used for performing requests. If none is provided, defaults to
hvac.adapters.Request
:type adapter: hvac.adapters.Adapter
:param namespace: Optional Vault Namespace.
:type namespace: str
"""
token = token if token is not None else utils.get_token_from_env()
url = url if url else os.getenv('VAULT_ADDR', DEFAULT_URL)
self._adapter = adapter(
base_uri=url,
token=token,
cert=cert,
verify=verify,
timeout=timeout,
proxies=proxies,
allow_redirects=allow_redirects,
session=session,
namespace=namespace
)
# Instantiate API classes to be exposed as properties on this class starting with auth method classes.
self._auth = api.AuthMethods(adapter=self._adapter)
self._secrets = api.SecretsEngines(adapter=self._adapter)
def read_role(self, name, mount_point=DEFAULT_MOUNT_POINT):
"""This endpoint queries the role definition.
:param name: Specifies the name of the role to read.
:type name: str | unicode
:param mount_point: Specifies the place where the secrets engine will be accessible (default: rabbitmq).
:type mount_point: str | unicode
:return: The JSON response of the request.
:rtype: requests.Response
"""
api_path = utils.format_url("/v1/{}/roles/{}", mount_point, name)
return self._adapter.get(
url=api_path,
).json()
This requires sudo capability and access to it should be tightly controlled as it can be used to revoke very
large numbers of secrets/tokens at once.
Supported methods:
PUT: /sys/leases/revoke-prefix/{prefix}. Produces: 204 (empty body)
:param prefix: The prefix to revoke.
:type prefix: str | unicode
:return: The response of the request.
:rtype: requests.Response
"""
params = {
'prefix': prefix,
}
api_path = utils.format_url('/v1/sys/leases/revoke-prefix/{prefix}', prefix=prefix)
return self._adapter.put(
url=api_path,
json=params,
)
@utils.deprecated_method(
to_be_removed_in_version='0.9.0',
new_method=login,
)
def auth(self, url, use_token=True, **kwargs):
return self.login(
url=url,
use_token=use_token,
**kwargs
)
def read_ca_certificate(self, mount_point=DEFAULT_MOUNT_POINT):
"""Read CA Certificate.
Retrieves the CA certificate in raw DER-encoded form.
Supported methods:
GET: /{mount_point}/ca/pem. Produces: String
:param mount_point: The "path" the method/backend was mounted on.
:type mount_point: str | unicode
:return: The certificate as pem.
:rtype: str
"""
api_path = utils.format_url('/v1/{mount_point}/ca/pem', mount_point=mount_point)
response = self._adapter.get(
url=api_path,
)
return str(response.text)
response = self.session.request(
method=method,
url=url,
headers=headers,
allow_redirects=self.allow_redirects,
**_kwargs
)
if raise_exception and 400 <= response.status_code < 600:
text = errors = None
if response.headers.get('Content-Type') == 'application/json':
errors = response.json().get('errors')
if errors is None:
text = response.text
utils.raise_for_error(response.status_code, text, errors=errors)
return response