Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"user": None,
"token": None,
"active": True,
"expired": False,
"all": False,
}
)
with tests.capture_output() as output:
pagure.cli.admin.do_list_admin_token(list_args)
output = output.getvalue()
self.assertNotEqual(output, "No admin tokens found\n")
self.assertEqual(len(output.split("\n")), 2)
self.assertIn(" -- pingou -- ", output)
# Expire the token
args = munch.Munch({"token": token, "all": False})
pagure.cli.admin.do_expire_admin_token(args)
# After
list_args = munch.Munch(
{
"user": None,
"token": None,
"active": True,
"expired": False,
"all": False,
}
)
with tests.capture_output() as output:
pagure.cli.admin.do_list_admin_token(list_args)
output = output.getvalue()
self.assertEqual(output, "No admin tokens found\n")
# Add group packager to project test
project = pagure.lib.query._get_project(self.session, "test")
msg = pagure.lib.query.add_group_to_project(
self.session,
project=project,
new_group="packager",
user="pingou",
access="ticket",
)
self.session.commit()
self.assertEqual(msg, "Group added")
repo = pagure.lib.query._get_project(self.session, "test")
g = munch.Munch()
g.fas_user = tests.FakeUser(username="foo")
g.authenticated = True
g.session = self.session
with mock.patch("flask.g", g):
output = pagure.utils.is_repo_committer(repo)
self.assertFalse(output)
is_admin=True,
)
self.session.commit()
self.assertEqual(msg, "User `foo` added to the group `packager`.")
# Add group packager to project test
project = pagure.lib.query._get_project(self.session, "test")
msg = pagure.lib.query.add_group_to_project(
self.session, project=project, new_group="packager", user="pingou"
)
self.session.commit()
self.assertEqual(msg, "Group added")
repo = pagure.lib.query._get_project(self.session, "test")
g = munch.Munch()
g.fas_user = tests.FakeUser(username="foo")
g.authenticated = True
g.session = self.session
with mock.patch("flask.g", g):
output = pagure.utils.is_repo_committer(repo)
self.assertTrue(output)
def test_is_repo_committer_external_committer_excluding_one(self):
""" Test is_repo_committer in pagure with EXTERNAL_COMMITTER
configured to give access to all the provenpackager but for this
one repo
"""
repo = pagure.lib.query._get_project(self.session, "test")
g = munch.Munch()
g.fas_user = tests.FakeUser()
g.fas_user.groups.append("provenpackager")
g.authenticated = True
g.session = self.session
with mock.patch("flask.g", g):
output = pagure.utils.is_repo_committer(repo)
self.assertFalse(output)
def _normalize_compute_usage(self, usage):
""" Normalize a compute usage object """
usage = usage.copy()
# Discard noise
self._remove_novaclient_artifacts(usage)
project_id = usage.pop('tenant_id', None)
ret = munch.Munch(
location=self._get_current_location(project_id=project_id),
)
for key in (
'max_personality',
'max_personality_size',
'max_server_group_members',
'max_server_groups',
'max_server_meta',
'max_total_cores',
'max_total_instances',
'max_total_keypairs',
'max_total_ram_size',
'total_cores_used',
'total_hours',
'total_instances_used',
'total_local_gb_usage',
updated_at = ip.pop('updated_at', None)
# Note - description may not always be on the underlying cloud.
# Normalizing it here is easy - what do we do when people want to
# set a description?
description = ip.pop('description', '')
revision_number = ip.pop('revision_number', None)
if self._use_neutron_floating():
attached = bool(port_id)
status = ip.pop('status', 'UNKNOWN')
else:
attached = bool(instance_id)
# In neutron's terms, Nova floating IPs are always ACTIVE
status = 'ACTIVE'
ret = munch.Munch(
attached=attached,
fixed_ip_address=fixed_ip_address,
floating_ip_address=floating_ip_address,
id=id,
location=self._get_current_location(project_id=project_id),
network=network_id,
port=port_id,
router=router_id,
status=status,
created_at=created_at,
updated_at=updated_at,
description=description,
revision_number=revision_number,
properties=ip.copy(),
)
# Backwards compat
def to_instance(self, storage, representation):
for f in ('zip_file', 'fs_file'):
if f in representation and representation[f]:
file_object = storage.get_file(representation[f])
new_path = upload_custom_socketenvironment_file_to(Munch(representation),
os.path.basename(file_object.name))
default_storage.save(new_path, file_object)
representation[f] = new_path
return super().to_instance(storage, representation)
def _normalize_cluster_template(self, cluster_template):
"""Normalize Magnum cluster_templates."""
cluster_template = cluster_template.copy()
# Discard noise
cluster_template.pop('links', None)
cluster_template.pop('human_id', None)
# model_name is a magnumclient-ism
cluster_template.pop('model_name', None)
ct_id = cluster_template.pop('uuid')
ret = munch.Munch(
id=ct_id,
location=self._get_current_location(),
)
ret['is_public'] = cluster_template.pop('public')
ret['is_registry_enabled'] = cluster_template.pop('registry_enabled')
ret['is_tls_disabled'] = cluster_template.pop('tls_disabled')
# pop floating_ip_enabled since we want to hide it in a future patch
fip_enabled = cluster_template.pop('floating_ip_enabled', None)
if not self.strict_mode:
ret['uuid'] = ct_id
if fip_enabled is not None:
ret['floating_ip_enabled'] = fip_enabled
ret['public'] = ret['is_public']
ret['registry_enabled'] = ret['is_registry_enabled']
ret['tls_disabled'] = ret['is_tls_disabled']
def _fetch_thing_from_munch(where, what, tolerant=True):
try:
thing = where[what]
except KeyError:
if tolerant:
thing = munch.Munch()
else:
raise
return thing