Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_rewrite_kwargs(self):
    """Check how utils.rewrite_kwargs treats the 'project' key per API type.

    Four scenarios are asserted:
      * 'general' API with a 'project' key: the key is expected to be
        replaced by 'name' with the value prefixed by 'projects/'.
      * 'cloud' API, 'storage' module, with a 'project' key: the key is
        expected to be dropped.
      * 'cloud' API, 'storage' module, without a 'project' key: unchanged.
      * 'general' API without a 'project' key: unchanged.
    """
    # 'general' API: 'project' becomes 'name' = 'projects/<project>'.
    data = {'project': 'my-project', 'foo': 'bar'}
    expected_general = {'name': 'projects/my-project', 'foo': 'bar'}
    actual_general = utils.rewrite_kwargs('general', data)
    self.assertEqual(expected_general, actual_general)

    # 'cloud'/'storage': the 'project' key is removed.
    data = {'project': 'my-project', 'foo': 'bar'}
    expected_cloud_storage = {'foo': 'bar'}
    actual_cloud_storage = utils.rewrite_kwargs('cloud', data,
                                                module_name='storage')
    self.assertEqual(expected_cloud_storage, actual_cloud_storage)

    # 'cloud'/'storage' with nothing to rewrite: kwargs pass through.
    data = {'foo': 'bar'}
    expected_no_change = {'foo': 'bar'}
    actual_no_change = utils.rewrite_kwargs('cloud', data,
                                            module_name='storage')
    self.assertEqual(expected_no_change, actual_no_change)

    # 'general' with nothing to rewrite: kwargs pass through. The fixture
    # for this case used to be set up without ever being asserted.
    data = {'foo': 'bar'}
    expected_no_change = {'foo': 'bar'}
    actual_no_change = utils.rewrite_kwargs('general', data)
    self.assertEqual(expected_no_change, actual_no_change)
def test_rewrite_kwargs(self):
    """Check how utils.rewrite_kwargs treats the 'project' key per API type.

    Each scenario is asserted exactly once:
      * 'general' API with a 'project' key: the key is expected to be
        replaced by 'name' with the value prefixed by 'projects/'.
      * 'cloud' API, 'storage' module, with a 'project' key: the key is
        expected to be dropped.
      * 'cloud' API, 'storage' module, without a 'project' key: unchanged.
      * 'general' API without a 'project' key: unchanged.
    """
    # 'general' API: 'project' becomes 'name' = 'projects/<project>'.
    data = {'project': 'my-project', 'foo': 'bar'}
    expected_general = {'name': 'projects/my-project', 'foo': 'bar'}
    actual_general = utils.rewrite_kwargs('general', data)
    self.assertEqual(expected_general, actual_general)

    # 'cloud'/'storage': the 'project' key is removed.
    data = {'project': 'my-project', 'foo': 'bar'}
    expected_cloud_storage = {'foo': 'bar'}
    actual_cloud_storage = utils.rewrite_kwargs('cloud', data,
                                                module_name='storage')
    self.assertEqual(expected_cloud_storage, actual_cloud_storage)

    # 'cloud'/'storage' with nothing to rewrite: kwargs pass through.
    data = {'foo': 'bar'}
    expected_no_change = {'foo': 'bar'}
    actual_no_change = utils.rewrite_kwargs('cloud', data,
                                            module_name='storage')
    self.assertEqual(expected_no_change, actual_no_change)

    # 'general' with nothing to rewrite: kwargs pass through.
    data = {'foo': 'bar'}
    expected_no_change = {'foo': 'bar'}
    actual_no_change = utils.rewrite_kwargs('general', data)
    self.assertEqual(expected_no_change, actual_no_change)
def test_rewrite_kwargs(self):
    """Exercise utils.rewrite_kwargs across API types, table-driven.

    Every row holds the API type, any extra keyword arguments for the
    call, the kwargs handed in, and the kwargs expected back. Rows run in
    order and the first mismatch fails the test.
    """
    scenarios = (
        # 'general' API: 'project' is expected to become 'name'.
        ('general', {},
         {'project': 'my-project', 'foo': 'bar'},
         {'name': 'projects/my-project', 'foo': 'bar'}),
        # 'cloud'/'storage': 'project' is expected to be dropped.
        ('cloud', {'module_name': 'storage'},
         {'project': 'my-project', 'foo': 'bar'},
         {'foo': 'bar'}),
        # No 'project' key present: nothing should change.
        ('cloud', {'module_name': 'storage'},
         {'foo': 'bar'},
         {'foo': 'bar'}),
        ('general', {},
         {'foo': 'bar'},
         {'foo': 'bar'}),
    )
    for api_type, extra_kwargs, given, wanted in scenarios:
        rewritten = utils.rewrite_kwargs(api_type, given, **extra_kwargs)
        self.assertEqual(wanted, rewritten)
