Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
key_2 = "d.test2"
key_3 = "d.test3"
results = [
{key_1: "1", key_2: "8", key_3: ["15", "22", "29"]},
{key_1: "2", key_2: "9", key_3: ["16", "23", "30"]},
{key_1: "3", key_2: "10", key_3: ["17", "24", "31"]},
{key_1: "4", key_2: "11", key_3: ["18", "25", "32"]},
{key_1: "5", key_2: "12", key_3: ["19", "26", "33"]},
{key_1: "6", key_2: "13", key_3: ["20", "27", "34"]},
{key_1: "7", key_2: "14", key_3: ["21", "28", "35"]},
]
mock_boltstatementresult.__getitem__.side_effect = results.__getitem__
mock_boltstatementresult.__iter__.side_effect = results.__iter__
mock_session.run.return_value = mock_boltstatementresult
data = FileSystem.load("tests/data/detectors/test_multiple_properties.json")
state_old = StateSchema().load(data)
state_new = State(state_old.name, state_old.validation_query, state_old.properties, [])
get_state(mock_session, state_new)
state_new.properties = state_old.properties
drifts = compare_states(state_old, state_new)
mock_session.run.assert_called_with(state_new.validation_query)
assert ["7", "14", ["21", "28", "35"]] in drifts
assert ["3", "10", ["17", "24", "31"]] not in drifts
def test_basic_drift_detection():
"""
Tests that drift detection works.
"""
data = FileSystem.load("tests/data/test_cli_detectors/detector/1.json")
start_state = StateSchema().load(data)
data = FileSystem.load("tests/data/test_cli_detectors/detector/2.json")
end_state = StateSchema().load(data)
new_results, missing_results = perform_drift_detection(start_state, end_state)
assert ['36', '37', ['38', '39', '40']] in new_results
assert ['7', '14', ['21', '28', '35']] in missing_results
mock_boltstatementresult = MagicMock()
results = [
{key_1: "1", key_2: "8"},
{key_1: "2", key_2: "9"},
{key_1: "3", key_2: "10"},
{key_1: "4", key_2: "11"},
{key_1: "5", key_2: "12"},
{key_1: "6", key_2: "13"},
{key_1: "7", key_2: "14"},
]
mock_boltstatementresult.__getitem__.side_effect = results.__getitem__
mock_boltstatementresult.__iter__.side_effect = results.__iter__
mock_session.run.return_value = mock_boltstatementresult
data = FileSystem.load("tests/data/detectors/test_multiple_expectations.json")
state_old = StateSchema().load(data)
state_new = State(state_old.name, state_old.validation_query, state_old.properties, [])
get_state(mock_session, state_new)
state_new.properties = state_old.properties
drifts = compare_states(state_old, state_new)
mock_session.run.assert_called_with(state_new.validation_query)
assert ["7", "14"] in drifts
mock_boltstatementresult = MagicMock()
key = "d.test"
results = [
{key: "1"},
{key: "2"},
{key: "3"},
{key: "4"},
{key: "5"},
{key: "6"},
]
mock_boltstatementresult.__getitem__.side_effect = results.__getitem__
mock_boltstatementresult.__iter__.side_effect = results.__iter__
mock_session.run.return_value = mock_boltstatementresult
data = FileSystem.load("tests/data/detectors/test_expectations.json")
state_old = StateSchema().load(data)
state_new = State(state_old.name, state_old.validation_query, state_old.properties, [])
get_state(mock_session, state_new)
drifts = compare_states(state_old, state_new)
mock_session.run.assert_called_with(state_new.validation_query)
assert not drifts
SET person.test2 = {test2},
person.test3 = {test3}
"""
for node in data:
test = node[0]
test2 = node[1]
test3 = node[2]
neo4j_session.run(
ingest_nodes,
test=test,
test2=test2,
test3=test3,
)
query_directory = "tests/data/test_update_detectors/test_detector"
state_serializer = StateSchema()
shortcut_serializer = ShortcutSchema()
storage = FileSystem
file_1 = str(datetime.datetime(2019, 1, 1, 0, 0, 2)) + ".json"
file_2 = str(datetime.datetime(2019, 1, 1, 0, 0, 1)) + ".json"
get_query_state(neo4j_session, query_directory, state_serializer, storage, file_1)
add_shortcut(FileSystem(), ShortcutSchema(), query_directory, "most-recent", file_1)
detector_1_data = FileSystem.load(os.path.join(query_directory, file_1))
detector_1 = state_serializer.load(detector_1_data)
detector_2_data = FileSystem.load(os.path.join(query_directory, file_2))
detector_2 = state_serializer.load(detector_2_data)
assert detector_1.name == detector_2.name
mock_boltstatementresult = MagicMock()
results = [
{key: "1"},
{key: "2"},
{key: "3"},
{key: "4"},
{key: "5"},
{key: "6"},
{key: "7"},
]
mock_boltstatementresult.__getitem__.side_effect = results.__getitem__
mock_boltstatementresult.__iter__.side_effect = results.__iter__
mock_session.run.return_value = mock_boltstatementresult
data = FileSystem.load("tests/data/detectors/test_expectations.json")
state_old = StateSchema().load(data)
state_new = State(state_old.name, state_old.validation_query, state_old.properties, [])
get_state(mock_session, state_new)
state_new.properties = state_old.properties
drifts = compare_states(state_old, state_new)
mock_session.run.assert_called_with(state_new.validation_query)
assert drifts
assert ["7"] in drifts
def test_drift_detection_errors():
data = FileSystem.load("tests/data/test_cli_detectors/detector/1.json")
start_state = StateSchema().load(data)
data = FileSystem.load("tests/data/test_cli_detectors/detector/2.json")
end_state = StateSchema().load(data)
start_state.name = "Wrong Name"
with pytest.raises(ValueError):
perform_drift_detection(start_state, end_state)
start_state = StateSchema().load(data)
start_state.properties = ["Incorrect", "Properties"]
with pytest.raises(ValueError):
perform_drift_detection(start_state, end_state)
start_state = StateSchema().load(data)
start_state.validation_query = "Invalid Validation Query"
with pytest.raises(ValueError):
perform_drift_detection(start_state, end_state)
SET person.test2 = {test2},
person.test3 = {test3}
"""
for node in data:
test = node[0]
test2 = node[1]
test3 = node[2]
neo4j_session.run(
ingest_nodes,
test=test,
test2=test2,
test3=test3,
)
query_directory = "tests/data/test_update_detectors/invalid_query"
state_serializer = StateSchema()
storage = FileSystem
file_1 = str(datetime.datetime(2019, 1, 1, 0, 0, 2)) + ".json"
with pytest.raises(neobolt.exceptions.CypherSyntaxError):
get_query_state(neo4j_session, query_directory, state_serializer, storage, file_1)
query_directory = "tests/data/test_update_detectors/invalid_template"
with pytest.raises(ValidationError):
get_query_state(neo4j_session, query_directory, state_serializer, storage, file_1)
e,
)
else:
logger.error(
(
"Unable to auth to Neo4j, an error occurred: '%s'. driftdetect attempted to connect to Neo4j "
"with a username and password. Check your Neo4j server settings to see if the username and "
"password provided to driftdetect are valid credentials."
),
e,
)
return
with neo4j_driver.session() as session:
filename = '.'.join([str(i) for i in time.gmtime()] + ["json"])
state_serializer = StateSchema()
shortcut_serializer = ShortcutSchema()
for query_directory in FileSystem.walk(config.drift_detection_directory):
try:
get_query_state(session, query_directory, state_serializer, FileSystem, filename)
add_shortcut(FileSystem, shortcut_serializer, query_directory, 'most-recent', filename)
except ValidationError as err:
msg = "Unable to create State for directory {}, with data \n{}".format(
query_directory,
err.messages,
)
logger.exception(msg)
except KeyError as err:
msg = f"Could not find {err} field in state template for directory {query_directory}."
logger.exception(msg)
except FileNotFoundError as err:
logger.exception(err)
def run_drift_detection(config):
try:
if not valid_directory(config.query_directory):
logger.error("Invalid Drift Detection Directory")
return
state_serializer = StateSchema()
shortcut_serializer = ShortcutSchema()
shortcut_data = FileSystem.load(os.path.join(config.query_directory, "shortcut.json"))
shortcut = shortcut_serializer.load(shortcut_data)
start_state_data = FileSystem.load(
os.path.join(
config.query_directory, shortcut.shortcuts.get(
config.start_state,
config.start_state,
),
),
)
start_state = state_serializer.load(start_state_data)
end_state_data = FileSystem.load(
os.path.join(
config.query_directory, shortcut.shortcuts.get(
config.end_state,