Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_rx_get_no_data(mock_events, lora):
"""Test rx_get with no data."""
mock_events.side_effect = Rak811TimeoutError()
lora.rx_get(10)
mock_events.assert_called_once()
assert lora.nb_downlinks == 0
assert lora.get_downlink() is None
# Handle non-ASCII characters
emulate_rak_input(mock_serial, 1, [
(0, b'Non ASCII: \xde\xad\xbe\xef\r\n'),
(0.5, b'\r\n'),
(0, b'\r\n'),
(0, b'OK\r\n'),
])
rs = Rak811Serial()
assert rs.get_response() == 'OK'
rs.close()
# Response timeout
emulate_rak_input(mock_serial, 1, [
])
rs = Rak811Serial(response_timeout=1)
with raises(Rak811TimeoutError,
match='Timeout while waiting for response'):
rs.get_response()
rs.close()
(0, b'at+recv=2,0,0\r\n'),
(0, b'Welcome to RAK811\r\n'),
(0, b'at+recv=0,0,0\r\n'),
])
rs = Rak811Serial()
event = rs.get_events()
assert len(event) == 2
assert event.pop() == 'at+recv=0,0,0'
assert event.pop() == 'at+recv=2,0,0'
rs.close()
# Event timeout
emulate_rak_input(mock_serial, 1, [
])
rs = Rak811Serial(event_timeout=1)
with raises(Rak811TimeoutError,
match='Timeout while waiting for event'):
rs.get_events()
rs.close()
def get_response(self, timeout=None):
"""Get response from module.
This is a blocking call: it will return a response line or raise
Rak811TimeoutError if a response line is not received in time.
"""
if timeout is None:
timeout = self._response_timeout
with self._cv_serial:
while len(self._read_buffer) == 0:
success = self._cv_serial.wait(timeout)
if not success:
raise Rak811TimeoutError(
'Timeout while waiting for response'
)
response = self._read_buffer.pop(0)
return response
def get_events(self, timeout=None):
"""Get events from module.
This is a blocking call: it will return a list of events or raise
Rak811TimeoutError if no event line is received in time.
"""
if timeout is None:
timeout = self._event_timeout
with self._cv_serial:
while len(self._read_buffer) == 0:
success = self._cv_serial.wait(timeout)
if not success:
raise Rak811TimeoutError('Timeout while waiting for event')
event = self._read_buffer
self._read_buffer = []
return event
def rx_get(self, timeout):
"""Get LoraP2P message.
This is a blocking call: wait until we receive a message or we reach a
timeout.
The downlink receive buffer is populated, actual data is retrieved with
get_downlink().
"""
try:
self._process_events(timeout)
except Rak811TimeoutError:
pass