ROS 2从入门到精通系列(十六):自主导航机器人 - 系统架构与SLAM
ROS 2从入门到精通系列(十六):自主导航机器人 - 系统架构与SLAM
构建完整的自主导航系统,从建图到导航的端到端实现。
引言
自主导航是机器人最经典的应用之一。它涉及:
- 感知:LIDAR扫描、里程计
- 建图:SLAM建立环境地图
- 规划:生成无碰撞路径
- 控制:执行运动命令
本篇将从0到1构建一个完整的导航系统。
一、自主导航系统架构
1.1 完整的系统架构
硬件层
控制模块
运动控制
PID Control
安全监督
Emergency Stop
规划模块
全局规划
Dijkstra/A*
局部规划
DWA/TEB
可行性检查
Feasibility Check
感知模块
扫描匹配
Scan Matching
里程计
Odometry
地图维护
Map Maintenance
激光雷达
RPLidar/Velodyne
轮式编码器
里程计
驱动电机
电机控制器
1.2 数据流和时间轴
实时控制循环 (10-100Hz):
传感器采样 (1ms)
数据融合 (10ms)
位姿估计 (5ms)
地图更新 (15ms)
路径规划 (20ms)
轨迹生成 (5ms)
电机控制 (1ms)
总耗时: ~60ms (16Hz)
二、SLAM(同步定位与建图)
2.1 SLAM的数学模型
状态表示:
x t = { p t , θ t , m } x_t = \{p_t, \theta_t, m\} xt={pt,θt,m}
其中:
- p t = ( x t , y t ) p_t = (x_t, y_t) pt=(xt,yt) - 机器人位置
- θ t \theta_t θt - 机器人方向
- m m m - 环境地图
贝叶斯滤波模型:
p ( x t , m ∣ z 1 : t , u 1 : t ) ∝ p ( z t ∣ x t , m ) ⋅ p ( x t ∣ x t − 1 , u t ) ⋅ p ( x t − 1 , m ∣ z 1 : t − 1 , u 1 : t − 1 ) p(x_t, m | z_{1:t}, u_{1:t}) \propto p(z_t | x_t, m) \cdot p(x_t | x_{t-1}, u_t) \cdot p(x_{t-1}, m | z_{1:t-1}, u_{1:t-1}) p(xt,m∣z1:t,u1:t)∝p(zt∣xt,m)⋅p(xt∣xt−1,ut)⋅p(xt−1,m∣z1:t−1,u1:t−1)
2.2 安装SLAM包
# 安装Cartographer SLAMsudoaptinstall ros-humble-cartographer sudoaptinstall ros-humble-cartographer-ros # 或安装SLAM Toolboxsudoaptinstall ros-humble-slam-toolbox # 或安装hector_slamsudoaptinstall ros-humble-hector-slam 2.3 Cartographer SLAM配置
创建 cartographer_robot.launch.py:
from launch import LaunchDescription from launch_ros.actions import Node from launch.actions import IncludeLaunchDescription from launch.launch_description_sources import PythonLaunchDescriptionSource import os from ament_index_python.packages import get_package_share_directory defgenerate_launch_description(): cartographer_dir = get_package_share_directory('cartographer_ros')return LaunchDescription([# LIDAR驱动 Node( package='rplidar_ros', executable='rplidar_composition', name='lidar_driver', parameters=[{'serial_port':'/dev/ttyUSB0'},{'serial_baudrate':115200},{'frame_id':'laser'},{'angle_compensate':True},]),# SLAM节点 Node( package='cartographer_ros', executable='cartographer_node', name='cartographer_node', output='screen', parameters=[{'use_sim_time':False},], arguments=['-configuration_directory', os.path.join(cartographer_dir,'configuration_files'),'-configuration_basename','backpack_2d.lua'], remappings=[('/scan','/scan'),('/imu','/imu/data'),]),# 后端优化 Node( package='cartographer_ros', executable='cartographer_occupancy_grid_node', name='cartographer_occupancy_grid_node', output='screen', parameters=[{'use_sim_time':False},]),# 变换广播 Node( package='tf2_ros', executable='static_transform_publisher', arguments=['0','0','0.1','0','0','0','base_link','laser']),])2.4 SLAM配置文件 (Lua)
创建 backpack_2d.lua:
-- Cartographer 2D SLAM配置 include "map_builder.lua" include "trajectory_builder.lua" options ={ map_builder = MAP_BUILDER, trajectory_builder = TRAJECTORY_BUILDER, map_frame ="map", tracking_frame ="base_link", published_frame ="base_link", odom_frame ="odom", provide_odom_frame =true, publish_frame_projected_to_2d =false, use_odometry =true, use_nav_sat =false, use_landmarks =false, num_laser_scans =1, num_multi_echo_laser_scans =0, num_subdivisions_per_laser_scan =1, num_point_clouds =0, lookup_transform_timeout_sec =0.2, submap_publish_period_sec =0.3, pose_publish_period_sec =5e-2, trajectory_publish_period_sec =30e-2, rangefinder_sampling_ratio =1., odometry_sampling_ratio =1., fixed_frame_pose_sampling_ratio =1., imu_sampling_ratio =1., landmarks_sampling_ratio =1.,} MAP_BUILDER.use_trajectory_builder_2d =true TRAJECTORY_BUILDER_2D.use_imu_data =false TRAJECTORY_BUILDER_2D.motion_filter.max_time_seconds =5.0return options 三、里程计和位姿估计
3.1 实现里程计节点
#!/usr/bin/env python3""" 轮式里程计节点 - 计算位置和速度 """import rclpy from rclpy.node import Node from nav_msgs.msg import Odometry from geometry_msgs.msg import TransformStamped, Pose, Twist from tf2_ros import TransformBroadcaster from std_msgs.msg import Float64 import math import numpy as np classOdometryNode(Node):def__init__(self):super().__init__('odometry_node')# 机器人参数 self.declare_parameter('wheel_radius',0.033) self.declare_parameter('wheel_separation',0.16) self.declare_parameter('encoder_resolution',360) self.wheel_radius = self.get_parameter('wheel_radius').value self.wheel_separation = self.get_parameter('wheel_separation').value self.encoder_resolution = self.get_parameter('encoder_resolution').value # 状态 self.x =0.0 self.y =0.0 self.theta =0.0 self.prev_left_count =0 self.prev_right_count =0 self.prev_time = self.get_clock().now()# 发布者 self.odom_pub = self.create_publisher(Odometry,'/odom',10) self.tf_broadcaster = TransformBroadcaster(self)# 订阅编码器 self.left_sub = self.create_subscription( Float64,'/encoder/left', self.left_callback,10) self.right_sub = self.create_subscription( Float64,'/encoder/right', self.right_callback,10)defleft_callback(self, msg):"""左轮编码器回调""" self.left_count = msg.data self.update_odometry()defright_callback(self, msg):"""右轮编码器回调""" self.right_count = msg.data self.update_odometry()defupdate_odometry(self):"""更新里程计""" current_time = self.get_clock().now() dt =(current_time - self.prev_time).nanoseconds /1e9if dt <0.001:# 防止除以零return# 计算轮子转动距离 delta_left =(self.left_count - self.prev_left_count)* \ (2* math.pi * self.wheel_radius / self.encoder_resolution) delta_right =(self.right_count - self.prev_right_count)* \ (2* math.pi * self.wheel_radius / self.encoder_resolution)# 差动驱动运动学 delta_s =(delta_left + delta_right)/2.0# 中心移动距离 delta_theta =(delta_right - delta_left)/ self.wheel_separation # 转动角# 更新位置ifabs(delta_theta)<1e-6: self.x += delta_s * math.cos(self.theta) self.y += delta_s * math.sin(self.theta)else: r = delta_s / delta_theta self.x += r *(math.sin(self.theta + delta_theta)- math.sin(self.theta)) self.y += r *(math.cos(self.theta)- math.cos(self.theta + delta_theta)) self.theta += delta_theta # 计算速度 linear_vel = delta_s / dt if dt >0else0.0 angular_vel = delta_theta / dt if dt >0else0.0# 发布里程计 self.publish_odometry(linear_vel, angular_vel, current_time)# 发布变换 self.publish_transform(current_time)# 更新记录 self.prev_left_count = self.left_count self.prev_right_count = self.right_count self.prev_time = current_time defpublish_odometry(self, linear_vel, angular_vel, current_time):"""发布里程计消息""" odom = Odometry() odom.header.stamp = current_time.to_msg() odom.header.frame_id ='odom' odom.child_frame_id ='base_link'# 位置 odom.pose.pose.position.x = self.x odom.pose.pose.position.y = self.y odom.pose.pose.position.z =0.0# 姿态(四元数) qx, qy, qz, qw = self.euler_to_quaternion(0,0, self.theta) odom.pose.pose.orientation.x = qx odom.pose.pose.orientation.y = qy odom.pose.pose.orientation.z = qz odom.pose.pose.orientation.w = qw # 速度 odom.twist.twist.linear.x = linear_vel odom.twist.twist.angular.z = angular_vel self.odom_pub.publish(odom)defpublish_transform(self, current_time):"""发布odom到base_link的变换""" t = TransformStamped() t.header.stamp = current_time.to_msg() t.header.frame_id ='odom' t.child_frame_id ='base_link' t.transform.translation.x = self.x t.transform.translation.y = self.y t.transform.translation.z =0.0 qx, qy, qz, qw = self.euler_to_quaternion(0,0, self.theta) t.transform.rotation.x = qx t.transform.rotation.y = qy t.transform.rotation.z = qz t.transform.rotation.w = qw self.tf_broadcaster.sendTransform(t)@staticmethoddefeuler_to_quaternion(roll, pitch, yaw):"""欧拉角转四元数""" cy = math.cos(yaw *0.5) sy = math.sin(yaw *0.5) cp = math.cos(pitch *0.5) sp = math.sin(pitch *0.5) cr = math.cos(roll *0.5) sr = math.sin(roll *0.5) qw = cr * cp * cy + sr * sp * sy qx = sr * cp * cy - cr * sp * sy qy = cr * sp * cy + sr * cp * sy qz = cr * cp * sy - sr * sp * cy return qx, qy, qz, qw defmain(args=None): rclpy.init(args=args) node = OdometryNode() rclpy.spin(node) rclpy.shutdown()if __name__ =='__main__': main()四、地图和成本地图
4.1 静态地图加载
#!/usr/bin/env python3""" 地图服务器节点 """import rclpy from rclpy.node import Node from nav_msgs.srv import GetMap from nav_msgs.msg import OccupancyGrid import cv2 import numpy as np classMapServer(Node):def__init__(self):super().__init__('map_server')# 加载地图图像 map_image = cv2.imread('map.pgm', cv2.IMREAD_GRAYSCALE)# 创建占用网格 self.occupancy_grid = self.create_occupancy_grid(map_image)# 创建地图服务 self.srv = self.create_service( GetMap,'/static_map', self.get_map_callback)# 发布地图话题 self.pub = self.create_publisher( OccupancyGrid,'/map',1, transient_local=True) self.pub.publish(self.occupancy_grid)defcreate_occupancy_grid(self, image):"""将图像转换为占用网格""" grid = OccupancyGrid() grid.header.frame_id ='map'# 分辨率(米/像素) grid.info.resolution =0.05 grid.info.width = image.shape[1] grid.info.height = image.shape[0]# 原点(左下角) grid.info.origin.position.x =0.0 grid.info.origin.position.y =0.0# 将图像转换为占用值 data =[]for row inreversed(image):for pixel in row:# 0 = 自由,100 = 占用,-1 = 未知 occupancy =100if pixel <128else0 data.append(occupancy) grid.data = data return grid defget_map_callback(self, request, response):"""处理获取地图请求""" response.map= self.occupancy_grid return response defmain(args=None): rclpy.init(args=args) node = MapServer() rclpy.spin(node) rclpy.shutdown()if __name__ =='__main__': main()五、路径规划和导航
5.1 Dijkstra全局规划
#!/usr/bin/env python3""" 全局路径规划器 - Dijkstra算法 """import rclpy from rclpy.node import Node from nav_msgs.msg import Path, OccupancyGrid from geometry_msgs.msg import PoseStamped from std_srvs.srv import Empty import numpy as np import heapq classGlobalPlanner(Node):def__init__(self):super().__init__('global_planner')# 订阅地图和目标 self.map_sub = self.create_subscription( OccupancyGrid,'/map', self.map_callback,1) self.goal_sub = self.create_subscription( PoseStamped,'/goal_pose', self.goal_callback,10)# 发布路径 self.path_pub = self.create_publisher( Path,'/planned_path',10) self.map=None self.goal =Nonedefmap_callback(self, msg):"""地图更新回调""" self.map= msg defgoal_callback(self, msg):"""目标更新回调"""if self.mapisNone:return# 规划路径 path = self.plan_path(msg)if path: self.path_pub.publish(path)defplan_path(self, goal_pose):"""Dijkstra路径规划"""# 获取起点和目标点 start = self.get_start_pos() goal = self.pose_to_grid(goal_pose)if start isNoneor goal isNone:returnNone# Dijkstra算法 path_grid = self.dijkstra(start, goal)ifnot path_grid: self.get_logger().warn('无法规划路径')returnNone# 转换为Path消息 path_msg = Path() path_msg.header.frame_id ='map' path_msg.header.stamp = self.get_clock().now().to_msg()for grid_pos in path_grid: pose = PoseStamped() pose.header.frame_id ='map' x, y = self.grid_to_world(grid_pos) pose.pose.position.x = x pose.pose.position.y = y pose.pose.orientation.w =1.0 path_msg.poses.append(pose)return path_msg defdijkstra(self, start, goal):"""Dijkstra最短路径算法""" grid_array = np.array(self.map.data).reshape( self.map.info.height, self.map.info.width)# 优先队列: (成本, x, y) pq =[(0, start[0], start[1])] visited =set() parent ={} cost ={start:0}while pq: current_cost, x, y = heapq.heappop(pq)if(x, y)in visited:continue visited.add((x, y))if(x, y)== goal:# 重建路径 path =[] current = goal while current in parent: path.append(current) current = parent[current] path.append(start)return path[::-1]# 探索邻域for dx, dy in[(0,1),(1,0),(0,-1),(-1,0)]: nx, ny = x + dx, y + dy if0<= nx < self.map.info.width and \ 0<= ny < self.map.info.height:# 检查障碍if grid_array[ny, nx]>50:continue new_cost = current_cost +1if(nx, ny)notin cost or new_cost < cost[(nx, ny)]: cost[(nx, ny)]= new_cost parent[(nx, ny)]=(x, y) heapq.heappush(pq,(new_cost, nx, ny))returnNonedefget_start_pos(self):"""获取机器人起点 - 从TF查询"""# 这里简化为返回地图中心return(int(self.map.info.width /2),int(self.map.info.height /2))defpose_to_grid(self, pose):"""世界坐标转网格坐标""" x =int((pose.pose.position.x - self.map.info.origin.position.x)/ self.map.info.resolution) y =int((pose.pose.position.y - self.map.info.origin.position.y)/ self.map.info.resolution)if0<= x < self.map.info.width and \ 0<= y < self.map.info.height:return(x, y)returnNonedefgrid_to_world(self, grid_pos):"""网格坐标转世界坐标""" x = grid_pos[0]* self.map.info.resolution + \ self.map.info.origin.position.x y = grid_pos[1]* self.map.info.resolution + \ self.map.info.origin.position.y return x, y defmain(args=None): rclpy.init(args=args) node = GlobalPlanner() rclpy.spin(node) rclpy.shutdown()if __name__ =='__main__': main()六、运动控制器
6.1 差动驱动运动控制
#!/usr/bin/env python3""" 运动控制器 - PID控制差动驱动 """import rclpy from rclpy.node import Node from geometry_msgs.msg import Twist, PoseStamped from nav_msgs.msg import Path from sensor_msgs.msg import Imu import math classMotionController(Node):def__init__(self):super().__init__('motion_controller')# PID参数 self.declare_parameter('linear_kp',1.0) self.declare_parameter('linear_ki',0.1) self.declare_parameter('linear_kd',0.05) self.declare_parameter('angular_kp',2.0) self.kp_linear = self.get_parameter('linear_kp').value self.ki_linear = self.get_parameter('linear_ki').value self.kd_linear = self.get_parameter('linear_kd').value self.kp_angular = self.get_parameter('angular_kp').value # 状态 self.current_pose =None self.path =None self.path_index =0 self.prev_error =0.0 self.integral_error =0.0# 发布速度命令 self.vel_pub = self.create_publisher(Twist,'/cmd_vel',10)# 订阅 self.path_sub = self.create_subscription( Path,'/planned_path', self.path_callback,10) self.pose_sub = self.create_subscription( PoseStamped,'/robot_pose', self.pose_callback,10)# 控制循环 self.control_timer = self.create_timer(0.1, self.control_loop)defpath_callback(self, msg):"""路径更新""" self.path = msg self.path_index =0defpose_callback(self, msg):"""位置更新""" self.current_pose = msg.pose defcontrol_loop(self):"""主控制循环"""if self.current_pose isNoneor self.path isNone:return# 获取当前目标点if self.path_index >=len(self.path.poses):# 到达目标 cmd = Twist() self.vel_pub.publish(cmd)return target_pose = self.path.poses[self.path_index].pose # 计算距离和角度误差 dx = target_pose.position.x - self.current_pose.position.x dy = target_pose.position.y - self.current_pose.position.y distance = math.sqrt(dx**2+ dy**2)# 当距离小于阈值时,移到下一个路径点if distance <0.1: self.path_index +=1return# 计算目标角度 target_angle = math.atan2(dy, dx) current_angle = self.get_yaw_from_quaternion(self.current_pose.orientation) angle_error = self.normalize_angle(target_angle - current_angle)# PID控制线性速度 linear_cmd = self.kp_linear * distance # P控制角速度 angular_cmd = self.kp_angular * angle_error # 限制速度 linear_cmd =max(-0.5,min(0.5, linear_cmd)) angular_cmd =max(-1.0,min(1.0, angular_cmd))# 发送命令 cmd = Twist() cmd.linear.x = linear_cmd cmd.angular.z = angular_cmd self.vel_pub.publish(cmd)@staticmethoddefget_yaw_from_quaternion(q):"""从四元数提取yaw角""" yaw = math.atan2(2.0*(q.w * q.z + q.x * q.y),1.0-2.0*(q.y**2+ q.z**2))return yaw @staticmethoddefnormalize_angle(angle):"""标准化角度到[-pi, pi]"""while angle > math.pi: angle -=2* math.pi while angle <-math.pi: angle +=2* math.pi return angle defmain(args=None): rclpy.init(args=args) node = MotionController() rclpy.spin(node) rclpy.shutdown()if __name__ =='__main__': main()七、完整的Launch文件和部署
7.1 导航系统总Launch文件
from launch import LaunchDescription from launch_ros.actions import Node from launch.actions import IncludeLaunchDescription import os defgenerate_launch_description():return LaunchDescription([# 硬件驱动层 Node( package='rplidar_ros', executable='rplidar_composition', name='lidar_driver', parameters=[{'serial_port':'/dev/ttyUSB0'},{'frame_id':'laser'},]),# 感知层 Node( package='my_navigation', executable='odometry_node', name='odometry', parameters=[{'wheel_radius':0.033},{'wheel_separation':0.16},]), Node( package='cartographer_ros', executable='cartographer_node', name='slam',),# 规划层 Node( package='my_navigation', executable='global_planner', name='global_planner'),# 控制层 Node( package='my_navigation', executable='motion_controller', name='motion_controller', parameters=[{'linear_kp':1.0},{'angular_kp':2.0},]),# RViz可视化 Node( package='rviz2', executable='rviz2', name='rviz2', arguments=['-d','navigation.rviz']),])八、本项目要点总结
✅ SLAM建图:
- Cartographer或SLAM Toolbox
- 扫描匹配和回环检测
- 增量式地图更新
✅ 里程计计算:
- 差动驱动运动学
- 编码器数据处理
- 位置和速度估计
✅ 路径规划:
- Dijkstra全局规划
- 栅格地图表示
- 成本评估
✅ 运动控制:
- PID控制器
- 线性和角速度命令
- 路径跟踪
✅ 系统集成:
- 多节点协作
- 实时性保证
- 安全监督
下一篇预告:《ROS2从入门到精通系列(十七):机械臂控制 - 正逆运动学与轨迹规划》
烙 自主导航是机器人最复杂的任务之一。掌握这个系统,你就掌握了机器人的"大脑"!