Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_context_menu(self):
    """
    Exercise the context menu of the protocol text edit of the first signal frame.

    Verifies that triggering the "Linewrap" action flips its checked state in a
    freshly created menu, and that a "Participant" entry only shows up once a
    participant exists in the project and text is selected.
    """
    assert isinstance(self.form, MainController)

    def linewrap_action_of(context_menu):
        # First action whose text starts with "Linewrap"; raises StopIteration if absent
        return next(entry for entry in context_menu.actions() if entry.text().startswith("Linewrap"))

    def participant_entries_of(context_menu):
        return [entry for entry in context_menu.actions() if entry.text() == "Participant"]

    self.add_signal_to_form("esaver.complex")
    self.form.signal_tab_controller.signal_frames[0].ui.cbProtoView.setCurrentIndex(2)

    logger.debug("Get text edit")
    proto_edit = self.form.signal_tab_controller.signal_frames[0].ui.txtEdProto

    # Keep the menu referenced while its action is used
    menu = proto_edit.create_context_menu()
    toggle = linewrap_action_of(menu)
    was_checked = toggle.isChecked()
    toggle.trigger()

    # A newly built menu must reflect the toggled state
    menu = proto_edit.create_context_menu()
    self.assertNotEqual(was_checked, linewrap_action_of(menu).isChecked())

    # No participants yet -> no participant entry
    self.assertEqual(len(participant_entries_of(menu)), 0)

    self.form.project_manager.participants.append(Participant("Alice", "A"))
    proto_edit.selectAll()
    menu = proto_edit.create_context_menu()
    self.assertEqual(len(participant_entries_of(menu)), 1)
def decimal2bit(number: str, num_bits: int) -> array.array:
    """
    Convert a non-negative decimal number into a sequence of bits.

    :param number: decimal number to convert, anything int() accepts
    :param num_bits: minimum number of bits; the result is left-padded with zeros
                     up to this width, wider values are not truncated
    :return: array with type code "B" holding one 0/1 entry per bit, most significant
             bit first; an empty array if number can not be converted or is negative
    """
    try:
        value = int(number)
    except (ValueError, TypeError) as e:
        logger.error(e)
        return array.array("B", [])

    if value < 0:
        # The binary format would emit a leading "-" which can not be turned into a bit
        logger.error("Can not convert negative number {} to bits".format(value))
        return array.array("B", [])

    return array.array("B", map(int, format(value, "0{}b".format(num_bits))))
def stop_rx_mode(self, msg):
    """
    Stop receiving: notify the receive process, wait for it and close all pipes.

    The STOP command is sent over the parent control connection. If a receive
    process exists and is alive, it is joined for up to JOIN_TIMEOUT and
    terminated if it still has not finished. Finally the receiving flag is
    cleared and all four connections are closed.

    :param msg: text that is appended to the "Stopping RX Mode" log entry
    """
    device_name = self.__class__.__name__

    try:
        self.parent_ctrl_conn.send(self.Command.STOP.name)
    except (BrokenPipeError, OSError) as error:
        logger.debug("Closing parent control connection: " + str(error))

    logger.info("{0}: Stopping RX Mode: {1}".format(device_name, msg))

    if hasattr(self, "receive_process"):
        if self.receive_process.is_alive():
            # Give the process a chance to finish on its own first
            self.receive_process.join(self.JOIN_TIMEOUT)
            if self.receive_process.is_alive():
                logger.warning("{0}: Receive process is still alive, terminating it".format(device_name))
                self.receive_process.terminate()
                self.receive_process.join()

    self.is_receiving = False

    all_connections = (
        self.parent_ctrl_conn,
        self.parent_data_conn,
        self.child_ctrl_conn,
        self.child_data_conn,
    )
    for conn in all_connections:
        try:
            conn.close()
        except OSError as error:
            logger.exception(error)
def get_selected_label_index(self, row: int, column: int):
    """
    Find the label of the message in the given row that covers the given column.

    :param row: index of the message in the protocol
    :param column: column to look up in the current protocol view
    :return: index of the first label of the message type whose range contains
             the column; -1 if there are no rows, the row is out of range or
             no label covers the column
    """
    if self.row_count == 0:
        return -1

    try:
        message = self.protocol.messages[row]
    except IndexError:
        logger.warning("{} is out of range for generator protocol".format(row))
        return -1

    matching_indices = (
        index
        for index, label in enumerate(message.message_type)
        if column in range(*message.get_label_range(label, self.proto_view, False))
    )
    # First hit wins; fall back to -1 when no label covers the column
    return next(matching_indices, -1)
