【Python】python-can使用记录

前言

之前用python做了一些汽车领域CAN模块相关自动化,主要集中在软件的代码开发阶段。随着功能的成熟,逐渐想把测试的工作也自动化起来,后来就写了个简单的上位机,支持Trace、Graph、Log、UDS、刷写等功能,可加载自定义py测试脚本。

近期突然想到能不能用canoe软件做上位机,再结合其他总线设备的桥接方式,从网上真找到了方案,尝试过后发现很棒。

不论是一开始的上位机,还是后面的桥接工具,都是基于这个python-can去做的开发,所以在此记录和分享下这个模块。

1 安装

  1. 设备的CAN驱动需要自行手动安装。

直接使用pip安装python-can

pip install python-can 

可在: pypi.org搜索“python-can”,查阅手册(链接),里面有详细的功能讲解以及支持的硬件设备。

2 功能说明

2.1 bus的实例化(创建)

2.1.1 Bus类

can.interface.Bus是比较关键的一个类,用于创建bus实体控制总线的行为。通俗地讲就是打开硬件设备,在此基础去控制报文收发。
以下是一个打开pcan设备通道PCAN_USBBUS1的例子。

import can bus = can.interface.Bus(interface='pcan', channel='PCAN_USBBUS1', bitrate=500000)
  • interface:指的是硬件设备,参考下面BACKENDS。
  • channel:指的是通道。不同设备可能定义方式不同,pcan使用字符串’PCAN_USBBUS1’,vector可直接使用数字0123,具体需要参考设备对应定义的类。

2.1.2 ThreadSafeBus类

can.ThreadSafeBus是在Bus类的基础上封装,可安全的用于多线程访问,比如多个定时器同时发送报文。使用方式和Bus类一样,以下是一个打开pcan设备通道PCAN_USBBUS1的例子。

import can # bus = can.interface.Bus(interface='pcan', channel='PCAN_USBBUS1', bitrate=500000) bus = can.ThreadSafeBus(interface='pcan', channel='PCAN_USBBUS1', bitrate=500000)

2.1.3 支持的硬件设备

点进类Bus定义的地方,找到_get_class_for_interface()函数,再跳转到BACKENDS定义的地方,可以看到支持的设备类型。除了几个常用的,大部分都不认识 0.0

# interface_name => (module, classname) BACKENDS:dict[str,tuple[str,str]]={"kvaser":("can.interfaces.kvaser","KvaserBus"),"socketcan":("can.interfaces.socketcan","SocketcanBus"),"serial":("can.interfaces.serial.serial_can","SerialBus"),"pcan":("can.interfaces.pcan","PcanBus"),"usb2can":("can.interfaces.usb2can","Usb2canBus"),"ixxat":("can.interfaces.ixxat","IXXATBus"),"nican":("can.interfaces.nican","NicanBus"),"iscan":("can.interfaces.iscan","IscanBus"),"virtual":("can.interfaces.virtual","VirtualBus"),"udp_multicast":("can.interfaces.udp_multicast","UdpMulticastBus"),"neovi":("can.interfaces.ics_neovi","NeoViBus"),"vector":("can.interfaces.vector","VectorBus"),"slcan":("can.interfaces.slcan","slcanBus"),"robotell":("can.interfaces.robotell","robotellBus"),"canalystii":("can.interfaces.canalystii","CANalystIIBus"),"systec":("can.interfaces.systec","UcanBus"),"seeedstudio":("can.interfaces.seeedstudio","SeeedBus"),"cantact":("can.interfaces.cantact","CantactBus"),"gs_usb":("can.interfaces.gs_usb","GsUsbBus"),"nixnet":("can.interfaces.nixnet","NiXNETcanBus"),"neousys":("can.interfaces.neousys","NeousysBus"),"etas":("can.interfaces.etas","EtasBus"),"socketcand":("can.interfaces.socketcand","SocketCanDaemonBus"),"zlg":("can.interfaces.zlg","ZlgCanBus"),# add zlg"bmcan":("can.interfaces.bmcan","BmCanBus"),# add busmuster}

Bus类本质上就是对上面这些类的封装,通过“interface”参数进行识别。也可直接调用设备定义的类,比如pcan:

import can.interfaces.pcan as pcan bus = pcan.PcanBus(channel='PCAN_USBBUS1', bitrate=500000)

每个设备定义的类都是基于抽象基类BusABC,实现报文的收发、波特率设置等通用的功能。对于读取设备信息、查看总线状态等较特殊的功能是没有的,如有需求可自行扩展实现。

