跳到主要内容
智慧医疗机器人竞赛惯导与视觉避障实践思路 | 极客日志
Python AI 算法
智慧医疗机器人竞赛惯导与视觉避障实践思路 智慧医疗机器人竞赛涉及惯导与视觉避障技术。内容包括网络延迟优化、上位机辅助显示、深度相机扫码、基于逆透视变换的终点校准算法、STM32 舵机控制参数调整及 EKF 配置。配套提供数据自动标注、清洗、增强及分发脚本,助力提升系统性能与比赛表现。
beaabea 发布于 2026/4/7 更新于 2026/5/22 12 浏览
前言
在智慧医疗机器人创意赛中,作为技术主力分享备赛过程中的一些思路。为了保持比赛竞技性,部分核心算法细节不公开,但整体架构和关键问题解决方案可供参考。
本文涵盖网络延迟优化、上位机辅助处理、半场扫码策略、P 点准确返回方案、STM32 源码修改及数据处理脚本等内容。
网络问题
参赛初期常遇到上位机延迟问题。第二年备赛时重视网络环境配置,使用高性能路由器并固定信道(如 165 信道)以减少干扰。调试阶段建议避免使用电脑板载无线网卡连接,优先采用有线连接以降低延迟。
若现场网络不稳定导致云端 API 调用失败,可部署本地大模型作为备用方案,虽然效果略逊于云端,但在断网情况下能保证基本功能运行。
图 1 华为 BE7 Pro 路由器
图 2 华为 BE7 Pro 路由器
上位机辅助处理
在上位机桥接客户端中,通过 Python tkinter 库接收 YOLO 检测结果并绘制障碍物位置,帮助机师快速定位。同时添加按键控制任务切换和 API 调用。
import tkinter as tk
# Overlay window that the detection results are drawn on.
# It has no window decorations, stays above other windows, and is 900x680 px
# placed at screen offset (192, 215).
root = tk.Tk()
root.wm_overrideredirect(True)
root.wm_geometry("900x680+192+215")
root.wm_attributes("-topmost", True)
# NOTE(review): "-transparentcolor" is a platform-specific Tk attribute
# (Windows); with a white canvas background it presumably leaves only the
# drawn shapes visible -- confirm on the target machine.
root.wm_attributes("-transparentcolor", "white")

# Full-window drawing surface used by the drawing loop.
canvas = tk.Canvas(
    master=root,
    width=900,
    height=680,
    bg="white",
    highlightthickness=0,
)
canvas.pack()
class LLM2Origincar():
    """Bridge client that draws YOLO detections on the overlay canvas.

    Subscribes to the detection topic, keeps the latest road-block and
    end-marker boxes, and redraws them continuously on the module-level
    ``canvas``.

    NOTE(review): ``init_ros()`` and ``init_thread()`` are called by
    ``__init__`` but are not part of the published excerpt; they must be
    defined elsewhere in the real class.
    """

    def __init__(self, host, port):
        """Store connection settings, set up topics, then enter the draw loop.

        Args:
            host: Bridge host, stored on the instance.
            port: Bridge port, stored on the instance.
        """
        self.ros = None
        self.host = host
        self.port = port
        # Latest detections; replaced on every detection message.
        self.roadblock_list = []
        self.end_list = []
        self.init_ros()
        self.init_topic()
        self.init_thread()
        # keep() loops forever, so the constructor does not return normally.
        self.keep()

    def init_topic(self):
        """Subscribe to the detection topic."""
        self.yolo_sub = Topic(
            self.ros,
            '/hobot_dnn_detection',
            'ai_msgs/msg/PerceptionTargets',
            latch=True,
        )
        self.yolo_sub.subscribe(self.yolo_sub_callback)

    def yolo_sub_callback(self, msg):
        """Replace the cached detections with those in ``msg``.

        Only targets of type ``'roadblock'`` and ``'end'`` are kept, and only
        the first ROI of each target is read.

        Args:
            msg: Mapping with a ``'targets'`` list; each target carries
                ``'type'`` and ``'rois'`` (``rect`` with ``x_offset``,
                ``y_offset``, ``width``, ``height``; plus ``confidence``).
        """
        self.roadblock_list.clear()
        self.end_list.clear()
        for target in msg['targets']:
            if target['type'] == 'roadblock':
                rect = target['rois'][0]['rect']
                self.roadblock_list.append({
                    'x': rect['x_offset'],
                    'w': rect['width'],
                    # 'b' is the bottom edge of the box.
                    'b': rect['y_offset'] + rect['height'],
                })
            elif target['type'] == 'end':
                rect = target['rois'][0]['rect']
                self.end_list.append({
                    'x': rect['x_offset'],
                    'y': rect['y_offset'],
                    'w': rect['width'],
                    'b': rect['y_offset'] + rect['height'],
                    'c': target['rois'][0]['confidence'],
                })

    def keep(self):
        """Redraw guide lines and cached detections until interrupted.

        Detection coordinates are scaled by 1.41 (x) and 1.42 (y) before
        drawing -- presumably mapping a 640x480 image onto the 900x680
        canvas; confirm against the camera resolution.
        """
        try:
            while True:
                canvas.delete("all")
                # Two fixed vertical guide lines.
                canvas.create_line(141, 0, 141, 680, fill="red", width=1)
                canvas.create_line(689, 0, 689, 680, fill="red", width=1)

                # Road blocks: a horizontal line along the bottom edge.
                for obst in self.roadblock_list:
                    b = int(obst['b'] * 1.42)
                    canvas.create_line(
                        int(obst['x'] * 1.41),
                        b,
                        int((obst['x'] + obst['w']) * 1.41),
                        b,
                        fill="red",
                        width=2,
                    )

                # End markers: bottom edge plus a confidence label above the
                # box, clamped so the label never leaves the canvas top.
                for end in self.end_list:
                    x1 = int(end['x'] * 1.41)
                    y1 = int(end['y'] * 1.41)
                    x2 = int((end['x'] + end['w']) * 1.41)
                    y2 = int(end['b'] * 1.42)
                    canvas.create_line(x1, y2, x2, y2, fill="blue", width=1)
                    canvas.create_text(
                        int((x1 + x2) / 2),
                        (y1 - 20) if (y1 - 20) > 0 else 0,
                        text="conf:{:.2f}".format(end['c']),
                        fill='cyan',
                    )

                # NOTE(review): the published excerpt never processes Tk
                # events, so nothing would be drawn; update() added here.
                root.update()
        except KeyboardInterrupt:
            # NOTE(review): the excerpt's `try:` had no handler; Ctrl-C is
            # treated as the normal way to leave the loop.
            pass
此外,通过键盘按键触发特定信号,实现任务状态切换和遥操作控制。
def keyboard_thread(self):
    """Poll the keyboard forever and publish control signals on key presses.

    Keys (case-insensitive), checked in this order every 50 ms:
      * ``b`` -- publish ``sign4return_data`` as it currently is.
      * ``r`` -- publish ``sign4return_data`` with ``data`` set to 5.
      * ``p`` -- publish ``sign4return_data`` with ``data`` set to 6.
      * ``j`` -- set ``llm_data['data']`` to 1 and publish it.

    After ``r``/``p`` the ``data`` field is reset to 0. Each handled key is
    followed by a pause so one press is not sent repeatedly.

    NOTE(review): the QR-code node in this article maps sign4return 5 to
    TASK2 and 6 to TASK3; confirm the same convention applies here.
    """
    # (key, value written to sign4return_data['data'] while publishing)
    sign_keys = (('r', 5), ('p', 6))

    while True:
        sleep(0.05)

        if keyboard.is_pressed('b') or keyboard.is_pressed('B'):
            self.sign4return_pub.publish(self.sign4return_data)
            sleep(0.5)

        for key, value in sign_keys:
            if keyboard.is_pressed(key) or keyboard.is_pressed(key.upper()):
                self.sign4return_data['data'] = value
                self.sign4return_pub.publish(self.sign4return_data)
                self.sign4return_data['data'] = 0
                sleep(0.5)

        if keyboard.is_pressed('j') or keyboard.is_pressed('J'):
            self.llm_data['data'] = 1
            self.llm_pub.publish(self.llm_data)
            sleep(1)
半场扫码
深度相机相比 USB 相机在清晰度上有显著优势,适合用于二维码识别。为避免 CPU 占用过高,扫码节点仅在特定条件下启动:任务状态为任务一且小车全局坐标 x 超过 2m。
图 3 深度相机扫码
图 4 USB 相机扫码
为提高效率,对图像进行裁剪,去除无关区域后再送入扫码模型。
图 5 裁掉一部分图片
import rclpy
from rclpy.node import Node
import cv2
import numpy as np
from sensor_msgs.msg import Image
from std_msgs.msg import String, Int32
from nav_msgs.msg import Odometry
from origincar_msg.msg import Sign
from cv_bridge import CvBridge
# Task states tracked by the QR-code node (consecutive integers from 1).
TASK1, TASK2_WAITFOR_CMD, TASK2, TASK3, TASK_STOP = range(1, 6)
class QrCodeDetection(Node):
    """ROS 2 node that decodes QR codes from the RGB camera on demand.

    Decoding is enabled only once the odometry x position exceeds 2 while the
    task state is TASK1. Each decoded string is published on
    ``/qrcode_information`` and translated into a direction sign on
    ``/sign_switch``.
    """

    def __init__(self):
        super().__init__('QRcodeSub')
        self.Sign4ReturnSub = self.create_subscription(
            Int32, 'sign4return', self.sign4return_callback, 10)
        self.ImageSub = self.create_subscription(
            Image, '/aurora/rgb/image_raw', self.image_callback, 10)
        self.OdomSub = self.create_subscription(
            Odometry, '/odom_combined', self.Odom_callback, 10)

        self.qrcode_publisher = self.create_publisher(String, "/qrcode_information", 10)
        self.info_result = String()
        self.sign_publisher = self.create_publisher(Sign, '/sign_switch', 10)
        self.sign_msg = Sign()

        model_dir = "/userdata/WorkSpace/codes/src/qrcode/qrcode/model/"
        self.detector = cv2.wechat_qrcode_WeChatQRCode(
            model_dir + "detect.prototxt",
            model_dir + "detect.caffemodel",
            model_dir + "sr.prototxt",
            model_dir + "sr.caffemodel",
        )
        self.bridge = CvBridge()
        # Decoding is off until Odom_callback enables it.
        self.node_run = False
        self.task = TASK1

    @staticmethod
    def _sign_for(text):
        """Map a decoded QR string to a sign value (0 when unrecognised).

        "AntiClockWise" -> 4, "ClockWise" -> 3; an integer string maps to
        3 when odd and 4 when even.
        """
        if text == "AntiClockWise":
            return 4
        if text == "ClockWise":
            return 3
        try:
            number = int(text)
        except ValueError:
            # Not a direction keyword and not a number: no sign.
            return 0
        return 3 if number % 2 else 4

    def image_callback(self, msg):
        """Decode QR codes in ``msg`` and publish the results.

        Does nothing unless decoding is enabled and the task is TASK1 or
        TASK2. The image is converted to mono8 and rows above 155 are cropped
        away before detection. A successful decode disables further decoding
        until Odom_callback re-enables it.
        """
        if not (self.node_run and self.task in (TASK1, TASK2)):
            return

        cv2_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='mono8')[155:, :]
        res = self.detector.detectAndDecode(cv2_image)[0]
        if not res:
            return

        self.node_run = False
        for r in res:
            self.info_result.data = str(r)
            self.qrcode_publisher.publish(self.info_result)
            self.get_logger().info("\033[94m{}\033[0m".format(self.info_result.data))
            # Computed per result so an unrecognised string never reuses the
            # sign of the previous one.
            self.sign_msg.sign_data = self._sign_for(self.info_result.data)
            self.sign_publisher.publish(self.sign_msg)

        # Leave the reusable messages in their idle state.
        self.info_result.data = "None"
        self.sign_msg.sign_data = 0

    def sign4return_callback(self, msg):
        """Update the task state from a ``sign4return`` message.

        0 or -1 returns to TASK1 and disables decoding; 5 selects TASK2;
        6 selects TASK3. Other values are ignored.
        """
        if msg.data == 0 or msg.data == -1:
            self.task = TASK1
            self.node_run = False
        elif msg.data == 5:
            self.task = TASK2
        elif msg.data == 6:
            self.task = TASK3

    def Odom_callback(self, msg):
        """Enable decoding once x exceeds 2 while in TASK1."""
        if self.task == TASK1 and msg.pose.pose.position.x > 2:
            self.node_run = True
if __name__ == '__main__':
    # Run the QR-code node until ROS shuts down or the user interrupts.
    rclpy.init(args=None)
    qrCodeDetection = QrCodeDetection()
    try:
        while rclpy.ok():
            rclpy.spin(qrCodeDetection)
    except KeyboardInterrupt:
        pass
    finally:
        # Always release the node; only shut down if the context is still up.
        qrCodeDetection.destroy_node()
        if rclpy.ok():
            rclpy.shutdown()
准确返回 P 点
思路 1——使用地图的固定元素来校准
每次重置里程计,将通道出口设为原点。利用地图中固定的线作为参照,通过逆透视变换计算相对坐标,进而推算终点 P 的全局坐标。
图 6 固定小橙的位置,终点是 (1.9m, -1.5m)
图 7 大家找找地图固定元素
已知 A, B, P 三点坐标,以及新视角下 A', B' 两点坐标,求解旋转矩阵 R 和平移向量 t,从而计算 P'。
公式推导如下:
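$$A' = RA + t$$
$$B' = RB + t$$
$$\Delta_{A'B'} = A' - B' = R\,(A - B) = R\,\Delta_{AB}$$
$$R = \Delta_{A'B'}\,\Delta_{AB}^{-1}$$
$$t = A' - RA$$
$$P' = RP + t$$
在二维平面中,$\Delta_{AB}$ 是向量,上式中的"逆"可写成闭式:令 $R = \begin{bmatrix} a & -b \\ b & a \end{bmatrix}$,则
$$a = \frac{\Delta_{AB}\cdot\Delta_{A'B'}}{\lVert\Delta_{AB}\rVert^{2}},\qquad b = \frac{\Delta_{AB,x}\,\Delta_{A'B',y} - \Delta_{AB,y}\,\Delta_{A'B',x}}{\lVert\Delta_{AB}\rVert^{2}}$$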
实际二维平面操作更简单,利用两根线的坐标去校准 P 点的坐标。
def end_point(x1, y1, x2, y2, x3, y3, x1_, y1_, x2_, y2_):
    """Map point 3 into a new frame using two reference point pairs.

    Solves the 2-D transform ``p' = [[a, -b], [b, a]] p + t`` that sends
    (x1, y1) to (x1_, y1_) and (x2, y2) to (x2_, y2_), then applies it to
    (x3, y3). ``a`` and ``b`` are not normalised, so any scale difference
    between the two reference segments is carried over as well.

    Args:
        x1, y1, x2, y2: Reference points in the original frame.
        x3, y3: Point to transform, in the original frame.
        x1_, y1_, x2_, y2_: The same reference points in the new frame.

    Returns:
        Tuple ``(x3_, y3_)``: the transformed point.

    Raises:
        ZeroDivisionError: If the two original reference points coincide.
    """
    delta_x = x1 - x2
    delta_y = y1 - y2
    delta_x_ = x1_ - x2_
    delta_y_ = y1_ - y2_

    den = delta_x ** 2 + delta_y ** 2
    if den == 0:
        # Same exception type as the bare division, with a usable message.
        raise ZeroDivisionError(
            "end_point: reference points (x1, y1) and (x2, y2) coincide")

    # a = dot(d, d') / |d|^2, b = cross(d, d') / |d|^2
    a = (delta_x * delta_x_ + delta_y * delta_y_) / den
    b = (delta_x * delta_y_ - delta_y * delta_x_) / den

    # Translation chosen so that point 1 maps exactly onto point 1'.
    tx = x1_ - a * x1 + b * y1
    ty = y1_ - b * x1 - a * y1

    x3_ = a * x3 - b * y3 + tx
    y3_ = b * x3 + a * y3 + ty

    print(f"(x1, y1): ({x1, y1} ), (x2, y2): ({x2, y2} ), (x3, y3): ({x3, y3} ) delta x: {delta_x} , delta y: {delta_y} , den: {den} ")
    return x3_, y3_
# Example call: transforms the point (1.9, -1.5) using reference points
# (ptx1, pty1), (ptx2, pty2) and their counterparts (ptx3, pty3), (ptx4, pty4).
# NOTE(review): the ptx*/pty* names are not defined in this excerpt.
print (f"end': {end_point(ptx1, pty1, ptx2, pty2, 1.9 , -1.5 , ptx3, pty3, ptx4, pty4)} " )
思路 2——不重置里程计,使用 YOLO 识别 P 点结果来校正终点
全程不重置里程计,直接冲出去,利用 YOLO 识别 P 点,结合逆透视变换计算全局坐标。
# 3x3 homography applied to homogeneous pixel coordinates [u, v, 1].
H = np.array([
    [-4.66389128e-04, -2.26288030e-04, -4.92300831e-02],
    [7.59821540e-04, 5.20569143e-05, -2.33074608e-01],
    [-6.59643252e-04, -7.15022786e-03, 1.00000000e+00],
])


def pixel2global(self, pixel_x, pixel_y):
    """Convert an image pixel to global coordinates.

    The pixel is projected through ``H`` into the vehicle's local frame,
    shifted along local x, then rotated and translated by the current pose.

    Args:
        pixel_x: Pixel column.
        pixel_y: Pixel row.

    Returns:
        Tuple ``(global_x, global_y)``.

    Reads ``self.current_pos``: index 0 and 1 are the position, index 2 the
    heading angle in radians (it is passed to ``np.cos``/``np.sin``).
    """
    pixel = np.array([pixel_x, pixel_y, 1], dtype=np.float32)
    local = np.dot(H, pixel)
    # Normalise the homogeneous coordinate.
    local /= local[2]
    # NOTE(review): fixed 0.25 offset along local x -- presumably the distance
    # between the camera reference and the odometry origin; confirm.
    local[0] += 0.25

    heading = self.current_pos[2]
    car_cos = np.cos(heading)
    car_sin = np.sin(heading)
    # Standard 2-D rotation by the heading, then translation by the position.
    global_x = self.current_pos[0] + car_cos * local[0] - car_sin * local[1]
    global_y = self.current_pos[1] + car_sin * local[0] + car_cos * local[1]
    return global_x, global_y
需采集大量数据训练 YOLO 模型,包括遮挡、远距离等场景,以提高识别鲁棒性。
修改 STM32 源码
调整舵机转角多项式系数,使左右转舵机量对称。提高串口发送频率至 50Hz,波特率设为 921600,关闭非必要外设。
修改 EKF 配置以匹配 50Hz 频率,删除电位器决定车型号逻辑,固定车型号为 Ackerman。
补充
提供数据自动标注、清洗、增强及分发脚本,提升数据处理效率。
自动标注脚本
import argparse
import os
import shutil
import time
from pathlib import Path
import torch
import torch.backends.cudnn as cudnn
import cv2
from models.experimental import attempt_load
from utils.datasets import LoadImages
from utils.utils import non_max_suppression, scale_coords, xyxy2xywh
from utils.torch_utils import select_device, time_synchronized
def auto_annotate(source, weights, output, img_size=640, conf_thres=0.25,
                  iou_thres=0.45, view_img=False, device=''):
    """Run a detector over a folder of images and write YOLO label files.

    One ``<image stem>.txt`` is written per image into ``output``. Each line
    is ``class x_center y_center width height`` with the box normalised by
    the original image size. Images without detections get an empty file.

    Args:
        source: Image folder passed to ``LoadImages``.
        weights: Path to the model weights.
        output: Folder for the label files (created if missing).
        img_size: Inference size in pixels.
        conf_thres: Confidence threshold for NMS.
        iou_thres: IoU threshold for NMS.
        view_img: Show each image in a window; pressing ``q`` aborts.
        device: Device string for ``select_device``; empty or ``None`` lets
            it choose.

    Raises:
        StopIteration: When ``view_img`` is set and ``q`` is pressed.
    """
    device = select_device(device or '')
    half = device.type != 'cpu'  # half precision only off-CPU
    model = attempt_load(weights, map_location=device)
    if half:
        model.half()

    output_dir = Path(output)
    output_dir.mkdir(parents=True, exist_ok=True)

    dataset = LoadImages(source, img_size=img_size)
    t0 = time.time()

    if half:
        # Warm-up pass, skipped on CPU.
        warmup = torch.zeros((1, 3, img_size, img_size), device=device)
        model(warmup.half())

    for path, img, im0s, _ in dataset:
        img = torch.from_numpy(img).to(device)
        img = img.half() if half else img.float()
        img /= 255.0
        if img.ndimension() == 3:
            img = img.unsqueeze(0)

        t1 = time_synchronized()
        pred = model(img, augment=False)[0]
        pred = non_max_suppression(pred, conf_thres, iou_thres, classes=None, agnostic=False)
        t2 = time_synchronized()

        im0 = im0s.copy()
        # Normalisation gain (w, h, w, h) of the original image.
        gn = torch.tensor(im0.shape)[[1, 0, 1, 0]]

        lines = []
        if pred is not None:
            for det in pred:
                if det is None or not len(det):
                    continue
                # Rescale boxes from the inference size to the original image.
                det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
                for *xyxy, conf, cls in reversed(det):
                    xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4)) / gn).view(-1).tolist()
                    lines.append("%d %.6f %.6f %.6f %.6f" % (int(cls), *xywh))

        txt_path = output_dir / (Path(path).stem + '.txt')
        with open(txt_path, 'w') as f:
            f.write("".join(line + "\n" for line in lines))

        print(f'{Path(path).name} done. ({t2 - t1:.3f} s)')

        if view_img:
            cv2.imshow(Path(path).name, im0)
            if cv2.waitKey(1) == ord('q'):
                raise StopIteration

    print(f'Done. ({time.time() - t0:.3f} s)')
if __name__ == '__main__':
    # Command-line entry point for the auto-annotation script.
    parser = argparse.ArgumentParser()
    parser.add_argument('--source', type=str, default='dataset_process/new1/images', help='输入图像文件夹路径')
    parser.add_argument('--weights', type=str, default='runs/2025.7.28/weights/last.pt', help='模型权重路径')
    parser.add_argument('--output', type=str, default='dataset_process/new1/labels', help='输出标签路径')
    parser.add_argument('--img-size', type=int, default=640, help='推理尺寸 (像素)')
    parser.add_argument('--conf-thres', type=float, default=0.25, help='目标置信度阈值')
    parser.add_argument('--iou-thres', type=float, default=0.45, help='NMS 的 IOU 阈值')
    # Empty string instead of None so the value is always a string.
    parser.add_argument('--device', default='', help='cuda 设备,如 0 或 0,1,2,3 或 cpu')
    parser.add_argument('--view-img', action='store_true', help='显示结果')
    opt = parser.parse_args()
    print(opt)

    # Inference only: no gradients needed.
    with torch.no_grad():
        auto_annotate(
            source=opt.source,
            weights=opt.weights,
            output=opt.output,
            img_size=opt.img_size,
            conf_thres=opt.conf_thres,
            iou_thres=opt.iou_thres,
            device=opt.device,
            view_img=opt.view_img,
        )
删除无效数据脚本
import os
from pathlib import Path
def remove_invalid_images_labels(image_dir, label_dir):
    """Delete images that have no usable label, plus their empty labels.

    An image (``.jpg``/``.png``/``.jpeg``, case-insensitive) is removed when
    its ``<stem>.txt`` label is missing, or when the label contains only
    whitespace; in the latter case the label file is removed as well.

    Args:
        image_dir: Folder containing the images.
        label_dir: Folder containing the label files.

    Returns:
        Tuple ``(deleted_images, deleted_labels)``.
    """
    deleted_images = 0
    deleted_labels = 0

    for image_file in os.listdir(image_dir):
        if not image_file.lower().endswith(('.jpg', '.png', '.jpeg')):
            continue

        image_path = os.path.join(image_dir, image_file)
        label_path = os.path.join(label_dir, Path(image_file).stem + '.txt')

        if not os.path.exists(label_path):
            os.remove(image_path)
            deleted_images += 1
            print(f"删除图片(无标签): {image_file} ")
            continue

        with open(label_path, 'r') as f:
            content = f.read().strip()

        # Remove only after the handle is closed; deleting an open file
        # fails on Windows.
        if not content:
            os.remove(image_path)
            os.remove(label_path)
            deleted_images += 1
            deleted_labels += 1
            print(f"删除无效数据:{image_file} 和对应标签")

    print(f"\n操作完成!共删除:{deleted_images} 张图片,{deleted_labels} 个标签")
    return deleted_images, deleted_labels
if __name__ == '__main__':
    # Clean the "new1" dataset next to this script, after user confirmation.
    image_dir = os.path.join(os.path.dirname(__file__), "new1/images/")
    label_dir = os.path.join(os.path.dirname(__file__), "new1/labels/")
    confirm = input("是否继续?(y/n): ").lower()
    if confirm != 'y':
        print("操作已取消")
    else:
        remove_invalid_images_labels(image_dir, label_dir)
数据增强脚本
import torch
import torchvision.transforms as T
import torchvision.transforms.functional as TF
from pathlib import Path
import shutil
from PIL import Image
import random
from multiprocessing import Pool
import os
class YOLOAugment:
    """Photometric augmentation for a YOLO dataset.

    Only colour jitter and Gaussian blur are applied, so the bounding boxes
    are written out unchanged next to each augmented image.
    """

    def __init__(self, output_dir):
        """Create ``<output_dir>/images`` and ``<output_dir>/labels``.

        Args:
            output_dir: Root folder for augmented images and labels.
        """
        self.output_dir = output_dir
        Path(output_dir, "images").mkdir(parents=True, exist_ok=True)
        Path(output_dir, "labels").mkdir(parents=True, exist_ok=True)
        self.img_augment = T.Compose([
            T.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.2),
            T.GaussianBlur(kernel_size=(3, 7)),
        ])

    def apply_augment(self, img_path, label_path, aug_id):
        """Augment one image and save it with its labels.

        Args:
            img_path: Source image path.
            label_path: Source YOLO label path.
            aug_id: Suffix number used in the output file names.

        Returns:
            Tuple of the original PIL image (RGB) and the parsed label rows.
        """
        img = Image.open(img_path).convert('RGB')
        with open(label_path) as f:
            # Skip blank lines: an empty row would make the tensor ragged.
            bboxes = [list(map(float, line.split())) for line in f if line.strip()]

        img_tensor = TF.to_tensor(img)
        bboxes_tensor = torch.tensor(bboxes)
        img_tensor = self.img_augment(img_tensor)

        stem = Path(img_path).stem
        self._save_results(img_tensor, bboxes_tensor, stem, aug_id)
        return img, bboxes

    def _save_results(self, img_tensor, bboxes, stem, aug_id):
        """Write ``<stem>_aug<aug_id>.jpg`` and the matching ``.txt``.

        The first column of each row is written as an integer class id and
        the remaining columns with six decimals.
        """
        name = f"{stem}_aug{aug_id}"
        aug_img = TF.to_pil_image(img_tensor)
        aug_img.save(Path(self.output_dir, "images", f"{name}.jpg"))
        with open(Path(self.output_dir, "labels", f"{name}.txt"), 'w') as f:
            for bbox in bboxes.numpy():
                cls_id, *coords = bbox.tolist()
                fields = [str(int(cls_id))] + [f"{v:.6f}" for v in coords]
                f.write(' '.join(fields) + '\n')
def process_file(args):
    """Worker: augment one image/label pair and keep the originals.

    Args:
        args: Tuple ``(img_path, label_path, output_dir, aug_per_image)``;
            packed as one argument so it can be used with ``Pool.map``.
    """
    img_path, label_path, output_dir, aug_per_image = args
    augmenter = YOLOAugment(output_dir)
    for i in range(1, aug_per_image + 1):
        augmenter.apply_augment(img_path, label_path, i)

    # Copy the untouched originals alongside the augmented files.
    shutil.copy(img_path, Path(output_dir, "images", Path(img_path).name))
    shutil.copy(label_path, Path(output_dir, "labels", Path(label_path).name))
if __name__ == "__main__":
    # Augment every labelled image of "new1" into "new1_aug" in parallel.
    root_path = os.path.dirname(__file__)
    input_dir = os.path.join(root_path, "new1")
    output_dir = os.path.join(root_path, "new1_aug")
    aug_per_image = 3
    num_workers = 4

    tasks = []
    for img_file in Path(input_dir, "images").glob("*.*"):
        if img_file.suffix.lower() in ('.jpg', '.png', '.jpeg'):
            label_file = Path(input_dir, "labels", f"{img_file.stem}.txt")
            # Images without a label file are skipped.
            if label_file.exists():
                tasks.append((str(img_file), str(label_file), output_dir, aug_per_image))

    print(f"开始增强 {len(tasks)} 张图像...")
    with Pool(processes=num_workers) as pool:
        pool.map(process_file, tasks)

    orig_count = len(tasks)
    aug_count = orig_count * aug_per_image
    print(f"处理完成!\n- 原始图像保留:{orig_count} 张\n- 增强图像生成:{aug_count} 张\n- 总数据量:{orig_count + aug_count} 张")
数据集分发包脚本
import os
import zipfile
import math
from pathlib import Path
def create_task_packs(images_dir, labels_dir, output_dir, tasks=3, label_txt=False):
    """Split an image/label dataset into zip packs of roughly equal size.

    Images are sorted by name and cut into ``tasks`` consecutive slices.
    Each slice becomes ``task_<n>.zip`` containing ``images/<file>`` and
    ``labels/<stem>.txt``. Nothing is written when the image and label stems
    do not match one-to-one. Slices that would be empty are skipped.

    Args:
        images_dir: Folder with ``.jpg``/``.png``/``.jpeg`` images.
        labels_dir: Folder with ``.txt`` labels.
        output_dir: Folder for the zip files (created if missing).
        tasks: Number of packs to split into; must be at least 1.
        label_txt: Accepted for compatibility; not used by this function.

    Returns:
        List of the zip paths that were created (empty on mismatch).

    Raises:
        ValueError: If ``tasks`` is less than 1.
    """
    if tasks < 1:
        raise ValueError("tasks must be at least 1")

    image_exts = ('.jpg', '.png', '.jpeg')
    image_files = sorted(f for f in os.listdir(images_dir) if f.lower().endswith(image_exts))
    label_files = sorted(f for f in os.listdir(labels_dir) if f.endswith('.txt'))

    image_stems = {Path(f).stem for f in image_files}
    label_stems = {Path(f).stem for f in label_files}
    unmatched = image_stems.symmetric_difference(label_stems)
    if unmatched:
        print(f"⚠️ 警告:发现 {len(unmatched)} 个不匹配文件(示例:{sorted(unmatched)[:3]} )")
        print("建议先运行数据校验脚本修复不一致问题!")
        return []

    total_pairs = len(image_files)
    pairs_per_task = math.ceil(total_pairs / tasks)
    print(f"数据集统计:")
    print(f"- 图片数量:{len(image_files)} ")
    print(f"- 标注数量:{len(label_files)} ")
    print(f"- 将分成 {tasks} 个任务包,每个约 {pairs_per_task} 对数据\n")

    os.makedirs(output_dir, exist_ok=True)
    created = []
    for task_num in range(1, tasks + 1):
        start_idx = (task_num - 1) * pairs_per_task
        end_idx = min(start_idx + pairs_per_task, total_pairs)
        task_images = image_files[start_idx:end_idx]
        if not task_images:
            # Fewer pairs than packs: nothing left for this and later packs.
            break
        task_labels = [Path(f).stem + '.txt' for f in task_images]

        zip_path = os.path.join(output_dir, f"task_{task_num}.zip")
        print(f"创建任务包 {task_num} :")
        print(f"- 包含图片:{len(task_images)} 张")
        print(f"- 包含标注:{len(task_labels)} 个")
        print(f"- 保存到:{zip_path} ")

        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for img in task_images:
                zipf.write(os.path.join(images_dir, img), f"images/{img}")
            for label in task_labels:
                label_path = os.path.join(labels_dir, label)
                if os.path.exists(label_path):
                    zipf.write(label_path, f"labels/{label}")
                else:
                    print(f"⚠️ 缺失标注文件:{label} ")
        created.append(zip_path)
        print("-" * 50)

    print(f"\n🎉 任务包创建完成!共生成 {len(created)} 个压缩包,保存在:{output_dir} ")
    return created
if __name__ == "__main__":
    # Pack the "new1" dataset next to this script into 4 zip files
    # under "package".
    root_path = os.path.dirname(__file__)
    dataset_dir = os.path.join(root_path, "new1")
    output_dir = os.path.join(root_path, "package")
    label_txt = os.path.join(root_path, "labels.txt")
    num_tasks = 4

    images_dir = os.path.join(dataset_dir, "images")
    labels_dir = os.path.join(dataset_dir, "labels")
    create_task_packs(images_dir, labels_dir, output_dir, tasks=num_tasks, label_txt=label_txt)
相关免费在线工具 加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
RSA密钥对生成器 生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
Mermaid 预览与可视化编辑 基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
随机西班牙地址生成器 随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
Gemini 图片去水印 基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online