Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
None
"""
roles = list()
with open(role_filename, "r") as fd:
roles = json.load(fd)
for role_arn in tqdm(roles):
arn = ARN(role_arn)
if arn.error:
LOGGER.error("INVALID ARN: {arn}".format(arn=role_arn))
return
account_number = arn.account_number
role_name = arn.name.split("/")[-1]
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
role = Role(get_role_data(dynamo_table, role_id))
_remove_permissions_from_role(
account_number,
permissions,
role,
role_id,
dynamo_table,
config,
hooks,
commit=commit,
)
repokid.hooks.call_hooks(hooks, "AFTER_REPO", {"role": role})
"""
Calculate what repoing can be done for a role and then actually do it if commit is set
1) Check that a role exists, it isn't being disqualified by a filter, and that is has fresh AA data
2) Get the role's current permissions, repoable permissions, and the new policy if it will change
3) Make the changes if commit is set
Args:
account_number (string)
role_name (string)
commit (bool)
Returns:
None
"""
errors = []
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
# only load partial data that we need to determine if we should keep going
role_data = get_role_data(dynamo_table, role_id, fields=['DisqualifiedBy', 'AAData', 'RepoablePermissions',
'RoleName'])
if not role_data:
LOGGER.warn('Could not find role with name {}'.format(role_name))
return
else:
role = Role(role_data)
if len(role.disqualified_by) > 0:
LOGGER.info('Cannot repo role {} because it is being disqualified by: {}'.format(role_name,
role.disqualified_by))
return
if not role.aa_data:
LOGGER.warn('ARN not found in Access Advisor: {}'.format(role.arn))
def list_role_rollbacks(dynamo_table, message):
role_id = dynamo.find_role_in_cache(
dynamo_table, message.account, message.role_name
)
if not role_id:
return ResponderReturn(
successful=False,
return_message="Unable to find role {} in account {}".format(
message.role_name, message.account
),
)
else:
role_data = dynamo.get_role_data(dynamo_table, role_id, fields=["Policies"])
return_val = "Restorable versions for role {} in account {}\n".format(
message.role_name, message.account
)
for index, policy_version in enumerate(role_data["Policies"]):
# filter to show only roles that are scheduled
roles = [role for role in roles if (role.repo_scheduled)]
for role in roles:
set_role_data(
dynamo_table, role.role_id, {"RepoScheduled": 0, "ScheduledPerms": []}
)
LOGGER.info(
"Canceled scheduled repo for roles: {}".format(
", ".join([role.role_name for role in roles])
)
)
return
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
if not role_id:
LOGGER.warn(
"Could not find role with name {} in account {}".format(
role_name, account_number
)
)
return
role = Role(get_role_data(dynamo_table, role_id))
if not role.repo_scheduled:
LOGGER.warn(
"Repo was not scheduled for role {} in account {}".format(
role.role_name, account_number
)
)
"""
Calculate what repoing can be done for a role and then actually do it if commit is set
1) Check that a role exists, it isn't being disqualified by a filter, and that is has fresh AA data
2) Get the role's current permissions, repoable permissions, and the new policy if it will change
3) Make the changes if commit is set
Args:
account_number (string)
role_name (string)
commit (bool)
Returns:
None
"""
errors = []
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
# only load partial data that we need to determine if we should keep going
role_data = get_role_data(
dynamo_table,
role_id,
fields=["DisqualifiedBy", "AAData", "RepoablePermissions", "RoleName"],
)
if not role_data:
LOGGER.warn("Could not find role with name {}".format(role_name))
return
else:
role = Role(role_data)
continuing = True
if len(role.disqualified_by) > 0:
LOGGER.info(
"""
Display the historical policy versions for a roll as a numbered list. Restore to a specific version if selected.
Indicate changes that will be made and then actually make them if commit is selected.
Args:
account_number (string)
role_name (string)
selection (int): which policy version in the list to rollback to
commit (bool): actually make the change
Returns:
errors (list): if any
"""
errors = []
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
if not role_id:
message = "Could not find role with name {} in account {}".format(
role_name, account_number
)
errors.append(message)
LOGGER.warning(message)
return errors
else:
role = Role(get_role_data(dynamo_table, role_id))
# no option selected, display a table of options
if not selection:
headers = ["Number", "Source", "Discovered", "Permissions", "Services"]
rows = []
for index, policies_version in enumerate(role.policies):
policy_permissions, _ = roledata._get_permissions_in_policy(
Displays data about a role in a given account:
1) Name, which filters are disqualifying it from repo, if it's repoable, total/repoable permissions,
when it was last repoed, which services can be repoed
2) The policy history: how discovered (repo, scan, etc), the length of the policy, and start of the contents
3) Captured stats entry for the role
4) A list of all services/actions currently allowed and whether they are repoable
5) What the new policy would look like after repoing (if it is repoable)
Args:
account_number (string)
role_name (string)
Returns:
None
"""
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
if not role_id:
LOGGER.warn("Could not find role with name {}".format(role_name))
return
role = Role(get_role_data(dynamo_table, role_id))
print("\n\nRole repo data:")
headers = [
"Name",
"Refreshed",
"Disqualified By",
"Can be repoed",
"Permissions",
"Repoable",
"Repoed",
"Services",