Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
)
role_data = get_account_authorization_details(filter="Role", **conn)
role_data_by_id = {item["RoleId"]: item for item in role_data}
# convert policies list to dictionary to maintain consistency with old call which returned a dict
for _, data in role_data_by_id.items():
data["RolePolicyList"] = {
item["PolicyName"]: item["PolicyDocument"]
for item in data["RolePolicyList"]
}
roles = Roles([Role(rd) for rd in role_data])
active_roles = []
LOGGER.info("Updating role data for account {}".format(account_number))
for role in tqdm(roles):
role.account = account_number
current_policies = role_data_by_id[role.role_id]["RolePolicyList"]
active_roles.append(role.role_id)
roledata.update_role_data(dynamo_table, account_number, role, current_policies)
LOGGER.info("Finding inactive roles in account {}".format(account_number))
roledata.find_and_mark_inactive(dynamo_table, account_number, active_roles)
LOGGER.info("Filtering roles")
plugins = FilterPlugins()
# Blocklist needs to know the current account
filter_config = config["filter_config"]
blocklist_filter_config = filter_config.get(
"BlocklistFilter", filter_config.get("BlacklistFilter")
roles = Roles(
[
Role(get_role_data(dynamo_table, roleID))
for roleID in role_ids_for_account(dynamo_table, account_number)
]
)
# filter to show only roles that are scheduled
roles = [role for role in roles if (role.repo_scheduled)]
for role in roles:
set_role_data(
dynamo_table, role.role_id, {"RepoScheduled": 0, "ScheduledPerms": []}
)
LOGGER.info(
"Canceled scheduled repo for roles: {}".format(
", ".join([role.role_name for role in roles])
)
)
return
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
if not role_id:
LOGGER.warn(
"Could not find role with name {} in account {}".format(
role_name, account_number
)
)
return
role = Role(get_role_data(dynamo_table, role_id))
def _delete_policy(name, role, account_number, conn):
"""Deletes the specified IAM Role inline policy.
Args:
name (string)
role (Role object)
account_number (string)
conn (dict)
Returns:
error (string) or None
"""
LOGGER.info(
"Deleting policy with name {} from {} in account {}".format(
name, role.role_name, account_number
)
)
try:
delete_role_policy(RoleName=role.role_name, PolicyName=name, **conn)
except botocore.exceptions.ClientError as e:
return "Error deleting policy: {} from role: {} in account {}. Exception: {}".format(
name, role.role_name, account_number, e
)
if args.get("rollback_role"):
role_name = args.get("")
commit = args.get("--commit")
selection = args.get("--selection")
return rollback_role(
account_number,
role_name,
dynamo_table,
config,
hooks,
selection=selection,
commit=commit,
)
if args.get("repo_all_roles"):
LOGGER.info("Updating role data")
update_role_cache(account_number, dynamo_table, config, hooks)
LOGGER.info("Repoing all roles")
commit = args.get("--commit")
return repo_all_roles(
account_number, dynamo_table, config, hooks, commit=commit, scheduled=False
)
if args.get("schedule_repo"):
LOGGER.info("Updating role data")
update_role_cache(account_number, dynamo_table, config, hooks)
return schedule_repo(account_number, dynamo_table, config, hooks)
if args.get("show_scheduled_roles"):
LOGGER.info("Showing scheduled roles")
return show_scheduled_roles(account_number, dynamo_table)
LOGGER.info(
"Updating roles with Aardvark data in account {}".format(account_number)
)
for role in roles:
try:
role.aa_data = aardvark_data[role.arn]
except KeyError:
LOGGER.warning(
"Aardvark data not found for role: {} ({})".format(
role.role_id, role.role_name
)
)
else:
set_role_data(dynamo_table, role.role_id, {"AAData": role.aa_data})
LOGGER.info(
"Calculating repoable permissions and services for account {}".format(
account_number
)
)
batch_processing = config.get("query_role_data_in_batch", False)
batch_size = config.get("batch_processing_size", 100)
roledata._calculate_repo_scores(
roles,
config["filter_config"]["AgeFilter"]["minimum_age"],
hooks,
batch_processing,
batch_size,
)
for role in roles:
LOGGER.debug(
if args.get("show_scheduled_roles"):
LOGGER.info("Showing scheduled roles")
return show_scheduled_roles(account_number, dynamo_table)
if args.get("cancel_scheduled_repo"):
role_name = args.get("--role")
is_all = args.get("--all")
if not is_all:
LOGGER.info(
"Cancelling scheduled repo for role: {} in account {}".format(
role_name, account_number
)
)
else:
LOGGER.info(
"Cancelling scheduled repo for all roles in account {}".format(
account_number
)
)
return cancel_scheduled_repo(
account_number, dynamo_table, role_name=role_name, is_all=is_all
)
if args.get("repo_scheduled_roles"):
update_role_cache(account_number, dynamo_table, config, hooks)
LOGGER.info("Repoing scheduled roles")
commit = args.get("--commit")
return repo_all_roles(
account_number, dynamo_table, config, hooks, commit=commit, scheduled=True
)
LOGGER.info(
"Role {} filtered by {}".format(filtered_role.role_name, class_name)
)
filtered_role.disqualified_by.append(class_name)
for role in roles:
set_role_data(
dynamo_table, role.role_id, {"DisqualifiedBy": role.disqualified_by}
)
LOGGER.info("Getting data from Aardvark for account {}".format(account_number))
aardvark_data = _get_aardvark_data(
config["aardvark_api_location"], account_number=account_number
)
LOGGER.info(
"Updating roles with Aardvark data in account {}".format(account_number)
)
for role in roles:
try:
role.aa_data = aardvark_data[role.arn]
except KeyError:
LOGGER.warning(
"Aardvark data not found for role: {} ({})".format(
role.role_id, role.role_name
)
)
else:
set_role_data(dynamo_table, role.role_id, {"AAData": role.aa_data})
LOGGER.info(
"Calculating repoable permissions and services for account {}".format(
role.aa_data = aardvark_data[role.arn]
except KeyError:
LOGGER.info('Aardvark data not found for role: {} ({})'.format(role.role_id, role.role_name))
else:
set_role_data(dynamo_table, role.role_id, {'AAData': role.aa_data})
LOGGER.info('Calculating repoable permissions and services')
roledata._calculate_repo_scores(roles, config['filter_config']['AgeFilter']['minimum_age'], hooks)
for role in roles:
if role.role_name == 'Monterey':
import pdb; pdb.set_trace()
set_role_data(dynamo_table, role.role_id, {'TotalPermissions': role.total_permissions,
'RepoablePermissions': role.repoable_permissions,
'RepoableServices': role.repoable_services})
LOGGER.info('Updating stats')
roledata.update_stats(dynamo_table, roles, source='Scan')
commit = args.get("--commit")
selection = args.get("--selection")
return rollback_role(
account_number,
role_name,
dynamo_table,
config,
hooks,
selection=selection,
commit=commit,
)
if args.get("repo_all_roles"):
LOGGER.info("Updating role data")
update_role_cache(account_number, dynamo_table, config, hooks)
LOGGER.info("Repoing all roles")
commit = args.get("--commit")
return repo_all_roles(
account_number, dynamo_table, config, hooks, commit=commit, scheduled=False
)
if args.get("schedule_repo"):
LOGGER.info("Updating role data")
update_role_cache(account_number, dynamo_table, config, hooks)
return schedule_repo(account_number, dynamo_table, config, hooks)
if args.get("show_scheduled_roles"):
LOGGER.info("Showing scheduled roles")
return show_scheduled_roles(account_number, dynamo_table)
if args.get("cancel_scheduled_repo"):
role_name = args.get("--role")