2.2 关闭bus

bus在使用完后,要调用bus.shutdown方法将设备关闭,否则会影响再次开启。

2.3 收发报文

2.3.1 发送报文

bus实例化以后,直接调用send()方法,传入message参数即可发送报文。如:

bus.send(can.Message(arbitration_id=0x123, data=[0,1,2,3,4,5,6,7]))

2.3.2 接收报文

bus.recv(timeout)用于接收报文,该方法会阻塞当前进程,直到收到报文或者timeout发生。通常创建单独线程用于一直等待接收报文,如:

import threading defreceive_message():whileTrue: message = bus.recv()if message isnotNone:print(message) t = threading.Thread(target=receive_message) t.start()# bus是可迭代的(iter),也可用以下方式defreceive_message2():for message in bus:print(message)

2.4 Message

can.Message用于记录报文的属性,包括以下信息:

  • check (bool) 用于检测数据是否正常,比如dlc>64.
  • timestamp (float) 接收时会自动打戳,发送则不需要时间戳。
  • arbitration_id (int) 报文id。
  • is_extended_id (bool) 是否为扩展帧。
  • is_remote_frame (bool) 是否为远程帧。
  • is_error_frame (bool) 是否为错误帧。(设备有可能不支持)
  • channel (int | str | Sequence[int] | None) 和实例化Bus的一致。
  • dlc (int | None) 数据长度。取值从0~64,不是CAN线上4bit的dlc。
  • data (bytes | bytearray | int | Iterable[int] | None) 数据。
  • is_fd (bool) 是否为FD报文。
  • is_rx (bool) 接收时会自动设为True,发送则不需要关心。
  • bitrate_switch (bool) FD的BRS位。
  • error_state_indicator (bool) ESI位,表示主动错误(0)和被动错误(1)。

2.5 Bit timing

timing用于设置波特率、采样点相关参数。有些interface可以只传入buadrate参数,不需要设置详细的位域参数。

timing有两个类BitTiming()和BitTimingFd(),分别对应CAN和CANFD。参数基本类似,以下说明BitTimingFd():

  • f_clock (int) 时钟频率,Hz.
  • nom_brp (int) 仲裁域的时钟分频。
  • nom_tseg1 (int) 仲裁域的传播段+相位缓冲段1。参考下图示例
  • nom_tseg2 (int) 仲裁域的相位缓冲段2。
  • nom_sjw (int) 仲裁域的同步段。
  • data_brp (int) 数据域的时钟分频。
  • data_tseg1 (int) 数据域的传播段+相位缓冲段1。
  • data_tseg2 (int) 数据域的相位缓冲段2。
  • data_sjw (int) 数据域的同步段。

传入部分参数2(这个传入参数较少,会自动计算位域的值。但计算容易报错)

 can.BitTimingFd.from_sample_point( f_clock=80_000_000, nom_bitrate=1_000_000, nom_sample_point=75.0, data_bitrate=8_000_000, data_sample_point=70.0,)

传入部分参数1(这个少了分频多了波特率)

 can.BitTimingFd.from_bitrate_and_segments( f_clock=80_000_000, nom_bitrate=1_000_000, nom_tseg1=59, nom_tseg2=20, nom_sjw=10, data_bitrate=8_000_000, data_tseg1=6, data_tseg2=3, data_sjw=2,)

传入全部参数调用

can.BitTimingFd( f_clock=80_000_000, nom_brp=1, nom_tseg1=59, nom_tseg2=20, nom_sjw=10, data_brp=1, data_tseg1=6, data_tseg2=3, data_sjw=2,)

strict (bool) ISO-11898中定义的位域限制。默认False 忽略即可。

在这里插入图片描述


实例化方法有多个,可以传入全部参数,也可以只传入部分参数,如下:

2.6 Notifier

Notifier官方解释是bus的报文分发器。通俗的理解就是,当Bus上接收到报文后,去调用通知函数。

Notifier和前文“接收报文”中的逻辑类似,会创建线程用于处理接收报文。

    • bus (BusABC | list[BusABC]) 即创建的bus参数。
    • listeners (Iterable[Listener | Callable[[Message], Awaitable[None] | None]]) 监听列表中可以是Listener类,也可以是自定义函数。
    • timeout (float) 用于bus.recv()中的timeout参数。
    • loop (AbstractEventLoop | None) 多个listener是并行调用还是串行。默认是串行的。
  1. 方法 add_bus(bus)
    添加一个bus,每个bus会创建一个单独的线程,但listeners是共用的。
  2. 方法 add_listener(listener)
    添加一个listener,和实例化中的listener是一样的类型。
  3. 方法 remove_listener(listener)
    add_listener()本质是list.append(listener),remove_listener()是list.remove(listener)。注意当list.remove不存在的元素时,会报异常。
  4. 方法 stop(timeout=5.0)
    停止监听,主要用于释放线程资源。

