Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
#!/usr/bin/env python
# coding=utf-8
import codecs
import sys
sys.path.append('..')
import canmatrix
#
# Create the target matrix and the frames/signals of a CAN FD test setup.
# NOTE(review): within this snippet the frames are never attached to `db`;
# presumably that happens in code that follows - confirm.
#
db = canmatrix.CanMatrix()
# Register the two ECU names that are used below as transmitter / receiver.
db.boardUnits.add(canmatrix.BoardUnit("testBU"))
db.boardUnits.add(canmatrix.BoardUnit("recBU"))
# Four frames, one per flag combination: FD only, FD + extended, extended only,
# and neither flag. Only myFrame4 is created without a transmitter.
myFrame1 = canmatrix.Frame("canFdStandard1",Id=1, dlc=24, is_fd = True, transmitter=["testBU"])
myFrame2 = canmatrix.Frame("CanFdExtended2",Id=2, dlc=16, extended = True, is_fd = True, transmitter=["testBU"])
myFrame3 = canmatrix.Frame("CanExtended3", Id=3, dlc=8, extended = True, transmitter=["testBU"])
myFrame4 = canmatrix.Frame("CanStandard4", Id=4, dlc=8)
# Three 64-bit, little-endian, signed signals at start bits 0, 64 and 128.
# signal3 is the only one created without a receiver.
mySignal1 = canmatrix.Signal("signal1", signalSize=64, startBit=0, is_little_endian=True, min=0, max=0, is_signed=True, receiver=["recBU"])
mySignal2 = canmatrix.Signal("signal2", signalSize=64, startBit=64, is_little_endian=True, min=0, max=0, is_signed=True, receiver=["recBU"])
mySignal3 = canmatrix.Signal("signal3", signalSize=64, startBit=128, is_little_endian=True, min=0, max=0, is_signed=True)
# Attach all three signals to myFrame1 (added in the order 3, 2, 1).
myFrame1.addSignal(mySignal3)
myFrame1.addSignal(mySignal2)
myFrame1.addSignal(mySignal1)
#!/usr/bin/env python
# coding=utf-8
import codecs
import sys
sys.path.append('..')
import canmatrix
#
# Create the target matrix and the frames/signals of a CAN FD test setup.
# NOTE(review): within this snippet the frames are never attached to `db`;
# presumably that happens in code that follows - confirm.
#
db = canmatrix.CanMatrix()
# Register the two ECU names that are used below as transmitter / receiver.
# NOTE(review): another snippet in this file registers ECUs with
# `db.ecus.append(canmatrix.Ecu(...))`; confirm that `db.ecus.add` and the
# lower-case `canmatrix.ecu` exist in the installed canmatrix version.
db.ecus.add(canmatrix.ecu("testBU"))
db.ecus.add(canmatrix.ecu("recBU"))
# Four frames, one per flag combination: FD only, FD + extended, extended only,
# and neither flag. Only myFrame4 is created without a transmitter.
myFrame1 = canmatrix.Frame("canFdStandard1",Id=1, dlc=24, is_fd = True, transmitter=["testBU"])
myFrame2 = canmatrix.Frame("CanFdExtended2",Id=2, dlc=16, extended = True, is_fd = True, transmitter=["testBU"])
myFrame3 = canmatrix.Frame("CanExtended3", Id=3, dlc=8, extended = True, transmitter=["testBU"])
myFrame4 = canmatrix.Frame("CanStandard4", Id=4, dlc=8)
# Three 64-bit, little-endian, signed signals at start bits 0, 64 and 128.
# signal3 is the only one created without a receiver.
mySignal1 = canmatrix.Signal("signal1", signalSize=64, startBit=0, is_little_endian=True, min=0, max=0, is_signed=True, receiver=["recBU"])
mySignal2 = canmatrix.Signal("signal2", signalSize=64, startBit=64, is_little_endian=True, min=0, max=0, is_signed=True, receiver=["recBU"])
mySignal3 = canmatrix.Signal("signal3", signalSize=64, startBit=128, is_little_endian=True, min=0, max=0, is_signed=True)
# Attach all three signals to myFrame1 (added in the order 3, 2, 1).
myFrame1.add_signal(mySignal3)
myFrame1.add_signal(mySignal2)
myFrame1.add_signal(mySignal1)
def load(f, **options): # type: (typing.IO, **typing.Any) -> canmatrix.CanMatrix
dbf_import_encoding = options.get("dbfImportEncoding", 'iso-8859-1')
float_factory = options.get('float_factory', default_float_factory)
db = canmatrix.CanMatrix()
mode = ''
for line in f:
line = line.decode(dbf_import_encoding).strip()
if mode == 'SignalDescription':
if line.startswith(
"[END_DESC_SIG]") or line.startswith("[END_DESC]"):
mode = ''
else:
(bo_id, tem_s, signal_name, comment) = line.split(' ', 3)
comment = comment.replace('"', '').replace(';', '')
db.frame_by_id(canmatrix.ArbitrationId.from_compound_integer(int(bo_id))).signal_by_name(
signal_name).add_comment(comment)
if mode == 'BUDescription':
import sys
#
# Read the source CAN matrices.
# NOTE(review): no `import canmatrix.formats` / `import canmatrix.copy` is
# visible in this snippet; both must be imported for the calls below to
# resolve - confirm against the full script.
#
# Import one CAN matrix (*.dbc, *.dbf, *.kcd, *.arxml).
db1 = canmatrix.formats.loadp("first.dbc", flat_import=True)
# Import a second CAN matrix (*.dbc, *.dbf, *.kcd, *.arxml).
db2 = canmatrix.formats.loadp("second.dbc", flat_import=True)
#
# Create the target matrix as a freshly constructed CanMatrix.
#
db3 = canmatrix.CanMatrix()
#
# Here a new CAN matrix can be 'programmed':
# -----------------------------------------------------
#
# Copy CAN ID 1234 from the second CAN matrix to the target matrix.
# NOTE(review): copy_frame is called with a plain integer here and with a
# frame name below; confirm the installed canmatrix version accepts both.
canmatrix.copy.copy_frame(1234, db2, db3)
# Copy frame "Engine_123" from the first CAN matrix to the target matrix.
canmatrix.copy.copy_frame("Engine_123", db1, db3)
# Copy ECU "Gateway" (with all its frames) from the first CAN matrix to the
# target matrix.
canmatrix.copy.copy_ecu_with_frames("Gateway", db1, db3)
#
def load(f, **_options):
# type: (typing.BinaryIO, **str) -> canmatrix.CanMatrix
db = canmatrix.CanMatrix()
if sys.version_info > (3, 0):
import io
json_data = json.load(io.TextIOWrapper(f, encoding='UTF-8'))
else:
json_data = json.load(f)
if "messages" in json_data:
for frame in json_data["messages"]:
# new_frame = Frame(frame["id"],frame["name"],8,None)
new_frame = canmatrix.Frame(frame["name"], arbitration_id=frame["id"], size=8)
if "length" in frame:
new_frame.size = frame["length"]
new_frame.arbitration_id.extended = frame.get("is_extended_frame", False)
def load(filename, **options):
# type: (typing.BinaryIO, **str) -> canmatrix.CanMatrix
# use the xlrd Excel reader if available, because it's more robust
if options.get('xlsxLegacy', False) is True:
logger.error("xlsx: using legacy xlsx-reader - please get xlrd working for better results!")
else:
import canmatrix.formats.xls as xls_loader # we need alias, otherwise we hide the globally imported canmatrix
return xls_loader.load(filename, **options)
# else use this hack to read xlsx
motorola_bit_format = options.get("xlsMotorolaBitFormat", "msbreverse")
sheet = read_xlsx(filename, sheet=1, header=True)
db = canmatrix.CanMatrix()
all_letters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
letter_index = list(all_letters)
letter_index += ["%s%s" % (a, b) for a in all_letters for b in all_letters]
# Defines not imported...
db.add_frame_defines("GenMsgDelayTime", 'INT 0 65535')
db.add_frame_defines("GenMsgCycleTimeActive", 'INT 0 65535')
db.add_frame_defines("GenMsgNrOfRepetitions", 'INT 0 65535')
launch_types = [] # type: typing.List[str]
db.add_signal_defines("GenSigSNA", 'STRING')
ecu_start = ecu_end = 0
if 'Byteorder' in list(sheet[0].values()):
for key in sheet[0]:
if sheet[0][key].strip() == 'Byteorder':
def load(f, **options):
# type: (typing.IO, **typing.Any) -> typing.Dict[str, canmatrix.CanMatrix]
float_factory = options.get("float_factory", default_float_factory) # type: typing.Callable
dbs = {} # type: typing.Dict[str, canmatrix.CanMatrix]
tree = lxml.etree.parse(f)
root = tree.getroot()
namespace = "{" + tree.xpath('namespace-uri(.)') + "}"
node_list = {}
nodes = root.findall('./' + namespace + 'Node')
buses = root.findall('./' + namespace + 'Bus')
counter = 0
for bus in buses:
db = canmatrix.CanMatrix()
db.add_frame_defines("GenMsgCycleTime", 'INT 0 65535')
for node in nodes:
db.ecus.append(canmatrix.Ecu(node.get('name')))
node_list[node.get('id')] = node.get('name')
messages = bus.findall('./' + namespace + 'Message')
for message in messages:
dlc = None
# new_frame = Frame(int(message.get('id'), 16), message.get('name'), 1, None)
new_frame = canmatrix.Frame(message.get('name'))
if 'triggered' in message.attrib:
new_frame.add_attribute("GenMsgCycleTime", message.get('interval'))
if 'length' in message.attrib:
#!/usr/bin/env python3
import canmatrix.formats
# Matrix that collects the frames built below.
cm = canmatrix.CanMatrix()

