Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_calculate_repo_scores(self, mock_get_repoable_permissions, mock_get_role_permissions):
roles = [Role(ROLES[0]), Role(ROLES[1]), Role(ROLES[2])]
roles[0].disqualified_by = []
roles[0].aa_data = 'some_aa_data'
# disqualified by a filter
roles[1].policies = [{'Policy': ROLE_POLICIES['unused_ec2']}]
roles[1].disqualified_by = ['some_filter']
roles[1].aa_data = 'some_aa_data'
# no AA data
roles[2].policies = [{'Policy': ROLE_POLICIES['all_services_used']}]
roles[2].disqualified_by = []
roles[2].aa_data = None
mock_get_role_permissions.side_effect = [['iam:AddRoleToInstanceProfile', 'iam:AttachRolePolicy',
'ec2:AllocateHosts', 'ec2:AssociateAddress'],
['iam:AddRoleToInstanceProfile', 'iam:AttachRolePolicy'],
commit (bool)
Returns:
None
"""
errors = []
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
# only load partial data that we need to determine if we should keep going
role_data = get_role_data(dynamo_table, role_id, fields=['DisqualifiedBy', 'AAData', 'RepoablePermissions',
'RoleName'])
if not role_data:
LOGGER.warn('Could not find role with name {}'.format(role_name))
return
else:
role = Role(role_data)
if len(role.disqualified_by) > 0:
LOGGER.info('Cannot repo role {} because it is being disqualified by: {}'.format(role_name,
role.disqualified_by))
return
if not role.aa_data:
LOGGER.warn('ARN not found in Access Advisor: {}'.format(role.arn))
return
if not role.repoable_permissions:
LOGGER.info('No permissions to repo for role {}'.format(role_name))
return
# if we've gotten to this point, load the rest of the role
role = Role(get_role_data(dynamo_table, role_id))
def find_roles_with_permissions(permissions, dynamo_table, output_file):
    """
    Search roles in all accounts for a policy with any of the provided permissions, log the ARN of each role.

    Permission names are lower-cased before they are compared against the
    permissions returned by ``roledata._get_role_permissions``. Only roles
    whose ``active`` attribute is truthy are reported.

    Args:
        permissions (list[string]): The name of the permissions to find
        dynamo_table: table handle passed through to ``role_ids_for_all_accounts``
            and ``get_role_data``
        output_file (string): filename to write the output

    Returns:
        None
    """
    # NOTE(review): nothing in this block writes to `output_file` or consumes
    # `arns` after the loop -- confirm whether the write step was lost.
    arns = []

    # Normalise the requested permissions once; the result does not depend on
    # the role being inspected, so there is no reason to rebuild it per role.
    wanted_permissions = {p.lower() for p in permissions}

    for role_id in role_ids_for_all_accounts(dynamo_table):
        # Only fetch the fields needed to evaluate and report on the role.
        role = Role(
            get_role_data(
                dynamo_table, role_id, fields=["Policies", "RoleName", "Arn", "Active"]
            )
        )
        role_permissions, _ = roledata._get_role_permissions(role)
        found_permissions = wanted_permissions.intersection(role_permissions)

        if found_permissions and role.active:
            arns.append(role.arn)
            LOGGER.info(
                "ARN {arn} has {permissions}".format(
                    arn=role.arn, permissions=list(found_permissions)
                )
            )
Returns:
errors (list): if any
"""
errors = []
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
if not role_id:
message = "Could not find role with name {} in account {}".format(
role_name, account_number
)
errors.append(message)
LOGGER.warning(message)
return errors
else:
role = Role(get_role_data(dynamo_table, role_id))
# no option selected, display a table of options
if not selection:
headers = ["Number", "Source", "Discovered", "Permissions", "Services"]
rows = []
for index, policies_version in enumerate(role.policies):
policy_permissions, _ = roledata._get_permissions_in_policy(
policies_version["Policy"]
)
rows.append(
[
index,
policies_version["Source"],
policies_version["Discovered"],
len(policy_permissions),
roledata._get_services_in_permissions(policy_permissions),
continuing = False
if not role.aa_data:
LOGGER.warning("ARN not found in Access Advisor: {}".format(role.arn))
continuing = False
if not role.repoable_permissions:
LOGGER.info(
"No permissions to repo for role {} in account {}".format(
role_name, account_number
)
)
continuing = False
# if we've gotten to this point, load the rest of the role
role = Role(get_role_data(dynamo_table, role_id))
old_aa_data_services = []
for aa_service in role.aa_data:
if datetime.datetime.strptime(
aa_service["lastUpdated"], "%a, %d %b %Y %H:%M:%S %Z"
) < datetime.datetime.now() - datetime.timedelta(
days=config["repo_requirements"]["oldest_aa_data_days"]
):
old_aa_data_services.append(aa_service["serviceName"])
if old_aa_data_services:
LOGGER.error(
"AAData older than threshold for these services: {} (role: {}, account {})".format(
old_aa_data_services, role_name, account_number
)
)
headers = [
"Name",
"Refreshed",
"Disqualified By",
"Can be repoed",
"Permissions",
"Repoable",
"Repoed",
"Services",
]
rows = list()
roles = Roles(
[
Role(get_role_data(dynamo_table, roleID))
for roleID in tqdm(role_ids_for_account(dynamo_table, account_number))
]
)
if not inactive:
roles = roles.filter(active=True)
for role in roles:
rows.append(
[
role.role_name,
role.refreshed,
role.disqualified_by,
len(role.disqualified_by) == 0,
role.total_permissions,
role.repoable_permissions,