Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def read_non_windows(source):
    """
    Fetch the next packet from an Emotiv hid device.

    :param source: Emotiv hid device
    :return: Next encrypted packet from the device as a string, or None when
        validate_data() returns None for the data that was read.
    """
    # The requested buffer size does not seem to matter: 32 bytes come back
    # every time (33 on other platforms).
    # The 1 second timeout keeps the read from blocking indefinitely, which
    # helps with thread shutdown.
    raw = hidapi.hid_read_timeout(source, 34, 1000)
    packet = validate_data(raw)
    if packet is None:
        return None
    # Drop the first element and turn the remaining values into characters.
    return ''.join(chr(value) for value in packet[1:])
def read_non_windows(source, new_format=False):
    """
    Fetch the next packet from an Emotiv hid device.

    :param source: Emotiv hid device
    :param new_format: When true, request 64 bytes instead of 34; the flag is
        also forwarded to validate_data().
    :return: Next encrypted packet from the device as a string, or None when
        validate_data() returns None for the data that was read.
    """
    # The requested buffer size does not seem to matter: 32 bytes come back
    # every time (33 on other platforms).
    # The 1 second timeout helps with thread shutdown.
    read_size = 64 if new_format else 34
    raw = hidapi.hid_read_timeout(source, read_size, 1000)
    packet = validate_data(raw, new_format)
    if packet is None:
        return None
    # Drop the first element and turn the remaining values into characters.
    return ''.join([chr(value) for value in packet[1:]])
def cor_nb(self, timeout=100) -> bool:
    """
    [RX] Report whether there is a signal from the Radio.

    Non-blocking: a single timed HID read is performed, so the call returns
    after the timeout even if there is no signal from the Radio.

    :param timeout: Forwarded to hidapi.hid_read_timeout() as its timeout
        argument (presumably milliseconds -- confirm against hidapi).
    :return: True when roip.COR_START is found in the data read, otherwise
        False.
    """
    report = hidapi.hid_read_timeout(self.hid_device, roip.READ_SIZE, timeout)
    self._logger.debug('read="%s"', report)

    # Default to "no signal"; only a start marker flips it.
    signal_present: bool = False
    if roip.COR_START in report:
        signal_present = True
    elif roip.COR_STOP in report:
        # An explicit stop marker also means no signal.
        signal_present = False

    self._logger.debug('cor_status="%s"', signal_present)
    return signal_present
def read_non_windows(source, new_format=False):
    """
    Read the next packet from an Emotiv hid device.

    :param source: Emotiv hid device
    :param new_format: Read more data? Selects a 64 byte read instead of a
        34 byte one and is passed through to validate_data().
    :return: Next encrypted packet from Emotiv device as a string; None if
        validate_data() returns None.
    """
    # Buffer size appears to be irrelevant -- 32 is returned every time,
    # 33 for other platforms.
    if new_format:
        size = 64
    else:
        size = 34
    # Timeout of 1 second, to help with thread shutdown.
    validated = validate_data(hidapi.hid_read_timeout(source, size, 1000), new_format)
    if validated is not None:
        # Skip the leading element; convert the rest to characters.
        payload = validated[1:]
        return ''.join(chr(item) for item in payload)
    return None