Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUpClass(cls):
    """Create the keystone auth plugin and session shared by the class.

    The plugin arguments depend on ``CONF.identity.auth_version``: when it
    contains ``'v2'`` the project is passed as ``tenant_name``; otherwise
    the project name is passed together with the user and project domain
    names.
    """
    super(WhenTestingClientConnectivity, cls).setUpClass()

    if 'v2' in CONF.identity.auth_version:
        plugin_kwargs = {
            'auth_url': CONF.identity.uri,
            'username': CONF.keymanager.username,
            'password': CONF.keymanager.password,
            'tenant_name': CONF.keymanager.project_name,
        }
    else:
        plugin_kwargs = {
            'auth_url': CONF.identity.uri,
            'username': CONF.keymanager.username,
            'user_domain_name': CONF.identity.domain_name,
            'password': CONF.keymanager.password,
            'project_name': CONF.keymanager.project_name,
            'project_domain_name': CONF.keymanager.project_domain_name,
        }
    cls.auth = identity.Password(**plugin_kwargs)

    # A single class-level session lets the tests in this class share a
    # keystone token.
    cls.sess = session.Session(auth=cls.auth)
def setUp(self):
    """Prepare a keystone session and a barbican client for one test.

    Logs the name of the test being started, runs the parent ``setUp``,
    then builds ``self.auth``, ``self.sess`` and ``self.barbicanclient``
    from the ``CONF.identity`` and ``CONF.keymanager`` settings.
    """
    self.LOG.info('Starting: %s', self._testMethodName)
    super(TestCase, self).setUp()

    # With a v2 auth_version the project is passed as tenant_name;
    # otherwise the user/project domain names are supplied as well.
    if 'v2' in CONF.identity.auth_version:
        plugin_kwargs = {
            'auth_url': CONF.identity.uri,
            'username': CONF.keymanager.username,
            'password': CONF.keymanager.password,
            'tenant_name': CONF.keymanager.project_name,
        }
    else:
        plugin_kwargs = {
            'auth_url': CONF.identity.uri,
            'username': CONF.keymanager.username,
            'user_domain_name': CONF.identity.domain_name,
            'password': CONF.keymanager.password,
            'project_name': CONF.keymanager.project_name,
            'project_domain_name': CONF.keymanager.project_domain_name,
        }
    self.auth = identity.Password(**plugin_kwargs)
    self.sess = session.Session(auth=self.auth)

    self.barbicanclient = client.Client(
        session=self.sess,
        endpoint=CONF.keymanager.url,
        project_id=CONF.keymanager.project_id)