#
# Create frame Node604: a J1939 frame (PGN 0xff00, priority 6, source 0x80).
#
frame1 = canmatrix.Frame("Node604", j1939_pgn = 0xff00, j1939_prio = 0x6,
                         j1939_source = 0x80,
                         comment = "J1939 packet containing >8 byte payload")
# Add eight 32-bit big-endian float signals named ch1..ch8, with start bits
# 0, 32, ..., 224 (8 * 32 bits = 32 bytes, hence the ">8 byte payload").
for i in range(1,9):
    sig = canmatrix.Signal("ch%d" % i, size = 32, is_float = True, is_little_endian = False, startBit = (i-1)*32)
    frame1.add_signal(sig)
# Register the frame once, after all of its signals have been attached.
cm.add_frame(frame1)
#
# create frame Node605
#
frame2 = canmatrix.Frame("Node605", j1939_pgn = 0xff01, j1939_prio = 0x6,
logger.debug("%d frames in arxml...", len(frames))
can_triggers = root.findall('.//' + ns + 'CAN-FRAME-TRIGGERING')
logger.debug("%d can-frame-triggering in arxml...", len(can_triggers))
sig_pdu_map = root.findall('.//' + ns + 'SIGNAL-TO-PDU-MAPPINGS')
logger.debug("%d SIGNAL-TO-PDU-MAPPINGS in arxml...", len(sig_pdu_map))
sig_ipdu = root.findall('.//' + ns + 'I-SIGNAL-TO-I-PDU-MAPPING')
logger.debug("%d I-SIGNAL-TO-I-PDU-MAPPING in arxml...", len(sig_ipdu))
if ignore_cluster_info is True:
ccs = [lxml.etree.Element("ignoreClusterInfo")] # type: typing.Sequence[_Element]
else:
ccs = root.findall('.//' + ns + 'CAN-CLUSTER')
for cc in ccs: # type: _Element
db = canmatrix.CanMatrix()
# Defines not yet imported...
db.add_ecu_defines("NWM-Stationsadresse", 'HEX 0 63')
db.add_ecu_defines("NWM-Knoten", 'ENUM "nein","ja"')
db.add_signal_defines("LongName", 'STRING')
db.add_frame_defines("GenMsgDelayTime", 'INT 0 65535')
db.add_frame_defines("GenMsgNrOfRepetitions", 'INT 0 65535')
db.add_frame_defines("GenMsgStartValue", 'STRING')
db.add_frame_defines("GenMsgStartDelayTime", 'INT 0 65535')
db.add_frame_defines(
"GenMsgSendType",
'ENUM "cyclicX","spontanX","cyclicIfActiveX","spontanWithDelay","cyclicAndSpontanX","cyclicAndSpontanWithDelay","spontanWithRepitition","cyclicIfActiveAndSpontanWD","cyclicIfActiveFast","cyclicWithRepeatOnDemand","none"')
if ignore_cluster_info is True:
can_frame_trig = root.findall('.//' + ns + 'CAN-FRAME-TRIGGERING')
bus_name = ""
else:
def dump(can_matrix_or_cluster, file_object, export_type, **options):
# type: (typing.Union[canmatrix.CanMatrix, typing.Mapping[str, canmatrix.CanMatrix]], typing.IO, str, **str) -> None
module_instance = sys.modules["canmatrix.formats." + export_type]
if isinstance(can_matrix_or_cluster, canmatrix.CanMatrix):
module_instance.dump(can_matrix_or_cluster, file_object, **options) # type: ignore
elif "clusterExporter" in supportedFormats[export_type]:
module_instance.dump(can_matrix_or_cluster, file_object, **options) # type: ignore