How to use the cfripper.rule_processor.RuleProcessor function in cfripper

To help you get started, we’ve selected a few cfripper examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github Skyscanner / cfripper / tests / rules / test_CrossAccountTrustRule.py View on Github external
def test_whitelisted_stacks_do_not_report_anything(template_two_roles_dict):
    mock_stack_whitelist = {"mockstack": ["CrossAccountTrustRule"]}
    mock_config = Config(
        rules=["CrossAccountTrustRule"],
        aws_account_id="123456789",
        stack_name="mockstack",
        stack_whitelist=mock_stack_whitelist,
    )
    rules = [DEFAULT_RULES.get(rule)(mock_config) for rule in mock_config.rules]
    processor = RuleProcessor(*rules)
    result = processor.process_cf_template(template_two_roles_dict, mock_config)

    assert result.valid
github Skyscanner / cfripper / tests / rules / test_CrossAccountTrustRule.py View on Github external
def test_filter_do_not_report_anything(template_two_roles_dict):
    mock_config = Config(
        rules=["CrossAccountTrustRule"],
        aws_account_id="123456789",
        stack_name="mockstack",
        rules_config={
            "CrossAccountTrustRule": RuleConfig(
                filters=[
                    Filter(rule_mode=RuleMode.WHITELISTED, eval={"eq": [{"ref": "config.stack_name"}, "mockstack"]})
                ],
            )
        },
    )
    rules = [DEFAULT_RULES.get(rule)(mock_config) for rule in mock_config.rules]
    processor = RuleProcessor(*rules)
    result = processor.process_cf_template(template_two_roles_dict, mock_config)

    assert result.valid
github Skyscanner / cfripper / tests / rules / test_EC2SecurityGroupMissingEgressRule.py View on Github external
def test_filter_do_not_report_anything(single_security_group_one_cidr_ingress):
    mock_config = Config(
        rules=["EC2SecurityGroupMissingEgressRule"],
        aws_account_id="123456789",
        stack_name="mockstack",
        rules_config={
            "EC2SecurityGroupMissingEgressRule": RuleConfig(
                filters=[
                    Filter(rule_mode=RuleMode.WHITELISTED, eval={"eq": [{"ref": "config.stack_name"}, "mockstack"]},)
                ],
            )
        },
    )
    rules = [DEFAULT_RULES.get(rule)(mock_config) for rule in mock_config.rules]
    processor = RuleProcessor(*rules)
    result = processor.process_cf_template(single_security_group_one_cidr_ingress, mock_config)

    assert result.valid
github Skyscanner / cfripper / tests / config / test_filter.py View on Github external
"and": [
                                        {"exists": {"ref": "resource.Properties.RoleName"}},
                                        {"regex": ["^prefix-.*$", {"ref": "resource.Properties.RoleName"}]},
                                    ]
                                },
                                {"eq": [{"ref": "principal"}, "arn:aws:iam::999999999:role/someuser@bla.com"]},
                            ]
                        },
                    ),
                ]
            )
        },
    )

    rules = [DEFAULT_RULES.get(rule)(mock_config) for rule in mock_config.rules]
    processor = RuleProcessor(*rules)
    result = processor.process_cf_template(template_cross_account_role_no_name, mock_config)
    assert not result.valid
github Skyscanner / cfripper / tests / rules / test_EC2SecurityGroupMissingEgressRule.py View on Github external
def test_non_matching_filters_are_reported_normally(single_security_group_one_cidr_ingress):
    mock_config = Config(
        rules=["EC2SecurityGroupMissingEgressRule"],
        aws_account_id="123456789",
        stack_name="mockstack",
        rules_config={
            "EC2SecurityGroupMissingEgressRule": RuleConfig(
                filters=[
                    Filter(rule_mode=RuleMode.WHITELISTED, eval={"eq": [{"ref": "config.stack_name"}, "anotherstack"]})
                ],
            )
        },
    )
    rules = [DEFAULT_RULES.get(rule)(mock_config) for rule in mock_config.rules]
    processor = RuleProcessor(*rules)
    result = processor.process_cf_template(single_security_group_one_cidr_ingress, mock_config)

    assert not result.valid
    assert len(result.failed_rules) == 1
    assert len(result.failed_monitored_rules) == 0
    assert result.failed_rules[0].rule == "EC2SecurityGroupMissingEgressRule"
    assert (
        result.failed_rules[0].reason
        == "Missing egress rule in sg means all traffic is allowed outbound. Make this explicit if it is desired configuration"
    )
