Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def lora():
"""Instantiate Rak811 class though fixture."""
with patch('rak811.rak811.Rak811Serial', autospec=True):
rak811 = Rak811()
yield rak811
rak811.close()
emulate_rak_input(mock_serial, 1, [
(0, b'at+recv=2,0,0\r\n'),
(0, b'Welcome to RAK811\r\n'),
(0, b'at+recv=0,0,0\r\n'),
])
rs = Rak811Serial()
event = rs.get_events()
assert len(event) == 2
assert event.pop() == 'at+recv=0,0,0'
assert event.pop() == 'at+recv=2,0,0'
rs.close()
# Event timeout
emulate_rak_input(mock_serial, 1, [
])
rs = Rak811Serial(event_timeout=1)
with raises(Rak811TimeoutError,
match='Timeout while waiting for event'):
rs.get_events()
rs.close()
def test_get_response_event(mock_serial):
"""Test response / event sequence."""
# All at once
emulate_rak_input(mock_serial, 1, [
(0, b'Welcome 1\r\n'),
(0, b'OK\r\n'),
(0, b'Welcome 2\r\n'),
(0, b'at+recv=2,0,0\r\n'),
(0, b'Welcome 3\r\n'),
(0, b'at+recv=0,0,0\r\n'),
])
rs = Rak811Serial()
assert rs.get_response() == 'OK'
event = rs.get_events()
assert len(event) == 2
assert event.pop() == 'at+recv=0,0,0'
assert event.pop() == 'at+recv=2,0,0'
rs.close()
# Same scenario with delay between response and event
emulate_rak_input(mock_serial, 1, [
(0, b'Welcome 1\r\n'),
(0, b'OK\r\n'),
(0, b'Welcome 2\r\n'),
(1, b'at+recv=2,0,0\r\n'),
(0, b'Welcome 3\r\n'),
(0, b'at+recv=0,0,0\r\n'),
])
def test_send_string(mock_serial):
"""Test Rak811Serial.send_string."""
rs = Rak811Serial()
rs.send_string('Hello world')
mock_serial.return_value.write.assert_called_once_with(b'Hello world')
rs.close()
def test_send_command(mock_serial):
"""Test Rak811Serial.send_command."""
rs = Rak811Serial()
rs.send_command('RESET')
mock_serial.return_value.write.assert_called_once_with(b'at+RESET\r\n')
rs.close()
(0.5, b'\r\n'),
(0, b'\r\n'),
(0, b'OK\r\n'),
])
rs = Rak811Serial()
assert rs.get_response() == 'OK'
rs.close()
# Handle non-ASCII characters
emulate_rak_input(mock_serial, 1, [
(0, b'Non ASCII: \xde\xad\xbe\xef\r\n'),
(0.5, b'\r\n'),
(0, b'\r\n'),
(0, b'OK\r\n'),
])
rs = Rak811Serial()
assert rs.get_response() == 'OK'
rs.close()
# Response timeout
emulate_rak_input(mock_serial, 1, [
])
rs = Rak811Serial(response_timeout=1)
with raises(Rak811TimeoutError,
match='Timeout while waiting for response'):
rs.get_response()
rs.close()
def test_instantiate_default(mock_serial):
"""Test that Rak811Serial can be instantiated.
Check for basic initialisation and teardown of the serial interface.
"""
rs = Rak811Serial()
# Test default parameters are used
mock_serial.assert_called_once_with(port=PORT,
baudrate=BAUDRATE,
timeout=TIMEOUT)
# Class initialization
mock_serial.return_value.reset_input_buffer.assert_called_once()
assert rs._alive
# Test tear down
rs.close()
mock_serial.return_value.close.assert_called_once()
assert not rs._alive
# Handle non-ASCII characters
emulate_rak_input(mock_serial, 1, [
(0, b'Non ASCII: \xde\xad\xbe\xef\r\n'),
(0.5, b'\r\n'),
(0, b'\r\n'),
(0, b'OK\r\n'),
])
rs = Rak811Serial()
assert rs.get_response() == 'OK'
rs.close()
# Response timeout
emulate_rak_input(mock_serial, 1, [
])
rs = Rak811Serial(response_timeout=1)
with raises(Rak811TimeoutError,
match='Timeout while waiting for response'):
rs.get_response()
rs.close()
def test_instantiate_custom(mock_serial):
"""Test that Rak811Serial can be instantiated - custom parameters."""
port = '/dev/ttyAMA0'
timeout = 5
bytesize = EIGHTBITS
rs = Rak811Serial(port=port, timeout=timeout, bytesize=bytesize)
mock_serial.assert_called_once_with(
port=port,
baudrate=BAUDRATE,
timeout=timeout,
bytesize=bytesize
)
rs.close()
def test_get_events(mock_serial):
"""Test Rak811Serial.get_events."""
# Single command
emulate_rak_input(mock_serial, 1, [
(0, b'at+recv=8,0,0\r\n'),
])
rs = Rak811Serial()
event = rs.get_events()
assert len(event) == 1
assert event.pop() == 'at+recv=8,0,0'
rs.close()
# Multiple commands
emulate_rak_input(mock_serial, 1, [
(0, b'at+recv=2,0,0\r\n'),
(0, b'Welcome to RAK811\r\n'),
(0, b'at+recv=0,0,0\r\n'),
])
rs = Rak811Serial()
event = rs.get_events()
assert len(event) == 2
assert event.pop() == 'at+recv=0,0,0'
assert event.pop() == 'at+recv=2,0,0'