Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self):
self._previous_print_len = 0
self.status_messages = {
uavcan.protocol.NodeStatus().MODE_OPERATIONAL: 'OPERATIONAL',
uavcan.protocol.NodeStatus().MODE_INITIALIZATION: 'INITIALIZATION',
uavcan.protocol.NodeStatus().MODE_MAINTENANCE: 'MAINTENANCE',
uavcan.protocol.NodeStatus().MODE_SOFTWARE_UPDATE: 'SOFTWARE_UPDATE',
uavcan.protocol.NodeStatus().MODE_OFFLINE: 'OFFLINE',
}
self.health_messages = {
uavcan.protocol.NodeStatus().HEALTH_OK: 'OK',
uavcan.protocol.NodeStatus().HEALTH_WARNING: 'WARNING',
uavcan.protocol.NodeStatus().HEALTH_ERROR: 'ERROR',
uavcan.protocol.NodeStatus().HEALTH_CRITICAL: 'CRITICAL',
}
def __init__(self, node, node_name):
self.node_name = node_name
self.node_id = None
self.node = node
self.node.add_handler(uavcan.protocol.NodeStatus,
self._node_status_callback)
def main():
args = parse_args()
config = yaml.load(args.config)
config = config['actuator'][args.board]
if args.dsdl:
uavcan.load_dsdl(args.dsdl)
global node
node = uavcan.make_node(args.port, node_id=127)
node.add_handler(uavcan.protocol.NodeStatus, node_status_callback)
req = uavcan.thirdparty.cvra.motor.config.LoadConfiguration.Request()
req.position_pid.kp = config['control']['position']['kp']
req.position_pid.ki = config['control']['position']['ki']
req.position_pid.kd = config['control']['position']['kd']
req.velocity_pid.kp = config['control']['velocity']['kp']
req.velocity_pid.ki = config['control']['velocity']['ki']
req.velocity_pid.kd = config['control']['velocity']['kd']
req.current_pid.kp = config['control']['current']['kp']
req.current_pid.ki = config['control']['current']['ki']
req.current_pid.kd = config['control']['current']['kd']
req.torque_limit = config['control']['torque_limit']
req.acceleration_limit = config['control']['acceleration_limit']
req.low_batt_th = config['control']['low_batt_th']
def node_mode_to_color(mode):
s = uavcan.protocol.NodeStatus()
return {
s.MODE_INITIALIZATION: Qt.cyan,
s.MODE_MAINTENANCE: Qt.magenta,
s.MODE_SOFTWARE_UPDATE: Qt.magenta,
s.MODE_OFFLINE: Qt.red
}.get(mode)
self._can_driver = can_driver
self._node_id = node_id
self._transfer_manager = transport.TransferManager()
self._outstanding_requests = {}
self._outstanding_request_callbacks = {}
self._next_transfer_ids = collections.defaultdict(int)
self.start_time_monotonic = time.monotonic()
# Hooks
self._transfer_hook_dispatcher = TransferHookDispatcher()
# NodeStatus publisher
self.health = uavcan.protocol.NodeStatus().HEALTH_OK # @UndefinedVariable
self.mode = uavcan.protocol.NodeStatus().MODE_INITIALIZATION if mode is None else mode # @UndefinedVariable
self.vendor_specific_status_code = 0
node_status_interval = node_status_interval or DEFAULT_NODE_STATUS_INTERVAL
self.periodic(node_status_interval, self._send_node_status)
# GetNodeInfo server
def on_get_node_info(e):
logger.debug('GetNodeInfo request from %r', e.transfer.source_node_id)
self._fill_node_status(self.node_info.status)
return self.node_info
self.node_info = node_info or uavcan.protocol.GetNodeInfo.Response() # @UndefinedVariable
self.add_handler(uavcan.protocol.GetNodeInfo, on_get_node_info) # @UndefinedVariable
logger.exception('No DSDL loaded from %r, only standard messages will be supported', dsdl_directory)
show_error('DSDL not loaded',
'Could not load DSDL definitions from %r.\n'
'The application will continue to work without the custom DSDL definitions.' % dsdl_directory,
ex, blocking=True)
# Trying to start the node on the specified interface
try:
node_info = uavcan.protocol.GetNodeInfo.Response()
node_info.name = NODE_NAME
node_info.software_version.major = __version__[0]
node_info.software_version.minor = __version__[1]
node = uavcan.make_node(iface,
node_info=node_info,
mode=uavcan.protocol.NodeStatus().MODE_OPERATIONAL,
**iface_kwargs)
# Making sure the interface is alright
node.spin(0.1)
except uavcan.transport.TransferError:
# allow unrecognized messages on startup:
logger.warning('UAVCAN Transfer Error occurred on startup', exc_info=True)
break
except Exception as ex:
logger.error('UAVCAN node init failed', exc_info=True)
show_error('Fatal error', 'Could not initialize UAVCAN node', ex, blocking=True)
else:
break
logger.info('Creating main window; iface %r', iface)
window = MainWindow(node, iface)
def __init__(self):
self._previous_print_len = 0
self.status_messages = {
uavcan.protocol.NodeStatus().MODE_OPERATIONAL: 'OPERATIONAL',
uavcan.protocol.NodeStatus().MODE_INITIALIZATION: 'INITIALIZATION',
uavcan.protocol.NodeStatus().MODE_MAINTENANCE: 'MAINTENANCE',
uavcan.protocol.NodeStatus().MODE_SOFTWARE_UPDATE: 'SOFTWARE_UPDATE',
uavcan.protocol.NodeStatus().MODE_OFFLINE: 'OFFLINE',
}
self.health_messages = {
uavcan.protocol.NodeStatus().HEALTH_OK: 'OK',
uavcan.protocol.NodeStatus().HEALTH_WARNING: 'WARNING',
uavcan.protocol.NodeStatus().HEALTH_ERROR: 'ERROR',
uavcan.protocol.NodeStatus().HEALTH_CRITICAL: 'CRITICAL',
}
def _send_node_status(self):
self._fill_node_status(self.node_info.status)
if self._node_id:
# TODO: transmit self.node_info.status instead of creating a new object
msg = uavcan.protocol.NodeStatus() # @UndefinedVariable
self._fill_node_status(msg)
self.broadcast(msg)
lights.commands.append(lcmd)
lcmd.light_id += 1
lights.commands.append(lcmd)
print(to_yaml(lights))
print(to_yaml(uavcan.equipment.power.BatteryInfo()))
print(to_yaml(uavcan.protocol.param.Empty()))
getset = uavcan.protocol.param.GetSet.Response()
print(to_yaml(getset))
uavcan.switch_union_field(getset.value, 'empty')
print(to_yaml(getset))
# value_to_constant_name()
print(value_to_constant_name(
uavcan.protocol.NodeStatus(mode=uavcan.protocol.NodeStatus().MODE_OPERATIONAL),
'mode'
))
print(value_to_constant_name(
uavcan.protocol.NodeStatus(mode=uavcan.protocol.NodeStatus().HEALTH_OK),
'health'
))
print(value_to_constant_name(
uavcan.equipment.range_sensor.Measurement(reading_type=uavcan.equipment.range_sensor.Measurement()
.READING_TYPE_TOO_FAR),
'reading_type'
))
print(value_to_constant_name(
uavcan.protocol.param.ExecuteOpcode.Request(opcode=uavcan.protocol.param.ExecuteOpcode.Request().OPCODE_ERASE),
'opcode'
))
print(value_to_constant_name(
def node_health_to_color(health):
s = uavcan.protocol.NodeStatus()
return {
s.HEALTH_WARNING: Qt.yellow,
s.HEALTH_ERROR: Qt.magenta,
s.HEALTH_CRITICAL: Qt.red,
}.get(health)