Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_hvac_client(vault_url, cacert=None):
    """Build an hvac client that talks to the given Vault URL.

    :param vault_url: Address of the Vault server the client should use
    :type vault_url: str
    :param cacert: Path to the CA cert used to verify the Vault API
                   certificate; handed to hvac as its ``verify`` argument.
    :type cacert: str
    :returns: hvac client bound to ``vault_url``
    :rtype: hvac.Client
    """
    client = hvac.Client(url=vault_url, verify=cacert)
    return client
"renewable": False,
"request_id": "e7c8b2e1-95e8-cb17-e98a-6c428201f1d5",
"warnings": None,
"wrap_info": None
}
mock_url = 'http://localhost:8200/v1/auth/{0}/role/{1}/custom-secret-id'.format(
'approle' if mount_point is None else mount_point,
role_name,
)
requests_mocker.register_uri(
method='POST',
url=mock_url,
status_code=expected_status_code,
json=mock_response,
)
client = Client()
if mount_point is None:
actual_response = client.create_role_custom_secret_id(
role_name=role_name,
secret_id=secret_id,
)
else:
actual_response = client.create_role_custom_secret_id(
role_name=role_name,
secret_id=secret_id,
mount_point=mount_point,
)
self.assertEquals(
first=mock_response,
second=actual_response,
)
"request_id": "2310dc21-0fea-a2de-2d94-bb4edd59f1e9",
"warnings": None,
"wrap_info": None
}
mock_url = 'http://localhost:8200/v1/auth/{0}/role/{1}/secret-id'.format(
'approle' if mount_point is None else mount_point,
role_name,
)
requests_mocker.register_uri(
method='POST',
url=mock_url,
status_code=expected_status_code,
json=mock_response,
)
client = Client()
if mount_point is None:
actual_response = client.create_role_secret_id(
role_name=role_name,
)
else:
actual_response = client.create_role_secret_id(
role_name=role_name,
mount_point=mount_point,
)
self.assertEquals(
first=mock_response,
second=actual_response,
)
raise AnsibleError("No Vault Token specified")
# split secret arg, which has format 'secret/hello:value'
# into secret='secret/hello' and secret_field='value'
s = kwargs.get('secret')
if s is None:
raise AnsibleError("No secret specified")
s_f = s.split(':')
self.secret = s_f[0]
if len(s_f) >= 2:
self.secret_field = s_f[1]
else:
self.secret_field = 'value'
self.client = hvac.Client(url=self.url, token=self.token)
if self.client.is_authenticated():
self.isAuthenticated=True
pass
elif hasattr(self, 'default'):
self.isAuthenticated=False
pass
else:
raise AnsibleError("Invalid Hashicorp Vault Token Specified")
def InsertKV(parser_args):
print("Insert Key-Value")
# Set the REQUESTS_CA_BUNDLE environment variable
vault_reader.set_requests_ca_bundle()
base_vault_path = "{}/{}/".format(parser_args.tenant, parser_args.env)
# Set Vault connection
client = hvac.Client(url=args.vault_url, token=args.vault_token)
exist_keys=[]
# Get Exist Keys
keyvalues=listKeyPaths(client,base_vault_path)
if keyvalues != None:
keyvalues=keyvalues.replace(("{}/{}/".format(parser_args.tenant, parser_args.env)),"")
keyvalueList=keyvalues.split()
for kv in keyvalueList:
if kv!='':
splitindex=kv.index("=")
k=kv[:splitindex]
exist_keys.append(k)
#print(list_keys)
# Insert Key-Value into Vault KV
try:
def _vault_client(config):
    """Build a Vault client and log it in through AppRole.

    :param: config: configparser object of vaultlocker config; the ``url``,
        ``approle`` and ``secret_id`` options of its ``vault`` section are read
    :returns: hvac.Client. Vault Client object after the AppRole auth call
    """
    vault_url = config.get('vault', 'url')
    vault = hvac.Client(url=vault_url)

    # Credentials are read only after the client exists, as before.
    role_id = config.get('vault', 'approle')
    role_secret = config.get('vault', 'secret_id')
    vault.auth_approle(role_id, secret_id=role_secret)
    return vault
