response_data = {}  # accumulated Aardvark results

PAGE_SIZE = 1000
page_num = 1

if account_number:
    payload = {'phrase': '{}'.format(account_number)}
elif arn:
    payload = {'arn': [arn]}
else:
    return

while True:
    params = {'count': PAGE_SIZE, 'page': page_num}
    try:
        r_aardvark = requests.post(aardvark_api_location, params=params, json=payload)
    except requests.exceptions.RequestException as e:
        LOGGER.error('Unable to get Aardvark data: {}'.format(e))
        sys.exit(1)
    else:
        if r_aardvark.status_code != 200:
            LOGGER.error('Unable to get Aardvark data')
            sys.exit(1)

        response_data.update(r_aardvark.json())
        # pagination metadata -- we don't want these in our Aardvark data
        response_data.pop('count')
        response_data.pop('page')
        response_data.pop('total')
        if PAGE_SIZE * page_num < r_aardvark.json().get('total'):
            page_num += 1
        else:
            break

return response_data
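
# Illustration only (not from the original source): the loop-termination arithmetic
# above, shown with a hypothetical Aardvark 'total' of 2500 and a page size of 1000,
# walks pages 1, 2 and 3 before breaking.
page_size = 1000
total = 2500  # hypothetical 'total' value reported by Aardvark
pages = []
page_num = 1
while True:
    pages.append(page_num)
    if page_size * page_num < total:
        page_num += 1
    else:
        break
assert pages == [1, 2, 3]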
def __init__(self, config=None):
    blocklist_json = None
    bucket_config = config.get('blocklist_bucket', config.get('blacklist_bucket', None))
    if bucket_config:
        blocklist_json = get_blocklist_from_bucket(bucket_config)

    current_account = config.get('current_account') or None
    if not current_account:
        LOGGER.error('Unable to get current account for Blocklist Filter')

    blocklisted_role_names = set()
    blocklisted_role_names.update([rolename.lower() for rolename in config.get(current_account, [])])
    blocklisted_role_names.update([rolename.lower() for rolename in config.get('all', [])])

    if blocklist_json:
        blocklisted_role_names.update(
            [name.lower() for name, accounts in blocklist_json['names'].items()
             if 'all' in accounts or config.get('current_account') in accounts]
        )

    self.blocklisted_arns = set() if not blocklist_json else blocklist_json.get('arns', [])
    self.blocklisted_role_names = blocklisted_role_names
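
# Illustrative blocklist document (invented values), shaped after the keys the
# filter reads above: 'names' maps role names to the accounts they are blocked
# in ('all' for every account), and 'arns' lists explicitly blocked role ARNs.
example_blocklist_json = {
    'names': {
        'admin': ['all'],                # blocked in every account
        'breakglass': ['123456789012'],  # blocked only in this account
    },
    'arns': ['arn:aws:iam::123456789012:role/DoNotRepo'],
}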
try:
    # fetch the role's current description (assumed lookup; a missing Description raises KeyError)
    description = client.get_role(RoleName=role_name)["Role"]["Description"]
except KeyError:
    return

date_string = datetime.datetime.utcnow().strftime("%m/%d/%y")
if "; Repokid repoed" in description:
    new_description = re.sub(
        r"; Repokid repoed [0-9]{2}\/[0-9]{2}\/[0-9]{2}",
        "; Repokid repoed {}".format(date_string),
        description,
    )
else:
    new_description = description + " ; Repokid repoed {}".format(date_string)

# IAM role descriptions have a max length of 1000; skip the update if the new description would be longer
if len(new_description) < 1000:
    client.update_role_description(RoleName=role_name, Description=new_description)
else:
    LOGGER.error(
        "Unable to set repo description ({}) for role {}, length would be too long".format(
            new_description, role_name
        )
    )
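
# Quick illustration (made-up description and dates, not from the original source):
# an existing "Repokid repoed" stamp is rewritten in place rather than appended again.
import re

desc = "ETL worker role ; Repokid repoed 01/02/24"
updated = re.sub(
    r"; Repokid repoed [0-9]{2}\/[0-9]{2}\/[0-9]{2}",
    "; Repokid repoed 03/04/25",
    desc,
)
print(updated)  # ETL worker role ; Repokid repoed 03/04/25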
# if this is a scheduled repo we need to filter out permissions that weren't previously scheduled
if scheduled:
    repoable_permissions = roledata._filter_scheduled_repoable_perms(
        repoable_permissions, role.scheduled_perms
    )

repoed_policies, deleted_policy_names = roledata._get_repoed_policy(
    role.policies[-1]["Policy"], repoable_permissions
)

if _inline_policies_size_exceeds_maximum(repoed_policies):
    error = (
        "Policies would exceed the AWS size limit after repo for role: {} in account {}. "
        "Please manually minify.".format(role_name, account_number)
    )
    LOGGER.error(error)
    errors.append(error)
    continuing = False

# if we aren't repoing for some reason, unschedule the role
if not continuing:
    set_role_data(
        dynamo_table, role.role_id, {"RepoScheduled": 0, "ScheduledPerms": []}
    )
    return

if not commit:
    _logprint_deleted_and_repoed_policies(
        deleted_policy_names, repoed_policies, role_name, account_number
    )
    return
potentially_repoable_permissions = {
    permission: RepoablePermissionDecision()
    for permission in permissions
    if permission not in no_repo_list
}
used_services = set()
for service in aa_data:
    (accessed, valid_authenticated) = _get_epoch_authenticated(
        service["lastAuthenticated"]
    )

    if not accessed:
        continue
    if not valid_authenticated:
        # can't tell when the service was last used, so conservatively treat it as used
        LOGGER.error(
            "Got malformed Access Advisor data for {role_name} in {account_number} for service {service}"
            ": {last_authenticated}".format(
                role_name=role_name,
                account_number=account_number,
                service=service.get("serviceNamespace"),
                last_authenticated=service["lastAuthenticated"],
            )
        )
        used_services.add(service["serviceNamespace"])

    accessed = datetime.datetime.fromtimestamp(accessed, tzlocal())
    if accessed > now - ago:
        used_services.add(service["serviceNamespace"])

for permission_name, permission_decision in list(
    potentially_repoable_permissions.items()
def __init__(self, config=None):
    current_account = config.get("current_account") or None
    if not current_account:
        LOGGER.error("Unable to get current account for Exclusive Filter")

    exclusive_role_globs = set()
    exclusive_role_globs.update(
        [role_glob.lower() for role_glob in config.get(current_account, [])]
    )
    exclusive_role_globs.update(
        [role_glob.lower() for role_glob in config.get("all", [])]
    )
    self.exclusive_role_globs = exclusive_role_globs
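
# Illustrative config for this filter (the account ID and globs are examples only),
# using the same per-account and "all" keys the constructor reads above.
example_config = {
    "current_account": "123456789012",
    "123456789012": ["admin_*"],    # globs applied only in this account
    "all": ["*_service_role"],      # globs applied in every account
}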
conn = config["connection_iam"]
conn["account_number"] = account_number

for name in deleted_policy_names:
    error = _delete_policy(name, role, account_number, conn)
    if error:
        LOGGER.error(error)
        errors.append(error)

if repoed_policies:
    error = _replace_policies(repoed_policies, role, account_number, conn)
    if error:
        LOGGER.error(error)
        errors.append(error)

current_policies = get_role_inline_policies(role.as_dict(), **conn) or {}
roledata.add_new_policy_version(dynamo_table, role, current_policies, "Repo")

# regardless of whether we're successful we want to unschedule the repo
set_role_data(
    dynamo_table, role.role_id, {"RepoScheduled": 0, "ScheduledPerms": []}
)
                },
            },
            {
                "IndexName": "RoleName",
                "KeySchema": [{"AttributeName": "RoleName", "KeyType": "HASH"}],
                "Projection": {"ProjectionType": "KEYS_ONLY"},
                "ProvisionedThroughput": {
                    "ReadCapacityUnits": 10,
                    "WriteCapacityUnits": 10,
                },
            },
        ],
    )
except BotoClientError as e:
    LOGGER.error(e)

return table
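
# Sketch only (assumptions: a boto3 DynamoDB resource, an example region, and a
# guessed table name) of querying the KEYS_ONLY "RoleName" index defined above.
import boto3
from boto3.dynamodb.conditions import Key

dynamo = boto3.resource("dynamodb", region_name="us-east-1")  # example region
roles_table = dynamo.Table("repokid_roles")  # table name is an assumption

resp = roles_table.query(
    IndexName="RoleName",
    KeyConditionExpression=Key("RoleName").eq("example_role"),
)
# KEYS_ONLY projection: only key attributes are returned; fetch full items separately if needed.
print(resp["Items"])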