Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_without_preamble(self):
    """Generate 20 messages between two participants and save them as "without_preamble".

    The message type declares sync, length, source-address and sequence-number
    labels. No preamble label is added, although a preamble bit pattern is
    still handed to the generator.
    """
    alice = Participant("Alice", address_hex="24")
    broadcast = Participant("Broadcast", address_hex="ff")

    # (field function, label width) pairs, added in this order.
    label_layout = (
        (FieldType.Function.SYNC, 16),
        (FieldType.Function.LENGTH, 8),
        (FieldType.Function.SRC_ADDRESS, 8),
        (FieldType.Function.SEQUENCE_NUMBER, 8),
    )
    builder = MessageTypeBuilder("data")
    for function, width in label_layout:
        builder.add_label(function, width)

    generator = ProtocolGenerator(
        [builder.message_type],
        syncs_by_mt={builder.message_type: "0x8e88"},
        preambles_by_mt={builder.message_type: "10" * 8},
        participants=[alice, broadcast],
    )

    # Even-numbered messages carry 16 data bits, odd-numbered ones 32;
    # source and destination swap with every message.
    for index in range(20):
        payload_bits = 32 if index % 2 else 16
        generator.generate_message(
            data="1010" * (payload_bits // 4),
            source=generator.participants[index % 2],
            destination=generator.participants[(index + 1) % 2],
        )

    self.save_protocol("without_preamble", generator)
def test_without_preamble(self):
    """Generate 20 messages alternating between two participants.

    The message type declares sync, length, source-address and sequence-number
    labels; no preamble label is added. The visible part of this snippet ends
    once the messages have been generated.
    """
    alice = Participant("Alice", address_hex="24")
    broadcast = Participant("Broadcast", address_hex="ff")

    builder = MessageTypeBuilder("data")
    for function, width in (
        (FieldType.Function.SYNC, 16),
        (FieldType.Function.LENGTH, 8),
        (FieldType.Function.SRC_ADDRESS, 8),
        (FieldType.Function.SEQUENCE_NUMBER, 8),
    ):
        builder.add_label(function, width)

    generator = ProtocolGenerator(
        [builder.message_type],
        syncs_by_mt={builder.message_type: "0x8e88"},
        preambles_by_mt={builder.message_type: "10" * 8},
        participants=[alice, broadcast],
    )

    # Payload length toggles between 16 and 32 bits; the two participants
    # take turns as source and destination.
    for index in range(20):
        payload_bits = 32 if index % 2 else 16
        generator.generate_message(
            data="1010" * (payload_bits // 4),
            source=generator.participants[index % 2],
            destination=generator.participants[(index + 1) % 2],
        )
# Fragment of a test body: the enclosing def is outside this view, and
# num_messages, data_length and pg are defined there.
# NOTE(review): indentation was lost in extraction; the nesting described in
# these comments is inferred from the statements themselves.
# Generate num_messages messages, cycling through four payload patterns:
# all ones, all zeros, repeated "10" and repeated "01".
for i in range(num_messages):
if i % 4 == 0:
data = "1" * data_length
elif i % 4 == 1:
data = "0" * data_length
elif i % 4 == 2:
data = "10" * (data_length // 2)
else:
data = "01" * (data_length // 2)
pg.generate_message(data=data)
# Store the generated protocol under the name "easy_length".
self.save_protocol("easy_length", pg)
# clear_message_types is defined outside this view; presumably it removes the
# generator's labels so the finder has to rediscover them -- TODO confirm.
self.clear_message_types(pg.protocol.messages)
ff = FormatFinder(pg.protocol.messages)
# Run the length engine on its own first: exactly four high-scoring ranges
# are expected for an n-gram length of 8.
length_engine = LengthEngine(ff.bitvectors)
highscored_ranges = length_engine.find(n_gram_length=8)
self.assertEqual(len(highscored_ranges), 4)
# One finder iteration must yield a single, non-empty message type.
ff.perform_iteration()
self.assertEqual(len(ff.message_types), 1)
self.assertGreater(len(ff.message_types[0]), 0)
print(ff.message_types[0])
# The LENGTH label is expected at start 32 with length 8
# (presumably bit positions -- verify against ProtocolLabel).
label = ff.message_types[0].get_first_label_with_type(FieldType.Function.LENGTH)
self.assertIsInstance(label, ProtocolLabel)
self.assertEqual(label.start, 32)
self.assertEqual(label.length, 8)
# Fragment of a function body: the enclosing def is outside this view;
# resume_on_full_receive_buffer and spectrum_mode come from there.
# NOTE(review): indentation was lost in extraction; nesting is inferred.
# Determines the number of samples for the receive buffer and returns it as int.
if resume_on_full_receive_buffer:
# Fixed-size buffers from constants: one for spectrum mode, one for sniffing.
if spectrum_mode:
num_samples = constants.SPECTRUM_BUFFER_SIZE
else:
num_samples = constants.SNIFF_BUFFER_SIZE
else:
# Take a fraction of the available memory: the 'ram_threshold' setting, 0.6 (60%) by default
threshold = constants.SETTINGS.value('ram_threshold', 0.6, float)
# Available bytes divided by 8: the log line below reports num_samples*8 as a
# size in B, so 8 is treated as the byte size of one sample.
num_samples = threshold * (psutil.virtual_memory().available / 8)
# Do not let it allocate too much on 32 bit
# (cap the byte size 8*num_samples at sys.maxsize // 2)
if 8*num_samples > sys.maxsize // 2:
num_samples = sys.maxsize // (8 * 2)
logger.info("Correcting buffer size to {}".format(num_samples))
logger.info("Initializing receive buffer with size {0}B".format(Formatter.big_value_with_suffix(num_samples*8)))
# num_samples can be a float after the threshold multiplication, hence int().
return int(num_samples)
The sync is chosen by comparing all constant ranges that lie directly behind the preamble and choosing the most frequent one.
:return: ProtocolLabel named "Sync" describing the chosen range
"""
# Body of a method whose def line and docstring start lie outside this view.
# NOTE(review): indentation was lost in extraction; nesting is inferred.
# Make sure the constant intervals have been searched before they are used.
if not self.is_initialized:
self.__search_constant_intervals()
# Maps each nibble-aligned constant range to the number of times a range
# with that alignment starts exactly at self.preamble_end.
possible_sync_pos = defaultdict(int)
# Flatten every interval list stored in self.common_intervals.
for const_range in (cr for const_interval in self.common_intervals.values() for cr in const_interval):
const_range = Interval(4 * ((const_range.start + 1) // 4) - 1, 4 * ((const_range.end + 1) // 4) - 1) # align to nibbles
# Ranges that do not start at the preamble end are still inserted, with count 0.
possible_sync_pos[const_range] += int(const_range.start == self.preamble_end)
# Pick the range with the highest count; max() returns the first such key on
# ties and raises ValueError if no constant range was seen at all.
sync_interval = max(possible_sync_pos, key=possible_sync_pos.__getitem__)
self.sync_end = sync_interval.end
# NOTE(review): the +1 / -1 presumably convert the aligned interval bounds
# into the label's start/end convention -- confirm against ProtocolLabel.
return ProtocolLabel(start=sync_interval.start + 1, end=sync_interval.end - 1, name="Sync", color_index=None)
def setUp(self):
    """Prepare and initialize a modulation dialog before each test."""
    super().setUp()

    # Select tab index 2 (presumably the generator tab) before asking its
    # controller for the dialog.
    self.form.ui.tabWidget.setCurrentIndex(2)

    logger.debug("Preparing Modulation dialog")
    generator_controller = self.form.generator_tab_controller
    self.dialog, _ = generator_controller.prepare_modulation_dialog()

    # Let Qt process pending events, then wait before using the new dialog.
    app = QApplication.instance()
    app.processEvents()
    QTest.qWait(self.WAIT_TIMEOUT_BEFORE_NEW)

    # The dialog is only made visible when the test class asks for it.
    if self.SHOW:
        self.dialog.show()

    logger.debug("Initializing Modulation dialog")
    self.dialog.initialize("1111")
    logger.debug("Preparation success")
def test_enocean_crc_polynomial(self):
    """Check WSPChecksum against the CRC values of two recorded messages.

    The complete captured messages were
        aa9a6d201006401009802019e411e8035b
        aa9a6d2010000ffdaaf01019e411e8071b
    They are kept here for reference only; the checksum is calculated on
    the shortened strings below.
    """
    # Remove Preamble + SOF + EOF for CRC calculation
    msg1 = util.hex2bit("a6d201006401009802019e411e8035")
    crc1 = util.hex2bit("35")
    msg2 = util.hex2bit("a6d2010000ffdaaf01019e411e8071")
    crc2 = util.hex2bit("71")

    wsp_checker = WSPChecksum()
    calc_crc1 = wsp_checker.calculate(msg1)
    calc_crc2 = wsp_checker.calculate(msg2)

    # assertEqual reports both values on failure, unlike assertTrue(a == b).
    self.assertEqual(calc_crc1, crc1)
    self.assertEqual(calc_crc2, crc2)
def test_enocean_crc8_message(self):
    """Decode a received bit stream, then encode and decode it again without errors."""
    encoding = Encoding()
    received = util.hex2bit("aacbac4cddd5ddd3bddd5ddcc5ddcddd4c2d5d5c2cdddab200000")
    preamble, sof, eof = "aa", "9", "b"

    decoded, err, state = encoding.code_enocean(decoding=True, inpt=received)
    self.assertEqual(err, 0)
    self.assertEqual(state, encoding.ErrorState.SUCCESS)

    # Each framing marker must occur in the hex form of the decoded data.
    for marker in (preamble, sof, eof):
        self.assertIn(marker, util.bit2hex(decoded))

    # Round trip: encode the decoded data, then decode that result again.
    stream = decoded
    for decoding in (False, True):
        stream, errors, state = encoding.code_enocean(decoding=decoding, inpt=stream)
        self.assertEqual(errors, 0)
        self.assertEqual(state, encoding.ErrorState.SUCCESS)
# Fragment of a method body: the enclosing def is outside this view, and
# alice_indices, alice and bob are defined there.
# NOTE(review): indentation was lost in extraction; nesting is inferred.
# Messages whose index is in alice_indices belong to alice, all others to bob.
for i, message in enumerate(self.protocol.messages):
if i in alice_indices:
message.participant = alice
else:
message.participant = bob
# Sanity checks on the loaded protocol: 42 messages, and characters 16-17 of
# the first message's plain hex string are "2d".
self.assertEqual(self.protocol.num_messages, 42)
self.assertEqual(self.protocol.plain_hex_str[0][16:18], "2d")
# Collect the Encoding objects stored on self.decodings. Each is built from a
# list of strings; the first entry looks like a display name followed by the
# processing steps -- verify against Encoding's constructor.
self.decodings = []
self.decodings.append(Encoding(['Non Return To Zero (NRZ)']))
self.decodings.append(Encoding(['Non Return To Zero Inverted (NRZ-I)', 'Invert']))
self.decodings.append(Encoding(['Manchester I', 'Edge Trigger']))
self.decodings.append(Encoding(['Manchester II', 'Edge Trigger', 'Invert']))
self.decodings.append(Encoding(['Differential Manchester', 'Edge Trigger', 'Differential Encoding', ]))
# The two data-whitening entries differ in their name and in the first field
# of the ';'-separated parameter string.
self.decodings.append(Encoding(['DeWhitening Special', constants.DECODING_DATAWHITENING, '0x9a7d9a7d;0x21;0']))
self.decodings.append(Encoding(['DeWhitening', constants.DECODING_DATAWHITENING, '0x67686768;0x21;0']))
def test_set_icon_theme(self):
    """Check the active icon theme after applying each "icon_theme_index" setting."""
    # Index 0 must result in the "oxy" theme.
    constants.SETTINGS.setValue("icon_theme_index", 0)
    util.set_icon_theme()
    self.assertEqual(QIcon.themeName(), "oxy")

    # Index 1 must switch away from "oxy" on Linux only; on every other
    # platform the theme is expected to stay "oxy".
    constants.SETTINGS.setValue("icon_theme_index", 1)
    util.set_icon_theme()
    on_linux = sys.platform == "linux"
    check_theme = self.assertNotEqual if on_linux else self.assertEqual
    check_theme(QIcon.themeName(), "oxy")