跳到主要内容 智能车竞赛惯导与视觉避障思路分享 | 极客日志
Python AI 算法
智能车竞赛惯导与视觉避障思路分享 分享了智能车竞赛中的关键技术思路,涵盖网络延迟优化、上位机可视化辅助、深度相机扫码策略、基于逆透视变换与 YOLO 的终点坐标校准、STM32 底层参数调整以及数据集自动化处理流程。重点解决了通信稳定性、视觉识别精度及路径规划准确性问题,提供了相关代码实现与配置参考。
DataScient 发布于 2026/4/5 更新于 2026/4/13 · 1 次浏览

概述
在智能车竞赛中,网络延迟、视觉识别精度及路径规划是核心挑战。本文分享了备赛过程中在网络优化、上位机辅助处理、二维码扫码、终点校准、STM32 底层修改及数据处理方面的技术思路。
网络问题
网络延迟是影响比赛表现的关键因素。建议优先使用有线连接上位机与小车,避免使用板载无线网卡。路由器信道选择至关重要,尽量避开拥堵信道(如区域赛前调试发现某些信道干扰较大)。若现场网络不稳定,可考虑本地部署大模型作为备用方案,但需注意性能限制。
图 1 路由器设备
上位机辅助处理
通过上位机脚本接收 YOLO 检测结果,利用 tkinter 绘制障碍物位置,帮助机师快速定位。同时添加按键控制任务切换和 API 调用。
import tkinter as tk
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy
class LLM2Origincar ():
def __init__ (self, host, port ):
self .ros = None
self .host = host
self .port = port
self .roadblock_list = []
self .end_list = []
self .init_ros()
self .init_topic()
self .init_thread()
self .keep()
def init_topic (self ):
self .yolo_sub = Topic(self .ros, '/hobot_dnn_detection' , 'ai_msgs/msg/PerceptionTargets' , latch=True )
self .yolo_sub.subscribe(self .yolo_sub_callback)
( ):
.roadblock_list.clear()
.end_list.clear()
target msg[ ]:
target[ ] == :
rect = target[ ][ ][ ]
.roadblock_list.append({
: rect[ ],
: rect[ ],
: rect[ ] + rect[ ],
})
target[ ] == :
rect = target[ ][ ][ ]
.end_list.append({
: rect[ ],
: rect[ ],
: rect[ ],
: rect[ ] + rect[ ],
: target[ ][ ][ ],
})
( ):
:
:
canvas.delete( )
canvas.create_line( , , , , fill= , width= )
canvas.create_line( , , , , fill= , width= )
.roadblock_list:
obst .roadblock_list:
b = (obst[ ] * )
canvas.create_line(
(obst[ ] * ),
b,
((obst[ ] + obst[ ]) * ),
b,
fill= ,
width=
)
.end_list:
end .end_list:
x1 = (end[ ] * )
y1 = (end[ ] * )
x2 = ((end[ ] + end[ ]) * )
y2 = (end[ ] * )
canvas.create_line(x1, y2, x2, y2, fill= , width= )
canvas.create_text( ((x1+x2)/ ), (y1- ) (y1- ) > , text= . (end[ ]), fill= )
微信扫一扫,关注极客日志 微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具 加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
RSA密钥对生成器 生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
Mermaid 预览与可视化编辑 基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
def
yolo_sub_callback
self, msg
self
self
for
in
'targets'
if
'type'
'roadblock'
'rois'
0
'rect'
self
'x'
'x_offset'
'w'
'width'
'b'
'y_offset'
'height'
elif
'type'
'end'
'rois'
0
'rect'
self
'x'
'x_offset'
'y'
'y_offset'
'w'
'width'
'b'
'y_offset'
'height'
'c'
'rois'
0
'confidence'
def
keep
self
try
while
True
"all"
141
0
141
680
"red"
1
689
0
689
680
"red"
1
if
self
for
in
self
int
'b'
1.42
int
'x'
1.41
int
'x'
'w'
1.41
"red"
2
if
self
for
in
self
int
'x'
1.41
int
'y'
1.41
int
'x'
'w'
1.41
int
'b'
1.42
"blue"
1
int
2
20
if
20
0
else
0
"conf:{:.2f}"
format
'c'
'cyan'
def keyboard_thread (self ):
while True :
sleep(0.05 )
if keyboard.is_pressed('b' ) or keyboard.is_pressed('B' ):
self .sign4return_pub.publish(self .sign4return_data)
sleep(0.5 )
if keyboard.is_pressed('r' ) or keyboard.is_pressed('R' ):
self .sign4return_data['data' ] = 5
self .sign4return_pub.publish(self .sign4return_data)
self .sign4return_data['data' ] = 0
sleep(0.5 )
if keyboard.is_pressed('p' ) or keyboard.is_pressed('P' ):
self .sign4return_data['data' ] = 6
self .sign4return_pub.publish(self .sign4return_data)
self .sign4return_data['data' ] = 0
sleep(0.5 )
if keyboard.is_pressed('j' ) or keyboard.is_pressed('J' ):
self .llm_data['data' ] = 1
self .llm_pub.publish(self .llm_data)
sleep(1 )
半场扫码 深度相机相比 USB 相机清晰度更高,适合扫码。为避免 CPU 占用过高,仅在特定任务状态且小车越过半场时开启扫码节点。
import rclpy
from rclpy.node import Node
import cv2
import numpy as np
from sensor_msgs.msg import Image
from std_msgs.msg import String, Int32
from nav_msgs.msg import Odometry
from origincar_msg.msg import Sign
from cv_bridge import CvBridge
TASK1 = 1
TASK2_WAITFOR_CMD = 2
TASK2 = 3
TASK3 = 4
TASK_STOP = 5
class QrCodeDetection (Node ):
def __init__ (self ):
super ().__init__('QRcodeSub' )
self .Sign4ReturnSub = self .create_subscription(Int32, 'sign4return' , self .sign4return_callback, 10 )
self .ImageSub = self .create_subscription(Image, '/aurora/rgb/image_raw' , self .image_callback, 10 )
self .OdomSub = self .create_subscription(Odometry, '/odom_combined' , self .Odom_callback, 10 )
self .qrcode_publisher = self .create_publisher(String, "/qrcode_information" , 10 )
self .info_result = String()
self .sign_publisher = self .create_publisher(Sign, '/sign_switch' , 10 )
self .sign_msg = Sign()
self .detector = cv2.wechat_qrcode_WeChatQRCode(
"/userdata/WorkSpace/codes/src/qrcode/qrcode/model/detect.prototxt" ,
"/userdata/WorkSpace/codes/src/qrcode/qrcode/model/detect.caffemodel" ,
"/userdata/WorkSpace/codes/src/qrcode/qrcode/model/sr.prototxt" ,
"/userdata/WorkSpace/codes/src/qrcode/qrcode/model/sr.caffemodel"
)
self .bridge = CvBridge()
self .node_run = False
self .task = TASK1
def image_callback (self, msg ):
if self .node_run and (self .task == TASK1 or self .task == TASK2):
cv2_image = self .bridge.imgmsg_to_cv2(msg, desired_encoding='mono8' )[155 :,:]
res = self .detector.detectAndDecode(cv2_image)[0 ]
if res:
self .node_run = False
for r in res:
self .info_result.data = str (r)
self .qrcode_publisher.publish(self .info_result)
self .get_logger().info("\033[94m{}\033[0m" .format (self .info_result.data))
if self .info_result.data == "AntiClockWise" :
self .sign_msg.sign_data = 4
elif self .info_result.data == "ClockWise" :
self .sign_msg.sign_data = 3
else :
try :
data = int (r)
if data % 2 :
self .sign_msg.sign_data = 3
else :
self .sign_msg.sign_data = 4
except :
pass
self .sign_publisher.publish(self .sign_msg)
self .info_result.data = "None"
self .sign_msg.sign_data = 0
else :
return
def sign4return_callback (self, msg ):
if msg.data == 0 or msg.data == -1 :
self .task = TASK1
self .node_run = False
elif msg.data == 5 :
self .task = TASK2
elif msg.data == 6 :
self .task = TASK3
def Odom_callback (self, msg ):
if self .task == TASK1 and msg.pose.pose.position.x > 2 :
self .node_run = True
if __name__ == '__main__' :
rclpy.init(args=None )
qrCodeDetection = QrCodeDetection()
while rclpy.ok():
rclpy.spin(qrCodeDetection)
qrCodeDetection.destroy_node()
rclpy.shutdown()
为提高效率,对图像进行裁剪,去除无关区域后再送入扫码模型。
准确返回 P 点
思路 1——使用地图的固定元素来校准 重置里程计,将通道处设为原点。利用逆透视变换计算线的相对位置,结合已知终点坐标(如 1.9m, -1.5m)求解旋转矩阵 R 和平移向量 t,从而校准 P 点坐标。
def end_point (x1, y1, x2, y2, x3, y3, x1_, y1_, x2_, y2_ ):
delta_x = x1 - x2
delta_y = y1 - y2
delta_x_ = x1_ - x2_
delta_y_ = y1_ - y2_
den = delta_x ** 2 + delta_y ** 2
a = (delta_x * delta_x_ + delta_y * delta_y_) / den
b = (delta_x * delta_y_ - delta_y * delta_x_) / den
tx = x1_ - a * x1 + b * y1
ty = y1_ - b * x1 - a * y1
x3_ = a * x3 - b * y3 + tx
y3_ = b * x3 + a * y3 + ty
print (f"(x1, y1): ({x1, y1} ), (x2, y2): ({x2, y2} ), (x3, y3): ({x3, y3} ) delta x: {delta_x} , delta y: {delta_y} , den: {den} " )
return x3_, y3_
思路 2——不重置里程计,使用 YOLO 识别 P 点结果来校正终点 全程不重置里程计,直接冲出去。利用 YOLO 识别 P 点像素坐标,通过单应性矩阵 H 转换为全局坐标。
H = np.array([
[-4.66389128e-04 , -2.26288030e-04 , -4.92300831e-02 ],
[7.59821540e-04 , 5.20569143e-05 , -2.33074608e-01 ],
[-6.59643252e-04 , -7.15022786e-03 , 1.00000000e+00 ],
])
def pixel2global (self, pixel_x, pixel_y ):
pixel = np.array([pixel_x, pixel_y, 1 ], dtype=np.float32)
local = np.dot(H, pixel)
local /= local[2 ]
local[0 ] += 0.25
car_cos = np.cos(self .current_pos[2 ])
car_sin = np.sin(self .current_pos[2 ])
global_x = self .current_pos[0 ] + car_cos * local[0 ] - car_sin * local[1 ]
global_y = self .current_pos[1 ] + car_sin * local[0 ] + car_cos * local[1 ]
return global_x, global_y
需采集大量数据训练 YOLO 模型,包括遮挡、远距离等场景。
修改 STM32 源码 调整舵机转角多项式系数,使左右转角度对称。提高串口发送频率至 50Hz,波特率至 921600,关闭无用外设。同步修改 EKF 配置以匹配更新频率。
补充 采用模型预标注加人工复核的方式提高效率。编写脚本删除无效图片标签、进行数据增强及分批次打包数据集。
自动标注脚本 import argparse
import os
import shutil
import time
from pathlib import Path
import torch
import torch.backends.cudnn as cudnn
import cv2
from models.experimental import attempt_load
from utils.datasets import LoadImages
from utils.utils import non_max_suppression, scale_coords, xyxy2xywh
from utils.torch_utils import select_device, time_synchronized
def auto_annotate (source, weights, output, img_size=640 , conf_thres=0.25 , iou_thres=0.45 , view_img=False ):
device = select_device(device)
half = device.type != 'cpu'
model = attempt_load(weights, map_location=device)
imgsz = img_size
if half:
model.half()
names = model.module.names if hasattr (model, 'module' ) else model.names
dataset = LoadImages(source, img_size=imgsz)
t0 = time.time()
img = torch.zeros((1 , 3 , imgsz, imgsz), device=device)
_ = model(img.half() if half else img)
for path, img, im0s, _ in dataset:
img = torch.from_numpy(img).to(device)
img = img.half() if half else img.float ()
img /= 255.0
if img.ndimension() == 3 :
img = img.unsqueeze(0 )
t1 = time_synchronized()
pred = model(img, augment=False )[0 ]
pred = non_max_suppression(pred, conf_thres, iou_thres, classes=None , agnostic=False )
t2 = time_synchronized()
p, im0 = path, im0s.copy()
txt_path = str (Path(output) / Path(p).stem) + ('.txt' )
open (txt_path, 'w' ).close()
if pred is not None :
for i, det in enumerate (pred):
if det is not None and len (det):
det[:, :4 ] = scale_coords(img.shape[2 :], det[:, :4 ], im0.shape).round ()
with open (txt_path, 'w' ) as f:
if det is not None and len (det):
for *xyxy, conf, cls in reversed (det):
xywh = (xyxy2xywh(torch.tensor(xyxy).view(1 , 4 )) / gn).view(-1 ).tolist()
line = "%d %.6f %.6f %.6f %.6f" % (cls, *xywh)
f.write(line + "\n" )
print (f'{Path(p).name} done. ({t2 - t1:.3 f} s)' )
if __name__ == '__main__' :
parser = argparse.ArgumentParser()
parser.add_argument('--source' , type =str , default='dataset_process/new1/images' , help ='输入图像文件夹路径' )
parser.add_argument('--weights' , type =str , default='runs/2025.7.28/weights/last.pt' , help ='模型权重路径' )
parser.add_argument('--output' , type =str , default='dataset_process/new1/labels' , help ='输出标签路径' )
parser.add_argument('--img-size' , type =int , default=640 , help ='推理尺寸 (像素)' )
parser.add_argument('--conf-thres' , type =float , default=0.25 , help ='目标置信度阈值' )
parser.add_argument('--iou-thres' , type =float , default=0.45 , help ='NMS 的 IOU 阈值' )
parser.add_argument('--device' , help ='cuda 设备,如 0 或 0,1,2,3 或 cpu' )
parser.add_argument('--view-img' , action='store_true' , help ='显示结果' )
opt = parser.parse_args()
print (opt)
with torch.no_grad():
auto_annotate(
source=opt.source,
weights=opt.weights,
output=opt.output,
img_size=opt.img_size,
conf_thres=opt.conf_thres,
iou_thres=opt.iou_thres,
device=opt.device,
view_img=opt.view_img
)
删除无效数据脚本 import os
from pathlib import Path
def remove_invalid_images_labels (image_dir, label_dir ):
deleted_images = 0
deleted_labels = 0
for image_file in os.listdir(image_dir):
if image_file.lower().endswith(('.jpg' , '.png' , '.jpeg' )):
image_path = os.path.join(image_dir, image_file)
label_path = os.path.join(label_dir, Path(image_file).stem + '.txt' )
if not os.path.exists(label_path):
os.remove(image_path)
deleted_images += 1
print (f"删除图片(无标签): {image_file} " )
else :
with open (label_path, 'r' ) as f:
content = f.read().strip()
if not content:
os.remove(image_path)
os.remove(label_path)
deleted_images += 1
deleted_labels += 1
print (f"删除无效数据:{image_file} 和对应标签" )
print (f"\n操作完成!共删除:{deleted_images} 张图片,{deleted_labels} 个标签" )
if __name__ == '__main__' :
image_dir = os.path.join(os.path.dirname(__file__), "new1/images/" )
label_dir = os.path.join(os.path.dirname(__file__), "new1/labels/" )
confirm = input ("是否继续?(y/n): " ).lower()
if confirm == 'y' :
remove_invalid_images_labels(image_dir, label_dir)
else :
print ("操作已取消" )
数据增强脚本 import torch
import torchvision.transforms as T
import torchvision.transforms.functional as TF
from pathlib import Path
import shutil
from PIL import Image
import random
from multiprocessing import Pool
import os
class YOLOAugment :
def __init__ (self, output_dir ):
self .output_dir = output_dir
Path(f"{output_dir} /images" ).mkdir(parents=True , exist_ok=True )
Path(f"{output_dir} /labels" ).mkdir(parents=True , exist_ok=True )
self .img_augment = T.Compose([
T.ColorJitter(brightness=0.3 , contrast=0.3 , saturation=0.2 ),
T.GaussianBlur(kernel_size=(3 , 7 ))
])
def apply_augment (self, img_path, label_path, aug_id ):
img = Image.open (img_path).convert('RGB' )
with open (label_path) as f:
bboxes = [list (map (float , line.strip().split())) for line in f]
img_tensor = TF.to_tensor(img)
bboxes_tensor = torch.tensor(bboxes)
img_tensor = self .img_augment(img_tensor)
stem = Path(img_path).stem
self ._save_results(img_tensor, bboxes_tensor, stem, aug_id)
return img, bboxes
def _save_results (self, img_tensor, bboxes, stem, aug_id ):
aug_img = TF.to_pil_image(img_tensor)
aug_img.save(f"{self.output_dir} /images/{stem} _aug{aug_id} .jpg" )
with open (f"{self.output_dir} /labels/{stem} _aug{aug_id} .txt" , 'w' ) as f:
for bbox in bboxes.numpy():
line = ' ' .join(map (str , bbox))
f.write(line + '\n' )
def process_file (args ):
img_path, label_path, output_dir, aug_per_image = args
augmenter = YOLOAugment(output_dir)
for i in range (1 , aug_per_image + 1 ):
augmenter.apply_augment(img_path, label_path, i)
shutil.copy(img_path, f"{output_dir} /images/{Path(img_path).name} " )
shutil.copy(label_path, f"{output_dir} /labels/{Path(label_path).name} " )
if __name__ == "__main__" :
root_path = os.path.dirname(__file__)
input_dir = os.path.join(root_path, "new1" )
output_dir = os.path.join(root_path, "new1_aug" )
aug_per_image = 3
num_workers = 4
tasks = []
for img_file in Path(f"{input_dir} /images" ).glob("*.*" ):
if img_file.suffix.lower() in ('.jpg' , '.png' , '.jpeg' ):
label_file = Path(f"{input_dir} /labels/{img_file.stem} .txt" )
if label_file.exists():
tasks.append((str (img_file), str (label_file), output_dir, aug_per_image))
print (f"开始增强 {len (tasks)} 张图像..." )
with Pool(processes=num_workers) as pool:
pool.map (process_file, tasks)
orig_count = len (tasks)
aug_count = orig_count * aug_per_image
print (f"处理完成!\n- 原始图像保留:{orig_count} 张\n- 增强图像生成:{aug_count} 张\n- 总数据量:{orig_count + aug_count} 张" )
数据集分包脚本 import os
import zipfile
import math
from pathlib import Path
def create_task_packs (images_dir, labels_dir, output_dir, tasks=3 , label_txt=False ):
image_files = sorted ([f for f in os.listdir(images_dir) if f.endswith(('.jpg' , '.png' ))])
label_files = sorted ([f for f in os.listdir(labels_dir) if f.endswith('.txt' )])
image_stems = {Path(f).stem for f in image_files}
label_stems = {Path(f).stem for f in label_files}
unmatched = image_stems.symmetric_difference(label_stems)
if unmatched:
print (f"⚠️ 警告:发现 {len (unmatched)} 个不匹配文件(示例:{list (unmatched)[:3 ]} )" )
print ("建议先运行数据校验脚本修复不一致问题!" )
return
total_pairs = len (image_files)
pairs_per_task = math.ceil(total_pairs / tasks)
print (f"数据集统计:" )
print (f"- 图片数量:{len (image_files)} " )
print (f"- 标注数量:{len (label_files)} " )
print (f"- 将分成 {tasks} 个任务包,每个约 {pairs_per_task} 对数据\n" )
os.makedirs(output_dir, exist_ok=True )
for task_num in range (1 , tasks + 1 ):
start_idx = (task_num - 1 ) * pairs_per_task
end_idx = min (start_idx + pairs_per_task, total_pairs)
task_images = image_files[start_idx:end_idx]
task_labels = [Path(f).stem + '.txt' for f in task_images]
zip_path = os.path.join(output_dir, f"task_{task_num} .zip" )
print (f"创建任务包 {task_num} :" )
print (f"- 包含图片:{len (task_images)} 张" )
print (f"- 包含标注:{len (task_labels)} 个" )
print (f"- 保存到:{zip_path} " )
with zipfile.ZipFile(zip_path, 'w' , zipfile.ZIP_DEFLATED) as zipf:
for img in task_images:
img_path = os.path.join(images_dir, img)
zipf.write(img_path, f"images/{img} " )
for label in task_labels:
label_path = os.path.join(labels_dir, label)
if os.path.exists(label_path):
zipf.write(label_path, f"labels/{label} " )
else :
print (f"⚠️ 缺失标注文件:{label} " )
if label_txt is not False :
label_info = Path(label_txt).open ("r" ).read()
zipf.writestr(f"labels/labels.txt" , label_info)
print ("-" * 50 )
print (f"\n🎉 任务包创建完成!共生成 {tasks} 个压缩包,保存在:{output_dir} " )
if __name__ == "__main__" :
root_path = os.path.dirname(__file__)
dataset_dir = os.path.join(root_path, "new1" )
output_dir = os.path.join(root_path, "package" )
label_txt = os.path.join(root_path, "labels.txt" )
num_tasks = 4
create_task_packs(
images_dir=os.path.join(dataset_dir, "images" ),
labels_dir=os.path.join(dataset_dir, "labels" ),
output_dir=output_dir,
tasks=num_tasks,
label_txt=label_txt,
)
后记