Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
{
"and": [
{"exists": {"ref": "resource.Properties.RoleName"}},
{"regex": ["^prefix-.*$", {"ref": "resource.Properties.RoleName"}]},
]
},
{"eq": [{"ref": "principal"}, "arn:aws:iam::999999999:role/someuser@bla.com"]},
]
},
),
]
)
},
)
rules = [DEFAULT_RULES.get(rule)(mock_config) for rule in mock_config.rules]
processor = RuleProcessor(*rules)
result = processor.process_cf_template(template_cross_account_role_with_name, mock_config)
assert result.valid
{
"and": [
{"exists": {"ref": "resource.Properties.RoleName"}},
{"regex": ["^prefix-.*$", {"ref": "resource.Properties.RoleName"}]},
]
},
{"eq": [{"ref": "principal"}, "arn:aws:iam::999999999:role/someuser@bla.com"]},
]
},
),
]
)
},
)
rules = [DEFAULT_RULES.get(rule)(mock_config) for rule in mock_config.rules]
processor = RuleProcessor(*rules)
result = processor.process_cf_template(template_cross_account_role_no_name, mock_config)
assert not result.valid
def test_non_matching_filters_are_reported_normally(single_security_group_one_cidr_ingress):
mock_config = Config(
rules=["EC2SecurityGroupMissingEgressRule"],
aws_account_id="123456789",
stack_name="mockstack",
rules_config={
"EC2SecurityGroupMissingEgressRule": RuleConfig(
filters=[
Filter(rule_mode=RuleMode.WHITELISTED, eval={"eq": [{"ref": "config.stack_name"}, "anotherstack"]})
],
)
},
)
rules = [DEFAULT_RULES.get(rule)(mock_config) for rule in mock_config.rules]
processor = RuleProcessor(*rules)
result = processor.process_cf_template(single_security_group_one_cidr_ingress, mock_config)
assert not result.valid
assert len(result.failed_rules) == 1
assert len(result.failed_monitored_rules) == 0
assert result.failed_rules[0].rule == "EC2SecurityGroupMissingEgressRule"
assert (
result.failed_rules[0].reason
== "Missing egress rule in sg means all traffic is allowed outbound. Make this explicit if it is desired configuration"
)
def test_resource_whitelisting_works_as_expected(template_two_roles_dict, expected_result_two_roles):
mock_rule_to_resource_whitelist = {"CrossAccountTrustRule": {".*": {"RootRoleOne"}}}
mock_config = Config(
rules=["CrossAccountTrustRule"],
aws_account_id="123456789",
rule_to_resource_whitelist=mock_rule_to_resource_whitelist,
stack_name="mockstack",
stack_whitelist={},
)
rules = [DEFAULT_RULES.get(rule)(mock_config) for rule in mock_config.rules]
processor = RuleProcessor(*rules)
result = processor.process_cf_template(template_two_roles_dict, mock_config)
assert not result.valid
assert result.failed_rules[0] == expected_result_two_roles[-1]
def test_all_rules_valid():
for r in DEFAULT_RULES.values():
if r.RULE_MODE not in ["BLOCKING", "MONITOR", "DEBUG"]:
assert False
assert True
def test_whitelisted_stacks_do_not_report_anything(template_two_roles_dict):
mock_stack_whitelist = {"mockstack": ["CrossAccountTrustRule"]}
mock_config = Config(
rules=["CrossAccountTrustRule"],
aws_account_id="123456789",
stack_name="mockstack",
stack_whitelist=mock_stack_whitelist,
)
rules = [DEFAULT_RULES.get(rule)(mock_config) for rule in mock_config.rules]
processor = RuleProcessor(*rules)
result = processor.process_cf_template(template_two_roles_dict, mock_config)
assert result.valid
config = Config(
project_name=event.get("project"),
service_name=event.get("serviceName"),
stack_name=event.get("stack", {}).get("name"),
rules=DEFAULT_RULES.keys(),
event=event.get("event"),
template_url=event.get("stack_template_url", "N/A"),
aws_region=event.get("region", "N/A"),
aws_account_name=event.get("account", {}).get("name", "N/A"),
aws_account_id=event.get("account", {}).get("id", "N/A"),
aws_user_agent=event.get("user_agent", "N/A"),
)
logger.info("Scan started for: {}; {}; {};".format(config.project_name, config.service_name, config.stack_name))
rules = [DEFAULT_RULES.get(rule)(config) for rule in config.rules]
processor = RuleProcessor(*rules)
# TODO get AWS variables/parameters and pass them to resolve
cfmodel = pycfmodel.parse(template).resolve()
result = processor.process_cf_template(cfmodel=cfmodel, config=config, extras=extras)
perform_logging(result, config, event)
return {
"valid": result.valid,
"reason": ",".join(["{}-{}".format(r.rule, r.reason) for r in result.failed_rules]),
"failed_rules": [
failure.serializable() for failure in RuleProcessor.remove_debug_rules(rules=result.failed_rules)
],
"exceptions": [x.args[0] for x in result.exceptions],