# Build a keystone auth plugin from ``context``, choosing the plugin type by
# the name of the context's class.
# NOTE(review): the class is matched by its ``__name__`` string rather than
# with isinstance() -- presumably to avoid importing the context classes
# here; confirm.
def _get_keystone_auth(self, context):
if context.__class__.__name__ == 'KeystonePassword':
# Password auth: every credential and scoping field, including the auth
# URL, is copied straight from the context.
return identity.Password(
auth_url=context.auth_url,
username=context.username,
password=context.password,
user_id=context.user_id,
user_domain_id=context.user_domain_id,
user_domain_name=context.user_domain_name,
trust_id=context.trust_id,
domain_id=context.domain_id,
domain_name=context.domain_name,
project_id=context.project_id,
project_name=context.project_name,
project_domain_id=context.project_domain_id,
project_domain_name=context.project_domain_name,
reauthenticate=context.reauthenticate)
elif context.__class__.__name__ == 'KeystoneToken':
# Token auth: the snippet is cut off at this point, so the arguments passed
# to identity.Token are not visible here.
return identity.Token(
def admin_client(self):
    """Return the admin keystone client, creating it on first use.

    The client is stored on ``self._admin_client``; while that attribute
    is truthy it is returned as-is and no new connection is made.
    """
    if self._admin_client:
        return self._admin_client

    # Create admin client connection to v3 API
    keystone_session = ks_session.Session(
        auth=identity.Password(**self._service_admin_creds()))
    self._admin_client = kc_v3.Client(session=keystone_session)
    return self._admin_client
# Build a keystone auth plugin from ``context``, choosing the plugin type by
# the name of the context's class.
# NOTE(review): the class is matched by its ``__name__`` string rather than
# with isinstance() -- presumably to avoid importing the context classes
# here; confirm.
def _get_keystone_auth(self, context):
if context.__class__.__name__ == 'KeystonePassword':
# Password auth: the auth URL comes from this object's ``_auth_url``; all
# other credential and scoping fields are copied from the context.
return identity.Password(
auth_url=self._auth_url,
username=context.username,
password=context.password,
user_id=context.user_id,
user_domain_id=context.user_domain_id,
user_domain_name=context.user_domain_name,
trust_id=context.trust_id,
domain_id=context.domain_id,
domain_name=context.domain_name,
project_id=context.project_id,
project_name=context.project_name,
project_domain_id=context.project_domain_id,
project_domain_name=context.project_domain_name,
reauthenticate=context.reauthenticate)
elif context.__class__.__name__ == 'KeystoneToken':
# Token auth: the snippet is cut off at this point, so the arguments passed
# to identity.Token are not visible here.
return identity.Token(
def get_keystone_client():
    """Return a keystone client created through version discovery.

    A ``Password`` plugin is built from ``keystone_args_from_env()`` and
    attached to a ``Session`` configured with the module's timeout, verify
    and cert settings; ``Discover`` then creates the client from that
    session.
    """
    credentials = keystone_args_from_env()
    keystone_session = Session(
        auth=Password(**credentials),
        app_name='keystone-init',
        user_agent='keystone-init',
        timeout=KEYSTONE_TIMEOUT,
        verify=KEYSTONE_VERIFY,
        cert=KEYSTONE_CERT,
    )
    return Discover(session=keystone_session).create_client()
# NOTE(review): this is the tail of a function whose ``def`` line is not
# visible in this snippet; ``key``, ``password_args`` and ``self`` are
# defined outside the visible part.
#
# Throwaway session without an auth plugin, used only for the version
# discovery call below.
temp_session = session.Session(
# ``verify`` is the CA certificate value when one is set, otherwise the
# negation of the "insecure" flag.
verify=(self.credential.https_cacert or
not self.credential.https_insecure),
timeout=CONF.openstack_client_http_timeout)
# Take the first element of the "version" entry of the first discovered
# version record, as a string -- presumably the major API version; confirm.
version = str(discover.Discover(
temp_session,
password_args["auth_url"]).version_data()[0]["version"][0])
# Domain arguments are added only when neither the auth URL contains "v2.0"
# nor the discovered version is "2".
if "v2.0" not in password_args["auth_url"] and (
version != "2"):
password_args.update({
"user_domain_name": self.credential.user_domain_name,
"domain_name": self.credential.domain_name,
"project_domain_name": self.credential.project_domain_name
})
identity_plugin = identity.Password(**password_args)
# The real session carries the auth plugin and repeats the verify/timeout
# settings used for the discovery session.
sess = session.Session(
auth=identity_plugin,
verify=(self.credential.https_cacert or
not self.credential.https_insecure),
timeout=CONF.openstack_client_http_timeout)
# Store the (session, plugin) pair under ``key`` and return the stored pair.
self.cache[key] = (sess, identity_plugin)
return self.cache[key]
def _get_auth_handler(kwargs):
if 'token' in kwargs:
auth = identity.Token(
auth_url=kwargs.get('auth_url', None),
token=kwargs.get('token', None),
project_id=kwargs.get('project_id', None),
project_name=kwargs.get('project_name', None),
project_domain_id=kwargs.get('project_domain_id', None),
project_domain_name=kwargs.get('project_domain_name', None)
)
elif {'username', 'password'} <= set(kwargs):
auth = identity.Password(
auth_url=kwargs.get('auth_url', None),
username=kwargs.get('username', None),
password=kwargs.get('password', None),
project_id=kwargs.get('project_id', None),
project_name=kwargs.get('project_name', None),
project_domain_id=kwargs.get('project_domain_id', None),
project_domain_name=kwargs.get('project_domain_name', None),
user_domain_id=kwargs.get('user_domain_id', None),
user_domain_name=kwargs.get('user_domain_name', None)
)
else:
raise Exception('monascaclient can be configured with either '
'"token" or "username:password" but neither of '
'them was found in passed arguments.')
return auth