Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): fragment of a coroutine body; the enclosing `def` is outside
# this excerpt and `operation_mode` is presumably its parameter.
# Refuse the request outright when the device has no operation mode support.
if not self.supports_operation_mode:
raise DeviceIllegalValue("operation mode not supported", operation_mode)
# Each send below only happens when its group address is configured (not None).
# Operation mode, encoded with DPTHVACMode.
if self.group_address_operation_mode is not None:
await self.send(
self.group_address_operation_mode,
DPTArray(DPTHVACMode.to_knx(operation_mode)))
# The next three addresses carry one binary flag each: the flag is sent on
# every call and is true only when the requested mode equals that flag's mode.
if self.group_address_operation_mode_protection is not None:
protection_mode = operation_mode == HVACOperationMode.FROST_PROTECTION
await self.send(
self.group_address_operation_mode_protection,
DPTBinary(protection_mode))
if self.group_address_operation_mode_night is not None:
night_mode = operation_mode == HVACOperationMode.NIGHT
await self.send(
self.group_address_operation_mode_night,
DPTBinary(night_mode))
if self.group_address_operation_mode_comfort is not None:
comfort_mode = operation_mode == HVACOperationMode.COMFORT
await self.send(
self.group_address_operation_mode_comfort,
DPTBinary(comfort_mode))
# Controller status, encoded with DPTControllerStatus.
if self.group_address_controller_status is not None:
await self.send(
self.group_address_controller_status,
DPTArray(DPTControllerStatus.to_knx(operation_mode)))
# NOTE(review): the same operation_mode value is handed to the controller-mode
# encoder; confirm DPTHVACContrMode.to_knx accepts the operation mode values.
if self.group_address_controller_mode is not None:
await self.send(
self.group_address_controller_mode,
DPTArray(DPTHVACContrMode.to_knx(operation_mode)))
# After all telegrams are sent, pass the mode to the internal setter.
await self._set_internal_operation_mode(operation_mode)
def from_knx(self, payload):
    """Translate a received binary payload into a ``Direction`` member.

    ``DPTBinary(0)`` yields ``Direction.INCREASE`` and ``DPTBinary(1)`` yields
    ``Direction.DECREASE``; when ``self.invert`` is truthy the two results
    are swapped.

    Raises:
        CouldNotParseTelegram: if ``payload`` equals neither binary value.
    """
    if payload == DPTBinary(0):
        if self.invert:
            return self.Direction.DECREASE
        return self.Direction.INCREASE
    if payload == DPTBinary(1):
        if self.invert:
            return self.Direction.INCREASE
        return self.Direction.DECREASE
    raise CouldNotParseTelegram(
        "payload invalid",
        payload=payload,
        device_name=self.device_name,
    )
# NOTE(review): fragment of a coroutine body; `gateway`, `xknx` and
# `src_address` are defined outside this excerpt.
# Show which gateway endpoint is being contacted and from which local address.
print("Connecting to {}:{} from {}".format(
gateway.ip_addr,
gateway.port,
gateway.local_ip))
# Build the tunnel from the gateway's address data.
tunnel = Tunnel(
xknx,
src_address,
local_ip=gateway.local_ip,
gateway_ip=gateway.ip_addr,
gateway_port=gateway.port)
# Open the UDP side first, then establish the tunnel connection.
await tunnel.connect_udp()
await tunnel.connect()
# Send binary 1 and then binary 0 to group address 1/0/15, pausing two
# seconds after each telegram.
await tunnel.send_telegram(Telegram(GroupAddress('1/0/15'), payload=DPTBinary(1)))
await asyncio.sleep(2)
await tunnel.send_telegram(Telegram(GroupAddress('1/0/15'), payload=DPTBinary(0)))
await asyncio.sleep(2)
# Issue a connection state request before tearing the tunnel down.
await tunnel.connectionstate()
await tunnel.disconnect()
def payload_valid(self, payload):
    """Report whether ``payload`` can be parsed, i.e. is a ``DPTBinary``."""
    is_binary = isinstance(payload, DPTBinary)
    return is_binary
