Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_struct_field_dependencies():
class TestStruct(t.Struct):
foo: t.uint8_t
status: Status
bar: t.uint8_t = t.StructField(requires=lambda s: s.status == Status.SUCCESS)
baz1: t.uint8_t
baz2: typing.Optional[t.uint8_t]
"""
# bar must be defined because status is SUCCESS
with pytest.raises(ValueError):
TestStruct(foo=1, status=Status.SUCCESS, baz1=2)
"""
# Status is FAILURE so bar is not defined
TestStruct(foo=1, status=Status.FAILURE, baz1=2)
"""
# In fact, it cannot be defined
with pytest.raises(ValueError):
async def async_unlock(hass, cluster, entity_id):
    """Call the "unlock" service through hass and verify the cluster request.

    While ``zigpy.zcl.Cluster.request`` is patched to return a mocked
    coroutine yielding ``[Status.SUCCESS]``, the service is invoked for
    *entity_id* and the single resulting request is checked: its first
    positional argument must be ``False`` and its second ``UNLOCK_DOOR``.
    """
    from zigpy.zcl.foundation import Status

    mocked_reply = mock_coro([Status.SUCCESS])
    with patch("zigpy.zcl.Cluster.request", return_value=mocked_reply):
        # unlock via UI
        service_data = {"entity_id": entity_id}
        await hass.services.async_call(DOMAIN, "unlock", service_data, blocking=True)

        # Exactly one request must have been issued before inspecting it.
        assert cluster.request.call_count == 1
        positional = cluster.request.call_args[0]
        assert positional[0] is False
        assert positional[1] == UNLOCK_DOOR
async def async_lock(hass, cluster, entity_id):
    """Call the "lock" service through hass and verify the cluster request.

    While ``zigpy.zcl.Cluster.request`` is patched to return a mocked
    coroutine yielding ``[Status.SUCCESS]``, the service is invoked for
    *entity_id* and the single resulting request is checked: its first
    positional argument must be ``False`` and its second ``LOCK_DOOR``.
    """
    from zigpy.zcl.foundation import Status

    mocked_reply = mock_coro([Status.SUCCESS])
    with patch("zigpy.zcl.Cluster.request", return_value=mocked_reply):
        # lock via UI
        service_data = {"entity_id": entity_id}
        await hass.services.async_call(DOMAIN, "lock", service_data, blocking=True)

        # Exactly one request must have been issued before inspecting it.
        assert cluster.request.call_count == 1
        positional = cluster.request.call_args[0]
        assert positional[0] is False
        assert positional[1] == LOCK_DOOR
async def test_ota_handle_image_block_no_img(ota_cluster):
    """Image block requests without an image are answered with a lone ABORT.

    The scenario is run twice with ``has_image=False``: once with
    ``correct_version=True`` and once with ``correct_version=False``.  Each
    time ``image_block_response`` must be called exactly once with
    ``Status.ABORT`` as its only positional argument.
    """
    ota_cluster.image_block_response = mock.CoroutineMock()

    def expect_single_abort():
        # One call, whose sole positional argument is the ABORT status.
        responder = ota_cluster.image_block_response
        assert responder.call_count == 1
        assert responder.call_args[0][0] == zcl.foundation.Status.ABORT
        assert len(responder.call_args[0]) == 1

    await _ota_image_block(ota_cluster, has_image=False, correct_version=True)
    expect_single_abort()

    # Clear the recorded call so the second scenario is counted from zero.
    ota_cluster.image_block_response.reset_mock()

    await _ota_image_block(ota_cluster, has_image=False, correct_version=False)
    expect_single_abort()
async def write_attributes(
    self, attributes: Dict[Union[str, int], Any], manufacturer: Optional[int] = None
) -> List:
    """Write *attributes* and mirror the accepted values into the cache.

    The attribute mapping is converted to write records with
    ``_write_attr_records`` and sent with ``_write_attributes``.  When the
    first element of the result is not a list, the result is returned as-is
    and the cache is left alone.  Otherwise that list holds status records:

    * a single record whose status is ``SUCCESS`` means every written value
      is cached;
    * in any other case the records name the attribute ids to skip, and only
      the remaining written values are cached.

    Returns the unmodified result of ``_write_attributes``.
    """
    write_records = self._write_attr_records(attributes)
    response = await self._write_attributes(write_records, manufacturer=manufacturer)

    status_records = response[0]
    if not isinstance(status_records, list):
        return response

    if (
        len(status_records) == 1
        and status_records[0].status == foundation.Status.SUCCESS
    ):
        # Blanket success: nothing is excluded (attrid is not read here).
        rejected_ids = []
    else:
        rejected_ids = [record.attrid for record in status_records]

    for written in write_records:
        if written.attrid in rejected_ids:
            continue
        self._attr_cache[written.attrid] = written.value.value

    return response
def deserialize(cls, data):
    """Build an instance of *cls* from *data* and return it with the leftover data.

    A status is read first.  When it equals ``Status.SUCCESS`` the direction
    and attribute id are read as optional fields, and a direction that is
    present is converted to ``ReportingDirection``.  For any other status
    both fields are read as required values.

    Returns a ``(record, remaining_data)`` tuple.
    """
    record = cls()
    record.status, remaining = Status.deserialize(data)

    if record.status == Status.SUCCESS:
        # On success the trailing fields may be absent.
        record.direction, remaining = t.Optional(t.uint8_t).deserialize(remaining)
        if record.direction is not None:
            record.direction = ReportingDirection(record.direction)
        record.attrid, remaining = t.Optional(t.uint16_t).deserialize(remaining)
    else:
        record.direction, remaining = ReportingDirection.deserialize(remaining)
        record.attrid, remaining = t.uint16_t.deserialize(remaining)

    return record, remaining
if brightness is None and self._off_brightness is not None:
brightness = self._off_brightness
t_log = {}
if (
brightness is not None or transition
) and self._supported_features & light.SUPPORT_BRIGHTNESS:
if brightness is not None:
level = min(254, brightness)
else:
level = self._brightness or 254
result = await self._level_channel.move_to_level_with_on_off(
level, duration
)
t_log["move_to_level_with_on_off"] = result
if not isinstance(result, list) or result[1] is not Status.SUCCESS:
self.debug("turned on: %s", t_log)
return
self._state = bool(level)
if level:
self._brightness = level
if brightness is None or brightness:
# since some lights don't always turn on with move_to_level_with_on_off,
# we should call the on command on the on_off cluster if brightness is not 0.
result = await self._on_off_channel.on()
t_log["on_off"] = result
if not isinstance(result, list) or result[1] is not Status.SUCCESS:
self.debug("turned on: %s", t_log)
return
self._state = True
if (
async def remove_from_group(self, grp_id: int):
    """Ask the groups cluster to remove *grp_id* and update local membership.

    Returns ``ZCLStatus.FAILURE`` when the removal attempt raises
    ``AttributeError`` (logged as "no groups cluster"); otherwise returns the
    first element of the removal response.  Only when that status is
    ``SUCCESS`` or ``NOT_FOUND`` is this object dropped from the matching
    group in the application's group table, if such a group is registered.
    """
    try:
        response = await self.groups.remove(grp_id)
    except AttributeError:
        self.debug("Cannot remove 0x%04x group, no groups cluster", grp_id)
        return ZCLStatus.FAILURE

    status = response[0]
    if status in (ZCLStatus.SUCCESS, ZCLStatus.NOT_FOUND):
        registered_groups = self.device.application.groups
        if grp_id in registered_groups:
            registered_groups[grp_id].remove_member(self)
    else:
        # NOTE(review): the message reads "remove to"; probably meant
        # "remove from" — left unchanged to keep log output identical.
        self.debug("Couldn't remove to 0x%04x group: %s", grp_id, status)

    return status