Args:
account_number (string): The current account number Repokid is being run against
Returns:
None
"""
conn = config['connection_iam']
conn['account_number'] = account_number
roles = Roles([Role(role_data) for role_data in list_roles(**conn)])
active_roles = []
LOGGER.info('Updating role data for account {}'.format(account_number))
for role in tqdm(roles):
role.account = account_number
current_policies = get_role_inline_policies(role.as_dict(), **conn) or {}
active_roles.append(role.role_id)
roledata.update_role_data(dynamo_table, account_number, role, current_policies)
LOGGER.info('Finding inactive accounts')
roledata.find_and_mark_inactive(dynamo_table, account_number, active_roles)
LOGGER.info('Filtering roles')
plugins = FilterPlugins()
# Blacklist needs to know the current account
config['filter_config']['BlacklistFilter']['current_account'] = account_number
for plugin_path in config.get('active_filters'):
plugin_name = plugin_path.split(':')[1]
plugins.load_plugin(plugin_path, config=config['filter_config'].get(plugin_name, None))
def get_blocklist_from_bucket(bucket_config):
try:
s3_resource = boto3_cached_conn('s3', service_type='resource',
account_number=bucket_config.get('account_number'),
assume_role=bucket_config.get('assume_role', None),
session_name='repokid',
region=bucket_config.get('region', 'us-west-2'))
s3_obj = s3_resource.Object(bucket_name=bucket_config['bucket_name'], key=bucket_config['key'])
blocklist = s3_obj.get()['Body'].read().decode("utf-8")
blocklist_json = json.loads(blocklist)
# Blocklist problems are really bad and we should quit rather than silently continue
except (botocore.exceptions.ClientError, AttributeError):
LOGGER.error("S3 blocklist config was set but unable to connect retrieve object, quitting")
sys.exit(1)
except ValueError:
LOGGER.error("S3 blocklist config was set but the returned file is bad, quitting")
sys.exit(1)
if set(blocklist_json.keys()) != set(['arns', 'names']):
@sts_conn('iam', service_type='client')
@rate_limited()
def get_managed_policy_document(policy_arn, policy_metadata=None, client=None, **kwargs):
    """Retrieve the currently active (i.e. 'default') policy version document for a policy.

    :param policy_arn: ARN of the managed policy to look up.
    :param policy_metadata: This is a previously fetched managed policy response from boto/cloudaux.
                            This is used to prevent unnecessary API calls to get the initial policy
                            default version id. When falsy, it is fetched with ``client.get_policy``.
    :param client: IAM client used for the API calls (presumably injected by the ``sts_conn``
                   decorator -- confirm against its definition).
    :param kwargs: Not referenced by this function body.
    :return: The ``Document`` of the policy's default version, taken from the
             ``get_policy_version`` response.
    """
    if not policy_metadata:
        policy_metadata = client.get_policy(PolicyArn=policy_arn)

    policy_document = client.get_policy_version(PolicyArn=policy_arn,
                                                VersionId=policy_metadata['Policy']['DefaultVersionId'])
    # Without this return the fetched document was discarded and callers
    # always received None.
    return policy_document['PolicyVersion']['Document']
@sts_conn('sqs')
@rate_limited()
def set_queue_attributes(client=None, **kwargs):
    """Pass every keyword argument straight to ``client.set_queue_attributes``.

    :param client: Client object the call is made on (presumably supplied by
                   the ``sts_conn`` decorator -- confirm against its definition).
    :param kwargs: Forwarded unchanged to ``client.set_queue_attributes``.
    :return: Whatever ``client.set_queue_attributes`` returns.
    """
    response = client.set_queue_attributes(**kwargs)
    return response
@sts_conn('sns')
@rate_limited()
def delete_topic(client=None, **kwargs):
    """Pass every keyword argument straight to ``client.delete_topic``.

    :param client: Client object the call is made on (presumably supplied by
                   the ``sts_conn`` decorator -- confirm against its definition).
    :param kwargs: Forwarded unchanged to ``client.delete_topic``.
    :return: Whatever ``client.delete_topic`` returns.
    """
    response = client.delete_topic(**kwargs)
    return response
@sts_conn('ec2')
@rate_limited()
def create_group(group, account_number=None, region=None, assume_role=None, client=None):
    """Create a security group described by ``group`` and return its id.

    :param group: Object exposing ``name``, ``description`` and ``vpc_id``.
    :param account_number: Not referenced in the body (presumably consumed by
                           the ``sts_conn`` decorator -- confirm).
    :param region: Not referenced in the body (see ``account_number``).
    :param assume_role: Not referenced in the body (see ``account_number``).
    :param client: Client object on which ``create_security_group`` is called.
    :return: The ``'GroupId'`` entry of the ``create_security_group`` response.
    """
    request = {
        'GroupName': group.name,
        'Description': group.description,
    }
    # VpcId is only sent when the group actually carries a (truthy) VPC id.
    if group.vpc_id:
        request['VpcId'] = group.vpc_id

    return client.create_security_group(**request)['GroupId']