def calculated_length(self):
    """Return the length of the KNX/IP body for the current payload.

    A missing payload and a ``DPTBinary`` payload both give the fixed
    length of 11; a ``DPTArray`` payload adds the length of its ``value``.

    Raises:
        TypeError: if ``self.payload`` is any other kind of object.
    """
    fixed_length = 11
    current = self.payload
    # The None check must come first so no payload type is consulted for it.
    if current is None or isinstance(current, DPTBinary):
        return fixed_length
    if isinstance(current, DPTArray):
        return fixed_length + len(current.value)
    raise TypeError()
def calculate_payload(attr_payload):
    """Wrap an attribute value in the payload type that matches it.

    Integers (which includes ``bool``, a subclass of ``int``) are wrapped in
    ``DPTBinary``; every other value is handed to ``DPTArray``.
    """
    payload_type = DPTBinary if isinstance(attr_payload, int) else DPTArray
    return payload_type(attr_payload)
def payload_valid(self, payload):
    """Return True when ``payload`` is a ``DPTBinary`` instance, else False."""
    parseable = isinstance(payload, DPTBinary)
    return parseable
# NOTE(review): fragment of a method body; `tpci_apci`, `cemi` and `addil`
# are set up outside this excerpt.
# Clear the low six bits of tpci_apci (mask 0xFFC0) and look the result up as
# an APCICommand; a value the enum rejects is reported as a parse error.
try:
self.cmd = APCICommand(tpci_apci & 0xFFC0)
except ValueError:
raise CouldNotParseKNXIP(
"APCI not supported: {0:#012b}".format(tpci_apci & 0xFFC0))
# Everything from offset 10 + addil onwards is taken as the APDU, and its
# length has to match the previously stored self.mpdu_len.
apdu = cemi[10 + addil:]
if len(apdu) != self.mpdu_len:
raise CouldNotParseKNXIP(
"APDU LEN should be {} but is {}".format(
self.mpdu_len, len(apdu)))
# A one-element APDU has no separate data: the value is taken from the bits
# of tpci_apci selected by DPTBinary.APCI_BITMASK. Otherwise the payload is
# whatever follows from offset 11 + addil.
if len(apdu) == 1:
apci = tpci_apci & DPTBinary.APCI_BITMASK
self.payload = DPTBinary(apci)
else:
self.payload = DPTArray(cemi[11 + addil:])
# presumably the number of cemi elements consumed; verify against the caller
return 10 + addil + len(apdu)
def encode_cmd_and_payload(cmd, encoded_payload=0, appended_payload=None):
    """Build the list holding a length field, the command and its payload.

    Args:
        cmd: object whose ``value`` attribute holds the command as an integer.
        encoded_payload: integer merged into the low command element after
            masking with ``DPTBinary.APCI_BITMASK``.
        appended_payload: optional elements placed after the command
            elements; ``None`` means nothing is appended.

    Returns:
        A new list: ``1 + len(appended_payload)``, the high part of
        ``cmd.value``, the low part combined with the masked
        ``encoded_payload``, followed by the appended elements.
    """
    trailing = [] if appended_payload is None else appended_payload
    length_field = 1 + len(trailing)
    command_high = (cmd.value >> 8) & 0xff
    command_low = (cmd.value & 0xff) | (encoded_payload & DPTBinary.APCI_BITMASK)
    return [length_field, command_high, command_low, *trailing]
def from_knx(self, payload):
    """Translate a received binary payload into an operation mode.

    ``DPTBinary(1)`` yields ``self.operation_mode``. ``DPTBinary(0)`` yields
    the first entry of ``self.supported_operation_modes()`` that is not
    ``self.operation_mode``, or ``None`` when there is no such entry.

    Raises:
        CouldNotParseTelegram: if ``payload`` equals neither binary value.
    """
    if payload == DPTBinary(1):
        return self.operation_mode
    if payload == DPTBinary(0):
        # Binary 0 means "the other mode": pick the first supported mode
        # that is not this object's own.
        for candidate in self.supported_operation_modes():
            if candidate is not self.operation_mode:
                return candidate
        return None
    raise CouldNotParseTelegram(
        "payload invalid",
        payload=payload,
        device_name=self.device_name,
        feature_name=self.feature_name,
    )