cantools API
Warning
The examples on this page are provided without support
The Python package cantools extends python-can with support for databases (containing message interpretations) and support for message payload encoding/decoding. Refer to the cantools documentation for more information https://cantools.readthedocs.io/.
This page contains some basic examples on how to use cantools.
Manually define database
A database of message interpretations can be manually constructed directly in Python.
from cantools.database import Database, Message, Signal
from cantools.database.conversion import BaseConversion
# Define signals
db_msg1_signals = [
Signal(name="A1", start=0, length=16, conversion=BaseConversion.factory(scale=0.625, offset=0)),
Signal(name="A2", start=16, length=16, conversion=BaseConversion.factory(scale=0.625, offset=0)),
Signal(name="A3", start=32, length=16, conversion=BaseConversion.factory(scale=0.625, offset=0)),
Signal(name="A4", start=48, length=16, conversion=BaseConversion.factory(scale=0.625, offset=0))]
# Define messages
db_msgs = [
Message(name="A1ToA4", is_extended_frame=False, frame_id=2, length=8, signals=db_msg1_signals)]
# Define database
db = Database(messages=db_msgs)
Load existing database
cantools supports a range of database file formats. e.g. *.DBC
.
from cantools import database
# Load database from file
db = database.load_file("<DBC_FILE_PATH>")
Decode messages
Below example demonstrates how cantools can be used to decode messages.
Note
Alternatively, messages can be logged to a file (see can-logger) and decoded later.
import can
from cantools import database
# Load database from file
db = database.load_file("<DBC_FILE_PATH>")
# Open bus
with can.Bus(interface="csscan_serial", channel="<CHANNEL>") as bus:
# Receive messages
for msg in bus:
# Get message name from database
try:
db_msg = db.get_message_by_frame_id(msg.arbitration_id)
except KeyError:
# Message not defined in database
continue
# Decode using database
result = db.decode_message(db_msg.name, data=msg.data)
# Add timestamp
result.update({"t": msg.timestamp})
print(f"{db_msg.name}: {result}")
cantools has weak support for J1939 PGNs. However, by splitting the database into two (one for PDU format 1 and one for PDU format 2), decoding of PGNs can be done as demonstrated below.
Note
For better J1939 support, consider using the dedicated python-can-j1939 package.
import can
import math
from cantools import database, j1939
# Create a J1939 PDU1 and PDU2 database (with matching PGN ID masks)
db_pdu1 = database.Database(frame_id_mask=0x3FF0000)
db_pdu2 = database.Database(frame_id_mask=0x3FFFF00)
# Load database from file
for db_msg in database.load_file("<DBC_FILE_PATH>").messages:
if j1939.is_pdu_format_1(j1939.frame_id_unpack(db_msg.frame_id).pdu_format):
db_pdu1.messages.append(db_msg)
else:
db_pdu2.messages.append(db_msg)
db_pdu1.refresh()
db_pdu2.refresh()
# Open bus
with can.Bus(interface="csscan_serial", channel="<CHANNEL>") as bus:
# Receive messages
for msg in bus:
# J1939 is extended only
if not msg.is_extended_id:
continue
# Select database based on PDU format
if j1939.is_pdu_format_1(j1939.frame_id_unpack(msg.arbitration_id).pdu_format):
db = db_pdu1
else:
db = db_pdu2
# Get message name from database
try:
db_msg = db.get_message_by_frame_id(msg.arbitration_id)
except KeyError:
# Message not defined in database
continue
# Decode using database
result = db.decode_message(db_msg.name, data=msg.data)
# Ensure that signals satisfy min/max
for name, value in result.items():
db_sig = db_msg.get_signal_by_name(name)
if not db_sig.maximum > value > db_sig.minimum:
result[name] = math.nan
# Add timestamp
result.update({"t": msg.timestamp})
print(result)
Encode messages
Below example demonstrates how cantools can be used to encode messages. The example assumes that the database contains a message Analog1To4 with signals Analog1, Analog2, Analog3, and Analog4.
import can
from cantools import database
# Load database
db = database.load_file("<DBC_FILE_PATH>")
# Open bus
with can.Bus(interface="csscan_serial", channel="<CHANNEL>") as bus:
# Get message to transmit
db_msg = db.get_message_by_name("Analog1To4")
# Encode data
data = db_msg.encode({"Analog1": 0, "Analog2": 2500, "Analog3": 5000, "Analog4": 7500})
# Transmit message with encoded data
bus.send(can.Message(arbitration_id=db_msg.frame_id,
is_extended_id=db_msg.is_extended_frame,
data=data))