github Skyscanner / cfripper / tests / model / test_rule_processor.py View on Github external
config = Config(stack_name="abcd", rules=["MockRule"], rule_to_action_whitelist=whitelist_for_all_stacks)

    result = Result()
    failed_rules = [
        Failure(
            rule="MockRule",
            reason="MockRule is invalid for some actions",
            rule_mode=RuleMode.BLOCKING,
            risk_value=RuleRisk.HIGH,
            actions={"s3:ListBucket", "s3:GetBucket"},
            granularity=RuleGranularity.ACTION,
        )
    ]
    result.failed_rules = failed_rules

    RuleProcessor.remove_failures_of_whitelisted_actions(config=config, result=result)
    assert result.failed_rules == [
        Failure(
            rule="MockRule",
            reason="MockRule is invalid for some actions",
            rule_mode=RuleMode.BLOCKING,
            risk_value=RuleRisk.HIGH,
            actions={"s3:GetBucket"},
            granularity=RuleGranularity.ACTION,
        )
github Skyscanner / cfripper / tests / model / test_rule_processor.py View on Github external
def test_with_no_rules(mock_remove_whitelisted_actions, mock_remove_whitelisted_resources, template):
    processor = RuleProcessor()
    config = Mock()

    processor.process_cf_template(template, config)
    mock_remove_whitelisted_actions.assert_called()
    mock_remove_whitelisted_resources.assert_called()
github Skyscanner / cfripper / tests / rules / test_CrossAccountTrustRule.py View on Github external
def test_non_whitelisted_stacks_are_reported_normally(template_two_roles_dict, expected_result_two_roles):
    mock_stack_whitelist = {"mockstack": ["CrossAccountTrustRule"]}
    mock_config = Config(
        rules=["CrossAccountTrustRule"],
        aws_account_id="123456789",
        stack_name="anotherstack",
        stack_whitelist=mock_stack_whitelist,
    )
    rules = [DEFAULT_RULES.get(rule)(mock_config) for rule in mock_config.rules]
    processor = RuleProcessor(*rules)
    result = processor.process_cf_template(template_two_roles_dict, mock_config)
    assert not result.valid
    assert result.failed_rules == expected_result_two_roles
github Skyscanner / cfripper / tests / config / test_filter.py View on Github external
"and": [
                                        {"exists": {"ref": "resource.Properties.RoleName"}},
                                        {"regex": ["^prefix-.*$", {"ref": "resource.Properties.RoleName"}]},
                                    ]
                                },
                                {"eq": [{"ref": "principal"}, "arn:aws:iam::999999999:role/someuser@bla.com"]},
                            ]
                        },
                    ),
                ]
            )
        },
    )

    rules = [DEFAULT_RULES.get(rule)(mock_config) for rule in mock_config.rules]
    processor = RuleProcessor(*rules)
    result = processor.process_cf_template(template_cross_account_role_with_name, mock_config)
    assert result.valid
github Skyscanner / cfripper / cfripper / main.py View on Github external
rules = [DEFAULT_RULES.get(rule)(config) for rule in config.rules]
    processor = RuleProcessor(*rules)

    # TODO get AWS variables/parameters and pass them to resolve
    cfmodel = pycfmodel.parse(template).resolve()

    result = processor.process_cf_template(cfmodel=cfmodel, config=config, extras=extras)

    perform_logging(result, config, event)

    return {
        "valid": result.valid,
        "reason": ",".join(["{}-{}".format(r.rule, r.reason) for r in result.failed_rules]),
        "failed_rules": [
            failure.serializable() for failure in RuleProcessor.remove_debug_rules(rules=result.failed_rules)
        ],
        "exceptions": [x.args[0] for x in result.exceptions],
        "warnings": [
            failure.serializable() for failure in RuleProcessor.remove_debug_rules(rules=result.failed_monitored_rules)
        ],