Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Both result lists start out empty.
failed_sources = []
success_sources = []


def _read_key_vault_secret(vault_name, secret_name):
    """Return the "value" field of an Azure Key Vault secret as a string.

    Runs ``az keyvault secret show`` and parses its JSON output.  The command
    is passed as an argument list (no shell), so the vault name read from
    parameters.json is never interpreted by a shell.

    Raises:
        subprocess.CalledProcessError: if the ``az`` command exits non-zero.
    """
    output = subprocess.check_output([
        'az', 'keyvault', 'secret', 'show',
        '--name', secret_name,
        '--vault-name', vault_name,
    ])
    return str(json.loads(output)["value"])


# Read parameters.json from the current working directory.
with open('{0}/parameters.json'.format(os.getcwd()), 'r') as f:
    parameters = f.read()

# Any remaining "<...>" value is treated as an unfilled placeholder: stop
# before trying to use the file.
if re.search(r'"<.*>"', parameters):
    print('Please replace placeholders in parameters.json')
    exit()
parameters = json.loads(parameters)

# Credentials are looked up in the Key Vault named in parameters.json.
key_vault = parameters['key vault']
tenant_id = _read_key_vault_secret(key_vault, 'tenant-id')
app_id = _read_key_vault_secret(key_vault, 'app-id')
app_secret = _read_key_vault_secret(key_vault, 'app-secret')

# Acquire a bearer token with the client credentials and build the header.
authority_url = parameters['authority host url'] + '/' + tenant_id
context = adal.AuthenticationContext(authority_url)
token = context.acquire_token_with_client_credentials(
    parameters['resource'],
    app_id,
    app_secret)
head = {'Authorization': 'Bearer ' + token['accessToken']}

# Fill the ENDPOINT template (defined outside this block) with the
# subscription, resource group and workspace.
subscription = _read_key_vault_secret(key_vault, 'subscription-id')
resource_group = parameters['resource group']
workspace = parameters['workspace']
url = ENDPOINT.format(subscription, resource_group, workspace)

sources = ['Heartbeat', 'Syslog', 'Perf', 'ApacheAccess_CL', 'MySQL_CL', 'Custom_Log_CL']

# `hostname` comes from outside this block; the text before its first '-'
# is used as the key of the results mapping.
distro = hostname.split('-')[0]
results = {distro: {}}
@unittest.skip('https://github.com/AzureAD/azure-activedirectory-library-for-python-priv/issues/21')
@httpretty.activate
def test_managed_happy_path(self):
    """Username/password token acquisition should match the expected cached response.

    Currently skipped; the skip reason is the URL of the tracking issue.
    """
    # Local import: only needed to render the assertion failure message.
    import json

    util.setup_expected_user_realm_response_common(False)
    response = util.create_response()
    authorityUrl = response['authority'] + '/' + cp['tenant']

    # Called for its effect only (presumably registers the mocked token
    # response); the value it returns was never used.
    self.setup_expected_username_password_request_response(200, response['wireResponse'], authorityUrl)

    context = adal.AuthenticationContext(authorityUrl)
    token_response = context.acquire_token_with_username_password(
        response['resource'],
        user_pass_params['username'],
        user_pass_params['password'],
        client_cred_params['clientId'])

    # The message is built eagerly, so it must be valid Python: the previous
    # JavaScript-style JSON.stringify raised NameError before asserting.
    self.assertTrue(
        util.is_match_token_response(response['cachedResponse'], token_response),
        'Response did not match expected: ' + json.dumps(token_response))
@httpretty.activate
def test_happy_path(self):
    """Authorization-code token acquisition should match the decoded expected response.

    After the assertion, the last request seen by httpretty is passed to
    util.match_standard_request_headers.
    """
    expected = util.create_response()
    self.setup_expected_auth_code_token_request_response(200, expected['wireResponse'])

    auth_context = adal.AuthenticationContext(cp['authUrl'])
    actual = auth_context.acquire_token_with_authorization_code(
        self.authorization_code,
        self.redirect_uri,
        expected['resource'],
        cp['clientId'],
        cp['clientSecret'])

    matched = util.is_match_token_response(expected['decodedResponse'], actual)
    self.assertTrue(matched, 'The response did not match what was expected')

    util.match_standard_request_headers(httpretty.last_request())