QMessageBox.critical(self, self.tr("Error"),
self.tr("Could not find corresponding signal frame."))
return
signal_frame.set_roi_from_protocol_analysis(min(messages), start, max(messages), end + 1, view_type)
last_sig_frame = signal_frame
msg_total += n
focus_frame = last_sig_frame
if last_sig_frame is not None:
self.signal_tab_controller.ui.scrollArea.ensureWidgetVisible(last_sig_frame, 0, 0)
QApplication.instance().processEvents()
self.ui.tabWidget.setCurrentIndex(0)
if focus_frame is not None:
focus_frame.ui.txtEdProto.setFocus()
except Exception as e:
logger.exception(e)
3) Fulltext search for addresses based on participant subgroups
:param messages: messages a field shall be searched for
:type messages: list of Message
"""
try:
if self.backend == self.Backend.python:
self._py_find_field(messages)
elif self.backend == self.Backend.cython:
self._cy_find_field(messages)
elif self.backend == self.Backend.plainc:
self._c_find_field(messages)
else:
raise ValueError("Unsupported backend {}".format(self.backend))
except NotImplementedError:
logger.info("Skipped {} because not implemented yet".format(self.__class__.__name__))
param = param.upper().replace(",", ".")
factor = 1
if param.endswith("G"):
factor = 10 ** 9
param = param[:-1]
elif param.endswith("M"):
factor = 10 ** 6
param = param[:-1]
elif param.endswith("K"):
factor = 10 ** 3
param = param[:-1]
try:
parameters.append(factor * float(param))
except ValueError:
logger.warning("Could not convert {} to number".format(param))
return
self.current_modulator.parameters[:] = array("f", parameters)
self.draw_modulated()
self.show_full_scene()
from urh.signalprocessing.MessageType import MessageType
from urh.signalprocessing.Modulator import Modulator
from urh.util.Logger import logger
import time
# Manual demo: start an EndlessSender for the "HackRF" device, push one
# modulated message into it and stop it again. The debug log lines mark each step.
endless_sender = EndlessSender(BackendHandler(), "HackRF")
# Test message: 168 bits built from four repeated patterns, with message type "empty_message_type"
msg = Message([1, 0] * 16 + [1, 1, 0, 0] * 8 + [0, 0, 1, 1] * 8 + [1, 0, 1, 1, 1, 0, 0, 1, 1, 1] * 4, 0,
MessageType("empty_message_type"))
# Modulator with 1000 samples per symbol and a carrier frequency of 55 kHz
modulator = Modulator("test_modulator")
modulator.samples_per_symbol = 1000
modulator.carrier_freq_hz = 55e3
logger.debug("Starting endless sender")
endless_sender.start()
# NOTE(review): presumably gives the sender time to come up before data is pushed - confirm
time.sleep(1)
logger.debug("Pushing data")
# Modulate the encoded bits of the message and hand the result to the sender
endless_sender.push_data(modulator.modulate(msg.encoded_bits))
logger.debug("Pushed data")
# NOTE(review): presumably leaves time for the pushed data to be sent - confirm
time.sleep(5)
logger.debug("Stopping endless sender")
endless_sender.stop()
time.sleep(1)
logger.debug("bye")
def prepare_modulation_buffer(self, total_samples: int, show_error=True) -> IQArray:
    """
    Allocate the buffer that shall hold the modulated samples.

    Before the actual buffer is created, a buffer of three times the size is
    allocated and immediately dropped to check whether enough memory is available.

    :param total_samples: number of samples the buffer shall hold
    :param show_error: whether to report an error if the memory check fails
    :return: buffer for total_samples samples, or None if the memory check failed
    """
    dtype = Modulator.get_dtype()

    # Memory needed per sample depends on the configured data type
    if dtype == np.int8:
        bytes_per_sample = 2
    elif dtype == np.int16:
        bytes_per_sample = 4
    else:
        bytes_per_sample = 8

    buffer_bytes = total_samples * bytes_per_sample
    logger.debug("Allocating {0:.2f}MB for modulated samples".format(buffer_bytes / (1024 ** 2)))

    try:
        # Probe with three times the size, as the sending process needs the same amount.
        # The result is deliberately not bound to a name so it is released right away.
        IQArray(None, dtype=dtype, n=3 * total_samples)
    except MemoryError:
        # Caller will go into continuous mode in this case
        if show_error:
            Errors.not_enough_ram_for_sending_precache(3 * buffer_bytes)
        return None
    else:
        return IQArray(None, dtype=dtype, n=total_samples)