Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
account_number=account_number,
service=service.get("serviceNamespace"),
last_authenticated=service["lastAuthenticated"],
)
)
used_services.add(service["serviceNamespace"])
accessed = datetime.datetime.fromtimestamp(accessed, tzlocal())
if accessed > now - ago:
used_services.add(service["serviceNamespace"])
for permission_name, permission_decision in list(
potentially_repoable_permissions.items()
):
if permission_name.split(":")[0] in IAM_ACCESS_ADVISOR_UNSUPPORTED_SERVICES:
LOGGER.warn("skipping {}".format(permission_name))
continue
# we have an unused service but need to make sure it's repoable
if permission_name.split(":")[0] not in used_services:
if permission_name in IAM_ACCESS_ADVISOR_UNSUPPORTED_ACTIONS:
LOGGER.warn("skipping {}".format(permission_name))
continue
permission_decision.repoable = True
permission_decision.decider = "Access Advisor"
return potentially_repoable_permissions
accessed = datetime.datetime.fromtimestamp(accessed, tzlocal())
if accessed > now - ago:
used_services.add(service["serviceNamespace"])
for permission_name, permission_decision in list(
potentially_repoable_permissions.items()
):
if permission_name.split(":")[0] in IAM_ACCESS_ADVISOR_UNSUPPORTED_SERVICES:
LOGGER.warn("skipping {}".format(permission_name))
continue
# we have an unused service but need to make sure it's repoable
if permission_name.split(":")[0] not in used_services:
if permission_name in IAM_ACCESS_ADVISOR_UNSUPPORTED_ACTIONS:
LOGGER.warn("skipping {}".format(permission_name))
continue
permission_decision.repoable = True
permission_decision.decider = "Access Advisor"
return potentially_repoable_permissions
total_permissions = total_permissions.union(
get_actions_from_statement(statement)
)
if not (
"Sid" in statement
and statement["Sid"].startswith(STATEMENT_SKIP_SID)
):
# No Sid
# Sid exists, but doesn't start with STATEMENT_SKIP_SID
eligible_permissions = eligible_permissions.union(
get_actions_from_statement(statement)
)
weird_permissions = total_permissions.difference(all_permissions)
if weird_permissions and warn_unknown_perms:
LOGGER.warn("Unknown permissions found: {}".format(weird_permissions))
return total_permissions, eligible_permissions
str(policies_version['Policy'])[:50]])
print tabulate(rows, headers=headers) + '\n\n'
print "Stats:"
headers = ['Date', 'Event Type', 'Permissions Count', 'Disqualified By']
rows = []
for stats_entry in role.stats:
rows.append([stats_entry['Date'],
stats_entry['Source'],
stats_entry['PermissionsCount'],
stats_entry.get('DisqualifiedBy', [])])
print tabulate(rows, headers=headers) + '\n\n'
# can't do anymore if we don't have AA data
if not role.aa_data:
LOGGER.warn('ARN not found in Access Advisor: {}'.format(role.arn))
return
warn_unknown_permissions = config.get('warnings', {}).get('unknown_permissions', False)
repoable_permissions = set([])
permissions = roledata._get_role_permissions(role, warn_unknown_perms=warn_unknown_permissions)
if len(role.disqualified_by) == 0:
repoable_permissions = roledata._get_repoable_permissions(role.role_name, permissions, role.aa_data,
role.no_repo_permissions,
config['filter_config']['AgeFilter']['minimum_age'],
hooks)
print "Repoable services:"
headers = ['Service', 'Action', 'Repoable']
rows = []
for permission in permissions:
def load_plugin(self, module, config=None):
    """Import a plugin by path, instantiate it and add it to the active filter plugins.

    Args:
        module (string): path handed to ``import_string`` to locate the plugin class
        config: plugin specific configuration, passed to the plugin constructor
            as the ``config`` keyword argument

    Returns:
        None

    If the import fails with ImportError the failure is logged as a warning and
    nothing is added to ``self.filter_plugins``.
    """
    try:
        cls = import_string(module)
    except ImportError as e:
        # Logger.warn is a deprecated alias; Logger.warning is the supported name.
        LOGGER.warning("Unable to find plugin {}, exception: {}".format(module, e))
        return

    try:
        plugin = cls(config=config)
    except KeyError:
        # Fall back to a no-argument construction when building the plugin with
        # the supplied config raises KeyError (presumably a config key the
        # plugin expects is missing -- confirm against the plugin classes).
        plugin = cls()

    LOGGER.info('Loaded plugin {}'.format(module))
    self.filter_plugins.append(plugin)
)
return
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
if not role_id:
LOGGER.warn(
"Could not find role with name {} in account {}".format(
role_name, account_number
)
)
return
role = Role(get_role_data(dynamo_table, role_id))
if not role.repo_scheduled:
LOGGER.warn(
"Repo was not scheduled for role {} in account {}".format(
role.role_name, account_number
)
)
return
set_role_data(
dynamo_table, role.role_id, {"RepoScheduled": 0, "ScheduledPerms": []}
)
LOGGER.info(
"Successfully cancelled scheduled repo for role {} in account {}".format(
role.role_name, role.account
)
when it was last repoed, which services can be repoed
2) The policy history: how discovered (repo, scan, etc), the length of the policy, and start of the contents
3) Captured stats entry for the role
4) A list of all services/actions currently allowed and whether they are repoable
5) What the new policy would look like after repoing (if it is repoable)
Args:
account_number (string)
role_name (string)
Returns:
None
"""
role_id = find_role_in_cache(dynamo_table, account_number, role_name)
if not role_id:
LOGGER.warn("Could not find role with name {}".format(role_name))
return
role = Role(get_role_data(dynamo_table, role_id))
print("\n\nRole repo data:")
headers = [
"Name",
"Refreshed",
"Disqualified By",
"Can be repoed",
"Permissions",
"Repoable",
"Repoed",
"Services",
]
rows = [