How to use the cartography.driftdetect.serializers.StateSchema class in cartography

To help you get started, we’ve selected a few cartography examples that show popular ways StateSchema is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github lyft / cartography / tests / unit / driftdetect / test_detector.py View on Github external
key_2 = "d.test2"
    key_3 = "d.test3"
    results = [
        {key_1: "1", key_2: "8", key_3: ["15", "22", "29"]},
        {key_1: "2", key_2: "9", key_3: ["16", "23", "30"]},
        {key_1: "3", key_2: "10", key_3: ["17", "24", "31"]},
        {key_1: "4", key_2: "11", key_3: ["18", "25", "32"]},
        {key_1: "5", key_2: "12", key_3: ["19", "26", "33"]},
        {key_1: "6", key_2: "13", key_3: ["20", "27", "34"]},
        {key_1: "7", key_2: "14", key_3: ["21", "28", "35"]},
    ]
    mock_boltstatementresult.__getitem__.side_effect = results.__getitem__
    mock_boltstatementresult.__iter__.side_effect = results.__iter__
    mock_session.run.return_value = mock_boltstatementresult
    data = FileSystem.load("tests/data/detectors/test_multiple_properties.json")
    state_old = StateSchema().load(data)
    state_new = State(state_old.name, state_old.validation_query, state_old.properties, [])
    get_state(mock_session, state_new)
    state_new.properties = state_old.properties
    drifts = compare_states(state_old, state_new)
    mock_session.run.assert_called_with(state_new.validation_query)
    assert ["7", "14", ["21", "28", "35"]] in drifts
    assert ["3", "10", ["17", "24", "31"]] not in drifts
github lyft / cartography / tests / unit / driftdetect / test_detect_drift.py View on Github external
def test_basic_drift_detection():
    """
    Checks drift detection between two stored detector states.

    Both fixture files are deserialized with StateSchema and handed to
    perform_drift_detection, whose two returned collections are then checked
    for one known entry each.
    """
    fixture_paths = (
        "tests/data/test_cli_detectors/detector/1.json",
        "tests/data/test_cli_detectors/detector/2.json",
    )
    # Load and deserialize the fixtures in order: first the start state, then the end state.
    start_state, end_state = [
        StateSchema().load(FileSystem.load(path)) for path in fixture_paths
    ]

    detection = perform_drift_detection(start_state, end_state)
    new_results, missing_results = detection

    expected_new = ['36', '37', ['38', '39', '40']]
    expected_missing = ['7', '14', ['21', '28', '35']]
    assert expected_new in new_results
    assert expected_missing in missing_results
github lyft / cartography / tests / unit / driftdetect / test_detector.py View on Github external
mock_boltstatementresult = MagicMock()
    results = [
        {key_1: "1", key_2: "8"},
        {key_1: "2", key_2: "9"},
        {key_1: "3", key_2: "10"},
        {key_1: "4", key_2: "11"},
        {key_1: "5", key_2: "12"},
        {key_1: "6", key_2: "13"},
        {key_1: "7", key_2: "14"},
    ]

    mock_boltstatementresult.__getitem__.side_effect = results.__getitem__
    mock_boltstatementresult.__iter__.side_effect = results.__iter__
    mock_session.run.return_value = mock_boltstatementresult
    data = FileSystem.load("tests/data/detectors/test_multiple_expectations.json")
    state_old = StateSchema().load(data)
    state_new = State(state_old.name, state_old.validation_query, state_old.properties, [])
    get_state(mock_session, state_new)
    state_new.properties = state_old.properties
    drifts = compare_states(state_old, state_new)
    mock_session.run.assert_called_with(state_new.validation_query)
    assert ["7", "14"] in drifts
github lyft / cartography / tests / unit / driftdetect / test_detector.py View on Github external
mock_boltstatementresult = MagicMock()
    key = "d.test"
    results = [
        {key: "1"},
        {key: "2"},
        {key: "3"},
        {key: "4"},
        {key: "5"},
        {key: "6"},
    ]

    mock_boltstatementresult.__getitem__.side_effect = results.__getitem__
    mock_boltstatementresult.__iter__.side_effect = results.__iter__
    mock_session.run.return_value = mock_boltstatementresult
    data = FileSystem.load("tests/data/detectors/test_expectations.json")
    state_old = StateSchema().load(data)
    state_new = State(state_old.name, state_old.validation_query, state_old.properties, [])
    get_state(mock_session, state_new)
    drifts = compare_states(state_old, state_new)
    mock_session.run.assert_called_with(state_new.validation_query)
    assert not drifts
