ROS 2 自主导航机器人系统架构与 SLAM
自主导航是机器人最经典的应用之一。它涉及:
- 感知:LIDAR 扫描、里程计
- 建图:SLAM 建立环境地图
- 规划:生成无碰撞路径
- 控制:执行运动命令
本篇将从 0 到 1 构建一个完整的导航系统。
介绍 ROS 2 自主导航机器人的系统架构,涵盖感知、建图、规划与控制模块。详细讲解了 SLAM 技术原理及 Cartographer 配置,提供了里程计节点、静态地图加载、Dijkstra 全局路径规划及 PID 运动控制器的 Python 代码实现。最后整合了 Launch 文件进行系统部署,适合机器人开发者参考。
自主导航是机器人最经典的应用之一。它涉及:
本篇将从 0 到 1 构建一个完整的导航系统。
硬件层
感知模块
规划模块
控制模块
实时控制循环 (10-100Hz):
总耗时:~60ms (16Hz)
状态表示:
x_t = {p_t, theta_t, m}
其中:
贝叶斯滤波模型:
p(x_t, m | z_{1:t}, u_{1:t}) ∝ p(z_t | x_t, m) · p(x_t | x_{t-1}, u_t) · p(x_{t-1}, m | z_{1:t-1}, u_{1:t-1})
# 安装 Cartographer SLAM
sudo apt install ros-humble-cartographer
sudo apt install ros-humble-cartographer-ros
# 或安装 SLAM Toolbox
sudo apt install ros-humble-slam-toolbox
# 或安装 hector_slam
sudo apt install ros-humble-hector-slam
创建 cartographer_robot.launch.py:
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
import os
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
cartographer_dir = get_package_share_directory('cartographer_ros')
return LaunchDescription([
# LIDAR 驱动
Node(
package='rplidar_ros',
executable='rplidar_composition',
name='lidar_driver',
parameters=[
{'serial_port':'/dev/ttyUSB0'},
{'serial_baudrate':115200},
{'frame_id':'laser'},
{'angle_compensate':True},
],
),
# SLAM 节点
Node(
package='cartographer_ros',
executable='cartographer_node',
name='cartographer_node',
output='screen',
parameters=[{'use_sim_time':False}],
arguments=['-configuration_directory', os.path.join(cartographer_dir,'configuration_files'),'-configuration_basename','backpack_2d.lua'],
remappings=[('/scan','/scan'),('/imu','/imu/data')],
),
# 后端优化
Node(
package='cartographer_ros',
executable='cartographer_occupancy_grid_node',
name='cartographer_occupancy_grid_node',
output='screen',
parameters=[{'use_sim_time':False}],
),
# 变换广播
Node(
package='tf2_ros',
executable='static_transform_publisher',
arguments=['0','0','0.1','0','0','0','base_link','laser'],
)
])
创建 backpack_2d.lua:
-- Cartographer 2D SLAM 配置
include "map_builder.lua"
include "trajectory_builder.lua"
options = {
map_builder = MAP_BUILDER,
trajectory_builder = TRAJECTORY_BUILDER,
map_frame = "map",
tracking_frame = "base_link",
published_frame = "base_link",
odom_frame = "odom",
provide_odom_frame = true,
publish_frame_projected_to_2d = false,
use_odometry = true,
use_nav_sat = false,
use_landmarks = false,
num_laser_scans = 1,
num_multi_echo_laser_scans = 0,
num_subdivisions_per_laser_scan = 1,
num_point_clouds = 0,
lookup_transform_timeout_sec = 0.2,
submap_publish_period_sec = 0.3,
pose_publish_period_sec = 5e-2,
trajectory_publish_period_sec = 30e-2,
rangefinder_sampling_ratio = 1.,
odometry_sampling_ratio = 1.,
fixed_frame_pose_sampling_ratio = 1.,
imu_sampling_ratio = 1.,
landmarks_sampling_ratio = 1.,
}
MAP_BUILDER.use_trajectory_builder_2d = true
TRAJECTORY_BUILDER_2D.use_imu_data = false
TRAJECTORY_BUILDER_2D.motion_filter.max_time_seconds = 5.0
return options
#!/usr/bin/env python3
""" 轮式里程计节点 - 计算位置和速度 """
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
from geometry_msgs.msg import TransformStamped, Pose, Twist
from tf2_ros import TransformBroadcaster
from std_msgs.msg import Float64
import math
import numpy as np
class OdometryNode(Node):
def __init__(self):
super().__init__('odometry_node')
# 机器人参数
self.declare_parameter('wheel_radius', 0.033)
self.declare_parameter('wheel_separation', 0.16)
self.declare_parameter('encoder_resolution', 360)
self.wheel_radius = self.get_parameter('wheel_radius').value
self.wheel_separation = self.get_parameter('wheel_separation').value
self.encoder_resolution = self.get_parameter('encoder_resolution').value
# 状态
self.x = 0.0
self.y = 0.0
self.theta = 0.0
self.prev_left_count = 0
self.prev_right_count = 0
self.prev_time = self.get_clock().now()
# 发布者
self.odom_pub = self.create_publisher(Odometry, '/odom', 10)
self.tf_broadcaster = TransformBroadcaster(self)
# 订阅编码器
self.left_sub = self.create_subscription(Float64, '/encoder/left', self.left_callback, 10)
self.right_sub = self.create_subscription(Float64, '/encoder/right', self.right_callback, 10)
def left_callback(self, msg):
"""左轮编码器回调"""
self.left_count = msg.data
self.update_odometry()
def right_callback(self, msg):
"""右轮编码器回调"""
self.right_count = msg.data
self.update_odometry()
def update_odometry(self):
"""更新里程计"""
current_time = self.get_clock().now()
dt = (current_time - self.prev_time).nanoseconds / 1e9
if dt < 0.001:
return
# 计算轮子转动距离
delta_left = (self.left_count - self.prev_left_count) * (2 * math.pi * self.wheel_radius / self.encoder_resolution)
delta_right = (self.right_count - self.prev_right_count) * (2 * math.pi * self.wheel_radius / self.encoder_resolution)
# 差动驱动运动学
delta_s = (delta_left + delta_right) / 2.0
delta_theta = (delta_right - delta_left) / self.wheel_separation
# 更新位置
if abs(delta_theta) < 1e-6:
self.x += delta_s * math.cos(self.theta)
self.y += delta_s * math.sin(self.theta)
else:
r = delta_s / delta_theta
self.x += r * (math.sin(self.theta + delta_theta) - math.sin(self.theta))
self.y += r * (math.cos(self.theta) - math.cos(self.theta + delta_theta))
self.theta += delta_theta
# 计算速度
linear_vel = delta_s / dt if dt > 0 else 0.0
angular_vel = delta_theta / dt if dt > 0 else 0.0
# 发布里程计
self.publish_odometry(linear_vel, angular_vel, current_time)
# 发布变换
self.publish_transform(current_time)
# 更新记录
self.prev_left_count = self.left_count
self.prev_right_count = self.right_count
self.prev_time = current_time
def publish_odometry(self, linear_vel, angular_vel, current_time):
"""发布里程计消息"""
odom = Odometry()
odom.header.stamp = current_time.to_msg()
odom.header.frame_id = 'odom'
odom.child_frame_id = 'base_link'
# 位置
odom.pose.pose.position.x = self.x
odom.pose.pose.position.y = self.y
odom.pose.pose.position.z = 0.0
# 姿态(四元数)
qx, qy, qz, qw = self.euler_to_quaternion(0, 0, self.theta)
odom.pose.pose.orientation.x = qx
odom.pose.pose.orientation.y = qy
odom.pose.pose.orientation.z = qz
odom.pose.pose.orientation.w = qw
# 速度
odom.twist.twist.linear.x = linear_vel
odom.twist.twist.angular.z = angular_vel
self.odom_pub.publish(odom)
def publish_transform(self, current_time):
"""发布 odom 到 base_link 的变换"""
t = TransformStamped()
t.header.stamp = current_time.to_msg()
t.header.frame_id = 'odom'
t.child_frame_id = 'base_link'
t.transform.translation.x = self.x
t.transform.translation.y = self.y
t.transform.translation.z = 0.0
qx, qy, qz, qw = self.euler_to_quaternion(0, 0, self.theta)
t.transform.rotation.x = qx
t.transform.rotation.y = qy
t.transform.rotation.z = qz
t.transform.rotation.w = qw
self.tf_broadcaster.sendTransform(t)
@staticmethod
def euler_to_quaternion(roll, pitch, yaw):
"""欧拉角转四元数"""
cy = math.cos(yaw * 0.5)
sy = math.sin(yaw * 0.5)
cp = math.cos(pitch * 0.5)
sp = math.sin(pitch * 0.5)
cr = math.cos(roll * 0.5)
sr = math.sin(roll * 0.5)
qw = cr * cp * cy + sr * sp * sy
qx = sr * cp * cy - cr * sp * sy
qy = cr * sp * cy + sr * cp * sy
qz = cr * cp * sy - sr * sp * cy
return qx, qy, qz, qw
def main(args=None):
rclpy.init(args=args)
node = OdometryNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
#!/usr/bin/env python3
""" 地图服务器节点 """
import rclpy
from rclpy.node import Node
from nav_msgs.srv import GetMap
from nav_msgs.msg import OccupancyGrid
import cv2
import numpy as np
class MapServer(Node):
def __init__(self):
super().__init__('map_server')
# 加载地图图像
map_image = cv2.imread('map.pgm', cv2.IMREAD_GRAYSCALE)
# 创建占用网格
self.occupancy_grid = self.create_occupancy_grid(map_image)
# 创建地图服务
self.srv = self.create_service(GetMap, '/static_map', self.get_map_callback)
# 发布地图话题
self.pub = self.create_publisher(OccupancyGrid, '/map', 1, transient_local=True)
self.pub.publish(self.occupancy_grid)
def create_occupancy_grid(self, image):
"""将图像转换为占用网格"""
grid = OccupancyGrid()
grid.header.frame_id = 'map'
# 分辨率(米/像素)
grid.info.resolution = 0.05
grid.info.width = image.shape[1]
grid.info.height = image.shape[0]
# 原点(左下角)
grid.info.origin.position.x = 0.0
grid.info.origin.position.y = 0.0
# 将图像转换为占用值
data = []
for row in reversed(image):
for pixel in row:
occupancy = 100 if pixel < 128 else 0
data.append(occupancy)
grid.data = data
return grid
def get_map_callback(self, request, response):
"""处理获取地图请求"""
response.map = self.occupancy_grid
return response
def main(args=None):
rclpy.init(args=args)
node = MapServer()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
#!/usr/bin/env python3
""" 全局路径规划器 - Dijkstra 算法 """
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Path, OccupancyGrid
from geometry_msgs.msg import PoseStamped
from std_srvs.srv import Empty
import numpy as np
import heapq
class GlobalPlanner(Node):
def __init__(self):
super().__init__('global_planner')
# 订阅地图和目标
self.map_sub = self.create_subscription(OccupancyGrid, '/map', self.map_callback, 1)
self.goal_sub = self.create_subscription(PoseStamped, '/goal_pose', self.goal_callback, 10)
# 发布路径
self.path_pub = self.create_publisher(Path, '/planned_path', 10)
self.map = None
self.goal = None
def map_callback(self, msg):
"""地图更新回调"""
self.map = msg
def goal_callback(self, msg):
"""目标更新回调"""
if self.map is None:
return
path = self.plan_path(msg)
if path:
self.path_pub.publish(path)
def plan_path(self, goal_pose):
"""Dijkstra 路径规划"""
start = self.get_start_pos()
goal = self.pose_to_grid(goal_pose)
if start is None or goal is None:
return None
path_grid = self.dijkstra(start, goal)
if not path_grid:
self.get_logger().warn('无法规划路径')
return None
# 转换为 Path 消息
path_msg = Path()
path_msg.header.frame_id = 'map'
path_msg.header.stamp = self.get_clock().now().to_msg()
for grid_pos in path_grid:
pose = PoseStamped()
pose.header.frame_id = 'map'
x, y = self.grid_to_world(grid_pos)
pose.pose.position.x = x
pose.pose.position.y = y
pose.pose.orientation.w = 1.0
path_msg.poses.append(pose)
return path_msg
def dijkstra(self, start, goal):
"""Dijkstra 最短路径算法"""
grid_array = np.array(self.map.data).reshape(self.map.info.height, self.map.info.width)
# 优先队列:(成本,x, y)
pq = [(0, start[0], start[1])]
visited = set()
parent = {}
cost = {start: 0}
while pq:
current_cost, x, y = heapq.heappop(pq)
if (x, y) in visited:
continue
visited.add((x, y))
if (x, y) == goal:
# 重建路径
path = []
current = goal
while current in parent:
path.append(current)
current = parent[current]
path.append(start)
return path[::-1]
# 探索邻域
for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
nx, ny = x + dx, y + dy
if 0 <= nx < self.map.info.width and 0 <= ny < self.map.info.height:
# 检查障碍
if grid_array[ny, nx] > 50:
continue
new_cost = current_cost + 1
if (nx, ny) not in cost or new_cost < cost[(nx, ny)]:
cost[(nx, ny)] = new_cost
parent[(nx, ny)] = (x, y)
heapq.heappush(pq, (new_cost, nx, ny))
return None
def get_start_pos(self):
"""获取机器人起点 - 从 TF 查询"""
return (int(self.map.info.width / 2), int(self.map.info.height / 2))
def pose_to_grid(self, pose):
"""世界坐标转网格坐标"""
x = int((pose.pose.position.x - self.map.info.origin.position.x) / self.map.info.resolution)
y = int((pose.pose.position.y - self.map.info.origin.position.y) / self.map.info.resolution)
if 0 <= x < self.map.info.width and 0 <= y < self.map.info.height:
return (x, y)
return None
def grid_to_world(self, grid_pos):
"""网格坐标转世界坐标"""
x = grid_pos[0] * self.map.info.resolution + self.map.info.origin.position.x
y = grid_pos[1] * self.map.info.resolution + self.map.info.origin.position.y
return x, y
def main(args=None):
rclpy.init(args=args)
node = GlobalPlanner()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
#!/usr/bin/env python3
""" 运动控制器 - PID 控制差动驱动 """
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist, PoseStamped
from nav_msgs.msg import Path
from sensor_msgs.msg import Imu
import math
class MotionController(Node):
def __init__(self):
super().__init__('motion_controller')
# PID 参数
self.declare_parameter('linear_kp', 1.0)
self.declare_parameter('linear_ki', 0.1)
self.declare_parameter('linear_kd', 0.05)
self.declare_parameter('angular_kp', 2.0)
self.kp_linear = self.get_parameter('linear_kp').value
self.ki_linear = self.get_parameter('linear_ki').value
self.kd_linear = self.get_parameter('linear_kd').value
self.kp_angular = self.get_parameter('angular_kp').value
# 状态
self.current_pose = None
self.path = None
self.path_index = 0
self.prev_error = 0.0
self.integral_error = 0.0
# 发布速度命令
self.vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
# 订阅
self.path_sub = self.create_subscription(Path, '/planned_path', self.path_callback, 10)
self.pose_sub = self.create_subscription(PoseStamped, '/robot_pose', self.pose_callback, 10)
# 控制循环
self.control_timer = self.create_timer(0.1, self.control_loop)
def path_callback(self, msg):
"""路径更新"""
self.path = msg
self.path_index = 0
def pose_callback(self, msg):
"""位置更新"""
self.current_pose = msg.pose
def control_loop(self):
"""主控制循环"""
if self.current_pose is None or self.path is None:
return
# 获取当前目标点
if self.path_index >= len(self.path.poses):
cmd = Twist()
self.vel_pub.publish(cmd)
return
target_pose = self.path.poses[self.path_index].pose
# 计算距离和角度误差
dx = target_pose.position.x - self.current_pose.position.x
dy = target_pose.position.y - self.current_pose.position.y
distance = math.sqrt(dx**2 + dy**2)
# 当距离小于阈值时,移到下一个路径点
if distance < 0.1:
self.path_index += 1
return
# 计算目标角度
target_angle = math.atan2(dy, dx)
current_angle = self.get_yaw_from_quaternion(self.current_pose.orientation)
angle_error = self.normalize_angle(target_angle - current_angle)
# PID 控制线性速度
linear_cmd = self.kp_linear * distance
# P 控制角速度
angular_cmd = self.kp_angular * angle_error
# 限制速度
linear_cmd = max(-0.5, min(0.5, linear_cmd))
angular_cmd = max(-1.0, min(1.0, angular_cmd))
# 发送命令
cmd = Twist()
cmd.linear.x = linear_cmd
cmd.angular.z = angular_cmd
self.vel_pub.publish(cmd)
@staticmethod
def get_yaw_from_quaternion(q):
"""从四元数提取 yaw 角"""
yaw = math.atan2(2.0 * (q.w * q.z + q.x * q.y), 1.0 - 2.0 * (q.y**2 + q.z**2))
return yaw
@staticmethod
def normalize_angle(angle):
"""标准化角度到 [-pi, pi]"""
while angle > math.pi:
angle -= 2 * math.pi
while angle < -math.pi:
angle += 2 * math.pi
return angle
def main(args=None):
rclpy.init(args=args)
node = MotionController()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import IncludeLaunchDescription
import os
def generate_launch_description():
return LaunchDescription([
# 硬件驱动层
Node(
package='rplidar_ros',
executable='rplidar_composition',
name='lidar_driver',
parameters=[
{'serial_port':'/dev/ttyUSB0'},
{'frame_id':'laser'},
],
),
# 感知层
Node(
package='my_navigation',
executable='odometry_node',
name='odometry',
parameters=[
{'wheel_radius':0.033},
{'wheel_separation':0.16},
],
),
Node(
package='cartographer_ros',
executable='cartographer_node',
name='slam',
),
# 规划层
Node(
package='my_navigation',
executable='global_planner',
name='global_planner',
),
# 控制层
Node(
package='my_navigation',
executable='motion_controller',
name='motion_controller',
parameters=[
{'linear_kp':1.0},
{'angular_kp':2.0},
],
),
# RViz 可视化
Node(
package='rviz2',
executable='rviz2',
name='rviz2',
arguments=['-d','navigation.rviz'],
)
])
✅ SLAM 建图:
✅ 里程计计算:
✅ 路径规划:
✅ 运动控制:
✅ 系统集成:

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online