实例化can.Notifier
can.Notifier(bus, listeners, timeout=1.0, loop=None)

import can.notifier # bus接收报文后,调用print函数打印报文,再调用my_rx_message函数处理报文 notifier = can.Notifier(bus,[print, my_rx_message])

2.7 Listener

Listener(ABC)是抽象基类,使用时要自定义类,并且实现on_message_received()方法。这个类用于Notifier中的listener,但Notifier也可添加自定义函数,所以很少用Listener。以下是一个示例:

import can.listener classMyListener(can.Listener):defon_message_received(self, msg: can.Message)->None:# 处理接收到的messageprint(msg) notifier.add_listener(MyListener())

2.8 报文log的读写

2.8.1 写入文件

can.Logger用于将报文log写入到文件中,格式包含以下种类:

MESSAGE_WRITERS: Final[dict[str,type[MessageWriter]]]={".asc": ASCWriter,".blf": BLFWriter,".csv": CSVWriter,".db": SqliteWriter,".log": CanutilsLogWriter,".mf4": MF4Writer,".trc": TRCWriter,".txt": Printer,}

Logger可自动识别文件后缀,去调用不同的文件编写器类。以下是一个示例:

import can.logger file="msg_log.blf" log = can.Logger(file)# 把log添加到notifier中,收到报文后写入notifier会调用log.on_message_received(msg) notifier.add_listener(log)

2.8.2 读取文件

can.LogReader用于读取报文log文件,示例如下:

import can.logger read_file ="msg_log.blf" reader = can.LogReader(read_file)for message in reader:print(message)# 也可以将读取到的报文写入到另一个文件中,增加自定义的过滤功能。 write_file ="msg_log_filter.blf" writer = can.Logger(write_file)for message in reader:print(message)# 过滤报文,只写入id为0x123的报文if message.arbitration_id ==0x123: writer.on_message_received(message)

2.9 扫描连接设备

python-can支持扫描当前连接的硬件设备,使用can.interface.detect_available_configs()函数,示例如下:

