Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def duo_mfa_challenge(duo_info, mfa_option, passcode):
    """Ask Duo to issue a challenge for the factor the user picked.

    Called once the user has chosen a device and factor; the request sent
    here is what triggers the phone call or push on the user's side.

    :param duo_info: dict of parameters for Duo; "host" and "sid" are read.
    :param mfa_option: the user's selected second factor; "device" and
        "factor" are read.
    :param passcode: optional passcode; added to the request only when truthy.
    :return txid: Duo transaction ID used to track this auth attempt.
    """
    prompt_url = "https://{}/frame/prompt".format(duo_info["host"])

    # Only the text before the first " - " of the device entry is sent.
    device_id = mfa_option["device"].partition(" - ")[0]

    challenge_fields = {
        "factor": mfa_option["factor"],
        "device": device_id,
        "sid": duo_info["sid"],
        "out_of_date": False,
        "days_out_of_date": 0,
        "days_to_block": None,
    }
    request_body = helpers.prepare_payload(**challenge_fields)

    # "async" is a reserved keyword, so it cannot be passed as a keyword
    # argument above and is set on the prepared payload instead.
    request_body["async"] = True
    if passcode:
        request_body["passcode"] = passcode

    duo_response = duo_api_post(prompt_url, payload=request_body)
    transaction_id = parse_duo_mfa_challenge(duo_response)
    logging.debug("Sent MFA Challenge and obtained Duo transaction ID.")
    return transaction_id
def get_duo_sid(duo_info):
    """Perform the initial Duo authentication request to obtain the SID.

    The SID is referenced throughout the authentication process for Duo.

    :param duo_info: dict response describing Duo factor in Okta; the keys
        "tx", "version", "parent" and "host" are read here.
    :return: tuple ``(duo_info, duo_auth_response)``. ``duo_info`` has the
        "sid" key added when it could be extracted (on failure the error is
        logged and the key is left untouched). ``duo_auth_response`` is the
        response from Duo, containing html content listing available factors.
    """
    params = helpers.prepare_payload(
        tx=duo_info["tx"], v=duo_info["version"], parent=duo_info["parent"])
    url = "https://{}/frame/web/v1/auth".format(duo_info["host"])
    logging.info("Calling Duo {} with params {}".format(
        urlparse(url).path, params.keys()))
    duo_auth_response = duo_api_post(url, params=params)

    try:
        duo_auth_redirect = urlparse("{}".format(
            unquote(duo_auth_response.url))).query
        # Remove only the literal "sid=" key prefix. str.strip("sid=") would
        # treat its argument as a set of characters and also eat any leading
        # or trailing 's', 'i', 'd' or '=' that belongs to the SID itself.
        sid_prefix = "sid="
        if duo_auth_redirect.startswith(sid_prefix):
            duo_auth_redirect = duo_auth_redirect[len(sid_prefix):]
        duo_info["sid"] = duo_auth_redirect
    except Exception as sid_error:
        # Best effort: report the problem and still hand back what we have.
        logging.error("There was an error getting your SID. "
                      "Please try again. \n{}".format(sid_error))

    return duo_info, duo_auth_response
available_mfas = [d['factorType'] for d in mfa_options]
if preset_mfa is not None and preset_mfa in available_mfas:
mfa_index = available_mfas.index(settings.mfa_method)
else:
logging.warning(
"No MFA provided or provided MFA does not exist. [{}]".format(
settings.mfa_method))
mfa_index = helpers.select_preferred_mfa_index(mfa_options)
# time to challenge the mfa option
selected_mfa_option = mfa_options[mfa_index]
logging.debug("Selected MFA is [{}]".format(selected_mfa_option))
mfa_challenge_url = selected_mfa_option['_links']['verify']['href']
payload = helpers.prepare_payload(stateToken=primary_auth['stateToken'],
factorType=selected_mfa_option['factorType'],
provider=selected_mfa_option['provider'],
profile=selected_mfa_option['profile'])
selected_factor = okta_verify_api_method(
mfa_challenge_url, payload, headers)
mfa_provider = selected_factor["_embedded"]["factor"]["provider"].lower()
logging.debug("MFA Challenge URL: [{}] headers: {}".format(
mfa_challenge_url, headers))
if mfa_provider == "duo":
payload, headers, callback_url = duo_helpers.authenticate_duo(
selected_factor)
okta_verify_api_method(callback_url, payload)
payload.pop("id", "sig_response")
mfa_verify = okta_verify_api_method(
"""Authenticate user with okta credential.
:param okta_url: company specific URL of the okta
:param okta_username: okta username
:param okta_password: okta password
:return: MFA session options
"""
logging.debug(
"Authenticate user with okta credential [{} user {}]".format(
okta_url, okta_username))
headers = {
'content-type': 'application/json',
'accept': 'application/json'
}
payload = helpers.prepare_payload(
username=okta_username, password=okta_password)
primary_auth = okta_verify_api_method(
'{}/api/v1/authn'.format(okta_url), payload, headers)
logging.debug("Authenticate Okta header [{}] ".format(headers))
session_token = user_mfa_challenge(headers, primary_auth)
logging.info("User has been succesfully authenticated.")
return session_token
def cli(args):
    """Tokendito retrieves AWS credentials after authenticating with Okta.

    Runs the setup and option-processing helpers on ``args``, authenticates
    the user against Okta with the values held in ``settings``, and then
    requests the SAML response for the configured AWS application URL.

    :param args: command-line arguments, handed to ``helpers.setup``.
    """
    # Initial setup; its result replaces the raw arguments from here on.
    prepared_args = helpers.setup(args)
    logging.debug(
        "tokendito retrieves AWS credentials after authenticating with Okta."
    )

    # Gather and organise the user-specific options.
    helpers.process_options(prepared_args)

    # Okta authentication first, then the AWS role lookup via SAML.
    logging.debug("Authenticate user with Okta and AWS.")
    okta_session_token = okta_helpers.authenticate_user(
        settings.okta_org,
        settings.okta_username,
        settings.okta_password,
    )
    saml_response_string, saml_xml = aws_helpers.authenticate_to_roles(
        okta_session_token,
        settings.okta_aws_app_url,
    )
def cli(args):
"""Tokendito retrieves AWS credentials after authenticating with Okta."""
# Set some required initial values
args = helpers.setup(args)
logging.debug(
"tokendito retrieves AWS credentials after authenticating with Okta."
)
# Collect and organize user specific information
helpers.process_options(args)
# Authenticate okta and AWS also use assumerole to assign the role
logging.debug("Authenticate user with Okta and AWS.")
secret_session_token = okta_helpers.authenticate_user(
settings.okta_org, settings.okta_username, settings.okta_password)
saml_response_string, saml_xml = aws_helpers.authenticate_to_roles(
secret_session_token, settings.okta_aws_app_url)
assume_role_response, role_name = aws_helpers.select_assumeable_role(
saml_response_string, saml_xml)
aws_helpers.ensure_keys_work(assume_role_response)
helpers.set_local_credentials(assume_role_response, role_name,