Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.current_index = 0
if num_samples >= len(self.data):
self.stop("Receiving buffer too small.")
else:
self.stop("Receiving Buffer is full.")
return
self.data[self.current_index:self.current_index + num_samples] = tmp
self.current_index += num_samples
if self.emit_data_received_signal:
self.data_received.emit(tmp)
rcvd = b""
except ValueError:
self.stop("Could not receive data. Is your Hardware ok?")
except RuntimeError:
logger.error("Receiver Thread crashed.")
def __fill_counter_values(self, command: str):
    """Replace ``itemN.counter_value`` placeholders in *command* with values.

    The command is split around every ``item<digits>.counter_value``
    placeholder. Each placeholder is looked up in
    ``self.simulator_config.item_dict`` and replaced by ``str(item.value)``.
    All other text is copied through unchanged.

    If a placeholder cannot be resolved (missing key, missing ``value``
    attribute, or a ValueError while reading it), an error is logged and the
    placeholder is dropped from the output.

    :param command: command string that may contain counter placeholders
    :return: the command with all resolvable placeholders substituted
    """
    # Raw string: "\." in a normal string literal is an invalid escape
    # sequence and triggers a warning on modern Python versions.
    # The capturing group makes split() keep the placeholders in its output.
    placeholder = re.compile(r"(item[0-9]+\.counter_value)")

    pieces = []
    for token in placeholder.split(command):
        if placeholder.match(token) is None:
            # Plain text between placeholders.
            pieces.append(token)
            continue

        try:
            pieces.append(str(self.simulator_config.item_dict[token].value))
        except (KeyError, ValueError, AttributeError):
            logger.error("Could not get counter value for " + token)

    return "".join(pieces)
def hex2bit(hex_str: str) -> array.array:
    """Convert a hexadecimal string into an array of single bits.

    Every hex digit becomes four entries (most significant bit first) in an
    unsigned-byte array, each entry being 0 or 1. A leading ``0x`` prefix is
    stripped before conversion.

    :param hex_str: hexadecimal digits, optionally prefixed with ``0x``
    :return: ``array.array("B")`` of bits; an empty array if *hex_str* is not
             a string or contains a character that is not a hex digit
             (the conversion error is logged in that case)
    """
    if not isinstance(hex_str, str):
        return array.array("B", [])

    digits = hex_str[2:] if hex_str[:2] == "0x" else hex_str

    try:
        nibbles = [format(int(digit, 16), "04b") for digit in digits]
        return array.array("B", [bit == "1" for bit in "".join(nibbles)])
    except (TypeError, ValueError) as e:
        logger.error(e)
        return array.array("B", [])
def prepare_send_connection(self):
    """Open a TCP connection to ``(self.client_ip, self.client_port)``.

    The socket is created with ``TCP_NODELAY`` and ``SO_REUSEADDR`` set.
    After a successful connect, ``self.send_connection_established`` is
    emitted and the connected socket is returned.

    On any failure the partially set-up socket is closed (so no file
    descriptor leaks), the error text is emitted via ``self.error_occurred``
    and logged, and ``None`` is returned.

    :return: the connected socket, or ``None`` if the connection failed
    """
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.connect((self.client_ip, self.client_port))
        self.send_connection_established.emit()
        return sock
    except Exception as e:
        msg = "Could not establish connection " + str(e)
        self.error_occurred.emit(msg)
        logger.error(msg)

        # Release the descriptor of the socket we could not hand out.
        if sock is not None:
            try:
                sock.close()
            except OSError:
                # Best-effort cleanup; the original failure was already reported.
                pass

        return None
def parse_project_file(file_path: str):
    """Read device and modulation settings from a project XML file.

    :param file_path: path to the project file
    :return: a ``defaultdict`` that yields ``None`` for missing keys. It is
             returned empty when *file_path* is empty, is not an existing
             file, or cannot be parsed as XML (the parse error is logged).
             Otherwise it holds whatever
             ``ProjectManager.read_device_conf_dict`` stores for the
             ``device_conf`` tag, a ``"device"`` entry mirroring ``"name"``,
             and, if at least one modulator is found, the carrier and
             modulation settings of the first modulator.
    """
    import xml.etree.ElementTree as ET
    from urh.util.ProjectManager import ProjectManager

    settings = defaultdict(lambda: None)
    if not file_path or not os.path.isfile(file_path):
        return settings

    try:
        root = ET.parse(file_path).getroot()
    except Exception as e:
        logger.error("Could not read project file {}: {}".format(file_path, e))
        return settings

    device_conf_tag = root.find("device_conf")
    ProjectManager.read_device_conf_dict(device_conf_tag, target_dict=settings)
    # Reading "name" through the defaultdict also creates it (as None) if absent.
    settings["device"] = settings["name"]

    modulators = Modulator.modulators_from_xml_tag(root)
    if len(modulators) == 0:
        return settings

    # Only the first modulator contributes to the result.
    first = modulators[0]
    settings["carrier_frequency"] = first.carrier_freq_hz
    settings["carrier_amplitude"] = first.carrier_amplitude
    settings["carrier_phase"] = first.carrier_phase_deg
    settings["parameters"] = " ".join(str(param) for param in first.parameters)
    settings["modulation_type"] = first.modulation_type

    return settings