import can for channel in can.interface.detect_available_configs(interfaces=can.interfaces.VALID_INTERFACES, timeout=5):print(channel)# 打印信息如下:# Kvaser canlib is unavailable. argument of type 'NoneType' is not iterable# uptime library not available, timestamps are relative to boot time and not to Epoch UTC# zlg library not found: zlgcan.dll# BusMaster library not found: BMAPI64.dll# ... 省略部分log信息,某些pip模块没安装或者设备驱动没安装。# {'interface': 'vector', 'channel': 0, 'serial': 100, 'channel_index': 0, 'hw_type': <XL_HardwareType.XL_HWTYPE_VIRTUAL: 1>, 'hw_index': 0, 'hw_channel': 0, 'supports_fd': True, 'vector_channel_config': VectorChannelConfig(name='Virtual Channel 1', hw_type=<XL_HardwareType.XL_HWTYPE_VIRTUAL: 1>, hw_index=0, hw_channel=0, channel_index=0, channel_mask=1, channel_capabilities=<XL_ChannelCapabilities.XL_CHANNEL_FLAG_TIME_SYNC_RUNNING|XL_CHANNEL_FLAG_CANFD_BOSCH_SUPPORT|XL_CHANNEL_FLAG_CANFD_ISO_SUPPORT|6: 2684354567>, channel_bus_capabilities=<XL_BusCapabilities.XL_BUS_COMPATIBLE_CAN|XL_BUS_ACTIVE_CAP_CAN: 65537>, is_on_bus=False, connected_bus_type=<XL_BusTypes.XL_BUS_TYPE_NONE: 0>, bus_params=VectorBusParams(bus_type=<XL_BusTypes.XL_BUS_TYPE_CAN: 1>, can=VectorCanParams(bitrate=500000, sjw=1, tseg1=4, tseg2=3, sam=1, output_mode=<XL_OutputMode.XL_OUTPUT_MODE_NORMAL: 1>, can_op_mode=<XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CAN20: 1>), canfd=VectorCanFdParams(bitrate=500000, data_bitrate=0, sjw_abr=1, tseg1_abr=4, tseg2_abr=3, sam_abr=1, sjw_dbr=0, tseg1_dbr=0, tseg2_dbr=0, output_mode=<XL_OutputMode.XL_OUTPUT_MODE_NORMAL: 1>, can_op_mode=<XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CAN20: 1>)), serial_number=100, article_number=7000, transceiver_name=' Virtual CAN')}# {'interface': 'vector', 'channel': 1, 'serial': 100, 'channel_index': 1, 'hw_type': <XL_HardwareType.XL_HWTYPE_VIRTUAL: 1>, 'hw_index': 0, 'hw_channel': 1, 'supports_fd': True, 'vector_channel_config': VectorChannelConfig(name='Virtual Channel 2', hw_type=<XL_HardwareType.XL_HWTYPE_VIRTUAL: 1>, hw_index=0, hw_channel=1, channel_index=1, channel_mask=2, channel_capabilities=<XL_ChannelCapabilities.XL_CHANNEL_FLAG_TIME_SYNC_RUNNING|XL_CHANNEL_FLAG_CANFD_BOSCH_SUPPORT|XL_CHANNEL_FLAG_CANFD_ISO_SUPPORT|6: 2684354567>, channel_bus_capabilities=<XL_BusCapabilities.XL_BUS_COMPATIBLE_CAN|XL_BUS_ACTIVE_CAP_CAN: 65537>, is_on_bus=False, connected_bus_type=<XL_BusTypes.XL_BUS_TYPE_NONE: 0>, bus_params=VectorBusParams(bus_type=<XL_BusTypes.XL_BUS_TYPE_CAN: 1>, can=VectorCanParams(bitrate=500000, sjw=1, tseg1=4, tseg2=3, sam=1, output_mode=<XL_OutputMode.XL_OUTPUT_MODE_NORMAL: 1>, can_op_mode=<XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CAN20: 1>), canfd=VectorCanFdParams(bitrate=500000, data_bitrate=0, sjw_abr=1, tseg1_abr=4, tseg2_abr=3, sam_abr=1, sjw_dbr=0, tseg1_dbr=0, tseg2_dbr=0, output_mode=<XL_OutputMode.XL_OUTPUT_MODE_NORMAL: 1>, can_op_mode=<XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CAN20: 1>)), serial_number=100, article_number=7000, transceiver_name=' Virtual CAN')}# {'interface': 'virtual', 'channel': 'channel-2952'}# {'interface': 'pcan', 'channel': 'PCAN_USBBUS1', 'supports_fd': True, 'controller_number': 0, 'device_features': 3, 'device_id': 4294967295, 'device_name': 'PCAN-USB Pro FD', 'device_type': 5, 'channel_condition': 1}# {'interface': 'pcan', 'channel': 'PCAN_USBBUS2', 'supports_fd': True, 'controller_number': 1, 'device_features': 3, 'device_id': 4294967295, 'device_name': 'PCAN-USB Pro FD', 'device_type': 5, 'channel_condition': 1}
  • interfaces:需要扫描的硬件设备,比如传入‘pcan’。
  • timeout:超时时间。

detect_available_configs()使用同步搜索的方式,即并行搜索传入的interface。返回的是一个[dict],可以当Bus类的参数使用,比如:

import can for info in can.interface.detect_available_configs(interfaces='pcan', timeout=5): bus = can.interface.Bus(interface=info['interface'], channel=info['channel'])print(bus)

can.interface.Bus(**info)直接传入参数可能会造成使用通道错误,因为有些设备也可通过其他参数开启,比如vector的channel_index,pcan的device_id。

2.10 bridge

这里的bridge指的是将一个设备的报文转发到另一个设备上,当了解python-can连接设备和收发报文后,这个功能就不难实现了。python-can中的bridge.py文件中有一个示例,可以参考。

另外一个关键的用途是,可将vector CAN上位机(CANoe、CANape)收发的报文和其他硬件设备(比如PCAN)进行交互,降低了硬件费用带来的学习门槛。可参考这两个文章:

以下是一个示例:

import can bus_virtual = can.interface.Bus(interface='vector', channel_index=0, channel=0)# vector虚拟通道得使用channel_index进行识别 bus_real = can.ThreadSafeBus(interface='pcan', channel='PCAN_USBBUS1')# virtual的接收发送到real notifier_virtual = can.Notifier(bus_virtual,[bus_real.send])# real的接收发送到virtual notifier_real = can.Notifier(bus_real,[bus_virtual.send])whileTrue:pass