def create_empty_adal_object():
    """Return a dict with a 'TEST' logger and the call context it was built from.

    Keys:
        'log': a log.Logger created with component 'TEST' and a fresh log context.
        'call_context': a dict holding that same context under 'log_context'.
    """
    log_context = log.create_log_context()
    return {
        'log': log.Logger('TEST', log_context),
        'call_context': {'log_context': log_context},
    }
def test_logging(self):
    """A warning logged with a stack trace shows the correlation id, component, message and stack."""
    log_capture_string = StringIO()
    handler = logging.StreamHandler(log_capture_string)
    util.turn_on_logging(handler=handler)
    try:
        test_logger = adal_logging.Logger("TokenRequest", {'correlation_id': '12345'})
        test_logger.warn('a warning', log_stack_trace=True)
        log_contents = log_capture_string.getvalue()
    finally:
        # Always detach the capture handler so it cannot leak into other
        # tests, even when logging itself raises.
        logging.getLogger(adal_logging.ADAL_LOGGER_NAME).removeHandler(handler)

    # Separate assertions report which fragment is missing and show the
    # captured text on failure.
    self.assertIn('12345 - TokenRequest:a warning', log_contents)
    self.assertIn('Stack:', log_contents)
def create_authentication_context_stub(self, authority):
    """Build an AuthenticationContext whose token endpoint is set by hand.

    The context is created with ``False`` as its second positional argument
    (presumably turning authority validation off -- confirm against the
    AuthenticationContext signature), then its authority's ``_tokenEndpoint``
    is set to ``authority + cp['tokenPath']``.
    """
    token_endpoint = authority + cp['tokenPath']
    stub = AuthenticationContext(authority, False)
    stub.authority._tokenEndpoint = token_endpoint
    return stub
def test_dsts_authority(self):
    """Constructing an AuthenticationContext for self.dstsTestEndpoint must not raise."""
    try:
        AuthenticationContext(self.dstsTestEndpoint)
    except Exception as exc:
        # Catch Exception rather than everything, so KeyboardInterrupt and
        # SystemExit still propagate, and report what was actually raised.
        self.fail(
            "AuthenticationContext() raised an exception on dstsTestEndpoint: {0!r}".format(exc))
def test_bad_url_not_https(self):
    """An http:// authority URL must be rejected with a ValueError."""
    # Raw string: "\." in a normal string literal is an invalid escape
    # sequence; the regex itself is unchanged.
    expected_message = r"The authority url must be an https endpoint\."
    with six.assertRaisesRegex(self, ValueError, expected_message):
        AuthenticationContext('http://this.is.not.https.com/mytenant.com')
def test_settings_none(self):
    """Calling set_logging_options() with no arguments leaves only level 'ERROR'.

    The options in effect before the test are read first and put back
    before the assertion runs.
    """
    saved_options = adal_logging.get_logging_options()
    adal_logging.set_logging_options()
    observed = adal_logging.get_logging_options()
    adal_logging.set_logging_options(saved_options)

    only_default_level = len(observed) == 1 and observed['level'] == 'ERROR'
    failure_message = 'Did not expect to find any logging options set: ' + json.dumps(observed)
    self.assertTrue(only_default_level, failure_message)
def turn_on_logging(level='DEBUG', handler = None):
    """Pass the given level and handler to log.set_logging_options.

    Args:
        level: value stored under the 'level' option (defaults to 'DEBUG').
        handler: value stored under the 'handler' option (defaults to None).
    """
    options = {'level': level, 'handler': handler}
    log.set_logging_options(options)