Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
potentially_repoable_permissions_dict[
role.arn
] = _get_potentially_repoable_permissions(
role.role_name,
role.account,
role.aa_data,
permissions_dict[role.arn],
role.no_repo_permissions,
minimum_age,
)
while len(repo_able_roles_batches) > 0:
role_batch = repo_able_roles_batches[:batch_size]
repo_able_roles_batches = repo_able_roles_batches[batch_size:]
hooks_output = repokid.hooks.call_hooks(
hooks,
"DURING_REPOABLE_CALCULATION_BATCH",
{
"role_batch": role_batch,
"potentially_repoable_permissions": potentially_repoable_permissions_dict,
"minimum_age": minimum_age,
},
)
for role_arn, output in list(hooks_output.items()):
repoable_set = set(
[
permission_name
for permission_name, permission_value in list(
output["potentially_repoable_permissions"].items()
)
if permission_value.repoable
if scheduled:
roles = [
role
for role in roles
if (role.repo_scheduled and cur_time > role.repo_scheduled)
]
LOGGER.info(
"Repoing these {}roles from account {}:\n\t{}".format(
"scheduled " if scheduled else "",
account_number,
", ".join([role.role_name for role in roles]),
)
)
repokid.hooks.call_hooks(
hooks, "BEFORE_REPO_ROLES", {"account_number": account_number, "roles": roles}
)
for role in roles:
error = repo_role(
account_number,
role.role_name,
dynamo_table,
config,
hooks,
commit=commit,
scheduled=scheduled,
)
if error:
errors.append(error)
ca.call('iam.client.put_role_policy', RoleName=role.role_name, PolicyName=policy_name,
PolicyDocument=json.dumps(policy, indent=2, sort_keys=True))
except botocore.exceptions.ClientError as e:
error = 'Exception calling PutRolePolicy on {role}/{policy}\n{e}\n'.format(
role=role.role_name, policy=policy_name, e=str(e))
LOGGER.error(error)
errors.append(error)
current_policies = get_role_inline_policies(role.as_dict(), **conn) or {}
roledata.add_new_policy_version(dynamo_table, role, current_policies, 'Repo')
# regardless of whether we're successful we want to unschedule the repo
set_role_data(dynamo_table, role.role_id, {'RepoScheduled': 0})
repokid.hooks.call_hooks(hooks, 'AFTER_REPO', {'role': role})
if not errors:
# repos will stay scheduled until they are successful
set_role_data(dynamo_table, role.role_id, {'Repoed': datetime.datetime.utcnow().isoformat()})
_update_repoed_description(role.role_name, **conn)
_update_role_data(role, dynamo_table, account_number, config, conn, hooks, source='Repo', add_no_repo=False)
LOGGER.info('Successfully repoed role: {}'.format(role.role_name))
return errors
@hooks.implements_hook('DURING_REPOABLE_CALCULATION_BATCH', 1)
def log_during_repoable_calculation_batch_hooks(input_dict):
LOGGER.debug("Calling DURING_REPOABLE_CALCULATION_BATCH hooks")
if not all(required in input_dict for required in['role_batch', 'potentially_repoable_permissions', 'minimum_age']):
raise hooks.MissingHookParamaeter(
"Did not get all required parameters for DURING_REPOABLE_CALCULATION_BATCH hook")
for role in input_dict['role_batch']:
if not isinstance(role, Role):
raise hooks.MissingHookParamaeter(
"Role_batch needs to be a series of Role objects in DURING_REPOABLE_CALCULATION_BATCH hook")
return input_dict
@hooks.implements_hook('AFTER_SCHEDULE_REPO', 1)
def check_and_log_after_schedule_repo_hooks(input_dict):
LOGGER.debug("Calling AFTER_SCHEDULE_REPO hooks")
if 'roles' not in input_dict:
raise hooks.MissingHookParamaeter
def log_after_repo_hooks(input_dict):
LOGGER.debug("Calling AFTER_REPO hooks")
if "role" not in input_dict:
raise hooks.MissingHookParamaeter(
"Required key 'role' not passed to AFTER_REPO"
)
return input_dict
def log_before_repo_roles(input_dict):
LOGGER.debug("Calling DURING_REPOABLE_CALCULATION hooks")
if not all(required in input_dict for required in ["account_number", "roles"]):
raise hooks.MissingHookParamaeter(
"Did not get all required parameters for BEFORE_REPO_ROLES hook"
)
return input_dict
@hooks.implements_hook("DURING_REPOABLE_CALCULATION", 1)
def log_during_repoable_calculation_hooks(input_dict):
LOGGER.debug("Calling DURING_REPOABLE_CALCULATION hooks")
if not all(
required in input_dict
for required in [
"account_number",
"role_name",
"potentially_repoable_permissions",
"minimum_age",
]
):
raise hooks.MissingHookParamaeter(
"Did not get all required parameters for DURING_REPOABLE_CALCULATION hook"
)
return input_dict
def check_and_log_during_repoable_calculation_hooks(input_dict):
LOGGER.debug("Calling DURING_REPOABLE_CALCULATION hooks")
if not all(['role_name', 'potentially_repoable_permissions', 'minimum_age']
in input_dict):
raise hooks.MissingHookParamaeter