Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Returns:
None
"""
conn = config['connection_iam']
conn['account_number'] = account_number
roles = Roles([Role(role_data) for role_data in list_roles(**conn)])
active_roles = []
LOGGER.info('Updating role data for account {}'.format(account_number))
for role in tqdm(roles):
role.account = account_number
current_policies = get_role_inline_policies(role.as_dict(), **conn) or {}
active_roles.append(role.role_id)
roledata.update_role_data(dynamo_table, account_number, role, current_policies)
LOGGER.info('Finding inactive accounts')
roledata.find_and_mark_inactive(dynamo_table, account_number, active_roles)
LOGGER.info('Filtering roles')
plugins = FilterPlugins()
# Blacklist needs to know the current account
config['filter_config']['BlacklistFilter']['current_account'] = account_number
for plugin_path in config.get('active_filters'):
plugin_name = plugin_path.split(':')[1]
plugins.load_plugin(plugin_path, config=config['filter_config'].get(plugin_name, None))
for plugin in plugins.filter_plugins:
filtered_list = plugin.apply(roles)
# convert policies list to dictionary to maintain consistency with old call which returned a dict
for _, data in role_data_by_id.items():
data["RolePolicyList"] = {
item["PolicyName"]: item["PolicyDocument"]
for item in data["RolePolicyList"]
}
roles = Roles([Role(rd) for rd in role_data])
active_roles = []
LOGGER.info("Updating role data for account {}".format(account_number))
for role in tqdm(roles):
role.account = account_number
current_policies = role_data_by_id[role.role_id]["RolePolicyList"]
active_roles.append(role.role_id)
roledata.update_role_data(dynamo_table, account_number, role, current_policies)
LOGGER.info("Finding inactive roles in account {}".format(account_number))
roledata.find_and_mark_inactive(dynamo_table, account_number, active_roles)
LOGGER.info("Filtering roles")
plugins = FilterPlugins()
# Blocklist needs to know the current account
filter_config = config["filter_config"]
blocklist_filter_config = filter_config.get(
"BlocklistFilter", filter_config.get("BlacklistFilter")
)
blocklist_filter_config["current_account"] = account_number
for plugin_path in config.get("active_filters"):
plugin_name = plugin_path.split(":")[1]
- Filters
- Active/inactive roles
Args:
role (Role)
dynamo_table
account_number
conn (dict)
source: what triggered this update (repo, rollback, etc.)
add_no_repo: if True, newly discovered permissions are added to the no-repo list
Returns:
None
"""
current_policies = get_role_inline_policies(role.as_dict(), **conn) or {}
roledata.update_role_data(
dynamo_table,
account_number,
role,
current_policies,
source=source,
add_no_repo=add_no_repo,
)
aardvark_data = _get_aardvark_data(config["aardvark_api_location"], arn=role.arn)
if not aardvark_data:
return
batch_processing = config.get("query_role_data_in_batch", False)
batch_size = config.get("batch_processing_size", 100)
role.aa_data = aardvark_data[role.arn]
- Filters
- Active/inactive roles
Args:
role (Role)
dynamo_table
account_number
conn (dict)
source: what triggered this update (repo, rollback, etc.)
add_no_repo: if True, newly discovered permissions are added to the no-repo list
Returns:
None
"""
current_policies = get_role_inline_policies(role.as_dict(), **conn) or {}
roledata.update_role_data(dynamo_table, account_number, role, current_policies, source=source,
add_no_repo=add_no_repo)
aardvark_data = _get_aardvark_data(config['aardvark_api_location'], arn=role.arn)
if not aardvark_data:
return
role.aa_data = aardvark_data[role.arn]
roledata._calculate_repo_scores([role], config['filter_config']['AgeFilter']['minimum_age'], hooks)
set_role_data(dynamo_table, role.role_id, {'AAData': role.aa_data,
'TotalPermissions': role.total_permissions,
'RepoablePermissions': role.repoable_permissions,
'RepoableServices': role.repoable_services})
roledata.update_stats(dynamo_table, [role], source=source)