Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_instrument_is_started_before_processing_message(
self, message_with_trace_parent, instrument_mock):
callback = Callback(sub_stub)
callback(message_with_trace_parent)
instrument_mock.called_once()
def test_log_does_not_ack_called_message_when_execution_fails(
self, caplog, message_wrapper, published_at
):
@sub(topic="some-cool-topic", prefix="rele")
def crashy_sub_stub(data, **kwargs):
raise ValueError("I am an exception from a sub")
callback = Callback(crashy_sub_stub)
res = callback(message_wrapper)
assert res is None
message_wrapper.ack.assert_not_called()
failed_log = caplog.records[-1]
assert failed_log.message == (
"Exception raised while processing "
"message for rele-some-cool-topic - "
"crashy_sub_stub: ValueError"
)
assert failed_log.metrics == {
"name": "subscriptions",
"data": {
"agent": "rele",
"topic": "some-cool-topic",
"status": "failed",
def test_span_as_child_of_parent_context_is_started_before_processing_message(
self, message_with_trace_parent, extract_mock,
start_active_span_mock):
parent_context_mock = Mock(spec=SpanContext)
extract_mock.return_value = parent_context_mock
callback = Callback(sub_stub)
callback(message_with_trace_parent)
start_active_span_mock.assert_called_with(
ANY,
child_of=parent_context_mock,
finish_on_close=False
)
def test_published_time_as_message_attribute(self, message_wrapper, caplog):
callback = Callback(sub_published_time_type)
callback(message_wrapper)
success_log = caplog.records[-2]
assert success_log.message == ""
def test_parent_trace_is_extracted_before_processing_message(
self, message_with_trace_parent, extract_mock):
callback = Callback(sub_stub)
callback(message_with_trace_parent)
assert ELASTIC_APM_TRACE_PARENT in extract_mock.call_args[0][1].keys()
def test_log_acks_called_message_when_not_json_serializable(
self, caplog, message_wrapper_invalid_json, published_at
):
callback = Callback(sub_stub)
res = callback(message_wrapper_invalid_json)
assert res is None
message_wrapper_invalid_json.ack.assert_called_once()
failed_log = caplog.records[-1]
assert failed_log.message == (
"Exception raised while processing "
"message for rele-some-cool-topic - "
"sub_stub: JSONDecodeError"
)
assert failed_log.metrics == {
"name": "subscriptions",
"data": {
"agent": "rele",
"topic": "some-cool-topic",
"status": "failed",
def test_log_start_processing_when_callback_called(
self, caplog, message_wrapper, published_at
):
with caplog.at_level(logging.DEBUG):
callback = Callback(sub_stub)
res = callback(message_wrapper)
assert res == 123
log1 = caplog.records[0]
assert log1.message == (
"Start processing message for " "rele-some-cool-topic - sub_stub"
)
assert log1.metrics == {
"name": "subscriptions",
"data": {
"agent": "rele",
"topic": "some-cool-topic",
"status": "received",
"subscription": "rele-some-cool-topic",
"attributes": {"lang": "es", "published_at": str(published_at),},
},
def test_log_when_execution_is_succesful(
self, message_wrapper, caplog, published_at
):
callback = Callback(sub_stub)
callback(message_wrapper)
success_log = caplog.records[-1]
assert success_log.message == (
"Successfully processed message for " "rele-some-cool-topic - sub_stub"
)
assert success_log.metrics == {
"name": "subscriptions",
"data": {
"agent": "rele",
"topic": "some-cool-topic",
"status": "succeeded",
"subscription": "rele-some-cool-topic",
"duration_seconds": pytest.approx(0.5, abs=0.5),
"attributes": {"lang": "es", "published_at": str(published_at),},
},
def test_old_django_connections_closed_when_middleware_is_used(
self, mock_close_old_connections, message_wrapper, config
):
config.middleware = ["rele.contrib.DjangoDBMiddleware"]
register_middleware(config)
callback = Callback(sub_stub)
res = callback(message_wrapper)
assert res == 123
assert mock_close_old_connections.call_count == 2
def test_acks_message_when_execution_successful(self, caplog, message_wrapper):
with caplog.at_level(logging.DEBUG):
callback = Callback(sub_stub)
res = callback(message_wrapper)
assert res == 123
message_wrapper.ack.assert_called_once()
assert len(caplog.records) == 3
message_wrapper_log = caplog.records[1]
assert message_wrapper_log.message == (
"I am a task doing " "stuff with ID 123 (es)"
)