Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def send_attribute_report(hass, cluster, attrid, value):
"""Cause the sensor to receive an attribute report from the network.
This is to simulate the normal device communication that happens when a
device is paired to the zigbee network.
"""
attr = make_attribute(attrid, value)
hdr = make_zcl_header(zcl_f.Command.Report_Attributes)
cluster.handle_message(hdr, [[attr]])
await hass.async_block_till_done()
def __init__(
self,
frame_control: FrameControl,
tsn: Union[int, t.uint8_t] = 0,
command_id: Union[Command, int, t.uint8_t] = 0,
manufacturer: Optional[Union[int, t.uint16_t]] = None,
) -> None:
"""Initialize ZCL Frame instance."""
self._frc = frame_control
if frame_control.is_general:
self._cmd_id = Command(command_id)
else:
self._cmd_id = t.uint8_t(command_id)
self._manufacturer = manufacturer
if manufacturer is not None:
self.frame_control.is_manufacturer_specific = True
self._tsn = t.uint8_t(tsn)
general_command, foundation.Command.Configure_Reporting
)
_read_attributes = functools.partialmethod(
general_command, foundation.Command.Read_Attributes
)
_read_attributes_rsp = functools.partialmethod(
general_command, foundation.Command.Read_Attributes_rsp
)
_write_attributes = functools.partialmethod(
general_command, foundation.Command.Write_Attributes
)
_write_attributes_undivided = functools.partialmethod(
general_command, foundation.Command.Write_Attributes_Undivided
)
discover_attributes = functools.partialmethod(
general_command, foundation.Command.Discover_Attributes
)
discover_attributes_extended = functools.partialmethod(
general_command, foundation.Command.Discover_Attribute_Extended
)
discover_commands_received = functools.partialmethod(
general_command, foundation.Command.Discover_Commands_Received
)
discover_commands_generated = functools.partialmethod(
general_command, foundation.Command.Discover_Commands_Generated
)
class ClusterPersistingListener:
def __init__(self, applistener, cluster):
self._applistener = applistener
self._cluster = cluster
True,
),
Command.Discover_Attribute_Extended: ((t.uint16_t, t.uint8_t), False),
Command.Discover_Attribute_Extended_rsp: (
(t.Bool, t.List(DiscoverAttributesExtendedResponseRecord)),
True,
),
Command.Discover_Commands_Generated: ((t.uint8_t, t.uint8_t), False),
Command.Discover_Commands_Generated_rsp: ((t.Bool, t.List(t.uint8_t)), True),
Command.Discover_Commands_Received: ((t.uint8_t, t.uint8_t), False),
Command.Discover_Commands_Received_rsp: ((t.Bool, t.List(t.uint8_t)), True),
Command.Read_Attributes: ((t.List(t.uint16_t),), False),
Command.Read_Attributes_rsp: ((t.List(ReadAttributeRecord),), True),
# Command.Read_Attributes_Structured: ((, ), False),
Command.Read_Reporting_Configuration: ((t.List(ReadReportingConfigRecord),), False),
Command.Read_Reporting_Configuration_rsp: (
(t.List(AttributeReportingConfig),),
True,
),
Command.Report_Attributes: ((t.List(Attribute),), False),
Command.Write_Attributes: ((t.List(Attribute),), False),
Command.Write_Attributes_No_Response: ((t.List(Attribute),), False),
Command.Write_Attributes_rsp: ((WriteAttributesResponse,), True),
# Command.Write_Attributes_Structured: ((, ), False),
# Command.Write_Attributes_Structured_rsp: ((, ), True),
Command.Write_Attributes_Undivided: ((t.List(Attribute),), False),
}
class FrameType(t.enum8):
"""ZCL Frame Type."""
Command.Configure_Reporting: ((t.List(AttributeReportingConfig),), False),
Command.Configure_Reporting_rsp: ((ConfigureReportingResponse,), True),
Command.Default_Response: ((t.uint8_t, Status), True),
Command.Discover_Attributes: ((t.uint16_t, t.uint8_t), False),
Command.Discover_Attributes_rsp: (
(t.Bool, t.List(DiscoverAttributesResponseRecord)),
True,
),
Command.Discover_Attribute_Extended: ((t.uint16_t, t.uint8_t), False),
Command.Discover_Attribute_Extended_rsp: (
(t.Bool, t.List(DiscoverAttributesExtendedResponseRecord)),
True,
),
Command.Discover_Commands_Generated: ((t.uint8_t, t.uint8_t), False),
Command.Discover_Commands_Generated_rsp: ((t.Bool, t.List(t.uint8_t)), True),
Command.Discover_Commands_Received: ((t.uint8_t, t.uint8_t), False),
Command.Discover_Commands_Received_rsp: ((t.Bool, t.List(t.uint8_t)), True),
Command.Read_Attributes: ((t.List(t.uint16_t),), False),
Command.Read_Attributes_rsp: ((t.List(ReadAttributeRecord),), True),
# Command.Read_Attributes_Structured: ((, ), False),
Command.Read_Reporting_Configuration: ((t.List(ReadReportingConfigRecord),), False),
Command.Read_Reporting_Configuration_rsp: (
(t.List(AttributeReportingConfig),),
True,
),
Command.Report_Attributes: ((t.List(Attribute),), False),
Command.Write_Attributes: ((t.List(Attribute),), False),
Command.Write_Attributes_No_Response: ((t.List(Attribute),), False),
Command.Write_Attributes_rsp: ((WriteAttributesResponse,), True),
# Command.Write_Attributes_Structured: ((, ), False),
# Command.Write_Attributes_Structured_rsp: ((, ), True),
Command.Write_Attributes_Undivided: ((t.List(Attribute),), False),
Discover_Attributes = 0x0C
Discover_Attributes_rsp = 0x0D
# Read_Attributes_Structured = 0x0e
# Write_Attributes_Structured = 0x0f
# Write_Attributes_Structured_rsp = 0x10
Discover_Commands_Received = 0x11
Discover_Commands_Received_rsp = 0x12
Discover_Commands_Generated = 0x13
Discover_Commands_Generated_rsp = 0x14
Discover_Attribute_Extended = 0x15
Discover_Attribute_Extended_rsp = 0x16
COMMANDS = {
# id: (params, is_response)
Command.Configure_Reporting: ((t.List(AttributeReportingConfig),), False),
Command.Configure_Reporting_rsp: ((ConfigureReportingResponse,), True),
Command.Default_Response: ((t.uint8_t, Status), True),
Command.Discover_Attributes: ((t.uint16_t, t.uint8_t), False),
Command.Discover_Attributes_rsp: (
(t.Bool, t.List(DiscoverAttributesResponseRecord)),
True,
),
Command.Discover_Attribute_Extended: ((t.uint16_t, t.uint8_t), False),
Command.Discover_Attribute_Extended_rsp: (
(t.Bool, t.List(DiscoverAttributesExtendedResponseRecord)),
True,
),
Command.Discover_Commands_Generated: ((t.uint8_t, t.uint8_t), False),
Command.Discover_Commands_Generated_rsp: ((t.Bool, t.List(t.uint8_t)), True),
Command.Discover_Commands_Received: ((t.uint8_t, t.uint8_t), False),
Command.Discover_Commands_Received_rsp: ((t.Bool, t.List(t.uint8_t)), True),
(t.Bool, t.List(DiscoverAttributesExtendedResponseRecord)),
True,
),
Command.Discover_Commands_Generated: ((t.uint8_t, t.uint8_t), False),
Command.Discover_Commands_Generated_rsp: ((t.Bool, t.List(t.uint8_t)), True),
Command.Discover_Commands_Received: ((t.uint8_t, t.uint8_t), False),
Command.Discover_Commands_Received_rsp: ((t.Bool, t.List(t.uint8_t)), True),
Command.Read_Attributes: ((t.List(t.uint16_t),), False),
Command.Read_Attributes_rsp: ((t.List(ReadAttributeRecord),), True),
# Command.Read_Attributes_Structured: ((, ), False),
Command.Read_Reporting_Configuration: ((t.List(ReadReportingConfigRecord),), False),
Command.Read_Reporting_Configuration_rsp: (
(t.List(AttributeReportingConfig),),
True,
),
Command.Report_Attributes: ((t.List(Attribute),), False),
Command.Write_Attributes: ((t.List(Attribute),), False),
Command.Write_Attributes_No_Response: ((t.List(Attribute),), False),
Command.Write_Attributes_rsp: ((WriteAttributesResponse,), True),
# Command.Write_Attributes_Structured: ((, ), False),
# Command.Write_Attributes_Structured_rsp: ((, ), True),
Command.Write_Attributes_Undivided: ((t.List(Attribute),), False),
}
class FrameType(t.enum8):
"""ZCL Frame Type."""
GLOBAL_COMMAND = 0b00
CLUSTER_COMMAND = 0b01
RESERVED_2 = 0b10
RESERVED_3 = 0b11
True, command_id, schema, *args, manufacturer=manufacturer, tsn=tsn
)
return self.request(
True,
command_id,
schema,
*args,
manufacturer=manufacturer,
expect_reply=expect_reply,
tries=tries,
tsn=tsn,
)
_configure_reporting = functools.partialmethod(
general_command, foundation.Command.Configure_Reporting
)
_read_attributes = functools.partialmethod(
general_command, foundation.Command.Read_Attributes
)
_read_attributes_rsp = functools.partialmethod(
general_command, foundation.Command.Read_Attributes_rsp
)
_write_attributes = functools.partialmethod(
general_command, foundation.Command.Write_Attributes
)
_write_attributes_undivided = functools.partialmethod(
general_command, foundation.Command.Write_Attributes_Undivided
)
discover_attributes = functools.partialmethod(
general_command, foundation.Command.Discover_Attributes
)
def handle_cluster_general_request(
self, hdr: foundation.ZCLHeader, args: List
) -> None:
if hdr.command_id == foundation.Command.Report_Attributes:
valuestr = ", ".join(
[
f"{self.attributes.get(a.attrid, [a.attrid])[0]}={a.value.value}"
for a in args[0]
]
)
self.debug("Attribute report received: %s", valuestr)
for attr in args[0]:
try:
value = self.attributes[attr.attrid][1](attr.value.value)
except KeyError:
value = attr.value.value
except ValueError:
self.debug(
"Couldn't normalize %a attribute with %s value",
attr.attrid,