Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_keystone_session(username, password, project_name):
    """Build a keystone `Session` object for the given user and project.

    Args:
        username: name of the keystone user.
        password: password of that user.
        project_name: keystone project of the desired session.

    Returns:
        A `session.Session` wrapping a v3 password auth plugin. The auth URL
        is read via `_keystone_cfg_opt('auth_url')`; the user domain id and
        the project domain id are both fixed to 'default'.
    """
    credentials = {
        'auth_url': _keystone_cfg_opt('auth_url'),
        'username': username,
        'password': password,
        'project_name': project_name,
        'user_domain_id': 'default',
        'project_domain_id': 'default',
    }
    password_auth = v3.Password(**credentials)
    return session.Session(auth=password_auth)
cls.auth = identity.Password(
auth_url=CONF.identity.uri,
username=CONF.keymanager.username,
password=CONF.keymanager.password,
tenant_name=CONF.keymanager.project_name)
else:
cls.auth = identity.Password(
auth_url=CONF.identity.uri,
username=CONF.keymanager.username,
user_domain_name=CONF.identity.domain_name,
password=CONF.keymanager.password,
project_name=CONF.keymanager.project_name,
project_domain_name=CONF.keymanager.project_domain_name)
# enables the tests in this class to share a keystone token
cls.sess = session.Session(auth=cls.auth)
def get_keystone_session(self, keystone_ip, username, password,
                         api_version=False, admin_port=False,
                         user_domain_name=None, domain_name=None,
                         project_domain_name=None, project_name=None):
    """Return a keystone session object together with its auth plugin.

    The endpoint is obtained from `self.get_keystone_endpoint`, which is
    given `keystone_ip`, `api_version` and `admin_port`.

    When `api_version == 2` a `v2.Password` plugin is built from the
    username, password and `project_name` (passed as `tenant_name`). For any
    other value a `v3.Password` plugin is built, which additionally receives
    the user/domain/project-domain names.

    Returns:
        A `(session, auth)` tuple.
    """
    endpoint = self.get_keystone_endpoint(
        keystone_ip, api_version=api_version, admin_port=admin_port)

    if api_version == 2:
        auth = v2.Password(
            auth_url=endpoint,
            username=username,
            password=password,
            tenant_name=project_name,
        )
    else:
        auth = v3.Password(
            auth_url=endpoint,
            username=username,
            password=password,
            user_domain_name=user_domain_name,
            domain_name=domain_name,
            project_domain_name=project_domain_name,
            project_name=project_name,
        )

    # Both API versions wrap their plugin in a session the same way.
    return (keystone_session.Session(auth=auth), auth)
# NOTE(review): excerpt of a larger routine -- keystone_v3, user, user_name,
# user_pass, domain, domain_name, env and logger are defined outside it.

# Look up the role named "admin" and grant it to the user on the domain.
role_admin = keystone_v3.roles.find(name="admin")
logger.info("Setting role 'admin' to user {0}".format(user_name))
keystone_v3.roles.grant(role=role_admin, user=user, domain=domain)
# Check the grant: the user's id must be among the domain's role assignments.
role_assignments = keystone_v3.role_assignments.list(domain=domain)
domain_users_ids = [du.user["id"] for du in role_assignments]
assert user.id in domain_users_ids
# Build a new session as that user against port 5000 (/v3) of the primary
# controller; the same domain name is passed as both domain_name and
# user_domain_name.
controller_ip = env.get_primary_controller_ip()
auth_url = 'http://{0}:5000/v3'.format(controller_ip)
auth = v3.Password(auth_url=auth_url,
username=user_name,
password=user_pass,
domain_name=domain_name,
user_domain_name=domain_name)
sess = session.Session(auth=auth)
# From here on keystone_v3 is a client that uses the new user's session.
keystone_v3 = KeystoneClientV3(session=sess)
# Create a project in the domain with that client and confirm it is listed.
new_project_name = "project_1616778"
logger.info("Creating project {0} in domain {1}".
format(new_project_name, domain_name))
keystone_v3.projects.create(name=new_project_name, domain=domain,
description="New project")
projects = keystone_v3.projects.list(domain=domain)
projects_names = [p.name for p in projects]
assert new_project_name in projects_names, ("Project {0} is not created".
format(new_project_name))
def get_keystone_session(self, keystone_ip, username, password,
api_version=False, admin_port=False,
user_domain_name=None, domain_name=None,
project_domain_name=None, project_name=None):
"""Return a keystone session object"""
ep = self.get_keystone_endpoint(keystone_ip,
api_version=api_version,
admin_port=admin_port)
if api_version == 2:
auth = v2.Password(
username=username,
password=password,
tenant_name=project_name,
auth_url=ep
)
sess = keystone_session.Session(auth=auth)
else:
auth = v3.Password(
user_domain_name=user_domain_name,
username=username,
password=password,
domain_name=domain_name,
project_domain_name=project_domain_name,
project_name=project_name,
auth_url=ep
)
# NOTE(review): excerpt of a larger routine -- os_conn, env, logger,
# domain_name, user_name, user_pass and basic_check are defined outside it.

