def test_calculate_repo_scores(self, mock_get_repoable_permissions, mock_get_role_permissions):
roles = [Role(ROLES[0]), Role(ROLES[1]), Role(ROLES[2])]
roles[0].disqualified_by = []
roles[0].aa_data = 'some_aa_data'
# disqualified by a filter
roles[1].policies = [{'Policy': ROLE_POLICIES['unused_ec2']}]
roles[1].disqualified_by = ['some_filter']
roles[1].aa_data = 'some_aa_data'
# no AA data
roles[2].policies = [{'Policy': ROLE_POLICIES['all_services_used']}]
roles[2].disqualified_by = []
roles[2].aa_data = None
mock_get_role_permissions.side_effect = [['iam:AddRoleToInstanceProfile', 'iam:AttachRolePolicy',
'ec2:AllocateHosts', 'ec2:AssociateAddress'],
['iam:AddRoleToInstanceProfile', 'iam:AttachRolePolicy']]
repokid.repokid.LOGGER = logging.getLogger('test')
repokid.repokid.LOGGER.addHandler(console_logger)
repokid.repokid.update_role_cache('123456789012')
# validate update data called for each role
assert mock_update_role_data.mock_calls == [call(Role(ROLES[0]), ROLE_POLICIES['all_services_used']),
call(Role(ROLES[1]), ROLE_POLICIES['unused_ec2']),
call(Role(ROLES[2]), ROLE_POLICIES['all_services_used'])]
# all roles active
assert mock_find_and_mark_inactive.mock_calls == [call('123456789012',
[Role(ROLES[0]), Role(ROLES[1]), Role(ROLES[2])])]
roles = Roles([Role(ROLES[0]), Role(ROLES[1]), Role(ROLES[2])])
assert mock_update_filtered_roles.mock_calls == [call(roles)]
assert mock_update_aardvark_data.mock_calls == [call(AARDVARK_DATA, roles)]
# TODO: validate total permission, repoable, etc are getting updated properly
assert mock_update_repoable_data.mock_calls == [call(roles)]
assert mock_update_stats.mock_calls == [call(roles, source='Scan')]
def get_blocklist_from_bucket(bucket_config):
try:
s3_resource = boto3_cached_conn('s3', service_type='resource',
account_number=bucket_config.get('account_number'),
assume_role=bucket_config.get('assume_role', None),
session_name='repokid',
region=bucket_config.get('region', 'us-west-2'))
s3_obj = s3_resource.Object(bucket_name=bucket_config['bucket_name'], key=bucket_config['key'])
blocklist = s3_obj.get()['Body'].read().decode("utf-8")
blocklist_json = json.loads(blocklist)
# Blocklist problems are really bad and we should quit rather than silently continue
except (botocore.exceptions.ClientError, AttributeError):
LOGGER.error("S3 blocklist config was set but unable to connect retrieve object, quitting")
sys.exit(1)
except ValueError:
LOGGER.error("S3 blocklist config was set but the returned file is bad, quitting")
sys.exit(1)
if set(blocklist_json.keys()) != set(['arns', 'names']):
LOGGER.error("S3 blocklist file is malformed, quitting")
sys.exit(1)
return blocklist_json
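# A minimal usage sketch (hypothetical values) for get_blocklist_from_bucket above;
# only 'bucket_name' and 'key' are strictly required, the other keys fall back to the
# defaults used in the boto3_cached_conn call.
example_bucket_config = {
    'bucket_name': 'my-repokid-config',    # hypothetical bucket
    'key': 'blocklists/blocklist.json',    # hypothetical object key
    'account_number': '123456789012',      # account that owns the bucket
    'assume_role': None,                   # optional role to assume
    'region': 'us-west-2',                 # default region when omitted
}
blocklist = get_blocklist_from_bucket(example_bucket_config)
# On success, blocklist is a dict containing exactly the keys 'arns' and 'names'.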
PAGE_SIZE = 1000
page_num = 1
response_data = {}  # accumulates per-ARN Access Advisor entries across pages
if account_number:
payload = {'phrase': '{}'.format(account_number)}
elif arn:
payload = {'arn': [arn]}
else:
return
while True:
params = {'count': PAGE_SIZE, 'page': page_num}
try:
r_aardvark = requests.post(aardvark_api_location, params=params, json=payload)
except requests.exceptions.RequestException as e:
LOGGER.error('Unable to get Aardvark data: {}'.format(e))
sys.exit(1)
else:
if r_aardvark.status_code != 200:
LOGGER.error('Unable to get Aardvark data')
sys.exit(1)
response_data.update(r_aardvark.json())
# don't want these in our Aardvark data
response_data.pop('count')
response_data.pop('page')
response_data.pop('total')
if PAGE_SIZE * page_num < r_aardvark.json().get('total'):
page_num += 1
else:
break
return response_data
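# Illustrative shape of response_data after the loop (inferred from the keys popped
# above): Aardvark's pagination metadata ('count', 'page', 'total') is stripped and
# only the per-ARN Access Advisor entries remain, e.g.
#   {'arn:aws:iam::123456789012:role/example': [{'serviceNamespace': 'ec2', ...}, ...]}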
def main():
args = docopt(__doc__, version="Repokid {version}".format(version=__version__))
if args.get("config"):
config_filename = args.get("<config_filename>")
_generate_default_config(filename=config_filename)
sys.exit(0)
account_number = args.get("<account_number>")
if not CONFIG:
config = _generate_default_config()
else:
config = CONFIG
LOGGER.debug("Repokid cli called with args {}".format(args))
hooks = _get_hooks(config.get("hooks", ["repokid.hooks.loggers"]))
dynamo_table = dynamo_get_or_create_table(**config["dynamo_db"])
if args.get("update_role_cache"):
return update_role_cache(account_number, dynamo_table, config, hooks)
if args.get("display_role_cache"):
inactive = args.get("--inactive")
return display_roles(account_number, dynamo_table, inactive=inactive)
if args.get("find_roles_with_permissions"):
permissions = args.get("<permission>")
output_file = args.get("--output")
return find_roles_with_permissions(permissions, dynamo_table, output_file)
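# Illustrative invocations (subcommand names taken from the dispatch above; the exact
# argument syntax is an assumption, since the docopt usage string is not shown here):
#   repokid update_role_cache 123456789012
#   repokid display_role_cache 123456789012 --inactive
#   repokid find_roles_with_permissions s3:PutObject --output roles.json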
Returns:
None
"""
conn = config['connection_iam']
conn['account_number'] = account_number
roles = Roles([Role(role_data) for role_data in list_roles(**conn)])
active_roles = []
LOGGER.info('Updating role data for account {}'.format(account_number))
for role in tqdm(roles):
role.account = account_number
current_policies = get_role_inline_policies(role.as_dict(), **conn) or {}
active_roles.append(role.role_id)
roledata.update_role_data(dynamo_table, account_number, role, current_policies)
LOGGER.info('Finding inactive roles')
roledata.find_and_mark_inactive(dynamo_table, account_number, active_roles)
LOGGER.info('Filtering roles')
plugins = FilterPlugins()
# Blacklist needs to know the current account
config['filter_config']['BlacklistFilter']['current_account'] = account_number
for plugin_path in config.get('active_filters'):
plugin_name = plugin_path.split(':')[1]
plugins.load_plugin(plugin_path, config=config['filter_config'].get(plugin_name, None))
for plugin in plugins.filter_plugins:
filtered_list = plugin.apply(roles)
for stats_entry in role.stats:
rows.append([stats_entry['Date'],
stats_entry['Source'],
stats_entry['PermissionsCount'],
stats_entry.get('DisqualifiedBy', [])])
print(tabulate(rows, headers=headers) + '\n\n')
# can't do anymore if we don't have AA data
if not role.aa_data:
LOGGER.warn('ARN not found in Access Advisor: {}'.format(role.arn))
return
warn_unknown_permissions = config.get('warnings', {}).get('unknown_permissions', False)
repoable_permissions = set([])
permissions = roledata._get_role_permissions(role, warn_unknown_perms=warn_unknown_permissions)
if len(role.disqualified_by) == 0:
repoable_permissions = roledata._get_repoable_permissions(role.role_name, permissions, role.aa_data,
role.no_repo_permissions,
config['filter_config']['AgeFilter']['minimum_age'],
hooks)
print "Repoable services:"
headers = ['Service', 'Action', 'Repoable']
rows = []
for permission in permissions:
service = permission.split(':')[0]
action = permission.split(':')[1]
repoable = permission in repoable_permissions
rows.append([service, action, repoable])
rows = sorted(rows, key=lambda x: (x[2], x[0], x[1]))
account_number=account_number,
service=service.get("serviceNamespace"),
last_authenticated=service["lastAuthenticated"],
)
)
used_services.add(service["serviceNamespace"])
accessed = datetime.datetime.fromtimestamp(accessed, tzlocal())
if accessed > now - ago:
used_services.add(service["serviceNamespace"])
for permission_name, permission_decision in list(
potentially_repoable_permissions.items()
):
if permission_name.split(":")[0] in IAM_ACCESS_ADVISOR_UNSUPPORTED_SERVICES:
LOGGER.warn("skipping {}".format(permission_name))
continue
# we have an unused service but need to make sure it's repoable
if permission_name.split(":")[0] not in used_services:
if permission_name in IAM_ACCESS_ADVISOR_UNSUPPORTED_ACTIONS:
LOGGER.warn("skipping {}".format(permission_name))
continue
permission_decision.repoable = True
permission_decision.decider = "Access Advisor"
return potentially_repoable_permissions
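# A minimal sketch (assumed structure, not necessarily the project's actual class) of
# the decision objects mutated in the loop above: each value in
# potentially_repoable_permissions needs mutable 'repoable' and 'decider' attributes.
class RepoablePermissionDecision(object):
    def __init__(self):
        self.repoable = None   # set to True when Access Advisor shows the service unused
        self.decider = ''      # records which component made the repo decision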