Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Swap repokid's module-level LOGGER for a logger named 'test' and attach the
# console_logger handler (defined outside this excerpt).
repokid.repokid.LOGGER = logging.getLogger('test')
repokid.repokid.LOGGER.addHandler(console_logger)
# Exercise the function under test for a single account number.
repokid.repokid.update_role_cache('123456789012')
# update_role_data must have been called exactly once per role, in order,
# each time with the role and the policy fixture expected for it.
assert mock_update_role_data.mock_calls == [call(Role(ROLES[0]), ROLE_POLICIES['all_services_used']),
call(Role(ROLES[1]), ROLE_POLICIES['unused_ec2']),
call(Role(ROLES[2]), ROLE_POLICIES['all_services_used'])]
# find_and_mark_inactive must have been called once with the account number
# and all three roles (i.e. every role is reported as active).
assert mock_find_and_mark_inactive.mock_calls == [call('123456789012',
[Role(ROLES[0]), Role(ROLES[1]), Role(ROLES[2])])]
# The remaining steps are each expected to run exactly once over the same
# collection of three roles.
roles = Roles([Role(ROLES[0]), Role(ROLES[1]), Role(ROLES[2])])
assert mock_update_filtered_roles.mock_calls == [call(roles)]
assert mock_update_aardvark_data.mock_calls == [call(AARDVARK_DATA, roles)]
# TODO: validate total permission, repoable, etc are getting updated properly
assert mock_update_repoable_data.mock_calls == [call(roles)]
# Stats are expected to be updated with the source tagged as 'Scan'.
assert mock_update_stats.mock_calls == [call(roles, source='Scan')]
"""
Display a table with data about all roles in an account and write a csv file with the data.
Args:
account_number (string)
inactive (bool): show roles that have historically (but not currently) existed in the account if True
Returns:
None
"""
headers = ['Name', 'Refreshed', 'Disqualified By', 'Can be repoed', 'Permissions', 'Repoable', 'Repoed',
'Services']
rows = list()
roles = Roles([Role(get_role_data(dynamo_table, roleID))
for roleID in tqdm(role_ids_for_account(dynamo_table, account_number))])
if not inactive:
roles = roles.filter(active=True)
for role in roles:
rows.append([role.role_name,
role.refreshed,
role.disqualified_by,
len(role.disqualified_by) == 0,
role.total_permissions,
role.repoable_permissions,
role.repoed,
role.repoable_services])
rows = sorted(rows, key=lambda x: (x[5], x[0], x[4]))
"Getting current role data for account {} (this may take a while for large accounts)".format(
account_number
)
)
role_data = get_account_authorization_details(filter="Role", **conn)
role_data_by_id = {item["RoleId"]: item for item in role_data}
# convert policies list to dictionary to maintain consistency with old call which returned a dict
for _, data in role_data_by_id.items():
data["RolePolicyList"] = {
item["PolicyName"]: item["PolicyDocument"]
for item in data["RolePolicyList"]
}
roles = Roles([Role(rd) for rd in role_data])
active_roles = []
LOGGER.info("Updating role data for account {}".format(account_number))
for role in tqdm(roles):
role.account = account_number
current_policies = role_data_by_id[role.role_id]["RolePolicyList"]
active_roles.append(role.role_id)
roledata.update_role_data(dynamo_table, account_number, role, current_policies)
LOGGER.info("Finding inactive roles in account {}".format(account_number))
roledata.find_and_mark_inactive(dynamo_table, account_number, active_roles)
LOGGER.info("Filtering roles")
plugins = FilterPlugins()
# Blocklist needs to know the current account
9) get Aardvark data for each role
10) update Dynamo with Aardvark data
11) calculate repoable permissions/policies for all the roles
12) update Dynamo with information about how many total and repoable permissions and which services are repoable
13) update stats in Dynamo with basic information like total permissions and which filters are applicable
Args:
account_number (string): The current account number Repokid is being run against
Returns:
None
"""
conn = config['connection_iam']
conn['account_number'] = account_number
roles = Roles([Role(role_data) for role_data in list_roles(**conn)])
active_roles = []
LOGGER.info('Updating role data for account {}'.format(account_number))
for role in tqdm(roles):
role.account = account_number
current_policies = get_role_inline_policies(role.as_dict(), **conn) or {}
active_roles.append(role.role_id)
roledata.update_role_data(dynamo_table, account_number, role, current_policies)
LOGGER.info('Finding inactive accounts')
roledata.find_and_mark_inactive(dynamo_table, account_number, active_roles)
LOGGER.info('Filtering roles')
plugins = FilterPlugins()
# Blacklist needs to know the current account
def schedule_repo(account_number, dynamo_table, config, hooks):
    """
    Schedule a repo for a given account. Schedule repo for a time in the future (default 7 days) for any roles in
    the account with repoable permissions that do not already have a repo scheduled.

    Args:
        account_number (string): account whose roles are considered for scheduling
        dynamo_table: table handle passed through to the role data helpers
        config: configuration object read with ``.get``; ``repo_schedule_period_days`` gives the delay in days
            (default 7)
        hooks: passed to ``repokid.hooks.call_hooks`` for the ``AFTER_SCHEDULE_REPO`` event

    Returns:
        None
    """
    scheduled_roles = []

    roles = Roles([Role(get_role_data(dynamo_table, roleID))
                   for roleID in tqdm(role_ids_for_account(dynamo_table, account_number))])

    # Read the period once so the stored timestamp and the log message always agree.
    period_days = config.get('repo_schedule_period_days', 7)
    scheduled_time = int(time.time()) + (86400 * period_days)

    for role in roles:
        # Skip roles that already have a pending schedule; overwriting it would push the repo date
        # further into the future every time scheduling is run, so the repo would never come due.
        if role.repoable_permissions > 0 and not role.repo_scheduled:
            set_role_data(dynamo_table, role.role_id, {'RepoScheduled': scheduled_time})
            scheduled_roles.append(role)

    LOGGER.info("Scheduled repo for {} days from now for these roles:\n\t{}".format(
        period_days, ', '.join([r.role_name for r in scheduled_roles])))

    # Only the roles scheduled by this call are reported to the hooks.
    repokid.hooks.call_hooks(hooks, 'AFTER_SCHEDULE_REPO', {'roles': scheduled_roles})
def cancel_scheduled_repo(account_number, dynamo_table, role_name=None, is_all=None):
"""
Cancel scheduled repo for a role in an account
"""
if not is_all and not role_name:
LOGGER.error("Either a specific role to cancel or all must be provided")
return
if is_all:
roles = Roles(
[
Role(get_role_data(dynamo_table, roleID))
for roleID in role_ids_for_account(dynamo_table, account_number)
]
)
# filter to show only roles that are scheduled
roles = [role for role in roles if (role.repo_scheduled)]
for role in roles:
set_role_data(
dynamo_table, role.role_id, {"RepoScheduled": 0, "ScheduledPerms": []}
)
LOGGER.info(
"Canceled scheduled repo for roles: {}".format(
Repo all scheduled or eligible roles in an account. Collect any errors and display them at the end.
Args:
account_number (string)
dynamo_table
config
commit (bool): actually make the changes
scheduled (bool): if True only repo the scheduled roles, if False repo all the (eligible) roles
Returns:
None
"""
errors = []
role_ids_in_account = role_ids_for_account(dynamo_table, account_number)
roles = Roles([])
for role_id in role_ids_in_account:
roles.append(Role(get_role_data(dynamo_table, role_id, fields=['Active', 'RoleName', 'RepoScheduled'])))
roles = roles.filter(active=True)
cur_time = int(time.time())
if scheduled:
roles = [role for role in roles if (role.repo_scheduled and cur_time > role.repo_scheduled)]
LOGGER.info('Repoing these {}roles from account {}:\n\t{}'.format('scheduled ' if scheduled else '',
account_number,
', '.join([role.role_name for role in roles])))
for role in roles:
error = repo_role(account_number, role.role_name, dynamo_table, config, hooks, commit=commit)
if error:
def schedule_repo(account_number, dynamo_table, config, hooks):
"""
Schedule a repo for a given account. Schedule repo for a time in the future (default 7 days) for any roles in
the account with repoable permissions.
"""
scheduled_roles = []
roles = Roles(
[
Role(get_role_data(dynamo_table, roleID))
for roleID in tqdm(role_ids_for_account(dynamo_table, account_number))
]
)
scheduled_time = int(time.time()) + (
86400 * config.get("repo_schedule_period_days", 7)
)
for role in roles:
if role.repoable_permissions > 0 and not role.repo_scheduled:
role.repo_scheduled = scheduled_time
# freeze the scheduled perms to whatever is repoable right now
set_role_data(
dynamo_table,
role.role_id,