Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_struct_init():
class TestStruct(t.Struct):
a: t.uint8_t
b: t.uint16_t
c: t.CharacterString
ts = TestStruct(a=1, b=0x0100, c="TestStruct")
assert repr(ts)
assert isinstance(ts.a, t.uint8_t)
assert isinstance(ts.b, t.uint16_t)
assert isinstance(ts.c, t.CharacterString)
assert ts.a == 1
assert ts.b == 0x100
assert ts.c == "TestStruct"
ts2, remaining = TestStruct.deserialize(b"\x01\x00\x01\x0aTestStruct")
assert not remaining
assert ts == ts2
assert ts.serialize() == ts2.serialize()
ts3 = ts2.replace(b=0x0100)
assert ts3 == ts2
assert ts3.serialize() == ts2.serialize()
ts4 = ts2.replace(b=0x0101)
assert ts4 != ts2
def test_size_prefixed_simple_descriptor():
sd = types.SizePrefixedSimpleDescriptor()
sd.endpoint = t.uint8_t(1)
sd.profile = t.uint16_t(2)
sd.device_type = t.uint16_t(3)
sd.device_version = t.uint8_t(4)
sd.input_clusters = t.LVList(t.uint16_t)([t.uint16_t(5), t.uint16_t(6)])
sd.output_clusters = t.LVList(t.uint16_t)([t.uint16_t(7), t.uint16_t(8)])
ser = sd.serialize()
assert ser[0] == len(ser) - 1
sd2, data = types.SizePrefixedSimpleDescriptor.deserialize(ser + b"extra")
assert sd.input_clusters == sd2.input_clusters
assert sd.output_clusters == sd2.output_clusters
assert isinstance(sd2, types.SizePrefixedSimpleDescriptor)
assert data == b"extra"
def data(tag_id, payload=b""):
r = t.uint16_t(tag_id).serialize()
r += t.uint32_t(len(payload)).serialize()
return r + payload
raw = data[: cls._size].split(b"\x00")[0]
return cls(raw.decode("utf8", errors="replace")), data[cls._size :]
def serialize(self):
return self.encode("utf8").ljust(self._size, b"\x00")
class OTAImageHeader(t.Struct):
MAGIC_VALUE = 0x0BEEF11E
OTA_HEADER = MAGIC_VALUE.to_bytes(4, "little")
upgrade_file_id: t.uint32_t
header_version: t.uint16_t
header_length: t.uint16_t
field_control: t.uint16_t
manufacturer_id: t.uint16_t
image_type: t.uint16_t
file_version: t.uint32_t
stack_version: t.uint16_t
header_string: HeaderString
image_size: t.uint32_t
@property
def security_credential_version_present(self) -> bool:
if self.field_control is None:
return None
return bool(self.field_control & 0x01)
@property
def device_specific_file(self) -> bool:
if self.field_control is None:
return None
0x0402,
0x0B05,
SmartthingsRelativeHumidityCluster,
]
}
}
}
class SmartThingsAccelCluster(CustomCluster):
cluster_id = 0xFC02
name = "Smartthings Accelerometer"
ep_attribute = "accelerometer"
attributes = {
0x0000: ("motion_threshold_multiplier", t.uint8_t),
0x0002: ("motion_threshold", t.uint16_t),
0x0010: ("acceleration", t.bitmap8), # acceleration detected
0x0012: ("x_axis", t.int16s),
0x0013: ("y_axis", t.int16s),
0x0014: ("z_axis", t.int16s),
}
client_commands = {}
server_commands = {}
class SmartthingsMultiPurposeSensor(CustomDevice):
signature = {
"endpoints": {
#
def deserialize(cls, data):
r = cls()
r.addrmode, data = data[0], data[1:]
if r.addrmode == 0x01:
r.nwk, data = t.uint16_t.deserialize(data)
elif r.addrmode == 0x03:
r.ieee, data = t.EUI64.deserialize(data)
r.endpoint, data = t.uint8_t.deserialize(data)
else:
raise ValueError("Invalid MultiAddress - unknown address mode")
return r, data
0x0002: ("current_max_demand_delivered", t.uint48_t),
0x0003: ("current_max_demand_received", t.uint48_t),
0x0004: ("dft_summ", t.uint48_t),
0x0005: ("daily_freeze_time", t.uint16_t),
0x0006: ("power_factor", t.int8s),
0x0007: ("reading_snapshot_time", t.uint32_t),
0x0008: ("current_max_demand_deliverd_time", t.uint32_t),
0x0009: ("current_max_demand_received_time", t.uint32_t),
0x000A: ("default_update_period", t.uint8_t),
0x000B: ("fast_poll_update_period", t.uint8_t),
0x000C: ("current_block_period_consump_delivered", t.uint48_t),
0x000D: ("daily_consump_target", t.uint24_t),
0x000E: ("current_block", t.enum8),
0x000F: ("profile_interval_period", t.enum8),
# 0x0010: ('interval_read_reporting_period', UNKNOWN),
0x0011: ("preset_reading_time", t.uint16_t),
0x0012: ("volume_per_report", t.uint16_t),
0x0013: ("flow_restriction", t.uint8_t),
0x0014: ("supply_status", t.enum8),
0x0015: ("current_in_energy_carrier_summ", t.uint48_t),
0x0016: ("current_out_energy_carrier_summ", t.uint48_t),
0x0017: ("inlet_tempreature", t.int24s),
0x0018: ("outlet_tempreature", t.int24s),
0x0019: ("control_tempreature", t.int24s),
0x001A: ("current_in_energy_carrier_demand", t.int24s),
0x001B: ("current_out_energy_carrier_demand", t.int24s),
0x001D: ("current_block_period_consump_received", t.uint48_t),
0x001E: ("current_block_received", t.uint48_t),
# 0x0100: ('change_reporting_profile', UNKNOWN),
0x0100: ("current_tier1_summ_delivered", t.uint48_t),
0x0101: ("current_tier1_summ_received", t.uint48_t),
0x0102: ("current_tier2_summ_delivered", t.uint48_t),
0x001B: ("primary3_intensity", t.uint8_t),
# Additional Defined Primaries Information
0x0020: ("primary4_x", t.uint16_t),
0x0021: ("primary4_y", t.uint16_t),
0x0022: ("primary4_intensity", t.uint8_t),
0x0024: ("primary5_x", t.uint16_t),
0x0025: ("primary5_y", t.uint16_t),
0x0026: ("primary5_intensity", t.uint8_t),
0x0028: ("primary6_x", t.uint16_t),
0x0029: ("primary6_y", t.uint16_t),
0x002A: ("primary6_intensity", t.uint8_t),
# Defined Color Point Settings
0x0030: ("white_point_x", t.uint16_t),
0x0031: ("white_point_y", t.uint16_t),
0x0032: ("color_point_r_x", t.uint16_t),
0x0033: ("color_point_r_y", t.uint16_t),
0x0034: ("color_point_r_intensity", t.uint8_t),
0x0036: ("color_point_g_x", t.uint16_t),
0x0037: ("color_point_g_y", t.uint16_t),
0x0038: ("color_point_g_intensity", t.uint8_t),
0x003A: ("color_point_b_x", t.uint16_t),
0x003B: ("color_point_b_y", t.uint16_t),
0x003C: ("color_point_b_intensity", t.uint8_t),
# ...
0x4000: ("enhanced_current_hue", t.uint16_t),
0x4001: ("enhanced_color_mode", t.enum8),
0x4002: ("color_loop_active", t.uint8_t),
0x4003: ("color_loop_direction", t.uint8_t),
0x4004: ("color_loop_time", t.uint16_t),
0x4005: ("color_loop_start_enhanced_hue", t.uint16_t),
0x4006: ("color_loop_stored_enhanced_hue", t.uint16_t),
0x400A: ("color_capabilities", t.bitmap16),
0x0001: ("multiple_scheduling", t.uint8_t),
0x0002: ("energy_formatting", t.bitmap8),
0x0003: ("energy_remote", t.Bool),
0x0004: ("schedule_mode", t.bitmap8),
}
class ScheduleRecord(t.Struct):
phase_id: t.uint8_t
scheduled_time: t.uint16_t
class PowerProfilePhase(t.Struct):
energy_phase_id: t.uint8_t
macro_phase_id: t.uint8_t
expected_duration: t.uint16_t
peak_power: t.uint16_t
energy: t.uint16_t
class PowerProfile(t.Struct):
power_profile_id: t.uint8_t
energy_phase_id: t.uint8_t
power_profile_remote_control: t.Bool
power_profile_state: t.uint8_t
server_commands = {
0x0000: ("power_profile_request", (t.uint8_t,), False),
0x0001: ("power_profile_state_request", (), False),
0x0002: (
"get_power_profile_price_response",
(t.uint8_t, t.uint16_t, t.uint32_t, t.uint8_t),
True,
),
0x0003: (