跳到主要内容ROS 2 自主导航机器人系统架构与 SLAM 实践 | 极客日志PythonAI算法
ROS 2 自主导航机器人系统架构与 SLAM 实践
详解 ROS 2 自主导航系统核心架构,涵盖 SLAM 建图、里程计计算、地图管理、路径规划及运动控制模块。通过 Cartographer 配置、差动驱动里程计实现、Dijkstra 全局规划及 PID 控制器编写,展示从零搭建导航系统的完整流程。重点解析贝叶斯滤波模型、坐标变换及实时控制循环设计,为机器人感知与决策提供工程化落地方案。
AiEngineer1 浏览 ROS 2 自主导航机器人系统架构与 SLAM
构建完整的自主导航系统,关键在于打通从建图到导航的端到端流程。自主导航涉及感知、建图、规划与控制四大核心环节:
- 感知:LIDAR 扫描、里程计数据获取
- 建图:利用 SLAM 技术建立环境地图
- 规划:生成无碰撞路径
- 控制:执行运动命令
下面我们从零开始,逐步搭建一个完整的导航系统。
一、自主导航系统架构
1.1 完整的系统架构
硬件层负责底层驱动,包括轮式编码器、驱动电机及激光雷达(如 RPLidar/Velodyne)。控制模块处理 PID 控制和紧急停止安全监督。规划模块分为全局规划(Dijkstra/A*)和局部规划(DWA/TEB),并进行可行性检查。感知模块则负责扫描匹配、里程计计算及地图维护。
1.2 数据流和时间轴
实时控制循环通常维持在 10-100Hz。传感器采样约需 1ms,数据融合 10ms,位姿估计 5ms,地图更新 15ms,路径规划 20ms,轨迹生成 5ms,电机控制 1ms。总耗时约 60ms,对应频率约为 16Hz。
二、SLAM(同步定位与建图)
2.1 SLAM 的数学模型
状态表示通常为:
$$x_t = {p_t, \theta_t, m}$$
其中 $p_t = (x_t, y_t)$ 为机器人位置,$\theta_t$ 为方向,$m$ 为环境地图。贝叶斯滤波模型可描述为:
$$p(x_t, m | z_{1:t}, u_{1:t}) \propto p(z_t | x_t, m) \cdot p(x_t | x_{t-1}, u_t) \cdot p(x_{t-1}, m | z_{1:t-1}, u_{1:t-1})$$
2.2 安装 SLAM 包
这里以 Cartographer 为例,也可以使用 SLAM Toolbox 或 hector_slam。
sudo apt install ros-humble-cartographer
sudo apt install ros-humble-cartographer-ros
sudo apt install ros-humble-slam-toolbox
2.3 Cartographer SLAM 配置
创建 cartographer_robot.launch.py 文件来启动节点:
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
import os
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
cartographer_dir = get_package_share_directory()
LaunchDescription([
Node(
package=,
executable=,
name=,
parameters=[
{: },
{: },
{: },
{: }
]
),
Node(
package=,
executable=,
name=,
output=,
parameters=[{: }],
arguments=[, os.path.join(cartographer_dir, ),
, ],
remappings=[(, ), (, )]
),
Node(
package=,
executable=,
name=,
output=,
parameters=[{: }]
),
Node(
package=,
executable=,
arguments=[, , , , , , , ]
)
])
'cartographer_ros'
return
'rplidar_ros'
'rplidar_composition'
'lidar_driver'
'serial_port'
'/dev/ttyUSB0'
'serial_baudrate'
115200
'frame_id'
'laser'
'angle_compensate'
True
'cartographer_ros'
'cartographer_node'
'cartographer_node'
'screen'
'use_sim_time'
False
'-configuration_directory'
'configuration_files'
'-configuration_basename'
'backpack_2d.lua'
'/scan'
'/scan'
'/imu'
'/imu/data'
'cartographer_ros'
'cartographer_occupancy_grid_node'
'cartographer_occupancy_grid_node'
'screen'
'use_sim_time'
False
'tf2_ros'
'static_transform_publisher'
'0'
'0'
'0.1'
'0'
'0'
'0'
'base_link'
'laser'
2.4 SLAM 配置文件 (Lua)
创建 backpack_2d.lua 定义行为参数:
include "map_builder.lua"
include "trajectory_builder.lua"
options = {
map_builder = MAP_BUILDER,
trajectory_builder = TRAJECTORY_BUILDER,
map_frame = "map",
tracking_frame = "base_link",
published_frame = "base_link",
odom_frame = "odom",
provide_odom_frame = true,
publish_frame_projected_to_2d = false,
use_odometry = true,
use_nav_sat = false,
use_landmarks = false,
num_laser_scans = 1,
num_multi_echo_laser_scans = 0,
num_subdivisions_per_laser_scan = 1,
num_point_clouds = 0,
lookup_transform_timeout_sec = 0.2,
submap_publish_period_sec = 0.3,
pose_publish_period_sec = 5e-2,
trajectory_publish_period_sec = 30e-2,
rangefinder_sampling_ratio = 1.,
odometry_sampling_ratio = 1.,
fixed_frame_pose_sampling_ratio = 1.,
imu_sampling_ratio = 1.,
landmarks_sampling_ratio = 1.,
}
MAP_BUILDER.use_trajectory_builder_2d = true
TRAJECTORY_BUILDER_2D.use_imu_data = false
TRAJECTORY_BUILDER_2D.motion_filter.max_time_seconds = 5.0
return options
三、里程计和位姿估计
3.1 实现里程计节点
我们需要编写一个 Python 节点来处理编码器数据并发布里程计消息。这里使用差动驱动运动学模型。
""" 轮式里程计节点 - 计算位置和速度 """
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
from geometry_msgs.msg import TransformStamped, Pose, Twist
from tf2_ros import TransformBroadcaster
from std_msgs.msg import Float64
import math
import numpy as np
class OdometryNode(Node):
def __init__(self):
super().__init__('odometry_node')
self.declare_parameter('wheel_radius', 0.033)
self.declare_parameter('wheel_separation', 0.16)
self.declare_parameter('encoder_resolution', 360)
self.wheel_radius = self.get_parameter('wheel_radius').value
self.wheel_separation = self.get_parameter('wheel_separation').value
self.encoder_resolution = self.get_parameter('encoder_resolution').value
self.x = 0.0
self.y = 0.0
self.theta = 0.0
self.prev_left_count = 0
self.prev_right_count = 0
self.prev_time = self.get_clock().now()
self.odom_pub = self.create_publisher(Odometry, '/odom', 10)
self.tf_broadcaster = TransformBroadcaster(self)
self.left_sub = self.create_subscription(Float64, '/encoder/left', self.left_callback, 10)
self.right_sub = self.create_subscription(Float64, '/encoder/right', self.right_callback, 10)
def left_callback(self, msg):
"""左轮编码器回调"""
self.left_count = msg.data
self.update_odometry()
def right_callback(self, msg):
"""右轮编码器回调"""
self.right_count = msg.data
self.update_odometry()
def update_odometry(self):
"""更新里程计"""
current_time = self.get_clock().now()
dt = (current_time - self.prev_time).nanoseconds / 1e9
if dt < 0.001:
return
delta_left = (self.left_count - self.prev_left_count) * \
(2 * math.pi * self.wheel_radius / self.encoder_resolution)
delta_right = (self.right_count - self.prev_right_count) * \
(2 * math.pi * self.wheel_radius / self.encoder_resolution)
delta_s = (delta_left + delta_right) / 2.0
delta_theta = (delta_right - delta_left) / self.wheel_separation
if abs(delta_theta) < 1e-6:
self.x += delta_s * math.cos(self.theta)
self.y += delta_s * math.sin(self.theta)
else:
r = delta_s / delta_theta
self.x += r * (math.sin(self.theta + delta_theta) - math.sin(self.theta))
self.y += r * (math.cos(self.theta) - math.cos(self.theta + delta_theta))
self.theta += delta_theta
linear_vel = delta_s / dt if dt > 0 else 0.0
angular_vel = delta_theta / dt if dt > 0 else 0.0
self.publish_odometry(linear_vel, angular_vel, current_time)
self.publish_transform(current_time)
self.prev_left_count = self.left_count
self.prev_right_count = self.right_count
self.prev_time = current_time
def publish_odometry(self, linear_vel, angular_vel, current_time):
"""发布里程计消息"""
odom = Odometry()
odom.header.stamp = current_time.to_msg()
odom.header.frame_id = 'odom'
odom.child_frame_id = 'base_link'
odom.pose.pose.position.x = self.x
odom.pose.pose.position.y = self.y
odom.pose.pose.position.z = 0.0
qx, qy, qz, qw = self.euler_to_quaternion(0, 0, self.theta)
odom.pose.pose.orientation.x = qx
odom.pose.pose.orientation.y = qy
odom.pose.pose.orientation.z = qz
odom.pose.pose.orientation.w = qw
odom.twist.twist.linear.x = linear_vel
odom.twist.twist.angular.z = angular_vel
self.odom_pub.publish(odom)
def publish_transform(self, current_time):
"""发布 odom 到 base_link 的变换"""
t = TransformStamped()
t.header.stamp = current_time.to_msg()
t.header.frame_id = 'odom'
t.child_frame_id = 'base_link'
t.transform.translation.x = self.x
t.transform.translation.y = self.y
t.transform.translation.z = 0.0
qx, qy, qz, qw = self.euler_to_quaternion(0, 0, self.theta)
t.transform.rotation.x = qx
t.transform.rotation.y = qy
t.transform.rotation.z = qz
t.transform.rotation.w = qw
self.tf_broadcaster.sendTransform(t)
@staticmethod
def euler_to_quaternion(roll, pitch, yaw):
"""欧拉角转四元数"""
cy = math.cos(yaw * 0.5)
sy = math.sin(yaw * 0.5)
cp = math.cos(pitch * 0.5)
sp = math.sin(pitch * 0.5)
cr = math.cos(roll * 0.5)
sr = math.sin(roll * 0.5)
qw = cr * cp * cy + sr * sp * sy
qx = sr * cp * cy - cr * sp * sy
qy = cr * sp * cy + sr * cp * sy
qz = cr * cp * sy - sr * sp * cy
return qx, qy, qz, qw
def main(args=None):
rclpy.init(args=args)
node = OdometryNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
四、地图和成本地图
4.1 静态地图加载
我们需要将图像格式的地图转换为占用网格(Occupancy Grid)并发布。
""" 地图服务器节点 """
import rclpy
from rclpy.node import Node
from nav_msgs.srv import GetMap
from nav_msgs.msg import OccupancyGrid
import cv2
import numpy as np
class MapServer(Node):
def __init__(self):
super().__init__('map_server')
map_image = cv2.imread('map.pgm', cv2.IMREAD_GRAYSCALE)
self.occupancy_grid = self.create_occupancy_grid(map_image)
self.srv = self.create_service(GetMap, '/static_map', self.get_map_callback)
self.pub = self.create_publisher(OccupancyGrid, '/map', 1, transient_local=True)
self.pub.publish(self.occupancy_grid)
def create_occupancy_grid(self, image):
"""将图像转换为占用网格"""
grid = OccupancyGrid()
grid.header.frame_id = 'map'
grid.info.resolution = 0.05
grid.info.width = image.shape[1]
grid.info.height = image.shape[0]
grid.info.origin.position.x = 0.0
grid.info.origin.position.y = 0.0
data = []
for row in reversed(image):
for pixel in row:
occupancy = 100 if pixel < 128 else 0
data.append(occupancy)
grid.data = data
return grid
def get_map_callback(self, request, response):
"""处理获取地图请求"""
response.map = self.occupancy_grid
return response
def main(args=None):
rclpy.init(args=args)
node = MapServer()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
五、路径规划和导航
5.1 Dijkstra 全局规划
全局规划器负责在已知地图上寻找最短路径。这里使用 Dijkstra 算法实现。
""" 全局路径规划器 - Dijkstra 算法 """
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Path, OccupancyGrid
from geometry_msgs.msg import PoseStamped
from std_srvs.srv import Empty
import numpy as np
import heapq
class GlobalPlanner(Node):
def __init__(self):
super().__init__('global_planner')
self.map_sub = self.create_subscription(OccupancyGrid, '/map', self.map_callback, 1)
self.goal_sub = self.create_subscription(PoseStamped, '/goal_pose', self.goal_callback, 10)
self.path_pub = self.create_publisher(Path, '/planned_path', 10)
self.map = None
self.goal = None
def map_callback(self, msg):
"""地图更新回调"""
self.map = msg
def goal_callback(self, msg):
"""目标更新回调"""
if self.map is None:
return
path = self.plan_path(msg)
if path:
self.path_pub.publish(path)
def plan_path(self, goal_pose):
"""Dijkstra 路径规划"""
start = self.get_start_pos()
goal = self.pose_to_grid(goal_pose)
if start is None or goal is None:
return None
path_grid = self.dijkstra(start, goal)
if not path_grid:
self.get_logger().warn('无法规划路径')
return None
path_msg = Path()
path_msg.header.frame_id = 'map'
path_msg.header.stamp = self.get_clock().now().to_msg()
for grid_pos in path_grid:
pose = PoseStamped()
pose.header.frame_id = 'map'
x, y = self.grid_to_world(grid_pos)
pose.pose.position.x = x
pose.pose.position.y = y
pose.pose.orientation.w = 1.0
path_msg.poses.append(pose)
return path_msg
def dijkstra(self, start, goal):
"""Dijkstra 最短路径算法"""
grid_array = np.array(self.map.data).reshape(self.map.info.height, self.map.info.width)
pq = [(0, start[0], start[1])]
visited = set()
parent = {}
cost = {start: 0}
while pq:
current_cost, x, y = heapq.heappop(pq)
if (x, y) in visited:
continue
visited.add((x, y))
if (x, y) == goal:
path = []
current = goal
while current in parent:
path.append(current)
current = parent[current]
path.append(start)
return path[::-1]
for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
nx, ny = x + dx, y + dy
if 0 <= nx < self.map.info.width and 0 <= ny < self.map.info.height:
if grid_array[ny, nx] > 50:
continue
new_cost = current_cost + 1
if (nx, ny) not in cost or new_cost < cost[(nx, ny)]:
cost[(nx, ny)] = new_cost
parent[(nx, ny)] = (x, y)
heapq.heappush(pq, (new_cost, nx, ny))
return None
def get_start_pos(self):
"""获取机器人起点 - 从 TF 查询"""
return (int(self.map.info.width / 2), int(self.map.info.height / 2))
def pose_to_grid(self, pose):
"""世界坐标转网格坐标"""
x = int((pose.pose.position.x - self.map.info.origin.position.x) / self.map.info.resolution)
y = int((pose.pose.position.y - self.map.info.origin.position.y) / self.map.info.resolution)
if 0 <= x < self.map.info.width and 0 <= y < self.map.info.height:
return (x, y)
return None
def grid_to_world(self, grid_pos):
"""网格坐标转世界坐标"""
x = grid_pos[0] * self.map.info.resolution + self.map.info.origin.position.x
y = grid_pos[1] * self.map.info.resolution + self.map.info.origin.position.y
return x, y
def main(args=None):
rclpy.init(args=args)
node = GlobalPlanner()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
六、运动控制器
6.1 差动驱动运动控制
最后一步是运动控制,这里使用简单的 P 控制器进行路径跟踪。
""" 运动控制器 - PID 控制差动驱动 """
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist, PoseStamped
from nav_msgs.msg import Path
from sensor_msgs.msg import Imu
import math
class MotionController(Node):
def __init__(self):
super().__init__('motion_controller')
self.declare_parameter('linear_kp', 1.0)
self.declare_parameter('linear_ki', 0.1)
self.declare_parameter('linear_kd', 0.05)
self.declare_parameter('angular_kp', 2.0)
self.kp_linear = self.get_parameter('linear_kp').value
self.ki_linear = self.get_parameter('linear_ki').value
self.kd_linear = self.get_parameter('linear_kd').value
self.kp_angular = self.get_parameter('angular_kp').value
self.current_pose = None
self.path = None
self.path_index = 0
self.prev_error = 0.0
self.integral_error = 0.0
self.vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
self.path_sub = self.create_subscription(Path, '/planned_path', self.path_callback, 10)
self.pose_sub = self.create_subscription(PoseStamped, '/robot_pose', self.pose_callback, 10)
self.control_timer = self.create_timer(0.1, self.control_loop)
def path_callback(self, msg):
"""路径更新"""
self.path = msg
self.path_index = 0
def pose_callback(self, msg):
"""位置更新"""
self.current_pose = msg.pose
def control_loop(self):
"""主控制循环"""
if self.current_pose is None or self.path is None:
return
if self.path_index >= len(self.path.poses):
cmd = Twist()
self.vel_pub.publish(cmd)
return
target_pose = self.path.poses[self.path_index].pose
dx = target_pose.position.x - self.current_pose.position.x
dy = target_pose.position.y - self.current_pose.position.y
distance = math.sqrt(dx**2 + dy**2)
if distance < 0.1:
self.path_index += 1
return
target_angle = math.atan2(dy, dx)
current_angle = self.get_yaw_from_quaternion(self.current_pose.orientation)
angle_error = self.normalize_angle(target_angle - current_angle)
linear_cmd = self.kp_linear * distance
angular_cmd = self.kp_angular * angle_error
linear_cmd = max(-0.5, min(0.5, linear_cmd))
angular_cmd = max(-1.0, min(1.0, angular_cmd))
cmd = Twist()
cmd.linear.x = linear_cmd
cmd.angular.z = angular_cmd
self.vel_pub.publish(cmd)
@staticmethod
def get_yaw_from_quaternion(q):
"""从四元数提取 yaw 角"""
yaw = math.atan2(2.0 * (q.w * q.z + q.x * q.y), 1.0 - 2.0 * (q.y**2 + q.z**2))
return yaw
@staticmethod
def normalize_angle(angle):
"""标准化角度到 [-pi, pi]"""
while angle > math.pi:
angle -= 2 * math.pi
while angle < -math.pi:
angle += 2 * math.pi
return angle
def main(args=None):
rclpy.init(args=args)
node = MotionController()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
七、完整的 Launch 文件和部署
7.1 导航系统总 Launch 文件
将所有节点整合到一个 Launch 文件中方便管理:
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import IncludeLaunchDescription
import os
def generate_launch_description():
return LaunchDescription([
Node(
package='rplidar_ros',
executable='rplidar_composition',
name='lidar_driver',
parameters=[
{'serial_port': '/dev/ttyUSB0'},
{'frame_id': 'laser'}
]
),
Node(
package='my_navigation',
executable='odometry_node',
name='odometry',
parameters=[
{'wheel_radius': 0.033},
{'wheel_separation': 0.16}
]
),
Node(
package='cartographer_ros',
executable='cartographer_node',
name='slam'
),
Node(
package='my_navigation',
executable='global_planner',
name='global_planner'
),
Node(
package='my_navigation',
executable='motion_controller',
name='motion_controller',
parameters=[
{'linear_kp': 1.0},
{'angular_kp': 2.0}
]
),
Node(
package='rviz2',
executable='rviz2',
name='rviz2',
arguments=['-d', 'navigation.rviz']
)
])
八、本项目要点总结
- SLAM 建图:Cartographer 或 SLAM Toolbox,支持扫描匹配和回环检测。
- 里程计计算:基于差动驱动运动学,处理编码器数据。
- 路径规划:Dijkstra 全局规划,栅格地图表示。
- 运动控制:PID 控制器,线性和角速度命令输出。
- 系统集成:多节点协作,保证实时性。
掌握这套系统,就掌握了机器人的核心导航能力。后续可以进一步引入局部规划器(如 TEB)和优化 SLAM 精度。
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- 随机西班牙地址生成器
随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
- curl 转代码
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online