【Python】python-can使用记录
前言
之前用python做了一些汽车领域CAN模块相关自动化,主要集中在软件的代码开发阶段。随着功能的成熟,逐渐想把测试的工作也自动化起来,后来就写了个简单的上位机,支持Trace、Graph、Log、UDS、刷写等功能,可加载自定义py测试脚本。
近期突然想到能不能用canoe软件做上位机,再结合其他总线设备的桥接方式,从网上真找到了方案,尝试过后发现很棒。
不论是一开始的上位机,还是后面的桥接工具,都是基于这个python-can去做的开发,所以在此记录和分享下这个模块。
1 安装
- 设备的CAN驱动需要自行手动安装。
直接使用pip安装python-can
pip install python-can 可在: pypi.org搜索“python-can”,查阅手册(链接),里面有详细的功能讲解以及支持的硬件设备。
2 功能说明
2.1 bus的实例化(创建)
2.1.1 Bus类
can.interface.Bus是比较关键的一个类,用于创建bus实体控制总线的行为。通俗地讲就是打开硬件设备,在此基础去控制报文收发。
以下是一个打开pcan设备通道PCAN_USBBUS1的例子。
import can bus = can.interface.Bus(interface='pcan', channel='PCAN_USBBUS1', bitrate=500000)- interface:指的是硬件设备,参考下面BACKENDS。
- channel:指的是通道。不同设备可能定义方式不同,pcan使用字符串’PCAN_USBBUS1’,vector可直接使用数字0123,具体需要参考设备对应定义的类。
2.1.2 ThreadSafeBus类
can.ThreadSafeBus是在Bus类的基础上封装,可安全的用于多线程访问,比如多个定时器同时发送报文。使用方式和Bus类一样,以下是一个打开pcan设备通道PCAN_USBBUS1的例子。
import can # bus = can.interface.Bus(interface='pcan', channel='PCAN_USBBUS1', bitrate=500000) bus = can.ThreadSafeBus(interface='pcan', channel='PCAN_USBBUS1', bitrate=500000)2.1.3 支持的硬件设备
点进类Bus定义的地方,找到_get_class_for_interface()函数,再跳转到BACKENDS定义的地方,可以看到支持的设备类型。除了几个常用的,大部分都不认识 0.0
# interface_name => (module, classname) BACKENDS:dict[str,tuple[str,str]]={"kvaser":("can.interfaces.kvaser","KvaserBus"),"socketcan":("can.interfaces.socketcan","SocketcanBus"),"serial":("can.interfaces.serial.serial_can","SerialBus"),"pcan":("can.interfaces.pcan","PcanBus"),"usb2can":("can.interfaces.usb2can","Usb2canBus"),"ixxat":("can.interfaces.ixxat","IXXATBus"),"nican":("can.interfaces.nican","NicanBus"),"iscan":("can.interfaces.iscan","IscanBus"),"virtual":("can.interfaces.virtual","VirtualBus"),"udp_multicast":("can.interfaces.udp_multicast","UdpMulticastBus"),"neovi":("can.interfaces.ics_neovi","NeoViBus"),"vector":("can.interfaces.vector","VectorBus"),"slcan":("can.interfaces.slcan","slcanBus"),"robotell":("can.interfaces.robotell","robotellBus"),"canalystii":("can.interfaces.canalystii","CANalystIIBus"),"systec":("can.interfaces.systec","UcanBus"),"seeedstudio":("can.interfaces.seeedstudio","SeeedBus"),"cantact":("can.interfaces.cantact","CantactBus"),"gs_usb":("can.interfaces.gs_usb","GsUsbBus"),"nixnet":("can.interfaces.nixnet","NiXNETcanBus"),"neousys":("can.interfaces.neousys","NeousysBus"),"etas":("can.interfaces.etas","EtasBus"),"socketcand":("can.interfaces.socketcand","SocketCanDaemonBus"),"zlg":("can.interfaces.zlg","ZlgCanBus"),# add zlg"bmcan":("can.interfaces.bmcan","BmCanBus"),# add busmuster}Bus类本质上就是对上面这些类的封装,通过“interface”参数进行识别。也可直接调用设备定义的类,比如pcan:
import can.interfaces.pcan as pcan bus = pcan.PcanBus(channel='PCAN_USBBUS1', bitrate=500000)每个设备定义的类都是基于抽象基类BusABC,实现报文的收发、波特率设置等通用的功能。对于读取设备信息、查看总线状态等较特殊的功能是没有的,如有需求可自行扩展实现。
2.2 关闭bus
bus在使用完后,要调用bus.shutdown方法将设备关闭,否则会影响再次开启。
2.3 收发报文
2.3.1 发送报文
bus实例化以后,直接调用send()方法,传入message参数即可发送报文。如:
bus.send(can.Message(arbitration_id=0x123, data=[0,1,2,3,4,5,6,7]))2.3.2 接收报文
bus.recv(timeout)用于接收报文,该方法会阻塞当前进程,直到收到报文或者timeout发生。通常创建单独线程用于一直等待接收报文,如:
import threading defreceive_message():whileTrue: message = bus.recv()if message isnotNone:print(message) t = threading.Thread(target=receive_message) t.start()# bus是可迭代的(iter),也可用以下方式defreceive_message2():for message in bus:print(message)2.4 Message
can.Message用于记录报文的属性,包括以下信息:
- check (bool) 用于检测数据是否正常,比如dlc>64.
- timestamp (float) 接收时会自动打戳,发送则不需要时间戳。
- arbitration_id (int) 报文id。
- is_extended_id (bool) 是否为扩展帧。
- is_remote_frame (bool) 是否为远程帧。
- is_error_frame (bool) 是否为错误帧。(设备有可能不支持)
- channel (int | str | Sequence[int] | None) 和实例化Bus的一致。
- dlc (int | None) 数据长度。取值从0~64,不是CAN线上4bit的dlc。
- data (bytes | bytearray | int | Iterable[int] | None) 数据。
- is_fd (bool) 是否为FD报文。
- is_rx (bool) 接收时会自动设为True,发送则不需要关心。
- bitrate_switch (bool) FD的BRS位。
- error_state_indicator (bool) ESI位,表示主动错误(0)和被动错误(1)。
2.5 Bit timing
timing用于设置波特率、采样点相关参数。有些interface可以只传入buadrate参数,不需要设置详细的位域参数。
timing有两个类BitTiming()和BitTimingFd(),分别对应CAN和CANFD。参数基本类似,以下说明BitTimingFd():
- f_clock (int) 时钟频率,Hz.
- nom_brp (int) 仲裁域的时钟分频。
- nom_tseg1 (int) 仲裁域的传播段+相位缓冲段1。参考下图示例
- nom_tseg2 (int) 仲裁域的相位缓冲段2。
- nom_sjw (int) 仲裁域的同步段。
- data_brp (int) 数据域的时钟分频。
- data_tseg1 (int) 数据域的传播段+相位缓冲段1。
- data_tseg2 (int) 数据域的相位缓冲段2。
- data_sjw (int) 数据域的同步段。
传入部分参数2(这个传入参数较少,会自动计算位域的值。但计算容易报错)
can.BitTimingFd.from_sample_point( f_clock=80_000_000, nom_bitrate=1_000_000, nom_sample_point=75.0, data_bitrate=8_000_000, data_sample_point=70.0,)传入部分参数1(这个少了分频多了波特率)
can.BitTimingFd.from_bitrate_and_segments( f_clock=80_000_000, nom_bitrate=1_000_000, nom_tseg1=59, nom_tseg2=20, nom_sjw=10, data_bitrate=8_000_000, data_tseg1=6, data_tseg2=3, data_sjw=2,)传入全部参数调用
can.BitTimingFd( f_clock=80_000_000, nom_brp=1, nom_tseg1=59, nom_tseg2=20, nom_sjw=10, data_brp=1, data_tseg1=6, data_tseg2=3, data_sjw=2,)strict (bool) ISO-11898中定义的位域限制。默认False 忽略即可。

