跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
Python

python-can 模块使用记录

python-can 是一个用于控制 CAN 总线的 Python 库,支持多种硬件接口。文章介绍了安装方法、总线实例化(Bus 类)、报文收发、消息属性定义、波特率设置、通知器(Notifier)与监听器(Listener)的使用、日志读写、设备扫描以及总线桥接功能。通过该库可实现 CAN 报文的自动化测试与硬件交互,支持虚拟通道与物理设备的转发,适用于汽车电子领域的开发与测试场景。

协议工匠发布于 2026/3/16更新于 2026/6/1530 浏览

概述

本文记录了 python-can 在汽车领域 CAN 模块自动化测试中的应用,涵盖上位机开发及总线桥接方案。该库基于 python-can 进行开发,支持 Trace、Graph、Log、UDS、刷写等功能。

1 安装

设备的 CAN 驱动需自行手动安装。 使用 pip 安装 python-can:

pip install python-can

可查阅官方手册了解详细功能讲解及支持的硬件设备。

2 功能说明

2.1 bus 的实例化(创建)

2.1.1 Bus 类

can.interface.Bus 是用于创建 bus 实体控制总线行为的关键类。通俗地讲就是打开硬件设备,在此基础去控制报文收发。 以下是一个打开 pcan 设备通道 PCAN_USBBUS1 的例子:

import can
bus = can.interface.Bus(interface='pcan', channel='PCAN_USBBUS1', bitrate=500000)
  • interface:指的是硬件设备,参考 BACKENDS。
  • channel:指的是通道。不同设备可能定义方式不同,pcan 使用字符串'PCAN_USBBUS1',vector 可直接使用数字 0123,具体需要参考设备对应定义的类。
2.1.2 ThreadSafeBus 类

can.ThreadSafeBus 是在 Bus 类的基础上封装,可安全的用于多线程访问,比如多个定时器同时发送报文。使用方式和 Bus 类一样:

import can
bus = can.ThreadSafeBus(interface='pcan', channel='PCAN_USBBUS1', bitrate=500000)
2.1.3 支持的硬件设备

可通过查看源码中的 BACKENDS 定义了解支持的设备类型。大部分常用接口包括 kvaser, socketcan, serial, pcan, usb2can, ixxat, nican, iscan, virtual, udp_multicast, neovi, vector, slcan, robotell, canalystii, systec, seeedstudio, cantact, gs_usb, nixnet, neousys, etas, socketcand, zlg 等。 Bus 类本质上是对这些类的封装,通过'interface'参数进行识别。也可直接调用设备定义的类,比如 pcan:

import can.interfaces.pcan as pcan
bus = pcan.PcanBus(channel='PCAN_USBBUS1', bitrate=500000)

每个设备定义的类都是基于抽象基类 BusABC,实现报文的收发、波特率设置等通用的功能。

2.2 关闭 bus

bus 在使用完后,要调用 bus.shutdown 方法将设备关闭,否则会影响再次开启。

2.3 收发报文

2.3.1 发送报文

bus 实例化以后,直接调用 send() 方法,传入 message 参数即可发送报文。

bus.send(can.Message(arbitration_id=0x123, data=[,,,,,,,]))
0
1
2
3
4
5
6
7
2.3.2 接收报文

bus.recv(timeout) 用于接收报文,该方法会阻塞当前进程,直到收到报文或者 timeout 发生。通常创建单独线程用于一直等待接收报文。

import threading
def receive_message():
    while True:
        message = bus.recv()
        if message is not None:
            print(message)
t = threading.Thread(target=receive_message)
t.start()
# bus 是可迭代的(iter),也可用以下方式
def receive_message2():
    for message in bus:
        print(message)

2.4 Message

can.Message 用于记录报文的属性,包括以下信息:

  • check (bool) 用于检测数据是否正常,比如 dlc>64.
  • timestamp (float) 接收时会自动打戳,发送则不需要时间戳。
  • arbitration_id (int) 报文 id。
  • is_extended_id (bool) 是否为扩展帧。
  • is_remote_frame (bool) 是否为远程帧。
  • is_error_frame (bool) 是否为错误帧。(设备有可能不支持)
  • channel (int | str | Sequence[int] | None) 和实例化 Bus 的一致。
  • dlc (int | None) 数据长度。取值从 0~64,不是 CAN 线上 4bit 的 dlc。
  • data (bytes | bytearray | int | Iterable[int] | None) 数据。
  • is_fd (bool) 是否为 FD 报文。
  • is_rx (bool) 接收时会自动设为 True,发送则不需要关心。
  • bitrate_switch (bool) FD 的 BRS 位。
  • error_state_indicator (bool) ESI 位,表示主动错误(0)和被动错误(1)。

2.5 Bit timing