github lyft / cartography / tests / integration / driftdetect / test_get_state.py View on Github external
SET person.test2 = {test2},
        person.test3 = {test3}
        """
    for node in data:
        test = node[0]
        test2 = node[1]
        test3 = node[2]
        neo4j_session.run(
            ingest_nodes,
            test=test,
            test2=test2,
            test3=test3,
        )

    query_directory = "tests/data/test_update_detectors/test_detector"
    state_serializer = StateSchema()
    shortcut_serializer = ShortcutSchema()
    storage = FileSystem

    file_1 = str(datetime.datetime(2019, 1, 1, 0, 0, 2)) + ".json"
    file_2 = str(datetime.datetime(2019, 1, 1, 0, 0, 1)) + ".json"

    get_query_state(neo4j_session, query_directory, state_serializer, storage, file_1)
    add_shortcut(FileSystem(), ShortcutSchema(), query_directory, "most-recent", file_1)

    detector_1_data = FileSystem.load(os.path.join(query_directory, file_1))
    detector_1 = state_serializer.load(detector_1_data)

    detector_2_data = FileSystem.load(os.path.join(query_directory, file_2))
    detector_2 = state_serializer.load(detector_2_data)

    assert detector_1.name == detector_2.name
github lyft / cartography / tests / unit / driftdetect / test_detector.py View on Github external
mock_boltstatementresult = MagicMock()
    results = [
        {key: "1"},
        {key: "2"},
        {key: "3"},
        {key: "4"},
        {key: "5"},
        {key: "6"},
        {key: "7"},
    ]

    mock_boltstatementresult.__getitem__.side_effect = results.__getitem__
    mock_boltstatementresult.__iter__.side_effect = results.__iter__
    mock_session.run.return_value = mock_boltstatementresult
    data = FileSystem.load("tests/data/detectors/test_expectations.json")
    state_old = StateSchema().load(data)
    state_new = State(state_old.name, state_old.validation_query, state_old.properties, [])
    get_state(mock_session, state_new)
    state_new.properties = state_old.properties
    drifts = compare_states(state_old, state_new)
    mock_session.run.assert_called_with(state_new.validation_query)
    assert drifts
    assert ["7"] in drifts
github lyft / cartography / tests / unit / driftdetect / test_detect_drift.py View on Github external
def test_drift_detection_errors():
    """
    Checks that perform_drift_detection raises ValueError for a tampered start state.

    Three cases are exercised in turn: an altered name, altered properties,
    and an altered validation query. Each case overwrites exactly one
    attribute of the start state before the comparison is attempted.
    """
    first_raw = FileSystem.load("tests/data/test_cli_detectors/detector/1.json")
    start_state = StateSchema().load(first_raw)
    second_raw = FileSystem.load("tests/data/test_cli_detectors/detector/2.json")
    end_state = StateSchema().load(second_raw)

    tampered_fields = (
        ("name", "Wrong Name"),
        ("properties", ["Incorrect", "Properties"]),
        ("validation_query", "Invalid Validation Query"),
    )

    for case_index, (field, bad_value) in enumerate(tampered_fields):
        if case_index > 0:
            # Every case after the first starts from a fresh state built from
            # the second fixture's data, so only one attribute is wrong at a time.
            start_state = StateSchema().load(second_raw)
        setattr(start_state, field, bad_value)
        with pytest.raises(ValueError):
            perform_drift_detection(start_state, end_state)
github lyft / cartography / tests / integration / driftdetect / test_get_state.py View on Github external
SET person.test2 = {test2},
            person.test3 = {test3}
            """
    for node in data:
        test = node[0]
        test2 = node[1]
        test3 = node[2]
        neo4j_session.run(
            ingest_nodes,
            test=test,
            test2=test2,
            test3=test3,
        )

    query_directory = "tests/data/test_update_detectors/invalid_query"
    state_serializer = StateSchema()
    storage = FileSystem

    file_1 = str(datetime.datetime(2019, 1, 1, 0, 0, 2)) + ".json"
    with pytest.raises(neobolt.exceptions.CypherSyntaxError):
        get_query_state(neo4j_session, query_directory, state_serializer, storage, file_1)

    query_directory = "tests/data/test_update_detectors/invalid_template"

    with pytest.raises(ValidationError):
        get_query_state(neo4j_session, query_directory, state_serializer, storage, file_1)
github lyft / cartography / cartography / driftdetect / get_states.py View on Github external
e,
            )
        else:
            logger.error(
                (
                    "Unable to auth to Neo4j, an error occurred: '%s'. driftdetect attempted to connect to Neo4j "
                    "with a username and password. Check your Neo4j server settings to see if the username and "
                    "password provided to driftdetect are valid credentials."
                ),
                e,
            )
        return

    with neo4j_driver.session() as session:
        filename = '.'.join([str(i) for i in time.gmtime()] + ["json"])
        state_serializer = StateSchema()
        shortcut_serializer = ShortcutSchema()
        for query_directory in FileSystem.walk(config.drift_detection_directory):
            try:
                get_query_state(session, query_directory, state_serializer, FileSystem, filename)
                add_shortcut(FileSystem, shortcut_serializer, query_directory, 'most-recent', filename)
            except ValidationError as err:
                msg = "Unable to create State for directory {}, with data \n{}".format(
                    query_directory,
                    err.messages,
                )
                logger.exception(msg)
            except KeyError as err:
                msg = f"Could not find {err} field in state template for directory {query_directory}."
                logger.exception(msg)
            except FileNotFoundError as err:
                logger.exception(err)
github lyft / cartography / cartography / driftdetect / detect_deviations.py View on Github external
def run_drift_detection(config):
    try:
        if not valid_directory(config.query_directory):
            logger.error("Invalid Drift Detection Directory")
            return
        state_serializer = StateSchema()
        shortcut_serializer = ShortcutSchema()
        shortcut_data = FileSystem.load(os.path.join(config.query_directory, "shortcut.json"))
        shortcut = shortcut_serializer.load(shortcut_data)
        start_state_data = FileSystem.load(
            os.path.join(
                config.query_directory, shortcut.shortcuts.get(
                    config.start_state,
                    config.start_state,
                ),
            ),
        )
        start_state = state_serializer.load(start_state_data)
        end_state_data = FileSystem.load(
            os.path.join(
                config.query_directory, shortcut.shortcuts.get(
                    config.end_state,