def test_ensure_cfn_bucket_does_not_exist_us_east(self):
    """Test ensure cfn bucket does not exist in us-east-1."""
    session = get_session("us-east-1")
    provider = Provider(session)
    action = BaseAction(
        context=mock_context("mynamespace"),
        provider_builder=MockProviderBuilder(provider)
    )
    stubber = Stubber(action.s3_conn)
    # stub head_bucket so the bucket appears not to exist
    stubber.add_client_error(
        "head_bucket",
        service_error_code="NoSuchBucket",
        service_message="Not Found",
        http_status_code=404,
    )
    stubber.add_response(
        "create_bucket",
        service_response={},
        expected_params={
            # us-east-1 buckets are created without a CreateBucketConfiguration
            "Bucket": ANY,  # botocore.stub.ANY
        },
    )
    with stubber:
        action.ensure_cfn_bucket()
def test_ensure_cfn_bucket_does_not_exist_us_west(self):
    """Test ensure cfn bucket does not exist in us-west-1."""
    session = get_session("us-west-1")
    provider = Provider(session)
    action = BaseAction(
        context=mock_context("mynamespace"),
        provider_builder=MockProviderBuilder(provider, region="us-west-1")
    )
    stubber = Stubber(action.s3_conn)
    # stub head_bucket so the bucket appears not to exist
    stubber.add_client_error(
        "head_bucket",
        service_error_code="NoSuchBucket",
        service_message="Not Found",
        http_status_code=404,
    )
    stubber.add_response(
        "create_bucket",
        service_response={},
        expected_params={
            "Bucket": ANY,  # botocore.stub.ANY
            # outside us-east-1 the bucket needs an explicit LocationConstraint
            "CreateBucketConfiguration": {
                "LocationConstraint": "us-west-1",
            },
        },
    )
    with stubber:
        action.ensure_cfn_bucket()
and reference it within CFNgin (NOTE: the path should be relative
to the CFNgin config file)::

    conf_key: ${kms file://kms_value.txt}

    # Both of the above would resolve to
    conf_key: PASSWORD

"""
value = read_value_from_path(value)

region = None
if "@" in value:
    region, value = value.split("@", 1)
kms = get_session(region).client('kms')

# encode str value as a UTF-8 bytestring for use with codecs.decode.
value = value.encode('utf-8')

# get the raw but still encrypted value from the base64 version.
decoded = codecs.decode(value, 'base64')

# decrypt and return the plain text raw value.
return kms.decrypt(CiphertextBlob=decoded)["Plaintext"]
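For reference, an encrypted value that the handler above can consume could be
produced with boto3 roughly as follows (a minimal sketch; the key alias and the
file name are placeholders, not anything the lookup itself requires):

import base64

import boto3

# encrypt "PASSWORD" with a KMS key and base64-encode the ciphertext so it can
# be pasted into a CFNgin config or stored in a file for the file:// form
kms = boto3.client('kms', region_name='us-east-1')
ciphertext = kms.encrypt(
    KeyId='alias/myKey',  # placeholder key alias
    Plaintext=b'PASSWORD',
)['CiphertextBlob']

with open('kms_value.txt', 'wb') as handle:
    handle.write(b'us-east-1@' + base64.b64encode(ciphertext))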
executable_users (comma delimited) OPTIONAL ONCE:
    aws_account_id | amazon | self

Any other arguments specified are sent as filters to the AWS API.
For example, ``architecture:x86_64`` will add a filter on the image
architecture.
"""  # noqa
value = read_value_from_path(value)

if "@" in value:
    region, value = value.split("@", 1)
else:
    region = provider.region
ec2 = get_session(region).client('ec2')

values = {}
describe_args = {}

# now find any other arguments that can be filters
matches = re.findall(r'([0-9a-zA-Z_-]+:[^\s$]+)', value)
for match in matches:
    k, v = match.split(':', 1)
    values[k] = v

if not values.get('owners'):
    raise Exception("'owners' value required when using ami")
owners = values.pop('owners').split(',')
describe_args["Owners"] = owners

if not values.get('name_regex'):
    raise Exception("'name_regex' value required when using ami")
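# hedged sketch of a plausible continuation (not necessarily the project's
# exact code): pass the remaining key/value pairs to describe_images as
# filters and return the newest image whose name matches name_regex
name_regex = values.pop('name_regex')
if values.get('executable_users'):
    describe_args["ExecutableUsers"] = values.pop('executable_users').split(',')
describe_args["Filters"] = [
    {"Name": key, "Values": val.split(',')} for key, val in values.items()
]

images = ec2.describe_images(**describe_args)["Images"]
matched = [img for img in images if re.match(name_regex, img.get("Name", ""))]
if not matched:
    raise ValueError("no AMI matching name_regex %r was found" % name_regex)
# CreationDate is an ISO 8601 string, so lexicographic max picks the newest
return max(matched, key=lambda img: img["CreationDate"])["ImageId"]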
    conf_key: ${ssmstore file://ssmstore_value.txt}

    # Both of the above would resolve to
    conf_key: PASSWORD

"""
warnings.warn(cls.DEPRECATION_MSG, DeprecationWarning)
LOGGER.warning(cls.DEPRECATION_MSG)

value = read_value_from_path(value)

region = "us-east-1"
if "@" in value:
    region, value = value.split("@", 1)

client = get_session(region).client("ssm")
response = client.get_parameters(
    Names=[
        value,
    ],
    WithDecryption=True
)
# get_parameters returns an empty "Parameters" list (and reports the name
# under "InvalidParameters") when the key is missing, so check that at least
# one parameter actually came back before indexing into it.
if response.get('Parameters'):
    return str(response['Parameters'][0]['Value'])
raise ValueError('SSMKey "{}" does not exist in region {}'.format(
    value, region))
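A parameter that the deprecated ssmstore handler above could read can be
created with boto3; a minimal sketch, where the parameter name
"ssmstore_value" is a placeholder:

import boto3

# store an encrypted SecureString parameter; WithDecryption=True in the
# handler above returns its plain-text value
ssm = boto3.client('ssm', region_name='us-east-1')
ssm.put_parameter(
    Name='ssmstore_value',  # placeholder parameter name
    Value='PASSWORD',
    Type='SecureString',
    Overwrite=True,
)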
class_.__name__,
config['key'],
config['bucket'])
dir_name = self.sanitize_uri_path(
    "s3-%s-%s" % (config['bucket'],
                  config['key'][:-len(suffix)])
)
break

if extractor is None:
    raise ValueError(
        "Archive type could not be determined for S3 object \"%s\" "
        "in bucket %s." % (config['key'], config['bucket'])
    )
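The config mapping consumed here appears to carry at least the following keys
(inferred from the fragment above and below; the values shown are
placeholders):

config = {
    'bucket': 'my-artifact-bucket',            # placeholder bucket name
    'key': 'packages/stack-templates.tar.gz',  # suffix selects the extractor
    'use_latest': True,       # re-check LastModified instead of trusting the cache
    'requester_pays': False,  # set True to send RequestPayer=requester
}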
session = get_session(region=None)

extra_s3_args = {}
if config.get('requester_pays', False):
    extra_s3_args['RequestPayer'] = 'requester'

# We can skip downloading the archive if it's already been cached
if config.get('use_latest', True):
    try:
        # LastModified should always be returned in UTC, but it doesn't
        # hurt to explicitly convert it to UTC again just in case
        modified_date = session.client('s3').head_object(
            Bucket=config['bucket'],
            Key=config['key'],
            **extra_s3_args
        )['LastModified'].astimezone(dateutil.tz.tzutc())
    except botocore.exceptions.ClientError as client_error:
        LOGGER.error("Error checking modified date of "
custom_bucket_region,
context.config.cfngin_bucket_region,
provider.region
)
# Check if we should walk / follow symlinks
follow_symlinks = kwargs.get('follow_symlinks', False)
if not isinstance(follow_symlinks, bool):
    raise ValueError('follow_symlinks option must be a boolean')

# Check for S3 object acl. Valid values from:
# https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl
payload_acl = kwargs.get('payload_acl', 'private')

# Always use the global client for s3
session = get_session(bucket_region)
s3_client = session.client('s3')

ensure_s3_bucket(s3_client, bucket_name, bucket_region)

prefix = kwargs.get('prefix', '')

results = {}
for name, options in kwargs['functions'].items():
    sys_path = (os.path.dirname(context.config_path)
                if os.path.isfile(context.config_path)
                else context.config_path)
    results[name] = _upload_function(s3_client, bucket_name, prefix, name,
                                     options, follow_symlinks, payload_acl,
                                     sys_path)

return results
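For illustration, the kwargs this hook body reads could look roughly like the
following (keys taken from the fragment above; the prefix and function name
are placeholders, and the per-function options dict is forwarded to
_upload_function, whose schema is not shown here):

kwargs = {
    'follow_symlinks': False,   # must be a bool or a ValueError is raised
    'payload_acl': 'private',   # canned S3 ACL applied to the uploaded payloads
    'prefix': 'lambda/',        # placeholder key prefix
    'functions': {
        'MyFunction': {},       # placeholder name; options passed through as-is
    },
}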
def build(context, provider, **kwargs):  # pylint: disable=unused-argument
    """Build static site."""
    session = get_session(provider.region)
    options = kwargs.get('options', {})

    context_dict = {}
    context_dict['artifact_key_prefix'] = "%s-%s-" % (options['namespace'], options['name'])  # noqa
    default_param_name = "%shash" % context_dict['artifact_key_prefix']

    if options.get('build_output'):
        build_output = os.path.join(
            options['path'],
            options['build_output']
        )
    else:
        build_output = options['path']

    context_dict['artifact_bucket_name'] = RxrefLookup.handle(
        kwargs.get('artifact_bucket_rxref_lookup'),
        provider=provider,
def purge_bucket(context, provider, **kwargs):
    """Delete objects in bucket."""
    session = get_session(provider.region)

    if kwargs.get('bucket_name'):
        bucket_name = kwargs['bucket_name']
    else:
        if kwargs.get('bucket_output_lookup'):
            value = kwargs['bucket_output_lookup']
            handler = OutputLookup.handle
        elif kwargs.get('bucket_rxref_lookup'):
            value = kwargs['bucket_rxref_lookup']
            handler = RxrefLookup.handle
        elif kwargs.get('bucket_xref_lookup'):
            value = kwargs['bucket_xref_lookup']
            handler = XrefLookup.handle
        else:
            LOGGER.fatal('No bucket name/source provided.')
            return False
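        # hedged sketch of a plausible continuation (not necessarily the
        # project's exact code): resolve the bucket name with the selected
        # lookup handler, then delete every object version in the bucket
        bucket_name = handler(value, context=context, provider=provider)

    s3_resource = session.resource('s3')
    s3_resource.Bucket(bucket_name).object_versions.delete()
    return True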
def build(self, region=None, profile=None):
    """Get or create the provider for the given region and profile."""
    with self.lock:
        # memoization lookup key derived from region + profile.
        key = "{}-{}".format(profile, region)
        try:
            # assume provider is in provider dictionary.
            provider = self.providers[key]
        except KeyError:
            LOGGER.debug('Missed memoized lookup (%s), creating new AWS '
                         'Provider.', key)
            if not region:
                region = self.region
            # memoize the result for later.
            self.providers[key] = Provider(
                get_session(region=region, profile=profile),
                region=region,
                **self.kwargs
            )
            provider = self.providers[key]
    return provider
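A brief usage sketch of the memoized builder above (the ProviderBuilder
constructor arguments shown are assumptions for illustration):

builder = ProviderBuilder(region='us-east-1')
first = builder.build(region='us-west-2', profile='deploy')
second = builder.build(region='us-west-2', profile='deploy')
# repeated calls with the same region/profile return the cached Provider
assert first is second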