timing 用于设置波特率、采样点相关参数。有些 interface 可以只传入 bitrate 参数,不需要设置详细的位域参数。 timing 有两个类 BitTiming() 和 BitTimingFd(),分别对应 CAN 和 CANFD。参数基本类似,以下说明 BitTimingFd():

  • f_clock (int) 时钟频率,Hz.
  • nom_brp (int) 仲裁域的时钟分频。
  • nom_tseg1 (int) 仲裁域的传播段 + 相位缓冲段 1。
  • nom_tseg2 (int) 仲裁域的相位缓冲段 2。
  • nom_sjw (int) 仲裁域的同步段。
  • data_brp (int) 数据域的时钟分频。
  • data_tseg1 (int) 数据域的传播段 + 相位缓冲段 1。
  • data_tseg2 (int) 数据域的相位缓冲段 2。
  • data_sjw (int) 数据域的同步段。

传入部分参数(这个传入参数较少,会自动计算位域的值。但计算容易报错):

can.BitTimingFd.from_sample_point(
    f_clock=80_000_000,
    nom_bitrate=1_000_000,
    nom_sample_point=75.0,
    data_bitrate=8_000_000,
    data_sample_point=70.0,
)

传入部分参数 1(这个少了分频多了波特率):

can.BitTimingFd.from_bitrate_and_segments(
    f_clock=80_000_000,
    nom_bitrate=1_000_000,
    nom_tseg1=59,
    nom_tseg2=20,
    nom_sjw=10,
    data_bitrate=8_000_000,
    data_tseg1=6,
    data_tseg2=3,
    data_sjw=2,
)

传入全部参数调用:

can.BitTimingFd(
    f_clock=80_000_000,
    nom_brp=1,
    nom_tseg1=59,
    nom_tseg2=20,
    nom_sjw=10,
    data_brp=1,
    data_tseg1=6,
    data_tseg2=3,
    data_sjw=2,
)

strict (bool) ISO-11898 中定义的位域限制。默认 False 忽略即可。

2.6 Notifier

Notifier 官方解释是 bus 的报文分发器。通俗的理解就是,当 Bus 上接收到报文后,去调用通知函数。 Notifier 和前文'接收报文'中的逻辑类似,会创建线程用于处理接收报文。

  • bus (BusABC | list[BusABC]) 即创建的 bus 参数。
  • listeners (Iterable[Listener | Callable[[Message], Awaitable[None] | None]]) 监听列表中可以是 Listener 类,也可以是自定义函数。
  • timeout (float) 用于 bus.recv() 中的 timeout 参数。
  • loop (AbstractEventLoop | None) 多个 listener 是并行调用还是串行。默认是串行的。

方法说明:

  1. add_bus(bus):添加一个 bus,每个 bus 会创建一个单独的线程,但 listeners 是共用的。
  2. add_listener(listener):添加一个 listener,和实例化中的 listener 是一样的类型。
  3. remove_listener(listener):add_listener() 本质是 list.append(listener),remove_listener() 是 list.remove(listener)。注意当 list.remove 不存在的元素时,会报异常。
  4. stop(timeout=5.0):停止监听,主要用于释放线程资源。

实例化 can.Notifier:

import can.notifier
# bus 接收报文后,调用 print 函数打印报文,再调用 my_rx_message 函数处理报文
notifier = can.Notifier(bus, [print, my_rx_message])

2.7 Listener

Listener(ABC) 是抽象基类,使用时要自定义类,并且实现 on_message_received() 方法。这个类用于 Notifier 中的 listener,但 Notifier 也可添加自定义函数,所以很少用 Listener。

import can.listener
class MyListener(can.Listener):
    def on_message_received(self, msg: can.Message) -> None:
        # 处理接收到的 message
        print(msg)
notifier.add_listener(MyListener())

2.8 报文 log 的读写

2.8.1 写入文件

can.Logger 用于将报文 log 写入到文件中,格式包含以下种类:

MESSAGE_WRITERS: Final[dict[str, type[MessageWriter]]] = {
    ".asc": ASCWriter,
    ".blf": BLFWriter,
    ".csv": CSVWriter,
    ".db": SqliteWriter,
    ".log": CanutilsLogWriter,
    ".mf4": MF4Writer,
    ".trc": TRCWriter,
    ".txt": Printer,
}

Logger 可自动识别文件后缀,去调用不同的文件编写器类。

import can.logger
file = "msg_log.blf"
log = can.Logger(file)
# 把 log 添加到 notifier 中,收到报文后写入 notifier 会调用 log.on_message_received(msg)
notifier.add_listener(log)
2.8.2 读取文件

can.LogReader 用于读取报文 log 文件。

import can.logger
read_file = "msg_log.blf"
reader = can.LogReader(read_file)
for message in reader:
    print(message)
# 也可以将读取到的报文写入到另一个文件中,增加自定义的过滤功能。
write_file = "msg_log_filter.blf"
writer = can.Logger(write_file)
for message in reader:
    print(message)
    # 过滤报文,只写入 id 为 0x123 的报文
    if message.arbitration_id == 0x123:
        writer.on_message_received(message)

2.9 扫描连接设备

python-can 支持扫描当前连接的硬件设备,使用 can.interface.detect_available_configs() 函数。

import can
for channel in can.interface.detect_available_configs(interfaces=can.interfaces.VALID_INTERFACES, timeout=5):
    print(channel)
  • interfaces:需要扫描的硬件设备,比如传入'pcan'。
  • timeout:超时时间。 detect_available_configs() 使用同步搜索的方式,即并行搜索传入的 interface。返回的是一个 [dict],可以当 Bus 类的参数使用。