def load_from_file(self, filename: str):
    """Load modulators and the protocol from an XML file, then refresh.

    Modulators are read via ``ProjectManager.read_modulators_from_file`` and
    the table model's protocol is populated from the same file. Afterwards
    the ``refresh_*``, ``show_modulation_info`` and ``set_fuzzing_ui_status``
    hooks are run in order.

    Loading is best-effort: any ordinary error is logged and swallowed, so a
    broken profile does not propagate an exception to the caller.

    :param filename: path of the XML file to load
    """
    try:
        self.modulators = ProjectManager.read_modulators_from_file(filename)
        self.table_model.protocol.from_xml_file(filename)
        self.refresh_pause_list()
        self.refresh_estimated_time()
        self.refresh_modulators()
        self.show_modulation_info()
        self.refresh_table()
        self.set_fuzzing_ui_status()
    except Exception:
        # Not a bare "except:", so KeyboardInterrupt and SystemExit are no
        # longer swallowed and can still terminate the application.
        logger.error("You done something wrong to the xml fuzzing profile.")
def on_btn_replay_clicked(self):
    """Open a send dialog for replaying the current signal's IQ data.

    If constructing the dialog raises ``OSError`` the error is logged and
    nothing is shown. If the dialog reports an empty device list,
    ``Errors.no_device()`` is called and the dialog is closed. Otherwise the
    dialog's device parameter changes are forwarded to the project manager
    and the dialog is shown with its full scene reinitialized.
    """
    manager = self.project_manager
    try:
        send_dialog = SendDialog(manager, modulated_data=self.signal.iq_array, parent=self)
    except OSError as err:
        logger.error(repr(err))
        return

    if not send_dialog.has_empty_device_list:
        send_dialog.device_parameters_changed.connect(manager.set_device_parameters)
        send_dialog.show()
        send_dialog.graphics_view.show_full_scene(reinitialize=True)
        return

    # No device available: report it and discard the dialog.
    Errors.no_device()
    send_dialog.close()
def to_xml(self) -> ET.Element:
    """Serialize this object into a ``message_type`` XML element.

    The element carries ``name``, ``id`` and the two ``assigned_by_*`` flags
    (encoded as ``"1"`` / ``"0"``) as attributes. Its children are the XML of
    every label obtained by iterating ``self``, followed by the XML of
    ``self.ruleset``. A label whose ``to_xml`` raises ``TypeError`` is logged
    and left out.

    :return: the populated ``message_type`` element
    """
    flag = {True: "1", False: "0"}
    attributes = {
        "name": self.name,
        "id": self.id,
        "assigned_by_ruleset": flag[bool(self.assigned_by_ruleset)],
        "assigned_by_logic_analyzer": flag[bool(self.assigned_by_logic_analyzer)],
    }
    root = ET.Element("message_type", attrib=attributes)

    for label in self:
        try:
            label_tag = label.to_xml()
            root.append(label_tag)
        except TypeError:
            logger.error("Could not save label: " + str(label))

    # The ruleset always comes last, after all labels.
    root.append(self.ruleset.to_xml())
    return root
def start_rx_mode(self):
    """Set up pipes, reader threads and the receive process for RX mode.

    Steps, in order: initialize the receive buffer, create a one-way data
    pipe and a control pipe, mark the device as receiving, build the daemon
    receive process, start the buffer-reading and message-reading threads,
    and finally start the process. An ``OSError`` raised while starting the
    process is logged and appended to ``self.device_messages``.
    """
    self.init_recv_buffer()

    # duplex=False: the data pipe is one-way, the control pipe uses the default.
    data_pipe = Pipe(duplex=False)
    self.parent_data_conn, self.child_data_conn = data_pipe
    ctrl_pipe = Pipe()
    self.parent_ctrl_conn, self.child_ctrl_conn = ctrl_pipe

    self.is_receiving = True
    class_name = self.__class__.__name__
    logger.info("{0}: Starting RX Mode".format(class_name))

    self.receive_process = Process(
        target=self.receive_process_function,
        args=self.receive_process_arguments,
    )
    self.receive_process.daemon = True

    # The reader threads are started before the process itself.
    self._start_read_rcv_buffer_thread()
    self._start_read_message_thread()

    try:
        self.receive_process.start()
    except OSError as err:
        logger.error(repr(err))
        self.device_messages.append(repr(err))