Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_add_record(self):
    """A record added to the dictionary can be looked up by name and by index.

    Also checks that a member added to the record is reachable by chaining
    a name lookup on the record after the dictionary lookup.
    """
    dictionary = od.ObjectDictionary()
    rec = od.Record("Test Record", 0x1001)
    member = od.Variable("Test Subindex", 0x1001, 1)
    rec.add_member(member)
    dictionary.add_object(rec)

    # Lookup by name and by numeric index must yield the same record.
    self.assertEqual(dictionary["Test Record"], rec)
    self.assertEqual(dictionary[0x1001], rec)

    # The member is found through the record it was added to.
    self.assertEqual(dictionary["Test Record"]["Test Subindex"], member)
def test_add_array(self):
    """An array added to the dictionary can be looked up by name and by index."""
    dictionary = od.ObjectDictionary()
    arr = od.Array("Test Array", 0x1002)
    last_subindex = od.Variable("Last subindex", 0x1002, 0)
    arr.add_member(last_subindex)
    dictionary.add_object(arr)

    # Both access paths must resolve to the array that was added.
    self.assertEqual(dictionary["Test Array"], arr)
    self.assertEqual(dictionary[0x1002], arr)
def __init__(self, node_id, object_dictionary):
    """Set up the node with its identifier and object dictionary.

    :param node_id:
        Identifier of the node. When falsy (e.g. ``None`` or ``0``), the
        ``node_id`` attribute of the object dictionary is used instead.
    :param object_dictionary:
        Either an :class:`objectdictionary.ObjectDictionary` instance, which
        is used as-is, or any other value, which is passed together with
        ``node_id`` to :func:`objectdictionary.import_od` to obtain one.
    """
    # No network is attached at construction time.
    self.network = None

    if isinstance(object_dictionary, objectdictionary.ObjectDictionary):
        self.object_dictionary = object_dictionary
    else:
        self.object_dictionary = objectdictionary.import_od(
            object_dictionary, node_id)

    # An explicitly given (truthy) node id wins over the dictionary's own.
    self.id = node_id if node_id else self.object_dictionary.node_id
def import_eds(source, node_id):
    """Build an object dictionary from an EDS (INI-style) description.

    :param source:
        Either an object exposing a ``read`` attribute, which is used
        directly as the file object, or a value that can be passed to
        :func:`open`. In both cases the file object is closed before this
        function returns, including when parsing fails.
    :param node_id:
        Accepted as part of the interface; not used by the logic below.
    :returns:
        The object dictionary populated from the file.
    """
    eds = RawConfigParser()
    fp = source if hasattr(source, "read") else open(source)
    try:
        try:
            # Python 3
            eds.read_file(fp)
        except AttributeError:
            # Python 2
            eds.readfp(fp)
    finally:
        # Close on every path so a handle opened here never leaks when the
        # parser raises.
        fp.close()

    od = objectdictionary.ObjectDictionary()

    # The section name is looked up exactly as spelled here (single "m").
    if eds.has_section("DeviceComissioning"):
        # The stored bitrate is the file's Baudrate multiplied by 1000
        # (presumably the file gives kbit/s -- confirm against the EDS spec).
        od.bitrate = int(eds.get("DeviceComissioning", "Baudrate")) * 1000
        od.node_id = int(eds.get("DeviceComissioning", "NodeID"))

    for section in eds.sections():
        # Match dummy definitions
        if re.match(r"^[Dd]ummy[Uu]sage$", section) is None:
            continue
        for i in range(1, 8):
            key = "Dummy%04d" % i
            # Only dummies flagged with 1 become constant variables whose
            # index and data type both equal the dummy number.
            if eds.getint(section, key) == 1:
                var = objectdictionary.Variable(key, i, 0)
                var.data_type = i
                var.access_type = "const"
                od.add_object(var)

    # Hand the result back: import_from_node assigns this function's return
    # value and passes it on to its own caller.
    return od
def import_from_node(node_id, network):
    """Fetch the object dictionary stored on a remote node.

    A temporary SDO client is subscribed to the node's response identifier,
    object 0x1021 subindex 0 is opened as a text stream and parsed with
    :func:`import_eds`. The subscription is always removed afterwards.

    :param int node_id: Identifier of the node
    :param network: network object
    :returns:
        The imported object dictionary, or ``None`` if any exception occurred
        while opening or parsing (the error is logged).
    """
    response_id = 0x580 + node_id

    # Temporary SDO client bound to the given network
    client = SdoClient(0x600 + node_id, response_id,
                       objectdictionary.ObjectDictionary())
    client.network = network

    # Route SDO responses from the node to the temporary client
    network.subscribe(response_id, client.on_response)

    result = None
    try:
        # File-like object for the Store EDS variable
        stream = client.open(0x1021, 0, "rt")
        result = import_eds(stream, node_id)
    except Exception as err:
        logger.error("No object dictionary could be loaded for node %d: %s",
                     node_id, err)
        result = None
    finally:
        network.unsubscribe(response_id)
    return result
def import_epf(epf):
"""Import an EPF file.
:param epf:
Either a path to an EPF-file, a file-like object, or an instance of
:class:`xml.etree.ElementTree.Element`.
:returns:
The Object Dictionary.
:rtype: canopen.ObjectDictionary
"""
od = objectdictionary.ObjectDictionary()
if etree.iselement(epf):
tree = epf
else:
tree = etree.parse(epf).getroot()
# Find and set default bitrate
can_config = tree.find("Configuration/CANopen")
if can_config is not None:
bitrate = can_config.get("BitRate", "250")
bitrate = bitrate.replace("U", "")
od.bitrate = int(bitrate) * 1000
# Parse Object Dictionary
for group_tree in tree.iterfind("Dictionary/Parameters/Group"):
name = group_tree.get("SymbolName")
parameters = group_tree.findall("Parameter")