import can
for info in can.interface.detect_available_configs(interfaces='pcan', timeout=5):
    bus = can.interface.Bus(interface=info['interface'], channel=info['channel'])
    print(bus)

can.interface.Bus(**info) 直接传入参数可能会造成使用通道错误,因为有些设备也可通过其他参数开启,比如 vector 的 channel_index,pcan 的 device_id。

2.10 bridge

这里的 bridge 指的是将一个设备的报文转发到另一个设备上,当了解 python-can 连接设备和收发报文后,这个功能就不难实现了。python-can 中的 bridge.py 文件中有一个示例,可以参考。 另外一个关键的用途是,可将 vector CAN 上位机(CANoe、CANape)收发的报文和其他硬件设备(比如 PCAN)进行交互,降低了硬件费用带来的学习门槛。

import can
bus_virtual = can.interface.Bus(interface='vector', channel_index=0, channel=0)
# vector 虚拟通道得使用 channel_index 进行识别
bus_real = can.ThreadSafeBus(interface='pcan', channel='PCAN_USBBUS1')
# virtual 的接收发送到 real
notifier_virtual = can.Notifier(bus_virtual, [bus_real.send])
# real 的接收发送到 virtual
notifier_real = can.Notifier(bus_real, [bus_virtual.send])
while True:
    pass

在此基础上,增加读取 virtual 通道波特率参数、自动识别硬件设备等功能,再写个界面并加以完善细节,就可很方便的解决没有硬件设备的烦恼了。

总结

python-can 是支持 CAN 最强的一个 Python 库。虽然有些设备在库中不支持,但可在硬件设备的官网找到代码集成进去,或者自行开发也是很方便的。以上仅记录的一些常用的内容,详细内容还请参考官方文档。

目录

  1. 概述
  2. 1 安装
  3. 2 功能说明
  4. 2.1 bus 的实例化(创建)
  5. 2.1.1 Bus 类
  6. 2.1.2 ThreadSafeBus 类
  7. 2.1.3 支持的硬件设备
  8. 2.2 关闭 bus
  9. 2.3 收发报文
  10. 2.3.1 发送报文
  11. 2.3.2 接收报文
  12. bus 是可迭代的(iter),也可用以下方式
  13. 2.4 Message
  14. 2.5 Bit timing
  15. 2.6 Notifier
  16. bus 接收报文后,调用 print 函数打印报文,再调用 myrxmessage 函数处理报文
  17. 2.7 Listener
  18. 2.8 报文 log 的读写
  19. 2.8.1 写入文件
  20. 把 log 添加到 notifier 中,收到报文后写入 notifier 会调用 log.onmessagereceived(msg)
  21. 2.8.2 读取文件
  22. 也可以将读取到的报文写入到另一个文件中,增加自定义的过滤功能。
  23. 2.9 扫描连接设备
  24. 2.10 bridge
  25. vector 虚拟通道得使用 channel_index 进行识别
  26. virtual 的接收发送到 real
  27. real 的接收发送到 virtual
  28. 总结
  • 免费图片AI生成工具免费生成了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 免费图片视频在线生成30秒,将你的创意变成现实开始设计
  • X/Twitter免费视频下载器免登陆无限额度免费视频解析下载了解详情
  • 100+免费在线小游戏爽一把
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • OpenClaw 浏览器控制方案:利用 Chrome Debug 模式实现全自动控制
  • 前端开发中支持跨域的 HTML 标签、属性及缓存机制总结
  • Vue 组件开发中的枚举值验证:从 Type 属性错误说起
  • 腾讯混元 Image 2.1 GGUF 版:消费级显卡本地部署方案
  • 智能无人机平台 V4 版本功能升级与实现
  • MySQL 基础入门:数据库概念、架构与 SQL 分类详解
  • Spring Boot 自定义错误页面:404/500 页面定制与 ErrorController
  • 2025-0xGame Web 安全挑战全解
  • ROS 导航实战:使用 mpc_local_planner 实现高效避障与参数调优
  • 重要算法思想:贪心、二分、正难则反、多重与完全背包精练
  • 网络安全行业职业发展路径与技能要求详解
  • Python 驱动的 ADS 自动化仿真框架与 API 实战指南
  • OpenAI Python API 使用指南:基础功能与异步处理
  • Llama-2-7b 昇腾 NPU 部署与性能测评实战
  • OpenViking 部署与应用:字节跳动开源 AI 代理上下文数据库
  • Sora 火爆背后:国内真正合格的 AI 产品经理为何稀缺?
  • 微信小程序原生开发入门:从零构建首个可交互页面
  • 解决 Python 报错:No module named pkg_resources
  • whisper.cpp 离线语音识别快速上手指南
  • Llama-Factory 支持百种预训练模型详解与微调方案

相关免费在线工具

  • curl 转代码

    解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online

  • Markdown转HTML

    将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online

  • HTML转Markdown

    将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online

  • JSON 压缩

    通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online