cantools API

Warning

The examples on this page are provided without support.

The Python package cantools complements python-can with support for CAN databases (which describe how message payloads are interpreted) and for encoding and decoding message payloads. Refer to the cantools documentation at https://cantools.readthedocs.io/ for more information.

This page contains some basic examples on how to use cantools.


Installation

$ pip install cantools

Note

The examples on this page have been tested with version 39.4.13 of cantools.


Manually define database

A database of message interpretations can be manually constructed directly in Python.

from cantools.database import Database, Message, Signal
from cantools.database.conversion import BaseConversion

# Define signals
db_msg1_signals = [
    Signal(name="A1", start=0,  length=16, conversion=BaseConversion.factory(scale=0.625, offset=0)),
    Signal(name="A2", start=16, length=16, conversion=BaseConversion.factory(scale=0.625, offset=0)),
    Signal(name="A3", start=32, length=16, conversion=BaseConversion.factory(scale=0.625, offset=0)),
    Signal(name="A4", start=48, length=16, conversion=BaseConversion.factory(scale=0.625, offset=0))
]

# Define messages
db_msgs = [
    Message(name="A1ToA4", is_extended_frame=False, frame_id=2, length=8, signals=db_msg1_signals)
]

# Define database
db = Database(messages=db_msgs)
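
To make the definition above concrete, the following is a minimal pure-Python sketch of the payload layout it describes. It assumes the cantools default signal settings (little-endian byte order, unsigned raw values), which the example above does not set explicitly. The helper name decode_a1_to_a4 and the sample payload are hypothetical and not part of cantools.

```python
import struct

SCALE = 0.625  # scale shared by all four signals in the definition above
OFFSET = 0     # offset shared by all four signals

def decode_a1_to_a4(payload: bytes) -> dict:
    """Decode an 8-byte payload into the physical values of A1..A4."""
    # Four consecutive 16-bit unsigned little-endian raw values
    # (start bits 0, 16, 32 and 48)
    raw_values = struct.unpack("<4H", payload)
    names = ("A1", "A2", "A3", "A4")
    # physical = raw * scale + offset
    return {name: raw * SCALE + OFFSET for name, raw in zip(names, raw_values)}

# Hypothetical payload holding the raw values 16, 0, 8 and 1600
payload = struct.pack("<4H", 16, 0, 8, 1600)
decoded = decode_a1_to_a4(payload)
print(decoded)
```

In practice the database object performs this conversion itself; the sketch only shows what the start, length, scale and offset arguments mean.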

Load existing database

cantools supports a range of database file formats, e.g. *.DBC.

from cantools import database

# Load database from file
db = database.load_file("<DBC_FILE_PATH>")

Decode messages

The example below demonstrates how cantools can be used to decode messages.

import can
from cantools import database
from collections import defaultdict

# Load database from file
db = database.load_file("<DBC_FILE_PATH>")

# Decoded signals
results = defaultdict(list)

# Open log file
with can.LogReader("<LOG_FILE_PATH>") as reader:

    # Read messages
    for msg in reader:

        # Look up message definition in database
        try:
            db_msg = db.get_message_by_frame_id(msg.arbitration_id)
        except KeyError:
            # Message not defined in database
            continue

        # Decode using database
        result = db.decode_message(db_msg.name, data=msg.data)

        # Add timestamp
        result.update({"t": msg.timestamp})

        # Append to results
        results[db_msg.name].append(result)

cantools has limited support for J1939 PGNs. However, by splitting the database in two (one for PDU format 1 and one for PDU format 2), each with a matching frame ID mask, PGNs can be decoded as demonstrated below.

Note

For better J1939 support, consider using the dedicated python-can-j1939 package.

import can
import math
from cantools import database, j1939
from collections import defaultdict

# Create a J1939 PDU1 and PDU2 database (with matching PGN ID masks)
db_pdu1 = database.Database(frame_id_mask=0x3FF0000)
db_pdu2 = database.Database(frame_id_mask=0x3FFFF00)

# Load database from file and sort its messages by PDU format
for db_msg in database.load_file("<DBC_FILE_PATH>").messages:
    if j1939.is_pdu_format_1(j1939.frame_id_unpack(db_msg.frame_id).pdu_format):
        db_pdu1.messages.append(db_msg)
    else:
        db_pdu2.messages.append(db_msg)
db_pdu1.refresh()
db_pdu2.refresh()

# Decoded signals storage
results = defaultdict(list)

# Open log file
with can.LogReader("<LOG_FILE_PATH>") as reader:

    # Read messages
    for msg in reader:

        # J1939 is extended only
        if not msg.is_extended_id:
            continue

        # Select database based on PDU format
        if j1939.is_pdu_format_1(j1939.frame_id_unpack(msg.arbitration_id).pdu_format):
            db = db_pdu1
        else:
            db = db_pdu2

        # Look up message definition in database
        try:
            db_msg = db.get_message_by_frame_id(msg.arbitration_id)
        except KeyError:
            # Message not defined in database
            continue

        # Decode using database
        result = db.decode_message(db_msg.name, data=msg.data)

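        # Replace numeric signals outside the database min/max with NaN
        # (limits are optional and may be None; boundary values are valid)
        for name, value in result.items():
            db_sig = db_msg.get_signal_by_name(name)
            if not isinstance(value, (int, float)):
                continue
            low, high = db_sig.minimum, db_sig.maximum
            if (low is not None and value < low) or (high is not None and value > high):
                result[name] = math.nan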

        # Add timestamp
        result.update({"t": msg.timestamp})

        # Append to results
        results[db_msg.name].append(result)
        print(result)
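
The two frame ID masks used above follow from the layout of the 29-bit J1939 identifier. The sketch below unpacks that layout with plain bit operations and shows which bits each mask keeps. It is an illustration only, not the cantools implementation, and it assumes the mask is applied to both the database frame ID and the received frame ID before they are compared. The function names and sample frame IDs are hypothetical.

```python
PDU1_MASK = 0x3FF0000  # keeps reserved bit, data page and PDU format
PDU2_MASK = 0x3FFFF00  # additionally keeps PDU specific (group extension)

def unpack_frame_id(frame_id: int) -> dict:
    """Split a 29-bit J1939 identifier into its fields."""
    return {
        "priority": (frame_id >> 26) & 0x7,
        "reserved": (frame_id >> 25) & 0x1,
        "data_page": (frame_id >> 24) & 0x1,
        "pdu_format": (frame_id >> 16) & 0xFF,
        "pdu_specific": (frame_id >> 8) & 0xFF,
        "source_address": frame_id & 0xFF,
    }

def is_pdu_format_1(pdu_format: int) -> bool:
    """PDU format values below 240 are PDU1 (destination specific)."""
    return pdu_format < 240

def masked_frame_id(frame_id: int) -> int:
    """Reduce a frame ID to the bits that identify its PGN."""
    pdu_format = unpack_frame_id(frame_id)["pdu_format"]
    mask = PDU1_MASK if is_pdu_format_1(pdu_format) else PDU2_MASK
    return frame_id & mask

# PDU2 (PF = 0xFE): priority and source address are ignored
pdu2_a = masked_frame_id(0x18FEF100)
pdu2_b = masked_frame_id(0x0CFEF127)

# PDU1 (PF = 0xEA): the destination address in PDU specific is ignored too
pdu1_a = masked_frame_id(0x18EAFF21)
pdu1_b = masked_frame_id(0x18EA0021)

print(hex(pdu2_a), hex(pdu2_b), hex(pdu1_a), hex(pdu1_b))
```

Because PDU1 frames carry a destination address in the PDU specific field, that field must be masked out for the lookup to match; for PDU2 frames the same field is part of the PGN and must be kept. This is why a single mask cannot serve both formats.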

The decoded results can now easily be used in Python, e.g. with pandas and matplotlib as demonstrated below.

import pandas
from matplotlib import pyplot as plt

...

# Results to dataframes
dfs = {name: pandas.DataFrame.from_dict(value).set_index('t') for name, value in results.items()}

# Plot all dataframes
for name, df in dfs.items():
    df.plot(title=name)
    plt.show()
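
If pandas is not available, the same results structure can also be written out with the standard library. The following is a minimal sketch, assuming results has the shape built in the decoding examples (message name mapped to a list of dicts, each with a "t" key). The function name results_to_csv and the sample data are hypothetical.

```python
import csv
import io

def results_to_csv(rows: list) -> str:
    """Serialize a list of decoded-signal dicts to CSV text, timestamp first."""
    if not rows:
        return ""
    # Collect every signal name that occurs; keep the timestamp column first
    names = sorted({key for row in rows for key in row if key != "t"})
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=["t", *names], lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()

# Hypothetical decoded results for a single message
sample_results = {
    "A1ToA4": [
        {"A1": 10.0, "A2": 0.0, "t": 0.0},
        {"A1": 12.5, "A2": 0.625, "t": 0.01},
    ]
}

csv_text = results_to_csv(sample_results["A1ToA4"])
print(csv_text)
```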