Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
selection (int): which policy version in the list to rollback to
commit (bool): actually make the change
Returns:
errors (list): if any
"""
errors = []
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
if not role_id:
message = 'Could not find role with name {}'.format(role_name)
errors.append(message)
LOGGER.warn(message)
return errors
else:
role = Role(get_role_data(dynamo_table, role_id))
# no option selected, display a table of options
if not selection:
headers = ['Number', 'Source', 'Discovered', 'Policy Length', 'Policy Contents']
rows = []
for index, policies_version in enumerate(role.policies):
rows.append([index, policies_version['Source'], policies_version['Discovered'],
len(str(policies_version['Policy'])),
str(policies_version['Policy'])[:50]])
print tabulate(rows, headers=headers)
return
from cloudaux import CloudAux
conn = config['connection_iam']
conn['account_number'] = account_number
ca = CloudAux(**conn)
1) Check that a role exists, it isn't being disqualified by a filter, and that it has fresh AA data
2) Get the role's current permissions, repoable permissions, and the new policy if it will change
3) Make the changes if commit is set
Args:
account_number (string)
role_name (string)
commit (bool)
Returns:
None
"""
errors = []
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
# only load partial data that we need to determine if we should keep going
role_data = get_role_data(
dynamo_table,
role_id,
fields=["DisqualifiedBy", "AAData", "RepoablePermissions", "RoleName"],
)
if not role_data:
LOGGER.warn("Could not find role with name {}".format(role_name))
return
else:
role = Role(role_data)
continuing = True
if len(role.disqualified_by) > 0:
LOGGER.info(
"Cannot repo role {} in account {} because it is being disqualified by: {}".format(
role_name, account_number, role.disqualified_by
def find_roles_with_permission(permission, dynamo_table):
    """
    Walk the cached roles of every account and log the ARN of each role whose
    permissions include the requested one.

    Args:
        permission (string): The name of the permission to find (lowercased before the membership check)
        dynamo_table: table handle handed through to the role lookup helpers

    Returns:
        None
    """
    for role_id in role_ids_for_all_accounts(dynamo_table):
        # only the fields needed to compute permissions and report the ARN are requested
        cached = get_role_data(dynamo_table, role_id, fields=['Policies', 'RoleName', 'Arn'])
        role = Role(cached)
        granted = roledata._get_role_permissions(role)
        if permission.lower() not in granted:
            continue
        LOGGER.info('ARN {arn} has {permission}'.format(arn=role.arn, permission=permission))
Args:
role (Role)
current_policy (dict)
update_source (string): ['Repo', 'Scan', 'Restore']
Returns:
None
"""
policy_entry = {
"Source": update_source,
"Discovered": datetime.datetime.utcnow().isoformat(),
"Policy": current_policy,
}
add_to_end_of_list(dynamo_table, role.role_id, "Policies", policy_entry)
role.policies = get_role_data(dynamo_table, role.role_id, fields=["Policies"])[
"Policies"
]
"Canceled scheduled repo for roles: {}".format(
", ".join([role.role_name for role in roles])
)
)
return
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
if not role_id:
LOGGER.warn(
"Could not find role with name {} in account {}".format(
role_name, account_number
)
)
return
role = Role(get_role_data(dynamo_table, role_id))
if not role.repo_scheduled:
LOGGER.warn(
"Repo was not scheduled for role {} in account {}".format(
role.role_name, account_number
)
)
return
set_role_data(
dynamo_table, role.role_id, {"RepoScheduled": 0, "ScheduledPerms": []}
)
LOGGER.info(
"Successfully cancelled scheduled repo for role {} in account {}".format(
role.role_name, role.account
)
def cancel_scheduled_repo(account_number, dynamo_table, role_name=None, is_all=None):
"""
Cancel scheduled repo for a role in an account
"""
if not is_all and not role_name:
LOGGER.error("Either a specific role to cancel or all must be provided")
return
if is_all:
roles = Roles(
[
Role(get_role_data(dynamo_table, roleID))
for roleID in role_ids_for_account(dynamo_table, account_number)
]
)
# filter to show only roles that are scheduled
roles = [role for role in roles if (role.repo_scheduled)]
for role in roles:
set_role_data(
dynamo_table, role.role_id, {"RepoScheduled": 0, "ScheduledPerms": []}
)
LOGGER.info(
"Canceled scheduled repo for roles: {}".format(
", ".join([role.role_name for role in roles])
)
1) Check that a role exists, it isn't being disqualified by a filter, and that it has fresh AA data
2) Get the role's current permissions, repoable permissions, and the new policy if it will change
3) Make the changes if commit is set
Args:
account_number (string)
role_name (string)
commit (bool)
Returns:
None
"""
errors = []
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
# only load partial data that we need to determine if we should keep going
role_data = get_role_data(dynamo_table, role_id, fields=['DisqualifiedBy', 'AAData', 'RepoablePermissions',
'RoleName'])
if not role_data:
LOGGER.warn('Could not find role with name {}'.format(role_name))
return
else:
role = Role(role_data)
if len(role.disqualified_by) > 0:
LOGGER.info('Cannot repo role {} because it is being disqualified by: {}'.format(role_name,
role.disqualified_by))
return
if not role.aa_data:
LOGGER.warn('ARN not found in Access Advisor: {}'.format(role.arn))
return
4) A list of all services/actions currently allowed and whether they are repoable
5) What the new policy would look like after repoing (if it is repoable)
Args:
account_number (string)
role_name (string)
Returns:
None
"""
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
if not role_id:
LOGGER.warn('Could not find role with name {}'.format(role_name))
return
role = Role(get_role_data(dynamo_table, role_id))
print "\n\nRole repo data:"
headers = ['Name', 'Refreshed', 'Disqualified By', 'Can be repoed', 'Permissions', 'Repoable', 'Repoed', 'Services']
rows = [[role.role_name,
role.refreshed,
role.disqualified_by,
len(role.disqualified_by) == 0,
role.total_permissions,
role.repoable_permissions,
role.repoed,
role.repoable_services]]
print tabulate(rows, headers=headers) + '\n\n'
print "Policy history:"
headers = ['Number', 'Source', 'Discovered', 'Policy Length', 'Policy Contents']
rows = []
def schedule_repo(account_number, dynamo_table, config, hooks):
"""
Schedule a repo for a given account. Schedule repo for a time in the future (default 7 days) for any roles in
the account with repoable permissions.
"""
scheduled_roles = []
roles = Roles(
[
Role(get_role_data(dynamo_table, roleID))
for roleID in tqdm(role_ids_for_account(dynamo_table, account_number))
]
)
scheduled_time = int(time.time()) + (
86400 * config.get("repo_schedule_period_days", 7)
)
for role in roles:
if role.repoable_permissions > 0 and not role.repo_scheduled:
role.repo_scheduled = scheduled_time
# freeze the scheduled perms to whatever is repoable right now
set_role_data(
dynamo_table,
role.role_id,
{
"RepoScheduled": scheduled_time,