Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
port_nodes = xmlutil.xpath_find(self._dom, PORT_XPATH)
if not port_nodes:
self._log.warn("No ports found")
for node in port_nodes:
binding_id = node.get('binding')
binding_id = binding_id.split(':')[-1]
trust_policy = bindings.get(binding_id)
if trust_policy:
binding_policy = policies.get(trust_policy.get('url'))
if binding_policy and not binding_policy.get('url', None):
binding_policy['version'] = trust_policy['version']
address_node = node.find(ADDRESS_XPATH, XmlNamespaces.namespaces)
if address_node is None:
raise AdalError("No address nodes on port")
address = xmlutil.find_element_text(address_node)
if _url_is_secure(address):
binding_policy['url'] = address
else:
self._log.warn(
"Skipping insecure endpoint: %(mex_endpoint)s",
{"mex_endpoint": address})
int_keys = [
OAuth2.DeviceCodeResponseParameters.EXPIRES_IN,
OAuth2.DeviceCodeResponseParameters.INTERVAL
]
self._parse_optional_ints(wire_response, int_keys)
if not wire_response.get(OAuth2.DeviceCodeResponseParameters.EXPIRES_IN):
raise AdalError('wire_response is missing expires_in', wire_response)
if not wire_response.get(OAuth2.DeviceCodeResponseParameters.DEVICE_CODE):
raise AdalError('wire_response is missing device_code', wire_response)
if not wire_response.get(OAuth2.DeviceCodeResponseParameters.USER_CODE):
raise AdalError('wire_response is missing user_code', wire_response)
#skip field naming tweak, because names from wire are python style already
return wire_response
already authorized this account by creating a Service Principal
with the appropriate (Contributor) role.
Returns:
A ServicePrincipalCredentials instance, that can be used to access or
create any resources.
"""
app_id = parameters[self.PARAM_APP_ID]
app_secret_key = parameters[self.PARAM_APP_SECRET]
tenant_id = parameters[self.PARAM_TENANT_ID]
# Get an Authentication token using ADAL.
context = adal.AuthenticationContext(self.AZURE_AUTH_ENDPOINT + tenant_id)
try:
token_response = context.acquire_token_with_client_credentials(
self.AZURE_RESOURCE_URL, app_id, app_secret_key)
except adal.adal_error.AdalError as e:
raise AgentConfigurationException(
"Unable to communicate with Azure! Please check your cloud "
"configuration. Reason: {}".format(e.message))
token_response.get('accessToken')
# To access Azure resources for an application, we need a Service Principal
# with the accurate role assignment. It can be created using the Azure CLI.
credentials = ServicePrincipalCredentials(client_id=app_id,
secret=app_secret_key,
tenant=tenant_id)
return credentials
except NoTTYException:
raise CLIError('Please specify both username and password in non-interactive mode.')
else:
interactive = True
try:
subscriptions = profile.find_subscriptions_on_login(
interactive,
username,
password,
service_principal,
tenant,
use_device_code=use_device_code,
allow_no_subscriptions=allow_no_subscriptions,
use_cert_sn_issuer=use_cert_sn_issuer)
except AdalError as err:
# try to polish unfriendly server errors
if username:
msg = str(err)
suggestion = "For cross-check, try 'az login' to authenticate through browser."
if ('ID3242:' in msg) or ('Server returned an unknown AccountType' in msg):
raise CLIError("The user name might be invalid. " + suggestion)
if 'Server returned error in RSTR - ErrorCode' in msg:
raise CLIError("Logging in through command line is not supported. " + suggestion)
raise CLIError(err)
except requests.exceptions.ConnectionError as err:
raise CLIError('Please ensure you have network connection. Error detail: ' + str(err))
all_subscriptions = list(subscriptions)
for sub in all_subscriptions:
sub['cloudName'] = sub.pop('environmentName', None)
return all_subscriptions
self._parse_optional_ints(wire_response, int_keys)
expires_in = wire_response.get(OAuth2.ResponseParameters.EXPIRES_IN)
if expires_in:
now = datetime.now()
soon = timedelta(seconds=expires_in)
wire_response[OAuth2.ResponseParameters.EXPIRES_ON] = str(now + soon)
created_on = wire_response.get(OAuth2.ResponseParameters.CREATED_ON)
if created_on:
temp_date = datetime.fromtimestamp(created_on)
wire_response[OAuth2.ResponseParameters.CREATED_ON] = str(temp_date)
if not wire_response.get(OAuth2.ResponseParameters.TOKEN_TYPE):
raise AdalError('wire_response is missing token_type', wire_response)
if not wire_response.get(OAuth2.ResponseParameters.ACCESS_TOKEN):
raise AdalError('wire_response is missing access_token', wire_response)
token_response = map_fields(wire_response, TOKEN_RESPONSE_MAP)
if wire_response.get(OAuth2.ResponseParameters.ID_TOKEN):
id_token = self._parse_id_token(wire_response[OAuth2.ResponseParameters.ID_TOKEN])
if id_token:
token_response.update(id_token)
return token_response
def parse(self):
if not self._response:
raise AdalError("Received empty RSTR response body.")
try:
self._dom = ET.fromstring(self._response)
except Exception as exp:
raise AdalError('Failed to parse RSTR in to DOM', exp)
try:
self._parents = {c:p for p in self._dom.iter() for c in p}
error_found = self._parse_error()
if error_found:
str_error_code = self.error_code or 'NONE'
str_fault_message = self.fault_message or 'NONE'
error_template = 'Server returned error in RSTR - ErrorCode: {} : FaultMessage: {}'
raise AdalError(error_template.format(str_error_code, str_fault_message))
self._parse_token()
finally:
self._log.debug('No resource specific cache entries found.')
#There are no resource specific entries. Find an MRRT token.
mrrt_tokens = (x for x in potential_entries if x[TokenResponseFields.IS_MRRT])
token = next(mrrt_tokens, None)
if token:
self._log.debug('Found an MRRT token.')
return_val = token
else:
self._log.debug('No MRRT tokens found.')
elif len(resource_tenant_specific_entries) == 1:
self._log.debug('Resource specific token found.')
return_val = resource_tenant_specific_entries[0]
is_resource_tenant_specific = True
else:
raise AdalError('More than one token matches the criteria. The result is ambiguous.')
if return_val:
self._log.debug('Returning token from cache lookup, %s',
_create_token_id_message(return_val))
return return_val, is_resource_tenant_specific
if resp.status_code == 429:
resp.raise_for_status() # Will raise requests.exceptions.HTTPError
if not util.is_http_success(resp.status_code):
return_error_string = u"{} request returned http error: {}".format(operation,
resp.status_code)
error_response = ""
if resp.text:
return_error_string = u"{} and server response: {}".format(return_error_string,
resp.text)
try:
error_response = resp.json()
except ValueError:
pass
raise AdalError(return_error_string, error_response)
else:
discovery_resp = resp.json()
if discovery_resp.get('tenant_discovery_endpoint'):
return discovery_resp['tenant_discovery_endpoint']
else:
raise AdalError('Failed to parse instance discovery response')
def get_token_with_device_code(self, user_code_info):
    """Poll for a token using a previously obtained device code.

    Builds the OAuth parameters for the device-code grant, copies the
    device code out of ``user_code_info`` into the request, and hands the
    request to a freshly created OAuth2 client, which polls using the
    ``interval`` and ``expires_in`` values taken from ``user_code_info``.
    The resulting token is passed to ``self._add_token_into_cache`` before
    being returned.

    Args:
        user_code_info: mapping indexed by the
            ``OAUTH2_DEVICE_CODE_RESPONSE_PARAMETERS`` keys ``DEVICE_CODE``,
            ``INTERVAL`` and ``EXPIRES_IN``.

    Returns:
        Whatever ``get_token_with_polling`` on the OAuth2 client returns.

    Raises:
        AdalError: if the polling interval is zero or negative.
    """
    self._log.info("Getting a token via device code")

    request_params = self._create_oauth_parameters(OAUTH2_GRANT_TYPE.DEVICE_CODE)
    device_code = user_code_info[OAUTH2_DEVICE_CODE_RESPONSE_PARAMETERS.DEVICE_CODE]
    request_params[OAUTH2_PARAMETERS.CODE] = device_code

    poll_interval = user_code_info[OAUTH2_DEVICE_CODE_RESPONSE_PARAMETERS.INTERVAL]
    lifetime = user_code_info[OAUTH2_DEVICE_CODE_RESPONSE_PARAMETERS.EXPIRES_IN]

    # A non-positive interval is rejected before any client is created.
    if poll_interval <= 0:
        raise AdalError('invalid refresh interval')

    # The client is kept on the instance as well as used locally.
    # NOTE(review): presumably so an in-flight poll can be reached from
    # elsewhere (e.g. to cancel it) -- confirm against the rest of the class.
    polling_client = self._create_oauth2_client()
    self._polling_client = polling_client

    acquired = polling_client.get_token_with_polling(
        request_params, poll_interval, lifetime)
    self._add_token_into_cache(acquired)
    return acquired
def _get_token_username_password_federated(self, username, password):
self._log.debug("Acquiring token with username password for federated user")
if not self._user_realm.federation_metadata_url:
self._log.warn("Unable to retrieve federationMetadataUrl from AAD. "
"Attempting fallback to AAD supplied endpoint.")
if not self._user_realm.federation_active_auth_url:
raise AdalError('AAD did not return a WSTrust endpoint. Unable to proceed.')
wstrust_version = TokenRequest._parse_wstrust_version_from_federation_active_authurl(
self._user_realm.federation_active_auth_url)
self._log.debug(
'wstrust endpoint version is: %(wstrust_version)s',
{"wstrust_version": wstrust_version})
return self._perform_username_password_for_access_token_exchange(
self._user_realm.federation_active_auth_url,
wstrust_version, username, password)
else:
mex_endpoint = self._user_realm.federation_metadata_url
self._log.debug(
"Attempting mex at: %(mex_endpoint)s",
{"mex_endpoint": mex_endpoint})
mex_instance = self._create_mex(mex_endpoint)