Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
scheduled_roles = []
roles = Roles([Role(get_role_data(dynamo_table, roleID))
for roleID in tqdm(role_ids_for_account(dynamo_table, account_number))])
scheduled_time = int(time.time()) + (86400 * config.get('repo_schedule_period_days', 7))
for role in roles:
if role.repoable_permissions > 0:
set_role_data(dynamo_table, role.role_id, {'RepoScheduled': scheduled_time})
scheduled_roles.append(role)
LOGGER.info("Scheduled repo for {} days from now for these roles:\n\t{}".format(
config.get('repo_schedule_period_days', 7), ', '.join([r.role_name for r in scheduled_roles])))
repokid.hooks.call_hooks(hooks, 'AFTER_SCHEDULE_REPO', {'roles': scheduled_roles})
"RepoScheduled": scheduled_time,
"ScheduledPerms": role.repoable_services,
},
)
scheduled_roles.append(role)
LOGGER.info(
"Scheduled repo for {} days from now for account {} and these roles:\n\t{}".format(
config.get("repo_schedule_period_days", 7),
account_number,
", ".join([r.role_name for r in scheduled_roles]),
)
)
repokid.hooks.call_hooks(hooks, "AFTER_SCHEDULE_REPO", {"roles": scheduled_roles})
minimum_age: Minimum age of a role (in days) for it to be repoable
hooks: Dict containing hook names and functions to run
Returns:
set: Permissions that are 'repoable' (not used within the time threshold)
"""
potentially_repoable_permissions = _get_potentially_repoable_permissions(
role_name,
account_number,
aa_data,
permissions,
no_repo_permissions,
minimum_age,
)
hooks_output = repokid.hooks.call_hooks(
hooks,
"DURING_REPOABLE_CALCULATION",
{
"account_number": account_number,
"role_name": role_name,
"potentially_repoable_permissions": potentially_repoable_permissions,
"minimum_age": minimum_age,
},
)
LOGGER.debug(
"Repoable permissions for role {role_name} in {account_number}:\n{repoable}".format(
role_name=role_name,
account_number=account_number,
repoable="".join(
"{}: {}\n".format(perm, decision.decider)
if repoed_policies:
error = _replace_policies(repoed_policies, role, account_number, conn)
if error:
LOGGER.error(error)
errors.append(error)
current_policies = get_role_inline_policies(role.as_dict(), **conn) or {}
roledata.add_new_policy_version(dynamo_table, role, current_policies, "Repo")
# regardless of whether we're successful we want to unschedule the repo
set_role_data(
dynamo_table, role.role_id, {"RepoScheduled": 0, "ScheduledPerms": []}
)
repokid.hooks.call_hooks(hooks, "AFTER_REPO", {"role": role})
if not errors:
# repos will stay scheduled until they are successful
set_role_data(
dynamo_table,
role.role_id,
{"Repoed": datetime.datetime.utcnow().isoformat()},
)
_update_repoed_description(role.role_name, **conn)
_update_role_data(
role,
dynamo_table,
account_number,
config,
conn,
hooks,