在此基础上,增加读取virtual通道波特率参数、自动识别硬件设备等功能,再写个界面并加以完善细节,就可很方便的解决没有硬件设备的烦恼了 ^_^

总结

个人认为python-can是支持can最强的一个python库。虽然有些设备在库中不支持,但可在硬件设备的官网找到代码集成进去,或者自行开发也是很方便的。
以上仅记录的一些常用的内容,详细内容还请参考官方文档,或者阅读推荐的这篇博客python-can简述

== 欢迎批评指正和探讨交流 ^_^ ==

Read more

Flutter 三方库 async_extension 的鸿蒙化适配指南 - 实现具备高级异步编排算法与流操作扩展的并发工具集、支持端侧复杂业务流的函数式处理实战

Flutter 三方库 async_extension 的鸿蒙化适配指南 - 实现具备高级异步编排算法与流操作扩展的并发工具集、支持端侧复杂业务流的函数式处理实战

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net Flutter 三方库 async_extension 的鸿蒙化适配指南 - 实现具备高级异步编排算法与流操作扩展的并发工具集、支持端侧复杂业务流的函数式处理实战 前言 在进行 Flutter for OpenHarmony 的大规模异步业务系统(如实时行情刷新、多源数据聚合)开发时,如何更优雅地处理 Future 的超时竞争、Stream 的防抖(Debounce)或复杂的并发队列控制?虽然 Dart async 包提供了基础功能,但 async_extension 进一步扩展了异步编程的边界,提供了更符合函数式范式的工具。本文将探讨如何在鸿蒙端构建极致、高效的异步处理链路。 一、原直观解析 / 概念介绍 1.1 基础原理 该库通过对 Dart 核心异步类的非侵入式扩展(Extensions)

By Ne0inhk
计算机毕业设计必看必学~基于springboot大学生实习管理系统的设计与实现,原创定制程序、单片机、java、PHP、Python、小程序、文案全套、毕设成品等!

计算机毕业设计必看必学~基于springboot大学生实习管理系统的设计与实现,原创定制程序、单片机、java、PHP、Python、小程序、文案全套、毕设成品等!

springboot大学生实习管理系统 摘要 随着大学生实习的日益重要和广泛普及,建立一套高效、便捷的大学生实习管理系统对于高校和学生都具有重要意义。本文基于Spring Boot框架,设计并开发了一套大学生实习管理系统,旨在提供一个全面、可靠的平台,方便学生、教师和企业进行实习管理。 该系统采用了前后端分离的架构,前端使用Vue.js技术栈开发,后端使用Spring Boot框架搭建。系统实现了学生信息管理、实习岗位发布与申请、实习评分等功能。学生可以通过系统查看自己的实习信息、提交实习报告和评价,并与企业和指导老师进行沟通交流。教师可以管理学生的实习任务、审核实习报告以及评定实习成绩。企业可以发布实习岗位、筛选学生申请,并进行实习任务的管理和评价。 在系统实现过程中,充分考虑了用户体验和安全性。通过合理的权限控制机制,确保不同角色用户只能访问其具备权限的功能。同时,系统也提供了数据备份和恢复功能,保障数据的安全性和可靠性。 实际测试结果表明,该系统具有良好的稳定性和可用性,满足了大学生实习管理的需求。用户对系统的易用性和功能完善性给予了积极评价。 综上所述,本文设计和开

By Ne0inhk
vue3+python基于python的球类体育赛事发布和在线购票选座系统60576715

vue3+python基于python的球类体育赛事发布和在线购票选座系统60576715

目录 * 技术栈与项目概述 * 系统架构设计 * 数据库模型设计 * 前端关键技术实现 * 后端核心逻辑 * 安全与性能优化 * 测试与部署 * 扩展方向 * 开发技术路线 * 源码lw获取/同行可拿货,招校园代理 :文章底部获取博主联系方式! 技术栈与项目概述 * 前端框架: Vue 3 (Composition API + TypeScript) * 后端语言: Python (FastAPI/Django 选型分析) * 数据库: PostgreSQL/MySQL 与 Redis 缓存 * 核心功能: 赛事发布、在线选座购票、支付集成、实时数据更新 系统架构设计 * 前后端分离: RESTful API 接口设计规范 * 微服务模块划分: 用户服务、赛事管理、订单支付、座位库存 * WebSocket 应用: 实时推送座位锁定状态与赛事更新

By Ne0inhk