Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _read(handle, timeout):
"""Read an incoming packet from the receiver.
:returns: a tuple of (report_id, devnumber, data), or `None`.
:raises NoReceiver: if the receiver is no longer available, i.e. has
been physically removed from the machine, or the kernel driver has been
unloaded. The handle will be closed automatically.
"""
try:
# convert timeout to milliseconds, the hidapi expects it
timeout = int(timeout * 1000)
data = _hid.read(int(handle), _MAX_READ_SIZE, timeout)
except Exception as reason:
_log.error("read failed, assuming handle %r no longer available", handle)
close(handle)
raise NoReceiver(reason=reason)
if data:
assert isinstance(data, bytes), (repr(data), type(data))
report_id = ord(data[:1])
assert ((report_id & 0xF0 == 0) or
(report_id == 0x10 and len(data) == _SHORT_MESSAGE_SIZE) or
(report_id == 0x11 and len(data) == _LONG_MESSAGE_SIZE) or
(report_id == 0x20 and len(data) == _MEDIUM_MESSAGE_SIZE)), \
"unexpected message size: report_id %02X message %s" % (report_id, _strhex(data))
if report_id & 0xF0 == 0x00:
# These all should be normal HID reports that shouldn't really be reported in debugging
# if _log.isEnabledFor(_DEBUG):
def _continuous_read(handle, timeout=2000):
    """Read from `handle` in an endless loop, printing every non-empty reply.

    The loop only stops when `_hid.read` raises `OSError`; the failure is
    reported through `_error` and the function then returns.

    :param handle: passed as-is to `_hid.read`.
    :param timeout: per-read timeout, handed unchanged to `_hid.read`
    (presumably milliseconds -- TODO confirm against the `_hid` wrapper).
    """
    while True:
        try:
            # each read asks for at most 128 bytes
            packet = _hid.read(handle, 128, timeout)
        except OSError as err:
            _error("Read failed, aborting: " + str(err), True)
            return

        # the read is expected to hand back a (possibly empty) value, never None
        assert packet is not None

        if not packet:
            # nothing came in on this read (presumably it timed out); try again
            continue

        _print('>>', packet, True)
def _skip_incoming(handle, ihandle, notifications_hook):
"""Read anything already in the input buffer.
Used by request() and ping() before their write.
"""
while True:
try:
# read whatever is already in the buffer, if any
data = _hid.read(ihandle, _MAX_READ_SIZE, 0)
except Exception as reason:
_log.error("read failed, assuming receiver %s no longer available", handle)
close(handle)
raise NoReceiver(reason=reason)
if data:
assert isinstance(data, bytes), (repr(data), type(data))
report_id = ord(data[:1])
if _log.isEnabledFor(_DEBUG):
assert ((report_id & 0xF0 == 0) or
(report_id == 0x10 and len(data) == _SHORT_MESSAGE_SIZE) or
(report_id == 0x11 and len(data) == _LONG_MESSAGE_SIZE) or
(report_id == 0x20 and len(data) == _MEDIUM_MESSAGE_SIZE)), \
"unexpected message size: report_id %02X message %s" % (report_id, _strhex(data))
if notifications_hook and report_id & 0xF0:
n = make_notification(ord(data[1:2]), data[2:])