实例化方法有多个,可以传入全部参数,也可以只传入部分参数,如下:
2.6 Notifier
Notifier官方解释是bus的报文分发器。通俗的理解就是,当Bus上接收到报文后,去调用通知函数。
Notifier和前文“接收报文”中的逻辑类似,会创建线程用于处理接收报文。
- bus (BusABC | list[BusABC]) 即创建的bus参数。
- listeners (Iterable[Listener | Callable[[Message], Awaitable[None] | None]]) 监听列表中可以是Listener类,也可以是自定义函数。
- timeout (float) 用于bus.recv()中的timeout参数。
- loop (AbstractEventLoop | None) 多个listener是并行调用还是串行。默认是串行的。
- 方法 add_bus(bus)
添加一个bus,每个bus会创建一个单独的线程,但listeners是共用的。 - 方法 add_listener(listener)
添加一个listener,和实例化中的listener是一样的类型。 - 方法 remove_listener(listener)
add_listener()本质是list.append(listener),remove_listener()是list.remove(listener)。注意当list.remove不存在的元素时,会报异常。 - 方法 stop(timeout=5.0)
停止监听,主要用于释放线程资源。
实例化can.Notifier
can.Notifier(bus, listeners, timeout=1.0, loop=None)
import can.notifier # bus接收报文后,调用print函数打印报文,再调用my_rx_message函数处理报文 notifier = can.Notifier(bus,[print, my_rx_message])2.7 Listener
Listener(ABC)是抽象基类,使用时要自定义类,并且实现on_message_received()方法。这个类用于Notifier中的listener,但Notifier也可添加自定义函数,所以很少用Listener。以下是一个示例:
import can.listener classMyListener(can.Listener):defon_message_received(self, msg: can.Message)->None:# 处理接收到的messageprint(msg) notifier.add_listener(MyListener())2.8 报文log的读写
2.8.1 写入文件
can.Logger用于将报文log写入到文件中,格式包含以下种类:
MESSAGE_WRITERS: Final[dict[str,type[MessageWriter]]]={".asc": ASCWriter,".blf": BLFWriter,".csv": CSVWriter,".db": SqliteWriter,".log": CanutilsLogWriter,".mf4": MF4Writer,".trc": TRCWriter,".txt": Printer,}Logger可自动识别文件后缀,去调用不同的文件编写器类。以下是一个示例:
import can.logger file="msg_log.blf" log = can.Logger(file)# 把log添加到notifier中,收到报文后写入notifier会调用log.on_message_received(msg) notifier.add_listener(log)2.8.2 读取文件
can.LogReader用于读取报文log文件,示例如下:
import can.logger read_file ="msg_log.blf" reader = can.LogReader(read_file)for message in reader:print(message)# 也可以将读取到的报文写入到另一个文件中,增加自定义的过滤功能。 write_file ="msg_log_filter.blf" writer = can.Logger(write_file)for message in reader:print(message)# 过滤报文,只写入id为0x123的报文if message.arbitration_id ==0x123: writer.on_message_received(message)2.9 扫描连接设备
python-can支持扫描当前连接的硬件设备,使用can.interface.detect_available_configs()函数,示例如下:
import can for channel in can.interface.detect_available_configs(interfaces=can.interfaces.VALID_INTERFACES, timeout=5):print(channel)# 打印信息如下:# Kvaser canlib is unavailable. argument of type 'NoneType' is not iterable# uptime library not available, timestamps are relative to boot time and not to Epoch UTC# zlg library not found: zlgcan.dll# BusMaster library not found: BMAPI64.dll# ... 省略部分log信息,某些pip模块没安装或者设备驱动没安装。# {'interface': 'vector', 'channel': 0, 'serial': 100, 'channel_index': 0, 'hw_type': <XL_HardwareType.XL_HWTYPE_VIRTUAL: 1>, 'hw_index': 0, 'hw_channel': 0, 'supports_fd': True, 'vector_channel_config': VectorChannelConfig(name='Virtual Channel 1', hw_type=<XL_HardwareType.XL_HWTYPE_VIRTUAL: 1>, hw_index=0, hw_channel=0, channel_index=0, channel_mask=1, channel_capabilities=<XL_ChannelCapabilities.XL_CHANNEL_FLAG_TIME_SYNC_RUNNING|XL_CHANNEL_FLAG_CANFD_BOSCH_SUPPORT|XL_CHANNEL_FLAG_CANFD_ISO_SUPPORT|6: 2684354567>, channel_bus_capabilities=<XL_BusCapabilities.XL_BUS_COMPATIBLE_CAN|XL_BUS_ACTIVE_CAP_CAN: 65537>, is_on_bus=False, connected_bus_type=<XL_BusTypes.XL_BUS_TYPE_NONE: 0>, bus_params=VectorBusParams(bus_type=<XL_BusTypes.XL_BUS_TYPE_CAN: 1>, can=VectorCanParams(bitrate=500000, sjw=1, tseg1=4, tseg2=3, sam=1, output_mode=<XL_OutputMode.XL_OUTPUT_MODE_NORMAL: 1>, can_op_mode=<XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CAN20: 1>), canfd=VectorCanFdParams(bitrate=500000, data_bitrate=0, sjw_abr=1, tseg1_abr=4, tseg2_abr=3, sam_abr=1, sjw_dbr=0, tseg1_dbr=0, tseg2_dbr=0, output_mode=<XL_OutputMode.XL_OUTPUT_MODE_NORMAL: 1>, can_op_mode=<XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CAN20: 1>)), serial_number=100, article_number=7000, transceiver_name=' Virtual CAN')}# {'interface': 'vector', 'channel': 1, 'serial': 100, 'channel_index': 1, 'hw_type': <XL_HardwareType.XL_HWTYPE_VIRTUAL: 1>, 'hw_index': 0, 'hw_channel': 1, 'supports_fd': True, 'vector_channel_config': VectorChannelConfig(name='Virtual Channel 2', hw_type=<XL_HardwareType.XL_HWTYPE_VIRTUAL: 1>, hw_index=0, hw_channel=1, channel_index=1, channel_mask=2, channel_capabilities=<XL_ChannelCapabilities.XL_CHANNEL_FLAG_TIME_SYNC_RUNNING|XL_CHANNEL_FLAG_CANFD_BOSCH_SUPPORT|XL_CHANNEL_FLAG_CANFD_ISO_SUPPORT|6: 2684354567>, channel_bus_capabilities=<XL_BusCapabilities.XL_BUS_COMPATIBLE_CAN|XL_BUS_ACTIVE_CAP_CAN: 65537>, is_on_bus=False, connected_bus_type=<XL_BusTypes.XL_BUS_TYPE_NONE: 0>, bus_params=VectorBusParams(bus_type=<XL_BusTypes.XL_BUS_TYPE_CAN: 1>, can=VectorCanParams(bitrate=500000, sjw=1, tseg1=4, tseg2=3, sam=1, output_mode=<XL_OutputMode.XL_OUTPUT_MODE_NORMAL: 1>, can_op_mode=<XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CAN20: 1>), canfd=VectorCanFdParams(bitrate=500000, data_bitrate=0, sjw_abr=1, tseg1_abr=4, tseg2_abr=3, sam_abr=1, sjw_dbr=0, tseg1_dbr=0, tseg2_dbr=0, output_mode=<XL_OutputMode.XL_OUTPUT_MODE_NORMAL: 1>, can_op_mode=<XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CAN20: 1>)), serial_number=100, article_number=7000, transceiver_name=' Virtual CAN')}# {'interface': 'virtual', 'channel': 'channel-2952'}# {'interface': 'pcan', 'channel': 'PCAN_USBBUS1', 'supports_fd': True, 'controller_number': 0, 'device_features': 3, 'device_id': 4294967295, 'device_name': 'PCAN-USB Pro FD', 'device_type': 5, 'channel_condition': 1}# {'interface': 'pcan', 'channel': 'PCAN_USBBUS2', 'supports_fd': True, 'controller_number': 1, 'device_features': 3, 'device_id': 4294967295, 'device_name': 'PCAN-USB Pro FD', 'device_type': 5, 'channel_condition': 1}- interfaces:需要扫描的硬件设备,比如传入‘pcan’。
- timeout:超时时间。
detect_available_configs()使用同步搜索的方式,即并行搜索传入的interface。返回的是一个[dict],可以当Bus类的参数使用,比如:
import can for info in can.interface.detect_available_configs(interfaces='pcan', timeout=5): bus = can.interface.Bus(interface=info['interface'], channel=info['channel'])print(bus)can.interface.Bus(**info)直接传入参数可能会造成使用通道错误,因为有些设备也可通过其他参数开启,比如vector的channel_index,pcan的device_id。
2.10 bridge
这里的bridge指的是将一个设备的报文转发到另一个设备上,当了解python-can连接设备和收发报文后,这个功能就不难实现了。python-can中的bridge.py文件中有一个示例,可以参考。
另外一个关键的用途是,可将vector CAN上位机(CANoe、CANape)收发的报文和其他硬件设备(比如PCAN)进行交互,降低了硬件费用带来的学习门槛。可参考这两个文章:
以下是一个示例:
import can bus_virtual = can.interface.Bus(interface='vector', channel_index=0, channel=0)# vector虚拟通道得使用channel_index进行识别 bus_real = can.ThreadSafeBus(interface='pcan', channel='PCAN_USBBUS1')# virtual的接收发送到real notifier_virtual = can.Notifier(bus_virtual,[bus_real.send])# real的接收发送到virtual notifier_real = can.Notifier(bus_real,[bus_virtual.send])whileTrue:pass在此基础上,增加读取virtual通道波特率参数、自动识别硬件设备等功能,再写个界面并加以完善细节,就可很方便的解决没有硬件设备的烦恼了 ^_^
总结
个人认为python-can是支持can最强的一个python库。虽然有些设备在库中不支持,但可在硬件设备的官网找到代码集成进去,或者自行开发也是很方便的。
以上仅记录的一些常用的内容,详细内容还请参考官方文档,或者阅读推荐的这篇博客python-can简述
== 欢迎批评指正和探讨交流 ^_^ ==