# Client built on os_conn's session, used to look up the domain and the user.
keystone_v3 = KeystoneClientV3(session=os_conn.session)
domain = keystone_v3.domains.find(name=domain_name)
user = keystone_v3.users.find(domain=domain, name=user_name)
# Grant the role named "admin" to the user on the domain.
logger.info("Setting role 'admin' to user {0}".format(user_name))
role_admin = keystone_v3.roles.find(name="admin")
keystone_v3.roles.grant(role=role_admin, user=user, domain=domain)
# Check the grant: the user's id must be among the domain's role assignments.
role_assignments = keystone_v3.role_assignments.list(domain=domain)
domain_users_ids = [du.user["id"] for du in role_assignments]
assert user.id in domain_users_ids
# Build a new session as that user against port 5000 (/v3) of the primary
# controller; the same domain name is passed as both domain_name and
# user_domain_name.
logger.info("Login as {0}".format(user_name))
controller_ip = env.get_primary_controller_ip()
auth_url = 'http://{0}:5000/v3'.format(controller_ip)
auth = v3.Password(auth_url=auth_url,
username=user_name,
password=user_pass,
domain_name=domain_name,
user_domain_name=domain_name)
sess = session.Session(auth=auth)
# From here on keystone_v3 is a client that uses the new user's session.
keystone_v3 = KeystoneClientV3(session=sess)
# Hand the new client to basic_check with the literal domain name "openldap1"
# (not the domain_name variable used above -- TODO confirm this is intended).
basic_check(keystone_v3, domain_name="openldap1")
def setUpClass(cls):
    """Create one auth plugin and one session shared by the whole class.

    The credentials come from `CONF.identity` and `CONF.keymanager`. When
    `CONF.identity.auth_version` contains 'v2', the project name is passed
    as `tenant_name`; otherwise it is passed as `project_name` together with
    the user and project domain names.

    Sets `cls.auth` and `cls.sess`.
    """
    super(WhenTestingClientConnectivity, cls).setUpClass()

    if 'v2' in CONF.identity.auth_version:
        credentials = dict(
            auth_url=CONF.identity.uri,
            username=CONF.keymanager.username,
            password=CONF.keymanager.password,
            tenant_name=CONF.keymanager.project_name,
        )
    else:
        credentials = dict(
            auth_url=CONF.identity.uri,
            username=CONF.keymanager.username,
            user_domain_name=CONF.identity.domain_name,
            password=CONF.keymanager.password,
            project_name=CONF.keymanager.project_name,
            project_domain_name=CONF.keymanager.project_domain_name,
        )
    cls.auth = identity.Password(**credentials)

    # A class-level session lets the tests in this class share a keystone
    # token.
    cls.sess = session.Session(auth=cls.auth)
def setUp(self):
    """Log the test name, then build auth, session and barbican client.

    The credentials come from `CONF.identity` and `CONF.keymanager`. When
    `CONF.identity.auth_version` contains 'v2', the project name is passed
    as `tenant_name`; otherwise it is passed as `project_name` together with
    the user and project domain names.

    Sets `self.auth`, `self.sess` and `self.barbicanclient`.
    """
    self.LOG.info('Starting: %s', self._testMethodName)
    super(TestCase, self).setUp()

    if 'v2' in CONF.identity.auth_version:
        credentials = dict(
            auth_url=CONF.identity.uri,
            username=CONF.keymanager.username,
            password=CONF.keymanager.password,
            tenant_name=CONF.keymanager.project_name,
        )
    else:
        credentials = dict(
            auth_url=CONF.identity.uri,
            username=CONF.keymanager.username,
            user_domain_name=CONF.identity.domain_name,
            password=CONF.keymanager.password,
            project_name=CONF.keymanager.project_name,
            project_domain_name=CONF.keymanager.project_domain_name,
        )
    self.auth = identity.Password(**credentials)
    self.sess = session.Session(auth=self.auth)

    # The client is given the configured key-manager endpoint and project id
    # and runs over the session created above.
    self.barbicanclient = client.Client(
        endpoint=CONF.keymanager.url,
        project_id=CONF.keymanager.project_id,
        session=self.sess,
    )
def test_get_endpoint(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
object_store_endpoint = auth_plugin.get_endpoint(
self.mock_session, service_type='object-store')
self.assertEqual(object_store_endpoint, self.expected_endpoint)
auth_endpoint = auth_plugin.get_endpoint(
self.mock_session, interface=plugin.AUTH_INTERFACE)
self.assertEqual(auth_endpoint, self.options['auth_url'])
with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr:
auth_plugin.get_endpoint(self.mock_session)
self.assertEqual('public endpoint for None service not found',
str(exc_mgr.exception))
with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr:
auth_plugin.get_endpoint(
self.mock_session, service_type='identity', region_name='DFW')
self.assertEqual(
'public endpoint for identity service in DFW region not found',
str(exc_mgr.exception))
with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr:
auth_plugin.get_endpoint(
self.mock_session, service_type='image', service_name='glance')
self.assertEqual(
'public endpoint for image service named glance not found',
def wrapper(self, *args, **kwargs):
    """Call the wrapped `f`, turning selected keystone errors into warnings.

    Returns whatever `f(self, *args, **kwargs)` returns. If one of the
    `ks_exc` exceptions handled below is raised, a warning is emitted
    instead and the wrapper falls through, so it returns None. Any other
    exception propagates to the caller.
    """
    try:
        return f(self, *args, **kwargs)
    except ks_exc.EndpointNotFound:
        warn_limit(self, 'The placement API endpoint was not found.')
        # Recreate the client so a new catalog is fetched; the catalog gets
        # cached when keystone is first successfully contacted.
        self._client = self._create_client()
    except ks_exc.MissingAuthPlugin:
        warn_limit(
            self, 'No authentication information found for placement API.')
    except ks_exc.Unauthorized:
        warn_limit(self, 'Placement service credentials do not work.')
    except ks_exc.DiscoveryFailure:
        # TODO(_gryf): Looks like DiscoveryFailure is not the only missing
        # exception here. In Pike we should take care about keystoneauth1
        # failures handling globally.
        warn_limit(
            self, 'Discovering suitable URL for placement API failed.')
    except ks_exc.ConnectFailure:
        # This case logs directly through LOG instead of warn_limit.
        LOG.warning('Placement API service is not responding.')
return wrapper