def get_secret(url, token, mount, path):
    """Retrieve existing data from a KV v1 mount path and return a dictionary.

    This is a best-effort read: any error raised while creating the client or
    reading the secret is suppressed and the empty placeholder is returned.

    :param url: Vault server URL passed to ``hvac.Client``
    :param token: Vault token passed to ``hvac.Client``
    :param mount: mount point of the KV v1 secrets engine
    :param path: path of the secret below the mount point
    :returns: the ``read_secret`` response, or ``{'data': {}}`` when the read
        fails
    """
    result = {'data': {}}
    try:
        client = hvac.Client(url=url, token=token)
        result = client.secrets.kv.v1.read_secret(path=path, mount_point=mount)
    except Exception:
        # Deliberate best-effort fallback. The previous `return` inside
        # `finally` swallowed every exception implicitly, including
        # KeyboardInterrupt and SystemExit; those now propagate, while
        # ordinary errors still yield the empty placeholder.
        pass
    return result
def configure_vault_client(self) -> None:
    """Create ``self.vault_client`` from the environment and optionally unseal.

    The client's URL and token come from ``VAULT_ADDR`` and ``VAULT_TOKEN``.
    If the client reports a sealed Vault and ``UNSEAL_VAULT`` is set to a
    non-empty value, the non-empty ``UNSEAL_VAULT_KEY1`` ..
    ``UNSEAL_VAULT_KEY5`` values are submitted as unseal keys.
    """
    vault = self.vault_client = VaultClient()
    vault.url = environ.get("VAULT_ADDR")
    vault.token = environ.get("VAULT_TOKEN")

    # The seal status is queried first; the opt-in flag is only consulted
    # when the Vault is actually sealed.
    if not vault.sys.is_sealed():
        return
    if not environ.get("UNSEAL_VAULT"):
        return

    unseal_keys = []
    for index in range(1, 6):
        unseal_keys.append(environ.get(f"UNSEAL_VAULT_KEY{index}"))
    # filter(None, ...) drops unset or empty key variables.
    vault.sys.submit_unseal_keys(filter(None, unseal_keys))
def init_vault_client(self):
    """Create ``self.vault_client`` and unseal it when the config asks for it.

    The token is taken from ``VAULT_TOKEN``. If the client reports a sealed
    Vault and ``self.config["vault"]["unseal"]`` is truthy, the non-empty
    ``UNSEAL_VAULT_KEY1`` .. ``UNSEAL_VAULT_KEY5`` values are submitted as
    unseal keys.
    """
    vault = self.vault_client = VaultClient()
    vault.token = environ.get("VAULT_TOKEN")

    # The config flag is only looked up once the Vault is known to be sealed.
    if not vault.sys.is_sealed():
        return
    if not self.config["vault"]["unseal"]:
        return

    unseal_keys = []
    for index in range(1, 6):
        unseal_keys.append(environ.get(f"UNSEAL_VAULT_KEY{index}"))
    # filter(None, ...) drops unset or empty key variables.
    vault.sys.submit_unseal_keys(filter(None, unseal_keys))
import os
import hvac
# Reads one secret from Vault and prints its user, password and host fields.
#
# Expected environment, for example:
#   export VAULT_ADDR=https://vault.example.localnet:8200/
#   export VAULT_TOKEN=REPLACETOKEN
#   export SECRET_PATH=REPLACE_PATH

# Fail fast with a KeyError if any required variable is missing.
vault_addr = os.environ['VAULT_ADDR']
vault_token = os.environ['VAULT_TOKEN']
secret_path = os.environ['SECRET_PATH']

print(vault_addr)
# The token is a credential: confirm it is present without echoing its value
# to the terminal or to captured logs.
print('VAULT_TOKEN is set (value redacted)')
print(secret_path)

client = hvac.Client(
    url=vault_addr,
    token=vault_token,
    # NOTE(review): verify=False turns off TLS certificate verification.
    # Kept to preserve the existing behavior; prefer passing a CA bundle
    # path here for anything beyond local testing.
    verify=False,
)

result = client.read(secret_path)
if result is None:
    # hvac's Client.read is documented to return None for a missing path;
    # exit with a clear message instead of a TypeError on subscripting.
    raise SystemExit('No secret found at {}'.format(secret_path))

secret_data = result["data"]
print(secret_data["user"])
print(secret_data["password"])
print(secret_data["host"])