Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def cancel_scheduled_repo(account_number, dynamo_table, role_name=None, is_all=None):
"""
Cancel scheduled repo for a role in an account
"""
if not is_all and not role_name:
LOGGER.error("Either a specific role to cancel or all must be provided")
return
if is_all:
roles = Roles(
[
Role(get_role_data(dynamo_table, roleID))
for roleID in role_ids_for_account(dynamo_table, account_number)
]
)
# filter to show only roles that are scheduled
roles = [role for role in roles if (role.repo_scheduled)]
for role in roles:
set_role_data(
dynamo_table, role.role_id, {"RepoScheduled": 0, "ScheduledPerms": []}
)
LOGGER.info(
"Canceled scheduled repo for roles: {}".format(
", ".join([role.role_name for role in roles])
)
)
"""
Repo all scheduled or eligible roles in an account. Collect any errors and display them at the end.
Args:
account_number (string)
dynamo_table
config
commit (bool): actually make the changes
scheduled (bool): if True only repo the scheduled roles, if False repo all the (eligible) roles
Returns:
None
"""
errors = []
role_ids_in_account = role_ids_for_account(dynamo_table, account_number)
roles = Roles([])
for role_id in role_ids_in_account:
roles.append(Role(get_role_data(dynamo_table, role_id, fields=['Active', 'RoleName', 'RepoScheduled'])))
roles = roles.filter(active=True)
cur_time = int(time.time())
if scheduled:
roles = [role for role in roles if (role.repo_scheduled and cur_time > role.repo_scheduled)]
LOGGER.info('Repoing these {}roles from account {}:\n\t{}'.format('scheduled ' if scheduled else '',
account_number,
', '.join([role.role_name for role in roles])))
for role in roles:
error = repo_role(account_number, role.role_name, dynamo_table, config, hooks, commit=commit)
Display a table with data about all roles in an account and write a csv file with the data.
Args:
account_number (string)
inactive (bool): show roles that have historically (but not currently) existed in the account if True
Returns:
None
"""
headers = ['Name', 'Refreshed', 'Disqualified By', 'Can be repoed', 'Permissions', 'Repoable', 'Repoed',
'Services']
rows = list()
roles = Roles([Role(get_role_data(dynamo_table, roleID))
for roleID in tqdm(role_ids_for_account(dynamo_table, account_number))])
if not inactive:
roles = roles.filter(active=True)
for role in roles:
rows.append([role.role_name,
role.refreshed,
role.disqualified_by,
len(role.disqualified_by) == 0,
role.total_permissions,
role.repoable_permissions,
role.repoed,
role.repoable_services])
rows = sorted(rows, key=lambda x: (x[5], x[0], x[4]))
rows.insert(0, headers)
def schedule_repo(account_number, dynamo_table, config, hooks):
"""
Schedule a repo for a given account. Schedule repo for a time in the future (default 7 days) for any roles in
the account with repoable permissions.
"""
scheduled_roles = []
roles = Roles(
[
Role(get_role_data(dynamo_table, roleID))
for roleID in tqdm(role_ids_for_account(dynamo_table, account_number))
]
)
scheduled_time = int(time.time()) + (
86400 * config.get("repo_schedule_period_days", 7)
)
for role in roles:
if role.repoable_permissions > 0 and not role.repo_scheduled:
role.repo_scheduled = scheduled_time
# freeze the scheduled perms to whatever is repoable right now
set_role_data(
dynamo_table,
role.role_id,
{
"RepoScheduled": scheduled_time,
"ScheduledPerms": role.repoable_services,
"""
Repo all scheduled or eligible roles in an account. Collect any errors and display them at the end.
Args:
account_number (string)
dynamo_table
config
commit (bool): actually make the changes
scheduled (bool): if True only repo the scheduled roles, if False repo all the (eligible) roles
Returns:
None
"""
errors = []
role_ids_in_account = role_ids_for_account(dynamo_table, account_number)
roles = Roles([])
for role_id in role_ids_in_account:
roles.append(
Role(
get_role_data(
dynamo_table,
role_id,
fields=["Active", "RoleName", "RepoScheduled"],
)
)
)
roles = roles.filter(active=True)
cur_time = int(time.time())
def schedule_repo(account_number, dynamo_table, config, hooks):
"""
Schedule a repo for a given account. Schedule repo for a time in the future (default 7 days) for any roles in
the account with repoable permissions.
"""
scheduled_roles = []
roles = Roles([Role(get_role_data(dynamo_table, roleID))
for roleID in tqdm(role_ids_for_account(dynamo_table, account_number))])
scheduled_time = int(time.time()) + (86400 * config.get('repo_schedule_period_days', 7))
for role in roles:
if role.repoable_permissions > 0:
set_role_data(dynamo_table, role.role_id, {'RepoScheduled': scheduled_time})
scheduled_roles.append(role)
LOGGER.info("Scheduled repo for {} days from now for these roles:\n\t{}".format(
config.get('repo_schedule_period_days', 7), ', '.join([r.role_name for r in scheduled_roles])))
repokid.hooks.call_hooks(hooks, 'AFTER_SCHEDULE_REPO', {'roles': scheduled_roles})
"Name",
"Refreshed",
"Disqualified By",
"Can be repoed",
"Permissions",
"Repoable",
"Repoed",
"Services",
]
rows = list()
roles = Roles(
[
Role(get_role_data(dynamo_table, roleID))
for roleID in tqdm(role_ids_for_account(dynamo_table, account_number))
]
)
if not inactive:
roles = roles.filter(active=True)
for role in roles:
rows.append(
[
role.role_name,
role.refreshed,
role.disqualified_by,
len(role.disqualified_by) == 0,
role.total_permissions,
role.repoable_permissions,
role.repoed,