Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_retry(
    self, publisher, get_message_from_queue, rabbit_config
):
    """Check how many publish attempts happen with retry enabled and disabled.

    ``Producer._publish`` is swapped for a mock that always raises
    ``RecoverableConnectionError``. With ``retry=True`` the test expects
    ``OperationalError`` to surface and the mock to have been called
    ``1 + (max_retries + 1)`` times, where ``max_retries`` is read from
    ``publisher.retry_policy``. With ``retry=False`` it expects the
    original ``RecoverableConnectionError`` after exactly one call.

    The ``get_message_from_queue`` and ``rabbit_config`` fixtures are
    requested but not referenced in the body.
    """
    mock_publish = MagicMock(__name__="", __doc__="", __module__="")
    mock_publish.side_effect = RecoverableConnectionError("error")

    # NOTE(review): the "+ 1" here and the "1 +" in the assertion below
    # presumably mirror how the underlying retry loop counts attempts;
    # confirm against the retry implementation before changing either.
    expected_retries = publisher.retry_policy['max_retries'] + 1

    # with retry
    with patch.object(Producer, '_publish', new=mock_publish):
        with pytest.raises(OperationalError):
            publisher.publish("payload", retry=True)
    assert mock_publish.call_count == 1 + expected_retries

    # Clear the recorded calls so the next scenario starts from zero.
    mock_publish.reset_mock()

    # retry disabled
    with patch.object(Producer, '_publish', new=mock_publish):
        with pytest.raises(RecoverableConnectionError):
            publisher.publish("payload", retry=False)
    assert mock_publish.call_count == 1
def test_retry_policy(
    self, publisher, get_message_from_queue, rabbit_config
):
    """Publish with a per-call ``retry_policy`` and count the attempts.

    ``Producer._publish`` is replaced by a mock that always raises
    ``RecoverableConnectionError``. The publish call is given a policy
    with ``max_retries`` set to 5 and is expected to end in
    ``OperationalError`` after the mock has been invoked
    ``1 + (max_retries + 1)`` times.

    The ``get_message_from_queue`` and ``rabbit_config`` fixtures are
    requested but not referenced in the body.
    """
    failing_publish = MagicMock(__name__="", __doc__="", __module__="")
    failing_publish.side_effect = RecoverableConnectionError("error")

    policy = {'max_retries': 5}
    # Computed before publishing so the expectation does not depend on
    # anything the publish call might do with the policy dict.
    expected_calls = 1 + (policy['max_retries'] + 1)

    with patch.object(Producer, '_publish', new=failing_publish), \
            pytest.raises(OperationalError):
        publisher.publish("payload", retry_policy=policy)

    assert failing_publish.call_count == expected_calls
def _read(loop):
    """Drain pending connection events once, then reschedule on ``loop``.

    Raises ``RecoverableConnectionError`` when ``connection.connected`` is
    falsy. Otherwise calls ``drain_events(timeout=0)``:

    * on success, schedules this function again via ``loop.call_soon``;
    * on ``timeout``, returns without rescheduling;
    * on ``error`` whose ``errno`` is in ``_unavail``, returns without
      rescheduling;
    * on any other ``error``, re-raises it.

    ``connection``, ``drain_events``, ``timeout``, ``error`` and
    ``_unavail`` are not defined in this function; they are resolved from
    the surrounding scope.
    """
    if not connection.connected:
        raise RecoverableConnectionError('Socket was disconnected')

    try:
        drain_events(timeout=0)
    except timeout:
        # Nothing to read right now; fall through and return None.
        pass
    except error as err:
        # Only the errno values listed in _unavail are tolerated.
        if err.errno not in _unavail:
            raise
    else:
        # Reached only when drain_events completed without raising.
        loop.call_soon(_read, loop)